分布式计数器的思路是:指定一个Zookeeper数据节点作为计数器,多个应用实例在分布式锁的控制下,通过更新该数据节点的内容来实现计数功能。

Curator中封装了实现,例如 DistributedAtomicInteger 和 DistributedAtomicLong。

以下是我写的一个测试用例:

[java] view plaincopy
  1. package com.my.CuratorTest;
  2. import org.apache.curator.framework.CuratorFramework;
  3. import org.apache.curator.framework.CuratorFrameworkFactory;
  4. import org.apache.curator.framework.recipes.atomic.AtomicValue;
  5. import org.apache.curator.framework.recipes.atomic.DistributedAtomicLong;
  6. import org.apache.curator.retry.ExponentialBackoffRetry;
  7. import org.apache.curator.retry.RetryNTimes;
  8. import java.util.concurrent.CyclicBarrier;
  9. import java.util.concurrent.ExecutorService;
  10. import java.util.concurrent.Executors;
  11. /**
  12. * Title: 分布式计数器演示<br/>
  13. * Intention: <br/>
  14. * <p>
  15. * Class Name: com.my.CuratorTest.RecipesDisAutomicLong<br/>
  16. * Create Date: 2017/8/20 22:48 <br/>
  17. * Project Name: MyTest <br/>
  18. * Company:  All Rights Reserved. <br/>
  19. * Copyright © 2017 <br/>
  20. * </p>
  21. * <p>
  22. * author: GaoWei <br/>
  23. * 1st_examiner: <br/>
  24. * 2nd_examiner: <br/>
  25. * </p>
  26. *
  27. * @version 1.0
  28. * @since JDK 1.7
  29. */
  30. public class RecipesDisAutomicLong {
  31. static String disAutomicIntPath = "/curator_recipes_distatomicint_path3";
  32. static CuratorFramework client = CuratorFrameworkFactory.builder()
  33. .connectString("127.0.0.1:2181")
  34. .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
  35. static  DistributedAtomicLong atomicLong =
  36. new DistributedAtomicLong(client, disAutomicIntPath, new RetryNTimes(10, 500),
  37. null);
  38. public static void main(String[] args) throws Exception{
  39. client.start();
  40. Long[] nums = {1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L};
  41. ExecutorService executor = Executors.newFixedThreadPool(nums.length);
  42. CyclicBarrier barrier = new CyclicBarrier(nums.length);
  43. atomicLong.compareAndSet(atomicLong.get().postValue(), new Long(0));
  44. for (int i=0;i<nums.length;i++) {
  45. final int k = i;
  46. executor.execute(new Runnable() {
  47. @Override
  48. public void run() {
  49. try {
  50. barrier.await();
  51. System.out.println(Thread.currentThread().getName()+" " + System.nanoTime()+ " , 开始执行");
  52. AtomicValue<Long> av = atomicLong.add(nums[k]);
  53. System.out.println("to add value=" + nums[k] + ", Result=" + av.succeeded() + " preValue=" + av.preValue()
  54. + " postValue=" + av.postValue());
  55. } catch (Exception e) {
  56. e.printStackTrace();
  57. }
  58. }
  59. });
  60. }
  61. executor.shutdown();
  62. }
  63. }

运行结果:

pool-3-thread-10 89586635189009 , 开始执行
pool-3-thread-8 89586635508120 , 开始执行
pool-3-thread-9 89586635501453 , 开始执行
pool-3-thread-7 89586635493898 , 开始执行
pool-3-thread-6 89586635480564 , 开始执行
pool-3-thread-5 89586635470342 , 开始执行
pool-3-thread-4 89586635427231 , 开始执行
pool-3-thread-3 89586635410787 , 开始执行
pool-3-thread-2 89586635360564 , 开始执行
pool-3-thread-1 89586635236564 , 开始执行
to add value=10, Result=true preValue=0 postValue=10
to add value=2, Result=true preValue=10 postValue=12
to add value=6, Result=true preValue=12 postValue=18
to add value=1, Result=true preValue=18 postValue=19
to add value=7, Result=true preValue=19 postValue=26
to add value=3, Result=true preValue=26 postValue=29
to add value=4, Result=true preValue=29 postValue=33
to add value=8, Result=true preValue=33 postValue=41
to add value=9, Result=true preValue=41 postValue=50
to add value=5, Result=true preValue=50 postValue=55

如果在DistributedAtomicLong的构造方法参数中,RetryNTimes重试次数不够,比如是3,你会发现并不一定每次加数都会成功。显然这里是用了乐观锁机制,它并不保证操作一定成功(它在重试这么多次中都没有成功获得锁,导致操作没有执行),所以我们有必要通过调用 av.succeeded() 来查看此次加数是否成功。

下面是RetryNTimes为3时的某一次运行结果:

pool-3-thread-1 89922027531135 , 开始执行
pool-3-thread-5 89922027681802 , 开始执行
pool-3-thread-8 89922027737357 , 开始执行
pool-3-thread-4 89922027673802 , 开始执行
pool-3-thread-9 89922028120024 , 开始执行
pool-3-thread-10 89922027531580 , 开始执行
pool-3-thread-2 89922027616024 , 开始执行
pool-3-thread-3 89922027606246 , 开始执行
pool-3-thread-7 89922027722691 , 开始执行
pool-3-thread-6 89922027699580 , 开始执行
to add value=9, Result=true preValue=0 postValue=9
to add value=10, Result=true preValue=9 postValue=19
to add value=4, Result=true preValue=19 postValue=23
to add value=7, Result=true preValue=23 postValue=30
to add value=3, Result=true preValue=30 postValue=33
to add value=2, Result=true preValue=33 postValue=35
to add value=5, Result=true preValue=35 postValue=40
to add value=1, Result=false preValue=35 postValue=0
to add value=6, Result=false preValue=35 postValue=0
to add value=8, Result=false preValue=35 postValue=0

参考:

1、从PAXOS到ZOOKEEPER分布式一致性原理与实践

转载于:https://www.cnblogs.com/maohuidong/p/8407498.html

zookeeper 分布式计数器相关推荐

  1. Zookeeper后端开发工具Curator的使用 | Curator对节点的增删改查 | ACL权限控制 | 分布式锁 | 分布式计数器 | 附带最新版本下载

    前言 Curator是Apache开源的一个Java工具类,通过它操作Zookeeper会变得极度舒适! 前置条件:已掌握的基本操作,比如在后台可以增减节点.ACL权限设置等. 1.Zookeeper ...

  2. Zookeeper分布式一致性原理(七):Curator客户端

    1. Curator简介 Curator是Netfix公司开源的一套Zookeeper客户端.Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括重连.反复注册Watcher和 ...

  3. Zookeeper分布式一致性原理(四):Zookeeper简介

    zookeeper是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现数据发布/订阅.负载均衡.命名服务.分布式协调/通知.集群管理.master选举.分布式锁和分布式队列等.Zook ...

  4. 架构系列---利用zookeeper 分布式锁解决缓存重建冲突实战

    上一篇 分布式缓存重建并发冲突问题以及zookeeper分布式锁解决方案, 主要讲解了分布式缓存重建冲突原因及利用zookeeper分布式锁解决缓存重建冲突问题,本篇接着上篇,实现上篇思路,带你利用z ...

  5. 《从Paxos到zookeeper分布式一致性原理与实践》笔记

    <从Paxos到zookeeper分布式一致性原理与实践>笔记 文章目录 <从Paxos到zookeeper分布式一致性原理与实践>笔记 一.概念 二.一致性协调 2.1 2P ...

  6. 《从Paxos到zookeeper分布式一致性原理与实践》

    <从Paxos到zookeeper分布式一致性原理与实践> 一.概念 ACID: Automaticy.consistency.isolation. Durability CAP: con ...

  7. Curator典型使用场景之分布式计数器。

    有了上述分布式锁实现的基础之后,我们就很容易基于其实现一个分布式计数器.分布式计数器的一个典型场景是统计系统的在线人数.基于ZooKeeper的分布式计数器的实现思路页非常简单: 指定一个ZooKee ...

  8. (五)springmvc+mybatis+dubbo+zookeeper分布式架构 整合 - maven构建根项目

    上一篇我们介绍<springmvc+mybatis+dubbo+zookeeper分布式架构 整合 - maven模块规划>,从今天开始,我们将对代码的每一个构建做详细的记录,能够帮助大家 ...

  9. zookeeper 分布式过程协同技术详解.pdf_阿里大牛耗时18个月整理这份ZooKeeper分布式详解文档...

    前言 摩尔定律揭示了集成电路每18个月计算性能就会增加一倍.随着信息的飞速膨胀,很多应用都无法依赖单个服务器的性能升级来处理如此庞大的数据量,分布式系统和应用越来越受到人们的青睐.分布式系统和应用不仅 ...

最新文章

  1. iPhone曝严重漏洞,用户接听FaceTime前或被“监听”!
  2. Linux文件系统目录
  3. 伯纳德•罗森伯格先生参加华为技术2016首届国际光电连接技术研讨会
  4. Nginx中的break和last
  5. 开关电源雷击浪涌整改_高频开关电源的EMC电磁兼容整改问题分析
  6. 归并python_python基本算法之实现归并排序(Merge sort)
  7. Vert.x(vertx) 认证和授权
  8. 如今市面上有哪些可以远程的软件?
  9. 编写xml文件不当时会出现R文件找不到情况
  10. excel转换linux时间戳,在Excel中转换时间戳(timeStamp)
  11. 7-5 华氏度转摄氏度(四舍五入) (5分)
  12. 虫虫 5个衡量软件质量的标准(可自动化)
  13. 邮件到达对方服务器但是没到邮箱,无法将邮件发送进到对方服务器,教你如何用手工探测...
  14. Linux第7章Gdk及Cairo基础,Linux第7章Gdk及Cairo基础概要1.ppt
  15. POI实现EXCEL导出(resources配置路径下或者网络图片)
  16. 记一次 .NET 某桌面奇侠游戏 非托管内存泄漏分析
  17. java 采用MD5加密解密代码示例(不玩套路, 非标题党, 附带解密代码)
  18. 斐波那契生兔子问题(一月大兔子生a对,二月大兔子生b对,三月大兔子生c对。。。)
  19. Access to XMLHttpRequest at ‘XXX‘ from origin ‘XX‘ has been blocked by CORS policy: No ‘Access-Contr
  20. b,B,KB,MB,GB,TB,PB,EB,ZB,YB,BB,NB,DB的含义,之间的关系

热门文章

  1. 实名羡慕!蚂蚁员工激励达 1376.9 亿,人均能在杭州买套 283 平的房子?
  2. 华山论剑之iOStableView的双剑合璧
  3. content 内容生成技术2
  4. 查看tcp各个连接状态的数量
  5. Leetcode PHP题解--D14 561. Array Partition I
  6. rsync同步服务实验讲解
  7. 网页缩放zoom用法
  8. 兴趣部落的 Git 迁移实践
  9. MySQL Binlog解析
  10. 【Python 第2课】print