Curator 三种 Watcher 监听实现
项目背景:
实时 Flink任务中,需要实现不停实时任务,清除关联维表的本地缓存。
方案:
方案采用 Zookeeper 的配置中心的功能,即当需要清除正在运行 Flink App 的维表本地缓存时,通过web端配置 Zookeeper 指定节点上的配置值,Flink App 上 Watch 指定节点上的值,一旦发生变化,Flink 即清除本地维表数据缓存。
Zookeeper特性背景
Curator提供了三种Watcher(Cache)来监听结点的变化
- Path Cache : 监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态
- Node Cache : 只是监听某一个特定的节点
- Tree Cache : 可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合
demo说明:
* PathCacheE : PathCache 的实现
* NodeCacheE : NodeCache 的实现
* TreeCacheE : TreeCache 的实现
项目中各个Flink任务对应各自的ZK节点,所以采用 Node Cache 即可
PathCacheE:监控一个ZNode的子节点
/*** Path Cache用来监控一个ZNode的子节点. <br>* 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的变更将通过PathChildrenCacheListener通知。*/
public class PathCacheE {/*** 监控 PATH 子节点的变化*/private static final String PATH = "/example/pathCache";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();new Thread(new ClientListenerTask(server)).start();Thread.sleep(500);new Thread(new ClientProviderTask(server)).start();}/*** 负责监控子节点的变化(增加, 更新,删除)*/private static class ClientListenerTask implements Runnable {private TestingServer server;ClientListenerTask(TestingServer server) {this.server = server;}@Overridepublic void run() {try {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();PathChildrenCache cache = new PathChildrenCache(client, PATH, true);cache.start();PathChildrenCacheListener cacheListener = (client1, event) -> {System.err.println("Event TYpe :" + event.getType());if (null != event.getData()) {System.err.println("data :" + event.getData().getPath() + " = " + new String(event.getData().getData()));}};cache.getListenable().addListener(cacheListener);Thread.sleep(60_000);cache.close();client.close();} catch (Exception e) {e.printStackTrace();}}}/*** 负责增加, 更新,删除子节点*/private static class ClientProviderTask implements Runnable {private TestingServer server;ClientProviderTask(TestingServer server) {this.server = server;}@Overridepublic void run() {try {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();client.create().creatingParentsIfNeeded().forPath(PATH + "/test01", "01".getBytes());Thread.sleep(10);client.create().creatingParentsIfNeeded().forPath(PATH + "/test02", "02".getBytes());Thread.sleep(10);client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());Thread.sleep(10);client.delete().forPath("/example/pathCache/test01");Thread.sleep(10);client.delete().forPath("/example/pathCache/test02");Thread.sleep(10);client.close();} catch (Exception e) {e.printStackTrace();}}}}
NodeCacheE:监听某一个特定的节点
/*** Node Cache与Path Cache类似,Node Cache只是监听某一个特定的节点*/
public class NodeCacheE {/*** 监控 PATH 节点的变化*/private static final String PATH = "/example/nodeCache";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();new Thread(new ClientListenerTask(server)).start();Thread.sleep(500);new Thread(new ClientProviderTask(server)).start();}/*** 负责监控子节点的变化(增加, 更新,删除)*/private static class ClientListenerTask implements Runnable {private TestingServer server;ClientListenerTask(TestingServer server) {this.server = server;}@Overridepublic void run() {try {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();client.create().creatingParentsIfNeeded().forPath(PATH);final NodeCache cache = new NodeCache(client, PATH);NodeCacheListener listener = () -> {ChildData data = cache.getCurrentData();if (null != data) {System.err.println("节点数据:" + new String(cache.getCurrentData().getData()));} else {System.err.println("节点被删除!");}};cache.getListenable().addListener(listener);cache.start();Thread.sleep(60_000);cache.close();client.close();} catch (Exception e) {e.printStackTrace();}}}/*** 负责更新,删除子节点*/private static class ClientProviderTask implements Runnable {private TestingServer server;ClientProviderTask(TestingServer server) {this.server = server;}@Overridepublic void run() {try {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();client.setData().forPath(PATH, "01".getBytes());Thread.sleep(100);client.setData().forPath(PATH, "02".getBytes());Thread.sleep(100);client.delete().deletingChildrenIfNeeded().forPath(PATH);Thread.sleep(100);client.close();} catch (Exception e) {e.printStackTrace();}}}}
TreeCacheE:监控整个树上的所有节点,类似于PathCache和NodeCache的组合
/*** Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合*/
public class TreeCacheE {/*** 监控 PATH (子)节点的变化*/private static final String PATH = "/example/treeCache";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();new Thread(new ClientListenerTask(server)).start();Thread.sleep(500);new Thread(new ClientProviderTask(server)).start();}/*** 负责监控(子)节点的变化(增加, 更新,删除)*/private static class ClientListenerTask implements Runnable {private TestingServer server;ClientListenerTask(TestingServer server) {this.server = server;}@Overridepublic void run() {try {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();client.create().creatingParentsIfNeeded().forPath(PATH);TreeCache cache = new TreeCache(client, PATH);TreeCacheListener listener = (client1, event) ->System.err.println("event type2 :" + event.getType() +" | path :" + (null != event.getData() ? event.getData().getPath() : null));cache.getListenable().addListener(listener);cache.start();Thread.sleep(60_000);cache.close();client.close();} catch (Exception e) {e.printStackTrace();}}}/*** 负责增加, 更新,删除子节点*/private static class ClientProviderTask implements Runnable {private TestingServer server;ClientProviderTask(TestingServer server) {this.server = server;}@Overridepublic void run() {try {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();client.create().creatingParentsIfNeeded().forPath(PATH + "/test01", "01".getBytes());Thread.sleep(10);client.create().creatingParentsIfNeeded().forPath(PATH + "/test02", "02".getBytes());Thread.sleep(10);client.setData().forPath(PATH + "/test01", "01_V2".getBytes());Thread.sleep(10);client.setData().forPath(PATH, "treeCache".getBytes());Thread.sleep(10);client.delete().forPath(PATH + "/test01");Thread.sleep(10);client.delete().forPath(PATH + "/test02");Thread.sleep(10);client.close();} catch (Exception e) {e.printStackTrace();}}}
}
Curator 三种 Watcher 监听实现相关推荐
- 安卓Android绘制一个信息填写页面,使用了三种事件监听方法
先上效果图片: 第一种,匿名内部类 //设置提交按钮监听submitButton.setOnClickListener(new View.OnClickListener() {@Overridepub ...
- Vue中的三种Watcher
Vue中的三种Watcher Vue可以说存在三种watcher,第一种是在定义data函数时定义数据的render watcher:第二种是computed watcher,是computed函数在 ...
- jQuery中的四种事件监听方式
jQuery中提供了四种事件监听方式,分别是bind.live.delegate.on,对应的解除监听的函数分别是unbind.die.undelegate.off.在开始看他们之前,先来声明一个例子 ...
- Cesium 三维球转动监听事件(相机监听事件)并且获取当前中心点位置
三维球转动监听,可以使用相机监听,也可以使用鼠标移动监听. 相机监听有延迟,必须转动到一定程度后,才会启动监听. 鼠标移动监听可以实时监听 /* 三维球转动添加监听事件 */ viewer.camer ...
- webpack学习笔记(三):监听文件变化并编译
在上一篇webpack学习笔记中主要认识了webpack配置文件中相关的基础配置和命令的执行.这次学习如何在文件发生变化时自动打包编译. 首先,我们来看一下配置文件 const path = requ ...
- 【转】NGUI研究院之三种方式监听NGUI的事件方法(七)
NGUI事件的种类很多,比如点击.双击.拖动.滑动等等,他们处理事件的原理几乎万全一样,本文只用按钮来举例. 1.直接监听事件 把下面脚本直接绑定在按钮上,当按钮点击时就可以监听到,这种方法不太好很不 ...
- (转)NGUI研究院之三种方式监听NGUI的事件方法
NGUI事件的种类很多,比如点击.双击.拖动.滑动等等,他们处理事件的原理几乎万全一样,本文只用按钮来举例. 1.直接监听事件 把下面脚本直接绑定在按钮上,当按钮点击时就可以监听到,这种方法不太好很不 ...
- [已验证]双卡手机下两种来电监听方法的一些问题。
为什么80%的码农都做不了架构师?>>> 首先,网上学习到的来电监听有两种方法,但在双卡手机上都不太正常工作. 经过用户的反馈,基本上都能监听到大部分主流双卡手机的主副卡来电了 ...
- Elastic Job 入门教程(三)— 作业监听
接上一篇:Elastic Job 入门教程(二)- Spring Boot框架下是实现Elastic Job 脚本作业(Script Job),本章我们讨论作业Job的监听. 定义监听器 @Compo ...
最新文章
- Linux安全漏洞审计工具Lynis
- python搭建项目结构_Django搭建项目实战与避坑细节详解
- python 社区网络转化_python-将numpy打开网格转换为坐标
- TIOBE 6月编程语言排行榜:Python势不可挡
- Pandas 文本数据方法 contains()
- matlab 求向量的交集_从零开始的matlab学习笔记——(16)函数绘图
- 使用Apriori算法和FP-growth算法进行关联分析
- C++ STL 堆(heap)的初始化及其正确使用
- Select2异步ajax方式加载数据
- PMP培训机构怎么选?
- java订餐管理系统
- 【C语言】案例四十九 学生档案管理系统
- Everyone Do this at the Beginning!!--kaggle数据预处理-超详细的解说
- Pixhawk---基于NSH的Firmware开发与调试
- 华为社招机考考什么_牛客网-华为-2020届校园招聘上机考试-软件类机考-3
- 磨金石教育摄影技能干货分享|有哪些风格独特的摄影作品
- 打造铁军团队(王牌军)的核心三点
- 计算机网络云技术是什么,什么是云计算 云计算的技术原理介绍【详解】
- HashSet 原理结构
- linux命令之jq