1. Environment

  • MySQL 5.6
  • Sqoop 1.4.6
  • Hadoop 2.5.2
  • HBase 0.98
  • Elasticsearch 2.3.5

2. Installation (omitted)

3. HBase Coprocessor implementation

HBase Observer

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.client.Client;
//import org.elasticsearch.client.transport.TransportClient;
//import org.elasticsearch.common.settings.ImmutableSettings;
//import org.elasticsearch.common.settings.Settings;
//import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
//import java.util.NavigableMap;

public class DataSyncObserver extends BaseRegionObserver {

    private static Client client = null;
    private static final Log LOG = LogFactory.getLog(DataSyncObserver.class);

    /**
     * Read the parameters passed in from the HBase shell
     *
     * @param env
     */
    private void readConfiguration(CoprocessorEnvironment env) {
        Configuration conf = env.getConfiguration();
        Config.clusterName = conf.get("es_cluster");
        Config.nodeHost = conf.get("es_host");
        Config.nodePort = conf.getInt("es_port", -1);
        Config.indexName = conf.get("es_index");
        Config.typeName = conf.get("es_type");

        LOG.info("observer -- started with config: " + Config.getInfo());
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        readConfiguration(env);
//        Settings settings = ImmutableSettings.settingsBuilder()
//                .put("cluster.name", Config.clusterName).build();
//        client = new TransportClient(settings)
//                .addTransportAddress(new InetSocketTransportAddress(
//                        Config.nodeHost, Config.nodePort));
        client = MyTransportClient.client;
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        try {
            String indexId = new String(put.getRow());
            Map<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
//            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            Map<String, Object> json = new HashMap<String, Object>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                }
            }
            ElasticSearchOperator.addUpdateBuilderToBulk(
                    client.prepareUpdate(Config.indexName, Config.typeName, indexId)
                            .setDoc(json).setUpsert(json));
            LOG.info("observer -- add new doc: " + indexId + " to type: " + Config.typeName);
        } catch (Exception ex) {
            LOG.error(ex);
        }
    }

    @Override
    public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete, final WALEdit edit, final Durability durability) throws IOException {
        try {
            String indexId = new String(delete.getRow());
            ElasticSearchOperator.addDeleteBuilderToBulk(
                    client.prepareDelete(Config.indexName, Config.typeName, indexId));
            LOG.info("observer -- delete a doc: " + indexId);
        } catch (Exception ex) {
            LOG.error(ex);
        }
    }
}
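
The Observer above and the operator below both reference two helper classes, Config and MyTransportClient, that this post does not show. A minimal sketch of what they could look like, assuming the ES 2.3.5 transport-client builder API; the field and class layout is inferred from the call sites, not taken from the original source:

import java.net.InetAddress;

import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

/** Holds the parameters read from the coprocessor table attribute. */
class Config {
    static String clusterName;
    static String nodeHost;
    static int nodePort;
    static String indexName;
    static String typeName;

    static String getInfo() {
        return "cluster=" + clusterName + ", node=" + nodeHost + ":" + nodePort
                + ", index=" + indexName + ", type=" + typeName;
    }
}

/**
 * Shared ES 2.x TransportClient. Config must already be populated (via
 * readConfiguration) before this class is first touched, because the
 * static initializer reads those fields.
 */
class MyTransportClient {
    static Client client;

    static {
        try {
            Settings settings = Settings.settingsBuilder()
                    .put("cluster.name", Config.clusterName).build();
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(
                            InetAddress.getByName(Config.nodeHost), Config.nodePort));
        } catch (Exception ex) {
            throw new ExceptionInInitializerError(ex);
        }
    }
}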

ES methods

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.client.Client;
//import org.elasticsearch.client.transport.TransportClient;
//import org.elasticsearch.common.settings.ImmutableSettings;
//import org.elasticsearch.common.settings.Settings;
//import org.elasticsearch.common.transport.InetSocketTransportAddress;

import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ElasticSearchOperator {

    // Buffer capacity: max number of requests before a bulk commit
    private static final int MAX_BULK_COUNT = 10;
    // Maximum commit interval (seconds)
    private static final int MAX_COMMIT_INTERVAL = 60 * 5;

    private static Client client = null;
    private static BulkRequestBuilder bulkRequestBuilder = null;
    private static Lock commitLock = new ReentrantLock();

    static {
        // elasticsearch 1.5.0
//        Settings settings = ImmutableSettings.settingsBuilder()
//                .put("cluster.name", Config.clusterName).build();
//        client = new TransportClient(settings)
//                .addTransportAddress(new InetSocketTransportAddress(
//                        Config.nodeHost, Config.nodePort));

        // 2.3.5
        client = MyTransportClient.client;

        bulkRequestBuilder = client.prepareBulk();
        bulkRequestBuilder.setRefresh(true);

        Timer timer = new Timer();
        timer.schedule(new CommitTimer(), 10 * 1000, MAX_COMMIT_INTERVAL * 1000);
    }

    /**
     * Commit in bulk once the buffer holds more requests than the threshold
     *
     * @param threshold
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
            if (!bulkResponse.hasFailures()) {
                bulkRequestBuilder = client.prepareBulk();
            }
        }
    }

    /**
     * Add an index (upsert) request to the buffer
     *
     * @param builder
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Add a delete request to the buffer
     *
     * @param builder
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Timer task: flushes the buffer periodically, so Elasticsearch stays in
     * sync with HBase even when the RegionServer sees no updates for a long time
     */
    static class CommitTimer extends TimerTask {
        @Override
        public void run() {
            commitLock.lock();
            try {
                bulkRequest(0);
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                commitLock.unlock();
            }
        }
    }
}
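
Before deploying the jar to the RegionServers, the operator can be smoke-tested from a plain JVM. A hedged sketch (the class name and test values are hypothetical); it pushes one upsert through the same buffered bulk path the Observer uses:

import java.util.HashMap;
import java.util.Map;

public class SyncSmokeTest {
    public static void main(String[] args) throws Exception {
        // Normally readConfiguration() fills these from the table attribute.
        Config.clusterName = "elas2.3.4";
        Config.nodeHost = "localhost";
        Config.nodePort = 9300;
        Config.indexName = "hbase";
        Config.typeName = "mysql_region";

        Map<String, Object> doc = new HashMap<String, Object>();
        doc.put("REGION_NAME", "test");

        // Goes through the same buffered bulk path as the Observer.
        ElasticSearchOperator.addUpdateBuilderToBulk(
                MyTransportClient.client
                        .prepareUpdate(Config.indexName, Config.typeName, "test-row-1")
                        .setDoc(doc).setUpsert(doc));

        // One request stays below MAX_BULK_COUNT, so the commit happens when the
        // CommitTimer first fires (10 s after class init); wait past that point.
        Thread.sleep(15 * 1000);
        MyTransportClient.client.close();
        System.exit(0); // the operator's Timer thread is non-daemon, so exit explicitly
    }
}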

Package and upload to HDFS

(This assumes the project's pom.xml configures the maven-assembly-plugin with the jar-with-dependencies descriptor, which is what produces the observer-1.0-SNAPSHOT-jar-with-dependencies.jar artifact.)

mvn clean compile assembly:single
mv observer-1.0-SNAPSHOT-jar-with-dependencies.jar observer-hb0.98-es2.3.5.jar
hdfs dfs -put observer-hb0.98-es2.3.5.jar /hbase/lib/

4. Create the HBase tables and enable the Coprocessor

The coprocessor is attached as a table attribute whose value has the form 'jar path|observer class|priority|key=value,...'; the key=value pairs are the parameters that readConfiguration() picks up.

MySQL

hbase shell
create 'region','data'
disable 'region'
alter 'region', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=mysql_region,es_index=hbase,es_port=9300,es_host=localhost'
enable 'region'

Oracle

create 'sp','data'
disable 'sp'
alter 'sp', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=oracle_sp,es_index=hbase,es_port=9300,es_host=localhost'
enable 'sp'
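
The shell registration above can also be done programmatically. A sketch against the HBase 0.98 client API, setting the same attribute as the alter command for the 'region' table (the class name is hypothetical):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class RegisterObserver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        TableName table = TableName.valueOf("region");

        // Same key=value parameters as the shell command above
        Map<String, String> params = new HashMap<String, String>();
        params.put("es_cluster", "elas2.3.4");
        params.put("es_type", "mysql_region");
        params.put("es_index", "hbase");
        params.put("es_port", "9300");
        params.put("es_host", "localhost");

        admin.disableTable(table);
        HTableDescriptor htd = admin.getTableDescriptor(table);
        htd.addCoprocessor("com.gavin.observer.DataSyncObserver",
                new Path("hdfs:///hbase/lib/observer-hb0.98-es2.3.5.jar"),
                1001, params); // 1001 = same priority as the shell example
        admin.modifyTable(table, htd);
        admin.enableTable(table);
        admin.close();
    }
}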

Check the table attribute (the sample output below comes from an ora_test table with the same Observer attached):

hbase(main):007:0* describe 'ora_test'

Table ora_test is ENABLED
ora_test, {TABLE_ATTRIBUTES => {coprocessor$1 => 'hdfs:///appdt/hbase/lib/observer-hb1.2.2-es2.3.5.jar|com.gavin.observer.DataSyncObserver|1001|es_cluster=elas2.3.4,es_type=ora_test,es_index=hbase,es_port=9300,es_host=localhost'}
COLUMN FAMILIES DESCRIPTION
{NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0260 seconds

Remove the Coprocessor

disable 'ora_test'
alter 'ora_test',METHOD => 'table_att_unset',NAME =>'coprocessor$1'
enable 'ora_test'
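
The unset step can likewise be scripted from Java. A sketch, assuming the coprocessor was registered under the default attribute key coprocessor$1 (the class name is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class UnsetObserver {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        TableName table = TableName.valueOf("ora_test");

        admin.disableTable(table);
        HTableDescriptor htd = admin.getTableDescriptor(table);
        // Drop the same attribute key that table_att_unset removes in the shell
        htd.remove(Bytes.toBytes("coprocessor$1"));
        admin.modifyTable(table, htd);
        admin.enableTable(table);
        admin.close();
    }
}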

Verify the removal

hbase(main):011:0> describe 'ora_test'

Table ora_test is ENABLED
ora_test
COLUMN FAMILIES DESCRIPTION
{NAME => 'data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => 'FOREVER', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}
1 row(s) in 0.0200 seconds

5. Import data with Sqoop

MySQL

bin/sqoop import --connect jdbc:mysql://192.168.1.187:3306/trade_dev --username mysql --password 111111 --table TB_REGION --hbase-table region --hbase-row-key REGION_ID --column-family data

Oracle

bin/sqoop import --connect jdbc:oracle:thin:@192.168.16.223:1521/orcl --username sitts --password password --table SITTS.ESB_SERVICE_PARAM --split-by PARAM_ID --hbase-table sp --hbase-row-key PARAM_ID --column-family data

6. Verification

HBase

scan 'region'

ES
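
To check the ES side, a sketch that counts and prints a few synced documents, reusing the MyTransportClient helper sketched earlier (the class name is hypothetical; index and type match the MySQL example above):

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.index.query.QueryBuilders;

public class EsCheck {
    public static void main(String[] args) {
        // MyTransportClient reads these fields on first use.
        Config.clusterName = "elas2.3.4";
        Config.nodeHost = "localhost";
        Config.nodePort = 9300;

        Client client = MyTransportClient.client;
        // Count the synced documents and show the first five.
        SearchResponse resp = client.prepareSearch("hbase")
                .setTypes("mysql_region")
                .setQuery(QueryBuilders.matchAllQuery())
                .setSize(5)
                .execute().actionGet();
        System.out.println("total docs: " + resp.getHits().getTotalHits());
        System.out.println(resp.toString());
        client.close();
    }
}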

7. References

HBase Observer同步数据到ElasticSearch (HBase Observer syncs data to ElasticSearch)

8. Notes

  • Use a single index per Coprocessor; different tables can be given different types under it, otherwise the index gets scrambled.
  • After changing the Java code, the jar uploaded to HDFS must have a file name different from the previous one; otherwise the new code does not take effect, even if the old coprocessor is unset and re-installed.
  • If several tables map to several indexes/types, each table must load its Observer from its own jar path; otherwise ElasticSearch will mix data between tables.

Reposted from: https://www.cnblogs.com/itboys/p/9520389.html
