前言

上文【从入门到放弃-ZooKeeper】ZooKeeper入门中,我们学习了ZooKeeper的简单安装和cli使用。
接下来我们开始基于java API的实战编程。本文先来写一个分布式队列的代码实现。

设计

我们来写一个先进先出的分布式无界公平队列。参考我们之前介绍的【从入门到放弃-Java】并发编程-JUC-ConcurrentLinkedQueue和【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue。我们直接继承AbstractQueue类,并实现Queue接口。
主要重写offer、poll、peek、size方法。
我们使用ZooKeeper的持久化顺序节点来实现分布式队列。
offer是入队,入队时新创建一个持久化顺序节点,节点后缀会根据ZooKeeper的特性自动累加。
poll的出队,获取根节点下的所有节点,根据后缀数字排序,数组最小的是最先入队的,因此要最先出队。
peek,获取到最下入队的数据,和poll的区别是,peek只获取数据,不出队,不删除已经消费的节点。
size获取队列长度,实现方式是,获取根节点下的节点数量即可。这个方法在并发时可能会有问题。慎用。

DistributedQueue

//继承AbstractQueue类并实现Queue接口
public class DistributedQueue<E> extends AbstractQueue<E> implements Queue<E> {private static Logger logger = LoggerFactory.getLogger(DistributedQueue.class);//ZooKeeper客户端,进行ZooKeeper操作private ZooKeeper zooKeeper;//根节点名称private String dir;//数据节点名称,顺序节点在插入口会变为 node{00000000xx} 格式private String node;//ZooKeeper鉴权信息private List<ACL> acls;/*** Constructor.** @param zooKeeper the zoo keeper* @param dir       the dir* @param node      the node* @param acls      the acls*/public DistributedQueue (ZooKeeper zooKeeper, String dir, String node, List<ACL> acls) {this.zooKeeper = zooKeeper;this.dir = dir;this.node = node;this.acls = acls;init();}private void init() {//需要先判断根节点是否存在,不存在的话,创建子节点时会出错。try {Stat stat = zooKeeper.exists(dir, false);if (stat == null) {zooKeeper.create(dir, null, acls, CreateMode.PERSISTENT);}} catch (Exception e) {logger.error("[DistributedQueue#init] error : " + e.toString(), e);}}
}

offer

/*** Offer boolean.** @param o the o* @return the boolean*/
@Override
public boolean offer(E o) {//构建要插入的节点名称String fullPath = dir.concat("/").concat(node);try {//创建子节点成功则返回入队成功zooKeeper.create(fullPath, objectToBytes(o), acls, CreateMode.PERSISTENT_SEQUENTIAL);return true;} catch (Exception e) {logger.error("[DistributedQueue#offer] error : " + e.toString(), e);}return false;
}

poll

/*** Poll e.** @return the e*/
@Override
public E poll() {try {//获取根节点所有子节点信息。List<String> children = zooKeeper.getChildren(dir, null);//如果队列是空的则返回nullif (children == null || children.isEmpty()) {return null;}//将子节点名称排序Collections.sort(children);for (String child : children) {//拼接子节点的具体名称String fullPath = dir.concat("/").concat(child);try {//如果获取数据成功,则类型转换后,返回,并删除改队列中该节点byte[] bytes = zooKeeper.getData(fullPath, false, null);E data = (E) bytesToObject(bytes);zooKeeper.delete(fullPath, -1);return data;} catch (Exception e) {logger.warn("[DistributedQueue#poll] warn : " + e.toString(), e);}}} catch (Exception e) {logger.error("[DistributedQueue#peek] poll : " + e.toString(), e);}return null;
}

peek

/*** Peek e.** @return the e*/
@Override
public E peek() {try {//获取根节点所有子节点信息。List<String> children = zooKeeper.getChildren(dir, null);//如果队列是空的则返回nullif (children == null || children.isEmpty()) {return null;}//将子节点名称排序Collections.sort(children);for (String child : children) {//拼接子节点的具体名称String fullPath = dir.concat("/").concat(child);try {//如果获取数据成功,则类型转换后,返回,不会删除改队列中该节点byte[] bytes = zooKeeper.getData(fullPath, false, null);E data = (E) bytesToObject(bytes);return data;} catch (Exception e) {logger.warn("[DistributedQueue#peek] warn : " + e.toString(), e);}}} catch (Exception e) {logger.error("[DistributedQueue#peek] warn : " + e.toString(), e);}return null;
}

size

/*** Size int.** @return the int*/
@Override
public int size() {try {//获取根节点的子节点名称List<String> children = zooKeeper.getChildren(dir, null);//返回子结点信息数量return children.size();} catch (Exception e) {logger.error("[DistributedQueue#offer] size : " + e.toString(), e);}return 0;
}

总结

上面我们一起学习了如何利用持久性顺序节点,创建一个分布式先进先出队列。源代码可见:aloofJr。
如果有好的优化建议,欢迎一起讨论。

原文链接
本文为云栖社区原创内容,未经允许不得转载。

【从入门到放弃-ZooKeeper】ZooKeeper实战-分布式队列相关推荐

  1. 【从入门到放弃-ZooKeeper】ZooKeeper入门

    前言 ZooKeeper是一个分布式服务协调框架,可以用来维护分布式配置信息.服务注册中心.实现分布式锁等.在Hbase.Hadoop.kafka等项目中都有广泛的应用.随着分布式.微服务的普及,Zo ...

  2. Zookeeper实战-分布式锁的现实

    1. 简介 我们在之前的博文中讲解了如何使用redis实现分布式锁,其实除了 redis 还有 zookeeper 也能实现分布式锁. 废话不多说,直接上图. 从整个流程中可以看出,zk实现分布式锁, ...

  3. api 创建zookeeper客户端_zookeeper分布式锁原理及实现

    前言 本文介绍下 zookeeper方式 实现分布式锁 原理简介 zookeeper实现分布式锁的原理就是多个节点同时在一个指定的节点下面创建临时会话顺序节点,谁创建的节点序号最小,谁就获得了锁,并且 ...

  4. 基于zookeeper实现的分布式锁

    http://www.jiacheo.org/blog/122 zookeeper是hadoop下面的一个子项目, 用来协调跟hadoop相关的一些分布式的框架, 如hadoop, hive, pig ...

  5. zookeeper安装及分布式配置

    zookeeper安装及分布式配置 下载zookeeper 解压到/usr/local/目录下 tar -xvf [zookeeper压缩包名] -C /usr/local/ 对zookeeper目录 ...

  6. 关于Zookeeper来实现分布式锁的几个问题

    本文来说下Zookeeper实现分布式锁的几个问题 文章目录 概述 zk基本锁原理 监听通知机制 zk锁优化原理 zk锁的优缺点 本文小结 概述 zookeeper锁相关基础知识 zookeeper锁 ...

  7. 使用Zookeeper共享锁和排它锁和分布式队列的原理和实现(ZkClient)

    简介 当我们分布式系统中多个节点需要访问同一共享数据,就需要加一把分布式锁,因为如果是同一进程的线程的话,完全可以采用Java的同步锁实现,但是这是多进程间的锁,所以就需要一个协调者来协调进程间的通信 ...

  8. 使用ZooKeeper实现分布式队列、分布式锁和选举详解

    点击关注公众号,实用技术文章及时了解 来源:blog.csdn.net/qq_40378034/ article/details/117014648 ZooKeeper源码的zookeeper-rec ...

  9. zookeeper应用 - FIFO 队列 分布式队列

    使用ZooKeeper实现的FIFO队列,这个队列是分布式的. package fifo; import java.util.Collections; import java.util.List; i ...

最新文章

  1. 悟道·文汇详解:少样本学习等近十个数据集取得第一
  2. python官网下载步骤linux-Linux 安装python3.x步骤
  3. linux查看passwd最后一行,Linux命令总结
  4. Google: 如何做code review?
  5. 01.analyzer简介及char_filter组件
  6. vmware服务器虚拟化部署sdn,使用VMware的NSX多面落地软件定义网络SDN视频课程
  7. JS组件系列——Bootstrap Table 表格行拖拽
  8. Jmeter函数助手
  9. SpringCloud工作笔记045---SpringCloud分布式服务部署常用端口
  10. Spring Cloud Netflix之Eureka上篇
  11. 使Docker Container支持运行SWT程序
  12. Linux Mint开发环境安装整理
  13. arcgis自带的python版本_arcgis10.3自带的python2.7.8怎么安装geopandas?
  14. 第 7 章 Neutron - 067 - Neutron 架构
  15. java mrp模拟器_MRP模拟器(冒泡官方版)好用吗_MRP模拟器(冒泡官方版)怎么样_MRP模拟器(冒泡官方版)3.811用户评论-AppChina应用汇...
  16. 《老路用得上的商学课》91-95学习笔记
  17. 人民币对美元汇率中间价报6.7560元 上调349个基点
  18. 【刘文彬】 Debug EOS:nodeos + mongo_db_plugin
  19. 【财务】FMS财务管理系统---审计流程
  20. 富文本编辑器复制Wod字体问题

热门文章

  1. 决策树 prepruning_智能建筑运维前探 AI天天见之五:决策树算法应用探索
  2. java不同进程的相互唤醒_Java多线程(二)同步与等待唤醒
  3. java类与对象实验报告心得体会_Java类与对象实验报告.doc
  4. 怎样用计算机记账,仓管员怎么用电脑记账?简单实用的电脑操作方式一览!
  5. python入门之函数调用-python之函数入门
  6. 【LeetCode笔记】48. 旋转图像(Java、矩阵、偏数学、原地算法)
  7. php ajax download,通过Ajax和PHP强制下载
  8. element ui 多个子组件_vue前端UI框架,一点都不圆润,盘它!
  9. php函数方法,基于PHP函数的操作方法
  10. php sphinx api调用,php调用Sphinx