1. 分布式锁功能

1.1. 在分布式场景中, 我们为了保证数据的一致性, 经常在程序运行的某一个点需要进行同步操作。Java提供的synchronized或者Reentrantlock等, 是同一个应用程序的多个线程的高并发, 并不是多个服务器(分布式)写同一数据的并发, 这个时候再使用Java提供的锁, 就会出现分布式不同步的问题。我们使用Curator基于ZooKeeper的特性提供的分布式锁来处理分布式场景的数据一致性。

2. 分布式锁实例

2.1. 新建一个名为CuratorDistributed的Java项目, 拷入相关jar

2.2. Java的ReentrantLock锁

package com.fj.zkcurator;import java.io.File;
import java.io.FileOutputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;public class Look {public static final ReentrantLock rl = new ReentrantLock();public static void main(String[] args) {try {rl.lock();db();} catch (Exception e) {e.printStackTrace();} finally {rl.unlock();}}public static void db() throws Exception {// 1.获取连接对象Connection conn = JDBCUtil.getConn(); // 2.创建statement, 跟数据库打交道, 一定需要这个对象Statement st = conn.createStatement();// 3.执行查询sql, 获取ResultSet结果集ResultSet rs = st.executeQuery("select * from product where id = 2");// 4.使用ResultSet结果集遍历, 下标从1开始List<Product> products = new ArrayList<Product>();while(rs.next()) {products.add(new Product(rs.getInt(1), rs.getString(2), rs.getInt(3)));}// 5.查询id为2的商品的数量int number = products.get(0).getNumber();File file = new File("Look.txt");if(!file.exists()) {file.createNewFile();}FileOutputStream fos = new FileOutputStream(file, true);fos.write((System.currentTimeMillis() + ", number = " + number + "\r\n").getBytes());fos.close();if(number > 0) {// 6.执行查询sqlst.executeUpdate("update product set number = " + (--number) + " where id = 2");}// 7.释放资源JDBCUtil.release(conn, st);}
}

2.3. 在src目录下创建jdbc.properties

2.4. 数据库product表

2.5. Product.java

package com.fj.zkcurator;import java.io.Serializable;public class Product implements Serializable {private static final long serialVersionUID = 1L;private Integer id;private String name;private Integer number;public Product() {}public Product(Integer id, String name, Integer number) {this.id = id;this.name = name;this.number = number;}public Integer getId() {return id;}public void setId(Integer id) {this.id = id;}public String getName() {return name;}public void setName(String name) {this.name = name;}public Integer getNumber() {return number;}public void setNumber(Integer number) {this.number = number;}@Overridepublic String toString() {return "Product [id=" + id + ", name=" + name + ", number=" + number + "]";}}

2.6. JDBCUtil.java

package com.fj.zkcurator;import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;public class JDBCUtil {private static String driverClass = null;private static String url = null;private static String name = null;private static String password= null;static {try {// 1.创建一个属性配置对象Properties properties = new Properties();// 2.使用类加载器, 去读取src底下的资源文件。对应文件位于src目录底下InputStream is = JDBCUtil.class.getClassLoader().getResourceAsStream("jdbc.properties");// 3.导入输入流。properties.load(is);// 4.读取属性driverClass = properties.getProperty("driverClass");url = properties.getProperty("url");name = properties.getProperty("name");password = properties.getProperty("password");} catch (IOException e) {e.printStackTrace();}}/*** 获取连接对象*/public static Connection getConn(){Connection conn = null;try {Class.forName(driverClass);conn = DriverManager.getConnection(url, name, password);} catch (Exception e) {e.printStackTrace();}return conn;}/*** 释放资源* @param conn* @param st* @param rs*/public static void release(Connection conn, Statement st, ResultSet rs){closeRs(rs);closeSt(st);closeConn(conn);}public static void release(Connection conn, Statement st){closeSt(st);closeConn(conn);}private static void closeRs(ResultSet rs){try {if(rs != null){rs.close();}} catch (SQLException e) {e.printStackTrace();}finally{rs = null;}}private static void closeSt(Statement st){try {if(st != null){st.close();}} catch (SQLException e) {e.printStackTrace();}finally{st = null;}}private static void closeConn(Connection conn){try {if(conn != null){conn.close();}} catch (SQLException e) {e.printStackTrace();}finally{conn = null;}}}

2.7. 连续运行Look.java十五次(模拟多个应用并发), 出现了分布式并发问题

2.8. 分布式锁

package com.fj.zkcurator;import java.io.File;
import java.io.FileOutputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;public class DistributedLook {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static final String PATH = "/interProcessMutex";public static void main(String[] args) {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 分布式锁InterProcessMutex lock = new InterProcessMutex(cf, PATH);try {// 获取锁lock.acquire();db();} catch (Exception e) {e.printStackTrace();} finally {try {// 释放锁lock.release();} catch (Exception e) {e.printStackTrace();}}// 5. 关闭连接cf.close();}public static void db() throws Exception {// 1.获取连接对象Connection conn = JDBCUtil.getConn(); // 2.创建statement, 跟数据库打交道, 一定需要这个对象Statement st = conn.createStatement();// 3.执行查询sql, 获取ResultSet结果集ResultSet rs = st.executeQuery("select * from product where id = 2");// 4.使用ResultSet结果集遍历, 下标从1开始List<Product> products = new ArrayList<Product>();while(rs.next()) {products.add(new Product(rs.getInt(1), rs.getString(2), rs.getInt(3)));}// 5.查询id为2的商品的数量int number = products.get(0).getNumber();File file = new File("DistributedLook.txt");if(!file.exists()) {file.createNewFile();}FileOutputStream fos = new FileOutputStream(file, true);fos.write((System.currentTimeMillis() + ", number = " + number + "\r\n").getBytes());fos.close();if(number > 0) {// 6.执行查询sqlst.executeUpdate("update product set number = " + (--number) + " where id = 2");}// 7.释放资源JDBCUtil.release(conn, st);}
}

2.9. 连续运行DistributedLook.java十五次(模拟多个应用并发)

3. 分布式计数器功能

3.1. 一说到分布式计数器, 你可能脑海里想到了AtomicInteger这种经典的方式, 如果针对于一个JVM的场景当然没问题, 但是我们现在是分布式场景下, 这就需要利用Curator框架的DistributedAtomicInteger了。

4. 分布式计数器例子

4.1. 分布式计数器

package com.fj.zkcurator;import java.io.File;
import java.io.FileOutputStream;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;/*** 分布式计数器*/
public class DistributedCounter {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static final String PATH = "/distributedAtomicInteger";public static void main(String[] args) {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 分布式原子整形DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(cf, PATH, new RetryNTimes(3, 1000));try {// 递增操作AtomicValue<Integer> value = atomicInteger.increment();// 检查是否操作成功if(value.succeeded()) {// 操作前和操作后的值File file = new File("DistributedCounter.txt");if(!file.exists()) {file.createNewFile();}FileOutputStream fos = new FileOutputStream(file, true);fos.write(("preValue = " + value.preValue() + ", postValue = " + value.postValue() + "\r\n").getBytes());fos.close();}} catch (Exception e1) {e1.printStackTrace();}// 5. 关闭连接cf.close();}
}

4.2. 连续运行DistributedCounter.java十五次(模拟多个应用并发)

5. Barrier屏障

5.1. 首先, 得介绍下Barrier的概念, Barrier从字面理解是屏障的意思, 主要是用作集合线程, 然后再一起往下执行。再具体一点, 在Barrier之前, 若干个thread各自执行, 然后到了Barrier的时候停下, 等待规定数目的所有的其他线程到达这个Barrier, 之后再一起通过这个Barrier各自干自己的事情。

5.2. 在计算机的世界里, Barrier可以解决的问题很多, 比如, 一个程序有若干个线程并发的从网站上下载一个大型xml文件, 这个过程可以相互独立, 因为一个文件的各个部分并不相关。而在处理这个文件的时候, 可能需要一个完整的文件, 所以, 需要有一条虚拟的线让这些并发的部分集合一下从而可以拼接成为一个完整的文件, 可能是为了后续处理也可能是为了计算hash值来验证文件的完整性。而后, 再交由下一步处理。

6. 同时开始任务实例

6.1. 同时开始任务

package com.fj.zkcurator.barrier;import java.io.File;
import java.io.FileOutputStream;
import java.util.UUID;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;/*** 分布式屏障, 同时开始任务, 需要它人触发任务的开启。*/
public class SameTimeStartWork {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static final String PATH = "/distributedBarrier";public static void main(String[] args) {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 分布式屏障DistributedBarrier barrier = new DistributedBarrier(cf, PATH);try {File file = new File("SameTimeStartWork.txt");if(!file.exists()) {file.createNewFile();}FileOutputStream fos = new FileOutputStream(file, true);String uuid = UUID.randomUUID().toString();fos.write((uuid + "准备就绪..." + "\r\n").getBytes());// 5. 设置屏障barrier.setBarrier();// 6. 等待它人触发才开始工作barrier.waitOnBarrier();fos.write((uuid + "开始任务..." + "\r\n").getBytes());Thread.sleep(1000);fos.write((uuid + "完成任务..." + "\r\n").getBytes());fos.close();} catch (Exception e1) {e1.printStackTrace();}// 7. 关闭连接cf.close();}
}

6.2. 移除屏障

package com.fj.zkcurator.barrier;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;public class RemoveDistributedBarrier {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static final String PATH = "/distributedBarrier";public static void main(String[] args) {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 分布式屏障DistributedBarrier barrier = new DistributedBarrier(cf, PATH);try {// 5. 作为第三者移除屏障barrier.removeBarrier();} catch (Exception e) {e.printStackTrace();}// 7. 关闭连接cf.close();}
}

6.3. 连续运行SameTimeStartWork.java五次

6.4. 运行RemoveDistributedBarrier.java移除屏障

7. 同时开始任务同时结束任务实例

7.1. 同时开始任务同时结束, 设置执行者到达5个以上开始任务

package com.fj.zkcurator.barrier;import java.io.File;
import java.io.FileOutputStream;
import java.util.UUID;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;/*** 分布式的同时开始任务, 同时离开任务。*/
public class SameTimeEnterAndLeave {public static final String connectString = "192.168.25.133:2181,192.168.25.135:2181,192.168.25.138:2181";public static final int sessionTimeoutMs = 10 * 60 * 1000;public static final int connectionTimeoutMs = 5 * 1000;public static final String PATH = "/distributedDoubleBarrier";public static void main(String[] args) {// 1. 重试策略, 初试时间为1s, 最多可重试10次。RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);// 2. 通过工厂创建连接CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(connectString).sessionTimeoutMs(sessionTimeoutMs).connectionTimeoutMs(connectionTimeoutMs).retryPolicy(retryPolicy).build();// 3. 开启连接cf.start();// 4. 分布式双重屏障DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, PATH, 5);try {// 操作前和操作后的值File file = new File("SameTimeEnterAndLeave.txt");if(!file.exists()) {file.createNewFile();}FileOutputStream fos = new FileOutputStream(file, true);String uuid = UUID.randomUUID().toString();fos.write((uuid + "准备就绪..." + "\r\n").getBytes());// 5. 进入任务barrier.enter();fos.write((uuid + "完成任务1..." + "\r\n").getBytes());Thread.sleep(1000);fos.write((uuid + "完成任务2..." + "\r\n").getBytes());Thread.sleep(1000);fos.write((uuid + "完成所有任务..." + "\r\n").getBytes());// 6. 离开任务barrier.leave();fos.write((uuid + "退出任务..." + "\r\n").getBytes());fos.close();} catch (Exception e1) {e1.printStackTrace();}// 5. 关闭连接cf.close();}
}

7.2. 运行SameTimeEnterAndLeave.java四次, 没有开始任务, 被阻塞

7.3. 再运行SameTimeEnterAndLeave.java一次

007_Curator框架二相关推荐

  1. Jersey框架二:Jersey对JSON的支持

    Jersey系列文章: Jersey框架一:Jersey RESTful WebService框架简介 Jersey框架二:Jersey对JSON的支持 Jersey框架三:Jersey对HTTPS的 ...

  2. Java开源 J2EE框架(二)

    Java开源 J2EE框架(二) 2007-01-06 12:34 Jofti [Java开源 其它开源项目] Jofti可对在缓存层中(支持EHCache,JBossCache和OSCache)的对 ...

  3. [LINQ2Dapper]最完整Dapper To Linq框架(二)---动态化查询

    目录 [LINQ2Dapper]最完整Dapper To Linq框架(一)---基础查询 [LINQ2Dapper]最完整Dapper To Linq框架(二)---动态化查询 [LINQ2Dapp ...

  4. pytest框架二次开发之自定义注解

    目录 一.背景: 二.闭包与装饰器(可以跳过这一章) 2.1 .什么是闭包 2.2 闭包的用途 2.3 .装饰器(decorator) 三.pytest自定义注解@author 3.1 自定义注解@a ...

  5. 胖虎谈ImageLoader框架(二)

    前言 从学校出来的这半年时间,发现很少有时间可以静下来学习和写博文了,为了保持着学习和分享的习惯,我准备之后每周抽出一部分时间为大家带来一个优秀的Android框架源码阅读后的理解系列博文. 期许:希 ...

  6. Spring框架(二)

    Spring框架(二) 一.控制反转IOC和依赖注入DI 控制反转和依赖注入是对同一件事情的不同描述,从某个方面讲,就是它们描述的角度不同.控制反转是说不需要程序员管理和控制bean,是解耦的目的,而 ...

  7. 利用jspxcms框架二次开发遇到的问题

    发版之后里面的文件丢失问题 要在一个利用jspxcms框架二次开发的项目里添加新功能,拉下来代码加上新功能之后,把项目打成war包,放到服务器上重新启动,后来有别的开发人员告诉我,他以前上传的文件和一 ...

  8. Android Http请求框架二:xUtils 框架网络请求

    一:对Http不了解的请看 Android Http请求框架一:Get 和 Post 请求 二.正文 1.xUtils 下载地址 github 下载地址  : https://github.com/w ...

  9. Android 开源框架 ( 二 ) 基于OkHttp进一步封装的okhttp-utils介绍

    okhttp-utils是张鸿洋是基于OkHttp封装的框架库.实际工作中,使用的不多,对于小型项目的网络请求和文件传输可以考虑直接使用.否则还是基于主流的OkHttp+Retrift+RxJava框 ...

最新文章

  1. 需求编写的几点经验之谈
  2. 9.25 360校招面试题总结? 自己面试 c++后端开发,服务器方向,探索部门。
  3. 大学计算机测试试题,大学计算机基础 excel测试题 求答案~~喵~~
  4. opencv python教程简书_OpenCV-Python教程:27.图像转换
  5. jQuery笔记[1]——jqGrid中实现自定义链接弹出subgrid
  6. [恢]hdu 1407
  7. 【Python 必会技巧】获取字典中(多个)最大值(value)的键(key)
  8. WHAT IS PYTORCH
  9. 基于Boost无锁队列实现的内存池
  10. android画布_Android画布
  11. redis之django-redis
  12. 海量数据存储 - 性能瓶颈 - 解决方案
  13. 9_林业专题图的制作
  14. 计算机设置从光盘启动怎么办,[光盘启动]BIOS设置从光盘光驱启动教程
  15. Drive-by Compromise 术语名词概念
  16. android l usb调试,你居然还不会手机usb调试?5个方法,让你轻松学会设置!
  17. 宁宛 机器人_忠犬机器人3
  18. 思维题:三个箱子,一个只装苹果,一个只装橙,另一个装苹果和橙,请问?
  19. oracle 允许级联删除,oracle系列--级联删除和级联更新
  20. java背包_java-背包的实现

热门文章

  1. Java实现单链表的逆转置
  2. HDU_1072_Nightmare题解
  3. 如何选择WinPE版本?-日常IT维护必备工具WinPE
  4. MPLS的几种备份方式——Vecloud
  5. 成功部署SD-WAN策略应注意的几个事项—Vecloud微云
  6. linux 系统安装配置 zabbix服务(源码安装)
  7. cocos2dx3.0-tinyxml在Android环境下解析xml失败的问题
  8. C#中自定义类数组和结构数组的使用
  9. 累加求和 Accumulate.java
  10. hdu 4417 Super Mario 划分树+二分