分布式基石 Zookeeper 框架全面剖析

  • Java 客户端操作
    • Java 客户端 API
  • 服务器的动态感知
    • 服务注册
    • 服务发现
  • 分布式锁业务处理
    • 单机环境(一个虚拟机中)
    • 分布式环境_同名节点
    • 分布式环境_顺序节点

Java 从 0 到架构师目录:【Java从0到架构师】学习记录

Zookeeper 连接状态:

Java 客户端操作

Java 客户端操作:

  • 自带的 zkclient
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.0</version>
</dependency>
  • Apache 开源的 Curator
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.3.0</version>
</dependency>
<dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>4.3.0</version>
</dependency>
  • Apache 开源的 ZkClient (com.101tec)
<dependency><groupId>com.101tec</groupId><artifactId>zkclient</artifactId><version>0.11</version>
</dependency>

Java 客户端 API

我们使用 Zookeeper 自带的 zkclient 来操作:

创建会话

@SpringBootApplication
public class ZookeeperApiDemoApplication {private  static  String ZK_SERVER_ADDR="192.168.48.100:2181,192.168.48.100:2182,192.168.48.100:2183";private static  int SESSION_TIMEOUT=30000;public static void main(String[] args) {SpringApplication.run(ZookeeperApiDemoApplication.class, args);}//创建一个zookeeper的连接@Beanpublic ZooKeeper zooKeeper() throws Exception{// 第一个参数: 连接地址和端口 第二个参数: 会话超时时间, 第三个参数: 事件监听程序ZooKeeper zooKeeper = new ZooKeeper(ZK_SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println("event = " + event);if(event.getState()== Event.KeeperState.SyncConnected){System.out.println("zookeeper客户端连接成功");}}});return zooKeeper;}}

创建节点

@RequestMapping("createNode")
public String createNode(String path, String data, String type) throws Exception{String result = zooKeeper.create(path, data.getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.valueOf(type));return result;
}

获取节点中的数据

// 同步获取数据
@RequestMapping("getData")
public String getData(String path) throws Exception {// 1 先去查询版本信息  如果没有, 返回的是一个nullStat stat = zooKeeper.exists(path, false);// 同步获取数据byte[] data = zooKeeper.getData(path, new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println("event = " + event);}}, stat);System.out.println("new String(data) = " + new String(data));return new String(data);
}// 异步数据处理
@RequestMapping("getDataAsync")
public String getDataAsync(String path) throws Exception{// 1 先去查询版本信息Stat stat = zooKeeper.exists(path, false);zooKeeper.getData(path, false, new AsyncCallback.DataCallback() {@Overridepublic void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {System.out.println("异步处理回调数据");System.out.println("收到的数据:"+new String(data));System.out.println("ctx = " + ctx);}}, "测试数据");return "异步获取数据";
}// 获取子节点列表
@RequestMapping("getChildren")
public List<String> getChildren(String path) throws Exception{List<String> children = zooKeeper.getChildren(path, false);return children;
}

判断节点是否存在

@RequestMapping("exists")
public  boolean exists(String path) throws Exception{Stat stat = zooKeeper.exists(path, false);return stat != null ;
}

删除节点

@RequestMapping("delete")
public boolean delete(String path) throws Exception{Stat stat = zooKeeper.exists(path, false);if (stat != null){zooKeeper.delete(path,stat.getVersion());}return true;
}

更新数据

@RequestMapping("update")
public  boolean update(String path,String data) throws Exception{Stat stat = zooKeeper.exists(path, false);if (stat != null){zooKeeper.setData(path, data.getBytes(), stat.getVersion());}return true;
}

事件处理

绑定一次事件:

@RequestMapping("addWatch1")
public String addWatch1(String path) throws Exception{Stat stat = zooKeeper.exists(path, false);// 定义一个监视器对象Watcher watcher = new Watcher() {@Overridepublic void process(WatchedEvent event) { // 数据改变事件,而且还是一次性System.out.println("事件类型:" + event.getType());System.out.println("数据发生改变, 请及时更新");try {byte[] data = zooKeeper.getData(path, this, stat);System.out.println("更新后的数据:" + new String(data));} catch (Exception e) {e.printStackTrace();}}};zooKeeper.getData(path, watcher, stat);return "success";
}

绑定永久事件:

@RequestMapping("addWatch2")
public String addWatch2(String path) throws Exception{Stat stat = zooKeeper.exists(path, false);// 只是获取数据, 没有绑定事件byte[] data = zooKeeper.getData(path, null, stat);System.out.println("获取到数据:" + new String(data));// 绑定永久的事件  --> 1 数据变化事件  2  子节点改变事件zooKeeper.addWatch(path, new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println("event = " + event);// 数据改变事件if (event.getType() == Event.EventType.NodeDataChanged){try {// 重新获取数据Stat stat = zooKeeper.exists(path, false);// 只是获取数据, 没有绑定事件byte[] data = zooKeeper.getData(path, null, stat);System.out.println("更新的数据:" + new String(data));} catch (Exception e) {e.printStackTrace();}// 子节点改变事件} else if (event.getType() == Event.EventType.NodeChildrenChanged){// 重新获取子节点列表System.out.println("子节点数据发生改变");}}}, AddWatchMode.PERSISTENT);return "success";
}

递归绑定事件:对于创建的节点以及子节点都绑定事件

@RequestMapping("addWatch")
public List<String> addWatch(String path) throws Exception{// 1 先获取所有的子节点List<String> children = zooKeeper.getChildren(path, false);// 2 绑定一个监听事件zooKeeper.addWatch(path, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getType() == Event.EventType.NodeChildrenChanged){System.out.println("子节点数据发送改变");System.out.println("重新获取子节点数据");try {List<String> children1 = zooKeeper.getChildren(path, false);System.out.println("children1 = " + children1);System.out.println("=========================");} catch (Exception e) {e.printStackTrace();}} else if (event.getType() == Event.EventType.NodeDataChanged){System.out.println("节点数据发生改变");try {byte[] data = zooKeeper.getData(path, false, new Stat());System.out.println("获取到的数据是:" + new String(data));} catch (Exception e) {e.printStackTrace();}}}}, AddWatchMode.PERSISTENT_RECURSIVE);return children;
}

服务器的动态感知

服务注册

创建一个 SpringBoot 项目

  1. 导入对应的依赖包
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.0</version>
</dependency>
  1. 实例化一个 Zookeeper 的客户端连接

  2. 在连接成功以后,开始创建一个对应的临时顺序节点,注册自己的 ip 和端口

启动项目,连接zk,并且注册地址和端口信息

  • 配置文件
server.port=8888
server.host=192.168.48.1
  • 实现代码
// 注册服务
@SpringBootApplication
public class ZookeeperSeckillServerApplication{private static String ZK_SERVER_ADDR = "192.168.48.100:2181,192.168.48.100:2182,192.168.48.100:2183";private static int SESSION_TIMEOUT = 30000;private static String PATH = "/servers";private static String SUB_PATH = "/seckillServer";@Value("${server.host}")private String host;@Value("${server.port}")private String port;private ZooKeeper zooKeeper;public static void main(String[] args) {SpringApplication.run(ZookeeperSeckillServerApplication.class, args);}@Beanpublic ZooKeeper zooKeeper() throws  Exception {// 参数1: 连接地址和端口, 参数2: 会话超过事件, 参数3:事件监听程序zooKeeper = new ZooKeeper(SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println("event = " + event);if(event.getState() == Event.KeeperState.SyncConnected){System.out.println("zookeeper客户端连接成功");//注册对应的信息try {zooKeeper.create(PATH + SUB_PATH, (host + ":" + port).getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);} catch (Exception e) {e.printStackTrace();}}}});return zooKeeper;}
}

服务发现

创建一个 SpringBoot 项目

  1. 导入依赖
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.6.0</version>
</dependency>
  1. 创建一个 Zookeeper 客户端连接
  2. 连接成功以后,获取地址列表
  3. 绑定子节点改变事件(每次改变获取最新的的服务地址)

启动项目,连接 zk,并且获取服务地址列表

注册永久的事件监听:

@SpringBootApplication
public class ZookeeperOrderServerApplication {private static String ZK_SERVER_ADDR = "192.168.48.100:2181,192.168.48.100:2182,192.168.48.100:2183";private static int SESSION_TIMEOUT = 30000;private static String PATH = "/servers";public static List<String> addrList;// volatile: 保证在多线程之间的变量的可见性private volatile ZooKeeper zooKeeper;public static void main(String[] args) {SpringApplication.run(ZookeeperOrderServerApplication.class, args);}@Beanpublic ZooKeeper zooKeeper() throws Exception{// 第一个参数: 连接地址和端口 第二个参数: 会话超时时间, 第三个参数: 事件监听程序zooKeeper = new ZooKeeper(ZK_SERVER_ADDR, SESSION_TIMEOUT, new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println("event = " + event);if(event.getState()== Event.KeeperState.SyncConnected){System.out.println("zookeeper客户端连接成功");try {//1 获取对应的地址列表getData();//2 绑定永久的事件监听zooKeeper.addWatch(PATH, new Watcher() {@Overridepublic void process(WatchedEvent event) {// 开启另外的线程处理try {getData();} catch (Exception e) {e.printStackTrace();}}}, AddWatchMode.PERSISTENT);} catch (Exception e) {e.printStackTrace();}}}//获取数据private void getData() throws KeeperException, InterruptedException {List<String> serverAddr = zooKeeper.getChildren(PATH, null);List<String> tempList = new ArrayList<>();for (String path : serverAddr) {//获取节点路径数据byte[] data = zooKeeper.getData(PATH + "/" + path, null, new Stat());String addrInfo = new String(data);// 把数据添加到临时列表tempList.add(addrInfo);}addrList = tempList;System.out.println("获取到秒杀服务的最新地址\n" + addrList);}});return zooKeeper;}
}

分布式锁业务处理

为什么程序中需要锁?

  • 多任务环境:多个任务同时执行,可以是多线程,也可以是多进程
  • 多个任务的资源共享操作:所有的任务都需要对同一资源进行写操作
  • 对资源的访问是互斥的:对于资源的访问,多个任务同时执行,同一时间只能一个任务访问资源,其他的任务处于阻塞状态

锁的基本概念:

  • 竞争锁:任务通过竞争获取锁才能对该资源进行操作
    公平竞争:按照一定的顺序,先来先执行
    非公平竞争:没有顺序,不管先后顺序执行
  • 占有锁:当有一个任务在对资源进行更新时,其他任务都不可以对这个资源进行操作
  • 任务阻塞
  • 释放锁:直到该任务完成更新,释放资源

锁的应用场景:

  • 单机环境(一个虚拟机中)
  • 分布式环境_同名节点
  • 分布式环境_顺序节点

单机环境(一个虚拟机中)

业务实现:

// 订单ID生成器
public class OrderIDGenerator {private int count = 0;public synchronized String getId(){SimpleDateFormat sdf = new SimpleDateFormat("yyyy-dd-mm");String format = sdf.format(new Date());try {// 模拟网络延迟TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}return format + "-" + (++count);}
}

测试实现:

public class OrderService implements Runnable {private OrderIDGenerator orderIDGenerator = null;private static CountDownLatch countDownLatch = new CountDownLatch(50);private static Set<String> result = new HashSet<>(50);public OrderService(OrderIDGenerator orderIDGenerator) {this.orderIDGenerator = orderIDGenerator;}public static void main(String[] args) throws Exception{OrderIDGenerator idGenerator = new OrderIDGenerator();System.out.println("开始模拟多线程生成订单号");for(int i = 0; i < 50; i++){new Thread(new OrderService(idGenerator)).start();}countDownLatch.await();System.out.println("生成的订单号个数:" + result.size());System.out.println("======================");for (String order : result) {System.out.println(order);}}@Overridepublic void run() {result.add(orderIDGenerator.getId());countDownLatch.countDown();}
}

分布式环境_同名节点


分布式锁业务流程分析:

分布式锁流程图:

业务实现:

@RestController
public class OrderController {private  RestTemplate restTemplate = new RestTemplate();@Autowiredprivate ZooKeeper zooKeeper;private String path = "/locks";private String node = "/orderIdLock";@RequestMapping("createOrder")public String createOrder() throws Exception{// 获取idif (tryLock()) {// 调用业务方法String id = restTemplate.getForObject("http://localhost:8080/getId", String.class);System.out.println("获取到的id:" + id);// 释放锁unlock();} else {waitLock();}return "success";}// 竞争锁资源: 尝试获取id, 如果获取到了, 返回true, 否则返回falsepublic boolean tryLock(){try {// 因为不是顺序节点, 对于同一个路径, 只能创建一次String path = zooKeeper.create(this.path + this.node,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);return true;} catch (Exception e) {e.printStackTrace();return false;}}// 释放锁资源public void unlock(){try {Stat stat = zooKeeper.exists(this.path + this.node, false);zooKeeper.delete(this.path + this.node, stat.getVersion());} catch (Exception e) {e.printStackTrace();}}// 阻塞状态public  void waitLock(){try {zooKeeper.getChildren(this.path, new Watcher() {@Overridepublic void process(WatchedEvent event) {try {createOrder(); // 重新创建订单} catch (Exception e) {e.printStackTrace();}}});} catch (Exception e) {e.printStackTrace();}}
}

分布式环境_顺序节点

分布式锁流程:

@RestController
public class Order02Controller {private RestTemplate restTemplate = new RestTemplate();@Autowiredprivate ZooKeeper zooKeeper;private String path = "/locks02";private String node = "/orderIdLock";@RequestMapping("createOrder02")public String createOrder() throws Exception {String currentPath = zooKeeper.create(this.path + this.node,null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);currentPath = currentPath.substring(currentPath.lastIndexOf("/") + 1);//获取idif (tryLock(currentPath)) {// 调用业务方法String id = restTemplate.getForObject("http://localhost:8080/getId", String.class);System.out.println("获取到的id:" + id);// 释放锁unlock(currentPath);} else {waitLock(currentPath);}return "success";}//尝试获取id, 如果获取到了, 返回true, 否则返回false//竞争锁资源public boolean tryLock(String currentPath) {try {//获取到所有的节点List<String> children = zooKeeper.getChildren(this.path, false);Collections.sort(children);if (StringUtils.pathEquals(currentPath, children.get(0))) {return true;} else {return false;}} catch (Exception e) {e.printStackTrace();return false;}}//释放锁资源public void unlock(String currentPath) {try {Stat stat = zooKeeper.exists(this.path + "/" + currentPath, false);zooKeeper.delete(this.path + "/" + currentPath, stat.getVersion());} catch (Exception e) {e.printStackTrace();}}//阻塞状态public void waitLock(String currentPath) {try {CountDownLatch count = new CountDownLatch(1);List<String> children = zooKeeper.getChildren(this.path, false);//获取到前一个节点Collections.sort(children);int index = children.indexOf(currentPath);if (index > 0) {String preNode = children.get(index - 1);//前一个节点删除操作zooKeeper.getData(this.path + "/" + preNode, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getType() == Event.EventType.NodeDeleted) {try {String id = restTemplate.getForObject("http://localhost:8080/getId", String.class);System.out.println("获取到的id:" + id);//释放锁unlock(currentPath);count.countDown();} catch (Exception e) {e.printStackTrace();}}}}, new Stat());}count.await();} catch (Exception e) {e.printStackTrace();}}
}

【Java从0到架构师】Zookeeper 应用 - Java 客户端操作、服务器动态感知、分布式锁业务处理相关推荐

  1. 【Java从0到架构师(1),Java中高级面试题总结(全面)

    JSP 九大内置对象 MySQL 基础 + 多表查询 [Java从0到架构师]MySQL 基础 MySQL MySQL 的使用步骤 数据库的内部存储细节 GUI 工具 SQL 语句 DDL 语句 DD ...

  2. 【Java从0到架构师(2),Java面试问题

    新建一个核心配置文件:applicationContext.xml <?xml version="1.0" encoding="UTF-8"?>&l ...

  3. 【Java从0到架构师】Zookeeper - 安装、核心工作机制、基本命令

    分布式基石 Zookeeper 框架全面剖析 Zookeeper 安装.配置.运行 Zookeeper 的核心工作机制 特性 数据结构.节点 基本操作命令 服务器的启动和监控 客户端连接 创建节点 查 ...

  4. 【Java从0到架构师】Zookeeper - 系统高可用、分布式的基本概念、Zookeeper 应用场景

    分布式基石 Zookeeper 框架全面剖析 系统高可用 集群 - 主备集群.主从集群.普通集群 分布式(系统部署方式) 微服务(架构设计方式) 分布式的基本概念 分布式存储.分布式计算 分布式协调服 ...

  5. 【Java从0到架构师】SpringCloud - Eureka、Ribbon、Feign

    SpringCloud 分布式.微服务相关概念 微服务框架构选型 SpringCloud 概述 服务注册与发现 - Eureka 案例项目 Eureka 自我保护机制 微服务调用方式 - Ribbon ...

  6. 【Java从0到架构师】Dubbo 基础 - 设置启动时检查、直接提供者、线程模型、负载均衡、集群容错、服务降级

    Dubbo 分布式 RPC 分布式核心基础 分布式概述 RPC Dubbo Dubbo 入门程序 - XML.注解 部署管理控制台 Dubbo Admin 修改绑定的注册 IP 地址 设置启动时检查 ...

  7. 【Java从0到架构师】SpringCloud - Sleuth、Zipkin、Config

    SpringCloud 链路追踪组件 Sleuth Zipkin 分布式配置中心 - Config Git + Config 分布式配置中心 Java 从 0 到架构师目录:[Java从0到架构师]学 ...

  8. 【Java从0到架构师】SpringCloud - Hystrix、Zuul

    SpringCloud 基本概念 熔断和降级 服务雪崩效应 服务熔断与降级 - Hystrix SpringBoot 集成 Hystrix 熔断降级服务异常报警通知 重点属性 - 熔断隔离策略.超时时 ...

  9. 【Java从0到架构师】RocketMQ 使用 - 集成 SpringBoot

    RocketMQ 消息中间件 集成 SpringBoot 入门案例 生产消息类型 - 同步.异步.一次性 消费模式 - 集群.广播 延时消息 设置消息标签 设置消息的 Key 自定义属性设置 消息过滤 ...

最新文章

  1. BZOJ1058 [ZJOI2007]报表统计 set
  2. java derby 用户安全_Java 7u51安全权限变化,运行derby server被拒,解决方法
  3. vijos 观光旅游 最小环fl 呆详看
  4. 微软推出了Cloud Native Application Bundles和开源ONNX Runtime
  5. P5020-货币系统【背包】
  6. 膜拜大牛!Android开发最佳实践手册全网独一份,终获offer
  7. 在eclipse中使用hadoop插件
  8. Java集合迭代器原理图解_Java Iterator接口遍历单列集合迭代器原理详解
  9. Python学习 - 之super函数
  10. Redis数据库实现原理(划重点)
  11. LeetCode 225. Implement Stack using Queues
  12. (35) css企业命名规范
  13. ajax怎么模拟请求,如何模拟AJAX请求?
  14. 在sealos搭建的k8s集群中自定义kubeconfig文件
  15. tf.get_variable与tf.variable_scope
  16. html生成一维码,一维码条形码生成工具
  17. matlab 汽车 仿真,MATLAB编程与汽车仿真应用
  18. [ thanos源码分析系列 ]thanos sidecar组件源码简析
  19. 古时候有个【百僧问题】,一百馒头一百僧,大僧三个更无争,小僧三人分一个,大小和尚各几丁? *...
  20. Java 多线程学习(4)浅析 LongAdder、LongAccumulator 和 Striped64 的底层实现原理

热门文章

  1. 如何打造一个让粉丝一见就能收钱的朋友圈
  2. 机器是没有思想的,只会安装规定好的电路工作机器是没有思想的,只会安装规定好的电路工作
  3. 下一个十年,互联网升级的大致方向在哪里?
  4. 公司电脑可以做无盘系统吗?怎么优化速度?
  5. CMake的简单使用
  6. SQL Server更新联接概述
  7. 索引sql server_SQL Server索引设计基础和准则
  8. SQL Server 2016 SP1中的新功能和增强功能
  9. 什么是SQL Server数据字典?为什么要创建一个?
  10. 利用T-SQL处理SQL Server数据库表中的重复行