项目背景:

实时 Flink任务中,需要实现不停实时任务,清除关联维表的本地缓存。

方案:

方案采用 Zookeeper 的配置中心的功能,即当需要清除正在运行 Flink App 的维表本地缓存时,通过web端配置 Zookeeper 指定节点上的配置值,Flink App 上 Watch 指定节点上的值,一旦发生变化,Flink 即清除本地维表数据缓存。

Zookeeper特性背景

Curator提供了三种Watcher(Cache)来监听结点的变化

  • Path Cache : 监控一个ZNode的子节点. 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态
  • Node Cache : 只是监听某一个特定的节点
  • Tree Cache : 可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合

demo说明:

* PathCacheE : PathCache 的实现
* NodeCacheE : NodeCache 的实现
* TreeCacheE : TreeCache 的实现

项目中各个Flink任务对应各自的ZK节点,所以采用 Node Cache 即可

PathCacheE:监控一个ZNode的子节点

/*** Path Cache用来监控一个ZNode的子节点. <br>* 当一个子节点增加, 更新,删除时, Path Cache会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的变更将通过PathChildrenCacheListener通知。*/
public class PathCacheE {/*** 监控 PATH 子节点的变化*/private static final String PATH = "/example/pathCache";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();new Thread(new ClientListenerTask(server)).start();Thread.sleep(500);new Thread(new ClientProviderTask(server)).start();}/*** 负责监控子节点的变化(增加, 更新,删除)*/private static class ClientListenerTask implements Runnable {private TestingServer server;ClientListenerTask(TestingServer server) {this.server = server;}@Overridepublic void run() {try {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();PathChildrenCache cache = new PathChildrenCache(client, PATH, true);cache.start();PathChildrenCacheListener cacheListener = (client1, event) -> {System.err.println("Event TYpe :" + event.getType());if (null != event.getData()) {System.err.println("data :" + event.getData().getPath() + " = " + new String(event.getData().getData()));}};cache.getListenable().addListener(cacheListener);Thread.sleep(60_000);cache.close();client.close();} catch (Exception e) {e.printStackTrace();}}}/*** 负责增加, 更新,删除子节点*/private static class ClientProviderTask implements Runnable {private TestingServer server;ClientProviderTask(TestingServer server) {this.server = server;}@Overridepublic void run() {try {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();client.create().creatingParentsIfNeeded().forPath(PATH + "/test01", "01".getBytes());Thread.sleep(10);client.create().creatingParentsIfNeeded().forPath(PATH + "/test02", "02".getBytes());Thread.sleep(10);client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());Thread.sleep(10);client.delete().forPath("/example/pathCache/test01");Thread.sleep(10);client.delete().forPath("/example/pathCache/test02");Thread.sleep(10);client.close();} catch (Exception e) {e.printStackTrace();}}}}

NodeCacheE:监听某一个特定的节点

/*** Node Cache与Path Cache类似,Node Cache只是监听某一个特定的节点*/
public class NodeCacheE {/*** 监控 PATH 节点的变化*/private static final String PATH = "/example/nodeCache";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();new Thread(new ClientListenerTask(server)).start();Thread.sleep(500);new Thread(new ClientProviderTask(server)).start();}/*** 负责监控子节点的变化(增加, 更新,删除)*/private static class ClientListenerTask implements Runnable {private TestingServer server;ClientListenerTask(TestingServer server) {this.server = server;}@Overridepublic void run() {try {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();client.create().creatingParentsIfNeeded().forPath(PATH);final NodeCache cache = new NodeCache(client, PATH);NodeCacheListener listener = () -> {ChildData data = cache.getCurrentData();if (null != data) {System.err.println("节点数据:" + new String(cache.getCurrentData().getData()));} else {System.err.println("节点被删除!");}};cache.getListenable().addListener(listener);cache.start();Thread.sleep(60_000);cache.close();client.close();} catch (Exception e) {e.printStackTrace();}}}/*** 负责更新,删除子节点*/private static class ClientProviderTask implements Runnable {private TestingServer server;ClientProviderTask(TestingServer server) {this.server = server;}@Overridepublic void run() {try {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();client.setData().forPath(PATH, "01".getBytes());Thread.sleep(100);client.setData().forPath(PATH, "02".getBytes());Thread.sleep(100);client.delete().deletingChildrenIfNeeded().forPath(PATH);Thread.sleep(100);client.close();} catch (Exception e) {e.printStackTrace();}}}}

TreeCacheE:监控整个树上的所有节点,类似于PathCache和NodeCache的组合

/*** Tree Cache可以监控整个树上的所有节点,类似于PathCache和NodeCache的组合*/
public class TreeCacheE {/*** 监控 PATH (子)节点的变化*/private static final String PATH = "/example/treeCache";public static void main(String[] args) throws Exception {TestingServer server = new TestingServer();new Thread(new ClientListenerTask(server)).start();Thread.sleep(500);new Thread(new ClientProviderTask(server)).start();}/*** 负责监控(子)节点的变化(增加, 更新,删除)*/private static class ClientListenerTask implements Runnable {private TestingServer server;ClientListenerTask(TestingServer server) {this.server = server;}@Overridepublic void run() {try {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();client.create().creatingParentsIfNeeded().forPath(PATH);TreeCache cache = new TreeCache(client, PATH);TreeCacheListener listener = (client1, event) ->System.err.println("event type2 :" + event.getType() +" | path :" + (null != event.getData() ? event.getData().getPath() : null));cache.getListenable().addListener(listener);cache.start();Thread.sleep(60_000);cache.close();client.close();} catch (Exception e) {e.printStackTrace();}}}/*** 负责增加, 更新,删除子节点*/private static class ClientProviderTask implements Runnable {private TestingServer server;ClientProviderTask(TestingServer server) {this.server = server;}@Overridepublic void run() {try {CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));client.start();client.create().creatingParentsIfNeeded().forPath(PATH + "/test01", "01".getBytes());Thread.sleep(10);client.create().creatingParentsIfNeeded().forPath(PATH + "/test02", "02".getBytes());Thread.sleep(10);client.setData().forPath(PATH + "/test01", "01_V2".getBytes());Thread.sleep(10);client.setData().forPath(PATH, "treeCache".getBytes());Thread.sleep(10);client.delete().forPath(PATH + "/test01");Thread.sleep(10);client.delete().forPath(PATH + "/test02");Thread.sleep(10);client.close();} catch (Exception e) {e.printStackTrace();}}}
}

Curator 三种 Watcher 监听实现相关推荐

  1. 安卓Android绘制一个信息填写页面,使用了三种事件监听方法

    先上效果图片: 第一种,匿名内部类 //设置提交按钮监听submitButton.setOnClickListener(new View.OnClickListener() {@Overridepub ...

  2. Vue中的三种Watcher

    Vue中的三种Watcher Vue可以说存在三种watcher,第一种是在定义data函数时定义数据的render watcher:第二种是computed watcher,是computed函数在 ...

  3. jQuery中的四种事件监听方式

    jQuery中提供了四种事件监听方式,分别是bind.live.delegate.on,对应的解除监听的函数分别是unbind.die.undelegate.off.在开始看他们之前,先来声明一个例子 ...

  4. Cesium 三维球转动监听事件(相机监听事件)并且获取当前中心点位置

    三维球转动监听,可以使用相机监听,也可以使用鼠标移动监听. 相机监听有延迟,必须转动到一定程度后,才会启动监听. 鼠标移动监听可以实时监听 /* 三维球转动添加监听事件 */ viewer.camer ...

  5. webpack学习笔记(三):监听文件变化并编译

    在上一篇webpack学习笔记中主要认识了webpack配置文件中相关的基础配置和命令的执行.这次学习如何在文件发生变化时自动打包编译. 首先,我们来看一下配置文件 const path = requ ...

  6. 【转】NGUI研究院之三种方式监听NGUI的事件方法(七)

    NGUI事件的种类很多,比如点击.双击.拖动.滑动等等,他们处理事件的原理几乎万全一样,本文只用按钮来举例. 1.直接监听事件 把下面脚本直接绑定在按钮上,当按钮点击时就可以监听到,这种方法不太好很不 ...

  7. (转)NGUI研究院之三种方式监听NGUI的事件方法

    NGUI事件的种类很多,比如点击.双击.拖动.滑动等等,他们处理事件的原理几乎万全一样,本文只用按钮来举例. 1.直接监听事件 把下面脚本直接绑定在按钮上,当按钮点击时就可以监听到,这种方法不太好很不 ...

  8. [已验证]双卡手机下两种来电监听方法的一些问题。

    为什么80%的码农都做不了架构师?>>>    首先,网上学习到的来电监听有两种方法,但在双卡手机上都不太正常工作. 经过用户的反馈,基本上都能监听到大部分主流双卡手机的主副卡来电了 ...

  9. Elastic Job 入门教程(三)— 作业监听

    接上一篇:Elastic Job 入门教程(二)- Spring Boot框架下是实现Elastic Job 脚本作业(Script Job),本章我们讨论作业Job的监听. 定义监听器 @Compo ...

最新文章

  1. Linux安全漏洞审计工具Lynis
  2. python搭建项目结构_Django搭建项目实战与避坑细节详解
  3. python 社区网络转化_python-将numpy打开网格转换为坐标
  4. TIOBE 6月编程语言排行榜:Python势不可挡
  5. Pandas 文本数据方法 contains()
  6. matlab 求向量的交集_从零开始的matlab学习笔记——(16)函数绘图
  7. 使用Apriori算法和FP-growth算法进行关联分析
  8. C++ STL 堆(heap)的初始化及其正确使用
  9. Select2异步ajax方式加载数据
  10. PMP培训机构怎么选?
  11. java订餐管理系统
  12. 【C语言】案例四十九 学生档案管理系统
  13. Everyone Do this at the Beginning!!--kaggle数据预处理-超详细的解说
  14. Pixhawk---基于NSH的Firmware开发与调试
  15. 华为社招机考考什么_牛客网-华为-2020届校园招聘上机考试-软件类机考-3
  16. 磨金石教育摄影技能干货分享|有哪些风格独特的摄影作品
  17. 打造铁军团队(王牌军)的核心三点
  18. 计算机网络云技术是什么,什么是云计算 云计算的技术原理介绍【详解】
  19. HashSet 原理结构
  20. linux命令之jq

热门文章

  1. win7下虚拟WIFI
  2. 昨日美股:苹果突然放量暴跌,机构彻底失去了方向
  3. dvd转wmv及wmv转flv参数(3M/1min)
  4. Numpy学习——科学计算
  5. Typora markdown公式换行等号对齐_Markdown快速入门教程
  6. 高中英语教师资格证考试成功通过前辈备考经验分享
  7. 理论计算机科学逻辑博导,软件学院研究生论文导师一览表
  8. ROS中使用乐视 奥比中光(Astra Pro)深度相机显示彩色和深度图像
  9. CC2541 自定义按键
  10. 赫迈泽牵手苹果Homekit,预定中国智能家居用户1.75亿?