2019独角兽企业重金招聘Python工程师标准>>>

一、添加maven依赖

<!-- ZooKeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.4.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.2</version>
<scope>provided</scope>
</dependency>
<!-- Zookeeper -->

二、类ZookeeperService.java

package cn.com.easy.zookeeper;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections.CollectionUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
/*** zookeeper服务类,用于创建操作zookeeper的对象* * @author nibili 2015年5月7日* */
public class ZooKeeperService {
private Logger logger = LoggerFactory.getLogger(ZooKeeperService.class);
public static final int MAX_RETRIES = 3000;
public static final int BASE_SLEEP_TIMEMS = 3000;
/** zookeeper服务器列表 */
private String zookeeperServers = "";
/** zookeeper客户端操纵对象 */
private CuratorFramework client;
/** 监听器集合(一键多值数据结构) */
private Multimap<IZookeeperWatch, Object> watchesMap = ArrayListMultimap.create();
public ZooKeeperService(String zookeeperServers) {
this.zookeeperServers = zookeeperServers;
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIMEMS, MAX_RETRIES);
this.client = CuratorFrameworkFactory.builder().connectString(this.zookeeperServers).retryPolicy(retryPolicy).build();
client.start();
}
/*** 取消监听,* * @param zookeeperWatch*            注册监听时的对象* @auth nibili 2015年5月8日*/
public void removeNodeWatch(IZookeeperWatch zookeeperWatch) {
if (zookeeperWatch == null) {
logger.info("称除节点监听,监听器对象不能为空!");
return;
}
Collection<Object> values = watchesMap.get(zookeeperWatch);
if (CollectionUtils.isNotEmpty(values) == true) {
// 移除监听器
NodeCache cache = null;
NodeCacheListener nodeCacheListener = null;
Iterator<Object> it = values.iterator();
for (int i = 0; it.hasNext() && i < 2; i++) {
if (i == 0) {
cache = (NodeCache) it.next();
} else if (i == 1) {
nodeCacheListener = (NodeCacheListener) it.next();
} else {
break;
}
}
if (cache != null && nodeCacheListener != null) {
cache.getListenable().removeListener(nodeCacheListener);
}
} else {
logger.info("没有找到对应的监听器!");
return;
}
}
/*** 监听节点变化* * @param zookeeperWatch* @throws Exception* @auth nibili 2015年5月8日*/
public void addNodeWatch(final IZookeeperWatch zookeeperWatch) throws Exception {
// 是否是每一次触发
final AtomicBoolean isFirst = new AtomicBoolean(true);
final NodeCache cache = new NodeCache(this.client, zookeeperWatch.getWatchPath());
cache.start();
NodeCacheListener nodeCacheListener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
// 节点数据
String data = new String(cache.getCurrentData().getData(), "UTF-8");
if (isFirst.get() == true) {
isFirst.set(false);
logger.debug("NodeCache loaded, data is: " + data);
zookeeperWatch.handLoad(data);
} else {
logger.debug("NodeCache changed, data is: " + data);
zookeeperWatch.handChange(data);
}
}
};
cache.getListenable().addListener(nodeCacheListener);
watchesMap.put(zookeeperWatch, cache);
watchesMap.put(zookeeperWatch, nodeCacheListener);
}
/*** 断开连接* * @auth nibili 2015年5月7日*/
public void close() {
client.close();
}
/*** 获取zookeeper操纵对象* * @param servers* @return* @auth nibili 2015年5月7日*/
public CuratorFramework getClient() {
return client;
}
/*** 获取服务器地址* * @return* @throws Exception* @auth nibili 2015年5月7日*/
public String getServers() {
return this.zookeeperServers;
}
/*** 设置节点值* * @param path* @param data* @auth nibili 2015年5月8日*/
public void setPathValue(String path, String data) {
try {
logger.debug("设置结点值,path:" + path + ",data:" + data);
this.client.setData().forPath(path, data.getBytes("UTF-8"));
} catch (Exception e) {
logger.error("设置zookeeper节点值异常,path:" + path + ",data" + data, e);
}
}
/*** 获取节点值* * @param path* @return* @throws Exception* @auth nibili 2015年5月7日*/
public byte[] getPathValue(String path) throws Exception {
if (!exists(this.client, path)) {
throw new RuntimeException("Path " + path + " does not exists.");
}
return client.getData().forPath(path);
}
/*** 节点是否存在* * @param client* @param path* @return* @throws Exception* @auth nibili 2015年5月7日*/
private boolean exists(CuratorFramework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
return !(stat == null);
}
/*** 获取子节点* * @param path* @return* @throws Exception* @auth nibili 2015年5月7日*/
public List<String> getSubPaths(String path) throws Exception {
return client.getChildren().forPath(path);
}
}

三、在appliction.properties文件中添加zookeeper地址

zk.servers=192.168.1.120:2181

四、定义applictionContext.xml

<!--属性文件 -->
<context:property-placeholder
location="classpath*:applicationContext-zookeeper-watch-demo.properties" />
<!-- 使用zookeeper节点监听,通知,功能时,要添加这个bean -->
<bean id="zookeeperService" class="cn.com.easy.zookeeper.ZooKeeperService">
<constructor-arg>
<value>${zk.servers}</value>
</constructor-arg>
</bean>

五、测试

package cn.com.easy.zookeeper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:/applicationContext-zookeeper-watch-demo.xml")
public class ZookeeperWatchServiceTest {
@Autowired
private ZooKeeperService zookeeperService;
private String path = "/summall/conf/jdbc.username/aa";
@Test
public void addNodeWatch() throws Exception {
try {
zookeeperService.addNodeWatch(new IZookeeperWatch() {
@Override
public void handChange(String data) {
System.out.println("获取到数据变化-1" + path + ":" + data);
}
@Override
public String getWatchPath() {
return path;
}
@Override
public void handLoad(String data) {
System.out.println("获取到数据Loaded-1" + path + ":" + data);
}
});
IZookeeperWatch zookeeperWatch = new IZookeeperWatch() {
@Override
public void handChange(String data) {
System.out.println("获取到数据变化-2" + path + ":" + data);
}
@Override
public String getWatchPath() {
return path;
}
@Override
public void handLoad(String data) {
System.out.println("获取到数据Loaded-2" + path + ":" + data);
}
};
zookeeperService.addNodeWatch(zookeeperWatch);
Thread.sleep(10000);
// 移除监听
zookeeperService.removeNodeWatch(zookeeperWatch);
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test
public void setPathValue() {
zookeeperService.setPathValue(path, "mypath.......");
}
}


六、注意

有同事说,只有在节点数值有变化时,才会通知到客户端。

但是测试过程中的现象是:设置一个节点的值,尽管每次都设成一样的,一样能通知到客户端。

转载于:https://my.oschina.net/u/1045177/blog/412300

Zookeeper节点监听结合Spring相关推荐

  1. Apache ZooKeeper - 事件监听机制详解

    文章目录 事件监听机制命令 Zookeeper事件类型 实操 -w get -w /path 监听节点数据的变化 ls -w /path 监听子节点的变化(增,删) [监听目录] ls -w /pat ...

  2. zookeeper 进行监听节点机制

    1) server端 在一个server启动时,如tomcat启动时,可以把在tomcat启动程序中,把当前tomcat服务写入到zookeeper 的 znode中(临时节点): 2) client ...

  3. 使用kazoo连接zookeeper并监听节点数量以及值变化

      目前kazoo是连接zk的最新第三方库,最新更新时间为2019年1月,其他第三方连接zk的库都长时间未更新,所以推荐使用kazoo.前面有几篇文章都已经详细给出了zk的部署,接下来是zk最核心的地 ...

  4. Zookeeper Watch监听

    概述 ZooKeeper -server host:port cmd args stat path [watch]         ls path [watch]         ls2 path [ ...

  5. 【zookeeper】zookeeper 的监听机制

    文章目录 1.概述 2. shell案例 1.概述 视频:zookeeper znode 存储系统解密 上一篇文章:[zookeeper]zookeeper znode 存储系统解密 许多大数据组件都 ...

  6. 设备节点监听--走在 input 分析之前

    在进行 Input 系统分析之前,先简单学习一下 Input 系统和 linux 交互的几个方法,因为 epoll 机制是 InputManagerServer 和 linux 设备节点交互的主要工具 ...

  7. 用zookeeper体验监听服务器是否还活着

    丛网络上及视频里学习的代码稍微改编了一下, 配合自己的情况(三台zookeeper 虚拟机  HSlave1 HSlave2 HSlave3)  ,模拟server挂掉后客户端得到的通知情况. 步骤 ...

  8. Zookeeper监听机制

    1.1 监听器原理 首先要有一个main()线程 在main()线程中创建Zookeeper客户端,这时就会创建两个线程connect线程负责网络连接通信,listen线程负责监听 通过connect ...

  9. java 持久监听blockqueue的变化_Curator目录监听

    Curator目录监听 write by donaldhan, 2018-06-29 09:40 引言 上一篇文章,我们简单看一下Curator的CDRWA相关的构造器,及Curator框架实现,[C ...

  10. qt Android 按键事件,QT无窗口状态下对键盘事件的监听

    Question:最近在搞linux下的一个客户端项目,需要接收键盘事件,但是又不能有界面,这种情况怎么处理呢? int main(int argc, char *argv[]) { QApplica ...

最新文章

  1. python多行字符串输入_python中怎么输入多行字符串
  2. Nor 与Nand Flash 区别
  3. sql 外连接的写法。
  4. 示波器测ab相_独有功能,剑走偏锋,示波器鲜为人知的10大特异功能|测试能力篇...
  5. editor修改样式 vue_手摸手Electron + Vue实战教程(三)
  6. linux100day(day4)--文本处理三剑客
  7. mysql3.5.2 下载_mybatis 3.5.2 jar 下载
  8. CentOS下查看已经安装好的软件及版本
  9. java 常用集合list与Set、Map区别及适用场景总结
  10. do...while(); 语句在宏定义中的应用。
  11. COSMIC的后端学习之路——1.4 + 1.5 设计模式
  12. python 趋势线表达式_python添加趋势线
  13. 在SQL 2014 Server上安装Northwind和Pubs示例数据库
  14. Linux uart底层设备驱动详解
  15. 江苏华罗庚中学2021高考成绩查询,2021年常州各高中高考成绩排名及放榜最新消息...
  16. mysql的四大常用语句_SQL四大基本语句
  17. linux中inotify+unison实现数据双向实时同步
  18. 繁体字转换 java_java代码实现简体繁体转换
  19. 利用matlab画地图
  20. 转载和积累系列 - 为什么 HashMap 加载因子是0.75?而不是0.8,0.6?

热门文章

  1. 深入理解java虚拟机系列文章:类的加载、连接与初始化
  2. PowerShell 远程连接与其它技巧
  3. .NET配置文件解析过程详解【转载】
  4. 【架构解密】第六章 深入解析分布式存储
  5. Web安全防御从WAF到应用网关
  6. factory工厂模式之工厂方法FactoryMethod
  7. Python 持久存储
  8. s3c2410_gpio_setpin()等系列函数
  9. Spring Boot + Spring Data JPA项目配置多数据源
  10. Docker提高拉取官网镜像的速度