1.前言
curator由Netflix的工程师开发,主要目的为了基于zookeeper的应用变得简单可靠,在2013年成为apache的顶级项目。curator基于zookeeper,但提供了更高级别的
API抽象以及工具集,并对zookeeper提供的常用功能进行了封装和扩充,例如leader选举、分布式锁、服务发现、缓存等功能,从而使开发者在实现这些功能时不用在实现
哪些无聊的程式化代码段,也减少出错的可能性。

2.本文主要讲解如何利用curator实现leader选举,并对curator提供的两种实现形式进行对比
2.1 通过LeaderSelector进行leader选举
建议通过LeaderSelectorListenerAdapter实现,当某个实例成为leader后,会调用对应实例的takeLeadership方法,此方法执行期间,此实例一直占着leader权,
当takeLeadership方法执行结束,实例自动释放leader权,所有实例重新进行leader选举

package chengf.falcon.curator.leader;

import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;import org.apache.curator.framework.recipes.leader.LeaderSelector;import java.io.Closeable;import java.io.IOException;import java.util.concurrent.TimeUnit;import java.util.concurrent.atomic.AtomicInteger;

/** * @author: 作者: chengaofeng * @date: 创建时间:2018-06-07 15:26:54 * @Description: TODO * @version V1.0 */public class SelectorClient extends LeaderSelectorListenerAdapter implements Closeable {    private final String name;    private final LeaderSelector leaderSelector;  private final AtomicInteger leaderCount = new AtomicInteger();

 public SelectorClient(CuratorFramework client, String path, String name) {        this.name = name;

      // 利用一个给定的路径创建一个leader selector       // 执行leader选举的所有参与者对应的路径必须一样      // 本例中SelectorClient也是一个LeaderSelectorListener,但这不是必须的。        leaderSelector = new LeaderSelector(client, path, this);

       // 在大多数情况下,我们会希望一个selector放弃leader后还要重新参与leader选举      leaderSelector.autoRequeue(); }

   /**    * 启动当前实例参与leader选举    *     * @throws IOException    */   public void start() throws IOException {      // leader选举是在后台处理的,所以这个方法会立即返回     leaderSelector.start();   }

   @Override    public void close() throws IOException {      leaderSelector.close();   }

   /**    * 当前实例成为leader时,会执行下面的方法,这个方法执行结束后,当前实例就自动释放leader了,所以在想放弃leader前此方法不能结束  */   @Override    public void takeLeadership(CuratorFramework client) throws Exception {

      final int waitSeconds = (int) (5 * Math.random()) + 1;

        System.out.println(name + " 现在是leader了,持续成为leader " + waitSeconds + " 秒.");     System.out.println(name + " 之前已经成为了 " + leaderCount.getAndIncrement() + " 次leader.");      try {         Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));     } catch (InterruptedException e) {            System.err.println(name + " was interrupted.");            Thread.currentThread().interrupt();       } finally {           System.out.println(name + " 释放leader权.\n");        } }}

执行类

/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements.  See the NOTICE file * distributed with this work for additional information * regarding copyright ownership.  The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License.  You may obtain a copy of the License at * *   http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied.  See the License for the * specific language governing permissions and limitations * under the License. */package chengf.falcon.curator.leader;

import com.google.common.collect.Lists;import org.apache.curator.utils.CloseableUtils;import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.test.TestingServer;import java.io.BufferedReader;import java.io.InputStreamReader;import java.util.List;

public class LeaderSelectorExample {    private static final int CLIENT_QTY = 10;

  private static final String PATH = "/examples/leader";

   public static void main(String[] args) throws Exception {     // all of the useful sample code is in ExampleClient.java

       System.out.println("创建 " + CLIENT_QTY              + " 个客户端, 公平的参与leader选举,成为leader后,会等待一个随机的时间(几秒中),之后释放leader权,所有实例重新进行leader选举。");

     List<CuratorFramework> clients = Lists.newArrayList();     List<SelectorClient> examples = Lists.newArrayList();      TestingServer server = new TestingServer();      try {         for (int i = 0; i < CLIENT_QTY; ++i) {              CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),                       new ExponentialBackoffRetry(1000, 3));                clients.add(client);

                SelectorClient example = new SelectorClient(client, PATH, "Client #" + i);                examples.add(example);

              client.start();               example.start();          }

           System.out.println("按 enter/return 来退出\n");         new BufferedReader(new InputStreamReader(System.in)).readLine();      } finally {           System.out.println("关闭程序...");

            for (SelectorClient exampleClient : examples) {               CloseableUtils.closeQuietly(exampleClient);           }         for (CuratorFramework client : clients) {             CloseableUtils.closeQuietly(client);          }

           CloseableUtils.closeQuietly(server);      } }}

执行结果

创建 10 个客户端, 公平的参与leader选举,成为leader后,会等待一个随机的时间(几秒中),之后释放leader权,所有实例重新进行leader选举。SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".SLF4J: Defaulting to no-operation MDCAdapter implementation.SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.按 enter/return 来退出

Client #2 现在是leader了,持续成为leader 3 秒.Client #2 之前已经成为了 0 次leader.Client #2 释放leader权.

Client #4 现在是leader了,持续成为leader 3 秒.Client #4 之前已经成为了 0 次leader.Client #4 释放leader权.

Client #5 现在是leader了,持续成为leader 5 秒.Client #5 之前已经成为了 0 次leader.Client #5 释放leader权.

Client #9 现在是leader了,持续成为leader 3 秒.Client #9 之前已经成为了 0 次leader.Client #9 释放leader权.

Client #6 现在是leader了,持续成为leader 3 秒.Client #6 之前已经成为了 0 次leader.Client #6 释放leader权.

Client #1 现在是leader了,持续成为leader 4 秒.Client #1 之前已经成为了 0 次leader.Client #1 释放leader权.

Client #8 现在是leader了,持续成为leader 1 秒.Client #8 之前已经成为了 0 次leader.Client #8 释放leader权.

Client #7 现在是leader了,持续成为leader 3 秒.Client #7 之前已经成为了 0 次leader.Client #7 释放leader权.

Client #0 现在是leader了,持续成为leader 2 秒.Client #0 之前已经成为了 0 次leader.Client #0 释放leader权.

Client #3 现在是leader了,持续成为leader 5 秒.Client #3 之前已经成为了 0 次leader.Client #3 释放leader权.

Client #2 现在是leader了,持续成为leader 4 秒.Client #2 之前已经成为了 1 次leader.Client #2 释放leader权.

Client #4 现在是leader了,持续成为leader 3 秒.Client #4 之前已经成为了 1 次leader.Client #4 释放leader权.

Client #5 现在是leader了,持续成为leader 3 秒.Client #5 之前已经成为了 1 次leader.Client #5 释放leader权.

Client #9 现在是leader了,持续成为leader 3 秒.Client #9 之前已经成为了 1 次leader.Client #9 释放leader权.

Client #6 现在是leader了,持续成为leader 1 秒.Client #6 之前已经成为了 1 次leader.Client #6 释放leader权.

Client #1 现在是leader了,持续成为leader 4 秒.Client #1 之前已经成为了 1 次leader.Client #1 释放leader权.

Client #8 现在是leader了,持续成为leader 5 秒.Client #8 之前已经成为了 1 次leader.关闭程序...Client #8 was interrupted.Client #8 释放leader权.

2.2 通过LeaderLatch实现leader选举,具体实现LeaderLatchListener来实现,
当实例成为leader后,会调用isLeader()方法,之后除非此实例连接不到zookeeper,否侧将一直占着leader权,当失去leader权后会调用notLeader()方法,为了模拟
选举过程,我们追加了一个ScheduledExecutorService来周期性的自己释放leader权

/** *  */package chengf.falcon.curator.leader;

import java.io.IOException;

import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.recipes.leader.LeaderLatch;import org.apache.curator.framework.recipes.leader.LeaderLatchListener;

/** * @author: 作者: chengaofeng * @date: 创建时间:2018-06-07 15:26:54 * @Description: TODO * @version V1.0 */public class LatchClient implements LeaderLatchListener {

  private final String name;

  private LeaderLatch leaderLatch;

    private CuratorFramework client;  public LatchClient(CuratorFramework client, String path, String name) throws Exception {      this.name = name;        this.client = client;        leaderLatch = new LeaderLatch(client, path);     leaderLatch.addListener(this);        leaderLatch.start();  }

   /*     * (non-Javadoc)   *     * @see   * org.apache.curator.framework.recipes.leader.LeaderLatchListener#isLeader(   * )   */   /**    * 成为leader后会执行下面的方法,方法执行完后不会释放leader权  */   @Override    public void isLeader() {      System.out.println(name + " is now the leader. "); }

   /**    * 失去leader后执行下面的方法    */   @Override    public void notLeader() {     System.out.println(name + " is not the leader. ");

   }

   public void stop() {      try {         leaderLatch.close();      } catch (IOException e) {     } }

   public boolean hasLeadership() {      return leaderLatch.hasLeadership();   } public CuratorFramework getClient() {     return client;    }

   public String getName() {     return name;  }

}

启动类

/** *  */package chengf.falcon.curator.leader;

import java.io.BufferedReader;import java.io.InputStreamReader;import java.util.List;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;

import org.apache.curator.framework.CuratorFramework;import org.apache.curator.framework.CuratorFrameworkFactory;import org.apache.curator.retry.ExponentialBackoffRetry;import org.apache.curator.test.TestingServer;import org.apache.curator.utils.CloseableUtils;

import com.google.common.collect.Lists;

/** * @author: 作者: chengaofeng * @date: 创建时间:2018-06-07 15:25:47 * @Description: TODO * @version V1.0 */public class LeaderLatchExample {   private static final int CLIENT_QTY = 10;

  private static final String PATH = "/examples/leader";

   public static void main(String[] args) throws Exception {     // all of the useful sample code is in ExampleClient.java

       System.out.println("创建 " + CLIENT_QTY              + " 个客户端, 公平的参与leader选举,成为leader后,将一直占用此leader权。直到四秒中后主动放弃leader权");

     List<CuratorFramework> clients = Lists.newArrayList();     TestingServer server = new TestingServer();      List<LatchClient> examples = Lists.newArrayList();     int i = 0;       try {         for (; i < CLIENT_QTY; ++i) {                CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(),                       new ExponentialBackoffRetry(1000, 3));                clients.add(client);

                LatchClient example = new LatchClient(client, PATH, "Client #" + i);              examples.add(example);                client.start();

         }

           // 不调用该方法则某实例成为leader后将一直持续占有leader权          ScheduledExecutorService se = giveUpLeader(examples);

          System.out.println("按 enter/return 来退出\n");         new BufferedReader(new InputStreamReader(System.in)).readLine();          se.shutdown();        } finally {           System.out.println("关闭程序...");

            for (LatchClient exampleClient : examples) {              exampleClient.stop();         }         for (CuratorFramework client : clients) {             CloseableUtils.closeQuietly(client);          }

           CloseableUtils.closeQuietly(server);      } }

   /**    * 主动放弃leader权,并新创建一个实例参与leader选举   * @param examples    */   private static ScheduledExecutorService giveUpLeader(List<LatchClient> examples) {      ScheduledExecutorService se = Executors.newScheduledThreadPool(1);       se.scheduleAtFixedRate(()->{           examples.stream().filter(t-> t.hasLeadership()).findFirst().ifPresent(t-> {             examples.remove(t);               LatchClient example;              try {                 example = new LatchClient(t.getClient(), PATH, t.getName());                 examples.add(example);                } catch (Exception e) {

             }             //当前实例主动放弃leader权             System.out.println(t.getName() + " 现在主动放弃leader权");                t.stop();});      }, 4, 4, TimeUnit.SECONDS);       return se;    }

}

执行结果

创建 10 个客户端, 公平的参与leader选举,成为leader后,将一直占用此leader权。直到四秒中后主动放弃leader权SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".SLF4J: Defaulting to no-operation (NOP) logger implementationSLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.SLF4J: Failed to load class "org.slf4j.impl.StaticMDCBinder".SLF4J: Defaulting to no-operation MDCAdapter implementation.SLF4J: See http://www.slf4j.org/codes.html#no_static_mdc_binder for further details.按 enter/return 来退出

Client #3 is now the leader. Client #3 现在主动放弃leader权Client #2 is now the leader. Client #2 现在主动放弃leader权Client #9 is now the leader. Client #9 现在主动放弃leader权Client #4 is now the leader. Client #4 现在主动放弃leader权Client #1 is now the leader. Client #1 现在主动放弃leader权Client #5 is now the leader. Client #5 现在主动放弃leader权Client #7 is now the leader. Client #7 现在主动放弃leader权Client #8 is now the leader. Client #8 现在主动放弃leader权Client #6 is now the leader. Client #6 现在主动放弃leader权Client #0 is now the leader. 

关闭程序...

基于curator实现leader选举相关推荐

  1. 3.Spring Boot使用Apache Curator实现leader选举「第四章 ZooKeeper Curator应用场景实战」「架构之路ZooKeeper理论和实战」

    相关历史文章(阅读本文前,您可能需要先看下之前的系列

  2. 五、curator recipes之选举主节点Leader Latch

    简介 在分布式计算中,主节点选举是为了把某个进程作为主节点来控制其它节点的过程.在选举结束之前,我们不知道哪个节点会成为主节点.curator对于主节点选举有两种实现方式,本文示例演示Latch的实现 ...

  3. springboot使用curator来实现leader选举

    本文来说下springboot使用curator来实现leader选举 文章目录 概述 概述

  4. ZooKeeper : Curator框架之Leader选举LeaderLatch

    Leader选举 在分布式计算中,Leader选举是指指定单个进程作为分布在多台计算机(节点)之间的某些任务的组织者的过程.在任务开始之前,所有网络节点都不知道哪个节点将充当任务的Leader.然而, ...

  5. 基于Curator的Zookeeper操作实战

    前言 Zookeeper操作方式 这篇文章主要说的是利用java来操作zookeeper,就如操作mysql数据库一样,主要是实现增删改查功能,而实现这些功能的方式主要有以下三种: zookeeper ...

  6. 【Zookeeper】源码分析之Leader选举(一)

    一.前言 分析完了Zookeeper中的网络机制后,接着来分析Zookeeper中一个更为核心的模块,Leader选举. 二.总结框架图 对于Leader选举,其总体框架图如下图所示 说明: 选举的父 ...

  7. 面试官:说说你对ZooKeeper集群与Leader选举的理解?

    作者:TalkingData 史天舒 来自:TalkingData ZooKeeper是一个开源分布式协调服务.分布式数据一致性解决方案.可基于ZooKeeper实现命名服务.集群管理.Master选 ...

  8. Raft 为什么是更易理解的分布式一致性算法——(1)Leader在时,由Leader向Follower同步日志 (2)Leader挂掉了,选一个新Leader,Leader选举算法。...

    转自:http://www.cnblogs.com/mindwind/p/5231986.html Raft 协议的易理解性描述 虽然 Raft 的论文比 Paxos 简单版论文还容易读了,但论文依然 ...

  9. Zookeeper之Leader选举源码分析

    Zookeeper源码下载地址:https://github.com/apache/zookeeper 1.选举流程 Zookeeeper的Leader选举会分两个过程. 服务启动时的leader选举 ...

最新文章

  1. Android内存优化(三)避免可控的内存泄漏
  2. 最新中文NLP开源工具箱来了!支持6大任务,面向工业应用 | 资源
  3. wsl ubuntu update显示err: 404 Not Found解决方法
  4. 30 个极简Python代码,拿走即用(真干货)
  5. jmeter上传文件搞了一天,才搞定,没高人帮忙效率就是低,赶紧记下来,以备后用...
  6. 基于STM32和W5500的Modbus TCP通讯
  7. mysql修改主从复制id_mysql主从复制设置
  8. 参数是html代码,一些html标签的参数messup html/php代码
  9. multipathd dead but pid file exists
  10. 什么是异构数据库?它和分布式数据库的联系是什么?
  11. Java直接遍历并读取zip压缩文件的内容以及错误处理
  12. Notes配置初始化和重新设置(不卸载)
  13. ios 融云 重写对话列表_iOS开发融云即时通讯集成详细步骤
  14. GIT LFS 原理杂谈
  15. 连线封面:2亿多支付宝用户选择的背后,一个数据与评分带来的「等级世界」
  16. JavaSE-接口简单介绍
  17. 微指令和指令(机器指令)有什么区别?
  18. state软件的基本使用
  19. Unity UGUI图文混排(七) -- 下划线
  20. Python3|Opencv——dst参数的含义和使用

热门文章

  1. 找你妹+ipad+wifi,回忆那年的经典游戏
  2. 感知机——成长的烦恼
  3. 『恶意代码分析实战』Windows API编程——通过修改注册表的方式实现自启动
  4. 计算机网络按规模传输距离,计算机网络按距离如何分类
  5. 长沙公安交警携手百度Apollo 探索智能网联与智慧交通融合路径
  6. 磁带与磁盘备份系统的优劣点比较
  7. 华为无线AC双机热备三层组网配置案例
  8. Apache配置https全过程
  9. 软考-算法设计概述及常见的几种算法形式
  10. 【数学建模】青少年犯罪问题 | 逐步回归分析法stepwise函数 | 残差分析rcoplot