使用ZooKeeper编程 - 一个基本教程

  • 介绍
  • 障碍
  • 生产者 - 消费者队列
  • 完整的例子
    • 队列测试
    • 屏障测试
    • 来源清单

介绍

在本教程中,我们使用ZooKeeper显示障碍和生产者 - 消费者队列的简单实现。我们将相应的类称为障碍和队列。这些示例假定您至少运行了一个ZooKeeper服务器。

两个原语都使用以下常见的代码摘录:

static ZooKeeper zk = null;
static Integer mutex;String root;SyncPrimitive(String address) {if(zk == null){try {System.out.println("Starting ZK:");zk = new ZooKeeper(address, 3000, this);mutex = new Integer(-1);System.out.println("Finished starting ZK: " + zk);} catch (IOException e) {System.out.println(e.toString());zk = null;}}
}synchronized public void process(WatchedEvent event) {synchronized (mutex) {mutex.notify();}
}

这两个类都扩展了SyncPrimitive。通过这种方式,我们执行SyncPrimitive构造函数中所有基元共有的步骤。为了简化示例,我们在第一次实例化屏障对象或队列对象时创建一个ZooKeeper对象,并声明一个静态变量,该变量是对该对象的引用。Barrier和Queue的后续实例检查ZooKeeper对象是否存在。或者,我们可以让应用程序创建一个ZooKeeper对象并将其传递给Barrier和Queue的构造函数。

我们使用process()方法处理由于监视而触发的通知。在下面的讨论中,我们提供了设置手表的代码。监视是内部结构,使ZooKeeper能够通知客户端节点的更改。例如,如果客户端正在等待其他客户端留下障碍,那么它可以设置监视并等待对特定节点的修改,这可以指示它是等待的结束。一旦我们回顾这些例子,这一点就变得清晰了。

障碍

屏障是一种原语,它使一组进程能够同步计算的开始和结束。这种实现的一般思想是拥有一个屏障节点,其目的是成为各个进程节点的父节点。假设我们调用屏障节点“/ b1”。然后每个进程“p”创建一个节点“/ b1 / p”。一旦足够的进程创建了相应的节点,连接的进程就可以开始计算。

在此示例中,每个进程都实例化一个Barrier对象,其构造函数作为参数:

  • ZooKeeper服务器的地址(例如“zoo1.foo.com:2181”)
  • ZooKeeper上的障碍节点的路径(例如,“/ b1”)
  • 进程组的大小

Barrier的构造函数将Zookeeper服务器的地址传递给父类的构造函数。如果不存在,则父类创建ZooKeeper实例。然后,Barrier的构造函数在ZooKeeper上创建一个barrier节点,它是所有进程节点的父节点,我们称之为root(注意:这不是ZooKeeper根“/”)。

/*** Barrier constructor** @param address* @param root* @param size*/
Barrier(String address, String root, int size) {super(address);this.root = root;this.size = size;// Create barrier nodeif (zk != null) {try {Stat s = zk.exists(root, false);if (s == null) {zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}} catch (KeeperException e) {System.out.println("Keeper exception when instantiating queue: "+ e.toString());} catch (InterruptedException e) {System.out.println("Interrupted exception");}}// My node nametry {name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());} catch (UnknownHostException e) {System.out.println(e.toString());}
}

要进入屏障,进程会调用enter()。该进程在根目录下创建一个节点来表示它,使用其主机名来形成节点名称。然后等到有足够的进程进入屏障。一个进程通过检查根节点具有“getChildren()”的子节点数并在没有足够的情况下等待通知来完成它。要在根节点发生更改时收到通知,进程必须设置监视,并通过调用“getChildren()”来完成。在代码中,我们知道“getChildren()”有两个参数。第一个表示要读取的节点,第二个是布尔标志,使进程能够设置监视。在代码中,标志为true。

/*** Join barrier** @return* @throws KeeperException* @throws InterruptedException*/boolean enter() throws KeeperException, InterruptedException{zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);while (true) {synchronized (mutex) {List<String> list = zk.getChildren(root, true);if (list.size() < size) {mutex.wait();} else {return true;}}}
}

请注意,enter()会抛出KeeperException和InterruptedException,因此应用程序负责捕获和处理此类异常。

计算完成后,进程调用leave()离开屏障。首先,它删除其对应的节点,然后获取根节点的子节点。如果至少有一个子节点,则等待通知(obs:请注意,对getChildren()的调用的第二个参数为true,这意味着ZooKeeper必须在根节点上设置监视)。收到通知后,它再次检查根节点是否有子节点。

/*** Wait until all reach barrier** @return* @throws KeeperException* @throws InterruptedException*/boolean leave() throws KeeperException, InterruptedException {zk.delete(root + "/" + name, 0);while (true) {synchronized (mutex) {List<String> list = zk.getChildren(root, true);if (list.size() > 0) {mutex.wait();} else {return true;}}}}

生产者 - 消费者队列

生产者 - 消费者队列是一种分布式数据结构,进程组用于生成和使用项目。生产者进程创建新元素并将其添加到队列中。使用者进程从列表中删除元素并处理它们。在此实现中,元素是简单的整数。队列由根节点表示,并且为了向队列添加元素,生成器进程创建新节点,即根节点的子节点。

以下代码摘录对应于对象的构造函数。与Barrier对象一样,它首先调用父类SyncPrimitive的构造函数,如果不存在,则创建ZooKeeper对象。然后,它验证队列的根节点是否存在,如果不存在则创建。

/*** Constructor of producer-consumer queue** @param address* @param name*/
Queue(String address, String name) {super(address);this.root = name;// Create ZK node nameif (zk != null) {try {Stat s = zk.exists(root, false);if (s == null) {zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}} catch (KeeperException e) {System.out.println("Keeper exception when instantiating queue: "+ e.toString());} catch (InterruptedException e) {System.out.println("Interrupted exception");}}
}

生产者进程调用“produce()”将元素添加到队列,并传递整数作为参数。要向队列添加元素,该方法使用“create()”创建一个新节点,并使用SEQUENCE标志指示ZooKeeper附加与根节点关联的sequencer计数器的值。通过这种方式,我们对队列的元素施加了一个总顺序,从而保证队列中最旧的元素是下一个消耗的元素。

/*** Add element to the queue.** @param i* @return*/boolean produce(int i) throws KeeperException, InterruptedException{ByteBuffer b = ByteBuffer.allocate(4);byte[] value;// Add child with value ib.putInt(i);value = b.array();zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);return true;
}

要使用元素,使用者进程将获取根节点的子节点,读取具有最小计数器值的节点,并返回该元素。请注意,如果存在冲突,则两个竞争进程中的一个将无法删除该节点,并且删除操作将引发异常。

对getChildren()的调用以字典顺序返回子项列表。由于词典顺序不必遵循计数器值的数字顺序,我们需要确定哪个元素是最小的。为了确定哪一个具有最小的计数器值,我们遍历列表,并从每个列表中删除前缀“元素”。

/*** Remove first element from the queue.** @return* @throws KeeperException* @throws InterruptedException*/
int consume() throws KeeperException, InterruptedException{int retvalue = -1;Stat stat = null;// Get the first element availablewhile (true) {synchronized (mutex) {List<String> list = zk.getChildren(root, true);if (list.size() == 0) {System.out.println("Going to wait");mutex.wait();} else {Integer min = new Integer(list.get(0).substring(7));for(String s : list){Integer tempValue = new Integer(s.substring(7));//System.out.println("Temporary value: " + tempValue);if(tempValue < min) min = tempValue;}System.out.println("Temporary value: " + root + "/element" + min);byte[] b = zk.getData(root + "/element" + min,false, stat);zk.delete(root + "/element" + min, 0);ByteBuffer buffer = ByteBuffer.wrap(b);retvalue = buffer.getInt();return retvalue;}}}}
}

完整的例子

在下一节中,您可以找到完整的命令行应用程序来演示上述配方。使用以下命令运行它。

ZOOBINDIR="[path_to_distro]/bin"
. "$ZOOBINDIR"/zkEnv.sh
java SyncPrimitive [Test Type] [ZK server] [No of elements] [Client type]

队列测试

启动生产者以创建100个元素

java SyncPrimitive qTest localhost 100 p

启动使用者消耗100个元素

java SyncPrimitive qTest localhost 100 c

屏障测试

开始一个有2个参与者的障碍(开始的次数是您想要输入的参与者的数量)

java SyncPrimitive bTest localhost 2

来源清单

SyncPrimitive.Java

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;public class SyncPrimitive implements Watcher {static ZooKeeper zk = null;static Integer mutex;String root;SyncPrimitive(String address) {if(zk == null){try {System.out.println("Starting ZK:");zk = new ZooKeeper(address, 3000, this);mutex = new Integer(-1);System.out.println("Finished starting ZK: " + zk);} catch (IOException e) {System.out.println(e.toString());zk = null;}}//else mutex = new Integer(-1);}synchronized public void process(WatchedEvent event) {synchronized (mutex) {//System.out.println("Process: " + event.getType());mutex.notify();}}/*** Barrier*/static public class Barrier extends SyncPrimitive {int size;String name;/*** Barrier constructor** @param address* @param root* @param size*/Barrier(String address, String root, int size) {super(address);this.root = root;this.size = size;// Create barrier nodeif (zk != null) {try {Stat s = zk.exists(root, false);if (s == null) {zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}} catch (KeeperException e) {System.out.println("Keeper exception when instantiating queue: "+ e.toString());} catch (InterruptedException e) {System.out.println("Interrupted exception");}}// My node nametry {name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());} catch (UnknownHostException e) {System.out.println(e.toString());}}/*** Join barrier** @return* @throws KeeperException* @throws InterruptedException*/boolean enter() throws KeeperException, InterruptedException{zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);while (true) {synchronized (mutex) {List<String> list = zk.getChildren(root, true);if (list.size() < size) {mutex.wait();} else {return true;}}}}/*** Wait until all reach barrier** @return* @throws KeeperException* @throws InterruptedException*/boolean leave() throws KeeperException, InterruptedException{zk.delete(root + "/" + name, 0);while (true) {synchronized (mutex) {List<String> list = zk.getChildren(root, true);if (list.size() > 0) {mutex.wait();} else {return true;}}}}}/*** Producer-Consumer queue*/static public class Queue extends SyncPrimitive {/*** Constructor of producer-consumer queue** @param address* @param name*/Queue(String address, String name) {super(address);this.root = name;// Create ZK node nameif (zk != null) {try {Stat s = zk.exists(root, false);if (s == null) {zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT);}} catch (KeeperException e) {System.out.println("Keeper exception when instantiating queue: "+ e.toString());} catch (InterruptedException e) {System.out.println("Interrupted exception");}}}/*** Add element to the queue.** @param i* @return*/boolean produce(int i) throws KeeperException, InterruptedException{ByteBuffer b = ByteBuffer.allocate(4);byte[] value;// Add child with value ib.putInt(i);value = b.array();zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT_SEQUENTIAL);return true;}/*** Remove first element from the queue.** @return* @throws KeeperException* @throws InterruptedException*/int consume() throws KeeperException, InterruptedException{int retvalue = -1;Stat stat = null;// Get the first element availablewhile (true) {synchronized (mutex) {List<String> list = zk.getChildren(root, true);if (list.size() == 0) {System.out.println("Going to wait");mutex.wait();} else {Integer min = new Integer(list.get(0).substring(7));String minNode = list.get(0);for(String s : list){Integer tempValue = new Integer(s.substring(7));//System.out.println("Temporary value: " + tempValue);if(tempValue < min) {min = tempValue;minNode = s;}}System.out.println("Temporary value: " + root + "/" + minNode);byte[] b = zk.getData(root + "/" + minNode,false, stat);zk.delete(root + "/" + minNode, 0);ByteBuffer buffer = ByteBuffer.wrap(b);retvalue = buffer.getInt();return retvalue;}}}}}public static void main(String args[]) {if (args[0].equals("qTest"))queueTest(args);elsebarrierTest(args);}public static void queueTest(String args[]) {Queue q = new Queue(args[1], "/app1");System.out.println("Input: " + args[1]);int i;Integer max = new Integer(args[2]);if (args[3].equals("p")) {System.out.println("Producer");for (i = 0; i < max; i++)try{q.produce(10 + i);} catch (KeeperException e){} catch (InterruptedException e){}} else {System.out.println("Consumer");for (i = 0; i < max; i++) {try{int r = q.consume();System.out.println("Item: " + r);} catch (KeeperException e){i--;} catch (InterruptedException e){}}}}public static void barrierTest(String args[]) {Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));try{boolean flag = b.enter();System.out.println("Entered barrier: " + args[2]);if(!flag) System.out.println("Error when entering the barrier");} catch (KeeperException e){} catch (InterruptedException e){}// Generate random integerRandom rand = new Random();int r = rand.nextInt(100);// Loop for rand iterationsfor (int i = 0; i < r; i++) {try {Thread.sleep(100);} catch (InterruptedException e) {}}try{b.leave();} catch (KeeperException e){} catch (InterruptedException e){}System.out.println("Left barrier");}
}

转载来源:https://github.com/apache/zookeeper/blob/master/zookeeper-docs/src/main/resources/markdown/zookeeperTutorial.md

使用ZooKeeper编程 - 一个基本教程相关推荐

  1. 教程 | 以太坊智能合约编程之菜鸟教程

    教程 | 以太坊智能合约编程之菜鸟教程 译注:原文首发于ConsenSys开发者博客,原作者为Eva以及ConsenSys的开发团队.如果您想要获取更多及时信息,可以访问ConsenSys首页点击左下 ...

  2. 使用Reactor进行反应式编程最全教程

    反应式编程(Reactive Programming)这种新的编程范式越来越受到开发人员的欢迎.在 Java 社区中比较流行的是 RxJava 和 RxJava 2.本文要介绍的是另外一个新的反应式编 ...

  3. zookeeper编程入门系列之zookeeper实现分布式进程监控和分布式共享锁(图文详解)...

    本博文的主要内容有 一.zookeeper编程入门系列之利用zookeeper的临时节点的特性来监控程序是否还在运行   二.zookeeper编程入门系列之zookeeper实现分布式进程监控 三. ...

  4. python编程入门经典-总算理解python编程入门经典教程

    为了提高模块加载的速度,每个模块都会在__pycache__文件夹中放置该模块的预编译模块,命名为module.version.pyc,version是模块的预编译版本编码,一般都包含Python的版 ...

  5. 麒麟子Javascript游戏编程零基础教程大纲

    大家好,我是麒麟子, 开源项目<幼麟棋牌-四川麻将>(泄漏版叫 <达达麻将>)作者,成都幼麟科技创始人. 开源项目地址(Github与Gitee同步更新): Github ht ...

  6. 从零开始学视觉里程计——一个初学者教程

    从零开始学视觉里程计--一个初学者教程 目录 从零开始学视觉里程计--一个初学者教程 什么是里程计 什么是视觉里程计 为什么使用立体相机,或者为什么使用单目相机? 理论足够了,现在讨论算法 问题描述 ...

  7. 【“计算机科学与技术”专业小白成长系列】Linux Shell 编程 极简教程

    Linux Shell 编程 极简教程 内容摘要 本文是 Linux Shell 编程简单入门.主要内容: Linux 简介 Shell 编程入门 Kotlin 脚本与 Shell 脚本 Linux ...

  8. 少儿编程Scratch学习教程2--官方初学指南

    下载完软件,可以看一下官方的初学指南.学习下基本的操作. Scratch主要由角色和背景两部分构成.其中角色和背景都是由脚本,造型和声音组成的. 打开软件就可以看见下面的界面了. 在红色的脚本区域,是 ...

  9. 少儿编程Scratch学习教程--Scratch介绍及参赛相关

    之前写了几篇关于Scratch的使用文章.忽然想起,忘记介绍下Scratch了. Scratch是一款由麻省理工学院(MIT) 设计开发的少儿编程工具.官方网址是麻省理工学院网站的一个分支.这个软件的 ...

最新文章

  1. 学习openresty的log_by_lua*和stream-lua-nginx-module模块
  2. 基于SSM实现校园失物招领系统
  3. mysql 查询任务_mysql中怎样显示服务器正在执行的sql任务
  4. linux命令行下的ftp 多文件下载和目录下载
  5. 河南省第七届省赛 问题 F: Turing equation 水题
  6. 骁龙617 android7,骁龙617手机有哪些?骁龙617手机汇总
  7. 如何查看电脑显卡能使用的对应的cuda版本?
  8. 娜塔莉波特曼2015哈佛毕业演讲
  9. NFS网络文件系统服务配置、验证及错误解决
  10. 2022 SpringBoot/SSM的极为轻量级推荐博客系统
  11. python3.8安装matplotlib_图文详解python之matplotlib安装与报错解决办法
  12. Reinforcement Learning(001)
  13. 分区桌面背景制作方法-EXCEL
  14. 逆向-IDA工具的基本使用
  15. 互联网之于人类社会进化的意义
  16. 275. H指数 II
  17. 属性加密技术及基于属性的访问控制技术
  18. Java实现首字母大写
  19. Scrapy学习记录
  20. Holm–Bonferroni method

热门文章

  1. 使用js实现思维导图
  2. composer中文阿里云镜像地址
  3. LeetCode刷题(Python)——每个节点的右向指针
  4. 从包中导入类的使用 java
  5. 学习编程的方法与建议
  6. centos系统安装python3.8的操作过程 亲测可行 云服务器安装过程 200327
  7. python-环境篇-Anaconda的安装
  8. create-react-app教程-源码篇
  9. 51Nod 1085 01背包
  10. 【待解决】使用JUnit时报错java.lang