Apache Curator入门实战
Apache Curator入门实战
Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。
1.Zookeeper安装部署
Zookeeper的部署很简单,如果已经有Java运行环境的话,下载tarball解压后即可运行。
[root@vm Temp]$ wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz [root@vm Temp]$ tar zxvf zookeeper-3.4.6.tar.gz [root@vm Temp]$ cd zookeeper-3.4.6 [root@vm zookeeper-3.4.6]$ cp conf/zoo_sample.cfg conf/zoo.cfg [root@vm zookeeper-3.4.6]$ export ZOOKEEPER_HOME=/usr/local/src/zookeeper-3.4.5 [root@vm zookeeper-3.4.6]$ export PATH=$ZOOKEEPER_HOME/bin:$PATH [root@vm zookeeper-3.4.6]$ bin/zkServer.sh start [root@vm zookeeper-3.4.6]$ bin/zkCli.sh -server 127.0.0.1:2181
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
2.客户端常用操作
用zkCli.sh连接上Zookeeper服务后,用help能列出所有命令:
[root@BC-VM-edce4ac67d304079868c0bb265337bd4 zookeeper-3.4.6]# bin/zkCli.sh -127.0.0.1:2181
Connecting to localhost:2181
2015-06-11 10:55:14,387 [myid:] - INFO [main:Environment@100] - Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT ... [zk: localhost:2181(CONNECTED) 5] help ZooKeeper -server host:port cmd args connect host:port get path [watch] ls path [watch] set path data [version] rmr path delquota [-n|-b] path quit printwatches on|off create [-s] [-e] path data acl stat path [watch] close ls2 path [watch] history listquota path setAcl path acl getAcl path sync path redo cmdno addauth scheme auth delete path [version] setquota -n|-b val path
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
下面就试验一下常用的命令:
- create:创建路径结点。
- ls:查看路径下的所有结点。
- get:获得结点上的值。
- set:修改结点上的值。
- delete:删除结点。
[zk: localhost:2181(CONNECTED) 6] create /zktest mydata
Created /zktest
[zk: localhost:2181(CONNECTED) 12] ls /
[zktest, zookeeper]
[zk: localhost:2181(CONNECTED) 7] ls /zktest
[]
[zk: localhost:2181(CONNECTED) 13] get /zktest
mydata
cZxid = 0x1c
ctime = Thu Jun 11 10:58:06 CST 2015
mZxid = 0x1c
mtime = Thu Jun 11 10:58:06 CST 2015
pZxid = 0x1c cversion = 0 dataVersion = 0 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 6 numChildren = 0 [zk: localhost:2181(CONNECTED) 14] set /zktest junk cZxid = 0x1c ctime = Thu Jun 11 10:58:06 CST 2015 mZxid = 0x1f mtime = Thu Jun 11 10:59:08 CST 2015 pZxid = 0x1c cversion = 0 dataVersion = 1 aclVersion = 0 ephemeralOwner = 0x0 dataLength = 4 numChildren = 0 [zk: localhost:2181(CONNECTED) 15] delete /zktest [zk: localhost:2181(CONNECTED) 16] ls / [zookeeper]
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
3.用Curator管理Zookeeper
Curator的Maven依赖如下,一般直接使用curator-recipes就行了,如果需要自己封装一些底层些的功能的话,例如增加连接管理重试机制等,则可以引入curator-framework包。
<dependency><groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.7.0</version> </dependency>
- 1
- 2
- 3
- 4
- 5
3.1 Client操作
利用Curator提供的客户端API,可以完全实现上面原生客户端的功能。值得注意的是,Curator采用流式风格API。
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;/** * Curator framework's client test. * Output: * $ create /zktest hello * $ ls / * [zktest, zookeeper] * $ get /zktest * hello * $ set /zktest world * $ get /zktest * world * $ delete /zktest * $ ls / * [zookeeper] */ public class CuratorClientTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws Exception { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); // 2.Client API test // 2.1 Create node String data1 = "hello"; print("create", ZK_PATH, data1); client.create(). creatingParentsIfNeeded(). forPath(ZK_PATH, data1.getBytes()); // 2.2 Get node and data print("ls", "/"); print(client.getChildren().forPath("/")); print("get", ZK_PATH); print(client.getData().forPath(ZK_PATH)); // 2.3 Modify data String data2 = "world"; print("set", ZK_PATH, data2); client.setData().forPath(ZK_PATH, data2.getBytes()); print("get", ZK_PATH); print(client.getData().forPath(ZK_PATH)); // 2.4 Remove node print("delete", ZK_PATH); client.delete().forPath(ZK_PATH); print("ls", "/"); print(client.getChildren().forPath("/")); } private static void print(String... cmds) { StringBuilder text = new StringBuilder("$ "); for (String cmd : cmds) { text.append(cmd).append(" "); } System.out.println(text.toString()); } private static void print(Object result) { System.out.println( result instanceof byte[] ? new String((byte[]) result) : result); } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
3.2 监听器
Curator提供了三种Watcher(Cache)来监听结点的变化:
- Path Cache:监视一个路径下1)孩子结点的创建、2)删除,3)以及结点数据的更新。产生的事件会传递给注册的PathChildrenCacheListener。
- Node Cache:监视一个结点的创建、更新、删除,并将结点的数据缓存在本地。
- Tree Cache:Path Cache和Node Cache的“合体”,监视路径下的创建、更新、删除事件,并缓存路径下所有孩子结点的数据。
下面就测试一下最简单的Path Watcher:
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; import org.apache.curator.retry.RetryNTimes; /** * Curator framework watch test. */ public class CuratorWatcherTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws Exception { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); // 2.Register watcher PathChildrenCache watcher = new PathChildrenCache( client, ZK_PATH, true // if cache data ); watcher.getListenable().addListener((client1, event) -> { ChildData data = event.getData(); if (data == null) { System.out.println("No data in event[" + event + "]"); } else { System.out.println("Receive event: " + "type=[" + event.getType() + "]" + ", path=[" + data.getPath() + "]" + ", data=[" + new String(data.getData()) + "]" + ", stat=[" + data.getStat() + "]"); } }); watcher.start(StartMode.BUILD_INITIAL_CACHE); System.out.println("Register zk watcher successfully!"); Thread.sleep(Integer.MAX_VALUE); } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
下面是在zkCli.sh中操作时Java程序的输出:
Java: zk client start successfully!
Java: Register zk watcher successfully!zkCli: [zk: localhost:2181(CONNECTED) 11] create /zktest/hello mydata Java: Receive event: type=[CHILD_ADDED], path=[/zktest/hello], data=[mydata], stat=[121,121,1434001221097,1434001221097,0,0,0,0,6,0,121] zkCli: [zk: localhost:2181(CONNECTED) 12] set /zktest/hello otherdata Java: Receive event: type=[CHILD_UPDATED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121] zkCli: [zk: localhost:2181(CONNECTED) 13] delete /zktest/hello Java: Receive event: type=[CHILD_REMOVED], path=[/zktest/hello], data=[otherdata], stat=[121,122,1434001221097,1434001228467,1,0,0,0,9,0,121]
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
4.Curator“菜谱”
既然Maven包叫做curator-recipes,那说明Curator有它独特的“菜谱”:
- 锁:包括共享锁、共享可重入锁、读写锁等。
- 选举:Leader选举算法。
- Barrier:阻止分布式计算直至某个条件被满足的“栅栏”,可以看做JDK Concurrent包中Barrier的分布式实现。
- 缓存:前面提到过的三种Cache及监听机制。
- 持久化结点:连接或Session终止后仍然在Zookeeper中存在的结点。
- 队列:分布式队列、分布式优先级队列等。
4.1 分布式锁
分布式编程时,比如最容易碰到的情况就是应用程序在线上多机部署,于是当多个应用同时访问某一资源时,就需要某种机制去协调它们。例如,现在一台应用正在rebuild缓存内容,要临时锁住某个区域暂时不让访问;又比如调度程序每次只想一个任务被一台应用执行等等。
下面的程序会启动两个线程t1和t2去争夺锁,拿到锁的线程会占用5秒。运行多次可以观察到,有时是t1先拿到锁而t2等待,有时又会反过来。Curator会用我们提供的lock路径的结点作为全局锁,这个结点的数据类似这种格式:[_c_64e0811f-9475-44ca-aa36-c1db65ae5350-lock-0000000005],每次获得锁时会生成这种串,释放锁时清空数据。
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.RetryNTimes; import java.util.concurrent.TimeUnit; /** * Curator framework's distributed lock test. */ public class CuratorDistrLockTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_LOCK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); System.out.println("zk client start successfully!"); Thread t1 = new Thread(() -> { doWithLock(client); }, "t1"); Thread t2 = new Thread(() -> { doWithLock(client); }, "t2"); t1.start(); t2.start(); } private static void doWithLock(CuratorFramework client) { InterProcessMutex lock = new InterProcessMutex(client, ZK_LOCK_PATH); try { if (lock.acquire(10 * 1000, TimeUnit.SECONDS)) { System.out.println(Thread.currentThread().getName() + " hold lock"); Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " release lock"); } } catch (Exception e) { e.printStackTrace(); } finally { try { lock.release(); } catch (Exception e) { e.printStackTrace(); } } } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
4.2 Leader选举
当集群里的某个服务down机时,我们可能要从slave结点里选出一个作为新的master,这时就需要一套能在分布式环境中自动协调的Leader选举方法。Curator提供了LeaderSelector监听器实现Leader选举功能。同一时刻,只有一个Listener会进入takeLeadership()方法,说明它是当前的Leader。注意:当Listener从takeLeadership()退出时就说明它放弃了“Leader身份”,这时Curator会利用Zookeeper再从剩余的Listener中选出一个新的Leader。autoRequeue()方法使放弃Leadership的Listener有机会重新获得Leadership,如果不设置的话放弃了的Listener是不会再变成Leader的。
package com.cdai.codebase.bigdata.hadoop.zookeeper.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener; import org.apache.curator.framework.state.ConnectionState; import org.apache.curator.retry.RetryNTimes; import org.apache.curator.utils.EnsurePath; /** * Curator framework's leader election test. * Output: * LeaderSelector-2 take leadership! * LeaderSelector-2 relinquish leadership! * LeaderSelector-1 take leadership! * LeaderSelector-1 relinquish leadership! * LeaderSelector-0 take leadership! * LeaderSelector-0 relinquish leadership! * ... */ public class CuratorLeaderTest { /** Zookeeper info */ private static final String ZK_ADDRESS = "192.168.1.100:2181"; private static final String ZK_PATH = "/zktest"; public static void main(String[] args) throws InterruptedException { LeaderSelectorListener listener = new LeaderSelectorListener() { @Override public void takeLeadership(CuratorFramework client) throws Exception { System.out.println(Thread.currentThread().getName() + " take leadership!"); // takeLeadership() method should only return when leadership is being relinquished. Thread.sleep(5000L); System.out.println(Thread.currentThread().getName() + " relinquish leadership!"); } @Override public void stateChanged(CuratorFramework client, ConnectionState state) { } }; new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); new Thread(() -> { registerListener(listener); }).start(); Thread.sleep(Integer.MAX_VALUE); } private static void registerListener(LeaderSelectorListener listener) { // 1.Connect to zk CuratorFramework client = CuratorFrameworkFactory.newClient( ZK_ADDRESS, new RetryNTimes(10, 5000) ); client.start(); // 2.Ensure path try { new EnsurePath(ZK_PATH).ensure(client.getZookeeperClient()); } catch (Exception e) { e.printStackTrace(); } // 3.Register listener LeaderSelector selector = new LeaderSelector(client, ZK_PATH, listener); selector.autoRequeue(); selector.start(); } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
转载于:https://www.cnblogs.com/tonychai/p/4647701.html
Apache Curator入门实战相关推荐
- Apache Curator实战
Apache Curator入门实战 Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeep ...
- Spark入门实战系列--6.SparkSQL(中)--深入了解SparkSQL运行计划及调优
[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.1 运行环境说明 1.1.1 硬软件环境 l 主机操作系统:Windows 64位, ...
- Java 定时任务调度(8)--ElasticJob 入门实战(ElasticJob-Lite使用)
ElasticJob 是一个分布式调度解决方案,由 2 个相互独立的子项目 ElasticJob-Lite 和 ElasticJob-Cloud 组成.本文主要介绍 ElasticJob-Lite 的 ...
- Spark入门实战系列--2.Spark编译与部署(下)--Spark编译安装
[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.编译Spark Spark可以通过SBT和Maven两种方式进行编译,再通过make-d ...
- Spark入门实战系列--8.Spark MLlib(上)--机器学习及SparkMLlib简介
[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.机器学习概念 1.1 机器学习的定义 在维基百科上对机器学习提出以下几种定义: l&qu ...
- Spark入门实战系列--4.Spark运行架构
注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1. Spark运行架构 1.1 术语定义 lApplication:Spark Applic ...
- Spark入门实战系列--6.SparkSQL(上)--SparkSQL简介
[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.SparkSQL的发展历程 1.1 Hive and Shark SparkSQL的前身 ...
- Spark入门实战系列--7.Spark Streaming(上)--实时流计算Spark Streaming原理介绍
[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.Spark Streaming简介 1.1 概述 Spark Streaming 是Sp ...
- Spark入门实战系列--6.SparkSQL(下)--Spark实战应用
[注]该系列文章以及使用到安装包/测试数据 可以在<倾情大奉送--Spark入门实战系列>获取 1.运行环境说明 1.1 硬软件环境 l 主机操作系统:Windows 64位,双核4线程 ...
- 4.1 数据仓库基础与Apache Hive入门
数据仓库基础与Apache Hive入门 一.数据仓库基本概念 1.数据仓库概念 2. 案例:数据仓库为何而来 (1)业务数据的存储问题 (2)分析型决策的制定 3.数据仓库主要特征 面向主题性(Su ...
最新文章
- Java中Comparable与Comparator的区别
- javascript,jquery 操作 dropdownlist ,select
- 产后抑郁症的食疗方法有什么
- 枚举反射的应用(sql update语句匹配)
- 模型加速:WAE-Learning a Wavelet-like Auto-Encoder to Accelerate Deep Neural Networks
- flask及扩展源码解读
- 叹为观止的Qt 3d控件
- 如何批量转换图片格式?怎样统一修改图片格式?
- mysql 嵌套查询
- 工商银行区块链专利 为提高证书发放的效率
- OSChina 周三乱弹 —— 风扇写着先生请自爱
- 中国公有云厂商2017年收入利润综合排名
- jenkins AWS CodeDeploy不停机部署
- Failed to read artifact descriptor for
- 小猿日记(8) - 接口优化从13秒到3秒,我做了什么
- 中控门禁控制器接玻璃移动门
- 使用低代码平台 - 危险的赌注
- 宽带码分多址系统中多径衰落与多址干扰的影响
- 对视觉显著性检测(Saliency Object Detection)中Channel Attention的一些总结
- android 4.3 模拟器,自制安卓掌机:4.3英寸大屏+骁龙855,通吃所有模拟器