本博客记录初次使用hbase coprocessor的过程。协处理器分两种类型,系统协处理器可以全局导入region server上的所有数据表,表协处理器即是用户可以指定一张表使用协处理器。协处理器框架为了更好支持其行为的灵活性,提供了两个不同方面的插件。一个是观察者(observer),类似于关系数据库的触发器。另一个是终端(endpoint),动态的终端有点像存储过程。
本次主要使用EndPoint完成计数和求和的功能。终端是动态RPC插件的接口,它的实现代码被安装在服务器端,从而能够通过HBase RPC唤醒。客户端类库提供了非常方便的方法来调用这些动态接口,它们可以在任意时候调用一个终端,它们的实现代码会被目标region远程执行,结果会返回到终端。用户可以结合使用这些强大的插件接口,为HBase添加全新的特性。

准备工作

1、开发endpoint需要用到google protobuf,protobuf用于生成RPC框架代码,protpbuf版本需要和hbase对应,版本跨度太大可能导致未知问题,我开始就是踩了这个坑,具体版本可查看hbase安装目录下的lib中protobuf-java-[version].jar。本次使用的hbase版本是1.2.6,对应protobuf是2.5.0,从网上下载protoc-2.5.0-win32.zip,解压后可得到protoc.exe,将protoc.exe配置到环境变量中备用,protobuf的详细使用方法可参考网上其他教程。
protobuf下载链接

2、创建一个maven工程,pom.xml添加如下依赖:

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><jdk.version>1.7</jdk.version><hbase.version>1.2.5</hbase.version></properties><dependencies><dependency><groupId>com.google.protobuf</groupId><artifactId>protobuf-java</artifactId><version>2.5.0</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>${hbase.version}</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>${hbase.version}</version></dependency></dependencies>

创建一个Endpoint的基本流程可以归纳为:
(1)创建一个通信协议:准备一个proto文件,然后使用protoc工具来生成协议类文件。这个文件需要在服务端及客户端存 在。
(2)创建一个Service类,实现具体的业务逻辑
(3)创建表时指定使用这个EndPoint,或者是全局配置。
(4)创建一个Client类,调用这个RPC方法。

(一)创建测试表

HBase表中有一个family命名为0, 一个column命名为c,rowkey为某个id,使用hbase shell创建表并添加测试数据。

create 'test','0'
put 'test','id1','0:c',100
put 'test','id2','0:c',200
put 'test','id3','0:c',300
put 'test','id4','0:c',400
put 'test','id5','0:c',500

接下来我们需要实现数据计数并计算“0:c”列的和。

(二)准备proto文件

新建文件,命名为count_sum.proto,添加如下内容:

syntax = "proto2";
option java_package = "com.hny.hbase.coprocessor";
option java_outer_classname = "CountAndSumProtocol";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;message CountAndSumRequest {required string family = 1;required string column = 2;
}message CountAndSumResponse {required int64 count = 1 [default = 0];required double sum = 2 [default = 0];
}service RowCountAndSumService {rpc getCountAndSum(CountAndSumRequest)returns (CountAndSumResponse);
}

(二)使用protoc生成类文件

windows下使用cmd进入上一步创建的proto文件的目录下,执行如下命令(由于已经将protoc.exe加入了环境变量,所以可以直接执行,如果提示protoc命令不存在可将protoc.exe复制到当前目录下也可以)

protoc --java_out=./ count_sum.proto

命令执行完成后会在当前目录下生产一个名称为CountAndSumProtocol的类,将这个类复制到IDE中,这个类文件有几个地方需要注意:
1、生成了一个CountAndSumRequest 内部类,表示请求信息
2、生成了一个CountAndSumResponse 内部类,表示返回信息
3、生成了一个 RowCountAndSumService 内部类,表示所提供的服务,这个类还有一个内部接口,这个接口定义了 getCountAndSum()这个方法。
我们下面需要做的就是实现这个接口的这个方法,提供真正的服务。

(三)实现真实的服务

在CountAndSumProtocol同目录下创建类CountAndSum,继承CountAndSumProtocol,同时需要实现Coprocessor和CoprocessorService2个接口:

package com.hny.hbase.coprocessor;import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.Coprocessor;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;public class CountAndSum extends CountAndSumProtocol.RowCountAndSumService implements Coprocessor, CoprocessorService {private RegionCoprocessorEnvironment env;@Overridepublic void getCountAndSum(RpcController controller, CountAndSumProtocol.CountAndSumRequest request, RpcCallback<CountAndSumProtocol.CountAndSumResponse> done) {String family = request.getFamily();if (null == family || "".equals(family)) {throw new NullPointerException("you need specify the family");}String column = request.getColumn();if (null == column || "".equals(column)) {throw new NullPointerException("you need specify the column");}Scan scan = new Scan();scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(column));CountAndSumProtocol.CountAndSumResponse response = null;InternalScanner scanner = null;try {// 计数long count = 0;// 求和double sum = 0;scanner = env.getRegion().getScanner(scan);List<Cell> results = new ArrayList<>();boolean hasMore;// 切记不要用while(){}的方式,这种方式会丢失最后一条数据do {hasMore = scanner.next(results);if (results.isEmpty()) {continue;}Cell kv = results.get(0);double value = 0;try {value = Double.parseDouble(Bytes.toString(CellUtil.cloneValue(kv)));} catch (Exception e) {}count++;sum += value;results.clear();} while (hasMore);// 生成responseresponse = CountAndSumProtocol.CountAndSumResponse.newBuilder().setCount(count).setSum(sum).build();} catch (IOException e) {e.printStackTrace();ResponseConverter.setControllerException(controller, e);} finally {if (scanner != null) {try {scanner.close();} catch (IOException ignored) {}}}done.run(response);}@Overridepublic void start(CoprocessorEnvironment env) throws IOException {if (env instanceof RegionCoprocessorEnvironment) {this.env = (RegionCoprocessorEnvironment) env;} else {throw new CoprocessorException("Must be loaded on a table region!");}}@Overridepublic void stop(CoprocessorEnvironment env) throws IOException {// do nothing}@Overridepublic Service getService() {return this;}
}

它需要实现以下4个方法,下面我们逐一讨论一下:
getService():这个方法直接返回自身即可。
start(CoprocessorEnvironment env):这个方法会在coprocessor启动时调用,这里判断了是否在一个region内被使用,而不是master,WAL等环境下被调用。
stop(CoprocessorEnvironment env):这个方法会在coprocessor完成时被调用,可用于关闭资源等,这里为空。
getCountAndSum(…):这是整个类的核心方法,用于实现真正的业务逻辑。关键的步骤有:
(1)根据request创建一个Scanner,然后使用它创建一个 InternalScanner,可以更高效的进行scan
(2)对扫描出来的行进行分析处理,将结果保存在几个变量中。
(3)调用response的各个set()方法,设置返回的结果。
(4)使用 done.run(response); 返回结果到客户端。

(四)部署coprocessor

将上述2个类进行打包,打包时不用包含protobuf和hbase相关的依赖。本示例暂时使用静态部署的方式,将jar复制到每个regionserver节点的hbase/lib目录下,然后修改hbase-site.xml,添加如下属性:

<property><name>hbase.coprocessor.region.classes</name><value>com.hny.hbase.coprocessor.CountAndSum</value>
</property>

重启hbase。
建议在hbase-site.xml中再加入以下配置,防止协处理器出现错误时导致regionServer挂掉。

<property><name>hbase.coprocessor.abortonerror</name><value>false</value>
</property>

(五)编写调用端

客户端的作用是将各个region的结果再次进行合并,客户端需要依赖CountAndSumProtocol类,代码如下:

package com.hny.hbase.coprocessor.client;import com.hny.hbase.coprocessor.CountAndSumProtocol;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;
import java.util.Map;public class CountAndSumClient {public static class CountAndSumResult {public long count;public double sum;}private Connection connection;public CountAndSumClient(Connection connection) {this.connection = connection;}public CountAndSumResult call(String tableName, String family, String column, StringstartRow, String endRow) throws Throwable {Table table = connection.getTable(TableName.valueOf(Bytes.toBytes(tableName)));final CountAndSumProtocol.CountAndSumRequest request = CountAndSumProtocol.CountAndSumRequest.newBuilder().setFamily(family).setColumn(column).build();byte[] startKey = (null != startRow) ? Bytes.toBytes(startRow) : null;byte[] endKey = (null != endRow) ? Bytes.toBytes(endRow) : null;// coprocessorService方法的第二、三个参数是定位region的,是不是范围查询,在startKey和endKey之间的region上的数据都会参与计算Map<byte[], CountAndSumResult> map = table.coprocessorService(CountAndSumProtocol.RowCountAndSumService.class,startKey, endKey, new Batch.Call<CountAndSumProtocol.RowCountAndSumService,CountAndSumResult>() {@Overridepublic CountAndSumResult call(CountAndSumProtocol.RowCountAndSumService service) throws IOException {BlockingRpcCallback<CountAndSumProtocol.CountAndSumResponse> rpcCallback = new BlockingRpcCallback<>();service.getCountAndSum(null, request, rpcCallback);CountAndSumProtocol.CountAndSumResponse response = rpcCallback.get();//直接返回response也行。CountAndSumResult responseInfo = new CountAndSumResult();responseInfo.count = response.getCount();responseInfo.sum = response.getSum();return responseInfo;}});CountAndSumResult result = new CountAndSumResult();for (CountAndSumResult ri : map.values()) {result.count += ri.count;result.sum += ri.sum;}return result;}
}

测试代码:

public class Test{public static void main(String[] args) throws Throwable {// 使用该方式需要将hbase-site.xml复制到resources目录下Configuration conf = HBaseConfiguration.create();// hbase-site.xml不在resources目录下时使用如下方式指定// conf.addResource(new Path("/home/hadoop/conf/hbase", "hbase-site.xml"));Connection connection = ConnectionFactory.createConnection(conf);String tableName = "test";CountAndSumClient client = new CountAndSumClient(connection);CountAndSumResult result = client.call(tableName, "0", "c", null, null);System.out.println("count: " + result.count + ", sum: " + result.sum);}
} 

运行测试代码输出如下:

count: 5, sum: 1500.0

注意:部署到集群的jar包包括Service类和protocol类,而运行任务的jar包包括client类与protocol类。

参考文章:https://blog.csdn.net/jediael_lu/article/details/76577072

Hbase Coprocessor(协处理器)的使用相关推荐

  1. Hbase Coprocessor 协处理器 与 JavaAPI

    协处理器概念 一.协处理器有两种: observer 和 endpoint 1.observer协处理器 Observer 类似于传统数据库中的触发器,当发生某些事件的时候这类协处理器会被 Serve ...

  2. 使用HBase Coprocessor协处理器

    原文: http://www.zhyea.com/2017/04/13/using-hbase-coprocessor.html HBase的Coprocessor是模仿谷歌BigTable的Copr ...

  3. 2021年大数据HBase(十六):HBase的协处理器(Coprocessor)

    全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 HBase的协处理器(Coprocessor) 一.起源 二 ...

  4. HBase的协处理器(Coprocessor)、HBase如何使用二级索引、observer协处理器、 endpoint协处理器、Hbase 协处理器加载方式

    HBase的协处理器(Coprocessor).HBase不可以使用二级索引吗? 起源 Hbase 作为列族数据库最经常被人诟病的特性包括: 无法轻易建立"二级索引" 难以执 行求 ...

  5. Sqoop导入HBase,并借助Coprocessor协处理器同步索引到ES

    1.环境 Mysql 5.6 Sqoop 1.4.6 Hadoop 2.5.2 HBase 0.98 Elasticsearch 2.3.5 2.安装(略过) 3.HBase Coprocessor实 ...

  6. HBase建表高级属性,hbase应用案例看行键设计,HBase和mapreduce结合,从Hbase中读取数据、分析,写入hdfs,从hdfs中读取数据写入Hbase,协处理器和二级索引

    1. Hbase高级应用 1.1建表高级属性 下面几个shell 命令在hbase操作中可以起到很到的作用,且主要体现在建表的过程中,看下面几个create 属性 1. BLOOMFILTER 默认是 ...

  7. 技术篇-HBase Coprocessor 的实现与应用

    本次分享的内容主要分为以下五点: Coprocessor 简介 Endpoint 服务端实现 Endpoint 客户端实现 Observer 实现二级索引 Coprocessor 应用场景 1. Co ...

  8. 五十四、HBase的协处理器

    在旧版本的(<0.92)HBase中无法轻易建立"二级索引",难以执行求和.计数.排序等操作.例如统计数据表的总行数,需要使用Counter方法,执行一次MapReduce ...

  9. Hbase Coprocessors 协处理器

    Table of Contents Hbase 协处理器的概述 与其他协处理器的比较 Triggers and Stored Procedure MapReduce AOP 协处理器如何实现 Copr ...

最新文章

  1. python selenium p_Python爬虫(二十一)_Selenium与PhantomJS
  2. 站长日常工作必备记录详细单
  3. css设置元素继承父元素宽度_CSS设置HTML元素的高度与宽度的各种情况总结
  4. Android Studio 添加 Genymotion插件
  5. 收藏:asp.net
  6. 《企业的边界》的书摘
  7. python pycurl
  8. Android两个子线程之间通信
  9. 网站制作---网站伪静态的介绍
  10. java读取csv文件忽略bom头_PHP 下载文件时如何自动添加bom头及解释BOM头和去掉bom头的方法...
  11. 考勤打卡记录数据库表结构_考勤系统数据表结构
  12. 【React入门实践】结合Ant-Design从0带你手把手开发【个人中心-信息修改】页面
  13. 1分钟学会网站采集方法详解
  14. 树莓派3下开启SSH服务
  15. HDU6078 Wavel Sequence
  16. Linux的vx开头的文件,微博 Qzone 微信 Linux文件及目录常用命令,进来瞅瞅
  17. 数字认证是做什么的?数字认证有什么用?
  18. 推荐系统CTR(CVR)预估模型(多任务学习)之ESMM
  19. Android 刘海屏适配全攻略
  20. 什么是TCP/IP UDP 详解

热门文章

  1. mysql之第n高的薪水
  2. 【计算机毕业设计】java ssm网上宠物商店系统
  3. 测试用例设计方法(转)
  4. PID控制及位置式与增量式区别
  5. 粒子的散射模拟matlab程序,基于Matlab的α粒子的散射实验模拟.pdf
  6. 【笔记整理 - 操作系统】(时间较早)
  7. oracle11g memory_target,oracle11g要求在操作系统层设定共享内存/dev/shm,且大于MEMORY_TARGET...
  8. 华为手机一键修改机器码信息
  9. Alexa 一键下单不好用?蓦然认知推出语音对话购物
  10. [论文翻译]A review on image segmentation techniques