1. Curator简介

Curator是Netfix公司开源的一套Zookeeper客户端。Curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括重连、反复注册Watcher和NodeExistsException异常等。目前已经成为Apache的顶级项目,是全世界范围内使用最广泛的Zookeeper客户端

Curator Maven依赖

  <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-framework --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.1.0</version></dependency>

2.Curator基本操作

2.1. 创建会话

1. 使用CuraotrFrameWworkFactory这个工厂类的两个静态方法实现

public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs,RetryPolicy retryPolicy)

通过调用CuratorFramework中的start()方法启动会话

RetryPolicy接口参数说明:

下面是使用Curator创建会话的实例

//使用curator来创建一个ZooKeeper客户端
public class CreateSession {public static void main(String[] args) throws Exception{RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client =CuratorFrameworkFactory.newClient(Constant.ZK_CONNECT_STRING,Constant.ZK_SESSION_TIMEOUT,Constant.ZK_CONNECT_TIMEOUT,retryPolicy);client.start();Thread.sleep(Integer.MAX_VALUE);}
}

ExponentialBackoffRetry是Curator提供的默认几种重试策略之一。其构造方法如下:

public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)public ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)

可以看出随着重试次数的增加,计算出sleep的时间就会越来越大。如果该sleep时间在maxSleeps的范围之内,那么就使用该sleep时间,否则就使用maxSleepMs。

CuratorFrameWorkFactory工厂在创建一个客户端CuratorFrameWork的实例之后,实质上没有完成会话的创建,而是需要调用start()方法之后才能完成会话的创建工作。

2. 使用Fluent风格的API接口创建会话

Curator提供的API接口还实现了Fluent的风格

//使用Fluent风格的API接口来创建一个ZooKeeper客户端
public class CreateSessionFluent {public static void main(String[] args) throws Exception{RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client =CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).sessionTimeoutMs(Constant.ZK_SESSION_TIMEOUT).retryPolicy(retryPolicy).build();client.start();Thread.sleep(Integer.MAX_VALUE);}
}

3. 使用Curator创建含隔离命令空间的会话

为了实现不同的Zookeeper业务之间的隔离,往往会为每个业务分配一个独立的命名空间,即指定一个Zookeeper根路径。该客户端对Zookeeper的所有操作都是针对该命令空间的相对目录进行的。

//使用curator来创建一个含隔离命名空间的ZooKeeper客户端
public class CreateSessionNamespace {public static void main(String[] args) throws Exception{RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client =CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).sessionTimeoutMs(Constant.ZK_SESSION_TIMEOUT).retryPolicy(retryPolicy).namespace("base").build();client.start();Thread.sleep(Integer.MAX_VALUE);}
}

2.2 创建节点

Curator提供了一系列Fluent风格的接口

1. 创建一个节点。初始内容为空

client.create().forPath(path)

如果没有设置节点的属性,那么Curator默认创建的是持久节点,内容默认是空。

2.创建一个节点,附带初始内容

client.create().forPath(path,"init".getBytes())

可以在创建节点的时候写入初始节点的内容。

3.创建一个临时节点,初始内容为空

client.create().withMode(CreateMode.EPHEMERAL).forPath(path)

4.创建一个临时节点,并自动递归创建父节点

client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path)

在使用Curator之后,通过调用creatingParentsIfNeeded接口,Curaotr能够自动递归地创建所需要的父节点。

//使用Curator创建节点
public class CreateNode {static String path = "/zk-book/c1";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).sessionTimeoutMs(Constant.ZK_SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();public static void main(String[] args) throws Exception {client.start();client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());}
}

2.3 删除节点

Curator提供了一系列Fluent风格的接口

1.删除一个节点,并且递归删除其所有父节点

client.delete().deletingChildrenIfNeed().forPath(path)

2.删除一个节点,强制指定版本进行删除

client.delete().withVersion(version).forPath(path)

3.删除一个节点,强制保证删除

client.delete().guranteed().forPath(path)
//使用Curator删除节点
public class DelData {static String path = "/zk-book/c1";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).sessionTimeoutMs(Constant.ZK_SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();public static void main(String[] args) throws Exception {client.start();client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());Stat stat = new Stat();client.getData().storingStatIn(stat).forPath(path);client.delete().deletingChildrenIfNeeded().withVersion(stat.getVersion()).forPath(path);}
}

在Zookeeper客户端使用过程中,可能会碰到:客户端执行一个删除节点操作,但是由于网络原因,导致删除操作失败。对于这个异常,在有些场景下是致命的,如“Master选举”————在这个场景下,Zookeeper通常是通过节点的删除和创建来实现的。

如果我们调用了guranteed()方法,那么客户端碰到上面这些网络异常的问题之后,会记录这些失败的删除操作,只要客户端会话有效,那么其就会在后台进行多次重试,直达节点删除成功。

2.4 读取数据

1.读取一个节点的数据

client.getData().forPath(path)

2.读取一个节点的数据内容,同时获得到该节点的Stat

client.getData().storingStatIn(stat).forPath(path)

Curator通过传入一个旧的stat变量的方式来存储服务端返回的最新的节点信息。

//使用Curator获取数据内容
public class GetData {static String path = "/zk-book";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).sessionTimeoutMs(Constant.ZK_SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();public static void main(String[] args) throws Exception {client.start();client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());Stat stat = new Stat();System.out.println(new String(client.getData().storingStatIn(stat).forPath(path)));}
}

2.5 更新数据

1.更新一个节点的数据

client.setData().forPath(path,data)

调用该接口之后,会返回一个stat对象

2. 更新一个节点数据内容,强制指定版本进行更新

client.setData().withVersion(version).forPath(path,data)

注意withVersion接口就是实现CAS(Compare And Swap)的,version通常是从一个旧的stat对象中获得的

//使用Curator更新数据内容
public class SetData {static String path = "/zk-book";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).sessionTimeoutMs(Constant.ZK_SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();public static void main(String[] args) throws Exception {client.start();client.delete().deletingChildrenIfNeeded().forPath( path );client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());Stat stat = new Stat();client.getData().storingStatIn(stat).forPath(path);System.out.println("Success set node for : " + path + ", new version: "+ client.setData().withVersion(stat.getVersion()).forPath(path).getVersion());try {client.setData().withVersion(stat.getVersion()).forPath(path);} catch (Exception e) {System.out.println("Fail set node due to " + e.getMessage());}}
}

运行结果:

Success set node for : /zk-book, new version: 1
Fail set node due to KeeperErrorCode = BadVersion for /zk-book

2.6 异步接口

Caurator引入了BackgroundCallback接口,用来处理异步接口调用之后服务端返回的结果信息。其接口定义如下:

public interface BackgroundCallback
{/*** Called when the async background operation completes** @param client the client* @param event operation result details* @throws Exception errors*/public void processResult(CuratorFramework client, CuratorEvent event) throws Exception;
}

BackgroundCallback接口只有一个processResult方法,从注释中我们可以看出,该方法会在操作完成后被异步调用。

CuratorEvent中定义了Zookeeper服务器发送到客户端的一系列事件参数,其中比较重要的就是事件类型和响应码两个参数。

响应码(int)
响应码用于表示事件的结果状态,所有响应码被定义在org.apache.zookeeper.KeeperException.Code类中,比较常见的响应码有0(OK)、-4(ConnectionLoss)、-110(NodeExists)和-112(SessionExpired)等,分别表示接口调用成功,客户端与服务器端连接以及断开、指定节点已存在和会话已经过期等。

在程序中,我们可以通过以下API来进行异步操作:


public interface Backgroundable<T>
{/*** Perform the action in the background** @return this*/public T inBackground();/*** Perform the action in the background** @param context context object - will be available from the event sent to the listener* @return this*/public T inBackground(Object context);/*** Perform the action in the background** @param callback a functor that will get called when the operation has completed* @return this*/public T inBackground(BackgroundCallback callback);/*** Perform the action in the background** @param callback a functor that will get called when the operation has completed* @param context context object - will be available from the event sent to the listener* @return this*/public T inBackground(BackgroundCallback callback, Object context);/*** Perform the action in the background** @param callback a functor that will get called when the operation has completed* @param executor executor to use for the background call* @return this*/public T inBackground(BackgroundCallback callback, Executor executor);/*** Perform the action in the background** @param callback a functor that will get called when the operation has completed* @param context context object - will be available from the event sent to the listener* @param executor executor to use for the background call* @return this*/public T inBackground(BackgroundCallback callback, Object context, Executor executor);
}

在这个API中我们重点来逐一这个executor这个参数。在Zookeeper中,所有的异步事件通知都是由EventThread这个线程来处理的——————EventThread这个线程用于处理所有的事件通知。EventThread的“串行通知机制”在绝大部分应用场景下能够保证对事件处理的顺序性,但这个特性也有弊端,就是一旦碰上复杂的处理单元,就会消耗大部分时间,从而影响对其他事件的处理效率。

//使用Curator的异步接口
public class Create_Node_Background_Sample {static String path = "/zk-book";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).sessionTimeoutMs(Constant.ZK_SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();static CountDownLatch semaphore = new CountDownLatch(2);static ExecutorService tp = Executors.newFixedThreadPool(2);public static void main(String[] args) throws Exception {client.start();System.out.println("Main thread: " + Thread.currentThread().getName());// 此处传入了自定义的Executorclient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");System.out.println("Thread of processResult: " + Thread.currentThread().getName());semaphore.countDown();}}, tp).forPath(path, "init".getBytes());// 此处没有传入自定义的Executorclient.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");System.out.println("Thread of processResult: " + Thread.currentThread().getName());semaphore.countDown();}}).forPath(path, "init".getBytes());semaphore.await();tp.shutdown();}
}

运行结果:

event[code: 0, type: CREATE]
Thread of processResult: main-EventThread
event[code: -110, type: CREATE]
Thread of processResult: pool-4-thread-1

3. 典型使用场景

Curator不仅为开发者提供了更为方便的API接口,而且还提供了一些典型场景的使用参考。这些参考都在recipes包中。

<!-- https://mvnrepository.com/artifact/org.apache.curator/curator-recipes -->
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.1.0</version>
</dependency>

3.1 事件监听

Curator引入了Cache来实现Zookeeper服务器端事件监听。Cache是Curator中对事件的包装,其对事件的监听其实可以开做是一个本地缓存视图和远程Zookeeper视图的对比过程。同时Curator能够自动为开发人员处理反复注册,从而大大简化了原生API的过程

Curator中分为两类事件监听:

  • 节点监听
  • 子节点监听

NodeCache

NodeCache用于监听指定的Zookeeper数据节点变化,其构造方法如下:

public NodeCache(CuratorFramework client, String path)public NodeCache(CuratorFramework client, String path, boolean dataIsCompressed)

NodeCache构造方法参数说明如下:

NodCache定义了事件处理的回调接口:

public interface NodeCacheListener
{/*** Called when a change has occurred*/public void     nodeChanged() throws Exception;
}
public class NodeCacheSample {static String path = "/zk-book/nodecache";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).sessionTimeoutMs(Constant.ZK_SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();public static void main(String[] args) throws Exception {client.start();client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());final NodeCache cache = new NodeCache(client,path,false);cache.start(true);cache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {System.out.println("Node data update, new data: " + new String(cache.getCurrentData().getData()));}});client.setData().forPath( path, "u".getBytes() );Thread.sleep( 1000 );client.delete().deletingChildrenIfNeeded().forPath( path );Thread.sleep( Integer.MAX_VALUE );}
}

运行结果:

Node data update, new data: u

我们首先构造了一个NodeCache实例,然后调用了该实例的start(true)方法。这个方法有一个boolean类型的参数,默认是false,如果设置为true,那么NodeCache在第一次启动的时候就会立刻从Zookeeper上读取对应的数据所在的节点的数据内容,并保存在Cache中。

NodeCache不仅可以用于监听节点内容的创建,还能监听指定节点是否存在。如果原本节点不存在,那么在Cache就会在节点被创建后触发NodeCacheListener。但是如果该节点被删除了,那么Curator就不能触发NodeCacheListener。

PathChildrenCache

PathChildrenCache用于指定Zookeeper数据节点的子节点变化情况。

PathChildrenCache有如下几个构造方法:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, ThreadFactory threadFactory)public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, ThreadFactory threadFactory)public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final ExecutorService executorService)public PathChildrenCache(CuratorFramework client, String path, boolean cacheData, boolean dataIsCompressed, final CloseableExecutorService executorService)

PathChildrenCache构造参数的使用说明:


PathChildrenCache回调函数接口定义:

public interface PathChildrenCacheListener
{/*** Called when a change has occurred** @param client the client* @param event describes the change* @throws Exception errors*/public void     childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception;
}

当指定节点的子节点发生变化时,就会回调该方法。PathChildrenCacheEvent类中定义了所有的事件类型,主要包括新增子节点(CHILD_ADDED)、子节点数据变更(CHILD_UPDATED)和子节点删除(CHILD_DELETE)三类。


public class PathChildrenCacheSample {static String path = "/zk-book";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).retryPolicy(new ExponentialBackoffRetry(1000, 3)).sessionTimeoutMs(Constant.ZK_SESSION_TIMEOUT).build();public static void main(String[] args) throws Exception {client.start();PathChildrenCache cache = new PathChildrenCache(client, path, true);cache.start(StartMode.POST_INITIALIZED_EVENT);cache.getListenable().addListener(new PathChildrenCacheListener() {public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {switch (event.getType()) {case CHILD_ADDED:System.out.println("CHILD_ADDED," + event.getData().getPath());break;case CHILD_UPDATED:System.out.println("CHILD_UPDATED," + event.getData().getPath());break;case CHILD_REMOVED:System.out.println("CHILD_REMOVED," + event.getData().getPath());break;default:break;}}});client.create().withMode(CreateMode.PERSISTENT).forPath(path);Thread.sleep( 1000 );client.create().withMode(CreateMode.PERSISTENT).forPath(path+"/c1");Thread.sleep( 1000 );client.delete().forPath(path+"/c1");Thread.sleep( 1000 );client.delete().forPath(path);Thread.sleep(Integer.MAX_VALUE);}
}

运行结果:

CHILD_ADDED,/zk-book/c1
CHILD_REMOVED,/zk-book/c1

对/zk-book节点进行了子节点变更事件的监听,一旦该节点进行新增/删除子节点,或者子节点数据变更,就会回调PathChildrenCacheListener,并根据对应的事件类型进行相关的处理。

3.2 Master选举

在分布式系统中,经常会碰到这样的场景:对于一个复杂的任务,仅需要从集群中选举出一台进行处理即可。诸如此类的分布式问题,我们统称为“Master”选举问题。

借助Zookeeper,我们可以较为方便地实现Master选举的功能,主要思路为下:

选举一个根节点,例如/master_select,多台机器同时向该节点创建一个子节点/master_select/lock,利用Zookeeper特性,最终只有一台机器能够创建成果,成功的那台机器就作为Master。

public class Recipes_MasterSelect {static String master_path = "/curator_recipes_master_path";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).sessionTimeoutMs(Constant.ZK_SESSION_TIMEOUT).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();public static void main( String[] args ) throws Exception {client.start();LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {public void takeLeadership(CuratorFramework client) throws Exception {System.out.println("成为Master角色");Thread.sleep( 3000 );System.out.println( "完成Master操作,释放Master权利" );}});selector.autoRequeue();selector.start();Thread.sleep( Integer.MAX_VALUE );}
}

运行结果:

完成Master操作,释放Master权利
成为Master角色

在创建LeaderSelector实例的时候,还会传入一个监听器,LeaderSelectListenerAdapter。该回调监听器的定义如下:

public interface LeaderSelectorListener extends ConnectionStateListener
{/*** Called when your instance has been granted leadership. This method* should not return until you wish to release leadership** @param client the client* @throws Exception any errors*/public void         takeLeadership(CuratorFramework client) throws Exception;
}public abstract class LeaderSelectorListenerAdapter implements LeaderSelectorListener
{@Overridepublic void stateChanged(CuratorFramework client, ConnectionState newState){if ( client.getConnectionStateErrorPolicy().isErrorState(newState) ){throw new CancelLeadershipException();}}
}

Curator会在竞争到Master后自动调用该方法,开发者需要在这个方法中实现自己的业务逻辑。需要注意的一点是,一旦执行完takeLeadership方法,Curator就会立即释放Master的权利,然后重新开始新的一轮Master选举。

3.3 分布式锁

在分布式环境中,为了保证数据的一致性,经常在这个程序的某个运行点需要进行同步控制。在用户量非常大的情况下时,可能会出现并发问题。

一个典型的时间戳引起的并发问题

public class Recipes_NoLock {public static void main(String[] args) throws Exception {final CountDownLatch down = new CountDownLatch(1);for(int i = 0; i < 10; i++){new Thread(new Runnable() {public void run() {try {down.await();} catch ( Exception e ) {}SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");String orderNo = sdf.format(new Date());System.err.println("生成的订单号是 : "+orderNo);}}).start();}down.countDown();}
}

运行结果:

生成的订单号是 : 03:25:44|023
生成的订单号是 : 03:25:44|023
生成的订单号是 : 03:25:44|023
生成的订单号是 : 03:25:44|023
生成的订单号是 : 03:25:44|023
生成的订单号是 : 03:25:44|023
生成的订单号是 : 03:25:44|023
生成的订单号是 : 03:25:44|023
生成的订单号是 : 03:25:44|023
生成的订单号是 : 03:25:44|023

使用Curator实现分布式锁功能

//使用Curator实现分布式锁功能
public class Recipes_Lock {static String lock_path = "/curator_recipes_lock_path";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();public static void main(String[] args) throws Exception {client.start();final InterProcessMutex lock = new InterProcessMutex(client,lock_path);final CountDownLatch down = new CountDownLatch(1);for(int i = 0; i < 30; i++){new Thread(new Runnable() {public void run() {try {down.await();lock.acquire();} catch ( Exception e ) {}SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");String orderNo = sdf.format(new Date());System.out.println("生成的订单号是 : "+orderNo);try {lock.release();} catch ( Exception e ) {}}}).start();}down.countDown();}
}

运行结果:

生成的订单号是 : 03:27:00|841
生成的订单号是 : 03:27:01|089
生成的订单号是 : 03:27:01|397
生成的订单号是 : 03:27:01|459
生成的订单号是 : 03:27:01|500
生成的订单号是 : 03:27:01|526
生成的订单号是 : 03:27:01|555
生成的订单号是 : 03:27:01|575
生成的订单号是 : 03:27:01|632
生成的订单号是 : 03:27:01|654
生成的订单号是 : 03:27:01|674
生成的订单号是 : 03:27:01|707
生成的订单号是 : 03:27:01|722
生成的订单号是 : 03:27:01|735
生成的订单号是 : 03:27:01|754
生成的订单号是 : 03:27:01|797
生成的订单号是 : 03:27:01|809
生成的订单号是 : 03:27:01|815
生成的订单号是 : 03:27:01|848
生成的订单号是 : 03:27:01|856
生成的订单号是 : 03:27:01|874
生成的订单号是 : 03:27:01|902
生成的订单号是 : 03:27:01|917
生成的订单号是 : 03:27:01|921
生成的订单号是 : 03:27:01|939
生成的订单号是 : 03:27:01|943
生成的订单号是 : 03:27:01|949
生成的订单号是 : 03:27:01|953
生成的订单号是 : 03:27:01|955
生成的订单号是 : 03:27:01|958

3.4 分布式计数器

基于Zookeeper的分布式计数器的实现思路非常简单:

指定一个Zookeeper数据节点作为计时器,多个应用实例在分布式锁的控制下,通过更新该数据节点的内容实现技术功能

使用Curator实现分布式计数器

// 使用Curator实现分布式计数器
public class Recipes_DistAtomicInt {static String distatomicint_path = "/curator_recipes_distatomicint_path";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();public static void main( String[] args ) throws Exception {client.start();DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger( client, distatomicint_path, new RetryNTimes( 3, 1000 ) );AtomicValue<Integer> rc = atomicInteger.add( 8 );System.out.println( "Result: " + rc.succeeded() );}
}

运行结果:

Result: true

3.5 分布式Barrier

Barrier是一种控制多线程进行同步的经典方式,在JDK中自带了CyclicBarrier的实例

public class Recipes_CyclicBarrier {public static CyclicBarrier barrier = new CyclicBarrier( 3 );public static void main( String[] args ) throws IOException, InterruptedException {ExecutorService executor = Executors.newFixedThreadPool( 3 );executor.submit( new Thread( new Runner( "1号选手" ) ) );executor.submit( new Thread( new Runner( "2号选手" ) ) );executor.submit( new Thread( new Runner( "3号选手" ) ) );executor.shutdown();}
}
class Runner implements Runnable {private String name;public Runner( String name ) {this.name = name;}public void run() {System.out.println( name + " 准备好了." );try {Recipes_CyclicBarrier.barrier.await();} catch ( Exception e ) {}System.out.println( name + " 起跑!" );}
}

运行结果:

1号选手 准备好了.
2号选手 准备好了.
3号选手 准备好了.
3号选手 起跑!
2号选手 起跑!
1号选手 起跑!

如果是在同一个JVM中,使用CyclicBarrier完全可以解决诸如此类的多线程同步的问题。但是在分布式环境中如何解决呢?Curator中提供的DistributedBarrier就是采用这种方式实现分布式Barrier同步:

//使用Curator实现分布式Barrier
public class Recipes_Barrier {static String barrier_path = "/curator_recipes_barrier_path";static DistributedBarrier barrier;public static void main(String[] args) throws Exception {for (int i = 0; i < 5; i++) {new Thread(new Runnable() {public void run() {try {CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();client.start();barrier = new DistributedBarrier(client, barrier_path);System.out.println(Thread.currentThread().getName() + "号barrier设置" );barrier.setBarrier();barrier.waitOnBarrier();System.err.println("启动...");} catch (Exception e) {}}}).start();}Thread.sleep( 2000 );barrier.removeBarrier();}
}

运行结果:

Thread-2号barrier设置
Thread-3号barrier设置
Thread-1号barrier设置
Thread-5号barrier设置
Thread-4号barrier设置启动...
启动...
启动...
启动...
启动..

Curator还使用了另外一种方式实现分布式Barrier,这种方式和JDK的那种带有成员数的阈值一样。

public class Recipes_Barrier2 {static String barrier_path = "/curator_recipes_barrier_path";public static void main(String[] args) throws Exception {for (int i = 0; i < 5; i++) {new Thread(new Runnable() {public void run() {try {CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();client.start();DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, barrier_path,5);Thread.sleep( Math.round(Math.random() * 3000) );System.out.println(Thread.currentThread().getName() + "号进入barrier" );barrier.enter();System.out.println("启动...");Thread.sleep( Math.round(Math.random() * 3000) );barrier.leave();System.out.println( "退出..." );} catch (Exception e) {}}}).start();}}
}

运行结果:

Thread-3号进入barrier
Thread-1号进入barrier
Thread-0号进入barrier
Thread-2号进入barrier
Thread-4号进入barrier启动...
启动...
启动...
启动...
启动...
退出...
退出...
退出...
退出...
退出...

每个Barrier的参与者都会调用DistributedDoubleBarrier.enter方法之后进行等待,此时处于准备进入阶段。一旦准备进入Barrier的成员数量达到5个之后,所有的成员会被同时触发进入。之后调用DistributedDoubleBarrier.leave方法之后则会再次等待,此时处于准备退出阶段。一旦准备退出的Barrier的成员数量达到5个之后,所有的成员同样会被同时触发退出。因此Curator的DistributedDoubleBarrier能够很好的实现一个分布式Barrier,并控制同时进入和退出。

4. 工具

Curator提供了很多工具类,其中用的最多的就是ZKPaths和EnsurePath。

4.1 ZKPaths

ZKPath提供了一些简单的API来构建Znode路径,递归创建和删除节点等。

public class ZKPathsSample {static String path = "/curator_zkpath_sample";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).sessionTimeoutMs( Constant.ZK_SESSION_TIMEOUT ).retryPolicy( new ExponentialBackoffRetry( 1000, 3 ) ).build();public static void main(String[] args) throws Exception {client.start();ZooKeeper zookeeper = client.getZookeeperClient().getZooKeeper();System.out.println(ZKPaths.fixForNamespace(path,"sub"));System.out.println(ZKPaths.makePath(path, "sub"));System.out.println( ZKPaths.getNodeFromPath( "/curator_zkpath_sample/sub1" ) );PathAndNode pn = ZKPaths.getPathAndNode( "/curator_zkpath_sample/sub1" );System.out.println(pn.getPath());System.out.println(pn.getNode());String dir1 = path + "/child1";String dir2 = path + "/child2";ZKPaths.mkdirs(zookeeper, dir1);ZKPaths.mkdirs(zookeeper, dir2);System.out.println(ZKPaths.getSortedChildren( zookeeper, path ));ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);}
}

运行结果:

/curator_zkpath_sample/sub
/curator_zkpath_sample/sub
sub1
/curator_zkpath_sample
sub1
[child1, child2]

4.2 EnsurePath

EnsurePath提供了一种能够确保数据节点存在的机制,多用于这样的应用场景:

上层业务希望对数据节点进行操作,同时操作之前要确保该节点存在。

EnsurePath 采取了静默的节点窗机方式,其内部实现就是试图创建指定节点,如果节点已经存在,那么就不进行任何操作,也不对外抛出异常,否则正常创建数据节点。


public class EnsurePathDemo {static String path = "/zk-book/c1";static CuratorFramework client = CuratorFrameworkFactory.builder().connectString(Constant.ZK_CONNECT_STRING).sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();public static void main(String[] args) throws Exception {client.start();client.usingNamespace( "zk-book" );EnsurePath ensurePath = new EnsurePath(path);ensurePath.ensure(client.getZookeeperClient());ensurePath.ensure(client.getZookeeperClient());   EnsurePath ensurePath2 = client.newNamespaceAwareEnsurePath("/c1");ensurePath2.ensure(client.getZookeeperClient());}
}

5. Testing

5.1 TestingServer

为了方便开发人员进行Zookeeper的开发和测试工作。Curator提供了一种非常简单Zookeeper服务器的方法————TestingServer。为了使用Curator-Test模块,我们需要添加Maven包

     <!-- https://mvnrepository.com/artifact/org.apache.curator/curator-test --><dependency><groupId>org.apache.curator</groupId><artifactId>curator-test</artifactId><version>4.1.0</version><scope>test</scope></dependency>
public class TestingServerSample {@Testpublic void test() throws Exception {String path = "/zookeeper";TestingServer server = new TestingServer(2181,new File("/home/admin/zk-book-data"));CuratorFramework client = CuratorFrameworkFactory.builder().connectString(server.getConnectString()).sessionTimeoutMs(5000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();client.start();System.out.println( client.getChildren().forPath( path ));server.close();}
}

5.2 TesingCluster

TesingCluster是一个可以模拟Zookeeper集群环境的Curator工具类,能够便于开发人员在本地模拟由n台机器组成的集群环境。

public class TestingClusterSample {@Testpublic void test() throws Exception{TestingCluster cluster = new TestingCluster(3);cluster.start();Thread.sleep(2000);TestingZooKeeperServer leader = null;for(TestingZooKeeperServer zs : cluster.getServers()){System.out.print(zs.getInstanceSpec().getServerId()+"-");System.out.print(zs.getQuorumPeer().getServerState()+"-");  System.out.println(zs.getInstanceSpec().getDataDirectory().getAbsolutePath());if( zs.getQuorumPeer().getServerState().equals( "leading" )){leader = zs;}}leader.kill();System.out.println( "--After leader kill:" );for(TestingZooKeeperServer zs : cluster.getServers()){System.out.print(zs.getInstanceSpec().getServerId()+"-");System.out.print(zs.getQuorumPeer().getServerState()+"-");  System.out.println(zs.getInstanceSpec().getDataDirectory().getAbsolutePath());}cluster.stop();}
}

Zookeeper分布式一致性原理(七):Curator客户端相关推荐

  1. 《从Paxos到zookeeper分布式一致性原理与实践》笔记

    <从Paxos到zookeeper分布式一致性原理与实践>笔记 文章目录 <从Paxos到zookeeper分布式一致性原理与实践>笔记 一.概念 二.一致性协调 2.1 2P ...

  2. 《从Paxos到zookeeper分布式一致性原理与实践》

    <从Paxos到zookeeper分布式一致性原理与实践> 一.概念 ACID: Automaticy.consistency.isolation. Durability CAP: con ...

  3. [201502][从 Paxos 到 ZooKeeper][分布式一致性原理与实践][倪超][著]

    [201502][从 Paxos 到 ZooKeeper][分布式一致性原理与实践][倪超][著] http://zookeeper.apache.org 第 1 章 分布式架构 1.1 从集中式到分 ...

  4. Zookeeper分布式一致性原理(四):Zookeeper简介

    zookeeper是一个典型的分布式数据一致性的解决方案,分布式应用程序可以基于它实现数据发布/订阅.负载均衡.命名服务.分布式协调/通知.集群管理.master选举.分布式锁和分布式队列等.Zook ...

  5. 《从Paxos到ZooKeeper 分布式一致性原理与实践》读书笔记

    一.分布式架构 1.分布式特点 分布性 对等性.分布式系统中的所有计算机节点都是对等的 并发性.多个节点并发的操作一些共享的资源 缺乏全局时钟.节点之间通过消息传递进行通信和协调,因为缺乏全局时钟,很 ...

  6. 《从Paxos到Zookeeper 分布式一致性原理与实践》

    第1章 分布式架构 1.1 从集中式到分布式 1.1.1 集中式的特点 集中式的特点:部署结构简单(因为基于底层性能卓越的大型主机,不需考虑对服务多个节点的部署,也就不用考虑多个节点之间分布式协调问题 ...

  7. Zookeeper分布式一致性原理(八):Zookeeper典型应用场景

    1. 简介 Zookeeper是一个高可用的分布式数据管理和协调框架,并且能够很好的保证分布式环境中数据的一致性.在越来越多的分布式系统(Hadoop.HBase.Kafka)中,Zookeeper都 ...

  8. Zookeeper分布式一致性原理(五):Zookeeper-Java-API

    1. 部署与运行 ZK文档 http://zookeeper.apache.org/doc/r3.4.13/zookeeperStarted.html ZK下载 https://www.apache. ...

  9. Zookeeper分布式一致性原理(二):一致性协议

    为了解决分布式一致性问题,在长期的研究过程中,提出了一大批经典的一致性协议和算法,其中最著名的就是2PC和3PC以及Paxos算法了. 1. 2PC和3PC 在分布式系统中,每个节点都明确知道自己事务 ...

最新文章

  1. Hadoop生态圈-hive五种数据格式比较
  2. keras网络变为pytorch网络的一些参考
  3. Talairach空间、MNI空间、Native空间、Stereotaxic空间
  4. 从圆的面积说起 循环小数 PI
  5. 如何找到SAP ECC事务码升级到S4HANA后对应的新事务码
  6. [Unity] GameFramework 学习记录 4:第三人称控制器
  7. Linux项目日报,1Password推出Linux版本 现已进入公开预览阶段
  8. Building Document Workflows in SharePoint 2007 翻译
  9. Selenium-鼠标操作
  10. java wsdl文件生成代码_wsdl文件生成java代码
  11. IIS发布网站 后台接口404
  12. 网站内容收录不稳定/不收录的原因分析
  13. iOS中把故事板中视功能和美工结合在1起
  14. 吉安稻谷飘香 国稻种芯·中国水稻节:江西主产区农田喝上水
  15. Comparable接口、Comparator接口、Cloneable接口
  16. 中国人工智能企业中集飞瞳,集装箱人工智能平台全球4千企业用户,免费集装箱号识别信息识别API,智慧港航智能化港航中国人工智能企业
  17. AMD64 Sepc 学习笔记
  18. win10-桌面加载失败-explorer.exe崩溃
  19. 关于企业数字化转型这个问题,低代码在其中起了怎样的作用?
  20. 【第173期】游戏策划:不动脑子,连最基本的执行策划都做不好,亲测有害

热门文章

  1. 图片的赖加载(lazyLoad)
  2. Mac下Android studio 之NDK配置教程(二)
  3. LETTers比赛第七场 Guess the Numbers
  4. AspNetDB.mdf数据库的建立和使用
  5. MVC专题研究(三)——数据绑定和传送
  6. 手动挡和自动挡该怎么选?哪个起步快、哪个更舒适?
  7. 2021年春节联欢晚会第三次联排亮点多
  8. 力扣(LeetCode):字符串转换整数 (atoi)
  9. android文件存储教程,android开发基础教程—文件存储功能实现
  10. 黑苹果睡眠无法唤醒_电脑睡眠后无法唤醒怎么办?