浅谈:Zookeeper

Zookeeper 概念

• Zookeeper 是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务。
• Zookeeper 翻译过来就是 动物园管理员,他是用来管 Hadoop(大象)、Hive(蜜蜂)、Pig(小 猪)的管理员。简称zk
• Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务。
• Zookeeper 提供的主要功能包括:
• 配置管理:用于管理服务信息,ip,端口,提供服务功能接口等,
• 分布式锁:用于控制服务资源,锁住之后,只能有一个服务可以操作,其他服务等待
• 集群管理:用于管理服务节点(每个节点提供的服务相同,则组成一个集群),组成高可用

Zookeeper 数据模型

• ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似,拥有一个层次化结构。
• 这里面的每一个节点都被称为: ZNode,每个节点上都会保存自己的数据和节点信息。
• 节点可以拥有子节点,同时也允许少量(1MB)数据存储在该节点之下。
• 节点可以分为四大类:

• PERSISTENT 持久化节点
• EPHEMERAL 临时节点 :-e
• PERSISTENT_SEQUENTIAL 持久化顺序节点 :-s
• EPHEMERAL_SEQUENTIAL 临时顺序节点 :-es

ZooKeeper 命令操作

Zookeeper 服务端常用命令

• 启动 ZooKeeper 服务: ./zkServer.sh start
• 查看 ZooKeeper 服务状态: ./zkServer.sh status
• 停止 ZooKeeper 服务: ./zkServer.sh stop
• 重启 ZooKeeper 服务: ./zkServer.sh restart

Zookeeper 客户端常用命令

• 连接ZooKeeper服务端
./zkCli.sh –server ip:port
• 断开连接
quit
• 查看命令帮助
help
• 显示指定目录下节点
ls 目录
• 创建节点
create /节点path value
• 获取节点值
get /节点path

• 创建临时节点
create -e /节点path value
• 创建顺序节点
create -s /节点path value
• 查询节点详细信息
ls –s /节点path

节点详细信息解释
• czxid:节点被创建的事务ID
• ctime: 创建时间
• mzxid: 最后一次被更新的事务ID
• mtime: 修改时间
• pzxid:子节点列表最后一次被更新的事务ID
• cversion:子节点的版本号
• dataversion:数据版本号
• aclversion:权限版本号
• ephemeralOwner:用于临时节点,代表临时节点的事务ID,如果为持
久节点则为0
• dataLength:节点存储的数据的长度
• numChildren:当前节点的子节点个数

ZooKeeper JavaAPI 操作

Curator 介绍

• Curator 是 Apache ZooKeeper 的Java客户端库。
• 常见的ZooKeeper Java API :
• 原生Java API
• ZkClient
• Curator
• Curator 项目的目标是简化 ZooKeeper 客户端的使用。
• Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。
• 官网:http://curator.apache.org/

Curator API 常用操作

• 建立连接
• 添加节点
• 删除节点
• 修改节点
• 查询节点
• Watch事件监听
• 分布式锁实现

代码演示(api详细解释在代码注释中)

增删改查节点代码

package com.fs.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.util.List;/*
若测试一直转圈圈,请关闭Linux防火墙*/
public class CuratorTest {//声明成员变量 提升作用域private CuratorFramework client;//建立连接@Before//在test方法执行钱
//    @Testpublic void testConnection(){//重试策略,这个类是RetryPolicy的实现类,然后我们指定时间重试次数ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000,10);//第一中方式获得curator
//        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient("192.168.93.132:2181", 60 * 1000, 15 * 1000, exponentialBackoffRetry);
//        curatorFramework.start();//第二种方式,链式编程client = CuratorFrameworkFactory.builder()//zookeeper服务的ip和端口.connectString("192.168.93.132:2181")//会话超时时间(单位ms),若这个时间类,客户端和服务的没有进行会话连接,就会关闭会话.sessionTimeoutMs(60 * 1000)//连接超时时间.connectionTimeoutMs(15 * 1000)//给定重试策略.retryPolicy(exponentialBackoffRetry)//名称空间,相当于操作当前CuratorFramework对象,默认zookeeper的根目录不是/了,会默认在根目录后加上/xf,而且若没有这个节点会默认创建.namespace("xf")//build后就返回CuratorFramework对象.build();//开启连接client.start();}/*//创建节点create 持久,临时,顺序,数据基本创建,创建节点,带有数据设置节点的类型创建一个带有多级节点 /app1/p1*/@Testpublic void createNode() throws Exception {//基本创建,默认持久化节点//如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储String s = client.create().forPath("/app1");System.out.println(s);}@Testpublic void createNodeData() throws Exception {//创建节点,给定节点值String s = client.create().forPath("/app2","hehe".getBytes());System.out.println(s);}@Testpublic void createNodeNode() throws Exception {//默认类型,持久化节点,//创建临时节点,创建后看不到,因为代码执行了就结束会话了String s = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3","hehe".getBytes());System.out.println(s);//要让方法不结束,才能看到临时节点的效果Thread.sleep(10000);}@Testpublic void createNodeTrue() throws Exception {//创建多级节点//父节点不存在,是不能创建的,但是代码可以creatingParentsIfNeeded()如果需要,创建父节点,如果父节点不存在,则创建父节点String s = client.create().creatingParentsIfNeeded().forPath("/app4/p1");System.out.println(s);}/*** 查询节点--------------------------------------------*///查询数据@Testpublic void findNode() throws Exception {//查询数据byte[] bytes = client.getData().forPath("/app1");System.out.println(new String(bytes));}//查询子节点@Testpublic void findNode02() throws Exception {//查询子节点   /因为为我们上面设置的xf父节点,相当与查询xf的子节点List<String> strings = client.getChildren().forPath("/");System.out.println(strings);}//查询节点的状态信息@Testpublic void findNode03() throws Exception {//查询节点状态信息Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/app1");System.out.println(stat);}/*** 修改节点的数据---------------------------------------------* 是没有修改节点名称的命令的,只能先删除,在添加*///修改数据@Testpublic void setNode() throws Exception {client.setData().forPath("/app1","xiaofuhaha".getBytes());}//根据版本修改/*version是通过查询出来的,保证我们这次修改和我查下的version版本要一致思想就是版本匹配才修改成功,假如有2个人同时对这个节点进行数据修改,假如第一个已经修改了,第二个人去修改不会成功的,因为第二个获取的是第一个人还未提交的版本,*/@Testpublic void setNodeForVersion() throws Exception {//查询节点状态信息Stat stat = new Stat();client.getData().storingStatIn(stat).forPath("/app1");System.out.println(stat.getVersion());//然后根据版本修改client.setData().withVersion(stat.getVersion()).forPath("/app1","xiaofubb".getBytes());}/*** 删除节点----------------------------* 若要删除节点的数据,没有对应的命令,但是可以通过set节点信息为空就相当于删除了节点数据*/@Testpublic void delNode() throws Exception {//删除单个节点client.delete().forPath("/app1");}@Testpublic void delNodeAll() throws Exception {//删除带有子节点的节点deletingChildrenIfNeeded,如果有父节点,先删除父client.delete().deletingChildrenIfNeeded().forPath("/app4");}@Testpublic void delNodeOK() throws Exception {//必须成功删除guaranteed(),为了防止网络抖动,若没有删除,会重试client.delete().guaranteed().forPath("/app2");}@Testpublic void delNodeBackground() throws Exception {//回调,,删除之后在 执行什么代码/*回调不光删除有,增删改都有..inBackground(new BackgroundCallback() {重新方法}*/client.delete().guaranteed().inBackground(new BackgroundCallback() {@Overridepublic void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {//删除后执行的方法,System.out.println("我被删除了~~~");System.out.println(curatorEvent);}}).forPath("/app1");}@After//@test执行后public void close(){//关闭连接client.close();}
}

Watch事件监听代码

package com.fs.curator;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;import java.util.List;/*
若测试一直转圈圈,请关闭Linux防火墙*/
public class CuratorWatcherTest {//声明成员变量 提升作用域private CuratorFramework client;//建立连接@Before//在test方法执行钱
//    @Testpublic void testConnection(){//重试策略,这个类是RetryPolicy的实现类,然后我们指定时间重试次数ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(3000,10);client = CuratorFrameworkFactory.builder()//zookeeper服务的ip和端口.connectString("192.168.93.132:2181")//会话超时时间(单位ms),若这个时间类,客户端和服务的没有进行会话连接,就会关闭会话.sessionTimeoutMs(60 * 1000)//连接超时时间.connectionTimeoutMs(15 * 1000)//给定重试策略.retryPolicy(exponentialBackoffRetry)//名称空间,相当于操作当前CuratorFramework对象,默认zookeeper的根目录不是/了,会默认在根目录后加上/xf,而且若没有这个节点会默认创建.namespace("xf")//build后就返回CuratorFramework对象.build();//开启连接client.start();}/*nodeCache:对某一个节点注册监听器*/@Test//@test执行后public void testNodeCache() throws Exception {//创建nodeCache对象 匿名内部类访问外部变量需要将外部变量进行final修饰,但是1.8可以不用final NodeCache nodeCache = new NodeCache(client,"/app1");//注册监听nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {/*当被监听的节点发生了修改,创建,删除等操作,就会进到这个方法*/System.out.println("节点变化了~~~");//获取修改节点后的数据byte[] data = nodeCache.getCurrentData().getData();System.out.println(new String(data));}});//开启监听,如果设置true,则开启监听的时候,加载缓存数据nodeCache.start(true);//睡10秒让连接不关,去修改节点信息,看是否能够监听到Thread.sleep(10000);}/*PathChildrenCache:监听某个节点的所有子节点们(不能监听自己节点的变化,只能感知子节点的变化只会监听当前节点的子节点,不会监听子节点的子节点*/@Test//@test执行后public void testPathChildrenCache() throws Exception {//注册监听传递的参数true是cacheData,是否开启缓存,开启后,下次启动,会从缓存中去获取上次缓存的数据PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);//绑定监听器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {/*只对子节点的变更才会进入这个方法*/System.out.println("子节点变化了~~~");//输出下子节点信息System.out.println(pathChildrenCacheEvent);//监听子节点的数据变更,并且拿到变更后的数据PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();//判断类型是否是updateif (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){//由输出的pathChildrenCacheEvent得知:PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p1', stat=131,134,1595825223664,1595825401122,1,0,0,0,4,0,131, data=[104, 101, 104, 101]}}//所以就点两次dataSystem.out.println("数据变化~~~");byte[] data = pathChildrenCacheEvent.getData().getData();System.out.println(new String(data));}}});//开启pathChildrenCache.start();//睡20秒让连接不关Thread.sleep(20000);}/*TreeCache:可以监听整个节点下的所有节点,包括节点本身以及子节点,不包括父节点*/@Test//@test执行后public void testTreeCache() throws Exception {//创建监听器TreeCache treeCache = new TreeCache(client,"/app2");//注册监听treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {/*所有的节点进行变更都会进入这个方法*/System.out.println("子节点变化了~~~");//输出下子节点信息System.out.println(treeCacheEvent);//监节点的数据变更,并且拿到变更后的数据TreeCacheEvent.Type type = treeCacheEvent.getType();//判断变更类型是否是updateif (type.equals(TreeCacheEvent.Type.NODE_UPDATED)){//由输出的pathChildrenCacheEvent得知:PathChildrenCacheEvent{type=CHILD_UPDATED, data=ChildData{path='/app2/p1', stat=131,134,1595825223664,1595825401122,1,0,0,0,4,0,131, data=[104, 101, 104, 101]}}//所以就点两次dataSystem.out.println("数据变化~~~");byte[] data = treeCacheEvent.getData().getData();System.out.println(new String(data));}}});//开启treeCache.start();//睡20秒让连接不关Thread.sleep(20000);}@After//@test执行后public void close(){//关闭连接client.close();}
}

分布式锁实现(不是zookeeper实现的,是curator框架实现的)

为什么使用临时顺序节点:防止偶发情况发生的死锁,顺序就是来的进程排队

Curator实现分布式锁API

• 在Curator中有五种锁方案:
• InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
• InterProcessMutex:分布式可重入排它锁
• InterProcessReadWriteLock:分布式读写锁
• InterProcessMultiLock:将多个锁作为单个实体管理的容器
• InterProcessSemaphoreV2:共享信号量

买票代码演示

买票
package com.fs.curator;import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;import java.util.concurrent.TimeUnit;/*
模拟12306买票,使用分布式锁*/
public class Ticket12306 implements Runnable {//数据库票数private int tickets = 10;private InterProcessMutex lock;public Ticket12306() {//重试策略,这个类是RetryPolicy的实现类,然后我们指定时间重试次数RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);CuratorFramework client = CuratorFrameworkFactory.builder()//zookeeper服务的ip和端口.connectString("192.168.93.132:2181")//会话超时时间(单位ms),若这个时间类,客户端和服务的没有进行会话连接,就会关闭会话.sessionTimeoutMs(60 * 1000)//连接超时时间.connectionTimeoutMs(15 * 1000)//给定重试策略.retryPolicy(retryPolicy)//build后就返回CuratorFramework对象.build();//开启连接client.start();//初始化lock,这是添加临时顺序的节点lock = new InterProcessMutex(client, "/lock");}@Overridepublic void run() {while (true) {try {//获得锁lock.acquire(3, TimeUnit.SECONDS);//如果有票就卖if (tickets > 0) {//模拟输出就卖出System.out.println(Thread.currentThread() + ":" + tickets);//缓慢执行0.1秒Thread.sleep(100);//卖出就--tickets--;}else {//买完了就停止循环break;}} catch (Exception e) {e.printStackTrace();} finally {try {//释放锁lock.release();} catch (Exception e) {e.printStackTrace();}}}}
}
测试
package com.fs.curator;public class LockTest {public static void main(String[] args) {Ticket12306 ticket12306 = new Ticket12306();//创建客户端来模拟买票Thread t1 = new Thread(ticket12306, "fz");Thread t2 = new Thread(ticket12306, "xc");t1.start();t2.start();}
}

Zookeeper常用命令操作,javaAPI操作之Curator框架 API相关推荐

  1. zookeeper常用命令行操作

    zookeeper常用命令行操作 输入help显示命令提示 [zk: localhost:2181(CONNECTED) 0] help ZooKeeper -server host:port cmd ...

  2. HBase常用Shell与JavaAPI操作

    HBase常用Shell与JavaAPI操作 1.常用shell 2.JavaAPI操作 2.1.HBaseAdmin类 2.2.HBaseConfiguration类 2.3.HTableDescr ...

  3. Zookeeper常用命令行及API

    一.Zookeeper常用命令行 1.启动zookeeper客户端(在启动zookeeper集群后启动进行调试) zkCli.sh 2.查看帮助.操作历史 help.history 3.查看当前Zno ...

  4. 【自撰】zooKeeper常用命令和权限

    zookeeper常用命令和权限 常用命令: get -s /path 和 stat /path 查看指定路径的节点信息 属性 解译 cZxid 数据节点创建时的事务ID ctime 据节点创建时的时 ...

  5. git常用命令,分支操作,子模块

    Git 是一个很强大的分布式版本管理工具,它不但适用于管理大型开源软件的源代码(如:linux kernel),管理私人的文档和源代码也有很多优势(如:wsi-lgame-pro) 二. Git 常用 ...

  6. Linux 常用命令 一顿操作猛如虎

    一顿操作猛如虎 for Ubuntu 先来看下操作系统版本,不一致可能会有不一样的地方.1 cat /proc/version Linux version 4.19.0-1.2.6.265.vca ( ...

  7. SQL Plus常用命令,文件操作,Orcale用户操作

    1        启动: 1.1         Oracle是一个库对应多个用户 1.2         Oracle启动是启动库而非所有库 2        SQLPlus常用命令 2.1     ...

  8. Linux常用命令 Linux虚拟机操作指令(更新中)

    目录 1.用户相关命令 2.文件编辑相关命令 3.文件操作常用命令 4.rpm安装和卸载命令 5.linux防火墙操作命令 6.mysql数据库相关命令 7.进程相关命令 8.查看Linux服务器的物 ...

  9. Linux学习一:(Bash 常用命令、vim操作、Linux框架目录)

    一.Bash 常用命令 1.管道(PIPING) 1.1 | 一种管道,其左方是一个命令的 STNOUT,将作为管道右方的另一个命令的 STDIN. 例如:echo 'test text' | wc ...

最新文章

  1. 在哪个公众号学python好_怎么通过公众号来快速学习python编程?
  2. linux c glob使用(文件路径模式查找函数)
  3. 一道题教会你回溯、动态规划、贪心
  4. ANSIC标准定义的6种预定义宏
  5. 英伟达登录界面卡住_一汽夏利重组;东风贪腐案行贿者名单? 众泰被申请预重整;尼古拉承认造假;理想英伟达德赛西威将合作;宋PLUS上市[9月17日]...
  6. 游戏脚本代码大全_如何用5行Python代码写出刷分游戏脚本!Python真牛!
  7. 软件测试风险管理包含哪两方面,软件测试风险的应对措施通常包括哪几类
  8. 个人建站用php,个人用不花钱 8款PHP建站软件推荐
  9. openwrt 锐捷 单线多拨
  10. 一个ios开发者使用Android手机后的一些感想
  11. rails网站分享到朋友圈功能是怎么实现的
  12. redis 全量复制条件
  13. Java 如何控制项目进度?
  14. windows server2012R将域名和访问的网址绑定
  15. 2020-12-02 微信JSAPIV3支付
  16. fiddler设置字体
  17. UBUNTU ifconfig只有lo
  18. 计算机三级网络技术考试详解
  19. 高精度DEM数据免费下载教程
  20. 卖桃子问题(递归函数求解)

热门文章

  1. shopnc前台登陆不进去解决方法
  2. start.s中的.balignl 16,0xdeadbeef
  3. PowerDesigner 企业架构模型 ( EAM ) 说明
  4. Python笔记 【无序】 【一】
  5. 关于linux中的 秘钥认证 ,最清晰解读
  6. loj #6235. 区间素数个数
  7. spring boot mybatis没有扫描jar中的Mapper接口
  8. LeetCode 250. Count Univalue Subtrees
  9. 8个前沿的 HTML5 CSS3 效果【附源码下载】
  10. centos中使用python遇到的几个问题