连接HBase的正确姿势

在云HBase值班的时候,经常会遇见有用户咨询诸如“HBase是否支持连接池?”这样的问题,也有用户因为应用中创建的Connection对象过多,触发Zookeeper的连接数限制,导致客户端连不上的。究其原因,都是因为对HBase客户端的原理不了解造成的。本文介绍HBase客户端的Connection对象与Socket连接的关系并且给出Connection的正确用法。
Connection是什么?
在云HBase用户中,常见的使用Connection的错误方法有:

  1. 自己实现一个Connection对象的资源池,每次使用都从资源池中取出一个Connection对象;
  2. 每个线程一个Connection对象。
  3. 每次访问HBase的时候临时创建一个Connection对象,使用完之后调用close关闭连接。

从这些做法来看,这些用户显然是把Connection对象当成了单机数据库里面的连接对象来用了。然而作为分布式数据库,HBase客户端需要和多个服务器中的不同服务角色建立连接,所以HBase客户端中的Connection对象并不是简单对应一个socket连接。HBase的API文档当中对Connection的定义是:
A cluster connection encapsulating lower level individual connections to actual servers and a connection to zookeeper.
我们知道,HBase客户端要连接三个不同的服务角色:

  1. Zookeeper:主要用于获得meta-region位置,集群Id、master等信息。
  2. HBase Master:主要用于执行HBaseAdmin接口的一些操作,例如建表等。
  3. HBase RegionServer:用于读、写数据。

下图简单示意了客户端与服务器交互的步骤:

HBase客户端的Connection包含了对以上三种Socket连接的封装。Connection对象和实际的Socket连接之间的对应关系如下图:

HBase客户端代码真正对应Socket连接的是RpcConnection对象。HBase使用PoolMap这种数据结构来存储客户端到HBase服务器之间的连接。PoolMap封装ConcurrentHashMap的结构,key是ConnectionId(封装服务器地址和用户ticket),value是一个RpcConnection对象的资源池。当HBase需要连接一个服务器时,首先会根据ConnectionId找到对应的连接池,然后从连接池中取出一个连接对象。
HBase提供三种资源池的实现,分别是Reusable,RoundRobin和ThreadLocal。具体实现可以通过hbase.client.ipc.pool.type配置项指定,默认为Reusable。连接池的大小也可以通过hbase.client.ipc.pool.size配置项指定,默认为1。
连接HBase的正确姿势
从以上分析不难得出,在HBase中Connection类已经实现对连接的管理功能,所以不需要在Connection之上再做额外的管理。另外,Connection是线程安全的,然而Table和Admin则不是线程安全的,因此正确的做法是一个进程共用一个Connection对象,而在不同的线程中使用单独的Table和Admin对象。

//所有进程共用一个Connection对象
connection=ConnectionFactory.createConnection(config);...
//每个线程使用单独的Table对象
Table table = connection.getTable(TableName.valueOf("test"));
try {...
} finally {table.close();
}

HBase客户端默认的是连接池大小是1,也就是每个RegionServer 1个连接。如果应用需要使用更大的连接池或指定其他的资源池类型,也可以通过修改配置实现: 
Connection源码解析
Connection创建RpcClient的核心入口:

/*** constructor* @param conf Configuration object*/
ConnectionImplementation(Configuration conf,ExecutorService pool, User user) throws IOException {...try {...this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId, this.metrics);...} catch (Throwable e) {// avoid leaks: registry, rpcClient, ...LOG.debug("connection construction failed", e);close();throw e;}
}

RpcClient使用PoolMap数据结构存储客户端到HBase服务器之间的连接映射,PoolMap封装ConcurrentHashMap结构,其中key是ConnectionId[new ConnectionId(ticket, md.getService().getName(), addr)],value是RpcConnection对象的资源池。

protected final PoolMap<ConnectionId, T> connections;/*** Construct an IPC client for the cluster <code>clusterId</code>* @param conf configuration* @param clusterId the cluster id* @param localAddr client socket bind address.* @param metrics the connection metrics*/
public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,MetricsConnection metrics) {...this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf));...
}

当HBase需要连接一个服务器时,首先会根据ConnectionId找到对应的连接池,然后从连接池中取出一个连接对象,获取连接的核心实现:

/*** Get a connection from the pool, or create a new one and add it to the pool. Connections to a* given host/port are reused.*/
private T getConnection(ConnectionId remoteId) throws IOException {if (failedServers.isFailedServer(remoteId.getAddress())) {if (LOG.isDebugEnabled()) {LOG.debug("Not trying to connect to " + remoteId.address+ " this server is in the failed servers list");}throw new FailedServerException("This server is in the failed servers list: " + remoteId.address);}T conn;synchronized (connections) {if (!running) {throw new StoppedRpcClientException();}conn = connections.get(remoteId);if (conn == null) {conn = createConnection(remoteId);connections.put(remoteId, conn);}conn.setLastTouched(EnvironmentEdgeManager.currentTime());}return conn;}

连接池根据ConnectionId获取不到连接则创建RpcConnection的具体实现:

protected NettyRpcConnection createConnection(ConnectionId remoteId) throws IOException {return new NettyRpcConnection(this, remoteId);
}NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException {super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId,rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor);this.rpcClient = rpcClient;byte connectionHeaderPreamble = getConnectionHeaderPreamble();this.connectionHeaderPreamble =Unpooled.directBuffer(connectionHeaderPreamble.length).writeBytes(connectionHeaderPreamble);ConnectionHeader header = getConnectionHeader();this.connectionHeaderWithLength = Unpooled.directBuffer(4 + header.getSerializedSize());this.connectionHeaderWithLength.writeInt(header.getSerializedSize());header.writeTo(new ByteBufOutputStream(this.connectionHeaderWithLength));
}protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId,String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor)throws IOException {if (remoteId.getAddress().isUnresolved()) {throw new UnknownHostException("unknown host: " + remoteId.getAddress().getHostName());}this.timeoutTimer = timeoutTimer;this.codec = codec;this.compressor = compressor;this.conf = conf;UserGroupInformation ticket = remoteId.getTicket().getUGI();SecurityInfo securityInfo = SecurityInfo.getInfo(remoteId.getServiceName());this.useSasl = isSecurityEnabled;Token<? extends TokenIdentifier> token = null;String serverPrincipal = null;if (useSasl && securityInfo != null) {AuthenticationProtos.TokenIdentifier.Kind tokenKind = securityInfo.getTokenKind();if (tokenKind != null) {TokenSelector<? extends TokenIdentifier> tokenSelector = AbstractRpcClient.TOKEN_HANDLERS.get(tokenKind);if (tokenSelector != null) {token = tokenSelector.selectToken(new Text(clusterId), ticket.getTokens());} else if (LOG.isDebugEnabled()) {LOG.debug("No token selector found for type " + tokenKind);}}String serverKey = securityInfo.getServerPrincipal();if (serverKey == null) {throw new IOException("Can't obtain server Kerberos config key from SecurityInfo");}serverPrincipal = SecurityUtil.getServerPrincipal(conf.get(serverKey),remoteId.address.getAddress().getCanonicalHostName().toLowerCase());if (LOG.isDebugEnabled()) {LOG.debug("RPC Server Kerberos principal name for service=" + remoteId.getServiceName()+ " is " + serverPrincipal);}}this.token = token;this.serverPrincipal = serverPrincipal;if (!useSasl) {authMethod = AuthMethod.SIMPLE;} else if (token != null) {authMethod = AuthMethod.DIGEST;} else {authMethod = AuthMethod.KERBEROS;}// Log if debug AND non-default auth, else if trace enabled.// No point logging obvious.if ((LOG.isDebugEnabled() && !authMethod.equals(AuthMethod.SIMPLE)) ||LOG.isTraceEnabled()) {// Only log if not default auth.LOG.debug("Use " + authMethod + " authentication for service " + remoteId.serviceName+ ", sasl=" + useSasl);}reloginMaxBackoff = conf.getInt("hbase.security.relogin.maxbackoff", 5000);this.remoteId = remoteId;
}

原文:https://mp.weixin.qq.com/s/CWJlGYhrw_VacSqJ_Q8KZg

连接HBase的正确姿势相关推荐

  1. eclipse连接mysql_专题一、flask构建mysql数据库正确姿势

    每周壹总结,一起共同充电第121篇 应用程序最核心的就是数据,每天我们写程序其实也是在处理数据的过程,那么很有必要系统性的讲讲和梳理python的flask框架是如何进行数据交互操作的. 趁这3天假期 ...

  2. 计算机用户组连接打印机,工作组链接域内共享打印机的正确姿势

    不在域内的工作组中电脑链接域内共享打印机的正确姿势如下: 链接步骤: 1.确保是同一个局域网,能ping通. 2.域内共享打印机设置成共享,当然,这一句基本上是废话,不开共享链接个屁: 3.检查来宾账 ...

  3. 开发函数计算的正确姿势——支持 ES6 语法和 webpack 压缩

    为什么80%的码农都做不了架构师?>>>    首先介绍下在本文出现的几个比较重要的概念: 函数计算(Function Compute): 函数计算是一个事件驱动的服务,通过函数计算 ...

  4. 开发函数计算的正确姿势 —— 爬虫

    2019独角兽企业重金招聘Python工程师标准>>> 在 <函数计算本地运行与调试 - Fun Local 基本用法> 中,我们介绍了利用 Fun Local 本地运行 ...

  5. 怎么用linux的HDD存储,Linux学习的正确姿势12:Linux存储概览

    原标题:Linux学习的正确姿势12:Linux存储概览 从工作原理区分 机械 HDD 固态 SSD SSD的优势 SSD是摒弃传统磁介质,采用电子存储介质进行数据存储和读取的一种技术,突破了传统机械 ...

  6. Navicat使用Instant Client创建连接到Oracle数据库的正确姿势

    太长不看版: 1.你什么操作系统,Instant Client就选什么操作系统 2.你的navicat是多少位(32.64),Instant Client就选多少位 3.你的Oracle是哪个版本,I ...

  7. 开发函数计算的正确姿势 —— 使用 Fun Local 本地运行与调试

    前言 首先介绍下在本文出现的几个比较重要的概念: 函数计算(Function Compute): 函数计算是一个事件驱动的服务,通过函数计算,用户无需管理服务器等运行情况,只需编写代码并上传.函数计算 ...

  8. AI 玩微信跳一跳的正确姿势:跳一跳 Auto-Jump 算法详解

    作者丨安捷 & 肖泰洪 学校丨北京大学硕士生 研究方向丨计算机视觉 本文经授权转载自知乎专栏「学术兴趣小组」. 最近,微信小游戏跳一跳可以说是火遍了全国,从小孩子到大孩子仿佛每一个人都在刷跳一 ...

  9. 使用代码片段的正确姿势,打造高效的vscode开发环境

    全文3928字,阅读时间 10分钟,未来节约时间 15分钟/每天 代码片段(code snippet) 相信大家都或多或少有接触过. 在完成一个项目以后,往往都会写出许多有价值的代码,或是绞尽脑汁解决 ...

最新文章

  1. js - 浅拷贝和深拷贝
  2. 操作系统-信号量的使用
  3. SQL之 UNION ALL 和UNION
  4. [C++11]override关键字的使用
  5. [C++]vector创建二维数组
  6. 免费流量监控软件,最大可同时监控1000台电脑
  7. git 将dev分支的代码合并到master并添加对应的Tag
  8. 人设倒了扶起来:Lazarus 组织利用含木马的IDA Pro 攻击研究员
  9. 拓端tecdat|R语言ggplot2 对Facebook用户数据可视化分析
  10. Elasticsearch入门四:Elasticsearch-搜索、过滤、聚合
  11. CNC:机械工程之机械制图的几何特征、测量、配合、公差带、图纸标注、公差、配合、表面粗糙度之详细攻略
  12. IDEA 运行 Tomcat 中文乱码的各种问题
  13. python sklearn包中的主成分分析_sklearn主成分分析 NBA球队数据PCA降维可视化
  14. 互联网公司常用架构模式梳理
  15. HDFS文件访问权限
  16. WSL 2 网络配置
  17. 鸿蒙操作系统pc,不负期待!鸿蒙操作系统将于6月2日正式发布,你的手机更新了吗?...
  18. java中float和double为什么会转为科学记数法?
  19. C++后端开发(校招实习生)学习路线
  20. grbl源码解析——速度前瞻(2)

热门文章

  1. 智慧水库水位库容监测系统解决方案
  2. Paper:《YOLOv4: Optimal Speed and Accuracy of Object Detection》的翻译与解读
  3. Windows下解压分卷压缩方法
  4. asp.net(入门理解)
  5. rust休闲玩家_《Rust》坚持强制限定角色性别 玩家怒喷开发商傻蛋
  6. 面对日益激烈的互联网各类电商的竞争,你是如何看待电商之间的价格战的呢?
  7. “独裁”的张小龙和他的微信帝国诞生记
  8. Dart- move html element
  9. 1076万毕业生,面对有史以来最大规模毕业潮,麻了
  10. 家电 计算机和电讯领域 英语,美国电子电器工程硕士11个分支方向,你懂吗?...