文章目录

  • 1. Zookeeper 入门
    • 1.1 概述
    • 1.2 特点
    • 1.3 数据结构
    • 1.4 应用场景
  • 2. Zookeeper 安装
    • 2.1 下载地址
    • 2.2 本地模式安装部署
    • 2.3 分布式安装部署
    • 2.4 配置参数解读
  • 3. Zookeeper 内部原理
    • 3.1 选举机制
    • 3.2 节点类型
    • 3.3 Stat 结构体
    • 3.4 监听器原理
    • 3.5 写数据流程
  • 4. Zookeeper 实战
    • 4.1 客户端命令行操作
    • 4.2 API 操作
      • 4.3.1 IDEA 环境搭建
      • 4.3.2 创建 ZooKeeper 客户端
      • 4.3.4 获取子节点并监听节点变化
      • 4.3.5 判断 Znode 是否存在
    • 4.3 监听服务器节点动态上下线案例
  • 5 zookeeper框架
    • 5.1 org.apache.zookeeper
    • 5.2 zkclient
      • 5.2.1 简介
      • 5.2.2 Maven依赖
      • 5.2.3 ZkClient 的设计
      • 5.2.4 重要处理流程说明
      • 5.2.5 客户端处理变更(Watcher通知)
      • 5.2.6 序列化处理
      • 5.2.7 ZkClient如何解决使用ZooKeeper客户端遇到的问题的呢?
      • 5.2.8 API介绍
      • 5.2.9 demo
    • 5.3 Curator
      • 5.3.1 简介
      • 5.3.2 版本问题
      • 5.3.3 CuratorFramework
      • 5.3.4 curator-recipes
      • 5.3.5 知识点
      • 5.3.6 Maven依赖
      • 5.3.7 api
      • 5.3.8 使用Curator高级API特性之Cache缓存监控节点变化
  • 5.4 使用Curator创建/验证ACL(访问权限列表)
    • 5.4.1 连通Zk时,就指定登录权限
    • 5.4.2写一个把明文的账号密码转换为加密后的密文的工具类
    • 5.4.3使用自定义工具类AclUtils,一次性给多个用户赋Acl权限
    • 5.4.4级联创建节点,并赋予节点操作权限
    • 5.4.5读取节点数据
    • 5.4.6修改具有ACL权限节点的data数据
    • 5.4.7两种方法判断node节点是否存(优先使用第一种)
  • 7 分布式锁
    • 7.1.重入式排它锁InterProcessMutex
    • 7.2.不可重入排它锁InterProcessSemaphoreMutex
    • 7.3.可重入读写锁InterProcessReadWriteLock 、InterProcessLock
    • 7.4.多锁对象容器(多共享锁) ,将多个锁作为单个实体管理,InterProcessMultiLock、InterProcessLock
    • 7.5.代码
  • 8.分布式计数器

1. Zookeeper 入门

1.1 概述

Zookeeper 是一个开源的分布式的,为分布式应用提供协调服务的 Apache 项目。
  Zookeeper 从设计模式角度来理解:是一个基于观案者模式设计的分布式服务管理框架,它负责存储和管理大家都关心的数据,然后接受观察者的注册,一旦这些数据的状态发生变化,Zookeeper 就将负责通知已经在 Zookeeper 上注册的那些观察者做出相应的反应。
  

1.2 特点

1)Zookeeper:一个领导者(Leader) ,多个跟随者(Follower)组成的集群。
2)集群中只要有半数以上节点存活,Zookeeper 集群就能正常服务。
3)全局数据一致:每个 Server 保存一份相同的数据副本,Client 无论连接到哪个 Server,数据都是一致的。
4)更新请求顺序进行,来自同一个 Client 的更新请求按其发送顺序依次执行。
5)数据更新原子性,一次数据更新要么成功,要么失败。
6)实时性,在一定时间范围内,Client 能读到最新数据。

1.3 数据结构

ZooKeeper 数据模型的结构与 Unix 文件系统很类似,整体上可以看作是一棵树,每个节点称做一个 ZNode。每一个 ZNode 默认能够存储 1 MB 的数据,每个 ZNode 都可以通过其路径唯一标识。

1.4 应用场景

提供的服务包括:统一命名服务、统一配置管理、统一集群管理、服务器节点动态上下线、软负载均衡等。
统一命名服务

在分布式环境下,经常需要对应用/服务进行统一命名 ,便于识别。例如:IP 不容易记住,而域名容易记住。
统一配置管理
(1)分布式环境下,配置文件同步非常常见。
  ① 一般要求一个集群中,所有节点的配置信息是一致的,比如 Kafka 集群。
  ② 对配置文件修改后,希望能够快速同步到各个节点上。
(2)配置管理可交由 ZooKeeper 实现。
  ① 可将配置信息写入 ZooKeeper 上的一个 Znode 。
  ② 各个客户端服务器监听这个 Znode。
  ③ 一旦 Znode 中的数据被修改,ZooKeeper 将通知各个客户端服务器。

统一集群管理
(1)分布式环境中,实时掌握每个节点的状态是必要的。
  可根据节点实时状态做出一些调整。
(2)ZooKeeper 可以实现实时监控节点状态变化
  ① 可将节点信息写入Z ooKeeper 上的一个 ZNode。
  ② 监听这个 ZNode 可获取它的实时状态变化。
服务器动态上下线

软负载均衡

在 Zookeeper 中记录每台服务器的访问数,让访问数最少的服务器去处理最新的客户端请求。

2. Zookeeper 安装

2.1 下载地址

zookeeper 官网

2.2 本地模式安装部署

准备工作

tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local/
mv apache-zookeeper-3.5.6-bin/ zookeeper
mv zoo_sample.cfg zoo.cfg
mkdir -p /usr/local/zookeeper/datavim zoo.cfg
dataDir=/usr/local/zookeeper/data

vim /etc/profile
在配置文件中添加以下内容
#ZOOKEEPER
export ZOOKEEPER_HOME=/hadoop/zookeeper-3.5.6
export PATH=PATH:PATH:PATH:ZOOKEEPER_HOME/bin
source /etc/profile

启动 Zookeeper
zkServer.sh start

启动客户端
zkCli.sh
退出客户端
quit
停止 Zookeeper
zkServer.sh stop

2.3 分布式安装部署

集群规划
在 master、slave1 和 slave2 三个节点上部署 Zookeeper。
tar -zxvf apache-zookeeper-3.5.6-bin.tar.gz -C /usr/local
mv apache-zookeeper-3.5.6-bin/ zookeeper
同步 /usr/local/zookeeper 目录内容到 slave1、slave2

xsync zookeeper/
配置服务器编号
① 在 /usr/local/zookeeper/ 这个目录下创建 zkData
mkdir data
② /usr/local/zookeeper/data 目录下创建一个 myid 的文件
touch myid
③ 编辑 myid 文件
vim myid
在文件中添加与 server 对应的编号:
0
④ 分发到其他机器上
xsync myid
并分别在 slave1、slave2 上修改 myid 文件中内容为 1、2

配置 zoo.cfg 文件
① 将 /usr/local/zookeeper/conf 这个路径下的 zoo_sample.cfg 修改为 zoo.cfg
mv zoo_sample.cfg zoo.cfg
② 打开 zoo.cfg 文件,修改 dataDir 路径
dataDir=/usr/local/zookeeper/data

增加如下配置
server.0=master:2888:3888
server.1=slave1:2888:3888
server.2=slave2:2888:3888
同步 zoo.cfg 配置文件
xsync zoo.cfg

修改环境变量
① 打开配置文件
vim /etc/profile
② 在配置文件中添加以下内容
#ZOOKEEPER
export ZOOKEEPER_HOME=/usr/local/zookeeper
export PATH=PATH:PATH:PATH:ZOOKEEPER_HOME/bin
③ 同步配置文件
xsync /etc/profile
④ 使配置文件生效(三台机器)
source /etc/profile

集群操作
① 三台机器分别启动 Zookeeper
zkServer.sh start
② 三台机器分别关闭 Zookeeper
zkServer.sh stop
编写 Zookeeper 的群起群关脚本

① 在 /usr/local/bin 目录下创建 zk 文件
vim zk.sh

#!/bin/bash
case $1 in
"start"){for i in master slave1 slave2doecho "****************** $i *********************"ssh $i "source /etc/profile && zkServer.sh start"done
};;"stop"){for i in master slave1 slave2doecho "****************** $i *********************"ssh $i "source /etc/profile && zkServer.sh stop"done
};;esac

修改脚本 zk 具有执行权限
chmod 777 zk.sh
调用脚本形式:zk start 或 zk stop

2.4 配置参数解读

Zookeeper 中的配置文件 zoo.cfg 中参数含义解读如下:
tickTime =2000:通信心跳数,Zookeeper 服务器与客户端心跳时间,单位毫秒
Zookeeper 使用的基本时间,服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是每个tickTime 时间就会发送一个心跳,时间单位为毫秒。它用于心跳机制,并且设置最小的 session 超时时间为两倍心跳时间。(session 的最小超时时间是 2*tickTime)

initLimit =10:LF 初始通信时限
集群中的 Follower 跟随者服务器与 Leader 领导者服务器之间初始连接时能容忍的最多心跳数(tickTime的数量),用它来限定集群中的 Zookeeper 服务器连接到 Leader 的时限。

syncLimit =5:LF 同步通信时限
集群中 Leader 与 Follower 之间的最大响应时间单位,假如响应超过 syncLimit * tickTime,Leader 认为 Follwer 死掉,从服务器列表中删除 Follwer。

dataDir:数据文件目录+数据持久化路径
主要用于保存 Zookeeper 中的数据。

clientPort =2181:客户端连接端口
监听客户端连接的端口。

server.A=B:C:D
A 是一个数字,表示这个是第几号服务器;集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个server。
B 是这个服务器的 ip 地址;
C 是这个服务器与集群中的 Leader 服务器交换信息的端口;
D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。

3. Zookeeper 内部原理

3.1 选举机制

半数机制

集群中半数以上机器存活,集群可用。所以 Zookeeper 适合安装奇数台服务器。

Zookeeper 虽然在配置文件中并没有指定 Master 和 Slave。但是,Zookeeper 工作时,是有一个节点为 Leader,其他则为 Follower,Leader 是通过内部的选举机制临时产生的。

选举过程例子

假设有五台服务器组成的 Zookeeper 集群,它们的 id 从1-5,同时它们都是最新启动的,也就是没有历史数据,在存放数据量这一点上,都是一样的。假设这些服务器依序启动。

① 服务器 1 启动,此时只有它一台服务器启动了,它发出去的报文没有任何响应,所以它的选举状态一直是 LOOKING 状态。

② 服务器 2 启动,它与最开始启动的服务器 1 进行通信,互相交换自己的选举结果,由于两者都没有历史数据,所以 id 值较大的服务器 2 胜出,但是由于没有达到超过半数以上的服务器都同意选举它(这个例子中的半数以上是 3),所以服务器 1、2 还是继续保持 LOOKING 状态。

③ 服务器 3 启动,根据前面的理论分析,服务器 3 成为服务器 1、2、3 中的老大,而与上面不同的是,此时有三台服务器选举了它,所以它成为了这次选举的 Leader。

④ 服务器 4 启动,根据前面的分析,理论上服务器4应该是服务器 1、2、3、4 中最大的,但是由于前面已经有半数以上的服务器选举了服务器 3,所以它只能接收当小弟的命了。

⑤ 服务器 5 启动,同 4 一样当小弟。

3.2 节点类型

持久(Persistent)

客户端和服务器端断开连接后,创建的节点不删除

短暂(Ephemeral)

客户端和服务器端断开连接后,创建的节点自己删除

节点类型

① 持久化目录节点

客户端与 Zookeeper 断开连接后,该节点依旧存在。

② 持久化顺序编号目录节点

客户端与 Zookeeper 断开连接后,该节点依旧存在,只是 Zookeeper 给该节点名称进行顺序编号

③ 临时目录节点

客户端与 Zookeeper 断开连接后,该节点被删除

④ 临时顺序编号目录节点

客户端与 Zookeeper 断开连接后,该节点被删除,只是 Zookeeper 给该节点名称进行顺序编号。

说明: 创建 znode 时设置顺序标识,znode 名称后会附加一个值,顺序号是一个单调递增的计数器,由父节点维护。
注意: 在分布式系统中,顺序号可以被用于为所有的事件进行全局排序,这样客户端可以通过顺序号推断事件的顺序。

3.3 Stat 结构体

czxid: 创建节点的事务 zxid

每次修改 ZooKeeper 状态都会收到一个 zxid 形式的时间戳,也就是 ZooKeepe r事务 ID。
事务 ID 是 ZooKeeper 中所有修改总的次序。每个修改都有唯一的 zxid,若 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生。

ctime: znode 被创建的毫秒数(从 1970 年开始)

mzxid: znode 最后更新的事务 zxid

mtime: znode 最后修改的毫秒数(从 1970 年开始)

pZxid: znode 最后更新的子节点 zxid

cversion : znode 子节点变化号,znode 子节点修改次数

dataversion: znode 数据变化号

aclVersion: znode 访问控制列表的变化号

ephemeralOwner: 如果是临时节点,这个是 znode 拥有者的 session id。如果不是临时节点则是 0。

dataLength: znode 的数据长度

numChildren: znode 子节点数量

3.4 监听器原理

监听原理详解:

① 首先要有一个 main() 线程
② 在 main 线程中创建 Zokeeper 客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener) 。
③ 通过 connect 线程将注册的监听事件发送给 Zookeeper。
④ 在 Zookeeper 的注册监听器列表中将注册的监听事件添加到列表中。
⑤ Zookeeper 监听到有数据或路径变化,就会将这个消息发送给 listener 线程。
⑥ listener 线程内部调用了 process() 方法。

常见的监听
① 监听节点数据的变化
get -w path
② 监听子节点增减的变化
ls -w path

3.5 写数据流程

4. Zookeeper 实战

4.1 客户端命令行操作

启动客户端
zkCli.sh
显示所有操作命令
help
查看当前 znode 中所包含的内容
ls /
ls2 /
查看当前节点详细数据
ls -s /

分别创建 2 个普通节点
create /animals “dog”
create /animals/small “ant”
获得节点的值
get /animals
get /animals/small

创建短暂节点
create -e /animals/big “elephant”
创建带序号的节点
create -s /animals/middle “hourse”

修改节点数据值
set /animals/small “bug”
节点的值变化监听
① 在 slave1 主机上注册监听 /animals 节点数据变化
get -w /animals
② 在 slave2 主机上修改 /animals 节点的数据
set /animals “cat”
③ 观察 slave1 主机收到子节点变化的监听

节点的子节点变化监听(路径变化)
① 在 slave1 主机上注册监听 /animals 节点的子节点变化
ls -w /animals
② 在 slave2 主机 /animals 节点上创建子节点
create /animals/mini “fly”
③ 观察 slave1 主机收到子节点变化的监听

删除节点
delete /animals/big
递归删除节点
deleteall /animals/mini
查看节点状态
stat /animals

4.2 API 操作

4.3.1 IDEA 环境搭建

创建一个 Maven 工程
在 pom 文件中添加依赖

    <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.5.9</version></dependency><!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency></dependencies>

在项目的 src/main/resources 目录下,新建一个文件,命名为 “log4j.properties”,在文件中填入:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

4.3.2 创建 ZooKeeper 客户端

@SpringBootTest
public class ZookeeperTest {

private static String connectString = "localhost:2181";
private static int sessionTimeout = 2000;
private static ZooKeeper zkClient;@Test
public static void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent watchedEvent) {}});
}

}

4.3.3 创建子节点
先将上面的 init() 方法前面的注解 @Test 改为 @BeforeAll

// 创建子节点

@SpringBootTest
public class ZookeeperTest {private static String connectString = "localhost:2181";private static int sessionTimeout = 2000;private static ZooKeeper zkClient;@BeforeAllpublic static void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent watchedEvent) {}});}@Testpublic void createNode() throws Exception {
// 参数1:要创建的节点的路径; 参数2:节点数据 ; 参数3:节点权限 ;参数4:节点的类型String path = zkClient.create("/demo1", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println(path);}
}

4.3.4 获取子节点并监听节点变化

// 获取子节点并监听节点变化
@SpringBootTest
public class WatchTest {private static String connectString = "localhost:2181";private static int sessionTimeout = 2000;private static ZooKeeper zkClient;@Testpublic void getChildrenAndWatch() throws Exception {List<String> children = zkClient.getChildren("/", true);for (String child : children) {System.out.println(child);}// 延时阻塞Thread.sleep(Long.MAX_VALUE);}@BeforeAllpublic static void init() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent watchedEvent) {List<String> children = null;try {children = zkClient.getChildren("/", true);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}for (String child : children) {System.out.println(child);}}});}
}

4.3.5 判断 Znode 是否存在

// 判断znode是否存在
@Test
public void exist() throws Exception {
Stat stat = zkClient.exists(“/animals”, false);
System.out.println(stat == null ? “not exist” : “exist”);
}

4.3 监听服务器节点动态上下线案例

需求

某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。

需求分析

代码实现

① 先在集群上创建 /servers 节点
create /servers “servers”
② 服务器端向 Zookeeper 注册代码

package zookeeper;

import org.apache.zookeeper.*;

import java.io.IOException;

public class DistributeServer {

private String connectString = "master:2181,slave1:2181,slave2:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;public static void main(String[] args) throws Exception {args = new String[]{"slave1"};DistributeServer server = new DistributeServer();// 1.连接zookeeper集群server.getConnect();// 2.注册节点server.register(args[0]);// 3.业务逻辑处理server.business();
}private void getConnect() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {}});
}private void register(String hostname) throws KeeperException, InterruptedException {String path = zkClient.create("/servers/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(hostname + " is online");
}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);
}

}

③ 客户端代码

package zookeeper;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class DistributeClient {

private String connectString = "master:2181,slave1:2181,slave2:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;public static void main(String[] args) throws Exception {DistributeClient client = new DistributeClient();// 1.连接zookeeper集群client.getConnect();// 2.注册监听client.getChildren();// 3.业务逻辑处理client.business();
}private void getConnect() throws IOException {zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {public void process(WatchedEvent event) {try {getChildren();} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}}});
}private void getChildren() throws KeeperException, InterruptedException {List<String> children = zkClient.getChildren("/servers", true);// 存储服务器节点主机名称集合ArrayList<String> hosts = new ArrayList<String>();for (String child : children) {byte[] data = zkClient.getData("/servers/" + child, false, null);hosts.add(new String(data));}System.out.println(hosts);
}private void business() throws InterruptedException {Thread.sleep(Long.MAX_VALUE);
}

}

5 zookeeper框架

5.1 org.apache.zookeeper

5.2 zkclient

5.2.1 简介

ZkClient 是由 Datameer 的工程师开发的开源客户端,对 Zookeeper 的原生 API 进行了包装,实现了超时重连、Watcher 反复注册等功能。
在使用 ZooKeeper 的 Java 客户端时,经常需要处理几个问题:重复注册 watcher、session失效重连、异常处理。
IZKConnection:是一个ZkClient与Zookeeper之间的一个适配器;在代码里直接使用的是ZKClient,实质上还是委托了zookeeper来处理了。

在ZKClient中,根据事件类型,分为

节点事件(数据事件),对应的事件处理器是IZKDataListener;
子节点事件,对应的事件处理器是IZKChildListener;
Session事件,对应的事件处理器是IZKStatusListener;
ZkEventThread:是专门用来处理事件的线程

目前已经运用到了很多项目中,知名的有 Dubbo、Kafka、Helix。

5.2.2 Maven依赖

<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.11</version>
</dependency>

5.2.3 ZkClient 的设计

从上述结构上看,IZKConnection 是一个 ZkClient 与 ZooKeeper 之间的一个适配器。在代码里直接使用的是 ZKClient,其实质还是委托了 zookeeper 来处理了。

使用 ZooKeeper 客户端来注册 watcher 有几种方法: 1、创建 ZooKeeper 对象时指定默认的 Watcher,2、getData(),3、exists(),4、 getchildren。其中 getdata,exists 注册的是某个节点的事件处理器(watcher),getchildren 注册的是子节点的事件处理器(watcher)。而在 ZKClient 中,根据事件类型,分为了节点事件(数据事件)、子节点事件。对应的事件处理器则是 IZKDataListener 和 IZKChildListener。另外加入了 Session 相关的事件和事件处理器。

ZkEventThread 是专门用来处理事件的线程。

5.2.4 重要处理流程说明

启动 ZKClient

在创建 ZKClient 对象时,就完成了到 ZooKeeper 服务器连接的建立。具体过程是这样的:

启动时,指定好 connection string,连接超时时间,序列化工具等。
创建并启动 eventThread,用于接收事件,并调度事件监听器 Listener 的执行。
连接到 zookeeper 服务器,同时将 ZKClient 自身作为默认的 Watcher。
为节点注册Watcher:

ZooKeeper 的三个方法:getData、getChildren、exists.

ZKClient 都提供了相应的代理方法。就拿 exists 来看:

可以看到,是否注册 watcher,由 hasListeners(path)来决定的。

hasListeners 就是看有没有与该数据节点绑定的 listener。

所以,默认情况下,都会自动的为指定的 path 注册 watcher,并且是默认的 watcher (ZKClient)。怎么才能让 hasListeners 判定值为 true 呢,也就是怎么才能为 path 绑定 Listener 呢?
ZKClient提供了订阅功能:

一个新建的会话,只需要在取得响应的数据节点后,调用 subscribteXxx 就可以订阅上相应的事件了。

5.2.5 客户端处理变更(Watcher通知)

前面已经知道,ZKClient 是默认的 Watcher,并且在为各个数据节点注册的 Watcher 都是这个默认的 Watcher。那么该是如何将各种事件通知给相应的 Listener 呢?

处理过程大致可以概括为下面的步骤:

判断变更类型:变更类型分为 State 变更、ChildNode 变更(创建子节点、删除子节点、修改子节点数据)、NodeData 变更(创建指定 node,删除节点,节点数据变更)。
取出与 path 关联的 Listeners,并为每一个 Listener 创建一个 ZKEvent,将 ZkEvent 交给 ZkEventThread 处理。
ZkEventThread 线程,拿到 ZkEvent 后,只需要调用 ZkEvent 的 run 方法进行处理。 从这里也可以知道,具体的怎么如何调用 Listener,还要依赖于 ZkEvent 的 run()实现了。
注册监听 watcher:

接口类 注册监听方法 解除监听方法
IZkChildListener(子节点) ZkClient的subscribeChildChanges方法 ZkClient 的unsubscribeChildChanges 方法
IZkDataListener(数据) ZkClient 的subscribeDataChanges 方法 ZkClient 的 unsubscribeDataChanges 方法
IZkStateListener(客户端状 态) ZkClient 的 subscribeStateChanges 方 法 ZkClient 的 unsubscribeStateChanges 方法

在 ZkClient 中客户端可以通过注册相关的事件监听来实现对 Zookeeper 服务端时间的订阅。

其中 ZkClient 提供的监听事件接口有以下几种:

其中 ZkClient 还提供了一个 unsubscribeAll 方法,来解除所有监听。

Zookeeper 中提供的变更操作有:节点的创建、删除,节点数据的修改:

创建操作,数据节点分为四种,ZKClient 分别为他们提供了相应的代理:

删除节点的操作:

修改节点数据的操作:

writeDataReturnStat():写数据并返回数据的状态。
updateDataSerialized():修改已序列化的数据。执行过程是:先读取数据,然后使用DataUpdater 对数据修改,最后调用 writeData 将修改后的数据发送给服务端。

5.2.6 序列化处理

ZooKeeper 中,会涉及到序列化、反序列化的操作有两种:getData、setData。在 ZKClient 中,分别用 readData、writeData 来替代了。

对于 readData:先调用 zookeeper 的 getData,然后进行使用 ZKSerializer 进行反序列化工 作。

对于 writeData:先使用 ZKSerializer 将对象序列化后,再调用 zookeeper 的 setData。

5.2.7 ZkClient如何解决使用ZooKeeper客户端遇到的问题的呢?

Watcher 自动重注册:这个要是依赖于 hasListeners()的判断,来决定是否再次注册。如果对此有不清晰的,可以看上面的流程处理的说明。
Session 失效重连:如果发现会话过期,就先关闭已有连接,再重新建立连接。
异常处理:对比 ZooKeeper 和 ZKClient,就可以发现 ZooKeeper 的所有操作都是抛异常 的,而 ZKClient 的所有操作,都不会抛异常的。在发生异常时,它或做日志,或返回空, 或做相应的 Listener 调用。
相比于 ZooKeeper 官方客户端,使用 ZKClient 时,只需要关注实际的 Listener 实现即可。所 以这个客户端,还是推荐大家使用的。
https://www.cnblogs.com/jinchengll/p/12333213.html

5.2.8 API介绍

启动ZKClient:在创建ZKClient对象时,就完成了到ZooKeeper服务器连接的建立
1、启动时,制定好connection string,连接超时时间,序列化工具等
2、创建并启动eventThread,用于接收事件,并调度事件监听器Listener的执行
3、连接到Zookeeper服务器,同时将ZKClient自身作为默认的Watcher

为节点注册Watcher
Zookeeper 原始API的三个方法:getData,getChildren、exists,ZKClient都提供了相应的代理方法,比如exists,

hasListeners是看有没有与该数据节点绑定的listener


所以,默认情况下,都会自动的为指定的path注册watcher,并且是默认的watcher(ZKClient),那么怎样才能让hasListeners值为true呢,也就是怎么才能为path绑定Listener呢?
ZKClient提供了订阅功能,一个新建的会话,只需要在取得响应的数据节点后,调用subscribeXXX就可以订阅上相应的事件了。

5.2.9 demo

  1. createParents可以递归创建节点(public void createPersistent(String path, boolean createParents))
  2. 无需注册watcher(前面也说了,ZKClient帮我们做好了)
  3. 节点内容可以传任意类型数据
  4. 可以自定义内容的序列化和反序列化
  5. 在没指定zkSerializer时,默认使用java自动的序列化和反序列化
public class ZkClientCrud<T> {ZkClient zkClient ;final static Logger logger = LoggerFactory.getLogger(ZkClientCrud.class);public ZkClientCrud(ZkSerializer zkSerializer) {logger.info("链接zk开始");// zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout);zkClient=new ZkClient(ZookeeperUtil.connectString,ZookeeperUtil.sessionTimeout,ZookeeperUtil.sessionTimeout,zkSerializer);}public void createEphemeral(String path,Object data){zkClient.createEphemeral(path,data);}/**** 支持创建递归方式* @param path* @param createParents*/public void createPersistent(String path,boolean createParents){zkClient.createPersistent(path,createParents);}/**** 创建节点 跟上data数据* @param path* @param data*/public void createPersistent(String path,Object data){zkClient.createPersistent(path,data);}/**** 子节点* @param path* @return*/public  List<String> getChildren(String path){return zkClient.getChildren(path);}public  T readData(String path){return zkClient.readData(path);}public  void  writeData(String path,Object data){zkClient.writeData(path,data);}//递归删除public  void deleteRecursive(String path){zkClient.deleteRecursive(path);}
}
public class ZkClientCrudTest {final static Logger logger = LoggerFactory.getLogger(ZkClientCrudTest.class);public static void main(String[] args) {ZkClientCrud<User> zkClientCrud=new ZkClientCrud<User>(new SerializableSerializer());String path="/root";zkClientCrud.deleteRecursive(path);zkClientCrud.createPersistent(path,"hi");/*  zkClientCrud.createPersistent(path+"/a/b/c",true);//递归创建 但是不能设在value//zkClientCrud.createPersistent(path,"hi");logger.info(zkClientCrud.readData(path));//更新zkClientCrud.writeData(path,"hello");logger.info(zkClientCrud.readData(path));logger.info(String.valueOf(zkClientCrud.getChildren(path)));//子节点List<String> list=zkClientCrud.getChildren(path);for(String child:list){logger.info("子节点:"+child);}*/User user=new User();user.setId(1);user.setName("张三");zkClientCrud.writeData(path,user);System.out.println(zkClientCrud.readData(path).getName());;}
}
@Data
@NoArgsConstructor
@AllArgsConstructor
public class User implements Serializable {private Integer id;private String name;
}

Watcher

public class ZkClientWatcher {ZkClient zkClient;public ZkClientWatcher() {zkClient = new ZkClient(new ZkConnection(ZookeeperUtil.connectString), ZookeeperUtil.sessionTimeout);}public void createPersistent(String path, Object data) {zkClient.createPersistent(path, data);}public void writeData(String path, Object object) {zkClient.writeData(path, object);}public void delete(String path) {zkClient.delete(path);}public boolean exists(String path) {return zkClient.exists(path);}public void deleteRecursive(String path) {zkClient.deleteRecursive(path);}//对父节点添加监听数据变化。public void subscribe(String path) {zkClient.subscribeDataChanges(path, new IZkDataListener() {@Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {System.out.printf("变更的节点为:%s,数据:%s\r\n", dataPath, data);}@Overridepublic void handleDataDeleted(String dataPath) throws Exception {System.out.printf("删除的节点为:%s\r\n", dataPath);}});}//对父节点添加监听子节点变化。public void subscribe2(String path) {zkClient.subscribeChildChanges(path, new IZkChildListener() {@Overridepublic void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {System.out.println("父节点: " + parentPath + ",子节点:" + currentChilds + "\r\n");}});}//客户端状态public void subscribe3(String path) {zkClient.subscribeStateChanges(new IZkStateListener() {@Overridepublic void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {if (state == Watcher.Event.KeeperState.SyncConnected) {//当我重新启动后start,监听触发System.out.println("连接成功");} else if (state == Watcher.Event.KeeperState.Disconnected) {System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发} elseSystem.out.println("其他状态" + state);}@Overridepublic void handleNewSession() throws Exception {System.out.println("重建session");}@Overridepublic void handleSessionEstablishmentError(Throwable error) throws Exception {}});}/*  @Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {}@Overridepublic void handleDataDeleted(String dataPath) throws Exception {}*/
}
public class ZkClientWatcherTest {public static void main(String[] args) throws InterruptedException {ZkClientWatcher zkClientWatche=new ZkClientWatcher();String path="/root";zkClientWatche.deleteRecursive(path);zkClientWatche.createPersistent(path,"hello");zkClientWatche.subscribe(path);zkClientWatche.subscribe2(path);// zkClientWatche.subscribe3(path);//需要启服务// Thread.sleep(Integer.MAX_VALUE);zkClientWatche.createPersistent(path+"/root2","word");TimeUnit.SECONDS.sleep(1);zkClientWatche.writeData(path,"hi");TimeUnit.SECONDS.sleep(1);//zkClientWatche.delete(path);//如果目录下有内容 不能删除 会报 Directory not empty for /root的异常zkClientWatche.deleteRecursive(path);TimeUnit.SECONDS.sleep(1); //这个main线程就结束}
}
public class ZookeeperUtil {/** zookeeper服务器地址 */
//    public static final String connectString = "192.168.0.101:2181,192.168.0.102:2181,192.168.0.104:2181";public static final String connectString = "localhost:2181";/** 定义session失效时间 */public static final int sessionTimeout = 5000;public static final String path = "/root";
}

5.3 Curator

5.3.1 简介

zookeeper不是为高可用性设计的,但它使用ZAB协议达到了极高的一致性。所以它经常被选作注册中心、配置中心、分布式锁等场景。
它的性能是非常有限的,而且API并不是那么好用。xjjdog倾向于使用基于Raft协议的Etcd或者Consul,它们更加轻量级一些。
Curator是netflix公司开源的一套zookeeper客户端,目前是Apache的顶级项目。与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。Curator解决了很多zookeeper客户端非常底层的细节开发工作,包括连接重连、反复注册wathcer和NodeExistsException 异常等。

Zookeeper 原生API问题:
1.超时重连,不支持自动,需要手动操作
2.Watch注册一次后会失效
3.不支持递归创建节点

Zookeeper API 升级版 Curator:
1.解决watcher的注册一次就失效
2.提供更多解决方案并且实现简单
3.提供常用的ZooKeeper工具类
4.编程风格更爽,点点点就可以了
5.可以递归创建节点等

Curator由一系列的模块构成,对于一般开发者而言,常用的是curator-framework和curator-recipes。

5.3.2 版本问题

Curator2.x.x版本兼容Zookeeper的3.4.x和3.5.x。
Curator3.x.x只兼容Zookeeper 3.5.x,并且提供了一些诸如动态重新配置、watch删除等新特性。
Curator4 统一对 ZooKeeper 3.4.x 和 3.5.x 的支持

5.3.3 CuratorFramework

Curator-Framework是ZooKeeper Client更高的抽象API,最佳核心的功能就是自动连接管理:
当ZooKeeper客户端内部出现异常, 将自动进行重连或重试, 该过程对外几乎完全透明
监控节点数据变化事件NodeDataChanged,需要时调用updateServerList()方法
Curator recipes自动移除监控

更加清晰的API
简化了ZooKeeper原生的方法, 事件等, 提供流式fluent的接口,提供Recipes实现 : 选举,共享锁, 路径cache, 分布式队列,分布式优先队列等。

5.3.4 curator-recipes

curator-recipes:封装了一些高级特性,如:Cache事件监听、 Elections选举、分布式锁、分布式计数器、分布式Barrier、Queues队列等

5.3.5 知识点

1.使用curator建立与zk的连接
2.使用curator添加/递归添加节点
3.使用curator删除/递归删除节点
4.使用curator创建/验证 ACL(访问权限列表)
5.使用curator监听 单个/父 节点的变化(watch事件)
6.基于curator实现Zookeeper分布式锁(需要掌握基本的多线程知识)
7.基于curator实现分布式计数器

5.3.6 Maven依赖

<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><!--建议和本地安装版本保持一致--><version>3.7.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>5.2.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.2.0</version>
</dependency>

5.3.7 api

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class ZkConnectCuratorUtil {final static Logger log = LoggerFactory.getLogger(ZkConnectCuratorUtil.class);public CuratorFramework zkClient = null; //zk的客户端工具Curator(在本类通过new实例化的是,自动start)private static final int MAX_RETRY_TIMES = 3; //定义失败重试次数private static final int BASE_SLEEP_TIME_MS = 5000; //连接失败后,再次重试的间隔时间 单位:毫秒private static final int SESSION_TIME_OUT = 1000000; //会话存活时间,根据业务灵活指定 单位:毫秒private static final String ZK_SERVER_IP_PORT = "localhost:2181";//Zookeeper服务所在的IP和客户端端口private static final String NAMESPACE = "workspace";//指定后,默认操作的所有的节点都会在该工作空间下进行//本类通过new ZkCuratorUtil()时,自动连通zkClientpublic ZkConnectCuratorUtil() {RetryPolicy retryPolicy = new RetryNTimes(MAX_RETRY_TIMES, BASE_SLEEP_TIME_MS);//首次连接失败后,重试策略zkClient = CuratorFrameworkFactory.builder()//.authorization("digest", "root:root".getBytes())//登录超级管理(需单独配).connectString(ZK_SERVER_IP_PORT).sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy).namespace(NAMESPACE).build();zkClient.start();}public void closeZKClient() {if (zkClient != null) {this.zkClient.close();}}public static void main(String[] args) {ZkConnectCuratorUtil zkUtil=new ZkConnectCuratorUtil();boolean ifStarted=zkUtil.zkClient.isStarted();System.out.println("当前客户的状态:" + (ifStarted ? "连接中" : "已关闭"));zkUtil.closeZKClient();boolean ifClose = zkUtil.zkClient.isStarted();System.out.println("当前客户的状态:" + (ifClose ? "连接成功" : "已关闭"));}
}
public class CuratorDao {//使用curator(递归)添加节点//级联创建节点(原生API不支持/后台客户端也不支持,但是Curator支持)public static void createNodes(CuratorFramework zkClient, String nodePath, String nodeData) throws Exception {zkClient.create().creatingParentContainersIfNeeded()//创建父节点,如果需要的话.withMode(CreateMode.PERSISTENT) //指定节点是临时的,还是永久的.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //指定节点的操作权限.forPath(nodePath, nodeData.getBytes());System.out.println(nodePath + "节点已成功创建…");}//使用curator(递归)删除节点//删除node节点及其子节点public static void deleteNodeWithChild(CuratorFramework zkClient, String nodePath) throws Exception {zkClient.delete().guaranteed()                //保证删除:如果删除失败,那么在后端还是继续会删除,直到成功.deletingChildrenIfNeeded()  //级联删除子节点//.withVersion(1)//版本号可以据需使用.forPath(nodePath);System.out.println(nodePath + "节点已删除成功…");}//使用curator更新节点数据//更新节点data数据public static void updateNodeData(CuratorFramework zkClient, String nodePath, String nodeNewData) throws Exception {zkClient.setData().withVersion(0).forPath(nodePath, nodeNewData.getBytes());//版本号据需使用,默认可以不带System.out.println(nodePath + "节点数据已修改成功…");}//使用curator查询节点数据//查询node节点数据public static void getNodeData(CuratorFramework zkClient, String nodePath) throws Exception {Stat stat = new Stat();byte[] data = zkClient.getData().storingStatIn(stat).forPath(nodePath);System.out.println("节点" + nodePath + "的数据为" + new String(data));System.out.println("节点的版本号为:" + stat.getVersion());}//使用curator查询节点的子节点//打印node子节点public static void printChildNodes(CuratorFramework zkClient, String parentNodePath) throws Exception {List<String> childNodes = zkClient.getChildren().forPath(parentNodePath);System.out.println("开始打印子节点");for (String str : childNodes) {System.out.println(str);}}//使用curator判断节点是否存在//判断node节点是否存在public static void checkNodeExists(CuratorFramework zkClient, String nodePath) throws Exception {Stat stat = zkClient.checkExists().forPath(nodePath);System.out.println(null == stat ? "节点不存在" : "节点存在");}/**************使用Curator高级API特性之Cache缓存监控节点变化*************/@Testpublic void test() throws Exception {ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();CuratorFramework zkClient = zkUtil.zkClient;
//        CuratorDao.createNodes(zkClient,"/xiaosi/test","siguogui");
//        CuratorDao.deleteNodeWithChild(zkClient,"/xiaosi/test");
//        CuratorDao.updateNodeData(zkClient,"/xiaosi/test","xiaosi");
//        CuratorDao.getNodeData(zkClient,"/xiaosi/test");
//        CuratorDao.printChildNodes(zkClient, "/xiaosi");CuratorDao.checkNodeExists(zkClient, "/xiaosi");}}

5.3.8 使用Curator高级API特性之Cache缓存监控节点变化

cache是一种缓存机制,可以借助cache实现监听。
简单来说,cache在客户端缓存了znode的各种状态,当感知到zk集群的znode状态变化,会触发event事件,注册的监听器会处理这些事件。
curator支持的cache种类有4种Path Cache,Node Cache,Tree Cache,Curator Cache
1)Path Cache
Path Cache用来观察ZNode的子节点并缓存状态,如果ZNode的子节点被创建,更新或者删除,那么Path Cache会更新缓存,并且触发事件给注册的监听器。
它是通过PathChildrenCache类来实现的,监听器注册是通过PathChildrenCacheListener。
2)Node Cache
Node Cache用来观察ZNode自身,如果ZNode节点本身被创建,更新或者删除,那么Node Cache会更新缓存,并触发事件给注册的监听器。
它是通过NodeCache类来实现的,监听器对应的接口为NodeCacheListener。
3)Tree Cache
Tree Cache是上两种的合体,Tree Cache观察的是自身+所有子节点的所有数据,并缓存所有节点数据。
它是通过TreeCache类来实现的,监听器对应的接口为TreeCacheListener。
4)Curator Cache ( requires ZooKeeper 3.6+)
Curator Cache,是在zk3.6新版本添加的特性,该版本的出现是为了逐步淘汰上面3监听。
它是通过CuratorCache类来实现的,监听器对应的接口为CuratorCacheListener。

Curator一次性的watch

import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;public class MyCuratorWatcher implements CuratorWatcher {@Overridepublic void process(WatchedEvent event) throws Exception {System.out.println("触发watcher,节点路径为:" + event.getPath());switch (event.getType()) {case NodeCreated:break;default:break;}}
}//一次性的watchpublic static void watchOnce(CuratorFramework zkClient,String nodePath) throws Exception {zkClient.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);}

NodeCache监听当前节点变化,通过NodeCacheListener接口持续监听节点的变化来实现

//持续监听的watchpublic static void watchForeverByNodeCache(CuratorFramework zkClient,String nodePath) throws Exception {final NodeCache nodeCache=new NodeCache(zkClient, nodePath);//把监听节点,转换为nodeCachenodeCache.start(false);//默认为false  设置为true时,会自动把节点数据存放到nodeCache中;设置为false时,初始化数据为空ChildData cacheData=nodeCache.getCurrentData(); if(null==cacheData) {System.out.println("NodeCache节点的初始化数据为空……");}else {System.out.println("NodeCache节点的初始化数据为"+new String(cacheData.getData()));}//设置循环监听nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {ChildData cdata=nodeCache.getCurrentData();if(null==cdata) {System.out.println("节点发生了变化,可能刚刚被删除!");nodeCache.close();//关闭监听}else {String data=new String(cdata.getData());String path=nodeCache.getCurrentData().getPath();System.out.println("节点路径"+path+"数据发生了变化,最新数据为:"+data);}}});}

PathChildrenCache只监听子节点变化
通过PathChildrenCacheListener接口持续监听子节点来实现

//持续监听watch子节点的任何变化public static void watchForeverByPathChildrenCache(CuratorFramework zkClient,String nodePath) throws Exception {final PathChildrenCache childrenCache=new PathChildrenCache(zkClient, nodePath,true);//把监听节点,转换为childrenCache/*** StartMode:初始化方式*    POST_INITIALIZED_EVENT: 异步初始化,初始化之后会触发事件(会进入下面的第一个case)* NORMAL:异步初始化 (不会进入下面的第一个case)*  BUILD_INITIAL_CACHE: 同步初始化(把节点数据同步缓存到Cache中)*/childrenCache.start(StartMode.NORMAL);List<ChildData> childDataList=childrenCache.getCurrentData();System.out.println("当前节点所有子节点的数据列表如下:");for (ChildData childData : childDataList) {System.out.println(new String(childData.getData()));}childrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {switch (event.getType()) {case INITIALIZED:System.out.println("子节点初始化OK…");break;case CHILD_ADDED:System.out.println("子节点"+event.getData().getPath()+"已被成功添加,数据data="+new String(event.getData().getData()));break;case CHILD_UPDATED:System.out.println("子节点"+event.getData().getPath()+"数据发生变化,新数据data="+new String(event.getData().getData()));break;case CHILD_REMOVED:System.out.println("子节点"+event.getData().getPath()+"已被移除~");break;case CONNECTION_RECONNECTED:System.out.println("正在尝试重新建立连接…");break;case CONNECTION_SUSPENDED:System.out.println("连接状态被暂时停止…");break;default:break;}}});}

TreeCache是上两者的合体,既监听自身,也监听所有子节点变化
通过TreeCacheListener接口来实现

public static void treeCache(CuratorFramework zkClient) throws Exception {final String path = "/treeChildrenCache";final TreeCache treeCache = new TreeCache(zkClient, path);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {switch (event.getType()){case NODE_ADDED:System.out.println("节点变动触发:NODE_ADDED:" + event.getData().getPath());break;case NODE_REMOVED:System.out.println("节点变动触发:NODE_REMOVED:" + event.getData().getPath());break;case NODE_UPDATED:System.out.println("节点变动触发:NODE_UPDATED:" + event.getData().getPath());break;case CONNECTION_LOST:System.out.println("节点变动触发:CONNECTION_LOST:" + event.getData().getPath());break;case CONNECTION_RECONNECTED:System.out.println("节点变动触发:CONNECTION_RECONNECTED:" + event.getData().getPath());break;case CONNECTION_SUSPENDED:System.out.println("节点变动触发:CONNECTION_SUSPENDED:" + event.getData().getPath());break;case INITIALIZED:System.out.println("节点变动触发:INITIALIZED:" + event.getData().getPath());break;default:break;}}});//据需可以继续做一些其他的增删改操作zkClient.create().withMode(CreateMode.PERSISTENT).forPath(path);Thread.sleep(1000);zkClient.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");Thread.sleep(1000);zkClient.delete().forPath(path + "/c1");Thread.sleep(1000);zkClient.delete().forPath(path);Thread.sleep(1000);zkClient.close();}

Curator Cache,是在zk3.6新版本添加的特性,Curator需5.+
它的出现是为了替换以上3个监听(NodeCache、PathCache、TreeCache),它通过CuratorCacheListener.builder().for
**来选择对应的监听。最后再通过curatorCache.listenable().addListener(listener);注册监听。

public static void curatorCache1(CuratorFramework zkClient) {final String path = "/curatorCache";CuratorCache curatorCache = CuratorCache.build(zkClient, path);curatorCache.listenable().addListener(new CuratorCacheListener() {@Overridepublic void event(Type type, ChildData oldData, ChildData newdata) {switch (type) {case NODE_CREATED://各种判断break;default:break;}}});}public static void curatorCache2(CuratorFramework zkClient) throws InterruptedException {final String path = "/curatorCache";CuratorCache curatorCache = CuratorCache.builder(zkClient,path).build();//构建监听器//新旧对照://1.node cache--> CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} );//2.path cache--> CuratorCacheListener.builder().forPathChildrenCache();//3.tree cache--> CuratorCacheListener.builder().forTreeCache.forTreeCache();CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("节点改变了...");}}).build();//添加监听curatorCache.listenable().addListener(listener);//开启监听curatorCache.start();//让线程休眠30s(为了方便测试)Thread.sleep(1000 * 30);}
package org.example.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.testng.annotations.Test;import java.util.List;/*** @ClassName: CuratorDao* @Description:* @Author: 88578* @Date: 2022/5/1 14:17*/
public class CuratorDao {//使用curator(递归)添加节点//级联创建节点(原生API不支持/后台客户端也不支持,但是Curator支持)public static void createNodes(CuratorFramework zkClient, String nodePath, String nodeData) throws Exception {zkClient.create().creatingParentContainersIfNeeded()//创建父节点,如果需要的话.withMode(CreateMode.PERSISTENT) //指定节点是临时的,还是永久的.withACL(ZooDefs.Ids.OPEN_ACL_UNSAFE) //指定节点的操作权限.forPath(nodePath, nodeData.getBytes());System.out.println(nodePath + "节点已成功创建…");}//使用curator(递归)删除节点//删除node节点及其子节点public static void deleteNodeWithChild(CuratorFramework zkClient, String nodePath) throws Exception {zkClient.delete().guaranteed()                //保证删除:如果删除失败,那么在后端还是继续会删除,直到成功.deletingChildrenIfNeeded()  //级联删除子节点//.withVersion(1)//版本号可以据需使用.forPath(nodePath);System.out.println(nodePath + "节点已删除成功…");}//使用curator更新节点数据//更新节点data数据public static void updateNodeData(CuratorFramework zkClient, String nodePath, String nodeNewData) throws Exception {zkClient.setData().withVersion(0).forPath(nodePath, nodeNewData.getBytes());//版本号据需使用,默认可以不带System.out.println(nodePath + "节点数据已修改成功…");}//使用curator查询节点数据//查询node节点数据public static void getNodeData(CuratorFramework zkClient, String nodePath) throws Exception {Stat stat = new Stat();byte[] data = zkClient.getData().storingStatIn(stat).forPath(nodePath);System.out.println("节点" + nodePath + "的数据为" + new String(data));System.out.println("节点的版本号为:" + stat.getVersion());}//使用curator查询节点的子节点//打印node子节点public static void printChildNodes(CuratorFramework zkClient, String parentNodePath) throws Exception {List<String> childNodes = zkClient.getChildren().forPath(parentNodePath);System.out.println("开始打印子节点");for (String str : childNodes) {System.out.println(str);}}//使用curator判断节点是否存在//判断node节点是否存在public static void checkNodeExists(CuratorFramework zkClient, String nodePath) throws Exception {Stat stat = zkClient.checkExists().forPath(nodePath);System.out.println(null == stat ? "节点不存在" : "节点存在");}/**************使用Curator高级API特性之Cache缓存监控节点变化*************///一次性的watchpublic static void watchOnce(CuratorFramework zkClient, String nodePath) throws Exception {zkClient.getData().usingWatcher(new MyCuratorWatcher()).forPath(nodePath);}//NodeCache监听当前节点变化//通过NodeCacheListener接口持续监听节点的变化来实现//持续监听的watchpublic static void watchForeverByNodeCache(CuratorFramework zkClient, String nodePath) throws Exception {final NodeCache nodeCache = new NodeCache(zkClient, nodePath);//把监听节点,转换为nodeCachenodeCache.start(false);//默认为false  设置为true时,会自动把节点数据存放到nodeCache中;设置为false时,初始化数据为空ChildData cacheData = nodeCache.getCurrentData();if (null == cacheData) {System.out.println("NodeCache节点的初始化数据为空……");} else {System.out.println("NodeCache节点的初始化数据为" + new String(cacheData.getData()));}//设置循环监听nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {ChildData cdata = nodeCache.getCurrentData();if (null == cdata) {System.out.println("节点发生了变化,可能刚刚被删除!");nodeCache.close();//关闭监听} else {String data = new String(cdata.getData());String path = nodeCache.getCurrentData().getPath();System.out.println("节点路径" + path + "数据发生了变化,最新数据为:" + data);}}});}//PathChildrenCache只监听子节点变化//通过PathChildrenCacheListener接口持续监听子节点来实现//持续监听watch子节点的任何变化public static void watchForeverByPathChildrenCache(CuratorFramework zkClient, String nodePath) throws Exception {final PathChildrenCache childrenCache = new PathChildrenCache(zkClient, nodePath, true);//把监听节点,转换为childrenCache/*** StartMode:初始化方式*   POST_INITIALIZED_EVENT: 异步初始化,初始化之后会触发事件(会进入下面的第一个case)* NORMAL:异步初始化 (不会进入下面的第一个case)*  BUILD_INITIAL_CACHE: 同步初始化(把节点数据同步缓存到Cache中)*/childrenCache.start(PathChildrenCache.StartMode.NORMAL);List<ChildData> childDataList = childrenCache.getCurrentData();System.out.println("当前节点所有子节点的数据列表如下:");for (ChildData childData : childDataList) {System.out.println(new String(childData.getData()));}childrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {switch (event.getType()) {case INITIALIZED:System.out.println("子节点初始化OK…");break;case CHILD_ADDED:System.out.println("子节点" + event.getData().getPath() + "已被成功添加,数据data=" + new String(event.getData().getData()));break;case CHILD_UPDATED:System.out.println("子节点" + event.getData().getPath() + "数据发生变化,新数据data=" + new String(event.getData().getData()));break;case CHILD_REMOVED:System.out.println("子节点" + event.getData().getPath() + "已被移除~");break;case CONNECTION_RECONNECTED:System.out.println("正在尝试重新建立连接…");break;case CONNECTION_SUSPENDED:System.out.println("连接状态被暂时停止…");break;default:break;}}});}//TreeCache是上两者的合体,既监听自身,也监听所有子节点变化//通过TreeCacheListener接口来实现public static void treeCache(CuratorFramework zkClient, String nodePath) throws Exception {
//        final String path = "/treeChildrenCache";final TreeCache treeCache = new TreeCache(zkClient, nodePath);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {switch (event.getType()) {case NODE_ADDED:System.out.println("节点变动触发:NODE_ADDED:" + event.getData().getPath());break;case NODE_REMOVED:System.out.println("节点变动触发:NODE_REMOVED:" + event.getData().getPath());break;case NODE_UPDATED:System.out.println("节点变动触发:NODE_UPDATED:" + event.getData().getPath());break;case CONNECTION_LOST:System.out.println("节点变动触发:CONNECTION_LOST:" + event.getData().getPath());break;case CONNECTION_RECONNECTED:System.out.println("节点变动触发:CONNECTION_RECONNECTED:" + event.getData().getPath());break;case CONNECTION_SUSPENDED:System.out.println("节点变动触发:CONNECTION_SUSPENDED:" + event.getData().getPath());break;case INITIALIZED:System.out.println("节点变动触发:INITIALIZED:" + event.getData().getPath());break;default:break;}}});//据需可以继续做一些其他的增删改操作zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath);Thread.sleep(1000);zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath + "/c1");Thread.sleep(1000);zkClient.delete().forPath(nodePath + "/c1");Thread.sleep(1000);zkClient.delete().forPath(nodePath);Thread.sleep(1000);zkClient.close();}/*Curator Cache,是在zk3.6新版本添加的特性,Curator需5.*+它的出现是为了替换以上3个监听(NodeCache、PathCache、TreeCache),它通过CuratorCacheListener.builder().for***来选择对应的监听。最后再通过curatorCache.listenable().addListener(listener);注册监听。*/public static void curatorCache1(CuratorFramework zkClient) {final String path = "/curatorCache";CuratorCache curatorCache = CuratorCache.build(zkClient, path);curatorCache.listenable().addListener(new CuratorCacheListener() {@Overridepublic void event(Type type, ChildData oldData, ChildData newdata) {switch (type) {case NODE_CREATED://各种判断break;default:break;}}});}public static void curatorCache2(CuratorFramework zkClient) throws InterruptedException {final String path = "/curatorCache";CuratorCache curatorCache = CuratorCache.builder(zkClient, path).build();//构建监听器//新旧对照://1.node cache--> CuratorCacheListener.builder().forNodeCache(new NodeCacheListener(){} );//2.path cache--> CuratorCacheListener.builder().forPathChildrenCache();//3.tree cache--> CuratorCacheListener.builder().forTreeCache.forTreeCache();CuratorCacheListener listener = CuratorCacheListener.builder().forNodeCache(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("节点改变了...");}}).build();//添加监听curatorCache.listenable().addListener(listener);//开启监听curatorCache.start();//让线程休眠30s(为了方便测试)Thread.sleep(1000 * 30);}@Testpublic void test() throws Exception {ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();//new的同时,zk也被启动CuratorFramework zkClient = zkUtil.zkClient;
//        CuratorDao.createNodes(zkClient,"/xiaosi/test","siguogui");
//        CuratorDao.deleteNodeWithChild(zkClient,"/xiaosi/test");
//        CuratorDao.updateNodeData(zkClient,"/xiaosi/test","xiaosi");
//        CuratorDao.getNodeData(zkClient,"/xiaosi/test");
//        CuratorDao.printChildNodes(zkClient, "/xiaosi");CuratorDao.checkNodeExists(zkClient, "/xiaosi");}public static void main(String[] args) throws Exception {ZkConnectCuratorUtil zkUtil = new ZkConnectCuratorUtil();//new的同时,zk也被启动CuratorFramework zkClient = zkUtil.zkClient;
//        CuratorDao.watchOnce(zkClient, "/xiaosi/test");
//        CuratorDao.watchForeverByNodeCache(zkClient, "/xiaosi/test");
//        CuratorDao.watchForeverByPathChildrenCache(zkClient, "/xiaosi/test");CuratorDao.treeCache(zkClient, "/xiaosi/test4");CuratorDao dao = new CuratorDao();String nodePath = "/super/succ";dao.createNodes(zkClient, nodePath, "super");//创建节点
//      dao.updateNodeData(zkClient, nodePath, "hello");//更新节点数据
//      dao.deleteNodeWithChild(zkClient, nodePath);
//      dao.getNodeData(zkClient, nodePath);
//      dao.printChildNodes(zkClient, nodePath);
//      dao.checkNodeExists(zkClient, nodePath);
//      dao.watchOnce(zkClient, nodePath);
//      dao.watchForeverByNodeCache(zkClient, nodePath);
//      dao.watchForeverByPathChildrenCache(zkClient, nodePath);Thread.sleep(300000); //延迟sleep时间,便于后才修改节点,看前台是否会继续触发watchcto.closeZKClient();}
}

5.4 使用Curator创建/验证ACL(访问权限列表)

5.4.1 连通Zk时,就指定登录权限

//本类代码,只涉及ACL操作
public class CuratorAcl {public CuratorFramework client = null;public static final String workspace="workspace";public static final String zkServerPath = "192.168.31.216:2181";public CuratorAcl() {RetryPolicy retryPolicy = new RetryNTimes(3, 5000);client = CuratorFrameworkFactory.builder().authorization("digest", "mayun:mayun".getBytes())//通常情况下,登录账号、密码可以通过构造参数传入,暂时固定,据需修改.connectString(zkServerPath).sessionTimeoutMs(20000).retryPolicy(retryPolicy).namespace(workspace).build();client.start();}public void closeZKClient() {if (client != null) {this.client.close();}}
}

5.4.2写一个把明文的账号密码转换为加密后的密文的工具类

//把明文的账号密码转换为加密后的密文
public class AclUtils {public static String getDigestUserPwd(String loginId_Username_Passwd) {String digest = "";try {digest = DigestAuthenticationProvider.generateDigest(loginId_Username_Passwd);} catch (NoSuchAlgorithmException e) {e.printStackTrace();}return digest;}public static void main(String[] args) throws IOException, InterruptedException, KeeperException, Exception {String id = "mayun:mayun";String idDigested = getDigestUserPwd(id);System.out.println(idDigested); // mayun:KThXmEntEPZyHsQk7tbP5ZzEevk=}
}

5.4.3使用自定义工具类AclUtils,一次性给多个用户赋Acl权限

    public static List<ACL> getAcls() throws NoSuchAlgorithmException{List<ACL> acls=new ArrayList<ACL>();Id mayun =new Id("digest", AclUtils.getDigestUserPwd("mayun:mayun"));Id lilei =new Id("digest", AclUtils.getDigestUserPwd("lilei:lilei"));acls.add(new ACL(Perms.ALL, mayun));//给mayun一次性赋值所有权限acls.add(new ACL(Perms.READ, lilei));acls.add(new ACL(Perms.DELETE | Perms.CREATE, lilei));//给lilei分两次赋权限(目的:看不同的赋权方式)return acls;}

5.4.4级联创建节点,并赋予节点操作权限

 public static void createNodesCascade(CuratorAcl cto,String nodePath,String nodeData,List<ACL> acls) throws Exception {String result=cto.client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls, true)//给节点赋权限.forPath(nodePath, nodeData.getBytes());System.out.println("创建成功,result="+result);     }

5.4.5读取节点数据

   public  void getNodeData(CuratorAcl cto,String nodePath) throws Exception {Stat stat = new Stat();byte[] data = cto.client.getData().storingStatIn(stat).forPath(nodePath);if(null!=stat) {System.out.println("节点" + nodePath + "的数据为: " + new String(data));System.out.println("该节点的版本号为: " + stat.getVersion());}}

5.4.6修改具有ACL权限节点的data数据

 public void modNodeDataWhichWithAcl(CuratorAcl cto,String nodePath,String nodeNewData) throws Exception {cto.getNodeData(cto, nodePath);System.out.println("节点修改后的数据为:"+nodeNewData);cto.client.setData().forPath(nodePath, nodeNewData.getBytes());System.out.println("修改成功");}

5.4.7两种方法判断node节点是否存(优先使用第一种)

 public void checkNodeExists(CuratorAcl cto,String nodePath) throws Exception {cto.getNodeData(cto, nodePath);System.out.println("-----------=================-------------");//判断节点是否存在,方法一(路径前面会自动添加workspace)Stat stat=cto.client.checkExists().forPath(nodePath);System.out.println("======="+stat==null?"不存在":"存在");//判断节点是否存在,方法二(路径前面需手动添加workspace)Stat stat2 = cto.client.getZookeeperClient().getZooKeeper().exists("/"+workspace+nodePath, false);System.out.println("======="+stat2==null?"不存在":"存在");}

ACL权限的main方法测试
通过java代码给某个节点添加ACL权限后,后台登陆zk客户端时,是无法直接操作该节点被ACL控制的权限的操作的,要想操作具有ACL权限的节点,方法只有两个。
1、知道该节点输入用户都有哪些,用这些用户的账号密码登录
2、使用超级用户登录
#getAcl /succ/testDigest 查看都有哪些用户对该节点有操作权限
#addauth digest succ:succ 登录

 public static void main(String[] args) throws Exception {CuratorAcl cto = new CuratorAcl();boolean isZkCuratorStarted = cto.client.isStarted();System.out.println("当前客户的状态:" + (isZkCuratorStarted ? "连接成功" : "已关闭"));String nodePath1 = "/acl/tom/bin";String nodePath2 = "/acl/father/child/sub";
//      cto.createNodesCascade(cto, nodePath1, "aclTest", getAcls());//首次创建,报错,只能创建父节点,子节点无法创建
//      cto.client.setACL().withACL(getAcls()).forPath("/curatorNode");//给节点创建权限
//      cto.getNodeData(cto, "/super");
//      cto.getNodeData(cto, "/acl");cto.checkNodeExists(cto, nodePath2);cto.closeZKClient();boolean isZkCuratorStarted2 = cto.client.isStarted();System.out.println("当前客户的状态:" + (isZkCuratorStarted2 ? "连接成功" : "已关闭"));}

7 分布式锁

Curator的5种分布式锁及其对应的核心类:

1.重入式排它锁 Shared Reentrant Lock,实现类:InterProcessMutex

2.不可重入排它锁 Shared Lock ,实现类:InterProcessSemaphoreMutex

3.可重入读写锁 Shared Reentrant Read Write Lock,实现类: InterProcessReadWriteLock 、InterProcessLock

4.多锁对象容器(多共享锁) Multi Shared Lock,将多个锁作为单个实体管理的容器,实现类:InterProcessMultiLock、InterProcessLock

5.共享信号锁Shared Semaphore ,实现类:InterProcessSemaphoreV2

跨 JVM 工作的计数信号量。使用相同锁路径的所有 JVM 中的所有进程将实现进程间有限的租用集。此外,这个信号量大多是“公平的”——每个用户将按照请求的顺序获得租用(从 ZK 的角度来看)。

有两种模式可用于确定信号量的最大租用。在第一种模式中,最大租用是由给定路径的用户维护的约定。在第二种模式中,SharedCountReader 用作给定路径的信号量的方法,以确定最大租用。

7.1.重入式排它锁InterProcessMutex

public InterProcessMutex(CuratorFramework client, String path)
获取/释放锁的API

public void acquire() throws Exception;//获取锁,获取不到锁一直阻塞,zk连接中断则抛异常
public boolean acquire(long time, TimeUnit unit) throws Exception;//获取锁,超过该时间后,直接返回false,zk连接中断则抛异常
public void release() throws Exception;//释放锁
通过release()方法释放锁。InterProcessMutex 实例可以重用。Revoking ZooKeeper recipes wiki定义了可协商的撤销机制。为了撤销mutex, 调用下面的方法

/**

  • 将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
  • Parameters:
  • listener - the listener
    */
    public void makeRevocable(RevocationListener listener)

7.2.不可重入排它锁InterProcessSemaphoreMutex

public InterProcessSemaphoreMutex(CuratorFramework client, String path)
使用InterProcessSemaphoreMutex,调用方法类似,区别在于该锁是不可重入的,在同一个线程中不可重入

7.3.可重入读写锁InterProcessReadWriteLock 、InterProcessLock

一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁使用时不允许读 (阻塞)。此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 —>读锁 —->释放写锁。从读锁升级成写锁是不成的。

7.4.多锁对象容器(多共享锁) ,将多个锁作为单个实体管理,InterProcessMultiLock、InterProcessLock

Multi Shared Lock是一个锁的容器。当调用acquire, 所有的锁都会被acquire(上锁),如果请求失败,所有的锁都会被release (释放锁)。同样调用release时所有的锁都被release(失败被忽略)。基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。主要涉及两个类:InterProcessMultiLock、InterProcessLock

它的构造函数需要包含的锁的集合,或者一组ZooKeeper的path。

public InterProcessMultiLock(List locks)
public InterProcessMultiLock(CuratorFramework client, List paths)

7.5.代码

public class ZkLock {final static Logger log = LoggerFactory.getLogger(ZkLock.class);public CuratorFramework zkClient = null; // zk的客户端工具Curator(在本类通过new实例化的是,自动start)private static final int BASE_SLEEP_TIME_MS = 1000; // 连接失败后,再次重试的间隔时间 单位:毫秒private static final int MAX_RETRY_TIMES = 10; // 定义失败重试次数private static final int SESSION_TIME_OUT = 1000000; // 会话存活时间,根据业务灵活指定 单位:毫秒private static final String ZK_SERVER_IP_PORT = "localhost:2181";// Zookeeper服务所在的IP和客户端端口private static final String NAMESPACE = "workspace";// 指定后,默认操作的所有的节点都会在该工作空间下进行static int j = 10;//初始化zk客户端public ZkLock() {// 重试策略:初试时间为1s 重试10次RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIME_MS, MAX_RETRY_TIMES);// 通过工厂建立连接zkClient = CuratorFrameworkFactory.builder().connectString(ZK_SERVER_IP_PORT) // 连接地址.sessionTimeoutMs(SESSION_TIME_OUT).retryPolicy(retryPolicy)// 重试策略.build();zkClient.start();}public static void lockTest(CuratorFramework zkClient) throws InterruptedException {// 使用分布式锁,所有系统同时监听同一个节点,达到分布式锁的目的final InterProcessMutex lock = new InterProcessMutex(zkClient, "/test");final CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 10; i++) {//启动10个线程new Thread(new Runnable() {@Overridepublic void run() {try {countDownLatch.await();// 线程等待一起执行lock.acquire();// 分布式锁,数据同步// 处理业务j--;System.out.println(j);} catch (Exception e) {e.printStackTrace();} finally {try {// 释放锁lock.release();} catch (Exception e) {e.printStackTrace();}}}}, "t" + i).start();}Thread.sleep(1000);countDownLatch.countDown();// 模拟十个线程一起并发.指定一起执行}public static void main(String[] args) throws InterruptedException {ZkLock zkl = new ZkLock();ZkLock.lockTest(zkl.zkClient);}
}

8.分布式计数器

利用Zookeeper可以实现一个集群共享的计数器。只要使用相同的path就可以得到最新的计数器值, 这是由ZooKeeper的一致性保证的。Curator有两个计数器:DistributedAtomicInteger,DistributedAtomicLong。这个两个除了计数范围(int、long)不同外,没有任何不同。操作也非常简单,跟AtomicInteger大同小异。

increment() //加1
decrement() //减1
compareAndSet(Integer expectedValue, Integer newValue) //cas操作
get() //获取当前值
add():增加特定的值
subtract(): 减去特定的值
trySet(): 尝试设置计数值
使用的时候,必须检查返回结果的succeeded(), 它代表此操作是否成功。如果操作成功, preValue()代表操作前的值, postValue()代表操作后的值。

public static void count(CuratorFramework zkClient) throws Exception {//分布式计数器DistributedAtomicInteger counter=new DistributedAtomicInteger(zkClient,"/super",new RetryNTimes(3,100));//初始化counter.forceSet(0);AtomicValue<Integer> value = counter.increment();//原子自增System.out.println("原值为"+value.preValue());System.out.println("更改后的值为"+value.postValue());System.out.println("状态"+value.succeeded());}public static void main(String[] args) throws Exception {ZkLock zkl=new ZkLock();//ZkLock.lockTest(zkl.zkClient);ZkLock.count(zkl.zkClient);}

另外Curator还有一些高端的用法:分布式屏障—Barrier、Double-barrier,分布式队列DistributedQueueDistributed Queue
https://blog.csdn.net/succing/article/details/121779721

https://blog.csdn.net/succing/article/details/121793494

https://blog.csdn.net/succing/article/details/121844550
https://blog.csdn.net/succing/article/details/121802687

大数据之Zookeeper相关推荐

  1. 尚硅谷大数据技术Zookeeper教程-笔记01【Zookeeper(入门、本地安装、集群操作)】

    视频地址:[尚硅谷]大数据技术之Zookeeper 3.5.7版本教程_哔哩哔哩_bilibili 尚硅谷大数据技术Zookeeper教程-笔记01[Zookeeper(入门.本地安装.集群操作)] ...

  2. 【大数据】Zookeeper学习笔记

    第1章 Zookeeper入门 1.1 概述 Zookeeper是一个开源的分布式的,为分布式应用提供协调服务的Apache项目. 1.2 特点 1.3 数据结构 1.4 应用场景 提供的服务包括:统 ...

  3. java大数据组件Zookeeper

    zookeeper的作用: Zookeeper是针对大型分布式系统的高可靠的协调系统,如dubbo里面的注册中心.分布式锁等,主要应用于分布式系统中. 分布式应用的优点: 可靠性- 单个或几个系统的故 ...

  4. 大数据006——Zookeeper

    1. 前言 1.1 Zookeeper简介 ZooKeeper是一个分布式的,开源的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件. 目前,大 ...

  5. 大数据之ZooKeeper数据类型和节点操作

    文章目录 前言 一.数据类型 二.连接ZooKeeper集群 三.操作ZooKeeper节点 四.Watch机制 总结 前言 #博学谷IT学习技术支持# 上篇文章主要是ZooKeeper集群的概述,以 ...

  6. 【博学谷学习记录】超强总结,用心分享|大数据之ZooKeeper

    ZooKeeper介绍 ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,满足CAP理论中的CP,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件.它 ...

  7. 大数据组件——Zookeeper配置文件解析

    一.配置文件 zoo.cfg 配置项 默认值 解析内容 tickTime 2000ms Client-Server心跳间隔时间 initLimit 10次 Following能容忍的最多心跳数 syn ...

  8. 大数据学前准备--zookeeper详解与集群搭建(保姆级教程)

    前言 本人是才学完大数据的无业游民,我将会总结学习收获或发表自己的学习心得,期望给初学者也为自己今后复习提供一些帮助. 我将陆续发布大数据阶段所学,包括但不限于(hadoop,hive,hbase,p ...

  9. 大数据教程,大数据学习线路图

    前言先引用一下马云大大的话: 很多人还没搞清楚什么是PC互联网,移动互联网来了,我们还没搞清楚移动互联的时候,大数据时代又来了. 马云 深度解析大数据 "大数据"是近年来IT行业的 ...

  10. 2019版云计算大数据学习路线图(含大纲+视频+工具+书籍+面试)

    新版学习路线图上线,对云计算大数据感兴趣的同学们,赶紧学起来吧! 一.2019新版大数据学习路线图---每阶段能力培养及可掌握的能力 二.2019新版大数据学习路线图---每阶段学习大纲及各阶段知识点 ...

最新文章

  1. [ASP.NET MVC 小牛之路]11 - Filter
  2. PHPExcel 去掉错误提示 保护表格
  3. 定制zabbix的rpm包---spec文件的书写
  4. poj1182(食物链)续
  5. Codeforces Round #277 (Div. 2) 题解
  6. Laravel 中使用Goutte + GuzzleHttp 组件设置 headers无效的原因探究以及解决方案
  7. html文件转换成dwt文件,如何把dwt页面转换成html页面
  8. PowerDesigner概念模型与物理模型相互转换及导出数据字典
  9. objective-c NSMutableAttributedString
  10. JAVA实现微信公众号推送消息
  11. 关于富文本编辑器的图片处理
  12. 将.ipynb文件转换为.py文件
  13. 华南农业大学计算机学院院长,华南农业大学外国语学院院长何高大
  14. 【转载】FPGA功耗的那些事儿
  15. React中createRef()和useRef()的使用方法
  16. MYSQL的简单查询
  17. 2021年熔化焊接与热切割试题及解析及熔化焊接与热切割模拟试题
  18. 在德国如何优雅地和同事说再见
  19. c语言指针与一维数组PPT,C语言第5章指针和一维数组.ppt
  20. 淘米网汪海兵:为爱创业 刚进腾讯很失落(转载)

热门文章

  1. smartadmin_smartadmin 下载_smartadmin 官网
  2. json编辑器插件 vue_vue-json-editor json编辑器
  3. html在线取色,JS实现的RGB网页颜色在线取色器完整实例
  4. python之pygame,详解坦克大战
  5. 【Redis 开发与运维】初识 Redis
  6. 信捷电子凸轮使用_1.电子凸轮入门应用之基础知识介绍
  7. Axure RP 8.0激活码 Mac Windows
  8. Amesim车辆仿真—入门二之善用帮助文档
  9. 2021年《职业防治法》宣传周活动资料海报挂图及职业病知识小手册等
  10. 多旋翼无人机动力、运动学建模及仿真