Hbase 协处理器之将数据保存到es (二级索引)
利用Hbase Coprocessor 实现将插入hbase中的数据保存至ElasticSearch中,实现二级索引目的
版本:
Hbase: 2.1
ES:6.3.0
一、Coprocessor代码开发
协处理器类
package wiki.hadoop.coprocessor;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.log4j.Logger;

import wiki.hadoop.es.ESClient;
import wiki.hadoop.es.ElasticSearchBulkOperator;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Optional;

/**
 * HBase RegionObserver coprocessor that mirrors every Put/Delete on the
 * observed table into an ElasticSearch index, giving the table a secondary
 * index queryable through ES.
 *
 * Lifecycle: {@link #start} opens the shared ES client when the region
 * loads the coprocessor; {@link #stop} closes it and stops the periodic
 * bulk-flush task.
 */
public class HbaseDataSyncEsObserver implements RegionObserver, RegionCoprocessor {

    private static final Logger LOG = Logger.getLogger(HbaseDataSyncEsObserver.class);

    // Target ES index/type. NOTE(review): consider passing these through the
    // coprocessor attribute string instead of hard-coding them.
    private String index = "user_test";
    private String type = "user_test_type";

    @Override
    public Optional<RegionObserver> getRegionObserver() {
        // Required in HBase 2.x so the framework can obtain the observer instance.
        return Optional.of(this);
    }

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        // Initialise the shared ES client once per coprocessor instance.
        ESClient.initEsClient();
        LOG.info("****init start*****");
    }

    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        ESClient.closeEsClient();
        // Shut down the scheduled bulk-flush task.
        ElasticSearchBulkOperator.shutdownScheduEx();
        LOG.info("****end*****");
    }

    /**
     * After a Put commits, flatten qualifier->value pairs across all column
     * families and upsert them as one ES document keyed by the row key.
     */
    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
        // Bytes.toString decodes UTF-8 explicitly; new String(byte[]) would
        // silently depend on the JVM's platform default charset.
        String indexId = Bytes.toString(put.getRow());
        try {
            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            // Flat qualifier->value map; matches the flat field mapping of the
            // user_test_type ES mapping (name/city/province/...).
            Map<String, Object> json = new HashMap<>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    json.put(key, value);
                }
            }
            LOG.info(json.toString());
            // Upsert so a Put on an existing row updates the ES document
            // instead of failing on "document already exists".
            ElasticSearchBulkOperator.addUpdateBuilderToBulk(
                    ESClient.client.prepareUpdate(index, type, indexId).setDocAsUpsert(true).setDoc(json));
            LOG.info("**** postPut success*****");
        } catch (Exception ex) {
            // Log the configured index name (was a stale hard-coded literal)
            // and keep the cause for the stack trace.
            LOG.error("observer put a doc, index [ " + index + " ] indexId [" + indexId + "] error : " + ex.getMessage(), ex);
        }
    }

    /**
     * After a Delete commits, queue a matching ES delete for the row's document.
     */
    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        String indexId = Bytes.toString(delete.getRow());
        try {
            ElasticSearchBulkOperator.addDeleteBuilderToBulk(ESClient.client.prepareDelete(index, type, indexId));
            LOG.info("**** postDelete success*****");
        } catch (Exception ex) {
            LOG.error("observer delete a doc, index [ " + index + " ] indexId [" + indexId + "] error : " + ex.getMessage(), ex);
        }
    }
}
es工具类
package wiki.hadoop.es;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequestBuilder;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Buffers ES update/delete requests into one shared bulk request and flushes
 * it either when it exceeds {@link #MAX_BULK_COUNT} actions or on a periodic
 * timer (first flush after 10s, then every 30s).
 *
 * Thread-safety: every access to the shared {@code bulkRequestBuilder} —
 * adding actions, flushing, and swapping in a fresh builder — happens under
 * {@code commitLock}.
 */
public class ElasticSearchBulkOperator {

    private static final Log LOG = LogFactory.getLog(ElasticSearchBulkOperator.class);

    // Flush threshold for the size-triggered path.
    private static final int MAX_BULK_COUNT = 10000;

    private static BulkRequestBuilder bulkRequestBuilder = null;

    private static final Lock commitLock = new ReentrantLock();

    private static ScheduledExecutorService scheduledExecutorService = null;

    static {
        // Initialise the shared bulk builder.
        bulkRequestBuilder = ESClient.client.prepareBulk();
        bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);

        // Single-threaded scheduler for the periodic flush.
        scheduledExecutorService = Executors.newScheduledThreadPool(1);

        // Periodic task: flush whatever has accumulated, under the lock.
        final Runnable beeper = () -> {
            commitLock.lock();
            try {
                LOG.info("Before submission bulkRequest size : " + bulkRequestBuilder.numberOfActions());
                // threshold 0 => flush anything pending
                bulkRequest(0);
                LOG.info("After submission bulkRequest size : " + bulkRequestBuilder.numberOfActions());
            } catch (Exception ex) {
                // Was System.out.println — use the logger and keep the cause.
                LOG.error("scheduled bulk flush error : " + ex.getMessage(), ex);
            } finally {
                commitLock.unlock();
            }
        };
        // First run after 10s, then every 30s.
        scheduledExecutorService.scheduleAtFixedRate(beeper, 10, 30, TimeUnit.SECONDS);
    }

    /**
     * Stops the periodic flush task. Flushes any buffered actions first so
     * documents queued since the last timer tick are not silently dropped.
     */
    public static void shutdownScheduEx() {
        if (null != scheduledExecutorService && !scheduledExecutorService.isShutdown()) {
            // Final flush of anything still buffered.
            commitLock.lock();
            try {
                bulkRequest(0);
            } catch (Exception ex) {
                LOG.error("final bulk flush on shutdown error : " + ex.getMessage(), ex);
            } finally {
                commitLock.unlock();
            }
            scheduledExecutorService.shutdown();
        }
    }

    /**
     * Executes the bulk if it holds more than {@code threshold} actions.
     * On success the shared builder is replaced with a fresh one; on failure
     * the builder is kept so the batch is retried at the next flush, and the
     * per-item failure details are logged (previously failures were silent).
     * Must be called while holding {@code commitLock}.
     */
    private static void bulkRequest(int threshold) {
        if (bulkRequestBuilder.numberOfActions() > threshold) {
            BulkResponse bulkItemResponse = bulkRequestBuilder.execute().actionGet();
            if (!bulkItemResponse.hasFailures()) {
                bulkRequestBuilder = ESClient.client.prepareBulk();
            } else {
                LOG.error("bulk request had failures, will retry on next flush : "
                        + bulkItemResponse.buildFailureMessage());
            }
        }
    }

    /**
     * Adds an update action to the shared bulk, flushing when the bulk
     * exceeds {@link #MAX_BULK_COUNT}. Thread-safe via {@code commitLock}.
     *
     * @param builder the prepared update request
     */
    public static void addUpdateBuilderToBulk(UpdateRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            // Was a stale copied literal ("gejx_test"); log the real problem.
            LOG.error("add update to bulk error : " + ex.getMessage(), ex);
        } finally {
            commitLock.unlock();
        }
    }

    /**
     * Adds a delete action to the shared bulk, flushing when the bulk
     * exceeds {@link #MAX_BULK_COUNT}. Thread-safe via {@code commitLock}.
     *
     * @param builder the prepared delete request
     */
    public static void addDeleteBuilderToBulk(DeleteRequestBuilder builder) {
        commitLock.lock();
        try {
            bulkRequestBuilder.add(builder);
            bulkRequest(MAX_BULK_COUNT);
        } catch (Exception ex) {
            LOG.error("add delete to bulk error : " + ex.getMessage(), ex);
        } finally {
            commitLock.unlock();
        }
    }
}
package wiki.hadoop.es;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.transport.client.PreBuiltTransportClient;

import java.net.InetAddress;
import java.net.UnknownHostException;

/**
 * ES Client class — holds the single shared TransportClient used by the
 * coprocessor and the bulk operator.
 */
public class ESClient {

    // Shared client; initialised by initEsClient(), closed by closeEsClient().
    public static Client client;

    private static final Log log = LogFactory.getLog(ESClient.class);

    /**
     * Initialises the shared ES transport client.
     *
     * @throws UnknownHostException if an ES host cannot be resolved
     */
    public static void initEsClient() throws UnknownHostException {
        log.info("初始化es连接开始");
        // Work around Netty processor-count detection clashing inside HBase.
        System.setProperty("es.set.netty.runtime.available.processors", "false");
        Settings esSettings = Settings.builder()
                .put("cluster.name", "log_cluster") // ES cluster name
                .put("client.transport.sniff", true)
                .build();
        client = new PreBuiltTransportClient(esSettings)
                .addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
                .addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300))
                .addTransportAddress(new TransportAddress(InetAddress.getByName("host3"), 9300));
        log.info("初始化es连接完成");
    }

    /**
     * Closes the shared ES client. Null-safe: if initEsClient() failed (e.g.
     * the netty conflict during coprocessor load) the coprocessor's stop()
     * still calls this, and it must not throw an NPE.
     */
    public static void closeEsClient() {
        if (client != null) {
            client.close();
            client = null;
        }
        log.info("es连接关闭");
    }
}
二、ES创建索引
简单创建一个用于测试的索引和 mapping:
PUT user_test
{"settings": {"number_of_replicas": 1, "number_of_shards": 5}
}PUT user_test/_mapping/user_test_type
{"user_test_type":{"properties":{"name":{"type":"text"},"city":{"type":"text"},"province":{"type":"text"},"followers_count":{"type":"long"},"friends_count":{"type":"long"},"statuses_count":{"type":"long"}}}
}
三、协处理器安装
将jar包传至HDFS中,执行命令进行协处理器安装
disable 'es_test'
alter 'es_test' , METHOD =>'table_att','coprocessor'=>'/es-coprocessor-0.0.4-jar-with-dependencies.jar|wiki.hadoop.coprocessor.HbaseDataSyncEsObserver|1001'
enable 'es_test'
desc 'es_test'
四、添加数据进行测试
在kibana中进行查询es数据发现成功插入,在regionServer中查询日志,也发现了我们代码中打印的日志。
五、可能遇到的问题
5.1 提示netty jar包冲突
2020-08-12 09:24:55,842 INFO wiki.hadoop.es.ESClient: 初始化es连接开始
2020-08-12 09:24:56,145 INFO org.apache.hadoop.metrics2.impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2020-08-12 09:24:56,146 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2020-08-12 09:24:56,146 INFO org.apache.hadoop.metrics2.impl.MetricsSystemImpl: HBase metrics system started
2020-08-12 09:24:56,260 ERROR org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost: Failed to load coprocessor wiki.hadoop.coprocessor.HbaseDataSyncEsObserver
java.lang.NoSuchMethodError: io.netty.util.AttributeKey.newInstance(Ljava/lang/String;)Lio/netty/util/AttributeKey;at org.elasticsearch.transport.netty4.Netty4Transport.<clinit>(Netty4Transport.java:232)at org.elasticsearch.transport.Netty4Plugin.getSettings(Netty4Plugin.java:56)at org.elasticsearch.plugins.PluginsService.lambda$getPluginSettings$0(PluginsService.java:89)at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)at org.elasticsearch.plugins.PluginsService.getPluginSettings(PluginsService.java:89)at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:144)at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:280)at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:128)at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:114)at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:104)at wiki.hadoop.es.ESClient.initEsClient(ESClient.java:35)at wiki.hadoop.coprocessor.HbaseDataSyncEsObserver.start(HbaseDataSyncEsObserver.java:85)at org.apache.hadoop.hbase.coprocessor.BaseEnvironment.startup(BaseEnvironment.java:72)at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.checkAndLoadInstance(CoprocessorHost.java:263)at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.load(CoprocessorHost.java:226)at org.apache.hadoop.hbase.coprocessor.CoprocessorHost.load(CoprocessorHost.java:185)at 
org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.loadTableCoprocessors(RegionCoprocessorHost.java:378)at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.<init>(RegionCoprocessorHost.java:274)at org.apache.hadoop.hbase.regionserver.HRegion.<init>(HRegion.java:800)at org.apache.hadoop.hbase.regionserver.HRegion.<init>(HRegion.java:702)at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)at java.lang.reflect.Constructor.newInstance(Constructor.java:423)at org.apache.hadoop.hbase.regionserver.HRegion.newHRegion(HRegion.java:6785)at org.apache.hadoop.hbase.regionserver.HRegion.openHRegion(HRegion.java:6985)
排查后发现 hbase-server 依赖中包含 netty 包,它和 es 客户端自带的 netty 版本冲突了,我们直接在 pom 中把它排除掉:
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><exclusions><exclusion><groupId>io.netty</groupId><artifactId>netty-all</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><exclusions><exclusion><groupId>org.apache.htrace</groupId><artifactId>htrace-core</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId></dependency><dependency><groupId>org.elasticsearch.client</groupId><artifactId>transport</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId></dependency>
重新打包安装,查看hbase regionserver 中发现运行正常。去es中查询也运行正常
2020-08-12 16:08:28,215 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: { name=跟我一起学知识}
2020-08-12 16:08:28,216 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: **** postPut success*****
2020-08-12 16:08:33,222 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: { name=教你做点心}
2020-08-12 16:08:33,222 INFO wiki.hadoop.coprocessor.WechatUserSyncEsObserver: **** postPut success*****
具体代码
https://github.com/zhangshenghang/hbase-coprocessors
参考:https://blog.csdn.net/weixin_42348946/article/details/97941152
Hbase 协处理器之将数据保存到es (二级索引)相关推荐
- 把数据保存到cook_将用户信息保存到Cookie中
/** * 把用户保存到Cookie * * @param request * @param response * @param member */ private void rememberPwdA ...
- Unity Easy Save数据保存插件将数据保存到Android手机根目录
Unity Easy Save数据保存插件将数据保存到Android手机根目录 原由 导入插件 设置保存路径 测试脚本 界面 测试验证 问题 原由 开发时碰到一个需求,两个手机APP,一个保存数据,一 ...
- 谷粒商城项目8——商品上架 上架商品sku保存到es nginx配置
文章目录 一.商城业务 1.商品上架 1.1 ES 的存储结构分析 1.2 PUT product 1.3 一些细节 2.商品上架-构造基本数据 3.商品上架-业务代码: 4.商品上架-search模 ...
- Pandas的学习(读取mongodb数据库集合到DataFrame,将DataFrame类型数据保存到mongodb数据库中)
1.读取mongodb数据库集合到DataFrame import pymongo import pandas as pdclient = pymongo.MongoClient("数据库连 ...
- html 保存xlsx,HTML SaveXLSX按钮防止将数据保存到SlickGrid的XLSX文件中
我在网页上有一个SlickGrid,我正在尝试添加一个按钮来调用函数CreateXLSX().当我编辑Main.jade的代码,我输入:HTML SaveXLSX按钮防止将数据保存到SlickGrid ...
- C++读取txt数据为二维数组 将数据保存到txt文本中
C++读取txt数据为二维数组 保存txt文本数据 C++文件读写操作有:ofstream,ifstream,fstream: #include <fstream> ofstr ...
- vue 将数据保存到vuex中
在项目中遇到这样一个问题,就是在登入的时候同时需要从后台获取到左边的导航,但是如果使用h5的localStorage来保存导航信息,会出现app加载进去之后localStorage才保存进浏览器,在m ...
- Python中用pandas将numpy中的数组数据保存到csv文件
Python中用pandas将numpy中的数组数据保存到csv文件 本博客转载自:[1]https://blog.csdn.net/grey_csdn/article/details/7018587 ...
- 将labview连续数据保存到mysql数据库器
这一篇是在之前完成Labview和mysql连接,并且进行了简单的CRUD删除的基础上来的.我们一般不会拿Labview来做学生这种数据管理系统,而是对于基本传感器数据的采集和保存,而传感器采集数据会 ...
最新文章
- s-sed替换或者修改文件指定行,同时匹配多个字符串,替换换行符为指定字符
- 查询数据库所有表、字段、触发器等
- Jquery通过Ajax方式来提交Form表单
- [原]Python命令
- Handler实现与机制 Blocking Queue IdleHandler使用
- kafka之Producer同步与异步消息发送及事务幂等性案例应用实战
- Spring Cloud构建微服务架构(四)分布式配置中心
- 日志中台不重不丢实现浅谈
- 论基于candence的组装清单做法
- 案例:实现第一个Filter程序
- python自动接收邮件_Python自动发送和收取邮件的方法
- STM8 ADC转换模式-------单次模式
- python内存泄漏解决方案_Python内存泄漏和内存溢出的解决方案
- 修改注册表设置桌面和收藏夹路径
- 在linux下搭建tftp服务器,嵌入式linux常用服务配置之tftp服务器配置
- kindeditor使用方法
- ecap捕捉epwm波形的占空比及频率(总结)
- bug bounty - 绕过限制劫持Skype账号
- 【drawio笔记】向ERD表,列表和UML类添加行
- easyconnect xp登录_登录说明
热门文章
- 归并排序 java_马士兵说之归并排序
- 倒计时小工具_这款高颜值的 APP 可以让小仙女/男神们的日子过得更精致
- Android studio中获取按钮组的信息如何循环遍历
- 解决Entry fileTemplates//Singleton.java.ft not found in C:/Dev/android-studio/lib/resources_en.jar
- JAVAWEB入门之Servlet_体系结构
- android中的后退功能,如何在Android应用中实现一个返回键功能
- mysql 触发器不能同时 insert or update or delete_MySQL6:触发器
- 深入理解Java中的位操作
- java putifabsent_java8中Map的一些骚操作总结
- html5画布太极图,canvas实现太极图