前言

Zookeeper操作方式

这篇文章主要说的是利用java来操作zookeeper,就如操作mysql数据库一样,主要是实现增删改查功能,而实现这些功能的方式主要有以下三种:

  1. zookeeper官方提供的原生的api
  2. zkclient
  3. Apache Curator

简单说下三种方式的区别与各自的优劣:

  • zookeeper自带的客户端是官方提供的,比较底层、使用起来写代码麻烦,很多功能需要自己来实现、不够直接。
  • zkclient是另一个开源的ZooKeeper客户端。
  • Apache Curator是Apache的开源项目,封装了zookeeper自带的客户端,使用相对简便,易于使用。

Curator介绍

这篇主要讲解使用基于Curator操作Zookeeper的操作案例。

Apache Curator是一个比较完善的ZooKeeper客户端框架,通过封装的一套高级API 简化了ZooKeeper的操作。通过查看官方文档,可以发现Curator主要解决了三类问题:

  • 封装ZooKeeper client与ZooKeeper server之间的连接处理
  • 提供了一套Fluent风格的操作API
  • 提供ZooKeeper各种应用场景(recipe, 比如:分布式锁服务、集群领导选举、共享计数器、缓存机制、分布式队列等)的抽象封装

基于Curator使用Zookeeper

导入依赖

        <!-- ZooKeeper 之 Curator--><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.2.0</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.2.0</version></dependency>

Zookeeper客户端

创建客户端

使用CuratorFrameworkFactory的两个静态工厂方法来创建zookeeper客户端对象,主要需要设置以下参数:

  • connectString:zookeeper服务器地址及端口号,多个zookeeper服务器地址以“,”分隔。
  • sessionTimeoutMs:会话超时时间,单位毫秒,默认为60000ms。
  • connectionTimeoutMs:连接超时时间,单位毫秒,默认为15000ms。
  • namespace: 为了实现不同的Zookeeper业务之间的隔离,往往会为每个业务分配一个独立的命名空间,即指定一个Zookeeper根路径
  • retryPolicy:重试连接策略,有四种实现,分别为:
    • ExponentialBackoffRetry(重试指定的次数, 且每一次重试之间停顿的时间逐渐增加)
    • RetryNtimes(指定最大重试次数的重试策略)
    • RetryOneTimes(仅重试一次)、
    • RetryUntilElapsed(一直重试直到达到规定的时间)
    //Zookeeper客户端private CuratorFramework client; @Beforepublic void testConnect() {client = CuratorFrameworkFactory.builder().connectString("192.168.124.18:2181")  //连接地址和端口号.sessionTimeoutMs(10000)     //会话超时时间.connectionTimeoutMs(1000)   // 连接超时时间.namespace("/test")      //名称空间.retryPolicy(new ExponentialBackoffRetry(1000, 10))  //重试策略.build();client.start();//开启连接}

关闭客户端

    @Afterpublic void testClose() {if (client != null) {client.close();}}

节点操作

创建节点

主要有以下四种:

  1. 创建普通节点(默认即是持久化节点)
  2. 创建多集节点
  3. 创建指定节点类型的节点:
    • PERSISTENT:持久化节点
    • PERSISTENT_SEQUENTIAL:持久化且带序列号节点
    • EPHEMERAL:临时节点
    • EPHEMERAL_SEQUENTIAL:临时且带序列号节点
  4. 创建带指定数据的节点
    @Testpublic void testCreateNode() throws Exception {//1.普通创建client.create().forPath("/test1");//2.创建多集节点client.create().creatingParentContainersIfNeeded().forPath("/test2/demo");//3.设置创建节点类型client.create().withMode(CreateMode.EPHEMERAL).forPath("/test3");//4.创建带指定数据的节点client.create().forPath("/test4", "This is test4".getBytes(StandardCharsets.UTF_8));}

更新节点

    @Testpublic void testUpdateNode() throws Exception {//更新一个节点的数据内容client.setData().forPath("/test4", "第一次更新".getBytes());//更新一个节点的数据内容,强制指定版本进行更新client.setData().withVersion(1).forPath("/test4", "第二次更新".getBytes());}

查询节点

    @Testpublic void testQueryNode() throws Exception {Stat stat1 = client.checkExists().forPath("/test1");System.out.println("路径节点/test1是否存在:" + (stat1 != null));byte[] bytes = client.getData().forPath("/test2");System.out.println("路径/test2的数据是:" + new String(bytes));Stat stat2 = new Stat();bytes = client.getData().storingStatIn(stat2).forPath("/test4");System.out.println("路径/test4的数据是:" + new String(bytes));System.out.println("路径/test4的数据的版本是:" + stat2.getVersion());}

删除节点

    @Testpublic void testDeleteNode() throws Exception {//只能删除叶子节点client.delete().forPath("/test1");//删除一个节点,并递归删除其所有子节点client.delete().deletingChildrenIfNeeded().forPath("/test2");//强制指定版本进行删除client.delete().withVersion(1).forPath("/test4");//注意:由于一些网络原因,上述的删除操作有可能失败,使用guaranteed()// 如果删除失败,会记录下来,只要会话有效,就会不断的重试,直到删除成功为止client.delete().guaranteed().forPath("/test3");}

异步接口

上面提到的创建、删除、更新、读取等方法都是同步的,Curator提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。

CuratorEventType

事件类型 对应CuratorFramework实例的方法
CREATE create()
DELETE delete()
EXISTS checkExists()
GET_DATA getData()
SET_DATA setData()
CHILDREN getChildren()
SYNC sync(String,Object)
GET_ACL getACL()
SET_ACL setACL()
WATCHED Watcher(Watcher)
CLOSING close()

响应码(getResultCode())

响应码 意义
0 OK,即调用成功
-4 ConnectionLoss,即客户端与服务端断开连接
-110 NodeExists,即节点已经存在
-112 SessionExpired,即会话过期

异步创建节点

Executor executor = Executors.newFixedThreadPool(2);
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));},executor).forPath("path");

监听器

Curator提供了三种Watcher(Cache)来监听结点的变化:

  • Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。

  • Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。

  • Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。

Node Cache

    @Testpublic void testNodeCache() throws Exception {//最后一个参数表示是否进行压缩NodeCache cache = new NodeCache(client, "/super", false);cache.start(true);//只会监听节点的创建和修改,删除不会监听cache.getListenable().addListener(() -> {System.out.println("路径:" + cache.getCurrentData().getPath());System.out.println("数据:" + new String(cache.getCurrentData().getData()));System.out.println("状态:" + cache.getCurrentData().getStat());});client.create().forPath("/nodeCache", "1234".getBytes());Thread.sleep(1000);client.setData().forPath("/nodeCache", "5678".getBytes());Thread.sleep(1000);client.delete().forPath("/nodeCache");Thread.sleep(5000);}

Path Cache

    @Testpublic void testPathChildrenCache() throws Exception {//第三个参数表示是否接收节点数据内容PathChildrenCache childrenCache = new PathChildrenCache(client, "/super", true);/*** 如果不填写这个参数,则无法监听到子节点的数据更新如果参数为PathChildrenCache.StartMode.BUILD_INITIAL_CACHE,则会预先创建之前指定的/super节点如果参数为PathChildrenCache.StartMode.POST_INITIALIZED_EVENT,效果与BUILD_INITIAL_CACHE相同,只是不会预先创建/super节点参数为PathChildrenCache.StartMode.NORMAL时,与不填写参数是同样的效果,不会监听子节点的数据更新操作*/childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);childrenCache.getListenable().addListener((framework, event) -> {switch (event.getType()) {case CHILD_ADDED:System.out.println("CHILD_ADDED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +new String(event.getData().getData()) + ",状态:" + event.getData().getStat());break;case CHILD_UPDATED:System.out.println("CHILD_UPDATED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +new String(event.getData().getData()) + ",状态:" + event.getData().getStat());break;case CHILD_REMOVED:System.out.println("CHILD_REMOVED,类型:" + event.getType() + ",路径:" + event.getData().getPath() + ",数据:" +new String(event.getData().getData()) + ",状态:" + event.getData().getStat());break;default:break;}});client.create().forPath("/pathChildrenCache", "123".getBytes());client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/super/c1", "c1内容".getBytes());//经测试,不会监听到本节点的数据变更,只会监听到指定节点下子节点数据的变更client.setData().forPath("/pathChildrenCache", "456".getBytes());client.setData().forPath("/pathChildrenCache/c1", "c1新内容".getBytes());client.delete().guaranteed().deletingChildrenIfNeeded().forPath("/super");Thread.sleep(5000);}

Tree Cache

    @Testpublic void testTreeCache() throws Exception {TreeCache treeCache = new TreeCache(client, "/treeCache");treeCache.start();treeCache.getListenable().addListener((curatorFramework, treeCacheEvent) -> {switch (treeCacheEvent.getType()) {case NODE_ADDED:System.out.println("NODE_ADDED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;case NODE_UPDATED:System.out.println("NODE_UPDATED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;case NODE_REMOVED:System.out.println("NODE_REMOVED:路径:" + treeCacheEvent.getData().getPath() + ",数据:" + new String(treeCacheEvent.getData().getData())+ ",状态:" + treeCacheEvent.getData().getStat());break;default:break;}});client.create().forPath("/treeCache", "123".getBytes());client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).forPath("/treeCache/c1", "456".getBytes());client.setData().forPath("/treeCache", "789".getBytes());client.setData().forPath("/treeCache/c1", "910".getBytes());client.delete().forPath("/treeCache/c1");client.delete().forPath("/treeCache");Thread.sleep(5000);}

事务管理

碰到异常,事务会回滚

    @Testpublic void testTransaction() throws Exception{//定义几个基本操作CuratorOp createOp = client.transactionOp().create().forPath("/curator/one_path","some data".getBytes());CuratorOp setDataOp = client.transactionOp().setData().forPath("/curator","other data".getBytes());CuratorOp deleteOp = client.transactionOp().delete().forPath("/curator");//事务执行结果List<CuratorTransactionResult> results = client.transaction().forOperations(createOp,setDataOp,deleteOp);//遍历输出结果//因为节点“/curator”存在子节点,所以在删除的时候将会报错,事务回滚for(CuratorTransactionResult result : results){System.out.println("执行结果是: " + result.getForPath() + "--" + result.getType());}}

Leader选举

在分布式计算中, leader elections是很重要的一个功能, 这个选举过程是这样子的: 指派一个进程作为组织者,将任务分发给各节点。 在任务开始前, 哪个节点都不知道谁是leader(领导者)或者coordinator(协调者). 当选举算法开始执行后, 每个节点最终会得到一个唯一的节点作为任务leader. 除此之外, 选举还经常会发生在leader意外宕机的情况下,新的leader要被选举出来。

在zookeeper集群中,leader负责写操作,然后通过Zab协议实现follower的同步,leader或者follower都可以处理读操作。

Curator 有两种leader选举的recipe,分别是

  • LeaderSelector:所有存活的客户端不间断的轮流做Leader
  • LeaderLatch:一旦选举出Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权
public class CuratorLeaderTest {/** Zookeeper info */private static final String ZK_ADDRESS = "192.168.1.100:2181";private static final String ZK_PATH = "/zktest";public static void main(String[] args) throws InterruptedException {LeaderSelectorListener listener = new LeaderSelectorListener() {@Overridepublic void takeLeadership(CuratorFramework client) throws Exception {System.out.println(Thread.currentThread().getName() + " take leadership!");// takeLeadership() method should only return when leadership is being relinquished.Thread.sleep(5000L);System.out.println(Thread.currentThread().getName() + " relinquish leadership!");}@Overridepublic void stateChanged(CuratorFramework client, ConnectionState state) {}};new Thread(() -> {registerListener(listener);}).start();new Thread(() -> {registerListener(listener);}).start();new Thread(() -> {registerListener(listener);}).start();Thread.sleep(Integer.MAX_VALUE);}private static void registerListener(LeaderSelectorListener listener) {// 1.Connect to zkCuratorFramework client = CuratorFrameworkFactory.newClient(ZK_ADDRESS,new RetryNTimes(10, 5000));client.start();// 2.Ensure pathtry {new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient());} catch (Exception e) {e.printStackTrace();}// 3.Register listenerLeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener);selector.autoRequeue();selector.start();}
}

分布式锁

分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。

具体实现请参考我的上一篇文章:Zookeeper实现分布式锁

基于Curator的Zookeeper操作实战相关推荐

  1. kfaka storm写入mysql_基于Storm+Kafka+Zookeeper锁+Memcached+mysql架构全方位系统Storm项目案例实战...

    基于Storm+Kafka+Zookeeper锁+Memcached+mysql架构全方位系统Storm项目案例实战 适应人群 有一定Storm基础.Kafka基础.Memcached基础.Zooke ...

  2. 分布式锁(基于redis和zookeeper)详解

    分布式锁(基于redis和zookeeper)详解 https://blog.csdn.net/a15835774652/article/details/81775044 为什么写这篇文章? 目前网上 ...

  3. 【Elasticsearch】Curator 从入门到实战

    1.概述 转载:Curator 从入门到实战 Curator 是elasticsearch 官方的一个索引管理工具,可以通过配置文件的方式帮助我们对指定的一批索引进行创建/删除.打开/关闭.快照/恢复 ...

  4. Kafka原理+操作+实战

    Kafka原理+操作+实战 前面我和大家交流了kafka的部署安装.对于部署安装这都是小意思,不值得太多的提及.重点还是需要知道kafka原理.熟练掌握kafka命令以及灵活用于kafka场景. 干货 ...

  5. MongoDB+集成SpringBoot+索引+并发优化 - 基于《MongoDB进阶与实战:唐卓章》

    文章目录 MongoDB - 基于<MongoDB进阶与实战:唐卓章> 一.首次安装 服务安装 配置文件修改 可视化工具 Docker部署 二.基本使用 2.1 概念解析 2.2 Mong ...

  6. 基于 Kafka 和 ZooKeeper 的分布式消息队列原理

    转载:https://gitbook.cn/books/5bc446269a9adf54c7ccb8bc/index.html 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量 ...

  7. 再谈基于 Kafka 和 ZooKeeper 的分布式消息队列原理

    关于分布式消息队列,我在几个月前写过一篇文章:<深入浅出理解基于 Kafka 和 ZooKeeper 的分布式消息队列 >.最近,由于写作课程<分布式中间件实践之路>的契机,我 ...

  8. iKcamp|基于Koa2搭建Node.js实战(含视频)☞ 路由koa-router

    路由koa-router--MVC 中重要的环节:Url 处理器 ?? iKcamp 制作团队 原创作者:大哼.阿干.三三.小虎.胖子.小哈.DDU.可木.晃晃 文案校对:李益.大力萌.Au.DDU. ...

  9. 第三章:Python基础の函数和文件操作实战

    本課主題 Set 集合和操作实战 函数介紹和操作实战 参数的深入介绍和操作实战 format 函数操作实战 lambda 表达式介绍 文件操作函数介紹和操作实战 本周作业 Set 集合和操作实战 Se ...

最新文章

  1. C/C++程序训练6---歌德巴赫猜想的证明_JAVA
  2. 机器学习实战笔记(Python实现)-01-机器学习实战
  3. 安装deepin linux
  4. python练习题:列表排序
  5. java bean 验证_Java Bean验证基础
  6. 面试官不讲武德,居然让我讲讲蠕虫和金丝雀!
  7. 如何将c语言程序封装供python调用_C++调用python
  8. 如何正确的通过 C++ Primer 学习 C++?(转自知乎)
  9. Java的正则表达式
  10. 【Day02】测试 Primise、setTimeout等的执行顺序
  11. 资深架构师谈云原生生态的基石Kubernetes
  12. RSLogix 5000 含序列号 20.03版本,带授权
  13. grub4dos 制作WIN7、WINPE2003、Ubuntu、dos工具箱多启动U盘 (不量产)
  14. 细胞自动机,那是什么?
  15. angular项目如何配置国际化(i18n)?
  16. PS进阶篇——如何PS软件给图片部分位置打马赛克(四)
  17. 解密:股票短线起涨点的挂单玄机!
  18. 爱快软路由设置DHCP多个LAN处于同一网段
  19. 阿里合伙人制度的意义及法律分析
  20. Spring(MVC)框架

热门文章

  1. C#图形界面汉诺塔Hanoi
  2. 邮件编码介绍及乱码的解决
  3. html页面整体缩小,浏览器缩放原理以及窗口、html页面大小
  4. 每天一道算法题——拼音翻译成阿拉伯数字(只有数字拼音)
  5. 百度网盘直链下载助手 油猴脚本
  6. 哪个软件配音是免费的?分享这几款好用的配音软件
  7. 已知一个字典包含若干员工信息,姓请编写一个函数,删除性别为男的员工信息
  8. js 实现在线考试切屏代码
  9. 联想台式计算机HDMI使用,联想电脑怎样连接电视
  10. 程序猿爆笑选集(2)