Zookeeper节点监听结合Spring
2019独角兽企业重金招聘Python工程师标准>>>
一、添加maven依赖
<!-- ZooKeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.4.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.2</version>
<scope>provided</scope>
</dependency>
<!-- Zookeeper -->
二、类ZookeeperService.java
package cn.com.easy.zookeeper;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.collections.CollectionUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
/*** zookeeper服务类,用于创建操作zookeeper的对象* * @author nibili 2015年5月7日* */
public class ZooKeeperService {
private Logger logger = LoggerFactory.getLogger(ZooKeeperService.class);
public static final int MAX_RETRIES = 3000;
public static final int BASE_SLEEP_TIMEMS = 3000;
/** zookeeper服务器列表 */
private String zookeeperServers = "";
/** zookeeper客户端操纵对象 */
private CuratorFramework client;
/** 监听器集合(一键多值数据结构) */
private Multimap<IZookeeperWatch, Object> watchesMap = ArrayListMultimap.create();
public ZooKeeperService(String zookeeperServers) {
this.zookeeperServers = zookeeperServers;
RetryPolicy retryPolicy = new ExponentialBackoffRetry(BASE_SLEEP_TIMEMS, MAX_RETRIES);
this.client = CuratorFrameworkFactory.builder().connectString(this.zookeeperServers).retryPolicy(retryPolicy).build();
client.start();
}
/*** 取消监听,* * @param zookeeperWatch* 注册监听时的对象* @auth nibili 2015年5月8日*/
public void removeNodeWatch(IZookeeperWatch zookeeperWatch) {
if (zookeeperWatch == null) {
logger.info("称除节点监听,监听器对象不能为空!");
return;
}
Collection<Object> values = watchesMap.get(zookeeperWatch);
if (CollectionUtils.isNotEmpty(values) == true) {
// 移除监听器
NodeCache cache = null;
NodeCacheListener nodeCacheListener = null;
Iterator<Object> it = values.iterator();
for (int i = 0; it.hasNext() && i < 2; i++) {
if (i == 0) {
cache = (NodeCache) it.next();
} else if (i == 1) {
nodeCacheListener = (NodeCacheListener) it.next();
} else {
break;
}
}
if (cache != null && nodeCacheListener != null) {
cache.getListenable().removeListener(nodeCacheListener);
}
} else {
logger.info("没有找到对应的监听器!");
return;
}
}
/*** 监听节点变化* * @param zookeeperWatch* @throws Exception* @auth nibili 2015年5月8日*/
public void addNodeWatch(final IZookeeperWatch zookeeperWatch) throws Exception {
// 是否是每一次触发
final AtomicBoolean isFirst = new AtomicBoolean(true);
final NodeCache cache = new NodeCache(this.client, zookeeperWatch.getWatchPath());
cache.start();
NodeCacheListener nodeCacheListener = new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
// 节点数据
String data = new String(cache.getCurrentData().getData(), "UTF-8");
if (isFirst.get() == true) {
isFirst.set(false);
logger.debug("NodeCache loaded, data is: " + data);
zookeeperWatch.handLoad(data);
} else {
logger.debug("NodeCache changed, data is: " + data);
zookeeperWatch.handChange(data);
}
}
};
cache.getListenable().addListener(nodeCacheListener);
watchesMap.put(zookeeperWatch, cache);
watchesMap.put(zookeeperWatch, nodeCacheListener);
}
/*** 断开连接* * @auth nibili 2015年5月7日*/
public void close() {
client.close();
}
/*** 获取zookeeper操纵对象* * @param servers* @return* @auth nibili 2015年5月7日*/
public CuratorFramework getClient() {
return client;
}
/*** 获取服务器地址* * @return* @throws Exception* @auth nibili 2015年5月7日*/
public String getServers() {
return this.zookeeperServers;
}
/*** 设置节点值* * @param path* @param data* @auth nibili 2015年5月8日*/
public void setPathValue(String path, String data) {
try {
logger.debug("设置结点值,path:" + path + ",data:" + data);
this.client.setData().forPath(path, data.getBytes("UTF-8"));
} catch (Exception e) {
logger.error("设置zookeeper节点值异常,path:" + path + ",data" + data, e);
}
}
/*** 获取节点值* * @param path* @return* @throws Exception* @auth nibili 2015年5月7日*/
public byte[] getPathValue(String path) throws Exception {
if (!exists(this.client, path)) {
throw new RuntimeException("Path " + path + " does not exists.");
}
return client.getData().forPath(path);
}
/*** 节点是否存在* * @param client* @param path* @return* @throws Exception* @auth nibili 2015年5月7日*/
private boolean exists(CuratorFramework client, String path) throws Exception {
Stat stat = client.checkExists().forPath(path);
return !(stat == null);
}
/*** 获取子节点* * @param path* @return* @throws Exception* @auth nibili 2015年5月7日*/
public List<String> getSubPaths(String path) throws Exception {
return client.getChildren().forPath(path);
}
}
三、在appliction.properties文件中添加zookeeper地址
zk.servers=192.168.1.120:2181
四、定义applictionContext.xml
<!--属性文件 -->
<context:property-placeholder
location="classpath*:applicationContext-zookeeper-watch-demo.properties" />
<!-- 使用zookeeper节点监听,通知,功能时,要添加这个bean -->
<bean id="zookeeperService" class="cn.com.easy.zookeeper.ZooKeeperService">
<constructor-arg>
<value>${zk.servers}</value>
</constructor-arg>
</bean>
五、测试
package cn.com.easy.zookeeper;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:/applicationContext-zookeeper-watch-demo.xml")
public class ZookeeperWatchServiceTest {
@Autowired
private ZooKeeperService zookeeperService;
private String path = "/summall/conf/jdbc.username/aa";
@Test
public void addNodeWatch() throws Exception {
try {
zookeeperService.addNodeWatch(new IZookeeperWatch() {
@Override
public void handChange(String data) {
System.out.println("获取到数据变化-1" + path + ":" + data);
}
@Override
public String getWatchPath() {
return path;
}
@Override
public void handLoad(String data) {
System.out.println("获取到数据Loaded-1" + path + ":" + data);
}
});
IZookeeperWatch zookeeperWatch = new IZookeeperWatch() {
@Override
public void handChange(String data) {
System.out.println("获取到数据变化-2" + path + ":" + data);
}
@Override
public String getWatchPath() {
return path;
}
@Override
public void handLoad(String data) {
System.out.println("获取到数据Loaded-2" + path + ":" + data);
}
};
zookeeperService.addNodeWatch(zookeeperWatch);
Thread.sleep(10000);
// 移除监听
zookeeperService.removeNodeWatch(zookeeperWatch);
Thread.sleep(Integer.MAX_VALUE);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Test
public void setPathValue() {
zookeeperService.setPathValue(path, "mypath.......");
}
}
六、注意
有同事说,只有在节点数值有变化时,才会通知到客户端。
但是测试过程中的现象是:设置一个节点的值,尽管每次都设成一样的,一样能通知到客户端。
转载于:https://my.oschina.net/u/1045177/blog/412300
Zookeeper节点监听结合Spring相关推荐
- Apache ZooKeeper - 事件监听机制详解
文章目录 事件监听机制命令 Zookeeper事件类型 实操 -w get -w /path 监听节点数据的变化 ls -w /path 监听子节点的变化(增,删) [监听目录] ls -w /pat ...
- zookeeper 进行监听节点机制
1) server端 在一个server启动时,如tomcat启动时,可以把在tomcat启动程序中,把当前tomcat服务写入到zookeeper 的 znode中(临时节点): 2) client ...
- 使用kazoo连接zookeeper并监听节点数量以及值变化
目前kazoo是连接zk的最新第三方库,最新更新时间为2019年1月,其他第三方连接zk的库都长时间未更新,所以推荐使用kazoo.前面有几篇文章都已经详细给出了zk的部署,接下来是zk最核心的地 ...
- Zookeeper Watch监听
概述 ZooKeeper -server host:port cmd args stat path [watch] ls path [watch] ls2 path [ ...
- 【zookeeper】zookeeper 的监听机制
文章目录 1.概述 2. shell案例 1.概述 视频:zookeeper znode 存储系统解密 上一篇文章:[zookeeper]zookeeper znode 存储系统解密 许多大数据组件都 ...
- 设备节点监听--走在 input 分析之前
在进行 Input 系统分析之前,先简单学习一下 Input 系统和 linux 交互的几个方法,因为 epoll 机制是 InputManagerServer 和 linux 设备节点交互的主要工具 ...
- 用zookeeper体验监听服务器是否还活着
丛网络上及视频里学习的代码稍微改编了一下, 配合自己的情况(三台zookeeper 虚拟机 HSlave1 HSlave2 HSlave3) ,模拟server挂掉后客户端得到的通知情况. 步骤 ...
- Zookeeper监听机制
1.1 监听器原理 首先要有一个main()线程 在main()线程中创建Zookeeper客户端,这时就会创建两个线程connect线程负责网络连接通信,listen线程负责监听 通过connect ...
- java 持久监听blockqueue的变化_Curator目录监听
Curator目录监听 write by donaldhan, 2018-06-29 09:40 引言 上一篇文章,我们简单看一下Curator的CDRWA相关的构造器,及Curator框架实现,[C ...
- qt Android 按键事件,QT无窗口状态下对键盘事件的监听
Question:最近在搞linux下的一个客户端项目,需要接收键盘事件,但是又不能有界面,这种情况怎么处理呢? int main(int argc, char *argv[]) { QApplica ...
最新文章
- python多行字符串输入_python中怎么输入多行字符串
- Nor 与Nand Flash 区别
- sql 外连接的写法。
- 示波器测ab相_独有功能,剑走偏锋,示波器鲜为人知的10大特异功能|测试能力篇...
- editor修改样式 vue_手摸手Electron + Vue实战教程(三)
- linux100day(day4)--文本处理三剑客
- mysql3.5.2 下载_mybatis 3.5.2 jar 下载
- CentOS下查看已经安装好的软件及版本
- java 常用集合list与Set、Map区别及适用场景总结
- do...while(); 语句在宏定义中的应用。
- COSMIC的后端学习之路——1.4 + 1.5 设计模式
- python 趋势线表达式_python添加趋势线
- 在SQL 2014 Server上安装Northwind和Pubs示例数据库
- Linux uart底层设备驱动详解
- 江苏华罗庚中学2021高考成绩查询,2021年常州各高中高考成绩排名及放榜最新消息...
- mysql的四大常用语句_SQL四大基本语句
- linux中inotify+unison实现数据双向实时同步
- 繁体字转换 java_java代码实现简体繁体转换
- 利用matlab画地图
- 转载和积累系列 - 为什么 HashMap 加载因子是0.75?而不是0.8,0.6?