1.前言

之前自己写了一些关于Zookeeper的基础知识,Zookeeper作为一种协调分布式应用高性能的调度服务,实际的应用场景也非常的广泛,这里主要通过几个例子来具体的说明Zookeeper在特定场景下的使用方式(下面的这些功能估计consul和etcd也能实现,以后学到了再说吧)。

2.具体应用

2.1.一致性配置管理

我们在开发的时候,有时候需要获取一些公共的配置,比如数据库连接信息等,并且偶然可能需要更新配置。如果我们的服务器有N多台的话,那修改起来会特别的麻烦,并且还需要重新启动。这里Zookeeper就可以很方便的实现类似的功能。

2.1.1.思路

  1. 将公共的配置存放在Zookeeper的节点中
  2. 应用程序可以连接到Zookeeper中并对Zookeeper中配置节点进行读取或者修改(对于写操作可以进行权限验证设置),下面是具体的流程图:

2.1.2.事例

数据库配置信息一致性的维护

配置类:

public class CommonConfig implements Serializable{// 数据库连接配置private String dbUrl;private String username;private String password;private String driverClass;public CommonConfig() {}public CommonConfig(String dbUrl, String username, String password, String driverClass) {super();this.dbUrl = dbUrl;this.username = username;this.password = password;this.driverClass = driverClass;}public String getDbUrl() {return dbUrl;}public void setDbUrl(String dbUrl) {this.dbUrl = dbUrl;}public String getUsername() {return username;}public void setUsername(String username) {this.username = username;}public String getPassword() {return password;}public void setPassword(String password) {this.password = password;}public String getDriverClass() {return driverClass;}public void setDriverClass(String driverClass) {this.driverClass = driverClass;}@Overridepublic String toString() {return "CommonConfig:{dbUrl:" + this.dbUrl +", username:" + this.username + ", password:" + this.password + ", driverClass:" + this.driverClass + "}";}
}

配置管理中心

  1. 获取本地配置信息

  2. 修改配置,并同步

同步配置信息到Zookeeper服务器

public class ZkConfigMng {private String nodePath = "/commConfig";private CommonConfig commonConfig;private ZkClient zkClient;public CommonConfig initConfig(CommonConfig commonConfig) {if(commonConfig == null) {this.commonConfig = new CommonConfig("jdbc:mysql://127.0.0.1:3306/mydata?useUnicode=true&characterEncoding=utf-8","root", "root", "com.mysql.jdbc.Driver");   } else {this.commonConfig = commonConfig;}return this.commonConfig;}/*** 更新配置* * @param commonConfig* @return*/public CommonConfig update(CommonConfig commonConfig) {if(commonConfig != null) {this.commonConfig = commonConfig;}syncConfigToZookeeper();return this.commonConfig;}public void syncConfigToZookeeper() {if(zkClient == null) {zkClient = new ZkClient("127.0.0.1:2181");}if(!zkClient.exists(nodePath)) {zkClient.createPersistent(nodePath);}zkClient.writeData(nodePath, commonConfig);}
}

以上是提供者,下面我们需要一个客户端获取这些配置

public class ZkConfigClient implements Runnable {private String nodePath = "/commConfig";private CommonConfig commonConfig;@Overridepublic void run() {ZkClient zkClient = new ZkClient(new ZkConnection("127.0.0.1:2181", 5000));while (!zkClient.exists(nodePath)) {System.out.println("配置节点不存在!");try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}// 获取节点commonConfig = (CommonConfig)zkClient.readData(nodePath);System.out.println(commonConfig.toString());zkClient.subscribeDataChanges(nodePath, new IZkDataListener() {@Overridepublic void handleDataDeleted(String dataPath) throws Exception {if(dataPath.equals(nodePath)) {System.out.println("节点:" + dataPath + "被删除了!");}}@Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {if(dataPath.equals(nodePath)) {System.out.println("节点:" + dataPath + ", 数据:" + data + " - 更新");commonConfig = (CommonConfig) data;}}});}}

下面启动Main函数

配置管理服务启动

public static void main(String[] args) throws InterruptedException {SpringApplication.run(ZookeeperApiDemoApplication.class, args);ZkConfigMng zkConfigMng = new ZkConfigMng();zkConfigMng.initConfig(null);zkConfigMng.syncConfigToZookeeper();TimeUnit.SECONDS.sleep(10);// 修改值zkConfigMng.update(new CommonConfig("jdbc:mysql://192.168.1.122:3306/mydata?useUnicode=true&characterEncoding=utf-8","root", "wxh", "com.mysql.jdbc.Driver"));}
}

客户端启动:

public static void main(String[] args) throws InterruptedException {SpringApplication.run(ZookeeperApiDemoApplication.class, args);ExecutorService executorService = Executors.newFixedThreadPool(3);// 模拟多个客户端获取配置executorService.submit(new ZkConfigClient());executorService.submit(new ZkConfigClient());executorService.submit(new ZkConfigClient());}
}

2.2.分布式锁

在我们日常的开发中,如果是单个进程中对共享资源的访问,我们只需要用synchronized或者lock就能实现互斥操作。但是对于跨进程、跨主机、跨网络的共享资源似乎就无能为力了。

2.1.1.思路

  1. 首先zookeeper中我们可以创建一个/distributed_lock持久化节点
  2. 然后再在/distributed_lock节点下创建自己的临时顺序节点,比如:/distributed_lock/task_00000000008
  3. 获取所有的/distributed_lock下的所有子节点,并排序
  4. 判读自己创建的节点是否最小值(第一位)
  5. 如果是,则获取得到锁,执行自己的业务逻辑,最后删除这个临时节点。
  6. 如果不是最小值,则需要监听自己创建节点前一位节点的数据变化,并阻塞。
  7. 当前一位节点被删除时,我们需要通过递归来判断自己创建的节点是否在是最小的,如果是则执行5);如果不是则执行6)(就是递归循环的判断)

下面是具体的流程图:

2.1.3.事例

public class DistributedLock {// 常亮static class Constant {private static final int SESSION_TIMEOUT = 10000;private static final String CONNECTION_STRING = "127.0.0.1:2181";private static final String LOCK_NODE = "/distributed_lock";private static final String CHILDREN_NODE = "/task_";}private ZkClient zkClient;public DistributedLock() {// 连接到ZookeeperzkClient = new ZkClient(new ZkConnection(Constant.CONNECTION_STRING));if(!zkClient.exists(Constant.LOCK_NODE)) {zkClient.create(Constant.LOCK_NODE, "分布式锁节点", CreateMode.PERSISTENT);}}public String getLock() {try {// 1。在Zookeeper指定节点下创建临时顺序节点String lockName = zkClient.createEphemeralSequential(Constant.LOCK_NODE + Constant.CHILDREN_NODE, "");// 尝试获取锁acquireLock(lockName);return lockName;} catch(Exception e) {e.printStackTrace();}return null;}/*** 获取锁* @throws InterruptedException */public Boolean acquireLock(String lockName) throws InterruptedException {// 2.获取lock节点下的所有子节点List<String> childrenList = zkClient.getChildren(Constant.LOCK_NODE);// 3.对子节点进行排序,获取最小值Collections.sort(childrenList, new Comparator<String>() {@Overridepublic int compare(String o1, String o2) {return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);}});// 4.判断当前创建的节点是否在第一位int lockPostion = childrenList.indexOf(lockName.split("/")[lockName.split("/").length - 1]);if(lockPostion < 0) {// 不存在该节点throw new ZkNodeExistsException("不存在的节点:" + lockName);} else if (lockPostion == 0) {// 获取到锁System.out.println("获取到锁:" + lockName);return true;} else if (lockPostion > 0) {// 未获取到锁,阻塞System.out.println("...... 未获取到锁,阻塞等待 。。。。。。");// 5.如果未获取得到锁,监听当前创建的节点前一位的节点final CountDownLatch latch = new CountDownLatch(1);IZkDataListener listener = new IZkDataListener() {@Overridepublic void handleDataDeleted(String dataPath) throws Exception {// 6.前一个节点被删除,当不保证轮到自己System.out.println("。。。。。。前一个节点被删除  。。。。。。");acquireLock(lockName);latch.countDown();}@Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {// 不用理会}};try {zkClient.subscribeDataChanges(Constant.LOCK_NODE + "/" + childrenList.get(lockPostion - 1), listener);latch.await();} finally {zkClient.unsubscribeDataChanges(Constant.LOCK_NODE + "/" + childrenList.get(lockPostion - 1), listener);}}return false;}/*** 释放锁(删除节点)* * @param lockName*/public void releaseLock(String lockName) {zkClient.delete(lockName);}public void closeZkClient() {zkClient.close();}
}@SpringBootApplication
public class ZookeeperDemoApplication {public static void main(String[] args) throws InterruptedException {SpringApplication.run(ZookeeperDemoApplication.class, args);DistributedLock lock = new DistributedLock();String lockName = lock.getLock();/** * 执行我们的业务逻辑*/if(lockName != null) {lock.releaseLock(lockName);}lock.closeZkClient();}
}

2.3.分布式队列

在日常使用中,特别是像生产者消费者模式中,经常会使用BlockingQueue来充当缓冲区的角色。但是在分布式系统中这种方式就不能使用BlockingQueue来实现了,但是Zookeeper可以实现。

2.1.1.思路

  1. 首先利用Zookeeper中临时顺序节点的特点
  2. 当生产者创建节点生产时,需要判断父节点下临时顺序子节点的个数,如果达到了上限,则阻塞等待;如果没有达到,就创建节点。
  3. 当消费者获取节点时,如果父节点中不存在临时顺序子节点,则阻塞等待;如果有子节点,则获取执行自己的业务,执行完毕后删除该节点即可。
  4. 获取时获取最小值,保证FIFO特性。

2.1.2.事例

这个是一个消费者对一个生产者,如果是多个消费者对多个生产者,对代码需要调整。

public interface AppConstant {static String ZK_CONNECT_STR = "127.0.0.1:2181";static String NODE_PATH = "/mailbox";static String CHILD_NODE_PATH = "/mail_";static int MAILBOX_SIZE = 10;
}public class MailConsumer implements Runnable, AppConstant{private ZkClient zkClient;private Lock lock;private Condition condition;public MailConsumer() {lock = new ReentrantLock();condition = lock.newCondition();zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));System.out.println("sucess connected to zookeeper server!");// 不存在就创建mailbox节点if(!zkClient.exists(NODE_PATH)) {zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);}}@Overridepublic void run() {IZkChildListener listener = new IZkChildListener() {     @Overridepublic void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {System.out.println("Znode["+parentPath + "] size:" + currentChilds.size());// 还是要判断邮箱是否为空if(currentChilds.size() > 0) {// 唤醒等待的线程try {lock.lock();condition.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}}};// 监视子节点的改变,不用放用while循环中,监听一次就行了,不需要重复绑定zkClient.subscribeChildChanges(NODE_PATH, listener);try {//循环随机发送邮件模拟真是情况while(true) {// 判断是否可以发送邮件checkMailReceive();// 接受邮件List<String> mailList = zkClient.getChildren(NODE_PATH);// 如果mailsize==0,也没有关系;可以直接循环获取就行了if(mailList.size() > 0) {Collections.sort(mailList, new Comparator<String>() {@Overridepublic int compare(String o1, String o2) {return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);}});// 模拟邮件处理(0-1S)TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));zkClient.delete(NODE_PATH + "/" + mailList.get(0));System.out.println("mail has been received:" + NODE_PATH + "/" + mailList.get(0));}} }catch (Exception e) {e.printStackTrace();} finally {zkClient.unsubscribeChildChanges(NODE_PATH, listener);}}private void checkMailReceive() {try {lock.lock();// 判断邮箱是为空List<String> mailList = zkClient.getChildren(NODE_PATH);System.out.println("mailbox size: " + mailList.size());if(mailList.size() == 0) {// 邮箱为空,阻塞消费者,直到邮箱有邮件System.out.println("mailbox is empty, please wait 。。。");condition.await();// checkMailReceive();}} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}
}public class MailProducer implements Runnable, AppConstant{private ZkClient zkClient;private Lock lock;private Condition condition;/*** 初始化状态*/public MailProducer() {lock = new ReentrantLock();condition = lock.newCondition();zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));System.out.println("sucess connected to zookeeper server!");// 不存在就创建mailbox节点if(!zkClient.exists(NODE_PATH)) {zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);}}@Overridepublic void run() {IZkChildListener listener = new IZkChildListener() {     @Overridepublic void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {System.out.println("Znode["+parentPath + "] size:" + currentChilds.size());// 还是要判断邮箱是否已满if(currentChilds.size() < MAILBOX_SIZE) {// 唤醒等待的线程try {lock.lock();condition.signal();} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}}};// 监视子节点的改变,不用放用while循环中,监听一次就行了,不需要重复绑定zkClient.subscribeChildChanges(NODE_PATH, listener);try {//循环随机发送邮件模拟真是情况while(true) {// 判断是否可以发送邮件checkMailSend();// 发送邮件String cretePath = zkClient.createEphemeralSequential(NODE_PATH + CHILD_NODE_PATH, "your mail");System.out.println("your mail has been send:" + cretePath);// 模拟随机间隔的发送邮件(0-10S)TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));} }catch (Exception e) {e.printStackTrace();} finally {zkClient.unsubscribeChildChanges(NODE_PATH, listener);}}private void checkMailSend() {try {lock.lock();// 判断邮箱是否已满List<String> mailList = zkClient.getChildren(NODE_PATH);System.out.println("mailbox size: " + mailList.size());if(mailList.size() >= MAILBOX_SIZE) {// 邮箱已满,阻塞生产者,直到邮箱有空间System.out.println("mailbox is full, please wait 。。。");condition.await();checkMailSend();}} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}
}

2.4.均衡负载

首先我们需要简单的理解分布式和集群,通俗点说:分布式就是将一个系统拆分到多个独立运行的应用中(有可能在同一台主机也有可能在不同的主机上),集群就是将单个独立的应用复制多分放在不同的主机上来减轻服务器的压力。而Zookeeper不仅仅可以作为分布式集群的服务注册调度中心(例如dubbo),也可以实现集群的负载均衡。

2.4.1.思路

  1. 首先我们要理解,如果是一个集群,那么他就会有多台主机。所以,他在Zookeeper中信息的存在应该是如下所示:
  2. 如上的结构,当服务调用方调用服务时,就可以根据特定的均衡负载算法来实现对服务的调用(调用前需要监听/service/serviceXXX节点,以更新列表数据)

2.4.2.事例

/*** 服务提供者* * @author Administrator**/
public class ServiceProvider {// 静态常量static String ZK_CONNECT_STR = "127.0.0.1:2181";static String NODE_PATH = "/service";static String SERIVCE_NAME = "/myService";private ZkClient zkClient;public ServiceProvider() {zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));System.out.println("sucess connected to zookeeper server!");// 不存在就创建NODE_PATH节点if(!zkClient.exists(NODE_PATH)) {zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);}}public void registryService(String localIp, Object obj) {if(!zkClient.exists(NODE_PATH + SERIVCE_NAME)) {zkClient.create(NODE_PATH + SERIVCE_NAME, "provider services list", CreateMode.PERSISTENT);}// 对自己的服务进行注册zkClient.createEphemeral(NODE_PATH + SERIVCE_NAME + "/" + localIp, obj);System.out.println("注册成功![" + localIp + "]");}
}/*** 消费者,通过某种均衡负载算法选择某一个提供者* * @author Administrator**/
public class ServiceConsumer {// 静态常量static String ZK_CONNECT_STR = "127.0.0.1:2181";static String NODE_PATH = "/service";static String SERIVCE_NAME = "/myService";private List<String> serviceList = new ArrayList<String>();private ZkClient zkClient;public ServiceConsumer() {zkClient = new ZkClient(new ZkConnection(ZK_CONNECT_STR));System.out.println("sucess connected to zookeeper server!");// 不存在就创建NODE_PATH节点if(!zkClient.exists(NODE_PATH)) {zkClient.create(NODE_PATH, "this is mailbox", CreateMode.PERSISTENT);}}/*** 订阅服务*/public void subscribeSerivce() {serviceList = zkClient.getChildren(NODE_PATH + SERIVCE_NAME);zkClient.subscribeChildChanges(NODE_PATH + SERIVCE_NAME, new IZkChildListener() {@Overridepublic void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {serviceList = currentChilds;}});}/*** 模拟调用服务*/public void consume() {//负载均衡算法获取某台机器调用服务int index = new Random().nextInt(serviceList.size());System.out.println("调用[" + NODE_PATH + SERIVCE_NAME + "]服务:" + serviceList.get(index));}
}

3.总结

Zookeeper是一个功能非常强大的应用,除了上面几种应用外,还有命名服务、分布式协调通知等也是常用的场景。

Zookeeper典型应用场景介绍相关推荐

  1. Zookeeper 典型应用场景介绍

    作者 | 永远_不会懂 来源 | https://blog.csdn.net/u013468915/article/details/80955110 1.前言 之前自己写了一些关于Zookeeper的 ...

  2. 【ZooKeeper Notes 28】ZooKeeper典型应用场景一览

    ZooKeeper是一个高可用的分布式数据管理与系统协调框架.基于对Paxos算法的实现,使该框架保证了分布式环境中数据的强一致性,也正是基于这样的特性,使得ZooKeeper解决很多分布式问题.网上 ...

  3. ZooKeeper典型应用场景一览

    原文地址:http://jm-blog.aliapp.com/?p=1232 ZooKeeper典型应用场景一览 数据发布与订阅(配置中心) 发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据 ...

  4. Zookeeper分布式一致性原理(八):Zookeeper典型应用场景

    1. 简介 Zookeeper是一个高可用的分布式数据管理和协调框架,并且能够很好的保证分布式环境中数据的一致性.在越来越多的分布式系统(Hadoop.HBase.Kafka)中,Zookeeper都 ...

  5. 基于MRS-Hudi构建数据湖的典型应用场景介绍

    一.传统数据湖存在的问题与挑战 传统数据湖解决方案中,常用Hive来构建T+1级别的数据仓库,通过HDFS存储实现海量数据的存储与水平扩容,通过Hive实现元数据的管理以及数据操作的SQL化.虽然能够 ...

  6. ZooKeeper 典型应用场景有哪些?

    ZooKeeper 概览中,我们介绍到使用其通常被用于实现诸如数据发布/订阅.负载均衡.命名服务.分布式协调/通知.集群管理.Master 选举.分布式锁和分布式队列等功能. 下面选 3 个典型的应用 ...

  7. ZooKeeper学习之路 (七)ZooKeeper设计特点及典型应用场景

    ZooKeeper 特点/设计目的 ZooKeeper 作为一个集群提供数据一致的协调服务,自然,最好的方式就是在整个集群中的 各服务节点进行数据的复制和同步. 数据复制的好处 1.容错:一个节点出错 ...

  8. Zookeeper主要应用场景

    ZooKeeper典型应用场景一览 数据发布与订阅(配置中心) 发布与订阅模型,即所谓的配置中心,顾名思义就是发布者将数据发布到ZK节点上,供订阅者动态获取数据,实现配置信息的集中式管理和动态更新.例 ...

  9. Zookeeper的典型应用场景(2)

    此文知识来自于:<从Paxos到Zookeeper分布式一致性原理与实践>第六章 集群管理(子节点) Master选举(同时创建节点) 分布式锁(同时创建节点) 分布式队列(创建顺序节点) ...

最新文章

  1. 常用的富文本框插件FreeTextBox、CuteEditor、CKEditor、FCKEditor、TinyMCE、KindEditor ;和CKEditor实例...
  2. Flink State和容错机制
  3. KVM 虚拟化原理探究--启动过程及各部分虚拟化原理
  4. git clone failed. Could not read from remote repository
  5. java如何读取自定义log4j2_spring boot自定义log4j2日志文件的实例讲解
  6. 小 Q 与函数求和 1(牛客练习赛 81 E)
  7. html中评论应该怎么写,HTML-评论
  8. AI开发者福音!阿里云推出国内首个基于英伟达NGC的GPU优化容器
  9. bee 字符串转int_beego中gbk和utf8编码转换问题
  10. 高等组合学笔记(十三):组合反演,反演公式
  11. 剑指offer之正则表达式匹配
  12. [洛谷P1082]同余方程
  13. 精进 Quartz—Quartz大致介绍(一)
  14. 2020教师计算机考试笔试题,2020教师招聘考试《信息技术》练习题之答案解析
  15. MATLAB Codesys,CoDeSys学习日记(一)
  16. win10-11全版本下载地址MSDN纯净版ISO-20220217更新
  17. 《酒干倘卖无》歌曲什么意思,看了《搭错车》感人电影就知道了
  18. 迈克尔.杰克逊时代的意义
  19. java 碳架山地车寿命_关于自行车的4大“谣言” 铝合金车架寿命只有五年
  20. FileZilla的下载与安装

热门文章

  1. 有哪些比较好用的UI设计网站
  2. 2022年六大国产CPU发展的怎么样了?
  3. 角蜂鸟AI视觉套件:(三)ROS案例
  4. android 录屏 github,GitHub - mabeijianxi/ScreenRecordPushStream: Android 录屏推流demo
  5. ubuntu20 aliyun sourcelist
  6. python爬取电影网站存储于数据库_python爬虫 猫眼电影和电影天堂数据csv和mysql存储过程解析...
  7. [modem]AP和BP简介
  8. NVIDIA 驱动下载网址
  9. 直通车助流量飙升【干货实操】
  10. Ubuntu18.04安装小觅双目深度相机SDK——MYNT-EYE-D-SDK