为什么80%的码农都做不了架构师?>>>   

Barrier

在分布式系统中,可以使用栅栏,对多个节点上的任务进行阻塞等待;直到满足某个定制的条件,所有的节点才可以继续执行下一步任务。

1. 关键 API

org.apache.curator.framework.recipes.barriers.DistributedBarrier

2. 机制说明

控制多节点上的多任务执行步进。

类似java中的java.util.concurrent.CyclicBarrier分布式实现。

3. 用法

3.1 创建

public DistributedBarrier(CuratorFramework client,String barrierPath)

3.2 使用

3.2.1 等待栅栏

public void waitOnBarrier()

3.2.2 设置/移除栅栏

setBarrier();
removeBarrier();

4. 错误处理

DistributedBarrier实例会监听链接丢失。在waitOnBarrier()时,如果发生丢失时,则会抛出异常。

5. 源码分析

5.1 类定义

public class DistributedBarrier {}

没有继承父类,也没有实现任何接口

5.2 成员变量

public class DistributedBarrier
{private final CuratorFramework client;private final String barrierPath;private final Watcher watcher = new Watcher(){@Overridepublic void process(WatchedEvent event){notifyFromWatcher();}};
}
  • client
  • barrierPath
    • 用作栅栏的zk节点path
  • watcher
    • 用作zk链接的监听器

5.3 构造器

public DistributedBarrier(CuratorFramework client, String barrierPath)
{this.client = client;this.barrierPath = PathUtils.validatePath(barrierPath);
}

简单赋值

5.3 设置栅栏

public synchronized void setBarrier() throws Exception
{try{client.create().creatingParentContainersIfNeeded().forPath(barrierPath);}catch ( KeeperException.NodeExistsException ignore ){// ignore}
}
  • synchronized同步控制

可见,设置栅栏的过程,就是创建barrierPath节点(普通节点)

5.4 等待

public synchronized void waitOnBarrier() throws Exception
{waitOnBarrier(-1, null);
}public synchronized boolean waitOnBarrier(long maxWait, TimeUnit unit) throws Exception
{long            startMs = System.currentTimeMillis();boolean         hasMaxWait = (unit != null);long            maxWaitMs = hasMaxWait ? TimeUnit.MILLISECONDS.convert(maxWait, unit) : Long.MAX_VALUE;boolean         result;for(;;){result = (client.checkExists().usingWatcher(watcher).forPath(barrierPath) == null);if ( result ){break;}if ( hasMaxWait ){long        elapsed = System.currentTimeMillis() - startMs;long        thisWaitMs = maxWaitMs - elapsed;if ( thisWaitMs <= 0 ){break;}wait(thisWaitMs);}else{wait();}}return result;
}
  • 使用synchronized同步控制
  • 不断检查barrierPath栅栏节点是否存在
    • 如果栅栏不存在了,则栅栏放开了,返回true
    • 如果栅栏还在,则进行等待

5.5 移除栅栏

public synchronized void removeBarrier() throws Exception
{try{client.delete().forPath(barrierPath);}catch ( KeeperException.NoNodeException ignore ){// ignore}
}

直接删除栅栏节点

6. 测试

对于栅栏的示例,最先想到的就是赛马游戏的场景。所以,这里用DistributedBarrier来实现一个赛马游戏。

package com.roc.curator.demo.barriersimport org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.barriers.DistributedBarrier
import org.apache.curator.retry.ExponentialBackoffRetry
import org.junit.Test
import java.util.*
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference/*** Created by roc on 2017/6/1.*/
class HorseRaceLampTest {val PATH: String = "/test/barrier/horse"// 由于栅栏是监听链接,所以对于一个链接上的多个栅栏监听,会存在问题// 这里为每一个任务创建一个链接fun connect(): CuratorFramework {val client: CuratorFramework = CuratorFrameworkFactory.builder().connectString("0.0.0.0:8888").connectionTimeoutMs(5000).retryPolicy(ExponentialBackoffRetry(1000, 10)).sessionTimeoutMs(3000).build()client.start()return client}@Test fun runTest() {val name: String = "HORSE-"val count: AtomicInteger = AtomicInteger()val threadFactory: ThreadFactory = ThreadFactory { r ->val t: Thread = Thread(r, name + count.incrementAndGet())t}var champion: AtomicReference<String> = AtomicReference()champion.set("")val executorService: ExecutorService = Executors.newFixedThreadPool(5, threadFactory)//使用一个计步器来判定每一匹马是否完成当轮比赛val countStep: AtomicInteger = AtomicInteger()var round: Int = 0var i: Int = 0val target: Int = 100while (i < 5) {i++executorService.execute(Runnable {var sumSetp: Int = 0run {val client: CuratorFramework = connect()val barrier: DistributedBarrier = DistributedBarrier(client, PATH)//设置栅栏barrier.setBarrier()println("${Thread.currentThread()} 准备完毕 ${Date()}")//通知准备完成countStep.incrementAndGet()//等待开赛barrier.waitOnBarrier()//需要对线程是否中断进行判断while (!Thread.interrupted() || sumSetp < target) {var setp: Int = (Math.random() * 20).toInt()setp = maxOf(setp, 1)setp = minOf(setp, target - sumSetp)sumSetp += setpprintln("${Thread.currentThread()} 第 $round 轮,跑了 $setp 步,合计 $sumSetp / $target ")//如果已经到达终点,自行标记冠军//示例而已,如果要严谨一点的话,这个应该交给裁判来判断if (sumSetp >= target) {champion.set(Thread.currentThread().name)}//设立栅栏barrier.setBarrier()//通知完成此轮比赛countStep.incrementAndGet()//等待下一轮barrier.waitOnBarrier()}}})}val client: CuratorFramework = connect()val barrier: DistributedBarrier = DistributedBarrier(client, PATH)var pStep: Int = countStep.get()while (champion.get().isBlank()) {if (countStep.get() > pStep && countStep.get() % 5 == 0) {round++println("------------------------------------------------")println("裁判: 开始第 $round 轮 ${Date()}")println("------------------------------------------------")pStep = countStep.get()barrier.removeBarrier()}TimeUnit.MILLISECONDS.sleep(500)}println("裁判: 冠军是 $champion")executorService.shutdown()barrier.removeBarrier()}
}

输出:

Thread[HORSE-2,5,main] 准备完毕 Thu Jun 01 20:18:20 CST 2017
Thread[HORSE-1,5,main] 准备完毕 Thu Jun 01 20:18:20 CST 2017
Thread[HORSE-3,5,main] 准备完毕 Thu Jun 01 20:18:20 CST 2017
Thread[HORSE-5,5,main] 准备完毕 Thu Jun 01 20:18:20 CST 2017
Thread[HORSE-4,5,main] 准备完毕 Thu Jun 01 20:18:20 CST 2017
------------------------------------------------
裁判: 开始第 1 轮 Thu Jun 01 20:18:20 CST 2017
------------------------------------------------
Thread[HORSE-4,5,main] 第 1 轮,跑了 17 步,合计 17 / 100
Thread[HORSE-3,5,main] 第 1 轮,跑了 13 步,合计 13 / 100
Thread[HORSE-2,5,main] 第 1 轮,跑了 8 步,合计 8 / 100
Thread[HORSE-5,5,main] 第 1 轮,跑了 14 步,合计 14 / 100
Thread[HORSE-1,5,main] 第 1 轮,跑了 18 步,合计 18 / 100
------------------------------------------------
裁判: 开始第 2 轮 Thu Jun 01 20:18:21 CST 2017
------------------------------------------------
Thread[HORSE-3,5,main] 第 2 轮,跑了 11 步,合计 24 / 100
Thread[HORSE-4,5,main] 第 2 轮,跑了 1 步,合计 18 / 100
Thread[HORSE-5,5,main] 第 2 轮,跑了 14 步,合计 28 / 100
Thread[HORSE-2,5,main] 第 2 轮,跑了 1 步,合计 9 / 100
Thread[HORSE-1,5,main] 第 2 轮,跑了 19 步,合计 37 / 100
------------------------------------------------
裁判: 开始第 3 轮 Thu Jun 01 20:18:21 CST 2017
------------------------------------------------
Thread[HORSE-3,5,main] 第 3 轮,跑了 10 步,合计 34 / 100
Thread[HORSE-4,5,main] 第 3 轮,跑了 13 步,合计 31 / 100
Thread[HORSE-1,5,main] 第 3 轮,跑了 10 步,合计 47 / 100
Thread[HORSE-2,5,main] 第 3 轮,跑了 8 步,合计 17 / 100
Thread[HORSE-5,5,main] 第 3 轮,跑了 18 步,合计 46 / 100
------------------------------------------------
裁判: 开始第 4 轮 Thu Jun 01 20:18:22 CST 2017
------------------------------------------------
Thread[HORSE-2,5,main] 第 4 轮,跑了 3 步,合计 20 / 100
Thread[HORSE-4,5,main] 第 4 轮,跑了 9 步,合计 40 / 100
Thread[HORSE-3,5,main] 第 4 轮,跑了 16 步,合计 50 / 100
Thread[HORSE-1,5,main] 第 4 轮,跑了 14 步,合计 61 / 100
Thread[HORSE-5,5,main] 第 4 轮,跑了 12 步,合计 58 / 100
------------------------------------------------
裁判: 开始第 5 轮 Thu Jun 01 20:18:22 CST 2017
------------------------------------------------
Thread[HORSE-3,5,main] 第 5 轮,跑了 6 步,合计 56 / 100
Thread[HORSE-1,5,main] 第 5 轮,跑了 17 步,合计 78 / 100
Thread[HORSE-2,5,main] 第 5 轮,跑了 12 步,合计 32 / 100
Thread[HORSE-4,5,main] 第 5 轮,跑了 10 步,合计 50 / 100
Thread[HORSE-5,5,main] 第 5 轮,跑了 3 步,合计 61 / 100
------------------------------------------------
裁判: 开始第 6 轮 Thu Jun 01 20:18:23 CST 2017
------------------------------------------------
Thread[HORSE-4,5,main] 第 6 轮,跑了 6 步,合计 56 / 100
Thread[HORSE-2,5,main] 第 6 轮,跑了 3 步,合计 35 / 100
Thread[HORSE-1,5,main] 第 6 轮,跑了 5 步,合计 83 / 100
Thread[HORSE-5,5,main] 第 6 轮,跑了 7 步,合计 68 / 100
Thread[HORSE-3,5,main] 第 6 轮,跑了 1 步,合计 57 / 100
------------------------------------------------
裁判: 开始第 7 轮 Thu Jun 01 20:18:23 CST 2017
------------------------------------------------
Thread[HORSE-3,5,main] 第 7 轮,跑了 14 步,合计 71 / 100
Thread[HORSE-2,5,main] 第 7 轮,跑了 3 步,合计 38 / 100
Thread[HORSE-1,5,main] 第 7 轮,跑了 14 步,合计 97 / 100
Thread[HORSE-4,5,main] 第 7 轮,跑了 1 步,合计 57 / 100
Thread[HORSE-5,5,main] 第 7 轮,跑了 11 步,合计 79 / 100
------------------------------------------------
裁判: 开始第 8 轮 Thu Jun 01 20:18:24 CST 2017
------------------------------------------------
Thread[HORSE-4,5,main] 第 8 轮,跑了 15 步,合计 72 / 100
Thread[HORSE-2,5,main] 第 8 轮,跑了 8 步,合计 46 / 100
Thread[HORSE-3,5,main] 第 8 轮,跑了 10 步,合计 81 / 100
Thread[HORSE-5,5,main] 第 8 轮,跑了 9 步,合计 88 / 100
Thread[HORSE-1,5,main] 第 8 轮,跑了 3 步,合计 100 / 100
裁判: 冠军是 HORSE-1

zk节点:

get /test/barrier/horse
192.168.60.165
cZxid = 0x1e991
ctime = Thu Jun 01 20:18:23 CST 2017
mZxid = 0x1e991
mtime = Thu Jun 01 20:18:23 CST 2017
pZxid = 0x1e991
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 14
numChildren = 0

转载于:https://my.oschina.net/roccn/blog/912833

[Curator] Barrier 的使用与分析相关推荐

  1. barrier linux,Linux Barrier I/O 实现分析笔记

    一直以来,I/O顺序问题一直困扰着我.其实这个问题是一个比较综合的问题,它涉及的层次比较多,从VFS page cache到I/O调度算法,从i/o子系统到存储外设.而Linux I/O barrie ...

  2. Curator-01

    为什么80%的码农都做不了架构师?>>>    Curator 01 在<扩展Dubbo,改用Curator2连接Zookeeper>中,介绍过Curator.其实在使用 ...

  3. Apache Flink fault tolerance源码剖析(六)

    上篇文章我们分析了基于检查点的用户状态的保存机制--状态终端.这篇文章我们来分析barrier(中文常译为栅栏或者屏障,为了避免引入名称争议,此处仍用英文表示).检查点的barrier是提供exact ...

  4. Android 源码 图形系统之 relayoutWindow

    在 <Android 源码 图形系统之请求布局> 一节,分析到 ViewRootImpl 类 performTraversals() 方法内调用 relayoutWindow(-) 方法重 ...

  5. [Curator] Path Cache 的使用与分析

    为什么80%的码农都做不了架构师?>>>    Path Cache Path Cache其实就是用于对zk节点的监听.不论是子节点的新增.更新或者移除的时候,Path Cache都 ...

  6. zookeeper客户端库curator分析

    zookeeper客户端库curator分析 前言 综述 zookeeper保证 理解zookeeper的顺序一致性 之前使用zookeeper客户端踩到的坑 curator 连接保证 连接状态监控以 ...

  7. Linkage Mapper 之 Barrier Mapper 功能解析(含实际案例分析)

    ✅创作者:陈书予

  8. kazoo源码分析:Zookeeper客户端start概述

    kazoo源码分析 kazoo-2.6.1 kazoo客户端 kazoo是一个由Python编写的zookeeper客户端,实现了zookeeper协议,从而提供了Python与zookeeper服务 ...

  9. GPU指令集技术分析

    GPU指令集技术分析 本文将两篇文章整理了一下. 参考文章链接如下: https://zhuanlan.zhihu.com/p/391238629 https://zhuanlan.zhihu.com ...

最新文章

  1. 青少年电子信息智能创新大赛 赛项说明(Python编程创新挑战赛)
  2. 区块链用AI和大数据改变行业现状
  3. ignite自定义函数
  4. Linux中shell模块的考试,linux下的shell编程要考试了题目这里有可是表示不会 求帮忙...
  5. 光动能表怎么维护_男士手表什么牌子好,男士手表品牌推荐, 天梭、阿玛尼、西铁城、天王表、罗西尼、卡西欧男手表推荐...
  6. 巧用FineReport搭建成本管控监测系统
  7. SD-WAN平台ActiveCore推出,领域新亮点精彩丰呈
  8. 从现有数据创建 XML 架构和数据集
  9. HTML5期末大作业:我的家乡网站设计4
  10. Android关于BottomNavigationView效果实现指南
  11. 艾艾贴Mysql主从同步
  12. 深入理解Android
  13. 普通人的爱国在日常生活中如何提现
  14. 部署: 搭建 Apache RocketMQ 单机环境与Rocketmq-console
  15. html中文字不自动换行 white-space style
  16. 2021SC@SDUSC基于人工智能的多肽药物分析问题(六)
  17. 恭喜河南建业冲上中超
  18. 360搜索是废了还是彻底商业化了?
  19. EBS开发_创建AP付款
  20. 基础知识----Symbian UIQ

热门文章

  1. Visual Studio 11更名为“Visual Studio 2012”,RC版(与.NET 4.5一起)开放下载
  2. HTML元素(标签)大全及使用说明 (整)
  3. 陈广老师 C#语言参考视频打包下载地址
  4. 比波士顿动力快一步:两足机器人送快递,你不用跑出门也能收货了
  5. 导航栏透明度渐变; 下拉头视图拉伸效果;勾号动画; 一段文字中点击部分可响应不同事件...
  6. JavaScript中的this详解
  7. ubuntu文字界面与图形界面切换
  8. 7-1 关于堆的判断 (25 分)
  9. 工业大数据发展面临四方面挑战
  10. 大数据在智慧社区的作用有哪些