我在这篇focus在两个主题:如何支持多表同步共用一个jar包,如何持续稳定的与ES交互写入数据。
在 《 使用Hbase协作器(Coprocessor)同步数据到ElasticSearch 》中作者把两个关键组件中的属性和方法都声明为static,这意味什么?类方法和属性在所有的线程中共享,源代码请参考该博客。
问题出来了,当你用如下传参数的方式绑定到多个表:
alter 'test_record', METHOD => 'table_att', 'coprocessor' => 'hdfs:///hbase_es/hbase-observer-elasticsearch-1.0-SNAPSHOT-zcestestrecord.jar|org.eminem.hbase.observer.HbaseDataSyncEsObserver|1001|es_cluster=zcits,es_type=zcestestrecord,es_index=zcestestrecord,es_port=9100,es_host=master'

Hbase中的多个表同步到ES会串数据,什么意思? 比如说,同步Hbase中的A、B表到ES中A`、B`,A表的数据都到B`中了。造成这种错误的原因就是上述两个构件使用了静态的方法和属性。如何改正,就是都改为非静态的方法和类,用到该构件的时候实例化。代码如下:

EsClient构件:
public class EsClient {// ElasticSearch的集群名称private String clusterName;// ElasticSearch的hostprivate String[] nodeHost;// ElasticSearch的端口(Java API用的是Transport端口,也就是TCP)private int nodePort;private TransportClient client = null;private static final Log LOG = LogFactory.getLog(EsClient.class);/*** get Es config* * @return*/public EsClient(String clusterName, String nodeHost, int nodePort) {this.clusterName = clusterName;this.nodeHost = nodeHost.split("-");this.nodePort = nodePort;this.client = initEsClient();}public String getInfo() {List<String> fields = new ArrayList<String>();try {for (Field f : EsClient.class.getDeclaredFields()) {fields.add(f.getName() + "=" + f.get(this));}} catch (IllegalAccessException ex) {ex.printStackTrace();}return StringUtils.join(fields, ", ");}public String getOneNodeHost() {if (this.nodeHost == null || this.nodeHost.length == 0) {return "";}Random rand = new Random();return nodeHost[rand.nextInt(this.nodeHost.length)];}/*** init ES client*/public TransportClient initEsClient() {LOG.info("---------- Init ES Client " + this.clusterName + " -----------");TransportClient client = null;Settings settings = Settings.builder().put("cluster.name", this.clusterName).put("client.transport.sniff", true).build();try {client = new PreBuiltTransportClient(settings).addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(getOneNodeHost()), this.nodePort));} catch (UnknownHostException e) {e.printStackTrace();}return client;}public void repeatInitEsClient() {this.client = initEsClient();}/*** @return the clusterName*/public String getClusterName() {return clusterName;}/*** @param clusterName the clusterName to set*/public void setClusterName(String clusterName) {this.clusterName = clusterName;}/*** @return the nodePort*/public int getNodePort() {return nodePort;}/*** @param nodePort the nodePort to set*/public void setNodePort(int nodePort) {this.nodePort = nodePort;}/*** @return the client*/public TransportClient getClient() {return client;}/*** @param client the client to set*/public void setClient(TransportClient client) {this.client = client;}}

ElasticSearchBulkOperator构件:

public class ElasticSearchBulkOperator {private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);private static final int MAX_BULK_COUNT = 5000;private BulkRequestBuilder bulkRequestBuilder = null;private Lock commitLock = new ReentrantLock();private ScheduledExecutorService scheduledExecutorService = null;private EsClient esClient = null;public ElasticSearchBulkOperator(final EsClient esClient) {LOG.info("----------------- Init Bulk Operator for Table: " + " ----------------");this.esClient = esClient;// init es bulkRequestBuilderthis.bulkRequestBuilder = esClient.getClient().prepareBulk();// init thread pool and set size 1this.scheduledExecutorService = Executors.newScheduledThreadPool(1);// create beeper thread( it will be sync data to ES cluster)use a commitLock to protected bulk es as thread-saveRunnable beeper = new Runnable() {@Overridepublic void run() {commitLock.lock();try {LOG.info("Scheduled Thread start run for ");bulkRequest(0);} catch (Exception ex) {LOG.error("Time Bulk " + " index error : " + ex.getMessage());} finally {commitLock.unlock();}}};// set beeper thread(15 second to delay first execution , 25 second period between successive executions)scheduledExecutorService.scheduleAtFixedRate(beeper, 15, 25, TimeUnit.SECONDS);}/*** shutdown time task immediately*/public void shutdownScheduEx() {if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {scheduledExecutorService.shutdown();}}/*** bulk request when number of builders is grate then threshold* * @param threshold*/public void bulkRequest(int threshold) {int count = bulkRequestBuilder.numberOfActions();if (bulkRequestBuilder.numberOfActions() > threshold) {try {LOG.info("Bulk Request Run " + ", the row count is: " + count);BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();if (bulkItemResponse.hasFailures()) {LOG.error("------------- Begin: Error Response Items of Bulk Requests to ES ------------");LOG.error(bulkItemResponse.buildFailureMessage());LOG.error("------------- End: Error Response Items of Bulk Requests to ES ------------");}bulkRequestBuilder = esClient.getClient().prepareBulk();} catch (Exception e) {// two cause: 1. transport client is closed 2. None of the configured nodes are availableLOG.error(" Bulk Request " + " index error : " + e.getMessage());LOG.error("Reconnect the ES server...");List<DocWriteRequest> tempRequests = bulkRequestBuilder.request().requests();esClient.getClient().close();esClient.repeatInitEsClient();bulkRequestBuilder = esClient.getClient().prepareBulk();bulkRequestBuilder.request().add(tempRequests);}}}/*** add update builder to bulk use commitLock to protected bulk as* thread-save* * @param builder*/public void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {commitLock.lock();try {bulkRequestBuilder.add(builder);bulkRequest(MAX_BULK_COUNT);} catch (Exception ex) {LOG.error(" Add Bulk index error : " + ex.getMessage());} finally {commitLock.unlock();}}/*** add delete builder to bulk use commitLock to protected bulk as* thread-save* * @param builder*/public void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {commitLock.lock();try {bulkRequestBuilder.add(builder);bulkRequest(MAX_BULK_COUNT);} catch (Exception ex) {LOG.error(" delete Bulk index error : " + ex.getMessage());} finally {commitLock.unlock();}}
}

注意:我在TransportClient的setting中用了 "client.transport.sniff"=true,这对持续同步的稳定性至关重要,前提ES是多台机器的集群。这样就可以实现多个表同时绑定一个jar包传入不同参数时,不发生串表的奇怪现象。

ElasticSearchBulkOperator构件的bulkRequest方法至关重要,写不好轻则同步数据丢失,重则Hbase挂掉。 比如,导致Hbase中RegionServer的堆积的RPC过多,导致数据不能写入Hbase,如下图所示:
为什么会出现上图这种情况,我建议研究一下Coprocessor运行机制,以及RegionServer与Master的交互机制。在这里就不多说了。我们有必要花点时间研究一下bulkRequest方法:
我采取定时定量的方式来执行一次BulkRequest方法,
BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();
该次交互有序的请求多条Requests,返回对应的多依次次对应的Responses结果,可能某些条数据没有成功,很大原因是ES的Mapping类型抛出异常,导致数据插入失败,题外话就是一定要进行数据的校验和帅选。言归正传,这些没有成功的数据你如何处理这要看你的处理机制——直接舍弃,还是记在某些地方。千万不要像《 使用Hbase协作器(Coprocessor)同步数据到ElasticSearch 》中那样:
if (!bulkItemResponse.hasFailures()) {
bulkRequestBuilder = ESClient.client.prepareBulk();
}
这样你会死的很惨。
esclient不能保证一直连接不失败吧,所以要有重连机制,这对单台的ES服务器至关重要。上述代码列出了两种esclient连接断掉的原因:1. transport client is closed 2. None of the configured nodes are available。为了不丢失上次请求失败的数据,我们要把这些数据加入到新建的esclient中的bulkRequestBuilder,重新发送,逻辑如下:
LOG.error("Reconnect the ES server...");
List<DocWriteRequest> tempRequests = bulkRequestBuilder.request().requests();
esClient.getClient().close();
esClient.repeatInitEsClient();
bulkRequestBuilder = esClient.getClient().prepareBulk();
bulkRequestBuilder.request().add(tempRequests);
上述都是必须要做的,你也可以扩展,比如设计重连机制。
第二篇就讲到这吧,下一篇写一些如何把Coprocessor的侵入性给Hbase带来的宕机风险降到最低,如何高效的实现同步,以及心得。

面向高稳定,高性能之-Hbase数据实时同步到ElasticSearch(之二)相关推荐

  1. mysql 同步到es_mysql数据实时同步到Elasticsearch

    业务需要把mysql的数据实时同步到ES,实现低延迟的检索到ES中的数据或者进行其它数据分析处理.本文给出以同步mysql binlog的方式实时同步数据到ES的思路, 实践并验证该方式的可行性,以供 ...

  2. mysql 同步 es_mysql数据实时同步到Elasticsearch

    业务需要把mysql的数据实时同步到ES,实现低延迟的检索到ES中的数据或者进行其它数据分析处理.本文给出以同步mysql binlog的方式实时同步数据到ES的思路, 实践并验证该方式的可行性,以供 ...

  3. Centos rsync + notify 实现数据实时同步

    目录 rsync + notify 实现数据实时同步 一.简介 二.原理 三.实验环境 四.实验预期 五.准备工作 六.服务配置 目标服务器配置 源服务器配置 七.实现inodify实时触发rsync ...

  4. Oracle 数据怎么实时同步到 Elasticsearch | 亲测干货建议收藏

    摘要: 很多 DBA 同学经常会遇到要从一个数据库实时同步到另一个数据库的问题,同构数据还相对容易,遇上异构数据.表多.数据量大等情况就难以同步.我自己亲测了一种方式,很快实现了Oracle 数据实时 ...

  5. 数据实时同步或抽取上收的技术分析(转)

    1 实现数据集中的技术手段分析比较 根据业界提供数据同步或抽取的解决方案来看,主要包括以下几大类: l 存储复制技术 l 数据库复制技术 l ETL抽取技术 1.1 存储复制技术 实现原理 存储复制技 ...

  6. DataPipeline与TiDB推出异构数据实时同步解决方案,共筑安全可信基础设施

    近日,DataPipeline数见科技与PingCAP正式宣布,经过联合测评双方已完成DataPipeline企业级实时数据融合平台与TiDB分布式数据库企业版联合解决方案的兼容认证,旨在为全球客户在 ...

  7. Flink-----Flink CDC 实现数据实时同步

    Flink CDC 实现数据实时同步 1.什么是Flink_CDC CDC 全称是 Change Data Capture(变化数据获取) ,它是一个比较广义的概念,只要能捕获变更的数据,我们都可以称 ...

  8. 达梦数据实时同步(DMHS)原理

    1 系统介绍 达梦数据实时同步软件(DMHS)是一个支持异构环境的高性能.高可靠.高可扩展是数据库实时同步复制系统.该系统基于成熟的关系数据库模型和标准接口,跨越多种软硬件平台实现秒级数据实时同步,可 ...

  9. 【达梦数据库】数据实时同步软件 + 数据对比工具

    文章目录 前言 一.数据实时同步软件 1.1 简单介绍 1.2 模块说明 二.数据对比工具 2.1 简单介绍 2.2 架构说明 三.DMETL vs DMHS 总结 前言 达梦数据实时同步软件(DMH ...

最新文章

  1. 视频中的目标检测与跟踪综述
  2. MFC工作笔记0006---#pragma warning(disable:4996)是什么意思
  3. 柬埔寨程序员的计算机梦想
  4. SSL 1887——潜伏者
  5. 在线class文件反编译java
  6. PostgreSQL客户端验证
  7. 你知道Thread线程是如何运作的吗?
  8. 安装neo4j过程中存在的问题
  9. 电视影评-《战狼2》观后感
  10. 矩阵中行向量两两之间的欧氏距离
  11. 小班计算机游戏教案,小班游戏教案10篇
  12. 高性价比成磷酸铁锂杀手锏
  13. 小学计算机课标教学大纲的依据,中小学教学大纲为何改成课程标准?
  14. 计算机公共基础知识实验报告,MIPS单周期CPU实验报告总结.doc
  15. 中国氧苯甲酮行业市场供需与战略研究报告
  16. 数据与分析发展趋势怎么样?值得入行么
  17. 矩形的对角线经过的小方格数量
  18. 我这样回答了Spring 5的新特性,面试官对我刮目相看 | 文末送书
  19. 国际期货黄金手续费怎么算?
  20. 微信和QQ的定位问题

热门文章

  1. 关于对信号归一化后的频谱“消失”问题
  2. 深信服防火墙设备故障机的更换方法
  3. 计算机所有接口都没反应,如何解决Win7系统USB接口没反应的问题
  4. js 生成UUID的几种方法
  5. 微信小程序--微信支付流程
  6. hdu4416——后缀自动机
  7. Linux系统信号定义
  8. Android支持蓝牙midi键盘,CME推出无线MIDI蓝牙适配器WIDI Master,可以进行傻瓜式自动配对...
  9. Nordic Thingy:52 SDK 安装及编译
  10. jQuery事件 笔记