zookeeper--ClientCnxn
之前完成的项目作用zookeeper用作服务发现,抽象出注册和监听功能,供其他项目使用。对zookeeper的一些细节作个记录吧。
我们知道通过Zookeeper对象实例实现对zookeeper服务的调用。首先的问题就是消息的发送和接受,以及事件的回调。这里我们把目光转向ClientCnxn
创建实例
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {..../*** send线程完成发送request,接受response,生成event* event线程派发event*/sendThread = new SendThread(clientCnxnSocket);eventThread = new EventThread();
}
`SendThread线程`
public void run() {//最大心跳间隔final int MAX_SEND_PING_INTERVAL = 10000; while (state.isAlive()) {try {//1、如果末连接,随机休眠1~1000,根据closing(用户请求)决定是跳出还是重连//2、如果需要登录,进行登录请求,根据结果生成事件if (zooKeeperSaslClient != null) {。。。if (sendAuthEvent == true) {eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,authState,null));}}//3、如果waitTimeOut超时,抛出SessionTimeoutExceptionif (to <= 0) {throw new SessionTimeoutException(...);}//4、心跳sendPing()//5、如果是只读模式pingRwServer();//6、开始读、写IOclientCnxnSocket.doTransport(to, pendingQ, outgoingQ, ClientCnxn);} catch (Throwable e) {//发送Disconnected事件if (state.isAlive()) {eventThread.queueEvent(new WatchedEvent(Event.EventType.None,Event.KeeperState.Disconnected,null));}}}//如果alive状态发送Disconnected事件if (state.isAlive()) {eventThread.queueEvent(new WatchedEvent(Event.EventType.None,Event.KeeperState.Disconnected, null));}
}
`SendThread线程`
void readResponse(ByteBuffer incomingBuffer) throws IOException {//心跳响应if (replyHdr.getXid() == -2) {// -2 is the xid for pingsif (LOG.isDebugEnabled()) {LOG.debug("Got ping response for sessionid: 0x"+ Long.toHexString(sessionId)+ " after "+ ((System.nanoTime() - lastPingSentNs) / 1000000)+ "ms");}return;}//认证响应if (replyHdr.getXid() == -4) {// -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); }if (LOG.isDebugEnabled()) {LOG.debug("Got auth sessionid:0x"+ Long.toHexString(sessionId));}return;}//通知if (replyHdr.getXid() == -1) {// -1 means notificationif (LOG.isDebugEnabled()) {LOG.debug("Got notification sessionid:0x"+ Long.toHexString(sessionId));}WatcherEvent event = new WatcherEvent();event.deserialize(bbia, "response");// convert from a server path to a client pathif (chrootPath != null) {String serverPath = event.getPath();if(serverPath.compareTo(chrootPath)==0)event.setPath("/");else if (serverPath.length() > chrootPath.length())event.setPath(serverPath.substring(chrootPath.length()));else {LOG.warn("Got server path " + event.getPath()+ " which is too short for chroot path "+ chrootPath);}}WatchedEvent we = new WatchedEvent(event);if (LOG.isDebugEnabled()) {LOG.debug("Got " + we + " for sessionid 0x"+ Long.toHexString(sessionId));}eventThread.queueEvent( we );return;}...finishPacket(packet);
}
`EventThread`
public void run() {try {isRunning = true;while (true) {Object event = waitingEvents.take();if (event == eventOfDeath) {wasKilled = true;} else {processEvent(event);}if (wasKilled)synchronized (waitingEvents) {if (waitingEvents.isEmpty()) {isRunning = false;break;}}}} catch (InterruptedException e) {LOG.error("Event thread exiting due to interruption", e);}LOG.info("EventThread shut down");
}
zookeeper--ClientCnxn相关推荐
- Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
文章目录 个人简介 一.业务场景:报错kafka.zookeeper.ZooKeeperClientTimeoutException: Timed out waiting for connection ...
- Zookeeper源码解读
1.1. 客户端源码 1.1.1. 总体流程 启动客户端 zkCli.sh文件里面的配置 实际运行 public static void main(String args[])throws Keepe ...
- ZooKeeper Watch Java API浅析getChildren
2019独角兽企业重金招聘Python工程师标准>>> Watch是ZooKeeper中非常重要的一个机制,它可以监控ZooKeeper中节点的变化情况,告知客户端.下面,我们以代码 ...
- zookeeper服务发现实战及原理--spring-cloud-zookeeper源码分析
1.为什么要服务发现? 服务实例的网络位置都是动态分配的.由于扩展.失败和升级,服务实例会经常动态改变,因此,客户端代码需要使用更加复杂的服务发现机制. 2.常见的服务发现开源组件 etcd-用于共享 ...
- Zookeeper 客户端API调用示例(基本使用,增删改查znode数据,监听znode,其它案例,其它网络参考资料)
9.1 基本使用 org.apache.zookeeper.Zookeeper是客户端入口主类,负责建立与server的会话 它提供以下几类主要方法 : 功能 描述 create 在本地目录树中创建 ...
- Zookeeper的一次迁移故障
前阶段同事迁移Zookeeper(是给Kafka使用的以及flume使用)后发现所有Flume-producer/consumer端集体报错: 1 2 3 4 07 Jan 2014 01:19:32 ...
- Linux(CentOS)中常用软件安装,使用及异常——Zookeeper, Kafka
欢迎支持笔者新作:<深入理解Kafka:核心设计与实践原理>和<RabbitMQ实战指南>,同时欢迎关注笔者的微信公众号:朱小厮的博客. 欢迎跳转到本文的原文链接:https: ...
- dubbo源码解析-zookeeper创建节点
前言 在之前dubbo源码解析-本地暴露中的前言部分提到了两道高频的面试题,其中一道dubbo中zookeeper做注册中心,如果注册中心集群都挂掉,那发布者和订阅者还能通信吗?在上周的dubbo源码 ...
- Will not attempt to authenticate using SASL | dubbo项目启动特别慢,拉取 zookeeper 服务日志打印特别慢
大家好,我是烤鸭: 今天分享一下使用dubbo遇到的几个问题. 1. cause: KeeperErrorCode = ConnectionLoss for /dubbo/ xxx 异常如下: ...
- Apache Zookeeper入门1
口水:Zookeeper是我目前接触过Apache开源系统中比较复杂的一个产品,要搞清楚这个东东里面的运作关系还真不是一时半会可以搞定的事,本人目前只略知皮毛之术. ZooKeeper 是什么? Zo ...
最新文章
- 图解计算机图形学三维变换算法
- Android 系统(53)---关于触摸屏快速点击事件误识别为滑动事件
- 微软在 Build 2020 上“展示”新版 Edge for Linux
- SQL tp3.2 批量更新 saveAll
- arduino cc3000 php,【arduino】新手求助, 想问一下关於CC3000设置的问题
- 不为人知的华为和小米真相
- 基于SOM算法的Iris数据分类
- 第三章 磁共振成像原理
- 万张脸部图片合成平均脸,26家公司颜值大PK
- 产品经理 项目经理 技术经理的区别
- python语言基础练习
- php 获取数组四分位,如何在JavaScript(或PHP)中获得数组的中位数和四分位数/百分位数?...
- 极米h3s和坚果j10、当贝f3三款投影实测对比来了!
- 聊聊开源类ChatGPT工作——MOSS
- 会议记录怎样做得又快又好?一分钟学会录音转文字
- 基于PP-Human V2的闯入人员检测
- Quake1和2的pak资源文件格式说明
- 学习英文-学以致用【场景:吃饭与家务】
- 构建神经网络模型方法,神经网络建立数学模型
- 关于计算机的小故事英语作文,急求一篇英语作文《关于自己的故事》急求一篇英语作文《关于自己的小故事》~...
热门文章
- Oracle索引基础
- 云服务器网站直接域名登录,[一叶花开]网站搭建-天翼云服务器跳过备案实现可直接访问IP地址以及解析域名...
- 光猫拨号和软路由拨号失败服务器无响应,光猫拨号好还是无线路由器拨号好(一文解答你的疑惑)...
- jdk9模块化简单介绍
- 微盛·企微管家杨明:未来企业微信生态能诞生中国的Salesforce
- golang的乐观锁与悲观锁
- 怎么防止解决百度转码问题
- 中英文在线语音转文字的方法
- 20个经典的Java应用
- 什么是java修饰符_java修饰符是什么?