碎碎念:启动成功了一半。可以启动,可以debug,但是有些方法无法访问,而且create在哪里,我还不清楚。那个DataMonitor,不能完全按照官网写,要像我一样改一下,不然会报werror,因为有些过时了

ZooKeeper的Java实例

  • 一个简单的watch客户端
    • 要求
    • 程序设计
    • Executor
    • DataMonitor
  • 完整代码
    • Executor
    • DataMonitor
  • 启动
  • 参考链接

一个简单的watch客户端

作用:监视ZooKeeper节点的更改,对程序的启动或停止做出响应。

要求

  • 1 它需要四个参数:
      zk服务器的地址
      被监视节点的名字
      输出要写入的文件名
      带参数的可执行文件

是这样理解吗?

  • 2 获取与znode关联的数据并启动可执行文件
  • 3 若被监视的znode发生更改,客户机将重新获取内容并重启启动可执行程序
  • 4 若被监视的znode消失,客户端将杀死可执行程序

程序设计

ZooKeeper应用程序分为两部分:维护与服务器连接和监视节点数据。Executor类负责维护连接部分,DataMonitor负责监视zookeeper树中的数据。Executor包含主线程和执行逻辑。它负责少量的用户交互,以及与可执行程序(根据被监视的znode节点的状态停止或重启)的交互。

Executor

Executor对象是这个简单示例中的基本容器。它包含了ZooKeeper对象和DataMonitor。

// from the Executor class...public static void main(String[] args) {if (args.length < 4) {System.err.println("USAGE: Executor hostPort znode filename program [args ...]");System.exit(2);}String hostPort = args[0];String znode = args[1];String filename = args[2];String exec[] = new String[args.length - 3];System.arraycopy(args, 3, exec, 0, exec.length);try {//Executor的任务是根据命令行中输入去启动和停止的可执行程序,以响应zk对象触发的时间。(前面的args)new Executor(hostPort, znode, filename, exec).run();} catch (Exception e) {e.printStackTrace();}
}public Executor(String hostPort, String znode, String filename,String exec[]) throws KeeperException, IOException {this.filename = filename;this.exec = exec;zk = new ZooKeeper(hostPort, 3000, this);dm = new DataMonitor(zk, znode, null, this);
}public void run() {try {synchronized (this) {while (!dm.dead) {wait();}}} catch (InterruptedException e) {}
}

Executor实现了这些

public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {...
//DataMonitor.DataMonitorListener这个是啥?

ZooKeeper的java api定义了watcher 接口,zk用watcher与容器(如Executor)进行通信。watcher仅包含process()方法。zk利用它去传递主线程感兴趣的时间,例如zk连接或会话的状态。本例中的Executor仅将事件下发给DataMonitor ,然后由DataMonitor 决定如何处理。
>  就说Executor接收了,但不想做就交给DataMonitor 处理了?是不是也可以交由其他的呢?```java
public void process(WatchedEvent event) {dm.process(event);
}

下面的DataMonitorListener接口是本案例设计的,不属于zkAPI。DataMonitor 对象使用它来与其容器(如Executor)进行通信(DataMonitor.DataMonitorListener)。

public interface DataMonitorListener {void exists(byte data[]);/**** @param rc* the ZooKeeper reason code*/void closing(int rc);

DataMonitor.DataMonitorListener这个是啥?
就是DataMonitor里面定义了DataMonitorListener接口,并由Executor实现了。

package example;import org.apache.zookeeper.WatchedEvent;public class DataMonitor {DataMonitor dm;public void process(WatchedEvent event) {dm.process(event);}public interface DataMonitorListener {/*** 节点存活与否判断*/void exists(byte data[]);/*** ZooKeeper会话失效* @param src* ZooKeeper的原因码(reason code)*/void closing(int src);}
}下面是Executor 对 DataMonitorListener.exists ()和 DataMonitorListener.closing 的实现:```java
public void exists( byte[] data ) {if (data == null) {if (child != null) {System.out.println("Killing process");child.destroy();try {child.waitFor();} catch (InterruptedException e) {}}child = null;} else {if (child != null) {System.out.println("Stopping child");child.destroy();try {child.waitFor();} catch (InterruptedException e) {e.printStackTrace();}}try {FileOutputStream fos = new FileOutputStream(filename);fos.write(data);fos.close();} catch (IOException e) {e.printStackTrace();}try {System.out.println("Starting child");child = Runtime.getRuntime().exec(exec);new StreamWriter(child.getInputStream(), System.out);new StreamWriter(child.getErrorStream(), System.err);} catch (IOException e) {e.printStackTrace();}}
}public void closing(int rc) {synchronized (this) {notifyAll();}
}

DataMonitor

DataMonitor是本程序ZooKeeper逻辑的核心,它是异步和事件驱动的。

public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,DataMonitorListener listener) {this.zk = zk;this.znode = znode;this.chainedWatcher = chainedWatcher;this.listener = listener;//检查znode是否存在,并设置监视。//传递自身作为回调对象,watcher触发时就会引起真实的处理流程//exists在服务器端完成,但其回调在客户端完成zk.exists(znode, true, this, null);

Note

  • 1 不要将完成回调和watch的回调搞混。exists()的完成回调——(客户机端)processResult ()是在服务器上的watch(ZooKeeper.exists()的)操作之后执行。
  • 2 Executor注册为了zk对象的一个watcher,所以watch触发时会向Executor对象发送一个事件
  • 3 zk3.0后DataMonitor也可以注册为特定事件的watcher,本例不支持。
public void processResult(int rc, String path, Object ctx, Stat stat) {boolean exists;//1 检查节点是否存在switch (rc) {//节点存在case Code.OK:exists = true;break;//节点不存在case Code.NoNode:exists = false;break;//会话被服务器终止(致命错误)case Code.SESSIONEXPIRED://未认证(致命错误)case Code.NoAuth:dead = true;listener.closing(rc);return;default://可恢复的服务zk.exists(znode,true,this,null);return;}byte b[] = null;// 2 存在则从znode获取数据if (exists) {try{//这两句不是太理解欸//如果节点在调用zookeeper.getData之前被删除,//zookeeper.exists()设置的watch将会触发一个回调//如果由通信错误,连接的watch事件会在连接恢复时触发b = zk.getData(znode, false,null);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {return;}}//如果状态发生变化,调用Executor 的 exists() 回调函数//???这里不是太理解if ((b == null && b!= prevData)|| b != null && !Arrays.equals(prevData, b)) {listener.exists(b);prevData = b;}}

完整代码

Executor

package example;import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;public class Executor implements Watcher, Runnable, DataMonitor.DataMonitorListener {String filename;String exec[];ZooKeeper zk;DataMonitor dm;Process child;public static void main(String[] args) {if (args.length < 4) {//(标准错误输出流)实时输出错误,显示为红色。out要累计到一定程度才输出//https://blog.csdn.net/weixin_42153410/article/details/94618943System.err.println("USAGE: Executor hostPort znode filename program [args ...]");System.exit(2);}String hostPort = args[0];String znode = args[1];String filename = args[2];String exec[] = new String[args.length - 3];/***  Object src : 原数组*    int srcPos : 从元数据的起始位置开始*   Object dest : 目标数组*   int destPos : 目标数组的开始起始位置*   int length  : 要copy的数组的长度*/System.arraycopy(args, 3, exec, 0, exec.length);try {new Executor(hostPort, znode, filename, exec).run();} catch (Exception e) {e.printStackTrace();}}public Executor(String hostPort, String znode, String filename,String exec[]) throws KeeperException, IOException {this.filename = filename;this.exec = exec;//public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)这个?//this是Executor对自身的引用zk = new ZooKeeper(hostPort, 3000, this);dm = new DataMonitor(zk, znode, null, this);}public void run() {try {//同一时间只能有一个线程访问synchronized (this) {while (!dm.dead) {wait();}}}catch (InterruptedException e) {}}public void exists(byte[] data) {//没有传数据if (data == null) {//如果进程不为空if (child != null) {System.out.println("Killing process");//杀死子进程child.destroy();try {//让线程等待到终止为止child.waitFor();} catch (InterruptedException e) {//线程在等待时中断e.printStackTrace();}}//进程置空child = null;} else {//data有数据(znode存在,或发生变化?)if (child != null) {//但是进程不为空System.out.println("Stopping child");//先终止进程child.destroy();try {child.waitFor();} catch (InterruptedException e) {e.printStackTrace();}}try {//将znode数据存入文件FileOutputStream fos = new FileOutputStream(filename);fos.write(data);fos.close();} catch (IOException e) {e.printStackTrace();}try {//启动进程System.out.println("Starting child");//getRuntime 返回与当前Java应用程序关联的运行时对象(Runtime)//exec 在单独的进程中执行指定的字符串命令// 即,线程执行用户的命令child = Runtime.getRuntime().exec(exec);//两个线程输出执行结果及日志new StreamWriter(child.getInputStream(), System.out);new StreamWriter(child.getErrorStream(), System.err);} catch (IOException e) {e.printStackTrace();}}}public void closing(int rc) {synchronized (this) {//唤醒对象的等待池中的所有线程,进入锁池// 和会话失效有啥关系?notifyAll();}}static class StreamWriter extends Thread {OutputStream os;InputStream is;StreamWriter(InputStream is,OutputStream os) {this.is = is;this.os = os;start();}public void run() {byte b[] = new byte[80];int rc;try {while ((rc = is.read(b)) > 0) {os.write(b,0,rc);}} catch (IOException e) {}}}public void process(WatchedEvent event) {dm.process(event);}
}

DataMonitor

package example;import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.KeeperException.Code;import java.util.Arrays;public class DataMonitor implements Watcher, AsyncCallback.StatCallback {ZooKeeper zk;String znode;Watcher chainedWatcher;DataMonitorListener listener;boolean dead;byte prevData[];public DataMonitor(ZooKeeper zk, String znode, Watcher chainedWatcher,DataMonitorListener listener) {this.zk = zk;this.znode = znode;this.listener = listener;//检查znode是否存在,并设置监视。//传递自身作为回调对象,watcher触发时就会引起真实的处理流程//exists在服务器端完成,但其回调在客户端完成zk.exists(znode,true,this,null);}public interface DataMonitorListener {/*** 节点存活与否判断*/void exists(byte data[]);/*** ZooKeeper会话失效* @param src* ZooKeeper的原因码(reason code)*/void closing(int src);}/*** 检查节点存在与否。存在且状态变化的调用 Executor 的 exists() 回调函数* @param rc* @param path* @param ctx* @param stat*/public void processResult(int rc, String path, Object ctx, Stat stat) {boolean exists = true;//1 检查节点是否存在Code code = Code.get(rc);switch (code) {//节点存在case OK:exists = true;break;//节点不存在case NONODE:exists = false;break;//会话被服务器终止(致命错误)case SESSIONEXPIRED://未认证(致命错误)case NOAUTH:dead = true;listener.closing(rc);return;default://可恢复的服务zk.exists(znode,true,this,null);return;}byte b[] = null;// 2 存在则从znode获取数据if (exists) {try{//这两句不是太理解欸//如果节点在调用zookeeper.getData之前被删除,//zookeeper.exists()设置的watch将会触发一个回调//如果由通信错误,连接的watch事件会在连接恢复时触发b = zk.getData(znode, false,null);} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {return;}}//如果状态发生变化,调用Executor 的 exists() 回调函数//???这里不是太理解if ((b == null && b!= prevData)|| b != null && !Arrays.equals(prevData, b)) {listener.exists(b);prevData = b;}}/*** 处理watch事件*/public void process(WatchedEvent event) {String path = event.getPath();if (event.getType() == Watcher.Event.EventType.None) {//我们被告知连接状态已经变化switch(event.getState()) {case SyncConnected://在这个例子中,不需要做任何事情-watch自动和客户端重连和注册;//客户端断连时watch依次触发break;case Expired:dead = true;listener.closing(Code.SESSIONEXPIRED.intValue());break;}} else {if (path != null && path.equals(znode)) {zk.exists(znode,true,this,null);}}if (chainedWatcher != null) {chainedWatcher.process(event);}}
}

启动

ZookeeperServerMain先启动,可参考

-Dlog4j.configuration=file:conf/log4j.properties
conf/zoo.cfg

-Dlog4j.configuration=file:conf/log4j.properties
//以空格作为分割,第一个参数是地址,第二个是监视node,第三个是输出的文件地址 第四个是命令127.0.0.1:2181 /zk_test E:/soft/kj/ZooKeeper/zookeeper-3.4.13/1.txt create

参考链接

Zookeeper 初体验之——JAVA实例
ZooKeeper官方Java例子解读

挣扎了一两周,都没有完全启动成功,暂时放弃,先学学其他的。希望路过的大佬指导一下。

最后求一键三连。

ZooKeeper官方文档学习笔记04-ZooKeeper的Java实例相关推荐

  1. ZooKeeper官方文档学习笔记03-程序员指南03

    我的每一篇这种正经文章,都是我努力克制玩心的成果,我可太难了,和自己做斗争. ZooKeeper官方文档学习笔记04-程序员指南03 绑定 Java绑定 客户端配置参数 C绑定 陷阱: 常见问题及故障 ...

  2. ZooKeeper官方文档学习笔记01-zookeeper概述

    纠结了很久,我决定用官方文档学习 ZooKeeper概述 学习文档 学习计划 ZooKeeper:分布式应用程序的分布式协调服务 设计目标 数据模型和分层名称空间 节点和短命节点 有条件的更新和监视 ...

  3. ZooKeeper官方文档学习笔记02-ZooKeeper入门指南

    本来以为学一篇都会很难很难,但是好像也没有那么难.虽然有些名词不太理解,但我决定后续学习中应该会遇到吧? 入门:使用ZooKeeper协调分布式应用程序 先决条件 下载 独立运行 1 选择一个合适的目 ...

  4. ZooKeeper官方文档学习笔记05-ZooKeeper的屏障(Barrier)和队列(Queue)教程

    开篇碎碎念:不要试图用断点,或者你断点位置要放好,不然你就会收获许多连接异常.这绝对是我目前翻译过的最流畅的.咳,不是官网流畅,是我笔记流畅,也许是我成长了.(屏障就是人齐开饭,都吃完散场.然后队列是 ...

  5. ZooKeeper官方文档学习笔记03-程序员指南02

    这个太多了 我总是坚持不下来,还是分开写吧,这样更有成就感 程序员指南02 使用ACL的ZooKeeper访问控制 permission schema Zookeeper的C语言client API ...

  6. ZooKeeper官方文档学习笔记03-程序员指南

    害,终究是我高估了自己,这个也太多了.不过里面有些思想,让我感觉似曾相识,比如他的session就有点像HTTPS,然后session的管理有点像人的管理 使用 ZooKeeper 开发分布式应用程序 ...

  7. Open3D官方文档学习笔记

    Open3D官方文档学习笔记 第一部分--点云 1 可视化点云 2 体素降采样 3 顶点法线评估 4 访问顶点法线 补充:Numpy在Open3D中的应用 5 裁剪点云 补充1:获取点云坐标 补充2: ...

  8. kafka官方文档学习笔记2--QuickStart

    下载kafka https://www.apache.org/dyn/closer.cgi?path=/kafka/1.0.0/kafka_2.11-1.0.0.tgz 解压安装包 > tar ...

  9. vue.js 2.0 官方文档学习笔记 —— 01. vue 介绍

    这是我的vue.js 2.0的学习笔记,采取了将官方文档中的代码集中到一个文件的形式.目的是保存下来,方便自己查阅. !官方文档:https://cn.vuejs.org/v2/guide/ 01. ...

最新文章

  1. Day1-python基础
  2. java怎么构造map_Java中Map结构
  3. GC和JVM调优实战
  4. (转)20个令人惊叹的深度学习应用
  5. 如何查看华为服务器配置信息,华为服务器查看配置
  6. 【Qt学习笔记】3.布局
  7. X协议 mysql_MySQL X协议分析
  8. css设置自适应屏幕高度
  9. 简单的Python文件服务器和HTTP POST上传文件C代码
  10. spring3 运行起来(即做个demo)所需要的jar包
  11. 【WiFi】WiFi 6E
  12. Privates下载
  13. C语言 输出斐波那契数列
  14. Web应用服务器——Tomcat的介绍、下载安装、环境配置与使用
  15. 2013年大学英语专升本作文——Should One Expect a Reward When Doing a Good Deed?【标准答案、精品范文答案】
  16. PowerBI-逻辑函数-SWITCH
  17. Android双屏异显以及原理分析
  18. 初级计算机基础知识教程,计算机基础知识(初中级教程)
  19. [Alg]排序算法之归并排序
  20. html广告sdk,腾讯社交联盟广告

热门文章

  1. 刚进职场的新人,这6点一定要牢记
  2. Qt4_用DOM读取XML
  3. sublime text3占用CPU过高
  4. php mysql扩展 5.4_编译php5.4及安装后添加扩展
  5. aws rds监控慢sql_估算AWS RDS SQL Server成本
  6. azure云数据库_在Azure SQL数据库中配置电子邮件通知
  7. aws rds监控慢sql_AWS RDS SQL Server中的初始Windows身份验证配置
  8. java学习过程记录
  9. 设置不定宽高的元素垂直水平居中
  10. golang的指针和切片