本博文的主要内容有

   一、zookeeper编程入门系列之利用zookeeper的临时节点的特性来监控程序是否还在运行

    二、zookeeper编程入门系列之zookeeper实现分布式进程监控

   三、zookeeper编程入门系列之zookeeper实现分布式共享锁

  这里,推荐用下面的eclipse版本(当然你若也有myeclipse,请忽视我这句话)

Group Id:zhouls.bigdata

Artifact Id:zkDemo

Package:zhouls.bigdata.zkDemo

  将默认的jdk,修改为jdk1.7

  修改默认的pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>zhouls.bigdata</groupId><artifactId>zkDemo</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>zkDemo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>3.8.1</version><scope>test</scope></dependency></dependencies>
</project>

  修改后的pom.xml为

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>zhouls.bigdata</groupId><artifactId>zkDemo</artifactId><version>0.0.1-SNAPSHOT</version><packaging>jar</packaging><name>zkDemo</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.12</version><scope>test</scope></dependency><!-- 此版本的curator操作的zk是3.4.6版本 --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>2.10.0</version></dependency></dependencies>
</project>

  junit是单元测试。

  也许,大家的jdk1.7会报错误,那就改为jdk1.8。

一、zookeeper编程入门系列之利用zookeeper的临时节点的特性来监控程序是否还在运行

  写一个TestCurator.java

  怎么通过Curator连接到zookeeper官网,其实是有固定的。

  这打开需要好几分钟的时间,里面会有示范代码,教我们怎么连接zookeeper。

  我这里的zookeeper集群是master(192.168.80.145)、slave1(192.168.80.146)和slave2(192.168.80.147)。

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] 

  现在,我想通过Curator来连接到zookeeper集群,并在里面创建临时节点。

  这里,永久节点为monitor、临时节点为test123。(当然,大家可以自行去命名)(同时,大家也可以通过命令行方式来创建,我这里就是以代码api形式来创建了)

    比如,这样,monitor是父节点(作为永久节点),test123是临时节点。

    而现在,是monitor都没有,它不会给我们一次性创建完。

  除非,大家在命令行里先创建好monitor节点,之后,然后上述代码可以操作成功。否则,就需如下修改代码。

package zhouls.bigdata.zkDemo;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;/*** * @author zhouls**/
public class TestCurator {@Testpublic void testName() throws Exception {// 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);String connectString = "master:2181,slave1:2181,slave2:2181";int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失int connectionTimeoutMs = 3000;// 获取链接的超时时间CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs,retryPolicy);client.start();// 开启客户端
client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建.withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点.withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息.forPath("/monitor/test123");//指定节点名称
            }}

  可以看到成功monitor生成,其实啊,/monitor/test123节点也是有的。(只是中间又消失了)

  为什么会中间消失了呢?是因为,test123是临时节点。创建完之后,它就会消失了。

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /
[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 2] ls /monitor
[]

  那么,我想看,怎么用代码来实现呢?

  增加以下代码

  此时的代码是TestCurator.java

package zhouls.bigdata.zkDemo;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;/*** * @author zhouls**/
public class TestCurator {@Testpublic void testName() throws Exception {// 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);String connectString = "master:2181,slave1:2181,slave2:2181";int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失int connectionTimeoutMs = 3000;// 获取链接的超时时间CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs,retryPolicy);client.start();// 开启客户端
client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建.withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点.withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息.forPath("/monitor/test123");//指定节点名称 while (true) {;}}}

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /
[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 2] ls /monitor
[test123]
[zk: localhost:2181(CONNECTED) 3] 

  然后,我这边,把代码,来停掉,则它就会消失了。

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /
[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 2] ls /monitor
[]
[zk: localhost:2181(CONNECTED) 3] ls /monitor
[test123]
[zk: localhost:2181(CONNECTED) 4] ls /monitor
[]
[zk: localhost:2181(CONNECTED) 5] 

  好的,那么,现在,又有一个疑问出来了,在往monitor节点里,注册节点如test123,那么,我怎么知道是哪一台的呢?则此时,需要做如下修改

  此刻的代码如下TestCurator.java

package zhouls.bigdata.zkDemo;import java.net.InetAddress;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;/*** * @author zhouls**/
public class TestCurator {@Testpublic void testName() throws Exception {// 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);String connectString = "master:2181,slave1:2181,slave2:2181";int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失int connectionTimeoutMs = 3000;// 获取链接的超时时间CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs,retryPolicy);client.start();// 开启客户端
InetAddress localhost = InetAddress.getLocalHost();String ip = localhost.getHostAddress();client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建.withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点.withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息.forPath("/monitor/" + ip);//指定节点名称while (true) {;}}}

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /
[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 2] ls /monitor
[]
[zk: localhost:2181(CONNECTED) 3] ls /monitor
[test123]
[zk: localhost:2181(CONNECTED) 4] ls /monitor
[]
[zk: localhost:2181(CONNECTED) 5] ls /monitor
[169.254.28.160]
[zk: localhost:2181(CONNECTED) 6] 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /
[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 2] ls /monitor
[test123]
[zk: localhost:2181(CONNECTED) 3] ls /monitor
[169.254.28.160]
[zk: localhost:2181(CONNECTED) 4] 

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 1] ls /
[monitor, hbase, zookeeper, admin, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 2] ls /monitor
[test123]
[zk: localhost:2181(CONNECTED) 3] ls /monitor
[169.254.28.160]
[zk: localhost:2181(CONNECTED) 4] 

  这个ip怎么不是我集群里的ip呢?是哪里的???

  原来是这里的

  因为,我是在test测试,所以,拿到的是windows本地的ip地址。

  如果,放在mian去测试,则就是拿到集群里的ip地址了。

  至此,我们是用临时节点的这个特性,来监控程序有没有运行的。并不是说临时节点就是来只做这个事!!!

package zhouls.bigdata.zkDemo;import java.net.InetAddress;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;/*** * @author zhouls**/
public class TestCurator {@Testpublic void testName() throws Exception {// 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);String connectString = "master:2181,slave1:2181,slave2:2181";int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失int connectionTimeoutMs = 3000;// 获取链接的超时时间CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs,retryPolicy);client.start();// 开启客户端
InetAddress localhost = InetAddress.getLocalHost();String ip = localhost.getHostAddress();client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建.withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点.withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息.forPath("/monitor/" + ip);//指定节点名称while (true) {;}//或者//        client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
//                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面
//                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
//                .forPath("/monitor/ + ip");// 指定节点名称
//        while (true) {
//            ;
//        }
}}

  可以将这个,写入到入口类或构造函数里。每次开始前都调用执行。以此来监控程序是否还在运行,非常重要!

二、zookeeper编程入门系列之zookeeper实现分布式进程监控

  思路: 即在/下,先注册一个监视器,即monitor节点(为永久节点)

         然后,监视monitor节点下面的所有子节点(为临时节点)

  概念见

Zookeeper概念学习系列之zookeeper实现分布式进程监控

  先执行

  然后执行

  ZkNodeWacter.java

package zhouls.bigdata.zkDemo;import java.util.ArrayList;
import java.util.List;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
/*** 这个监视器需要一直在后台运行,所以相当于是一个死循环的进程* @author zhouls**/
public class ZkNodeWacter implements Watcher {CuratorFramework client;List<String> childrenList = new ArrayList<String>();public ZkNodeWacter() {//在启动监视器的时候,链接到zkRetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);String connectString = "master:2181,slave1:2181,slave2:2181";int sessionTimeoutMs = 5000;int connectionTimeoutMs = 3000;client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs,retryPolicy);client.start();// 开启客户端//监视monitor节点下面的所有子节点(为临时节点)try {//在monitor目录上注册一个监视器,这个监视器只能使用一次childrenList = client.getChildren().usingWatcher(this).forPath("/monitor");} catch (Exception e) {e.printStackTrace();}}/*** 当monitor节点下面的子节点发生变化的时候,这个方法会被调用到*/public void process(WatchedEvent event) {System.out.println("我被调用了:"+event);try {//重复注册监视器List<String> newChildrenList = client.getChildren().usingWatcher(this).forPath("/monitor");//先遍历原始的子节点listfor (String ip : childrenList) {if(!newChildrenList.contains(ip)){System.out.println("节点消失:"+ip);}}for (String ip : newChildrenList) {if(!childrenList.contains(ip)){System.out.println("新增节点:"+ip);}}childrenList = newChildrenList;} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {ZkNodeWacter spiderWacter = new ZkNodeWacter();spiderWacter.start();//表示需要开启一个监视器
    }private void start() {while(true){;}}}

  TestCurator.java

package zhouls.bigdata.zkDemo;import java.net.InetAddress;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;/*** * @author zhouls**/
public class TestCurator {@Testpublic void testName() throws Exception {// 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);String connectString = "master:2181,slave1:2181,slave2:2181";int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失int connectionTimeoutMs = 3000;// 获取链接的超时时间CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs,retryPolicy);client.start();// 开启客户端
InetAddress localhost = InetAddress.getLocalHost();String ip = localhost.getHostAddress();client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建.withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点.withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息.forPath("/monitor/" + ip);//指定节点名称while (true) {;}//        client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建
//                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面
//                .withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息
//                .forPath("/monitor/ + ip");// 指定节点名称
//        while (true) {
//            ;
//        }
}}

三、zookeeper编程入门系列之zookeeper实现分布式共享锁

  这里,一般,都是创建临时有序子节点,怎么来创建,不难

  说到协调,我首先想到的是北京很多十字路口的交通协管,他们手握着小红旗,指挥车辆和行人是不是可以通行。如果我们把车辆和行人比喻成运行在计算机中的单元(线程),那么这个协管是干什么的?很多人都会想到,这不就是锁么?对,在一个并发的环境里,我们为了避免多个运行单元对共享数据同时进行修改,造成数据损坏的情况出现,我们就必须依赖像锁这样的协调机制,让有的线程可以先操作这些资源,然后其他线程等待。对于进程内的锁来讲,我们使用的各种语言平台都已经给我们准备很多种选择。

  

  TestCurator.java

package zhouls.bigdata.zkDemo;import java.net.InetAddress;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;/*** * @author zhouls**/
public class TestCurator {@Testpublic void test1() throws Exception {// 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);String connectString = "master:2181,slave1:2181,slave2:2181";int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失int connectionTimeoutMs = 3000;// 获取链接的超时时间CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs,retryPolicy);client.start();// 开启客户端
InetAddress localhost = InetAddress.getLocalHost();String ip = localhost.getHostAddress();client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建.withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点.withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息.forPath("/monitor/" + ip);//指定节点名称while (true) {;}}@Testpublic void test2() throws Exception {// 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);String connectString = "master:2181,slave1:2181,slave2:2181";int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失int connectionTimeoutMs = 3000;// 获取链接的超时时间CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs,retryPolicy);client.start();// 开启客户端
InetAddress localhost = InetAddress.getLocalHost();String ip = localhost.getHostAddress();client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面.withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息.forPath("/monitor/");// 指定节点名称while (true) {;}}}

  DistributedLock.java

package zhouls.bigdata.zkDemo;import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;/**DistributedLock lock = null;try {lock = new DistributedLock("127.0.0.1:2181","test");lock.lock();//do something...} catch (Exception e) {e.printStackTrace();} finally {if(lock != null)lock.unlock();}//lock.closeZk();//在cleanup方法中添加**/
public class DistributedLock implements Lock, Watcher{private ZooKeeper zk;private String root = "/locks";//根private String lockName;//竞争资源的标志private String waitNode;//等待前一个锁private String myZnode;//当前锁private CountDownLatch latch;//计数器private int sessionTimeout = 30000;//30秒private int waitTimeout = 30000;//等待节点失效最大时间 30秒private List<Exception> exception = new ArrayList<Exception>();/*** 创建分布式锁,使用前请确认zkConnString配置的zookeeper服务可用* @param zkConnString 127.0.0.1:2181* @param lockName 竞争资源标志,lockName中不能包含单词lock*/public DistributedLock(String zkConnString, String lockName){this.lockName = lockName;// 创建一个与服务器的连接try {zk = new ZooKeeper(zkConnString, sessionTimeout, this);Stat stat = zk.exists(root, false);if(stat == null){// 创建根节点zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); }} catch (IOException e) {exception.add(e);} catch (KeeperException e) {exception.add(e);} catch (InterruptedException e) {exception.add(e);}}/*** zookeeper节点的监视器*/public void process(WatchedEvent event) {if(this.latch != null) {  this.latch.countDown();  }}/*** 获取锁*/public void lock() {if(exception.size() > 0){throw new LockException(exception.get(0));}try {if(this.tryLock()){System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");return;}else{waitForLock(waitNode, waitTimeout);//等待获取锁
            }} catch (KeeperException e) {throw new LockException(e);} catch (InterruptedException e) {throw new LockException(e);} }/*** 尝试获取锁*/public boolean tryLock() {try {String splitStr = "_lock_";if(lockName.contains(splitStr))throw new LockException("lockName can not contains \\u000B");//创建临时有序子节点myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);System.err.println(myZnode + " is created ");//取出所有子节点List<String> subNodes = zk.getChildren(root, false);//取出所有lockName的锁List<String> lockObjNodes = new ArrayList<String>();for (String node : subNodes) {String _node = node.split(splitStr)[0];if(_node.equals(lockName)){lockObjNodes.add(node);}}//对所有节点进行默认排序,从小到大
            Collections.sort(lockObjNodes);System.out.println(myZnode + "==" + lockObjNodes.get(0));if(myZnode.equals(root+"/"+lockObjNodes.get(0))){//如果是最小的节点,则表示取得锁return true;}//如果不是最小的节点,找到比自己小1的节点String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);//获取比当前节点小一级的节点(Collections.binarySearch(lockObjNodes, subMyZnode):获取当前节点的角标)waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);} catch (KeeperException e) {throw new LockException(e);} catch (InterruptedException e) {throw new LockException(e);}return false;}public boolean tryLock(long time, TimeUnit unit) {try {if(this.tryLock()){return true;}return waitForLock(waitNode,time);} catch (Exception e) {e.printStackTrace();}return false;}/*** 等待获取锁* @param lower :等待的锁* @param waitTime 最大等待时间* @return* @throws InterruptedException* @throws KeeperException*/private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {Stat stat = zk.exists(root + "/" + lower,true);//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听if(stat != null){System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);this.latch = new CountDownLatch(1);this.latch.await(waitTime, TimeUnit.MILLISECONDS);this.latch = null;}return true;}/*** 取消锁监控*/public void unlock() {try {System.out.println(Thread.currentThread().getId()+",unlock " + myZnode);zk.delete(myZnode,-1);myZnode = null;//zk.close();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}/*** 关闭zk链接*/public void closeZk(){try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}public void lockInterruptibly() throws InterruptedException {this.lock();}public Condition newCondition() {return null;}/*** 自定义异常信息* @author lenovo**/public class LockException extends RuntimeException {private static final long serialVersionUID = 1L;public LockException(String e){super(e);}public LockException(Exception e){super(e);}}
}

  如有两个线程, 两个线程要同时到mysql中更新一条数据, 对数据库中的数据进行累加更新。由于在分布式环境下, 这两个线程可能存在于不同的机器上的不同jvm进程中, 所以这两个线程的关系就是垮主机跨进程, 使用java中的synchronized锁是搞不定的。

  概念,见

Zookeeper概念学习系列之zookeeper实现分布式共享锁

  这里的节点也可以为lock。

  先执行以下的test3,再执行test4

[zk: localhost:2181(CONNECTED) 9] ls /
[monitor, hbase, zookeeper, admin, lock, consumers, config, storm, brokers, controller_epoch]
[zk: localhost:2181(CONNECTED) 10] ls /lock
[169.254.28.160]
[zk: localhost:2181(CONNECTED) 11] 

  然后,再执行test4

  然后,再执行下test4,试试,看看有什么变化

  可以看到,在增加。

 

  总的代码是TestCurator.java

package zhouls.bigdata.zkDemo;import java.net.InetAddress;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.junit.Test;/*** * @author zhouls**/
public class TestCurator {@Testpublic void test1() throws Exception {// 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);String connectString = "master:2181,slave1:2181,slave2:2181";int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失int connectionTimeoutMs = 3000;// 获取链接的超时时间CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs,retryPolicy);client.start();// 开启客户端
InetAddress localhost = InetAddress.getLocalHost();String ip = localhost.getHostAddress();client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建.withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点.withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息.forPath("/monitor/" + ip);//指定节点名称while (true) {;}}@Testpublic void test2() throws Exception {// 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);String connectString = "master:2181,slave1:2181,slave2:2181";int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失int connectionTimeoutMs = 3000;// 获取链接的超时时间CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs,retryPolicy);client.start();// 开启客户端
InetAddress localhost = InetAddress.getLocalHost();String ip = localhost.getHostAddress();client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面.withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息.forPath("/monitor/");// 指定节点名称while (true) {;}}@Testpublic void test3() throws Exception {// 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);String connectString = "master:2181,slave1:2181,slave2:2181";int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失int connectionTimeoutMs = 3000;// 获取链接的超时时间CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs,retryPolicy);client.start();// 开启客户端
InetAddress localhost = InetAddress.getLocalHost();String ip = localhost.getHostAddress();client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建.withMode(CreateMode.EPHEMERAL)//指定节点类型,临时节点.withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息.forPath("/lock/" + ip);//指定节点名称while (true) {;}}@Testpublic void test4() throws Exception {// 1000:表示curator链接zk的时候超时时间是多少 3:表示链接zk的最大重试次数RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);String connectString = "master:2181,slave1:2181,slave2:2181";int sessionTimeoutMs = 5000;// 这个值只能在4000-40000ms之间 表示链接断掉之后多长时间临时节点会消失int connectionTimeoutMs = 3000;// 获取链接的超时时间CuratorFramework client = CuratorFrameworkFactory.newClient(connectString, sessionTimeoutMs, connectionTimeoutMs,retryPolicy);client.start();// 开启客户端
InetAddress localhost = InetAddress.getLocalHost();String ip = localhost.getHostAddress();client.create().creatingParentsIfNeeded()// 如果父节点不存在则创建.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)// 指定节点类型,注意:临时节点必须在某一个永久节点下面.withACL(Ids.OPEN_ACL_UNSAFE)// 设置节点权限信息.forPath("/lock/");// 指定节点名称while (true) {;}}}

  DistributedLock.java

package zhouls.bigdata.zkDemo;import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;/**DistributedLock lock = null;try {lock = new DistributedLock("127.0.0.1:2181","test");lock.lock();//do something...} catch (Exception e) {e.printStackTrace();} finally {if(lock != null)lock.unlock();}//lock.closeZk();//在cleanup方法中添加**/
public class DistributedLock implements Lock, Watcher{private ZooKeeper zk;private String root = "/locks";//根private String lockName;//竞争资源的标志private String waitNode;//等待前一个锁private String myZnode;//当前锁private CountDownLatch latch;//计数器private int sessionTimeout = 30000;//30秒private int waitTimeout = 30000;//等待节点失效最大时间 30秒private List<Exception> exception = new ArrayList<Exception>();/*** 创建分布式锁,使用前请确认zkConnString配置的zookeeper服务可用* @param zkConnString 127.0.0.1:2181* @param lockName 竞争资源标志,lockName中不能包含单词lock*/public DistributedLock(String zkConnString, String lockName){this.lockName = lockName;// 创建一个与服务器的连接try {zk = new ZooKeeper(zkConnString, sessionTimeout, this);Stat stat = zk.exists(root, false);if(stat == null){// 创建根节点zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); }} catch (IOException e) {exception.add(e);} catch (KeeperException e) {exception.add(e);} catch (InterruptedException e) {exception.add(e);}}/*** zookeeper节点的监视器*/public void process(WatchedEvent event) {if(this.latch != null) {  this.latch.countDown();  }}/*** 获取锁*/public void lock() {if(exception.size() > 0){throw new LockException(exception.get(0));}try {if(this.tryLock()){System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");return;}else{waitForLock(waitNode, waitTimeout);//等待获取锁
            }} catch (KeeperException e) {throw new LockException(e);} catch (InterruptedException e) {throw new LockException(e);} }/*** 尝试获取锁*/public boolean tryLock() {try {String splitStr = "_lock_";if(lockName.contains(splitStr))throw new LockException("lockName can not contains \\u000B");//创建临时有序子节点myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);System.err.println(myZnode + " is created ");//取出所有子节点List<String> subNodes = zk.getChildren(root, false);//取出所有lockName的锁List<String> lockObjNodes = new ArrayList<String>();for (String node : subNodes) {String _node = node.split(splitStr)[0];if(_node.equals(lockName)){lockObjNodes.add(node);}}//对所有节点进行默认排序,从小到大
            Collections.sort(lockObjNodes);System.out.println(myZnode + "==" + lockObjNodes.get(0));if(myZnode.equals(root+"/"+lockObjNodes.get(0))){//如果是最小的节点,则表示取得锁return true;}//如果不是最小的节点,找到比自己小1的节点String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);//获取比当前节点小一级的节点(Collections.binarySearch(lockObjNodes, subMyZnode):获取当前节点的角标)waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);} catch (KeeperException e) {throw new LockException(e);} catch (InterruptedException e) {throw new LockException(e);}return false;}public boolean tryLock(long time, TimeUnit unit) {try {if(this.tryLock()){return true;}return waitForLock(waitNode,time);} catch (Exception e) {e.printStackTrace();}return false;}/*** 等待获取锁* @param lower :等待的锁* @param waitTime 最大等待时间* @return* @throws InterruptedException* @throws KeeperException*/private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {Stat stat = zk.exists(root + "/" + lower,true);//判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听if(stat != null){System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);this.latch = new CountDownLatch(1);this.latch.await(waitTime, TimeUnit.MILLISECONDS);this.latch = null;}return true;}/*** 取消锁监控*/public void unlock() {try {System.out.println(Thread.currentThread().getId()+",unlock " + myZnode);zk.delete(myZnode,-1);myZnode = null;//zk.close();} catch (InterruptedException e) {e.printStackTrace();} catch (KeeperException e) {e.printStackTrace();}}/*** 关闭zk链接*/public void closeZk(){try {zk.close();} catch (InterruptedException e) {e.printStackTrace();}}public void lockInterruptibly() throws InterruptedException {this.lock();}public Condition newCondition() {return null;}/*** 自定义异常信息* @author lenovo**/public class LockException extends RuntimeException {private static final long serialVersionUID = 1L;public LockException(String e){super(e);}public LockException(Exception e){super(e);}}
}

  这个代码里,大家可以改为自己的集群,如我的是master:2181,slave1:2181,slave2:2181

本文转自大数据躺过的坑博客园博客,原文链接:http://www.cnblogs.com/zlslch/p/7242381.html,如需转载请自行联系原作者

zookeeper编程入门系列之zookeeper实现分布式进程监控和分布式共享锁(图文详解)...相关推荐

  1. Hadoop入门(二)——VMware虚拟网络设置+Windows10的IP地址配置+CentOS7静态IP设置(图文详解步骤2021)

    Hadoop入门(二)--VMware虚拟网络设置+Windows10的IP地址配置+CentOS7静态IP设置(图文详解步骤2021) 之前在上一篇文章中讲述了 CentOS7下载+VM上安装(手动 ...

  2. Hadoop入门(四)——模板虚拟机环境准备(图文详解步骤2021)

    Hadoop入门(四)--模板虚拟机环境准备(图文详解步骤2021) 系列文章传送门 这个系列文章传送门: Hadoop入门(一)--CentOS7下载+VM上安装(手动分区)图文步骤详解(2021) ...

  3. 大数据学习系列之七 ----- Hadoop+Spark+Zookeeper+HBase+Hive集群搭建 图文详解

    引言 在之前的大数据学习系列中,搭建了Hadoop+Spark+HBase+Hive 环境以及一些测试.其实要说的话,我开始学习大数据的时候,搭建的就是集群,并不是单机模式和伪分布式.至于为什么先写单 ...

  4. C++编程入门系列之十四(类与对象:构造函数和析构函数)

    C++编程入门系列之十四(类与对象:构造函数和析构函数) 鸡啄米上一节中给大家讲解了类的声明.成员的访问控制和对象,今天鸡啄米给大家讲C++编程入门时同样必须掌握的构造函数和析构函数.从上一讲开始已经 ...

  5. Python编程入门系列课程

    原文:https://mc.dfrobot.com.cn/thread-309195-1-1.html Python编程入门系列课程--教程目录: Python编程入门系列课程--01 智能绘星 Py ...

  6. 【备战春招/秋招系列】美团Java面经总结进阶篇 (附详解答案)

    <!-- MarkdownTOC --> 一 消息队列MQ的套路 1.1 介绍一下消息队列MQ的应用场景/使用消息队列的好处 ①.通过异步处理提高系统性能 ②.降低系统耦合性 1.2 那么 ...

  7. Disconf 学习系列之全网最详细的最新稳定Disconf 搭建部署(基于Windows7 / 8 / 10)(图文详解)...

    不多说,直接上干货! 工作环境以及安装依赖软件 Zookeeper-3.4.8 Disconf 2.6.36 Nginx 1.9.9(见如下博文的phpstudy) redis JDK1.8.0_66 ...

  8. 【备战春招/秋招系列】美团Java面经总结终结篇 (附详解答案)

    该文已加入开源项目:JavaGuide(一份涵盖大部分Java程序员所需要掌握的核心知识的文档类项目,Star 数接近 14 k).地址:https://github.com/Snailclimb.. ...

  9. (转)C#进阶系列——WebApi 接口返回值不困惑:返回值类型详解

    原文链接:https://www.cnblogs.com/landeanfen/p/5501487.html 阅读目录 一.void无返回值 二.IHttpActionResult 1.Json(T ...

最新文章

  1. [转]Ext Grid控件的配置与方法
  2. 菜鸟实时数仓2.0进阶之路
  3. c++ 异步回调_知道Java中的回调机制吗?
  4. 多协议注入工具t50
  5. CentOS7解决配置静态IP还是会出现动态IP地址的问题
  6. Markdown 如何实现空行、空格?
  7. LeetCode Range Sum Query Immutable
  8. 一个封装比较完整的FTP类——clsFTP
  9. MSSQL中使用CASE函数来灵活返回结果
  10. 必知必会JVM垃圾回收——对象搜索算法与回收算法
  11. 使用 ExMerge.exe 工具从邮箱中删除感染病毒的邮件
  12. HTML5中最重要的技术点有哪些?
  13. 宏脉系统怎么改服务器地址大全,宏脉系统使用手册大全新.doc
  14. 微信小程序 手写签名_【微信小程序canvas】实现小程序手写板用户签名(附代码)...
  15. 大学生计算机知识竞赛,大学生计算机基础知识竞赛题库(试题附答案).docx
  16. matlab实现图像滤波——高斯滤波
  17. 如何把视频生成二维码,手机扫一扫就可以看
  18. AutoSar之微控制器抽象层MCAL
  19. 关于win7阉割版pycharm pyttsx3库无法使用解决方法
  20. wireshark分析oracle报错,Linux下抓包工具tcpdump以及分析包的工具wireshark

热门文章

  1. 《中国AI算力发展评估报告》发布!北京只能排第二,推理需求猛增
  2. 由李飞飞领导,斯坦福以人为本AI学院正式成立,比尔·盖茨来捧场
  3. 2018热度上升最快的编程工具是什么?TensorFlow只排第11
  4. 腾讯叮当智能屏发布,主打视听体验、海量内容、儿童模式
  5. 李彦宏说互联网思维已过时,AI可以根本上变革交通、城市、农业和医疗
  6. 红芯事件追踪:官方致歉承认基于开源架构;创始人履历被指夸大
  7. JBoss Tomcat 对 JSP 的泛型支持
  8. 使用auditctl追踪文件变化
  9. oracle dblink使用
  10. 亲自动手用HTK实现YES NO孤立词识别