一、hbase协处理器

hbase协处理器可以分为Observer Coprocessor和Endpoint Coprocessor两种 ,即观察者协处理器和端点协处理器。协处理都是运行在hbase的regionsrver服务器上(和MR的计算向数据移动类似),客户端通过RPC调用服务器上的协处理器并接受返回结果。所有协处理都必须直接或间接实现Coprocessor接口。用户自定义实现的协处理器需要打包上传到hbase的jar包中,即habse安装目录下的lib目录。

协处理器的启用方式

协处理器可以单独作用于某个表也可以作用于所有表。

  1. 在hbase-site.xml中配置可以开启全局协处理器,作用于所有表,多个协处理器用逗号隔开。

    <property><name>hbase.coprocessor.user.region.classes</name><value>org.apache.hadoop.hbase.coprocessor.AggregateImplementation</value>
    </property>
  2. 作用于指定表的协处理器可以通过shell开启,也可以通过代码启用。
  • shell命令,添加协处理器

    (1)disable指定表:hbase> disable 'total'
    (2)添加aggregation: hbase> alter 'total', METHOD => 'table_att','coprocessor'=>'|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'
    (3)重启指定表: hbase> enable 'total'
  • 代码开启协处理器,可以再创建表时就指定协处理器,也可以表已经创建再修改表属性,添加协处理器。
    创建表时指定协处理器:

    TableDescriptor desc = TableDescriptorBuilder.newBuilder(tn).setColumnFamily(ColumnFamilyDescriptorBuilder.of("cf")).setCompactionEnabled(false)//关闭自动major compaction.setCoprocessor("org.apache.hadoop.hbase.coprocessor.AggregateImplementation").build();

修改表属性,添加协处理器:

                Admin admin = null;HTable hTable = null;TableName tn = TableName.valueOf(tableName);try {admin = buildAdmin();if (!admin.tableExists(tn)) {return false;}hTable = buildHTable(tableName);TableDescriptor oldTableDescriptor = hTable.getDescriptor();TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(oldTableDescriptor);if (isAdd) {//添加协处理器builder = builder.setCoprocessor(coprocessor);} else {//移除协处理器builder = builder.removeCoprocessor(coprocessor);}admin.disableTable(tn);admin.modifyTable(builder.build());admin.enableTable(tn);} catch (Exception e) {try {if (admin != null && admin.isTableDisabled(tn)) {admin.enableTable(tn);}} catch (IOException ioException) {ioException.printStackTrace();}e.printStackTrace();} finally {closeHTable(hTable);closeAdmin(admin);}
  • 当然,可以添加协处理器,就可以删除协处理器:
    shell删除协处理器,name为对应协处理器的id。

    alter 'users', METHOD => 'table_att_unset', NAME => 'coprocessor$3'

代码也可以删除协处理器,如上代码:builder.setCoprocessor(coprocessor);

3. table_att参数说明,如下,Coprocessor的值被“|”分隔符分为了四段信息。

hbase alter 'users', METHOD => 'table_att', 'Coprocessor'=>
'hdfs://<namenode>:<port>/user/<hadoop-user>/coprocessor.jar|org.myname.hbase.Coprocessor.RegionObserverExample|1073741823| arg1=1,arg2=2'

1、第一段信息前为jar包路径,自定义协处理器的jar可以上传到hdfs上,也可以直接放在hbase的lib目录下(分割符前为空字符)。hdfs路径只可以只指定目录,这样hbse会加载改目录下的所有jar包。

2、第二段信息是协处理器类包括包路径的完整类名。

3、第三段信息是协处理器的优先级,为一个整数,框架将使用优先级确定在同一钩子上注册的所有已配置观察者的执行顺序。此字段可以留空。在这种情况下,框架将分配一个默认的优先级值。

4、第四段信息为传给协处理器的参数。

二、Endpoint Coprocessor

端点协处理器类似于MySQL中的存储过程,先将开发的的处理逻辑代码上传到服务器,当客户端通过RPC调用时,服务器执行该代码,并将计算结果返回给客户端。

AggregateImplementation

hbase提供了一个默认实现端点协处理器AggregateImplementation,继承AggregateService和实现RegionCoprocessor。

协处理器Aggregation为了弥补Hbase作为列存储数据库,难以进行求和、计数、排序等操作,能够进行一些简单的聚合操作,Aggregation将计算放在了server端,即region上,减少了网络通讯开销。

AggregationClient封装了一系列hbase的聚合函数,提供了在服务端处理聚合数据的方法。Aggregation客户端的聚合函数一般都包括三个参数,第一个TableName,指定操作表;第二个ColumnInterpreter,表列值的解析器,hbase提供Long、Double、BigDecimal三种类型的解析器,也可以自己实现;第三个Scan,表记录的扫描器。

  1. rowCount:行计数方法

    AggregationClient aggregationClient = new AggregationClient(conf);
    Scan scan = new Scan();
    ColumnInterpreter ci = new LongColumnInterpreter();
    TableName tn = TableName.valueOf("User");
    long count = aggregationClient.rowCount(tn, ci ,scan);
    System.out.println(count);
  2. max:列的最大值,这里列值解析器用的是LongColumnInterpreter,对应列值存入的必须是Long值的字节数组。
    AggregationClient aggregationClient = new AggregationClient(conf);
    Scan scan = new Scan();
    ColumnInterpreter ci = new LongColumnInterpreter();scan.addColumn(Bytes.toBytes("int"), Bytes.toBytes("lnum"));
    //max必须指定列族,
    Object result = aggregationClient.max(tn, ci, scan);
    System.out.println(result);
  3. sum:求和
  4. avg:平均数
  5. medin:中位数
  6. std:标准差

自定义ColumnInterpreter

AggregateImplementation协处理器构造需要传入一个列值解析器,将value解析成long或int都能够才能进行计算,hbase默认提供了Long、Double、BigDecimal三种类型的列解析器。对于列值不是Long、Double、BigDecimal三种类型的,可以自己实现列解析器,比如String、Integer以及自定义的protobuf对象等。注意,虽然hbase列保存的值都是bytes,但是不同的类型转成byes的长度不一样,如下int和long保存的长度是不一样的。

 int1                                           column=int:lnum, timestamp=2020-09-01T09:08:59.426Z, value=\x00\x00\x00\x00\x00\x00\x00\x0A                                              int1                                           column=int:num, timestamp=2020-09-01T09:04:00.379Z, value=\x00\x00\x00\x0A                                                               int10                                          column=int:lnum, timestamp=2020-09-01T09:08:59.426Z, value=\x00\x00\x00\x00\x00\x00\x00\x01                                              int10                                          column=int:num, timestamp=2020-09-01T09:04:00.379Z, value=\x00\x00\x00\x01                                                               int2                                           column=int:lnum, timestamp=2020-09-01T09:08:59.426Z, value=\x00\x00\x00\x00\x00\x00\x00\x09                                              int2                                           column=int:num, timestamp=2020-09-01T09:04:00.379Z, value=\x00\x00\x00\x09                                                               int3                                           column=int:lnum, timestamp=2020-09-01T09:08:59.426Z, value=\x00\x00\x00\x00\x00\x00\x00\x08                                              int3                                           column=int:num, timestamp=2020-09-01T09:04:00.379Z, value=\x00\x00\x00\x08                                                               int4                                           column=int:lnum, timestamp=2020-09-01T09:08:59.426Z, value=\x00\x00\x00\x00\x00\x00\x00\x07                                              int4                                           column=int:num, timestamp=2020-09-01T09:04:00.379Z, value=\x00\x00\x00\x07                                                               int5                                           column=int:lnum, timestamp=2020-09-01T09:08:59.426Z, value=\x00\x00\x00\x00\x00\x00\x00\x06                                              int5                                           column=int:num, timestamp=2020-09-01T09:04:00.379Z, value=\x00\x00\x00\x06                                                               int6                                           column=int:lnum, timestamp=2020-09-01T09:08:59.426Z, value=\x00\x00\x00\x00\x00\x00\x00\x05                                              int6                                           column=int:num, timestamp=2020-09-01T09:04:00.379Z, value=\x00\x00\x00\x05                                                               int7                                           column=int:lnum, timestamp=2020-09-01T09:08:59.426Z, value=\x00\x00\x00\x00\x00\x00\x00\x04                                              int7                                           column=int:num, timestamp=2020-09-01T09:04:00.379Z, value=\x00\x00\x00\x04                                                               int8                                           column=int:lnum, timestamp=2020-09-01T09:08:59.426Z, value=\x00\x00\x00\x00\x00\x00\x00\x03                                              int8                                           column=int:num, timestamp=2020-09-01T09:04:00.379Z, value=\x00\x00\x00\x03                                                               int9                                           column=int:lnum, timestamp=2020-09-01T09:08:59.426Z, value=\x00\x00\x00\x00\x00\x00\x00\x02                                              int9                                           column=int:num, timestamp=2020-09-01T09:04:00.379Z, value=\x00\x00\x00\x02                 

自定义列拦截器,需要继承ColumnInterpreter类,当然,也可以进程hbase的默认是哪个类型实现,如LongColumnInterpreter,这样只要重写getValue解析字节数组方法就行,可以复用其他方法实现。

package cn.lsh.hbase.coprocessor;import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.util.Bytes;import java.io.IOException;/*** 自定义AggregationClient的列解析器*/
public class MyStringIntColumnInterpreter extends ColumnInterpreter<Long, Long, HBaseProtos.EmptyMsg, HBaseProtos.LongMsg, HBaseProtos.LongMsg> {@Overridepublic Long getValue(byte[] bytes, byte[] bytes1, Cell cell) throws IOException {return Long.valueOf(Bytes.toString(CellUtil.cloneValue(cell)));}@Overridepublic Long add(Long aLong, Long s1) {//加法,进行sum运算时会调用if (aLong == null) {return s1;}if (s1 == null) {return aLong;}return aLong + s1;}@Overridepublic Long getMaxValue() {return Long.MAX_VALUE;}@Overridepublic Long getMinValue() {return Long.MIN_VALUE;}@Overridepublic Long multiply(Long aLong, Long s1) {//乘法if (aLong == null) {return s1;}if (s1 == null) {return aLong;}return aLong * s1;}@Overridepublic Long increment(Long aLong) {//增量值return aLong == null ? null : aLong + 1;}@Overridepublic Long castToReturnType(Long aLong) {return aLong;}@Overridepublic int compare(Long aLong, Long t1) {//比较return aLong == null ? -1 : t1 == null ? 1 : aLong.compareTo(t1);}@Overridepublic double divideForAvg(Long aLong, Long aLong2) {//除法return (aLong == null || aLong2 == null) ? Double.NaN : (aLong.doubleValue() / aLong2.doubleValue());}@Overridepublic HBaseProtos.EmptyMsg getRequestData() {return HBaseProtos.EmptyMsg.getDefaultInstance();}@Overridepublic void initialize(HBaseProtos.EmptyMsg emptyMsg) {}@Overridepublic HBaseProtos.LongMsg getProtoForCellType(Long aLong) {return HBaseProtos.LongMsg.newBuilder().setLongMsg(aLong).build();}@Overridepublic Long getCellValueFromProto(HBaseProtos.LongMsg longMsg) {return longMsg.getLongMsg();}@Overridepublic HBaseProtos.LongMsg getProtoForPromotedType(Long aLong) {return HBaseProtos.LongMsg.newBuilder().setLongMsg(aLong).build();}@Overridepublic Long getPromotedValueFromProto(HBaseProtos.LongMsg longMsg) {return longMsg.getLongMsg();}@Overridepublic Long castToCellType(Long aLong) {return aLong;}public static void main(String[] args) {MyStringIntColumnInterpreter ci = new MyStringIntColumnInterpreter();ci.add(null, null);}
}

注意!!!由于协处理器是在服务端运行的,所以自定义的列出器类需要打包上传到表所在的Region服务器hbase安装目录下的lib文件夹里,并重启RegionServer,一般需要上传到集群中所有的RegionServer上,重启整个集群。因为单个RegionServer重启,hbase会为表分配其他的region。

自定义实现端点协处理器

自定义的端点协处理器需要实现Coprocessor或RegionCoprocessor,并且继承一个protobuf生成的抽象服务。端点协处理器RPC调用是基于的protobuf序列化协议来实现,所以需要先定义请求和响应proto类,以及提供的服务。proto定义文件如下:

syntax = "proto2";
option java_package = "cn.lsh.protobuf";
option java_outer_classname = "Sum";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;message SumRequest {required string family = 1;required string column = 2;
}message SumResponse {required int64 sum = 1;
}//定义一个服务
service SumService {rpc getSum(SumRequest)returns (SumResponse);
}

通过protoc生成对应的proto类。

自定义端点协处理器类:

public class SumEndPoint extends Sum.SumService implements RegionCoprocessor{private RegionCoprocessorEnvironment renv;@Overridepublic void getSum(RpcController controller, Sum.SumRequest request, RpcCallback<Sum.SumResponse> done) {//求和Scan scan = new Scan();scan.addFamily(Bytes.toBytes(request.getFamily()));scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));RegionScanner scanner = null;Sum.SumResponse response = null;try {List<Cell> results = new ArrayList<>();boolean hasMore;scanner = renv.getRegion().getScanner(scan);long sum = 0L;do {hasMore = scanner.next(results);for (Cell cell : results) {sum += Bytes.toLong(CellUtil.cloneValue(cell));}results.clear();} while (hasMore);response = Sum.SumResponse.newBuilder().setSum(sum).build();} catch (IOException e) {ResponseConverter.setControllerException(controller, e);} finally {if (scanner != null) {try {scanner.close();} catch (IOException e) {}}}done.run(response);}@Overridepublic void start(CoprocessorEnvironment env) throws IOException {if (env instanceof RegionCoprocessorEnvironment) {this.renv = (RegionCoprocessorEnvironment) env;} else {throw new CoprocessorException("Must be loaded on a table region");}}@Overridepublic void stop(CoprocessorEnvironment env) throws IOException {//求和计算结束}@Overridepublic Iterable<Service> getServices() {return Collections.singletonList(this);}
}

这里官方教程是实现了Coprocessor和CoprocessorService,但是CoprocessorService接口已过时,且测试发现实现Coprocessor的start方法并不能初始化RegionCoprocessorEnvironment,暂不知道原因,改为实现RegionCoprocessor测试求和函数执行成功。

客户端调用:

public class EndPointTest {private HTable hTable;private Connection conn;@Beforepublic void init() throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.property.clientPort", "2181");conf.set("hbase.zookeeper.quorum", "node01,node02,node03");//提高RPC通信时长conf.setLong("hbase.rpc.timeout", 600000);//设置Scan缓存conf.setLong("hbase.client.scanner.caching", 1000);conn = ConnectionFactory.createConnection(conf);hTable = (HTable) conn.getTable(TableName.valueOf("users"));}@Testpublic void testSum() throws Throwable {Sum.SumRequest request = Sum.SumRequest.newBuilder().setFamily("salaryDet").setColumn("gross").build();Map<byte[], Long> results = hTable.coprocessorService(Sum.SumService.class, null, null, (Sum.SumService aggregate) -> {CoprocessorRpcUtils.BlockingRpcCallback<Sum.SumResponse> rpcCallback = new CoprocessorRpcUtils.BlockingRpcCallback<>();aggregate.getSum(null, request, rpcCallback);Sum.SumResponse response = rpcCallback.get();return response.getSum();});for (Map.Entry<byte[], Long> entry : results.entrySet()) {System.out.println("key: " + Bytes.toString(entry.getKey()) + ", sum: " + entry.getValue());}}@Afterpublic void destroy() throws Exception {hTable.close();conn.close();}
}

三、 Observer Coprocessor

观察者协处理器,类似于mysql的触发器,采用AOP编程思想,在定义的一系列操作前后执行用户设置处理逻辑,根据切入点的不同可以分为以下不同类型的观察者:

  1. RegionObserver:观察region上的事件,如get、put、delet、scan等;
  2. RegionServerObserver:观察RegionServer上的事件,如stopRegionServer、createReplicationEndPoint等;
  3. MasterObserver:观察Master上的事件,如createTable、deleteTable等;
  4. WalObserver:观察写入预写日志(WAL)的事件,如wALWrite、wALRolldeng 。

不同的观察者实现不同的Observer,但都必须实现Coprocessor或其子实现。观察者协处理器有以下几种比较典型的应用:

  1. 权限校验,可以在get、put、delete等操作的前置切面方法中检查权限;

  2. 外键关联,habse不支持外键,但可以通过观察者协处理器的方式实现多张变的同步更新;

  3. 二级索引,可以通过观察者协处理器实现二级索引。

自定义一个RegionObserver,如下:

package cn.lsh.hbase.coprocessor;import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;/*** 观察者协处理器,类似于mysql的触发器,采用切面编程,在定义的一系列操作前后执行用户设置处理逻辑*/
public class RegionObserverCoprocessor implements RegionCoprocessor, RegionObserver {Logger log = LoggerFactory.getLogger(RegionObserverCoprocessor.class);/**协处理器是运行于region中的,每一个region都会加载协处理器*/private RegionCoprocessorEnvironment env = null;private static final byte[] TABLE = Bytes.toBytes("users");private static final byte[] CF = Bytes.toBytes("salaryDet");private static final byte[] CF2 = Bytes.toBytes("personalDet");private static final byte[] ADMIN = Bytes.toBytes("admin");@Overridepublic void start(CoprocessorEnvironment env) throws IOException {//如果您从HBase -site.xml加载一个协处理器,然后使用HBase Shell再次加载相同的协处理器,那么它将被第二次加载。// 同一个类将存在两次,第二个实例将具有更高的ID(因此优先级较低)。其效果是,重复的协处理器被有效地忽略。this.env = (RegionCoprocessorEnvironment) env;}@Overridepublic void stop(CoprocessorEnvironment env) throws IOException {}@Overridepublic Optional<RegionObserver> getRegionObserver() {return Optional.of(this);}@Overridepublic void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, final Get get, final List<Cell> result) throws IOException {log.info("---Get操作前置处理 start---");//get操作前置处理if (Bytes.equals(get.getRow(), ADMIN)) {result.add(new KeyValue(get.getRow(), Bytes.toBytes("错误"), Bytes.toBytes("详细"), Bytes.toBytes("您不能获取管理员信息")));//禁止访问,直接返回错误信息c.bypass();} else if (Bytes.equals(get.getRow(), Bytes.toBytes("jverne"))) {//增加查询条件get.addFamily(CF);//增加一条记录KeyValue kv = new KeyValue(get.getRow(), CF2, Bytes.toBytes("addPre"), Bytes.toBytes("boss"));result.add(kv);}log.info("---Get操作前置处理 end---");}@Overridepublic void postGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result) throws IOException {log.info("---Get操作后置处理 start---");if (Bytes.equals(get.getRow(), Bytes.toBytes("cdickens"))) {//修改查询条件,应该无效get = new Get(Bytes.toBytes("jverne"));//增加一列记录KeyValue kv = new KeyValue(get.getRow(), CF2, Bytes.toBytes("addPre"), Bytes.toBytes("coprocessor"));result.add(kv);c.bypass();}log.info("---Get操作后置处理 end---");}@Overridepublic void prePut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) throws IOException {log.info("---Put操作前置处理 start---");if (Bytes.equals(ADMIN, put.getRow())) {//对于admin用户,存在就不覆盖c.bypass();}log.info("---Put操作前置处理 end---");}@Overridepublic void postPut(ObserverContext<RegionCoprocessorEnvironment> c, Put put, WALEdit edit, Durability durability) throws IOException {log.info("---Put操作后置处理 start---");log.info("---Put操作后置处理 end---");}@Overridepublic boolean preCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put, boolean result) throws IOException {return false;}@Overridepublic boolean postCheckAndPut(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Put put, boolean result) throws IOException {return false;}@Overridepublic void preDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, WALEdit edit, Durability durability) throws IOException {log.info("---Delete操作前置处理 start---");if (Bytes.equals(ADMIN, delete.getRow())) {throw new IOException("不能删除ADMIN用户");}log.info("---Delete操作前置处理 end---");}@Overridepublic void postDelete(ObserverContext<RegionCoprocessorEnvironment> c, Delete delete, WALEdit edit, Durability durability) throws IOException {log.info("---Delete操作后置处理 start---");log.info("---Delete操作后置处理 end---");}@Overridepublic boolean preCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Delete delete, boolean result) throws IOException {return false;}@Overridepublic boolean postCheckAndDelete(ObserverContext<RegionCoprocessorEnvironment> c, byte[] row, byte[] family, byte[] qualifier, CompareOperator op, ByteArrayComparable comparator, Delete delete, boolean result) throws IOException {return false;}@Overridepublic boolean preExists(ObserverContext<RegionCoprocessorEnvironment> c, Get get, boolean exists) throws IOException {return false;}@Overridepublic boolean postExists(ObserverContext<RegionCoprocessorEnvironment> c, Get get, boolean exists) throws IOException {return false;}@Overridepublic void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan) throws IOException {log.info("---Scan开启操作前置处理 start---");//添加一个过滤器,不允许扫描rowKey为ADMIN的行的信息Filter filter = new RowFilter(CompareOperator.NOT_EQUAL, new BinaryComparator(ADMIN));scan.setFilter(filter);log.info("---Scan开启操作前置处理 end---");}@Overridepublic RegionScanner postScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan, RegionScanner s) throws IOException {log.info("---Scan开启操作后置处理 start---");log.info("---Scan开启操作后置处理 end---");//注意在这里不能反回nullreturn s;}@Overridepublic void preScannerClose(ObserverContext<RegionCoprocessorEnvironment> c, InternalScanner s) throws IOException {}@Overridepublic boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c, InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {return hasNext;}@Overridepublic boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c, InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {log.info("---ScanNext操作后置处理 start---");//从扫描结果汇总移除ADMIN信息result.removeIf(r -> Bytes.equals(ADMIN, r.getRow()));//虽然preScannerOpen添加了过滤器,但是有可能被其他过滤器覆盖掉,所以可以在这里显示的移除ADMIN信息log.info("---ScanNext操作后置处理 end---");return hasNext;}
}

Log4j的日志可以打印在对应RegionServer的运行日志中。

以上涉及的完整代码可参见:hadoop学习之-hbase代码

Hbase学习之——协处理Coprocessor的使用相关推荐

  1. HBase学习指南之HBase原理和Shell使用

    HBase学习指南之HBase原理和Shell使用 参考资料: 1.https://www.cnblogs.com/nexiyi/p/hbase_shell.html,hbase shell 转载于: ...

  2. Hbase学习笔记(概念和搭建)

    Hbase学习笔记 1.hbase的基本介绍 简介 hbase是bigtable的开源java版本,是建立在hdfs之上,提供给高可靠性,高性能,列存储,可伸缩,实时读写的nosql的数据库系统,它介 ...

  3. 【HBase学习笔记-尚硅谷-Java API shell命令 谷粒微博案例】

    HBase学习笔记 HBase 一.HBase简介 1.HBase介绍 2.HBase的逻辑结构和物理结构 3.数据模型 4.基本架构 二.快速入门 1.配置HBase 2.命令 三.API 1.获取 ...

  4. HBase学习01--Hbase的安装

    HBase学习01–Hbase的安装 一.单机模式: 1.1 解压软件包 tar -zxvf hbase-1.1.3-bin.tar.gz 1.2 配置JAVA_HOME环境变量 cd /usr/lo ...

  5. HBase学习(7)-HBase原理

    原文来自:扎心了,老铁的<HBase学习之路 (七)HBase 原理>

  6. HBase学习(5)-MapReduce操作HBase

    原文来自:扎心了,老铁的<HBase学习之路 (五)MapReduce操作Hbase>

  7. Hbase学习文档(超详细单机安装)

    Hbase学习文档(超详细单机安装) 一.前言 1.1简述 本文分为五个部分:linux主机名的设置.jdk的安装.hadoop的安装.单机模式下hbase的安装.hbase的shell常用命令及ja ...

  8. Hbase 学习(三)Coprocessors

    Coprocessors 之前我们的filter都是在客户端定义,然后传到服务端去执行的,这个Coprocessors是在服务端定义,在客户端调用,然后在服务端执行,他有点儿想我们熟悉的存储过程,传一 ...

  9. HBASE学习使用经验

    问题导读: 1. NOSQL是如何产生的以及Hbase 在NOSQL中的地位如何? 2. Hbase框架是如何架构出来的 ? 3. Hbase 是如何检索一条数据以及检索时间复杂度是多少? 4. 如何 ...

最新文章

  1. java 随机生成常用汉字_Java随机生成中文汉字
  2. JS定时器使用,定时定点,固定时刻,循环执行
  3. mfc编程 孙鑫_孙鑫VC++视频教程笔记-(3)MFC程序框架的剖析 附1-SDI程序流程图
  4. (原创)一步一步学ZedBoard Zynq(一):ZedBoard的第一个工程Helloworld
  5. YDOOK:STM32: 最新版选型手册下载 2021
  6. usbcan、can分析仪的产品特点和功能特点
  7. WIN10_cmd命令提示符更换用户启动
  8. 超好用的网盘下载工具---PanDownload
  9. 第四篇Scrum冲刺博客
  10. CSS基础教程——纯CSS开发的气泡式提示框
  11. MediCool天使投资计划
  12. 3.Go语言中常量,变量, 及其命名规则以及代码风格
  13. 谷歌雅虎将联手实行即时信息兼容性计划
  14. win7+VS2015+OpenCV3.20的搭建
  15. OpenStack高级控制服务之使用编配服务(Heat)实现自动化部署云主机
  16. 最新江苏水利水电安全员模拟真题及答案解析
  17. 热成像进入AI人工智能时代!精准人脸识别体温计,实名制测量体温
  18. 事件抽取(event extraction)
  19. 【机器人学】逆运动学
  20. vue实现答题卡页面

热门文章

  1. 为什么复制的门禁卡只能用一次_手机NFC为什么可以复制小区用的门禁卡?
  2. 广告营销场景下的隐私计算实践:阿里妈妈营销隐私计算平台SDH
  3. 新一年TurboGate邮件网关再次提醒小心勒索邮件
  4. 【Java:线程与进程 详解+案例】
  5. 蓝桥试题 算法提高 使用指针逆序输出 JAVA
  6. 手工雕刻图纸_手工玉石雕刻图样大全
  7. 窥探现代浏览器架构(一)
  8. Vs调试报错:不能将 “const char *“ 类型的值分配到 “char *“ 类型的实体
  9. 基于微信小程序的物流仓储系统-计算机毕业设计
  10. java异常面试_java中异常的面试