1、Zookeeper入门

1.1、概念

Zookeeper从设计模式角度来理解,是一个基于观察者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生了变化,Zookeeper就负责通知已经在Zookeeper上注册的那些观察者做出相应的反应.

PS:多个开源项目中都应用到了 ZooKeeper,例如 HBase, Spark, Flink, Storm, Kafka, Dubbo 等等。

1.2、Zookeeper特点及理解

zookeeper的核心是原子广播,这个机制保证了各个Server之间的同步。实现这个机制的协议叫做Zab协议,Zab协议有两种模式:恢复模式(选主)、广播模式(同步)。当服务启动或者在领导者崩溃后,Zab就进入了恢复模式,当领导者被选举出来,且大多数Server完成了和leader的状态同步以后,恢复模式就结束了,状态同步保证了leader和server具有相同的系统状态。为了保证事务的顺序一致性,zookeeper采用了递增的事务id号(zxid)来标识事务。

Zookeeper集群对外呈现为一个整体。更新只能成功或者失败,没有中间状态;客户端无论连接到集群中的那台机器,展示给它都是同一个视图;顺序应用客户端的请求,顺序发布相应的消息,通过时间戳zxid(Zookeeper Transaction Id)实现;一个请求被应用,结果将会持久化;通过共享的层次结构命名空间(数据寄存器znode组成,每一个节点都可以存储数据,存储容量一般不能超过 1MiB)进行相互协调,集群间通过 Zab 协议(Zookeeper Atomic Broadcast)来保持数据的一致性;ZooKeeper数据保存在内存中,实现高吞吐量和低延迟;确保系统的客户视图在特定时间范围内是最新的,如果需要最新数据,应该在读数据之前调用sync接口。

1.3、Zookeeper 数据结构详解

Znode的类型分为三类:
① 持久节点(persistent node):节点会被持久;
② 临时节点(ephemeral node):客户端断开连接后,ZooKeeper会自动删除临时节点;
③ 顺序节点(sequential node):每次创建顺序节点时,ZooKeeper都会在路径后面自动添加上10位的数字,从1开始,最大是2147483647 (2^32-1);(每个顺序节点都有一个单独的计数器,并且单调递增的,由Zookeeper的leader实例维护。)
PS:ZNode可以有子节点目录,并且每个ZNode可以存储数据,注意临时节点(Ephemeral)类型的目录节点不能有子节点目录

Zxid(ZooKeeper Transaction Id) 是一个64位的数字,高32位表示纪元,从1开始,每次选举出一个新的leader,就会递增1;低32位是当前纪元维护的单调递增的数字,同样从1开始。

[zk: localhost:2181(CONNECTED) 30] create /test 'hello' # 创建持久节点
Created /test
[zk: localhost:2181(CONNECTED) 31] stat /test
cZxid = 0x3600000008  # 创建的事务标识
ctime = Sat Jul 02 14:11:11 CST 2022 # 创建的时间戳
mZxid = 0x3600000008 # 修改的事务标识,每次修改操作(set)后都会更新mZxid和mtime。
mtime = Sat Jul 02 14:11:11 CST 2022 # 修改的时间戳
pZxid = 0x3600000008 # 直接子节点最后更新的事务标识,子节点有变化时,都会更新pZxid
cversion = 0 # 直接子节点的版本号,子节点有变化时,cversion 的值就会增加1
dataVersion = 0 # 节点数据的版本号,每次对节点进行修改操作(set)后,dataVersion的值都会增加1
aclVersion = 0 # 节点ACL(操作控制列表)的版本号,每次节点的ACL进行变化时,aclVersion 的值就会增加1
ephemeralOwner = 0x0 # 当前节点是临时节点时,该值是客户端持有的session id。
dataLength = 5 # 节点存储的数据长度,单位为 B (字节)
numChildren = 0 # 直接子节点的个数

PS:Session 会话 客户端启动会与服务端建立一个 TCP 长连接,通过这个连接可以发送请求并接受响应,以及接受服务端的 Watcher 事件通知。

zkCli客户端使⽤watch: 支持watch的客户端命令有 stat、get、ls
① get -w /test:⼀次性监听节点,注意这里是监听的数据变化,假如test子节点的数据发生变化也是收不到监听消息的(假如在监听节点下创建和删除子节点这些都是监听不到的,但是删除监听的节点是可以监听到的);
② ls -w /test:监听⽬录,创建和删除⼦节点会收到通知,⼦节点中新增节点不会收到通知;
③ ls -R -w /test:监听⼦节点中⼦节点的变化,但内容的变化不会收到通知;

[zk: localhost:2181(CONNECTED) 33] get -w  /test
hello
[zk: localhost:2181(CONNECTED) 1] set /test "jieky"
[zk: localhost:2181(CONNECTED) 34]
WATCHER::WatchedEvent state:SyncConnected type:NodeDataChanged path:/test

早期版本的zk监听是使用的-w指令,但是这个指令是一次性的。也就是说该指令只能监听一次事件,后续如果要继续监听事件的话就需要再次使用该指令增加监听事务。两次监听之间必然会有时差,这就会导致会有某些事件未被捕捉到。

为了解决上述的问题,ZK在3.6.0版本以后添加了永久监听指令:addWatch,它能保证被触发之后仍然还有监听效果,可以继续监听Znode上的变更。ZooKeeper永久监听: addWatch [-m model] path
① PERSISTENT:持久化订阅,针对当前节点的修改和删除事件,以及当前节点子节点的删除和新增事件;
② PERSISTENT_RECURSIVE:持久化递归订阅,在PERSISTENT的基础上,增加了子节点修改事件的触发,以及子节点的子节点的数据变化都会触发相关事件(满足递归订阅特性)

永久监听指令示例:

1.4、Zookeeper应用场景

中间件(英语:Middleware),又译中间件、中介层,是一类提供系统软件和应用软件之间连接、便于软件各部件之间的沟通的软件,应用软件可以借助中间件在不同的技术架构之间共享信息与资源。中间件位于客户机服务器的操作系统之上,管理着计算资源和网络通信。

观察者模式是软件设计模式的一种。在此种模式中,一个目标对象管理所有相依于它的观察者对象,并且在它本身的状态改变时主动发出通知。这通常透过呼叫各观察者所提供的方法来实现。此种模式通常被用在即时事件处理系统。

数据发布/订阅

数据发布/订阅的一个常见的场景是配置中心,发布者把数据发布到 ZooKeeper 的一个或一系列的节点上,供订阅者进行数据订阅,达到动态获取数据的目的。ZooKeeper 采用的是推拉结合的方式:
① 推: 服务端会推给注册了监控节点的客户端 Wathcer 事件通知
② 拉: 客户端获得通知后,然后主动到服务端拉取最新的数据

实现的思路可以如下:
① 把配置信息写到一个 Znode 上,例如 /Configuration
② 客户端启动初始化阶段读取服务端节点的数据,并且注册一个数据变更的 Watcher
③ 配置变更只需要对 Znode 数据进行 set 操作,数据变更的通知会发送到客户端,客户端重新获取新数据,完成配置动态修改

分布式协调

这个其实是 zookeeper 很经典的一个用法,简单来说,就好比,你 A 系统发送个请求到 mq,然后 B 系统消息消费之后处理了。那 A 系统如何知道 B 系统的处理结果?用 zookeeper 就可以实现分布式系统之间的协调工作。A 系统发送请求之后可以在 zookeeper 上对某个节点的值注册个监听器,一旦 B 系统处理完了就修改 zookeeper 那个节点的值,A 系统立马就可以收到通知,完美解决。

使用 ZooKeeper的好处就是检测系统和被检系统不需要直接相关联,而是通过 ZooKeeper 节点来关联,大大减少系统的耦合。

分布式锁

排它锁:如果事务T1对数据对象O1加上了排他锁,在加锁期间,只允许事务T1对O1进行读取和更新操作,核心是保证当前有且仅有一个事务获得锁,并且锁释放后,所有正在等待获取锁的事务都能被通知到。实现思路:通过ZK的ZNode可以表示一个锁,如/x_lock/lock

获取锁,多个客户端通过调用create接口尝试在/x_lock创建临时节点/x_lock/lock,最终只有一个客户端创建成功,那么该客户端就获取到了锁,同时其他没有获取到锁的客户端将注册监听该节点的变更
释放锁,获取锁的客户端在完成任务或者宕机后,会把临时节点删除,此时其他客户端监听到变化就可以开始新一轮锁的获取

举个排它锁栗子。对某一个数据连续发出两个修改操作,两台机器同时收到了请求,但是只能一台机器先执行完另外一个机器再执行。那么此时就可以使用 zookeeper 分布式锁,一个机器接收到了请求之后先获取 zookeeper 上的一把分布式锁,就是可以去创建一个 znode,接着执行操作;然后另外一个机器也尝试去创建那个 znode,结果发现自己创建不了,因为被别人创建了,那只能等着,等第一个机器执行完了自己再执行。

共享锁:如果事务T1对数据对象O1加上了共享锁,则当前事务T1只能对O1进行读操作,其他事务也只能对该对象O1加共享锁,直到该数据对象上的所有共享锁都被释放。实现思路:通过ZK的ZNode可以表示一个锁,/s_lock/[hostname]-请求类型-序号。

/
├── /host1-R-000000001
├── /host2-R-000000002
├── /host3-W-000000003
├── /host4-R-000000004
├── /host5-R-000000005
├── /host6-R-000000006
└── /host7-W-000000007

① 获取锁,需要获取锁的客户端都在/s_lock创建临时顺序节点,读请求创建R临时节点,写请求创建W临时节点
② 判断读写顺序,共享锁规定,不同的事务可以同时对同一个数据进行读取操作,而更新操作必须在当前没有任何写操作的情况下进行,因此对于读请求,没有比自己序号小的节点或所有比自己序号小的节点都是读请求,对于写请求,则是要求没有比自己序号小的节点
③ 释放锁,获取锁的客户端完成任务或者宕机后,会把练市节点删除,此时其他客户端可以开始新一轮锁的获取

羊群效应:上面共享锁过程中在判断读写顺序的时候会出现一个问题,在整个共享锁的竞争过程中,大量的“Watcher”通知和“子节点列表获取”两个操作重复运行,客户端无端地接收到过多和自己并不相关的事件通知,集群规模比较大时,不仅会影响性能,如果同一时间有多个节点对应的客户端完成事务或者是事务中断引起节点消失,ZK服务器会在短时间内向其他客户端发送大量的事件通知 - 这就是“羊群效应”。

结合上述“羊群效应”,可以提出一个优化方法,对于读请求,只需要关心比自己序号小的最后一个写请求节点;对于写请求,只需要关系比自己序号小的最后一个节点

元数据/配置信息管理

Dubbo 基于 ZooKeeper 实现了服务注册中心。哪一个服务由哪一个机器来提供必需让调用者知道,简单来说就是 ip 地址和服务名称的对应关系。ZooKeeper 通过心跳机制可以检测挂掉的机器并将挂掉机器的 ip 和服务对应关系从列表中删除。主要利用了ZK的统一命名服务和统一配置管理。

负载均衡

负载均衡是一种手段,用来把对某种资源的访问分摊给不同的设备,从而减轻单点的压力。简单来说就是横向扩展,在不更改代码的情况通过添加机器来提高运算能力。通过添加新的机器向 ZooKeeper 注册服务,服务的提供者多了能服务的客户就多了。

实现的思路:
① 首先建立 Servers 节点,并建立监听器监视 Servers 子节点的状态(用于在服务器增添时及时同步当前集群中服务器列表)
② 在每个服务器启动时,在 Servers 节点下建立临时子节点 Worker Server,并在对应的字节点下存入服务器的相关信息,包括服务的地址,IP,端口等等
③ 可以自定义一个负载均衡算法,在每个请求过来时从 ZooKeeper 服务器中获取当前集群服务器列表,根据算法选出其中一个服务器来处理请求

Kafka 中大部分组件都应用了 ZooKeeper

① Broker 注册 `/broker/ids/[0…N] 记录了 Broker 服务器列表记录,这个临时节点的节点数据是 ip 端口之类的信息。
② Topic 注册 /broker/topcs 记录了 Topic 的分区信息和 Broker 的对应关系
③ 生产者负载均衡 生产者需要将消息发送到对应的 Broker 上,生产者通过 Broker 和 Topic 注册的信息,以及 Broker 和 Topic 的对应关系和变化注册事件 Watcher 监听,从而实现一种动态的负载均衡机制。
④ 消息消费进度 Offset 记录 消费者对指定消息分区进行消息消费的过程中,需要定时将分区消息的消费进度 Offset 记录到 ZooKeeper 上,以便消费者进行重启或者其他消费者重新阶段该消息分区的消息消费后,能够从之前的进度开始继续系消费

HA高可用性

Hadoop 利用 ZooKeeper 实现了高可用的功能,包括 HDFS 的 NameNode 和 YARN 的 ResourceManager。此外 YARN 的运行状态是利用 ZooKeeper 来存储的,主要利用了ZK的统一配置管理、分布式锁和集群管理。

ResourceManager HA
① 多个 RM 在 /yarn-leader-election/pseudo-yarn-rm-cluster 竞争创建锁节点
② 注册 Watcher,创建锁成功的 RM 为 Active 节点,创建锁不成功的 RM 监听此节点,并成为 Stanby 状态
③ 当 Active RM 挂了,通知 Standby RM,开始新一轮竞争

Fencing 一般解决脑裂这样的问题,YARN 引入了 Fencing 机制,利用的是 ZooKeeper 数据节点的 ACL 权限控制。如果 RM1 假死,此时 RM2 成为 Active 状态,当 RM1 恢复之后,会试图去更新 ZooKeeper 里的数据,但此时会发现写上了 ACL 权限的节点无法修改,这样就可以避免了脑裂。

Master 选举

分布式系统中 Master 是用来协调集群中其他系统单元,具有对分布式系统状态更改的决定权。比如一些读写分离的应用场景,客户端写请求往往是 Master 来处理的。

利用常见关系型数据库中的主键特性来实现也是可以的,集群中所有机器都向数据库中插入一条相同主键 ID 的记录,数据库会帮助我们自动进行主键冲突检查,可以保证只有一台机器能够成功。

但是有一个问题,如果插入成功的和护短机器成为 Master 后挂了的话,如何通知集群重新选举 Master?

利用 ZooKeeper 创建节点 API 接口,提供了强一致性,能够很好保证在分布式高并发情况下节点的创建一定是全局唯一性。

集群机器都尝试创建节点,创建成功的客户端机器就会成为 Master,失败的客户端机器就在该节点上注册一个 Watcher 用于监控当前 Master 机器是否存活,一旦发现 Master 挂了,其余客户端就可以进行选举了。

集群管理

集群管理主要指集群监控和集群控制两个方面。前者侧重于集群运行时的状态的收集,后者则是对集群进行操作与控制。开发和运维中,面对集群,经常有如下需求:
① 希望知道集群中究竟有多少机器在工作
② 对集群中的每台机器的运行时状态进行数据收集
③ 对集群中机器进行上下线的操作

分布式集群管理体系中有一种传统的基于 Agent 的方式,就是在集群每台机器部署 Agent 来收集机器的 CPU、内存等指标。但是如果需要深入到业务状态进行监控,比如一个分布式消息中间件中,希望监控每个消费者对消息的消费状态,或者一个分布式任务调度系统中,需要对每个机器删的任务执行情况进行监控。对于这些业务紧密耦合的监控需求,统一的 Agent 是不太合适的。

利用 ZooKeeper 实现集群管理监控组件的思路:
在管理机器上线/下线的场景中,为了实现自动化的线上运维,我们必须对机器的上/下线情况有一个全局的监控。通常在新增机器的时候,需要首先将指定的 Agent 部署到这些机器上去。Agent 部署启动之后,会首先向 ZooKeeper 的指定节点进行注册,具体的做法就是在机器列表节点下面创建一个临时子节点,例如 /machine/[Hostname](下文我们以“主机节点”代表这个节点),如下图所示。

当 Agent 在 ZooKeeper 上创建完这个临时子节点后,对 /machines 节点关注的监控中心就会接收到“子节点变更”事件,即上线通知,于是就可以对这个新加入的机器开启相应的后台管理逻辑。另一方面,监控中心同样可以获取到机器下线的通知,这样便实现了对机器上/下线的检测,同时能够很容易的获取到在线的机器列表,对于大规模的扩容和容量评估都有很大的帮助。

分布式队列

使用 ZooKeeper 实现 FIFO 队列,入队操作就是在 queue_fifo 下创建自增序的子节点,并把数据(队列大小)放入节点内。出队操作就是先找到 queue_fifo 下序号最下的那个节点,取出数据,然后删除此节点。

/queue_fifo
|
├── /host1-000000001
├── /host2-000000002
├── /host3-000000003
└── /host4-000000004

创建完节点后,根据以下步骤确定执行顺序:
① 通过 get_children() 接口获取 /queue_fifo 节点下所有子节点;
② 通过自己的节点序号在所有子节点中的顺序;
③ 如果不是最小的子节点,那么进入等待,同时向比自己序号小的最后一个子节点注册 Watcher 监听
接收到 Watcher 通知后重复 1;

Barrier就是栅栏或者屏障,适用于这样的业务场景:当有些操作需要并行执行,但后续操作又需要串行执行,此时必须等待所有并行执行的线程全部结束,才开始串行,于是就需要一个屏障,来控制所有线程同时开始,并等待所有线程全部结束。

利用 ZooKeeper 的实现,开始时 queue_barrier 节点是一个已经存在的默认节点,并且将其节点的数据内容赋值为一个数字 n 来代表 Barrier 值,比如 n=10 代表只有当 /queue_barrier 节点下的子节点个数达到10才会打开 Barrier。之后所有客户端都会在 queue_barrier 节点下创建一个临时节点,如 queue_barrier/host1。

如何控制所有线程同时开始? 所有的线程启动时在 ZooKeeper 节点 /queue_barrier 下插入顺序临时节点,然后检查 /queue/barrier 下所有 children 节点的数量是否为所有的线程数,如果不是,则等待,如果是,则开始执行。具体的步骤如下:
① getData() 获取 /queue_barrier 节点的数据内容
② getChildren() 获取 /queue_barrier 节点下的所有子节点,同时注册对子节点列表变更的 Watche 监听。
③ 统计子节点的个数
④ 如果子节点个数不足10,那么进入等待
⑤ 接收 Watcher 通知后,重复2

如何等待所有线程结束? 所有线程在执行完毕后,都检查 /queue/barrier 下所有 children 节点数量是否为0,若不为0,则继续等待。

用什么类型的节点? 根节点使用持久节点,子节点使用临时节点,根节点为什么要用持久节点?首先因为临时节点不能有子节点,所以根节点要用持久节点,并且在程序中要判断根节点是否存在。 子节点为什么要用临时节点?临时节点随着连接的断开而消失,在程序中,虽然会删除临时节点,但可能会出现程序在节点被删除之前就 crash了,如果是持久节点,节点不会被删除。

Summary

由于 ZooKeeper 可以很好保证分布式环境中数据的强一致性,也是基于这个特点,使得越来越多的分布式系统将 ZooKeeper 作为核心组件进行封装使用。在以上提到的这些分布式系统的常见的应用场景下,利用 ZooKeeper 可以快速的实现相关的组件,而无需重新造轮子。

很多大型开源的分布式系统中应用 ZooKeeper 来实现组件的时候,一般都会考虑使用 Apache Curator 是 Netflix 公司开源的一个 ZooKeeper 客户端,提供了更高层次和更易用的操作 ZooKeeper 的接口。

参考博客:
面试题:4个zookeeper的应用场景,你知道几个?
Zookeeper及其应用场景
ZooKeeper 的应用场景

1.5、zookeeper的zoo.cfg配置参数详解

[atguigu@hadoop102 conf]$ cat zoo.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/opt/module/apache-zookeeper-3.5.7-bin/zkData
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888[atguigu@hadoop102 apache-zookeeper-3.5.7-bin]$ cat ./zkData/myid
2

2、Zookeeper实战

2.1、客户端命令行

# 集群启动:hadoop102、hadoop103、hadoop104
[atguigu@hadoop102 zookeeper-3.5.7]$ zkServer.sh start
# 查看状态:hadoop102、hadoop103、hadoop104
[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkServer.sh status
# 启动客户端
[atguigu@hadoop103 zookeeper-3.5.7]$ bin/zkCli.sh
# 查看命令
[zk: localhost:2181(CONNECTED) 2] help
ZooKeeper -server host:port cmd argsaddauth scheme authclose config [-c] [-w] [-s]connect host:portcreate [-s] [-e] [-c] [-t ttl] path [data] [acl]delete [-v version] pathdeleteall pathdelquota [-n|-b] pathget [-s] [-w] pathgetAcl [-s] pathhistory listquota pathls [-s] [-w] [-R] pathls2 path [watch]printwatches on|offquit reconfig [-s] [-v version] [[-file path] | [-members serverID=host:port1:port2;port3[,...]*]] | [-add serverId=host:port1:port2;port3[,...]]* [-remove serverId[,...]*]redo cmdnoremovewatches path [-c|-d|-a] [-l]rmr pathset [-s] [-v version] path datasetAcl [-s] [-v version] [-R] path aclsetquota -n|-b val pathstat [-w] pathsync path
Command not found: Command not found help

2.2、RPC(Remote Procedure Call)

RPC(Remote Procedure Call),通俗地说,就是在一台计算机上调用另一台计算机提供的服务。这里的服务对应RPC中的P(Procedure),表现形式通常是API接口,或者说好比一个本地代码工程中的一个函数。那为什么要用RPC呢?最主要的原因有两点:1、符合低耦合、职责分离、可复用的开发原则,将不同的服务(模块、功能……什么名字都好,理解其本质即可)放在不同的代码工程,甚至不同的计算机(服务器)上,避免所有代码杂糅在一个工程中,难以开发与维护;2、缓解负载压力,不同计算机(服务器)提供不同的服务,各司其职,不用做所有事,降低资源耗尽的风险。

RPC其实没有那么高深,如果不考虑底层实现原理,则对于程序员来说几乎完全透明,就是一套固定步骤的开发流程,和调用本地工程代码中的函数没有差别。以笔者目前所做的项目为例,步骤大体为:准备对应的stub(笔者将其理解为对方所能提供函数,在我方的一个说明)、准备远程调用的通道、控制器对象;开启子线程,利用回调函数Callback Function,等待处理响应,并用线程倒数控制器CountDownLatch让主线程阻塞;开启远程调用。

回调函数:你到一个商店买东西,刚好你要的东西没有货,于是你在店员那里留下了你的电话,过了几天店里有货了,店员就打了你的电话,然后你接到电话后就到店里去取了货。在这个例子里,你的电话号码就叫回调函数,你把电话留给店员就叫登记回调函数,店里后来有货了叫做触发了回调关联的事件,店员给你打电话叫做调用回调函数,你到店里去取货叫做响应回调事件。回答完毕。

2.3、API应用

添加pom文件

<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.7</version></dependency>
</dependencies>

log4j.properties

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

JAVA实例程序

import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.io.IOException;
import java.util.List;/*** ZK的Java客户端* 1. 获取和ZK的连接对象* 2. 操作各种API* 3. 关闭资源*/
public class TestZkClient {private ZooKeeper zk;@Beforepublic void initZkClient() throws IOException {// 连接ZK集群的地址String connectionStr = "hadoop102:2181,hadoop103:2181,hadoop104:2181";// 超时时间int sessionTimeOut = 10000;zk = new ZooKeeper(connectionStr, sessionTimeOut, new Watcher() {public void process(WatchedEvent event) {//                System.out.println("监听到有变化");}});}@Afterpublic void closeZk() throws InterruptedException {zk.close();}@Testpublic void ls() throws IOException, KeeperException, InterruptedException {//用客户端对象做各种操作List<String> children = zk.getChildren("/", false);System.out.println(children);}

JAVA运行输出

2021-12-30 22:45:25,769 INFO [org.apache.zookeeper.ClientCnxn] - Opening socket connection to server hadoop103/192.168.2.35:2181. Will not attempt to authenticate using SASL (unknown error)
2021-12-30 22:45:25,772 INFO [org.apache.zookeeper.ClientCnxn] - Socket connection established, initiating session, client: /192.168.2.9:56683, server: hadoop103/192.168.2.35:2181
2021-12-30 22:45:25,810 INFO [org.apache.zookeeper.ClientCnxn] - Session establishment complete on server hadoop103/192.168.2.35:2181, sessionid = 0x300071438870000, negotiated timeout = 10000
[zookeeper, sanguo]
2021-12-30 22:45:25,948 INFO [org.apache.zookeeper.ZooKeeper] - Session: 0x300071438870000 closed
2021-12-30 22:45:25,948 INFO [org.apache.zookeeper.ClientCnxn] - EventThread shut down for session: 0x300071438870000

zookeeper监控的原理和使用

@Test
public void testLsNode() throws KeeperException, InterruptedException {// 通过构造器注册默认监听事件(在连接成功后会触发)List<String> children = zk.getChildren("/sanguo", new Watcher() {public void process(WatchedEvent event) {System.out.println("监听到/sanguo节点有变化!!!");}});for (String child : children) {System.out.println(child);}for(int i=0;i<10;i++){// 通过exist()/getData()/getChildren()方法绑定事件,// 这三个方法可以绑定自定义watcher或者传入true绑定默认的监听器List<String> children1 = zk.getChildren("/sanguo", new Watcher() {public void process(WatchedEvent event) {System.out.println("监听到/sanguo节点有变化!!!");}});Thread.sleep(10000);for (String child : children1) {System.out.println("jieky:"+ i + child);}}
}

ZooKeeper异步调用命令

@Test
public void asynchronization(){createNode("/jieky","hhhhhhh".getBytes());
}void createNode(String nodePath, byte[] data) {AsyncCallback.StringCallback nodeCreateCallback = new AsyncCallback.StringCallback() {public void processResult(int rc, String path, Object ctx, String name) {switch (KeeperException.Code.get(rc)) {case OK:// 创建节点成功System.out.println("创建节点成功");break;case CONNECTIONLOSS:// 连接丢失,重新发布命令createNode(path, (byte[]) ctx);System.out.println("连接丢失,重新发布命令");return;default:// 其他异常,抛出或记录异常KeeperException e = KeeperException.create(KeeperException.Code.get(rc), path);System.out.println("create node error" + e);}}};zk.create(nodePath, data,ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL,nodeCreateCallback,data);
}

2.4、Zookeeper内部原理

2.4.1、监听器原理

2.4.2、Zookeeper的选举机制

① Zookeeper 选举会发生在服务器初始状态和运行状态下。
② 初始状态下会根据服务器sid的编号对比,编号越大权值越大,投票过半数即可选出Leader。
③ Leader 故障会触发新一轮选举,zxid 代表数据越新,权值也就越大。
④ 在运行期选举还可能会遇到脑裂的情况,ZooKeeper默认采用Quorums方式来避免出现“脑裂”。

ZooKeeper集群脑裂问题处理,值得收藏!

2.4.3、Zookeeper的写和读数据流程

写数据流程

读数据流程

相比写数据流程,读数据流程就简单得多;因为每台server中数据一致性都一样,所以随便访问哪台server读数据就行;没有写数据流程中请求转发、数据同步、成功通知这些步骤。

010 Zookeeper相关推荐

  1. Hadoop生态圈-Zookeeper的工作原理分析

    Hadoop生态圈-Zookeeper的工作原理分析 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任.   无论是是Kafka集群,还是producer和consumer都依赖于Zoo ...

  2. k8s部署zookeeper,kafka集群(李作强)

    采用网上镜像:mirrorgooglecontainers/kubernetes-zookeeper:1.0-3.4.10 准备共享存储:nfs,glusterfs,seaweed或其他,并在node ...

  3. Zookeeper开发者手册

    Zookeeperapi官网:http://zookeeper.apache.org/doc/r<version:版本号>/index.html Zookeeper是一种作用于分布式应用高 ...

  4. 常用的高性能 KV 存储 Redis、Memcached、etcd、Zookeeper 区别

    1. 什么是 KV 存储 KV 是 Key-Value 的缩写,KV 存储也叫键值对存储.简单来说,它是利用 Key 做索引来实现数据的存储.修改.查询和删除功能. 常用的高性能 KV 存储主要有 R ...

  5. kafka+zookeeper搭建步骤kafka问题

    kafka+zookeeper搭建步骤 帅气的名称被占用关注 0.1392018.12.04 13:48:00字数 1,007阅读 88 vmware 安装centOS7 克隆虚拟为:三台 本地你的I ...

  6. ZooKeeper简单使用

    ZooKeeper简单使用 ZooKeeper简单使用 1.ZooKeeper简介 2.ZooKeeper能做什么 3.ZooKeeper核心 3.1.ZooKeeper安装 3.2.ZooKeepe ...

  7. 2021年大数据ZooKeeper(六):ZooKeeper选举机制

    目录 ​​​​​​ZooKeeper选举机制 概念 全新集群选举 非全新集群选举 ZooKeeper选举机制 zookeeper默认的算法是FastLeaderElection,采用投票数大于半数则胜 ...

  8. 2021年大数据ZooKeeper(五):ZooKeeper Java API操作

    目录 ZooKeeper Java API操作 引入maven坐标 节点的操作 ZooKeeper Java API操作 这里操作Zookeeper的JavaAPI使用的是一套zookeeper客户端 ...

  9. 2021年大数据ZooKeeper(四):ZooKeeper的shell操作

    目录 ZooKeeper的shell操作 客户端连接 shell基本操作 操作命令 操作实例 节点属性 ​​​​​​​ZooKeeper Watcher(监听机制) ​​​​​​​Watch机制特点 ...

最新文章

  1. 互联网还留给我们这些出路
  2. Golang适合高并发场景的原因分析
  3. 我和“限速”之间的纠缠(一)
  4. 实验新手必须知道的细胞计数技巧
  5. MySQL 错误 #1113
  6. 1687: 数组操作(非常规思维)
  7. FTP两种工作模式:主动模式(Active FTP)和被动模式(Passive FTP)
  8. ajaxfileupload带参数上传文件
  9. JS替换地址栏参数值
  10. springmvc 传对象报400_springmvc 通过对象来接收参数,为什么默认会返回该对象?
  11. Android点击效果
  12. 同样是做冻品生意,哪类人更挣Q?
  13. 《广义动量定理与系统思考----战争…
  14. HP5100常见错误代码
  15. 2019数学建模F:数字货币存在是否合理?提供一些思路供思考
  16. 三国演义主要人物个人经历
  17. 湖北大学计算机学院王时绘,5G来了,有湖大人的智慧!
  18. SpringBoot电商项目之购物车下单(沙箱支付)
  19. 特斯拉因辅助驾驶发生致命车祸;APUS发布AiLMe大模型;欧洲成立人工智能研究中心来监督大型平台丨每日大事件...
  20. 记Elsevier上Latex投稿

热门文章

  1. 网上书城(搜索页,购物车)
  2. 【深度之眼cs231n第七期】笔记(二十七)
  3. 送走跌宕起伏的2022,迎接拨云睹日的2023
  4. K8s NetworkPolicy与网络插件flannel、calico详细版
  5. Day001--Scala中的下载安装配置及下载安装集成开发环境IDEA
  6. Vue 添加组件和跳转
  7. 武汉加油!中国加油!小峯加油!大家加油!
  8. 笔杆网试用---感官体验篇一
  9. 1、查询姓名中包含‘u’字母的员工记录2、同名去重3、字段计算
  10. 阿里云服务器最低多少钱一个月,租阿里云服务器一年多少钱