利用Hbase Coprocessor 实现将插入hbase中的数据保存至ElasticSearch中,实现二级索引目的

版本:

Hbase: 2.1

ES:6.3.0

一、Coprocessor代码开发

协处理器类

package wiki.hadoop.coprocessor;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.log4j.Logger;

import wiki.hadoop.es.ESClient;
import wiki.hadoop.es.ElasticSearchBulkOperator;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;

/**
 * HBase 2.x RegionObserver coprocessor that mirrors Put/Delete mutations into
 * an Elasticsearch index, giving the table a secondary index in ES.
 *
 * <p>Hooks may be invoked concurrently by the region server; all shared
 * mutable state is delegated to {@link ElasticSearchBulkOperator}, which is
 * internally locked.
 */
public class HbaseDataSyncEsObserver implements RegionObserver, RegionCoprocessor {

    private static final Logger LOG = Logger.getLogger(HbaseDataSyncEsObserver.class);

    /** Target ES index and mapping type for the synced documents. */
    private String index = "user_test";
    private String type = "user_test_type";

    /** Required in HBase 2.x so the region observer hooks are actually registered. */
    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    /** Initializes the shared ES client when the coprocessor is loaded. */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        ESClient.initEsClient();
        LOG.info("****init start*****");
    }

    /** Releases the ES client and stops the periodic bulk-flush task on unload. */
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        ESClient.closeEsClient();
        // shutdown time task
        ElasticSearchBulkOperator.shutdownScheduEx();
        LOG.info("****end*****");
    }

    /**
     * After a Put is applied to HBase, upsert the row into ES.
     * The row key becomes the ES document id; each cell becomes a field
     * (qualifier -> value as String). Errors are logged and swallowed so a
     * failing ES write never aborts the HBase mutation.
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // Fix: decode the row key with Bytes.toString (UTF-8) instead of
        // new String(byte[]), which depends on the platform default charset.
        String indexId = Bytes.toString(put.getRow());
        try {
            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            Map<String, Object> json = new HashMap<>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                }
            }
            // Fix: removed the dead "infoJson" wrapper map — it was populated
            // but never sent; the document actually indexed is `json`.
            LOG.info(json.toString());
            ElasticSearchBulkOperator.addUpdateBuilderToBulk(
                    ESClient.client.prepareUpdate(index, type, indexId).setDocAsUpsert(true).setDoc(json));
            LOG.info("**** postPut success*****");
        } catch (Exception ex) {
            // Fix: report the configured index (was hard-coded "user_test") and
            // keep the full stack trace instead of only ex.getMessage().
            LOG.error("observer put a doc, index [ " + index + " ] indexId [" + indexId + "] error", ex);
        }
    }

    /**
     * After a Delete is applied to HBase, queue a matching ES delete for the
     * document whose id equals the row key. Errors are logged, not rethrown.
     */
    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        String indexId = Bytes.toString(delete.getRow());
        try {
            ElasticSearchBulkOperator.addDeleteBuilderToBulk(
                    ESClient.client.prepareDelete(index, type, indexId));
            LOG.info("**** postDelete success*****");
        } catch (Exception ex) {
            LOG.error("observer delete a doc, index [ " + index + " ] indexId [" + indexId + "] error", ex);
        }
    }
}

es工具类

package wiki.hadoop.es;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Accumulates ES update/delete actions into one shared bulk request and
 * submits it when it exceeds {@link #MAX_BULK_COUNT} actions, or every 30
 * seconds via a single-threaded scheduler (first run delayed 10 s).
 *
 * <p>Thread-safety: coprocessor hooks and the scheduled flusher run
 * concurrently; every access to the shared builder is serialized with
 * {@code commitLock}.
 */
public class ElasticSearchBulkOperator {

    private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);

    /** Flush as soon as the pending bulk holds more than this many actions. */
    private static final int MAX_BULK_COUNT = 10000;

    private static BulkRequestBuilder bulkRequestBuilder = null;

    /** Guards bulkRequestBuilder against concurrent hook/flusher access. */
    private static final Lock commitLock = new ReentrantLock();

    private static ScheduledExecutorService scheduledExecutorService = null;

    static {
        bulkRequestBuilder = newBulkBuilder();
        // Single flusher thread is enough: all work is funneled through commitLock.
        scheduledExecutorService = Executors.newScheduledThreadPool(1);
        final Runnable beeper = () -> {
            commitLock.lock();
            try {
                LOG.info("Before submission bulkRequest size : " + bulkRequestBuilder.numberOfActions());
                bulkRequest(0);
                LOG.info("After submission bulkRequest size : " + bulkRequestBuilder.numberOfActions());
            } catch (Exception ex) {
                // Fix: log through the logger (was System.out.println, invisible
                // in region-server logs) and preserve the stack trace.
                LOG.error("scheduled bulk flush error", ex);
            } finally {
                commitLock.unlock();
            }
        };
        // First flush after 10 s, then every 30 s.
        scheduledExecutorService.scheduleAtFixedRate(beeper, 10, 30, TimeUnit.SECONDS);
    }

    /**
     * Creates a fresh bulk builder with the refresh policy applied.
     * Fix: the original set RefreshPolicy.IMMEDIATE only on the very first
     * builder; builders created after a flush silently lost it.
     */
    private static BulkRequestBuilder newBulkBuilder() {
        BulkRequestBuilder builder = ESClient.client.prepareBulk();
        builder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        return builder;
    }

    /** Stops the periodic flusher, submitting any still-pending actions first. */
    public static void shutdownScheduEx() {
        if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {
            scheduledExecutorService.shutdown();
        }
        // Fix: flush whatever is still buffered so documents written shortly
        // before the coprocessor is unloaded are not silently dropped.
        commitLock.lock();
        try {
            bulkRequest(0);
        } catch (Exception ex) {
            LOG.error("final bulk flush on shutdown error", ex);
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Submits the pending bulk when it holds more than {@code threshold}
     * actions. Must be called with {@code commitLock} held.
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();
            if (bulkItemResponse.hasFailures()) {
                // Fix: failures were silently ignored and the old builder was
                // retained, so failing actions were resubmitted forever and the
                // buffer grew without bound. Log the failures and start fresh.
                LOG.error("bulk request had failures: " + bulkItemResponse.buildFailureMessage());
            }
            bulkRequestBuilder = newBulkBuilder();
        }
    }

    /**
     * Queues an update/upsert action; flushes when the bulk exceeds
     * MAX_BULK_COUNT. Thread-safe via commitLock.
     *
     * @param builder the prepared ES update request
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            // Fix: message previously named an unrelated index ("gejx_test");
            // also keep the full stack trace.
            LOG.error("update bulk add error", ex);
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Queues a delete action; flushes when the bulk exceeds MAX_BULK_COUNT.
     * Thread-safe via commitLock.
     *
     * @param builder the prepared ES delete request
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error("delete bulk add error", ex);
        } finally {
            commitLock.unlock();
        }
    }
}
package wiki.hadoop.es;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * Holds the process-wide Elasticsearch TransportClient used by the
 * coprocessor. {@link #initEsClient()} must be called (from the coprocessor's
 * start hook) before {@link #client} is used; {@link #closeEsClient()}
 * releases it.
 */
public class ESClient {

    /** Shared transport client; null until initEsClient() succeeds. */
    public static Client client;

    private static final Log log = LogFactory.getLog(ESClient.class);

    /**
     * Builds the transport client against the configured cluster.
     * NOTE(review): the cluster name and the three host names are hard-coded —
     * consider externalizing them to configuration; verify they match the
     * target environment.
     *
     * @throws UnknownHostException if a configured host cannot be resolved
     */
    public static void initEsClient() throws UnknownHostException {
        log.info("初始化es连接开始");
        // Required when running inside HBase: prevents netty from probing the
        // runtime processor count twice (it throws otherwise).
        System.setProperty("es.set.netty.runtime.available.processors", "false");
        Settings esSettings = Settings.builder()
                .put("cluster.name", "log_cluster")
                .put("client.transport.sniff", true)
                .build();
        client = new PreBuiltTransportClient(esSettings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
                .addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300))
                .addTransportAddress(new TransportAddress(InetAddress.getByName("host3"), 9300));
        log.info("初始化es连接完成");
    }

    /** Closes the shared client if it was ever initialized; safe to call twice. */
    public static void closeEsClient() {
        // Fix: guard against NPE when initEsClient() failed or was never called.
        if (client != null) {
            client.close();
            client = null;
            log.info("es连接关闭");
        }
    }
}

二、ES创建索引

简单创建个测试的

PUT user_test
{"settings": {"number_of_replicas": 1, "number_of_shards": 5}
}PUT user_test/_mapping/user_test_type
{"user_test_type":{"properties":{"name":{"type":"text"},"city":{"type":"text"},"province":{"type":"text"},"followers_count":{"type":"long"},"friends_count":{"type":"long"},"statuses_count":{"type":"long"}}}
}

三、协处理器安装

将jar包传至HDFS中,执行命令进行协处理器安装

disable 'es_test'
alter 'es_test' , METHOD =>'table_att','coprocessor'=>'/es-coprocessor-0.0.4-jar-with-dependencies.jar|wiki.hadoop.coprocessor.HbaseDataSyncEsObserver|1001'
enable 'es_test'
desc 'es_test'

四、添加数据进行测试

在kibana中进行查询es数据发现成功插入,在regionServer中查询日志,也发现了我们代码中打印的日志。

五、可能遇到的问题

5.1 提示netty jar包冲突

2020-08-12 09:24:55,842 INFO wiki.hadoop.es.ESClient: 初始化es连接开始
2020-08-12 09:24:56,145 INFO org.apache.hadoop.metrics2.impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2020-08-12 09:24:56,146 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2020-08-12 09:24:56,146 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: HBase metrics system started
2020-08-12 09:24:56,260 ERROR org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost: Failed to load coprocessor wiki.hadoop.coprocessor.HbaseDataSyncEsObserver
java.lang.NoSuchMethodError: io.netty.util.AttributeKey.newInstance(Ljava/lang/String;)Lio/netty/util/AttributeKey;at org.elasticsearch.transport.netty4.Netty4Transport.<clinit>(Netty4Transport.java:232)at org.elasticsearch.transport.Netty4Plugin.getSettings(Netty4Plugin.java:56)at org.elasticsearch.plugins.PluginsService.lambda$getPluginSettings$0(PluginsService.java:89)at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)at org.elasticsearch.plugins.PluginsService.getPluginSettings(PluginsService.java:89)at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:144)at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:280)at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:128)at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:114)at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:104)at wiki.hadoop.es.ESClient.initEsClient(ESClient.java:35)at wiki.hadoop.coprocessor.HbaseDataSyncEsObserver.start(HbaseDataSyncEsObserver.java:85)at org.apache.hadoop.hbase.coprocessor.BaseEnvironment.startup(BaseEnvironment.java:72)at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.checkAndLoadInstance(CoprocessorHost.java:263)at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.load(CoprocessorHost.java:226)at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.load(CoprocessorHost.java:185)at 
org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.loadTableCoprocessors(RegionCoprocessorHost.java:378)at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.<init>(RegionCoprocessorHost.java:274)at org.apache.hadoop.hbase.regionserver.HRegion.<init>(HRegion.java:800)at org.apache.hadoop.hbase.regionserver.HRegion.<init>(HRegion.java:702)at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)at java.lang.reflect.Constructor.newInstance(Constructor.java:423)at org.apache.hadoop.hbase.regionserver.HRegion.newHRegion(HRegion.java:6785)at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:6985)

排查后发现 hbase-server 的依赖中自带了 netty 包,它与 es 客户端依赖的 netty 版本冲突,因此在 pom 中直接将其排除掉:

<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><exclusions><exclusion><groupId>io.netty</groupId><artifactId>netty-all</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><exclusions><exclusion><groupId>org.apache.htrace</groupId><artifactId>htrace-core</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>transport</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId></dependency>

重新打包安装,查看hbase regionserver 中发现运行正常。去es中查询也运行正常

2020-08-12 16:08:28,215 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: { name=跟我一起学知识}
2020-08-12 16:08:28,216 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: **** postPut success*****
2020-08-12 16:08:33,222 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: { name=教你做点心}
2020-08-12 16:08:33,222 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: **** postPut success*****

具体代码

https://github.com/zhangshenghang/hbase-coprocessors

参考:https://blog.csdn.net/weixin_42348946/article/details/97941152

Hbase 协处理器之将数据保存到es (二级索引)相关推荐

  1. 把数据保存到cook_将用户信息保存到Cookie中

    /** * 把用户保存到Cookie * * @param request * @param response * @param member */ private void rememberPwdA ...

  2. Unity Easy Save数据保存插件将数据保存到Android手机根目录

    Unity Easy Save数据保存插件将数据保存到Android手机根目录 原由 导入插件 设置保存路径 测试脚本 界面 测试验证 问题 原由 开发时碰到一个需求,两个手机APP,一个保存数据,一 ...

  3. 谷粒商城项目8——商品上架 上架商品sku保存到es nginx配置

    文章目录 一.商城业务 1.商品上架 1.1 ES 的存储结构分析 1.2 PUT product 1.3 一些细节 2.商品上架-构造基本数据 3.商品上架-业务代码: 4.商品上架-search模 ...

  4. Pandas的学习(读取mongodb数据库集合到DataFrame,将DataFrame类型数据保存到mongodb数据库中)

    1.读取mongodb数据库集合到DataFrame import pymongo import pandas as pdclient = pymongo.MongoClient("数据库连 ...

  5. html 保存xlsx,HTML SaveXLSX按钮防止将数据保存到SlickGrid的XLSX文件中

    我在网页上有一个SlickGrid,我正在尝试添加一个按钮来调用函数CreateXLSX().当我编辑Main.jade的代码,我输入:HTML SaveXLSX按钮防止将数据保存到SlickGrid ...

  6. C++读取txt数据为二维数组 将数据保存到txt文本中

      C++读取txt数据为二维数组 保存txt文本数据     C++文件读写操作有:ofstream,ifstream,fstream: #include <fstream> ofstr ...

  7. vue 将数据保存到vuex中

    在项目中遇到这样一个问题,就是在登入的时候同时需要从后台获取到左边的导航,但是如果使用h5的localStorage来保存导航信息,会出现app加载进去之后localStorage才保存进浏览器,在m ...

  8. Python中用pandas将numpy中的数组数据保存到csv文件

    Python中用pandas将numpy中的数组数据保存到csv文件 本博客转载自:[1]https://blog.csdn.net/grey_csdn/article/details/7018587 ...

  9. 将labview连续数据保存到mysql数据库器

    这一篇是在之前完成Labview和mysql连接,并且进行了简单的CRUD删除的基础上来的.我们一般不会拿Labview来做学生这种数据管理系统,而是对于基本传感器数据的采集和保存,而传感器采集数据会 ...

最新文章

  1. s-sed替换或者修改文件指定行,同时匹配多个字符串,替换换行符为指定字符
  2. 查询数据库所有表、字段、触发器等
  3. Jquery通过Ajax方式来提交Form表单
  4. [原]Python命令
  5. Handler实现与机制 Blocking Queue IdleHandler使用
  6. kafka之Producer同步与异步消息发送及事务幂等性案例应用实战
  7. Spring Cloud构建微服务架构(四)分布式配置中心
  8. 日志中台不重不丢实现浅谈
  9. 论基于candence的组装清单做法
  10. 案例:实现第一个Filter程序
  11. python自动接收邮件_Python自动发送和收取邮件的方法
  12. STM8 ADC转换模式-------单次模式
  13. python内存泄漏解决方案_Python内存泄漏和内存溢出的解决方案
  14. 修改注册表设置桌面和收藏夹路径
  15. 在linux下搭建tftp服务器,嵌入式linux常用服务配置之tftp服务器配置
  16. kindeditor使用方法
  17. ecap捕捉epwm波形的占空比及频率(总结)
  18. bug bounty - 绕过限制劫持Skype账号
  19. 【drawio笔记】向ERD表,列表和UML类添加行
  20. easyconnect xp登录_登录说明

热门文章

  1. 归并排序 java_马士兵说之归并排序
  2. 倒计时小工具_这款高颜值的 APP 可以让小仙女/男神们的日子过得更精致
  3. Android studio中获取按钮组的信息如何循环遍历
  4. 解决Entry fileTemplates//Singleton.java.ft not found in C:/Dev/android-studio/lib/resources_en.jar
  5. JAVAWEB入门之Servlet_体系结构
  6. android中的后退功能,如何在Android应用中实现一个返回键功能
  7. mysql 触发器不能同时 insert or update or delete_MySQL6:触发器
  8. 深入理解Java中的位操作
  9. java putifabsent_java8中Map的一些骚操作总结
  10. html5画布太极图,canvas实现太极图