ZooKeeper官方文档学习笔记04-ZooKeeper的Java实例
碎碎念:启动成功了一半。可以启动,可以debug,但是有些方法无法访问,而且create在哪里,我还不清楚。那个DataMonitor,不能完全按照官网写,要像我一样改一下,不然会报werror,因为有些过时了
ZooKeeper的Java实例
- 一个简单的watch客户端
- 要求
- 程序设计
- Executor
- DataMonitor
- 完整代码
- Executor
- DataMonitor
- 启动
- 参考链接
一个简单的watch客户端
作用:监视ZooKeeper节点的更改,对程序的启动或停止做出响应。
要求
- 1 它需要四个参数:
zk服务器的地址
被监视节点的名字
输出要写入的文件名
带参数的可执行文件
是这样理解吗?
- 2 获取与znode关联的数据并启动可执行文件
- 3 若被监视的znode发生更改,客户机将重新获取内容并重启启动可执行程序
- 4 若被监视的znode消失,客户端将杀死可执行程序
程序设计
ZooKeeper应用程序分为两部分:维护与服务器连接和监视节点数据。Executor
类负责维护连接部分,DataMonitor
负责监视zookeeper树中的数据。Executor包含主线程和执行逻辑。它负责少量的用户交互,以及与可执行程序(根据被监视的znode节点的状态停止或重启)的交互。
Executor
Executor对象是这个简单示例中的基本容器。它包含了ZooKeeper对象和DataMonitor。
// from the Executor class...public static void main(String[] args) {if (args.length < 4) {System.err.println("USAGE: Executor hostPort znode filename program [args ...]");System.exit(2);}String hostPort = args[0];String znode = args[1];String filename = args[2];String exec[] = new String[args.length - 3];System.arraycopy(args, 3, exec, 0, exec.length);try {//Executor的任务是根据命令行中输入去启动和停止的可执行程序,以响应zk对象触发的时间。(前面的args)new Executor(hostPort, znode, filename, exec).run();} catch (Exception e) {e.printStackTrace();}
}public Executor(String hostPort, String znode, String filename,String exec[]) throws KeeperException, IOException {this.filename = filename;this.exec = exec;zk = new ZooKeeper(hostPort, 3000, this);dm = new DataMonitor(zk, znode, null, this);
}public void run() {try {synchronized (this) {while (!dm.dead) {wait();}}} catch (InterruptedException e) {}
}
Executor实现了这些
public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {...
//DataMonitor.DataMonitorListener这个是啥?
ZooKeeper的java api定义了watcher 接口,zk用watcher与容器(如Executor)进行通信。watcher仅包含process()方法。zk利用它去传递主线程感兴趣的时间,例如zk连接或会话的状态。本例中的Executor仅将事件下发给DataMonitor ,然后由DataMonitor 决定如何处理。
> 就说Executor接收了,但不想做就交给DataMonitor 处理了?是不是也可以交由其他的呢?```java
public void process(WatchedEvent event) {dm.process(event);
}
下面的DataMonitorListener接口是本案例设计的,不属于zkAPI。DataMonitor 对象使用它来与其容器(如Executor)进行通信(DataMonitor.DataMonitorListener)。
public interface DataMonitorListener {void exists(byte data[]);/**** @param rc* the ZooKeeper reason code*/void closing(int rc);
DataMonitor.DataMonitorListener这个是啥?
就是DataMonitor里面定义了DataMonitorListener接口,并由Executor实现了。
package example;import org.apache.zookeeper.WatchedEvent;public class DataMonitor {DataMonitor dm;public void process(WatchedEvent event) {dm.process(event);}public interface DataMonitorListener {/*** 节点存活与否判断*/void exists(byte data[]);/*** ZooKeeper会话失效* @param src* ZooKeeper的原因码(reason code)*/void closing(int src);}
}下面是Executor 对 DataMonitorListener.exists ()和 DataMonitorListener.closing 的实现:```java
public void exists( byte[] data ) {if (data == null) {if (child != null) {System.out.println("Killing process");child.destroy();try {child.waitFor();} catch (InterruptedException e) {}}child = null;} else {if (child != null) {System.out.println("Stopping child");child.destroy();try {child.waitFor();} catch (InterruptedException e) {e.printStackTrace();}}try {FileOutputStream fos = new FileOutputStream(filename);fos.write(data);fos.close();} catch (IOException e) {e.printStackTrace();}try {System.out.println("Starting child");child = Runtime.getRuntime().exec(exec);new StreamWriter(child.getInputStream(), System.out);new StreamWriter(child.getErrorStream(), System.err);} catch (IOException e) {e.printStackTrace();}}
}public void closing(int rc) {synchronized (this) {notifyAll();}
}
DataMonitor
DataMonitor是本程序ZooKeeper逻辑的核心,它是异步和事件驱动的。
public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,DataMonitorListener listener) {this.zk = zk;this.znode = znode;this.chainedWatcher = chainedWatcher;this.listener = listener;//检查znode是否存在,并设置监视。//传递自身作为回调对象,watcher触发时就会引起真实的处理流程//exists在服务器端完成,但其回调在客户端完成zk.exists(znode, true, this, null);
Note
- 1 不要将完成回调和watch的回调搞混。exists()的完成回调——(客户机端)processResult ()是在服务器上的watch(ZooKeeper.exists()的)操作之后执行。
- 2 Executor注册为了zk对象的一个watcher,所以watch触发时会向Executor对象发送一个事件
- 3 zk3.0后DataMonitor也可以注册为特定事件的watcher,本例不支持。
public void processResult(int rc, String path, Object ctx, Stat stat) {boolean exists;//1 检查节点是否存在switch (rc) {//节点存在case Code.OK:exists = true;break;//节点不存在case Code.NoNode:exists = false;break;//会话被服务器终止(致命错误)case Code.SESSIONEXPIRED://未认证(致命错误)case Code.NoAuth:dead = true;listener.closing(rc);return;default://可恢复的服务zk.exists(znode,true,this,null);return;}byte b[] = null;// 2 存在则从znode获取数据if (exists) {try{//这两句不是太理解欸//如果节点在调用zookeeper.getData之前被删除,//zookeeper.exists()设置的watch将会触发一个回调//如果由通信错误,连接的watch事件会在连接恢复时触发b = zk.getData(znode, false,null);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {return;}}//如果状态发生变化,调用Executor 的 exists() 回调函数//???这里不是太理解if ((b == null && b!= prevData)|| b != null && !Arrays.equals(prevData, b)) {listener.exists(b);prevData = b;}}
完整代码
Executor
package example;import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {String filename;String exec[];ZooKeeper zk;DataMonitor dm;Process child;public static void main(String[] args) {if (args.length < 4) {//(标准错误输出流)实时输出错误,显示为红色。out要累计到一定程度才输出//https://blog.csdn.net/weixin_42153410/article/details/94618943System.err.println("USAGE: Executor hostPort znode filename program [args ...]");System.exit(2);}String hostPort = args[0];String znode = args[1];String filename = args[2];String exec[] = new String[args.length - 3];/*** Object src : 原数组* int srcPos : 从元数据的起始位置开始* Object dest : 目标数组* int destPos : 目标数组的开始起始位置* int length : 要copy的数组的长度*/System.arraycopy(args, 3, exec, 0, exec.length);try {new Executor(hostPort, znode, filename, exec).run();} catch (Exception e) {e.printStackTrace();}}public Executor(String hostPort, String znode, String filename,String exec[]) throws KeeperException, IOException {this.filename = filename;this.exec = exec;//public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)这个?//this是Executor对自身的引用zk = new ZooKeeper(hostPort, 3000, this);dm = new DataMonitor(zk, znode, null, this);}public void run() {try {//同一时间只能有一个线程访问synchronized (this) {while (!dm.dead) {wait();}}}catch (InterruptedException e) {}}public void exists(byte[] data) {//没有传数据if (data == null) {//如果进程不为空if (child != null) {System.out.println("Killing process");//杀死子进程child.destroy();try {//让线程等待到终止为止child.waitFor();} catch (InterruptedException e) {//线程在等待时中断e.printStackTrace();}}//进程置空child = null;} else {//data有数据(znode存在,或发生变化?)if (child != null) {//但是进程不为空System.out.println("Stopping child");//先终止进程child.destroy();try {child.waitFor();} catch (InterruptedException e) {e.printStackTrace();}}try {//将znode数据存入文件FileOutputStream fos = new FileOutputStream(filename);fos.write(data);fos.close();} catch (IOException e) {e.printStackTrace();}try {//启动进程System.out.println("Starting child");//getRuntime 返回与当前Java应用程序关联的运行时对象(Runtime)//exec 在单独的进程中执行指定的字符串命令// 即,线程执行用户的命令child = Runtime.getRuntime().exec(exec);//两个线程输出执行结果及日志new StreamWriter(child.getInputStream(), System.out);new StreamWriter(child.getErrorStream(), System.err);} catch (IOException e) {e.printStackTrace();}}}public void closing(int rc) {synchronized (this) {//唤醒对象的等待池中的所有线程,进入锁池// 和会话失效有啥关系?notifyAll();}}static class StreamWriter extends Thread {OutputStream os;InputStream is;StreamWriter(InputStream is,OutputStream os) {this.is = is;this.os = os;start();}public void run() {byte b[] = new byte[80];int rc;try {while ((rc = is.read(b)) > 0) {os.write(b,0,rc);}} catch (IOException e) {}}}public void process(WatchedEvent event) {dm.process(event);}
}
DataMonitor
package example;import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException.Code;import java.util.Arrays;public class DataMonitor implements Watcher, AsyncCallback.StatCallback {ZooKeeper zk;String znode;Watcher chainedWatcher;DataMonitorListener listener;boolean dead;byte prevData[];public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,DataMonitorListener listener) {this.zk = zk;this.znode = znode;this.listener = listener;//检查znode是否存在,并设置监视。//传递自身作为回调对象,watcher触发时就会引起真实的处理流程//exists在服务器端完成,但其回调在客户端完成zk.exists(znode,true,this,null);}public interface DataMonitorListener {/*** 节点存活与否判断*/void exists(byte data[]);/*** ZooKeeper会话失效* @param src* ZooKeeper的原因码(reason code)*/void closing(int src);}/*** 检查节点存在与否。存在且状态变化的调用 Executor 的 exists() 回调函数* @param rc* @param path* @param ctx* @param stat*/public void processResult(int rc, String path, Object ctx, Stat stat) {boolean exists = true;//1 检查节点是否存在Code code = Code.get(rc);switch (code) {//节点存在case OK:exists = true;break;//节点不存在case NONODE:exists = false;break;//会话被服务器终止(致命错误)case SESSIONEXPIRED://未认证(致命错误)case NOAUTH:dead = true;listener.closing(rc);return;default://可恢复的服务zk.exists(znode,true,this,null);return;}byte b[] = null;// 2 存在则从znode获取数据if (exists) {try{//这两句不是太理解欸//如果节点在调用zookeeper.getData之前被删除,//zookeeper.exists()设置的watch将会触发一个回调//如果由通信错误,连接的watch事件会在连接恢复时触发b = zk.getData(znode, false,null);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {return;}}//如果状态发生变化,调用Executor 的 exists() 回调函数//???这里不是太理解if ((b == null && b!= prevData)|| b != null && !Arrays.equals(prevData, b)) {listener.exists(b);prevData = b;}}/*** 处理watch事件*/public void process(WatchedEvent event) {String path = event.getPath();if (event.getType() == Watcher.Event.EventType.None) {//我们被告知连接状态已经变化switch(event.getState()) {case SyncConnected://在这个例子中,不需要做任何事情-watch自动和客户端重连和注册;//客户端断连时watch依次触发break;case Expired:dead = true;listener.closing(Code.SESSIONEXPIRED.intValue());break;}} else {if (path != null && path.equals(znode)) {zk.exists(znode,true,this,null);}}if (chainedWatcher != null) {chainedWatcher.process(event);}}
}
启动
ZookeeperServerMain先启动,可参考
-Dlog4j.configuration=file:conf/log4j.properties
conf/zoo.cfg
-Dlog4j.configuration=file:conf/log4j.properties
//以空格作为分割,第一个参数是地址,第二个是监视node,第三个是输出的文件地址 第四个是命令127.0.0.1:2181 /zk_test E:/soft/kj/ZooKeeper/zookeeper-3.4.13/1.txt create
参考链接
Zookeeper 初体验之——JAVA实例
ZooKeeper官方Java例子解读
挣扎了一两周,都没有完全启动成功,暂时放弃,先学学其他的。希望路过的大佬指导一下。
最后求一键三连。
ZooKeeper官方文档学习笔记04-ZooKeeper的Java实例相关推荐
- ZooKeeper官方文档学习笔记03-程序员指南03
我的每一篇这种正经文章,都是我努力克制玩心的成果,我可太难了,和自己做斗争. ZooKeeper官方文档学习笔记04-程序员指南03 绑定 Java绑定 客户端配置参数 C绑定 陷阱: 常见问题及故障 ...
- ZooKeeper官方文档学习笔记01-zookeeper概述
纠结了很久,我决定用官方文档学习 ZooKeeper概述 学习文档 学习计划 ZooKeeper:分布式应用程序的分布式协调服务 设计目标 数据模型和分层名称空间 节点和短命节点 有条件的更新和监视 ...
- ZooKeeper官方文档学习笔记02-ZooKeeper入门指南
本来以为学一篇都会很难很难,但是好像也没有那么难.虽然有些名词不太理解,但我决定后续学习中应该会遇到吧? 入门:使用ZooKeeper协调分布式应用程序 先决条件 下载 独立运行 1 选择一个合适的目 ...
- ZooKeeper官方文档学习笔记05-ZooKeeper的屏障(Barrier)和队列(Queue)教程
开篇碎碎念:不要试图用断点,或者你断点位置要放好,不然你就会收获许多连接异常.这绝对是我目前翻译过的最流畅的.咳,不是官网流畅,是我笔记流畅,也许是我成长了.(屏障就是人齐开饭,都吃完散场.然后队列是 ...
- ZooKeeper官方文档学习笔记03-程序员指南02
这个太多了 我总是坚持不下来,还是分开写吧,这样更有成就感 程序员指南02 使用ACL的ZooKeeper访问控制 permission schema Zookeeper的C语言client API ...
- ZooKeeper官方文档学习笔记03-程序员指南
害,终究是我高估了自己,这个也太多了.不过里面有些思想,让我感觉似曾相识,比如他的session就有点像HTTPS,然后session的管理有点像人的管理 使用 ZooKeeper 开发分布式应用程序 ...
- Open3D官方文档学习笔记
Open3D官方文档学习笔记 第一部分--点云 1 可视化点云 2 体素降采样 3 顶点法线评估 4 访问顶点法线 补充:Numpy在Open3D中的应用 5 裁剪点云 补充1:获取点云坐标 补充2: ...
- kafka官方文档学习笔记2--QuickStart
下载kafka https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz 解压安装包 > tar ...
- vue.js 2.0 官方文档学习笔记 —— 01. vue 介绍
这是我的vue.js 2.0的学习笔记,采取了将官方文档中的代码集中到一个文件的形式.目的是保存下来,方便自己查阅. !官方文档:https://cn.vuejs.org/v2/guide/ 01. ...
最新文章
- Day1-python基础
- java怎么构造map_Java中Map结构
- GC和JVM调优实战
- (转)20个令人惊叹的深度学习应用
- 如何查看华为服务器配置信息,华为服务器查看配置
- 【Qt学习笔记】3.布局
- X协议 mysql_MySQL X协议分析
- css设置自适应屏幕高度
- 简单的Python文件服务器和HTTP POST上传文件C代码
- spring3 运行起来(即做个demo)所需要的jar包
- 【WiFi】WiFi 6E
- Privates下载
- C语言 输出斐波那契数列
- Web应用服务器——Tomcat的介绍、下载安装、环境配置与使用
- 2013年大学英语专升本作文——Should One Expect a Reward When Doing a Good Deed?【标准答案、精品范文答案】
- PowerBI-逻辑函数-SWITCH
- Android双屏异显以及原理分析
- 初级计算机基础知识教程,计算机基础知识(初中级教程)
- [Alg]排序算法之归并排序
- html广告sdk,腾讯社交联盟广告