zookeeper实际应用场景
zookeeper能够实现哪些场景
1)订阅发布/配置中心
watcher机制
统一配置管理(disconf)
实现配置信息的集中式原理和数据的动态更新
实现配置中心有俩种模式:push,pull
长轮询
zookeeper采用的是推拉相结合的方式。客户端向服务器端注册自己需要关注的节点。一旦节点数据发生变化,name服务器端会向客户端发送watcher事件通知。客户端收到通知后,主动到服务器端获取更新后的数据。
a 数据量比较小
b 数据内容在运行时发生动态变更
c 集群中的各个机器共享变量

2)分布式锁
2.1 redis setNX 存在则会返回0 不存在则返回数据
2.2 数据库 创建一个表 通过唯一索引的方式
create table(id,methodname..) methodname增加唯一索引
insert 一条数据 xxx delete 删除数据
mysql 有innodb来设置表锁或者行锁
2.3 zookeeper 有序节点
排他锁

共享锁(读锁)

3)负载均衡
请求/数据分摊多个计算单元
4)ID生成器
5)分布式队列
activeMQ kafka
a 先进先出队列
getchildren获取指定根节点下面的子节点
确定自己节点在子节点中的顺序
如果自己不是最小的子节点,监听比自己小的上一个子节点,否则处于等待 接受watcher通知,重复流程
b Barrier模式 =阻碍模式= 围栏模型 满足条件才会触发执行

6)统一命名服务
7)master选举
7*24小时可用 99.999%可用
master-slave模式
master出现故障 slave上位作为master 心跳机制去维持状态 脑裂

1)分布式锁实现

package com.lulf.DistrubuteLock.JavaApi;import java.io.IOException;
import java.util.concurrent.CountDownLatch;import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;public class ZookeeperClient {private final static String CONNECTSTRING = "192.168.48.133:2181,192.168.48.134:2181,"+ "192.168.48.135:2181,192.168.48.136:2181";private static int sessionTimeOut = 5000;//获取连接public static ZooKeeper getInstance() throws IOException, InterruptedException {final CountDownLatch countDownLatch = new CountDownLatch(1);ZooKeeper zooKeeper = new ZooKeeper(CONNECTSTRING, sessionTimeOut, new Watcher() {@Overridepublic void process(WatchedEvent paramWatchedEvent) {if (paramWatchedEvent.getState() == Event.KeeperState.SyncConnected) {countDownLatch.countDown();}}});countDownLatch.await();return zooKeeper;}public static int getSessionTimeOut() {return sessionTimeOut;}
}
复制代码
package com.lulf.DistrubuteLock.JavaApi;import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;public class DistributeLock {private static final String ROOT_LOCKS = "/LOCKS"; // 根节点private ZooKeeper zooKeeper;private int sessionTimeOut = 5000;// 会话超时时间private String lockID;// 记录锁节点IDprivate CountDownLatch countDownLatch = new CountDownLatch(1);private final static byte[] data = { 1, 2 };public DistributeLock() throws IOException, InterruptedException {this.zooKeeper = ZookeeperClient.getInstance();this.sessionTimeOut = ZookeeperClient.getSessionTimeOut();}// 获取锁的方法public boolean lock() {try {lockID = zooKeeper.create(ROOT_LOCKS + "/", data, ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);System.out.println(Thread.currentThread().getName() + "->成功创建lock节点[" + lockID + "]开始去竞争锁");List<String> childrenNodes = zooKeeper.getChildren(ROOT_LOCKS, true);// 排序从小到大SortedSet<String> sortedSet = new TreeSet<String>();for (String children : childrenNodes) {sortedSet.add(ROOT_LOCKS + "/" + children);}String first = sortedSet.first();// 拿到最小的节点if (lockID.equals(first)) {// 表示当前就是最小的System.out.println(Thread.currentThread().getName() + "success get lock,lock节点[" + lockID + "]");return true;}SortedSet<String> lessThanLockID = sortedSet.headSet(lockID);if (!lessThanLockID.isEmpty()) {String preLockId = lessThanLockID.last(); // 拿到比当前lockid这个节点更小的上一个节点zooKeeper.exists(preLockId, new LockWatcher(countDownLatch));countDownLatch.await(sessionTimeOut, TimeUnit.MILLISECONDS);// 上面这段代码意味着如果会话超时或者节点被删除System.out.println(Thread.currentThread().getName() + "成功获取锁,lock节点[" + lockID + "]");}return true;} catch (KeeperException e) {e.printStackTrace();} catch (InterruptedException e) {e.printStackTrace();}return false;}// 释放锁的方法public boolean unlock() {System.out.println(Thread.currentThread().getName() + "-->开始释放锁:[" + lockID + "]");try {zooKeeper.delete(lockID, -1);System.out.println("节点[" + lockID + "]被成功删除");return true;} catch (Exception e) {e.getStackTrace().toString();}return false;}public static void main(String[] args) {CountDownLatch latch = new CountDownLatch(10);Random random = new Random();for (int i = 0; i < 10; i++) {new Thread(() -> {DistributeLock lock = null;try {lock = new DistributeLock();latch.countDown();latch.await();lock.lock();Thread.sleep(random.nextInt(500));} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();}}).start();}}
}
复制代码

运行程序2)master选举 实现共享锁:

package com.lulf.DistrubuteLock.zkclient;import java.io.Serializable;public class UserCenter implements Serializable{/*** */private static final long serialVersionUID = -4060228979536051295L;private  int m_id;//机器信息private String mc_name; //机器名称public int getM_id() {return m_id;}public void setM_id(int m_id) {this.m_id = m_id;}public String getMc_name() {return mc_name;}public void setMc_name(String mc_name) {this.mc_name = mc_name;}
}
复制代码
package com.lulf.DistrubuteLock.zkclient;import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;/*** 选主服务* * @author lulf**/
public class MasterSelector {private ZkClient client;private final static String MASTER_PATH = "/master";// 需要争抢的节点private IZkDataListener dataListener;// 注册节点内容发生变化private UserCenter server; // 其他服务器private UserCenter master; // master节点private static boolean isrunning = false;ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);public MasterSelector(UserCenter server,ZkClient client) {this.server = server;this.client=client;this.dataListener = new IZkDataListener() {@Overridepublic void handleDataDeleted(String dataPath) throws Exception {// 节点如果被删除,发起一个选主操作chooseMaster();}@Overridepublic void handleDataChange(String dataPath, Object data) throws Exception {}};}public void start() {// 开始选举if(!isrunning){isrunning=true;client.subscribeDataChanges(MASTER_PATH, dataListener);//注冊节点时间chooseMaster();}}public void stop() {// 停止if(isrunning){isrunning=false;scheduledExecutorService.shutdown();client.unsubscribeDataChanges(MASTER_PATH, dataListener);//取消订阅releaseMaster();}}// 具体选主的服务private void chooseMaster() {if (!isrunning) {System.out.println("当前服务没有启动。。。");return;}try {client.createEphemeral(MASTER_PATH, server);master = server;// 把server节点赋值给masterSystem.out.println(master.getMc_name() + "-->我已经是master,开始领导你们");// 定时器// master释放锁,出现故障scheduledExecutorService.schedule(() -> {releaseMaster();}, 5, TimeUnit.SECONDS);//每5秒释放一次锁} catch (ZkNodeExistsException e) {e.getStackTrace().toString();//表示master已经存在UserCenter userCenter=client.readData(MASTER_PATH, true);if(userCenter==null){chooseMaster();//再次获取master}else{master=userCenter;}}}private void releaseMaster() {// 释放锁(故障模拟)//判断当前是否是master,只有master才需要释放锁if(checkMaster()){client.delete(MASTER_PATH, -1);//删除}}private boolean checkMaster() {// 判断当前的server是否是masterUserCenter userCenter=client.readData(MASTER_PATH);if(userCenter.getMc_name().equals(server.getMc_name())){master=userCenter;return true;}return false;}
}
复制代码
package com.lulf.DistrubuteLock.zkclient;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.SerializableSerializer;public class MasterChooseDemo {private final static String CONNECTSTRING = "192.168.48.133:2181,192.168.48.134:2181,"+ "192.168.48.135:2181,192.168.48.136:2181";public static void main(String[] args) {List<MasterSelector>selectorList=new ArrayList<MasterSelector>();try {  for (int i = 0; i < 10; i++) {ZkClient zkClient=new ZkClient(CONNECTSTRING, 5000,5000,new SerializableSerializer());UserCenter userCenter=new UserCenter();userCenter.setM_id(i);userCenter.setMc_name("lulf_"+i);MasterSelector selector=new MasterSelector(userCenter,zkClient);selectorList.add(selector);selector.start();//触发选举操作TimeUnit.SECONDS.sleep(4);}   } catch (Exception e) {e.getStackTrace().toString();}finally {for (MasterSelector masterSelector : selectorList) {masterSelector.stop();}}}
}
复制代码

curator 提供应用场景封装

curator-reciples
提供了api调用
如:master/leader选举
分布式锁 读锁 写锁
分布式队列
。。。
LeaderLatch 阻塞
写一个master
LeaderSelector 自动抢
每个应用都写一个临时有序节点,根据最小的节点来获取优先点

package com.lulf.DistrubuteLock.curator;import java.util.concurrent.TimeUnit;import javax.swing.tree.ExpandVetoException;import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.retry.ExponentialBackoffRetry;public class MasterSelector {private final static String CONNECTSTRING = "192.168.48.133:2181,192.168.48.134:2181,"+ "192.168.48.135:2181,192.168.48.136:2181";private final static String MASTER_PATH="/curator_master_path";public static void main(String[] args) {CuratorFramework curatorFramework=CuratorFrameworkFactory.builder().connectString(CONNECTSTRING).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();LeaderSelector leaderSelector=new LeaderSelector(curatorFramework, MASTER_PATH, new LeaderSelectorListenerAdapter() {@Overridepublic void takeLeadership(CuratorFramework arg0) throws Exception {System.out.println("获取leader成功");TimeUnit.SECONDS.sleep(2);}});leaderSelector.autoRequeue();leaderSelector.start();//开始选举}
}
复制代码

zookeeper的集群角色
leader
leader是zookeeper集群的核心。
1 事务请求的唯一调度的矗立着,他需要保证事务处理的顺序性
2 集群内部各个服务器的调度者
follower
1 处理客户端非事务请求以及转发事务请求给leader服务器
2 参与事务请求提议的proposal投票 【客户端的一个事务请求需要半数服务投票通过才能通知leader commit,leader会发起一个提案,要求follower投票】
3 参与leader选举的投票
observer
1 观察zookeeper集群中最新状态的变化,并且把这些状态同步到observer服务器上。
2 增加observer不影响集群事务处理能力,同时能提升集群的非事务处理能力

zookeeper的集群组成
zookeeper一般是由2n+1台服务器组成

leader选举
1)leaderElection
2)AuthFastLeaderElection
3)FastLeaderElection

serverID :配置server集群的时候给定服务器的标识id myid
zxid:服务器它运行时产生的数据ID,zxid值越大,标识数据越新
Epoch:选举轮次 sid
server的状态:Looking,Following,Observering,Leading

第一次初始化启动的时候是Looking
1)所有集群中的server都会推荐自己为leader,然后把(myid,zxid,epoch)作为广播信息,广播给集群中的其他server,然后等待其他服务器返回。
2)每个服务器都会接受到来自集群中的其他服务器的投票,及群众的每个服务器在接受到投票之后,都会判断投票的有效性
a)判断逻辑时钟epoch 如果epoch大于自己当前的epoch,说明自己保存的epoch是过期,更新epoch,同事clear其他服务器送过来的选举数据,判断是否需要更新当前自己的选举情况
b)如果Epoch小于目前的epoch,说明对方的epoch过期,意味着对方服务器的选举轮次是过期的,只需要把自己的信息发送给对方
c)如果接收到的epoch等于当前的epoch,根据规则来判断是否有资格获得leader 接受到来自其他服务器的投票后,针对每一个投标,都需要将别人的投票和自己的投票进行pk
ZXID最大的服务器优先
3)统计投票

ZAB协议
拜占庭问题
paxos协议主要就是如何保证在分布式网络环境下,各个服务器如何达成一致最终保证数据的一致性问题。
ZAB协议,基于paxos协议的一个改进。

ZAB协议为分布式协调服务zookeeper专门设计的一种支持奔溃恢复的原子广播协议。
zookeeper并没有完全采用paxos算法,而是采用zab zookeeper stomic broadcast zab协议的原理:
1)在zookeeper的主备模式下,通过zab协议来保证集群中的各个副本数据的一致性
2)zookeeper是使用单一的主进程来接受并处理所有的事务请求,并采用zab协议,把数据的状态变更以事务请求的形式广播到其他节点
3)zab协议在主备模型架构中保证了同一时刻只能有一个主进程来广播服务器的状态变更
4)所有的事务请求必须由全局唯一的服务器来协调处理,这个服务器叫leader,其他叫follower
leader节点主要是负责把客户端的请求转化为一个事务提议(proposal),并且分发给集群中的所有follower节点,再等待所有follower节点反馈,一旦超过半数服务器进行了正确的反馈,nameleader就会commit这个消息。

奔溃恢复
原子广播

zab协议的工作原理
1)什么情况下zab协议会进入奔溃恢复模式
a 当服务器启动时
b 当leader服务器出现网络中断 奔溃 重启的情况
c 集群中已经不存在过半的服务器与该leader保持正常通行

2)zab协议进入奔溃恢复模式会做什么
a 当leader出现问题,zab协议进入奔溃恢复模式,并且选举出新的leader。当新的leader选举出来以后,如果集群中已经有过半机器完成了leader服务器的状态同步(数据同步),退出崩溃恢复。进入消息广播模式
b 当新的机器加入到集群中的时候,如果已经存在leader服务器,那么新加入的服务器就会自觉进入数据恢复模式,找到leader进行数据同步

问题:
假设一个事务在leader服务器被提交了,并且已经有过半的follower返回了ack。在leader节点把commit消息发送给follower机器之前,leader服务器挂了怎么办?
zab协议,一定需要保证已经被leader提交的事务也能够被所有follower提交。
zab协议需要协议,在奔溃恢复过程中跳过那些已经被丢弃的事务。

zookeeper学习03 使用场景相关推荐

  1. ZooKeeper学习第七期--ZooKeeper一致性原理

    ZooKeeper学习第六期---ZooKeeper机制架构 ZooKeeper学习第一期---Zookeeper简单介绍 ZooKeeper学习第二期--ZooKeeper安装配置 ZooKeepe ...

  2. ZooKeeper学习第四期---构建ZooKeeper应用

    ZooKeeper学习第一期---Zookeeper简单介绍 ZooKeeper学习第二期--ZooKeeper安装配置 ZooKeeper学习第三期---Zookeeper命令操作 ZooKeepe ...

  3. ZooKeeper学习专题之四:示例 实时更新server列表

    2019独角兽企业重金招聘Python工程师标准>>> [转载请注明作者和原文链接,  如有谬误, 欢迎在评论中指正. ] 通过之前的3篇博文, 讲述了ZooKeeper的基础知识点 ...

  4. Zookeeper学习之源生API的使用(java与shell操作zookeeper)。

    如果不会搭建zookeeper环境:请看此文章:Zookeeper学习之集群环境搭建 1.操作zookeeper(shell) 启动zookeeper客户端:zkCli.sh; 根据提示命令进行操作: ...

  5. Zookeeper学习笔记

    Zookeeper学习笔记 概念 Zookeeper工作机制 特点 数据结构 应用场景 统一命名服务 统一配置管理 统一集群管理 软负载均衡 Zookeeper本地安装 本地模式安装 安装前准备 配置 ...

  6. 线上分享会预告之深度学习在3D场景中的应用

    大家好.上周我们迎来了第一期的线上分享,三维模型检索技术介绍,此次分享是一次接力形式的分享,每周都将有一位主讲人分享,希望更多的小伙伴加入我们一起分享,也是给自己一个机会锻炼.这里先预告一下,线上直播 ...

  7. WSDM 2022 | 基于元学习的多场景多任务商家建模

    丨目录: · 前言 · 背景 · 问题定义 · 算法建模 · 实验 · 总结展望 · 参考文献 · 关于我们 ▐ 前言 面向淘系商家或广告主的商家理解与建模,是阿里妈妈客户生态建设的关键一环.有别于C ...

  8. api 创建zookeeper客户端_一文了解 Zookeeper 基本原理与应用场景

    Zookeeper 是一个高性能.高可靠的分布式协调系统,是 Google Chubby 的一个开源实现,目前在分布式系统.大数据领域中使用非常广泛.本文将介绍 Zookeeper 集群架构.数据模型 ...

  9. ZooKeeper学习笔记(八):ZooKeeper集群写数据原理

    写数据原理 写流程直接请求发送给Leader节点 这里假设集群中有三个zookeeper服务端 ACK (Acknowledge character)即是确认字符,在数据通信中,接收站发给发送站的一种 ...

最新文章

  1. 你认识的世界与客观世界间差了N光年
  2. 机器学习基础——RandomForest
  3. 关于某些 Visual Studio Code 扩展程序无法在浏览器中运行的原因
  4. Spark SQL(八)之基于物品的相似度公式
  5. 【Direct3D游戏开发】——DirectInput 让世界动起来
  6. synchronized的4种用法
  7. Python学习汇总,做数据采集的一些小技巧,收获满满
  8. python默认编码有什么用_Python2.7.8的默认编码是什么?
  9. Java设计person类,有姓名,年龄,性别。要求:该类至多只能创建一男,一女两个对象。
  10. SQL Server 2012安装过程中出现 NetFx3错误
  11. pythonwin7下载教程_如何在win7下安装Python及配置!python win7下载教程
  12. 古籍研究社系列第6部《迟来的翅膀》读后感……吗?
  13. windows检查磁盘命令
  14. Python房价分析和可视化<anjuke二手房>
  15. 解决IMP-00058和IMP-00000
  16. 建设网站:购买域名和主机的原则你知道吗?
  17. 3W-60W恒流LED驱动电源AH3103
  18. 红黑树系列1——红黑树的建立
  19. 招商证券携手联想Filez谱写云中办公新篇章
  20. SolrJ更新索引数据

热门文章

  1. Android 第三课 Activity的生命周期
  2. 从头开始vue创建项目_我正在以设计师的身份开始一个被动的收入项目。 从头开始。...
  3. 最优资产组合步骤_重新设计投资组合网站之前,请按照以下5个步骤进行操作
  4. ux和ui_糟糕的UI与UX番茄酱模因
  5. figma下载_通过构建7个通用UI动画来掌握Figma中的动画
  6. 源码群友问:你这么多项目是怎么进行技术选型的?
  7. mac使用brew update无反应解决办法
  8. 后端model传入前端JSP页面中的值判断后再取值
  9. 消息模式在实际开发应用中的优势
  10. HashCode和hashMap、hashTable