分布式锁的几种实现:
1.zookeeper分布式锁,基于自增节点
2.Redis分布式锁,基于setnx命令,
基于Redis实现分布式锁:http://blog.csdn.net/daiyudong2020/article/details/51760648
官网:http://redis.io/topics/distlock
译文:http://www.oschina.net/translate/redis-distlock
3.memcache分布式锁,基于add函数
这几种方案在网上有很多技术文章,不重复叙述,需要的Google一下

阿凡卢

  • 博客园
  • 首页
  • 新随笔
  • 联系
  • 订阅
  • 管理
随笔 - 142  文章 - 0  评论 - 558

基于ZooKeeper的分布式锁和队列

在分布式系统中,往往需要一些分布式同步原语来做一些协同工作,上一篇文章介绍了Zookeeper的基本原理,本文介绍下基于Zookeeper的Lock和Queue的实现,主要代码都来自Zookeeper的官方recipe。

锁(Lock)

完全分布式锁是全局同步的,这意味着在任何时刻没有两个客户端会同时认为它们都拥有相同的锁,使用 Zookeeper 可以实现分布式锁,需要首先定义一个锁节点(lock root node)。

需要获得锁的客户端按照以下步骤来获取锁:

  1. 保证锁节点(lock root node)这个父根节点的存在,这个节点是每个要获取lock客户端共用的,这个节点是PERSISTENT的。
  2. 第一次需要创建本客户端要获取lock的节点,调用 create( ),并设置 节点为EPHEMERAL_SEQUENTIAL类型,表示该节点为临时的和顺序的。如果获取锁的节点挂掉,则该节点自动失效,可以让其他节点获取锁。

  3. 在父锁节点(lock root node)上调用 getChildren( ) ,不需要设置监视标志。 (为了避免“羊群效应”).

  4. 按照Fair竞争的原则,将步骤3中的子节点(要获取锁的节点)按照节点顺序的大小做排序,取出编号最小的一个节点做为lock的owner,判断自己的节点id
    是否就为owner id,如果是则返回,lock成功。如果不是则调用 exists( )监听比自己小的前一位的id,关注它锁释放的操作(也就是exist watch)。

  5. 如果第4步监听exist的watch被触发,则继续按4中的原则判断自己是否能获取到lock。

释放锁:需要释放锁的客户端只需要删除在第2步中创建的节点即可。

注意事项:

一个节点的删除只会导致一个客户端被唤醒,因为每个节点只被一个客户端watch,这避免了“羊群效应”。

一个分布式lock的实现:

package org.apache.zookeeper.recipes.lock;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import static org.apache.zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;/*** A <a href="package.html">protocol to implement an exclusive*  write lock or to elect a leader</a>. <p/> You invoke {@link #lock()} to *  start the process of grabbing the lock; you may get the lock then or it may be *  some time later. <p/> You can register a listener so that you are invoked *  when you get the lock; otherwise you can ask if you have the lock*  by calling {@link #isOwner()}**/
public class WriteLock extends ProtocolSupport {private static final Logger LOG = LoggerFactory.getLogger(WriteLock.class);private final String dir;private String id;private ZNodeName idName;private String ownerId;private String lastChildId;private byte[] data = {0x12, 0x34};private LockListener callback;private LockZooKeeperOperation zop;/*** zookeeper contructor for writelock* @param zookeeper zookeeper client instance* @param dir the parent path you want to use for locking* @param acls the acls that you want to use for all the paths, * if null world read/write is used.*/public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl) {super(zookeeper);this.dir = dir;if (acl != null) {setAcl(acl);}this.zop = new LockZooKeeperOperation();}/*** zookeeper contructor for writelock with callback* @param zookeeper the zookeeper client instance* @param dir the parent path you want to use for locking* @param acl the acls that you want to use for all the paths* @param callback the call back instance*/public WriteLock(ZooKeeper zookeeper, String dir, List<ACL> acl, LockListener callback) {this(zookeeper, dir, acl);this.callback = callback;}/*** return the current locklistener* @return the locklistener*/public LockListener getLockListener() {return this.callback;}/*** register a different call back listener* @param callback the call back instance*/public void setLockListener(LockListener callback) {this.callback = callback;}/*** Removes the lock or associated znode if * you no longer require the lock. this also * removes your request in the queue for locking* in case you do not already hold the lock.* @throws RuntimeException throws a runtime exception* if it cannot connect to zookeeper.*/public synchronized void unlock() throws RuntimeException {if (!isClosed() && id != null) {// we don't need to retry this operation in the case of failure// as ZK will remove ephemeral files and we don't wanna hang// this process when closing if we cannot reconnect to ZKtry {ZooKeeperOperation zopdel = new ZooKeeperOperation() {public boolean execute() throws KeeperException,InterruptedException {zookeeper.delete(id, -1);   return Boolean.TRUE;}};zopdel.execute();} catch (InterruptedException e) {LOG.warn("Caught: " + e, e);//set that we have been interrupted.
               Thread.currentThread().interrupt();} catch (KeeperException.NoNodeException e) {// do nothing} catch (KeeperException e) {LOG.warn("Caught: " + e, e);throw (RuntimeException) new RuntimeException(e.getMessage()).initCause(e);}finally {if (callback != null) {callback.lockReleased();}id = null;}}}/** * the watcher called on  * getting watch while watching * my predecessor*/private class LockWatcher implements Watcher {public void process(WatchedEvent event) {// lets either become the leader or watch the new/updated nodeLOG.debug("Watcher fired on path: " + event.getPath() + " state: " + event.getState() + " type " + event.getType());try {lock();} catch (Exception e) {LOG.warn("Failed to acquire lock: " + e, e);}}}/*** a zoookeeper operation that is mainly responsible* for all the magic required for locking.*/private  class LockZooKeeperOperation implements ZooKeeperOperation {/** find if we have been created earler if not create our node* * @param prefix the prefix node* @param zookeeper teh zookeeper client* @param dir the dir paretn* @throws KeeperException* @throws InterruptedException*/private void findPrefixInChildren(String prefix, ZooKeeper zookeeper, String dir) throws KeeperException, InterruptedException {List<String> names = zookeeper.getChildren(dir, false);for (String name : names) {if (name.startsWith(prefix)) {id = dir + "/" + name;if (LOG.isDebugEnabled()) {LOG.debug("Found id created last time: " + id);}break;}}if (id == null) {id = zookeeper.create(dir + "/" + prefix, data, getAcl(), EPHEMERAL_SEQUENTIAL);if (LOG.isDebugEnabled()) {LOG.debug("Created id: " + id);}}}/*** the command that is run and retried for actually * obtaining the lock* @return if the command was successful or not*/public boolean execute() throws KeeperException, InterruptedException {do {if (id == null) {long sessionId = zookeeper.getSessionId();String prefix = "x-" + sessionId + "-";// lets try look up the current ID if we failed // in the middle of creating the znode
                    findPrefixInChildren(prefix, zookeeper, dir);idName = new ZNodeName(id);}if (id != null) {List<String> names = zookeeper.getChildren(dir, false);if (names.isEmpty()) {LOG.warn("No children in: " + dir + " when we've just " +"created one! Lets recreate it...");// lets force the recreation of the idid = null;} else {// lets sort them explicitly (though they do seem to come back in order ususally :)SortedSet<ZNodeName> sortedNames = new TreeSet<ZNodeName>();for (String name : names) {sortedNames.add(new ZNodeName(dir + "/" + name));}ownerId = sortedNames.first().getName();SortedSet<ZNodeName> lessThanMe = sortedNames.headSet(idName);if (!lessThanMe.isEmpty()) {ZNodeName lastChildName = lessThanMe.last();lastChildId = lastChildName.getName();if (LOG.isDebugEnabled()) {LOG.debug("watching less than me node: " + lastChildId);}Stat stat = zookeeper.exists(lastChildId, new LockWatcher());if (stat != null) {return Boolean.FALSE;} else {LOG.warn("Could not find the" +" stats for less than me: " + lastChildName.getName());}} else {if (isOwner()) {if (callback != null) {callback.lockAcquired();}return Boolean.TRUE;}}}}}while (id == null);return Boolean.FALSE;}};/*** Attempts to acquire the exclusive write lock returning whether or not it was* acquired. Note that the exclusive lock may be acquired some time later after* this method has been invoked due to the current lock owner going away.*/public synchronized boolean lock() throws KeeperException, InterruptedException {if (isClosed()) {return false;}ensurePathExists(dir);return (Boolean) retryOperation(zop);}/*** return the parent dir for lock* @return the parent dir used for locks.*/public String getDir() {return dir;}/*** Returns true if this node is the owner of the*  lock (or the leader)*/public boolean isOwner() {return id != null && ownerId != null && id.equals(ownerId);}/*** return the id for this lock* @return the id for this lock*/public String getId() {return this.id;}
}

View Code

注意这里的lock,可能会失败,会尝试多次,每次失败后会Sleep一段时间。

PS:官方的代码有个小bug,id和ownerId应该都是全路径,即id = dir + "/" + name;原代码在findPrefixInChildren里有问题。

用于辅助节点大小顺序排序的类:

package org.apache.zookeeper.recipes.lock;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** Represents an ephemeral znode name which has an ordered sequence number and* can be sorted in order* */
class ZNodeName implements Comparable<ZNodeName> {private final String name;private String prefix;private int sequence = -1;private static final Logger LOG = LoggerFactory.getLogger(ZNodeName.class);public ZNodeName(String name) {if (name == null) {throw new NullPointerException("id cannot be null");}this.name = name;this.prefix = name;int idx = name.lastIndexOf('-');if (idx >= 0) {this.prefix = name.substring(0, idx);try {this.sequence = Integer.parseInt(name.substring(idx + 1));// If an exception occurred we misdetected a sequence suffix,// so return -1.} catch (NumberFormatException e) {LOG.info("Number format exception for " + idx, e);} catch (ArrayIndexOutOfBoundsException e) {LOG.info("Array out of bounds for " + idx, e);}}}@Overridepublic String toString() {return name.toString();}@Overridepublic boolean equals(Object o) {if (this == o)return true;if (o == null || getClass() != o.getClass())return false;ZNodeName sequence = (ZNodeName) o;if (!name.equals(sequence.name))return false;return true;}@Overridepublic int hashCode() {return name.hashCode() + 37;}public int compareTo(ZNodeName that) {int s1 = this.sequence;int s2 = that.sequence;if (s1 == -1 && s2 == -1) {return this.name.compareTo(that.name);}if (s1 == -1) {return -1;} else if (s2 == -1) {return 1;} else {return s1 - s2;}}/*** Returns the name of the znode*/public String getName() {return name;}/*** Returns the sequence number*/public int getZNodeName() {return sequence;}/*** Returns the text prefix before the sequence number*/public String getPrefix() {return prefix;}
}

View Code

PS:这个ZNodeName类是被我修改过的,官方的代码比较有问题,官方的先用了节点路径的前缀prefix比较,再去比较sequence序号是不对的,这样会导致sessionid小的总是能拿到锁。应该直接比较全局有序的sequence序号,小的先拿到锁,先到先得。

Zookeeper统一操作ZooKeeperOperation接口:

public interface ZooKeeperOperation {/*** Performs the operation - which may be involved multiple times if the connection* to ZooKeeper closes during this operation** @return the result of the operation or null* @throws KeeperException* @throws InterruptedException*/public boolean execute() throws KeeperException, InterruptedException;
}

View Code

因为Zookeeper的操作会失败,这个类封装了多次尝试:

/**** Licensed to the Apache Software Foundation (ASF) under one or more* contributor license agreements.  See the NOTICE file distributed with* this work for additional information regarding copyright ownership.* The ASF licenses this file to You under the Apache License, Version 2.0* (the "License"); you may not use this file except in compliance with* the License.  You may obtain a copy of the License at** http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS,* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.* See the License for the specific language governing permissions and* limitations under the License.*/
package org.apache.zookeeper.recipes.lock;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.recipes.lock.ZooKeeperOperation;import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;/*** A base class for protocol implementations which provides a number of higher * level helper methods for working with ZooKeeper along with retrying synchronous*  operations if the connection to ZooKeeper closes such as *  {@link #retryOperation(ZooKeeperOperation)}**/
class ProtocolSupport {private static final Logger LOG = LoggerFactory.getLogger(ProtocolSupport.class);protected final ZooKeeper zookeeper;private AtomicBoolean closed = new AtomicBoolean(false);private long retryDelay = 500L;private int retryCount = 10;private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;public ProtocolSupport(ZooKeeper zookeeper) {this.zookeeper = zookeeper;}/*** Closes this strategy and releases any ZooKeeper resources; but keeps the*  ZooKeeper instance open*/public void close() {if (closed.compareAndSet(false, true)) {doClose();}}/*** return zookeeper client instance* @return zookeeper client instance*/public ZooKeeper getZookeeper() {return zookeeper;}/*** return the acl its using* @return the acl.*/public List<ACL> getAcl() {return acl;}/*** set the acl * @param acl the acl to set to*/public void setAcl(List<ACL> acl) {this.acl = acl;}/*** get the retry delay in milliseconds* @return the retry delay*/public long getRetryDelay() {return retryDelay;}/*** Sets the time waited between retry delays* @param retryDelay the retry delay*/public void setRetryDelay(long retryDelay) {this.retryDelay = retryDelay;}/*** Allow derived classes to perform * some custom closing operations to release resources*/protected void doClose() {}/*** Perform the given operation, retrying if the connection fails* @return object. it needs to be cast to the callee's expected * return type.*/protected Object retryOperation(ZooKeeperOperation operation) throws KeeperException, InterruptedException {KeeperException exception = null;for (int i = 0; i < retryCount; i++) {try {return operation.execute();} catch (KeeperException.SessionExpiredException e) {LOG.warn("Session expired for: " + zookeeper + " so reconnecting due to: " + e, e);throw e;} catch (KeeperException.ConnectionLossException e) {if (exception == null) {exception = e;}LOG.debug("Attempt " + i + " failed with connection loss so " +"attempting to reconnect: " + e, e);retryDelay(i);}}throw exception;}/*** Ensures that the given path exists with no data, the current* ACL and no flags* @param path*/protected void ensurePathExists(String path) {ensureExists(path, null, acl, CreateMode.PERSISTENT);}/*** Ensures that the given path exists with the given data, ACL and flags* @param path* @param acl* @param flags*/protected void ensureExists(final String path, final byte[] data,final List<ACL> acl, final CreateMode flags) {try {retryOperation(new ZooKeeperOperation() {public boolean execute() throws KeeperException, InterruptedException {Stat stat = zookeeper.exists(path, false);if (stat != null) {return true;}zookeeper.create(path, data, acl, flags);return true;}});} catch (KeeperException e) {LOG.warn("Caught: " + e, e);} catch (InterruptedException e) {LOG.warn("Caught: " + e, e);}}/*** Returns true if this protocol has been closed* @return true if this protocol is closed*/protected boolean isClosed() {return closed.get();}/*** Performs a retry delay if this is not the first attempt* @param attemptCount the number of the attempts performed so far*/protected void retryDelay(int attemptCount) {if (attemptCount > 0) {try {Thread.sleep(attemptCount * retryDelay);} catch (InterruptedException e) {LOG.debug("Failed to sleep: " + e, e);}}}
}

View Code

这个类是本客户端获取到lock和释放lock的时候触发操作的接口:

public interface LockListener {/*** call back called when the lock * is acquired*/public void lockAcquired();/*** call back called when the lock is * released.*/public void lockReleased();
}

View Code

队列(Queue)

分布式队列是通用的数据结构,为了在 Zookeeper 中实现分布式队列,首先需要指定一个 Znode 节点作为队列节点(queue node), 各个分布式客户端通过调用 create() 函数向队列中放入数据,调用create()时节点路径名带"qn-"结尾,并设置顺序(sequence)节点标志。 由于设置了节点的顺序标志,新的路径名具有以下字符串模式:"_path-to-queue-node_/qn-X",X 是唯一自增号。需要从队列中获取数据/移除数据的客户端首先调用 getChildren() 函数,有数据则获取(获取数据后可以删除也可以不删),没有则在队列节点(queue node)上将 watch 设置为 true,等待触发并处理最小序号的节点(即从序号最小的节点中取数据)。

实现步骤基本如下:

前提:需要一个队列root节点dir

入队:使用create()创建节点,将共享数据data放在该节点上,节点类型为PERSISTENT_SEQUENTIAL,永久顺序性的(也可以设置为临时的,看需求)。

出队:因为队列可能为空,2种方式处理:一种如果为空则wait等待,一种返回异常。

等待方式:这里使用了CountDownLatch的等待和Watcher的通知机制,使用了TreeMap的排序获取节点顺序最小的数据(FIFO)。

抛出异常:getChildren()获取队列数据时,如果size==0则抛出异常。

一个分布式Queue的实现,详细代码:

package org.apache.zookeeper.recipes.queue;import java.util.List;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;/*** * A <a href="package.html">protocol to implement a distributed queue</a>.* */
public class DistributedQueue {private static final Logger LOG = LoggerFactory.getLogger(DistributedQueue.class);private final String dir;private ZooKeeper zookeeper;private List<ACL> acl = ZooDefs.Ids.OPEN_ACL_UNSAFE;private final String prefix = "qn-";public DistributedQueue(ZooKeeper zookeeper, String dir, List<ACL> acl){this.dir = dir;if(acl != null){this.acl = acl;}this.zookeeper = zookeeper;//Add root dir first if not existsif (zookeeper != null) {try {Stat s = zookeeper.exists(dir, false);if (s == null) {zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);}} catch (KeeperException e) {LOG.error(e.toString());} catch (InterruptedException e) {LOG.error(e.toString());}}}/*** Returns a Map of the children, ordered by id.* @param watcher optional watcher on getChildren() operation.* @return map from id to child name for all children*/private TreeMap<Long,String> orderedChildren(Watcher watcher) throws KeeperException, InterruptedException {TreeMap<Long,String> orderedChildren = new TreeMap<Long,String>();List<String> childNames = null;try{childNames = zookeeper.getChildren(dir, watcher);}catch (KeeperException.NoNodeException e){throw e;}for(String childName : childNames){try{//Check formatif(!childName.regionMatches(0, prefix, 0, prefix.length())){LOG.warn("Found child node with improper name: " + childName);continue;}String suffix = childName.substring(prefix.length());Long childId = new Long(suffix);orderedChildren.put(childId,childName);}catch(NumberFormatException e){LOG.warn("Found child node with improper format : " + childName + " " + e,e);}}return orderedChildren;}/*** Find the smallest child node.* @return The name of the smallest child node.*/private String smallestChildName() throws KeeperException, InterruptedException {long minId = Long.MAX_VALUE;String minName = "";List<String> childNames = null;try{childNames = zookeeper.getChildren(dir, false);}catch(KeeperException.NoNodeException e){LOG.warn("Caught: " +e,e);return null;}for(String childName : childNames){try{//Check formatif(!childName.regionMatches(0, prefix, 0, prefix.length())){LOG.warn("Found child node with improper name: " + childName);continue;}String suffix = childName.substring(prefix.length());long childId = Long.parseLong(suffix);if(childId < minId){minId = childId;minName = childName;}}catch(NumberFormatException e){LOG.warn("Found child node with improper format : " + childName + " " + e,e);}}if(minId < Long.MAX_VALUE){return minName;}else{return null;}}/*** Return the head of the queue without modifying the queue.* @return the data at the head of the queue.* @throws NoSuchElementException* @throws KeeperException* @throws InterruptedException*/public byte[] element() throws NoSuchElementException, KeeperException, InterruptedException {TreeMap<Long,String> orderedChildren;// element, take, and remove follow the same pattern.// We want to return the child node with the smallest sequence number.// Since other clients are remove()ing and take()ing nodes concurrently, // the child with the smallest sequence number in orderedChildren might be gone by the time we check.// We don't call getChildren again until we have tried the rest of the nodes in sequence order.while(true){try{orderedChildren = orderedChildren(null);}catch(KeeperException.NoNodeException e){throw new NoSuchElementException();}if(orderedChildren.size() == 0 ) throw new NoSuchElementException();for(String headNode : orderedChildren.values()){if(headNode != null){try{return zookeeper.getData(dir+"/"+headNode, false, null);}catch(KeeperException.NoNodeException e){//Another client removed the node first, try next
                    }}}}}/*** Attempts to remove the head of the queue and return it.* @return The former head of the queue* @throws NoSuchElementException* @throws KeeperException* @throws InterruptedException*/public byte[] remove() throws NoSuchElementException, KeeperException, InterruptedException {TreeMap<Long,String> orderedChildren;// Same as for element.  Should refactor this.while(true){try{orderedChildren = orderedChildren(null);}catch(KeeperException.NoNodeException e){throw new NoSuchElementException();}if(orderedChildren.size() == 0) throw new NoSuchElementException();for(String headNode : orderedChildren.values()){String path = dir +"/"+headNode;try{byte[] data = zookeeper.getData(path, false, null);zookeeper.delete(path, -1);return data;}catch(KeeperException.NoNodeException e){// Another client deleted the node first.
                }}}}private class LatchChildWatcher implements Watcher {CountDownLatch latch;public LatchChildWatcher(){latch = new CountDownLatch(1);}public void process(WatchedEvent event){LOG.debug("Watcher fired on path: " + event.getPath() + " state: " + event.getState() + " type " + event.getType());latch.countDown();}public void await() throws InterruptedException {latch.await();}}/*** Removes the head of the queue and returns it, blocks until it succeeds.* @return The former head of the queue* @throws NoSuchElementException* @throws KeeperException* @throws InterruptedException*/public byte[] take() throws KeeperException, InterruptedException {TreeMap<Long,String> orderedChildren;// Same as for element.  Should refactor this.while(true){LatchChildWatcher childWatcher = new LatchChildWatcher();try{orderedChildren = orderedChildren(childWatcher);}catch(KeeperException.NoNodeException e){zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);continue;}if(orderedChildren.size() == 0){childWatcher.await();continue;}for(String headNode : orderedChildren.values()){String path = dir +"/"+headNode;try{byte[] data = zookeeper.getData(path, false, null);zookeeper.delete(path, -1);return data;}catch(KeeperException.NoNodeException e){// Another client deleted the node first.
                }}}}/*** Inserts data into queue.* @param data* @return true if data was successfully added*/public boolean offer(byte[] data) throws KeeperException, InterruptedException{for(;;){try{zookeeper.create(dir+"/"+prefix, data, acl, CreateMode.PERSISTENT_SEQUENTIAL);return true;}catch(KeeperException.NoNodeException e){zookeeper.create(dir, new byte[0], acl, CreateMode.PERSISTENT);}}}/*** Returns the data at the first element of the queue, or null if the queue is empty.* @return data at the first element of the queue, or null.* @throws KeeperException* @throws InterruptedException*/public byte[] peek() throws KeeperException, InterruptedException{try{return element();}catch(NoSuchElementException e){return null;}}/*** Attempts to remove the head of the queue and return it. Returns null if the queue is empty.* @return Head of the queue or null.* @throws KeeperException* @throws InterruptedException*/public byte[] poll() throws KeeperException, InterruptedException {try{return remove();}catch(NoSuchElementException e){return null;}}
}

View Code

Apache Curator

Curator是一个封装Zookeeper操作的库,使用这个库的好处是Curator帮你管理和Zookeeper的连接,当连接有问题时会自动重试(retry)。

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3)
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();

Curator已经封装了一些常用的Recipes

Distributed Lock

InterProcessMutex lock = new InterProcessMutex(client, lockPath);
if ( lock.acquire(maxWait, waitUnit) )
{try {// do some work inside of the critical section here
    }finally{lock.release();}
}

Leader Election

LeaderSelectorListener listener = new LeaderSelectorListenerAdapter()
{public void takeLeadership(CuratorFramework client) throws Exception{// this callback will get called when you are the leader// do whatever leader work you need to and only exit// this method when you want to relinquish leadership
    }
}LeaderSelector selector = new LeaderSelector(client, path, listener);
selector.autoRequeue();  // not required, but this is behavior that you will probably expect
selector.start(); 

参考:

http://zookeeper.apache.org/doc/trunk/recipes.html

http://curator.apache.org/curator-recipes/index.html

作者:阿凡卢
出处:http://www.cnblogs.com/luxiaoxun/
本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

分类: Distributed System
标签: zookeeper, 分布式, 锁, 队列
好文要顶 关注我 收藏该文

阿凡卢
关注 - 22
粉丝 - 549

+加关注

3
0

« 上一篇:ZooKeeper基本原理
» 下一篇:使用Lucene索引和检索POI数据

posted @ 2015-10-18 16:52 阿凡卢 阅读(4172) 评论(0) 编辑 收藏

刷新评论刷新页面返回顶部
发表评论

昵称:

评论内容:

不改了 退出登录 订阅评论

[Ctrl+Enter快捷键提交]

【推荐】50万行VC++源码: 大型组态工控、电力仿真CAD与GIS源码库
【推荐】融云发布 App 社交化白皮书 IM 提升活跃超 8 倍
【推荐】自开发 零实施的BPM
最新IT新闻:
· 滴滴程维:未来十年滴滴将成新能源汽车运营商
· “牵手”融创后 乐视至少要付出这四个代价
· 诺基亚将在西班牙发布多款智能手机
· 为回归民生银行做准备 史玉柱清空中民投股份并辞去所有职务
· 苏州通报希捷关厂原因:订单持续减少 去年进出口额降近四成
» 更多新闻...
最新知识库文章:

· 「代码家」的学习过程和学习经验分享
· 写给未来的程序媛
· 高质量的工程代码为什么难写
· 循序渐进地代码重构
· 技术的正宗与野路子

» 更多知识库文章...

历史上的今天:
2013-10-18 C# IP地址与整数之间的转换

公告

昵称:阿凡卢
园龄:4年5个月
粉丝:549
关注:22

+加关注

< 2017年1月 >
25 26 27 28 29 30 31
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
29 30 31 1 2 3 4

搜索

常用链接

  • 我的随笔
  • 我的评论
  • 我的参与
  • 最新评论
  • 我的标签

随笔分类(143)

  • Algorithm(25)
  • Big Data(7)
  • C#(11)
  • C/C++(19)
  • Data Structure(15)
  • DataBase(2)
  • Distributed System(4)
  • GIS(13)
  • Java(16)
  • Linux(3)
  • Network(7)
  • Program Design(4)
  • Python(2)
  • Research(6)
  • Web(2)
  • Windows(7)

随笔档案(142)

  • 2016年12月 (1)
  • 2016年5月 (2)
  • 2016年3月 (1)
  • 2015年12月 (2)
  • 2015年10月 (3)
  • 2015年8月 (2)
  • 2015年7月 (3)
  • 2015年5月 (1)
  • 2015年4月 (2)
  • 2015年3月 (1)
  • 2015年2月 (1)
  • 2015年1月 (1)
  • 2014年12月 (2)
  • 2014年11月 (1)
  • 2014年10月 (3)
  • 2014年9月 (3)
  • 2014年8月 (1)
  • 2014年7月 (3)
  • 2014年6月 (3)
  • 2014年5月 (2)
  • 2014年4月 (1)
  • 2014年3月 (3)
  • 2014年1月 (1)
  • 2013年12月 (6)
  • 2013年11月 (2)
  • 2013年10月 (4)
  • 2013年9月 (4)
  • 2013年8月 (2)
  • 2013年7月 (2)
  • 2013年6月 (1)
  • 2013年5月 (4)
  • 2013年4月 (3)
  • 2012年12月 (4)
  • 2012年11月 (10)
  • 2012年10月 (14)
  • 2012年9月 (15)
  • 2012年8月 (28)

友情链接

  • IBM developerworks
  • UC技术博客
  • 酷壳
  • 美团技术团队博客
  • 奇虎360技术博客
  • 淘宝搜索技术博客
  • 腾讯ISUX

最新评论

  • 1. Re:C#自定义工业控件开发
  • 很不错,最近在研究组态软件的开发,博主,能否给点建议,看到请回复下
  • --YaoShuangQisBlogs
  • 2. Re:百度谷歌离线地图解决方案(离线地图下载)
  • @chdyc百度不行,坐标不一致。...
  • --阿凡卢
  • 3. Re:百度谷歌离线地图解决方案(离线地图下载)
  • 请问这个软件能下载能用于百度地图api 的地图瓦片么 我试了一下 下载百度的地图的瓦片 不行啊
  • --chdyc
  • 4. Re:Web GIS离线解决方案
  • @Dominic Xu那里面没有加百度的,用那个下载器下了百度的也没用,用真实的坐标转换不到下载器里的百度地图的坐标。...
  • --阿凡卢
  • 5. Re:百度谷歌离线地图解决方案(离线地图下载)
  • @Dominic Xu这篇文章里有...
  • --阿凡卢

阅读排行榜

  • 1. C++中数字与字符串之间的转换(104222)
  • 2. C#多线程编程(85750)
  • 3. Mybatis实现数据的增删改查(CRUD)(54153)
  • 4. NPOI读写Excel(51136)
  • 5. Mybatis关联查询(嵌套查询)(49040)
  • 6. 基于Netty4的HttpServer和HttpClient的简单实现(38296)
  • 7. RabbitMQ的几种典型使用场景(35425)
  • 8. C#读写config配置文件(34058)
  • 9. 百度谷歌离线地图解决方案(离线地图下载)(31458)
  • 10. C++的构造函数和析构函数(31186)

评论排行榜

  • 1. 百度谷歌离线地图解决方案(离线地图下载)(67)
  • 2. Web GIS离线解决方案(40)
  • 3. GMap.Net开发之在地图上添加多边形(36)
  • 4. GMap.Net开发之自定义Marker(34)
  • 5. 高斯混合模型GMM的C++实现(32)
  • 6. 基于GMap.Net的地图解决方案(31)
  • 7. NPOI读写Excel(30)
  • 8. GMap.Net开发之在WinForm和WPF中使用GMap.Net地图插件(28)
  • 9. 基于netty-socketio的web推送服务(15)
  • 10. GMap.Net开发之地址解析与路径查找(15)

推荐排行榜

  • 1. C#多线程编程(39)
  • 2. NPOI读写Excel(20)
  • 3. 一个轻量级分布式RPC框架--NettyRpc(14)
  • 4. 百度谷歌离线地图解决方案(离线地图下载)(12)
  • 5. 基于GMap.Net的地图解决方案(11)
  • 6. C# byte数组与Image的相互转换(11)
  • 7. C++中的new、operator new与placement new(8)
  • 8. 基于Netty与RabbitMQ的消息服务(8)
  • 9. RabbitMQ的几种典型使用场景(7)
  • 10. C++中数字与字符串之间的转换(7)

Copyright ©2017 阿凡卢

转载于:https://www.cnblogs.com/jobs-lgy/p/6288826.html

基于ZooKeeper的分布式锁和队列相关推荐

  1. 基于 Zookeeper 的分布式锁实现

    1. 背景 最近在学习 Zookeeper,在刚开始接触 Zookeeper 的时候,完全不知道 Zookeeper 有什么用.且很多资料都是将 Zookeeper 描述成一个"类 Unix ...

  2. ieee39节点系统介绍_Java秒杀系统实战系列-基于ZooKeeper的分布式锁优化秒杀逻辑...

    本文是"Java秒杀系统实战系列文章"的第十六篇,本文我们将继续秒杀系统的优化之路,采用统一协调调度中心中间件ZooKeeper控制秒杀系统中高并发多线程对于共享资源~代码块的并发 ...

  3. 【Zookeeper】基于Zookeeper实现分布式锁

    1.概述 转载:基于Zookeeper实现分布式锁 1.1 为什么使用分布式锁 我们在开发应用的时候,如果需要对某一个共享变量进行多线程同步访问的时候,我们往往采用synchronized或者Lock ...

  4. 基于Zookeeper的分布式锁

    实现分布式锁目前有三种流行方案,分别为基于数据库.Redis.Zookeeper的方案,其中前两种方案网络上有很多资料可以参考,本文不做展开.我们来看下使用Zookeeper如何实现分布式锁. 什么是 ...

  5. Zookeeper:基于Zookeeper的分布式锁与领导选举

    本文转发自技术世界,原文链接 http://www.jasongj.com/zookeeper/distributedlock/ 1.Zookeeper特点 1.1 Zookeeper节点类型 如上文 ...

  6. 基于ZooKeeper实现分布式锁

    ZooKeeper 保证了数据的强一致性,  zk集群中任意节点(一个zkServer)上的相同znode下的数据一定是相同的.使用zookeeper可以非常简单的实现分布式锁, 其基本逻辑如下: 客 ...

  7. 基于 Redis 的分布式锁到底安全吗?

    [完整版] 网上有关Redis分布式锁的文章可谓多如牛毛了,不信的话你可以拿关键词"Redis 分布式锁"随便到哪个搜索引擎上去搜索一下就知道了.这些文章的思路大体相近,给出的实现 ...

  8. 基于Redis的分布式锁真的安全吗?

    说明: 我前段时间写了一篇用consul实现分布式锁,感觉理解的也不是很好,直到我看到了这2篇写分布式锁的讨论,真的是很佩服作者严谨的态度, 把这种分布式锁研究的这么透彻,作者这种技术态度真的值得我好 ...

  9. 基于Redis的分布式锁到底安全吗?

    网上有关Redis分布式锁的文章可谓多如牛毛了,不信的话你可以拿关键词"Redis 分布式锁"随便到哪个搜索引擎上去搜索一下就知道了.这些文章的思路大体相近,给出的实现算法也看似合 ...

最新文章

  1. 安装node.js 附带node.js以及npm初步认识 设置淘宝/npm镜像 命令
  2. python序列化和反序列化ppt_老生常谈Python序列化和反序列化
  3. Linux内核最新的连续内存分配器(CMA)——避免预留大块内存
  4. XCTF_Web_新手练习区:disabled_button
  5. 架构-浅谈MySQL数据库优化
  6. [JavaWeb-XML]XML_快捷查询方式(selector选择器,XPath)
  7. OpenCV的DNN模块
  8. Linux字符界面的cat,Linux命令之cat详解
  9. error: 'Can't connect to local MySQL server through socket '/data/3307/data/mysql.sock' (2)'
  10. matlab 图例自定义,matlab中如何自定义图例_常见问题解析
  11. 如何使用JS来改变CSS样式
  12. 线性求逆元模板_专栏:ACM算法面面观[9]逆元
  13. Alex 的 Hadoop 菜鸟教程: 第1课 hadoop体系介绍
  14. VOSviewer | (二)入门-分析web of science
  15. kuangbin RMQ
  16. 如何做网站推广?如何提高网站浏览量?
  17. Java二叉树基础操作常见代码例题
  18. html logo写法,教你用CSS3打造HTML5的Logo
  19. grep的-A-B(使用grep显示keyword前后的内容)
  20. 数据库大实验展示(上)

热门文章

  1. 【翻译】DataDog Kafka运维经验谈
  2. V2X-ViT:基于Vision Transformer的V2X协同感知
  3. 新手入门PS人像磨皮教程
  4. WHERE EXISTS
  5. Halcon——热熔胶质量检测
  6. 计算机软件著作权登记范文,计算机软件著作权登记申请表范本
  7. 图像处理(7)--高斯模糊原理
  8. [5.1] 架构与思想:Phal Api核心设计和思想解读
  9. 如何解决Mac苹果笔记本键盘背光灯不亮的问题?
  10. 萨提亚领衔主题演讲,带领高管和MVP合影,预告Julia女神的演讲中将有我的.NET Core实践