前言

这篇文章主要基于客户端在日常工作中使用zk的角度来写,前面南国已经就zk的理论知识做过一些讲述,对此不太熟悉的可以往前看看。Curator是Apache提供的一个zk的工具包,简化了 ZooKeeper 的操作。它增加了很多使用 ZooKeeper 开发的特性,可以处理 ZooKeeper 集群复杂的连接管理和重试机制。这里我们使用springboot 集成curator来操作zk。
假设你的服务已经正确添加了zk的相关依赖,Curator maveny依赖如下,

<!-- curator-framework -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.2.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions>
</dependency>
<!-- curator-recipes -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.2.0</version><exclusions><exclusion><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId></exclusion></exclusions>
</dependency>

正文

Curator 功能分两大类,一类是对 ZooKeeper 的一些基本命令的封装,比如增删改查,即 Framework 模块;另一类是他的高级特性,即 Recipes 模块。

创建CuratorFramework

Curator 框架通过 CuratorFrameworkFactory 可以通过工厂模式或者builder 模式创建 CuratorFramework 实例。CuratorFramework 实例都是线程安全的,你应该在你的应用中共享同一个 CuratorFramework 实例。
在springboot中相当于创建一个curator的配置类

@Slf4j
@Configuration
public class ZkCoreClient {// zk 服务端集群地址@Value("${zk.url}")private String zkUrl;// session 超时时间private int timeOut = 60000;// zkclient 重试间隔时间private int baseSleepTimeMs = 5000;//zkclient 重试次数private int retryCount = 5;/*** 使用double-check 创建client** @return*/@Beanpublic CuratorFramework init() {CuratorFramework client = CuratorFrameworkFactory.builder().connectString(zkUrl).sessionTimeoutMs(timeOut).retryPolicy(new ExponentialBackoffRetry(baseSleepTimeMs, retryCount))
//                            .namespace(appName).build();// 或者使用工厂模式
//                    client = CuratorFrameworkFactory.newClient(zkUrl,new ExponentialBackoffRetry(baseSleepTimeMs,retryCount)).usingNamespace(appName);client.start();log.info("client is created at ================== {}", LocalDateTime.now());return client;}}

查看curator的基本用法

/*** @author: xiejiahao* Date: 2022/3/14 17:13* Description: zk 工具类* 使用Curator 实现zk 的基本操作-增删查改数据 和监听watch*/
@Slf4j
public class ZkUtils {@AutowiredCuratorFramework client;/*** @param path* @param value* @Description: 创建路径* @Date: 2020-08-22 15:15*/public String createNode(String path, String value) throws Exception {return createNode(path, value, true);}public String createNode(String path, String value, Boolean isEphemeral) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}String node = client.create().creatingParentsIfNeeded().withMode(isEphemeral.equals(true) ? CreateMode.EPHEMERAL_SEQUENTIAL : CreateMode.PERSISTENT_SEQUENTIAL) // 临时顺序节点/持久顺序节点.forPath(path, value.getBytes());log.info("create node : {}", node);return node;}/*** @param path* @Description: 删除节点信息* @Date: 2020-08-22 16:01*/public void deleteNode(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}client.delete().guaranteed() // 保障机制,若未删除成功,只要会话有效会在后台一直尝试删除.deletingChildrenIfNeeded() // 若当前节点包含子节点,子节点也删除.forPath(path);log.info("{} is deleted ", path);}/*** 判断znode是否存在,Stat就是对znode所有属性的一个映射,stat=null表示节点不存在** @param path* @return*/public Stat isExists(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}return client.checkExists().forPath(path);}/*** 查询子节点** @param path* @return* @throws Exception*/public List<String> getChildren(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}return client.getChildren().forPath(path);}/*** @param path* @Description: 获取节点存储的值* @Date: 2020-08-22 16:11*/public String getNodeData(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}Stat stat = new Stat();byte[] bytes = client.getData().storingStatIn(stat).forPath(path);log.info("{} data is : {}", path, new String(bytes));log.info("current stat version is {}, createTime is {}", stat.getVersion(), stat.getCtime());return new String(bytes);}/*** @param path* @param value* @Description: 设置节点 数据* @Date: 2020-08-22 16:17*/public void setNodeData(String path, String value) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}Stat stat = client.checkExists().forPath(path);if (null == stat) {log.info(String.format("{} Znode is not exists", path));throw new RuntimeException(String.format("{} Znode is not exists", path));}String nodeData = getNodeData(path);client.setData().withVersion(stat.getVersion()).forPath(path, value.getBytes());log.info("{} Znode data is set. old vaule is {}, new data is {}", path, nodeData, value);}/*** @param path* @Description: 创建 给定节点的监听事件  监听一个节点的更新和创建事件(不包括删除)* @Date: 2020-08-22 16:47*/public void addWatcherWithNodeCache(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}// dataIsCompressed if true, data in the path is compressedNodeCache nodeCache = new NodeCache(client, path, false);NodeCacheListener listener = () -> {ChildData currentData = nodeCache.getCurrentData();log.info("{} Znode data is chagnge,new data is ---  {}", currentData.getPath(), new String(currentData.getData()));};nodeCache.getListenable().addListener(listener);nodeCache.start();}/*** @param path 给定节点* @Description: 监听给定节点下的子节点的创建、删除、更新* @Date: 2020-08-22 17:14*/public void addWatcherWithChildCache(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}//cacheData if true, node contents are cached in addition to the statPathChildrenCache pathChildrenCache = new PathChildrenCache(client, path, false);PathChildrenCacheListener listener = (client, event) -> {log.info("event path is --{} ,event type is {}", event.getData().getPath(), event.getType());};pathChildrenCache.getListenable().addListener(listener);// StartMode : NORMAL  BUILD_INITIAL_CACHE  POST_INITIALIZED_EVENT// NORMAL:异步初始化, BUILD_INITIAL_CACHE:同步初始化, POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件pathChildrenCache.start(PathChildrenCache.StartMode.NORMAL);}/*** @param path* @Description: 监听 给定节点的创建、更新(不包括删除) 以及 该节点下的子节点的创建、删除、更新动作。* @Date: 2020-08-22 17:35*/public void addWatcherWithTreeCache(String path) throws Exception {if (null == client) {throw new RuntimeException("there is not connect to zkServer...");}TreeCache treeCache = new TreeCache(client, path);TreeCacheListener listener = (client, event) -> {log.info("节点路径 --{} ,节点事件类型: {} , 节点值为: {}", Objects.nonNull(event.getData()) ? event.getData().getPath() : "无数据", event.getType());};treeCache.getListenable().addListener(listener);treeCache.start();}}

Curator Recipes的使用

Recipes 模块主要有 Elections (选举)、Locks (锁)、Barriers (关卡)、Atomic (原子量)、Caches、Queues 等

Electtions 选举

选举主要依赖于 LeaderSelector 和 LeaderLatch 两个类。前者是所有存活的客户端不间断的轮流做 Leader。后者是一旦选举出 Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。这两者在实现上是可以切换的。

Locks(分布式锁)

curator 提供了InterProcessMutex来实现zk的分布式锁,他用acquire获取锁 release释放锁。

ZooKeeper分布式锁:
优点
ZooKeeper分布式锁(如InterProcessMutex),能有效地解决分布式问题,不可重入问题,使用起来也较为简单
缺点
ZooKeeper实现的分布式锁,性能并不太高。
因为每次在创建锁和释放锁的过程中,都要动态创建、销毁暂时节点来实现锁功能,
Zk中创建和删除节点只能通过Leader(主)服务器来执行,然后Leader服务器还需要将数据同步到所有的Follower(从)服务器上,这样频繁的网络通信,系统性能会下降。

总之,在高性能、高并发的应用场景下,不建议使用ZooKeeper的分布式锁,如果在并发量不是太高的应用场景中,还是推荐使用ZooKeeper的分布式锁。
目前分布式锁,比较成熟、主流的方案有两种:

基于Redis的分布式锁。适用于并发量很大、性能要求很高而可靠性问题可以通过其他方案去弥补的场景。
基于ZooKeeper的分布式锁。适用于高可靠,而并发量不是太高的场景
在选型时,选择适合于自己业务场景的方案即可。

简单的demo如下

@AutowiredCuratorFramework client;/*** 首先需要调用create 创建路径* 尝试获取锁 包含解锁操作* @param lockCallback* @param lockKey* @param timeout* @param <T>* @return*/public <T> TwoTuple<Boolean, T> tryLock(LockCallback<T> lockCallback, String lockKey, Long timeout) {InterProcessMutex lock = new InterProcessMutex(client, lockKey);try {if (lock.acquire(timeout, TimeUnit.MILLISECONDS)) {log.info(Thread.currentThread().getName() + " get lock");return new TwoTuple<>(true, lockCallback.exec());}} catch (Exception e) {e.printStackTrace();} finally {try {log.info(Thread.currentThread().getName() + " release lock");lock.release();} catch (Exception e) {e.printStackTrace();}}return new TwoTuple<>(false, null);}

服务注册发现

maven 添加依赖

<!-- curator 做服务发现--><dependency><groupId>org.apache.curator</groupId><artifactId>curator-x-discovery</artifactId><version>4.2.0</version></dependency>
/*** 服务注册:将当前启动的服务信息(采用map的形式)进行注册到zk中。* 这里采用的是map 可以自定义类*/public void registerService(String serviceName, String... urls) throws Exception {ServiceInstanceBuilder<Map> serviceInstanceBuilder = ServiceInstance.builder();serviceInstanceBuilder.address(InetAddress.getLocalHost().getHostAddress());serviceInstanceBuilder.port(Integer.parseInt(environment.getProperty("server.port")));serviceInstanceBuilder.name(serviceName);Map config = new HashMap();config.put("url", urls);serviceInstanceBuilder.payload(config);ServiceInstance<Map> serviceInstance = serviceInstanceBuilder.build();ServiceDiscovery<Map> serviceDiscovery = ServiceDiscoveryBuilder.builder(Map.class).client(client).serializer(new JsonInstanceSerializer<Map>(Map.class)).basePath(SERVICE_ROOT_PATH).build();serviceDiscovery.registerService(serviceInstance);serviceDiscovery.start();}/*** 服务发现:通过serviceDiscovery查询到服务*/public void discovery(String serviceName) {try {ServiceDiscovery<Map> serviceDiscovery = ServiceDiscoveryBuilder.builder(Map.class).client(client).basePath(SERVICE_ROOT_PATH).build();serviceDiscovery.start();//根据名称获取服务Collection<ServiceInstance<Map>> services = serviceDiscovery.queryForInstances(serviceName);for (ServiceInstance<Map> service : services) {System.out.print(service.getPayload() + " -- ");System.out.println(service.getAddress() + ":" + service.getPort());}System.out.println();} catch (Exception e) {e.printStackTrace();}}

参考资料:
https://blog.csdn.net/smartbetter/article/details/53083816
https://www.icode9.com/content-1-116250.html

Springboot使用Curator 集成zk相关推荐

  1. SpringBoot 2.x 集成 Redis

    SpringBoot 2.x 集成 Redis windows上搭建redis环境 添加依赖 此处redis客户端使用jedis. <!-- redis --> <dependenc ...

  2. springboot security 权限校验_十二、SpringBoot 优雅的集成Spring Security

    前言 至于什么是Spring security ,主要两个作用,用户认证和授权.即我们常说的,用户只有登录了才能进行其他操作,没有登录的话就重定向到登录界面.有的用户有权限执行某一操作,而有的用户不能 ...

  3. SpringBoot与Docker集成

    SpringBoot与Docker集成 许多人正在使用容器包装其Spring Boot应用程序,而构建容器并不是一件容易的事.这是Spring Boot应用程序开发人员的指南,容器对于开发人员而言并非 ...

  4. springboot使用curator来实现leader选举

    本文来说下springboot使用curator来实现leader选举 文章目录 概述 概述

  5. springboot使用curator实现服务的注册和发现

    本文来说下springboot使用curator实现服务的注册和发现 文章目录 概述 概述

  6. SpringBoot与SpringCloud集成

    SpringBoot与SpringCloud集成 : 简介 Spring Cloud是一系列框架的有序集合.它利用Spring Boot的开发便利性巧妙地简化了分布式系统基础设施的开发,如服务发现注册 ...

  7. SpringBoot和Elasticsearch集成

    SpringBoot和Elasticsearch的集成: 步骤 依赖 在Maven的pom文件中 123456789 <!--SpringBoot默认使用SpringData ElasticSe ...

  8. springboot整合curator实现分布式锁

    理论篇: Curator是Netflix开源的一套ZooKeeper客户端框架. Netflix在使用ZooKeeper的过程中发现ZooKeeper自带的客户端太底层, 应用方在使用的时候需要自己处 ...

  9. SpringBoot使用JWT集成Ng-Alain之Token失效处理

    在 SpringBoot使用JWT集成Ng-Alain中,我们简单介绍了SpringBoot与Ng-Alain的集成,在这种前后端分离框架实践中,我们使用了JWT来作为交互的安全标识,考虑一个问题,从 ...

  10. 一个springboot 项目a集成另一个springboot 项目b

    一个springboot 项目a集成另一个springboot 项目b 并且可以运行访问b的controller层 操作1: 项目b打包依赖修改,把上面的springboot默认打包依赖注释,改为下面 ...

最新文章

  1. unity加载ab后,场景shader不起效问题(物件表现黑色)
  2. python免费试听-小栈春季编程免费试听课 倒数6天!
  3. 最小生成树板子-AcWing 859. Kruskal算法求最小生成树
  4. boost::geometry::strategy::distance::detail::projected_point_ax用法的测试程序
  5. win7误删计算机,Win7系统下文件数据被误删了怎么办
  6. c语言界面飞机图形代码,求个用最简单的的代码来实现图形界面…
  7. SQL Server里的自旋锁介绍
  8. Base64转BufferedImage
  9. jquery ajax请求方式与提示用户正在处理请稍等,等待数据返回时loading的显示
  10. C#保存CookieContainer到文件
  11. oracle r12成本操作,ORACLE-EBS-R12成本模块讲义.ppt
  12. 数字孪生赛博朋克风格智慧城市
  13. Android控件——TextView与EditText
  14. PTA 7-47 打印选课学生名单 (25 point(s))
  15. CABasicAnimation,CAKeyframeAnimation,CATransition,CAAnimationGroup,UIBezierPath之间做动画的不同点和各自的使用范围。
  16. 分期利息计算——考虑免息日期和多利率设定的思路设计和代码实现
  17. html怎么读取lrc文件,lrc文件怎么打开?lrc是什么文件?
  18. usaco-5.1-fc-passed
  19. 基于FPGA的UART全双工数据控制器
  20. Spring--配置

热门文章

  1. yubikey复制_使用YubiKey进行Android应用签名
  2. PMI(Pointwise Mutual Information)
  3. 主板后置音频接口图解_图解主板前置音频线接法(一)
  4. word插入的页眉页脚看不到问题排查
  5. 计算机专业就业发展现状,计算机专业就业形势分析
  6. 李宏毅-人类语言处理-成分句法分析
  7. css基础知识 -- 颜色渐变
  8. (2)颜色渐变及原理
  9. 徐艳(帮别人名字作诗)
  10. “AI四小龙”神话破灭?依图终止IPO,云从大裁员,旷视巨亏不止