若有不正之处请多多谅解,并欢迎批评指正。

请尊重作者劳动成果,转载请标明原文链接:

http://www.cnblogs.com/go2sea/p/5625536.html

Semaphore是JUC包提供的一个共享锁,一般称之为信号量。

Semaphore通过自定义的同步器维护了一个或多个共享资源,线程通过调用acquire获取共享资源,通过调用release释放。

源代码:

/** ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.*********************//******* Written by Doug Lea with assistance from members of JCP JSR-166* Expert Group and released to the public domain, as explained at* http://creativecommons.org/publicdomain/zero/1.0/*/package java.util.concurrent;
import java.util.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;/*** A counting semaphore.  Conceptually, a semaphore maintains a set of* permits.  Each {@link #acquire} blocks if necessary until a permit is* available, and then takes it.  Each {@link #release} adds a permit,* potentially releasing a blocking acquirer.* However, no actual permit objects are used; the {@code Semaphore} just* keeps a count of the number available and acts accordingly.** <p>Semaphores are often used to restrict the number of threads than can* access some (physical or logical) resource. For example, here is* a class that uses a semaphore to control access to a pool of items:* <pre>* class Pool {*   private static final int MAX_AVAILABLE = 100;*   private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);**   public Object getItem() throws InterruptedException {*     available.acquire();*     return getNextAvailableItem();*   }**   public void putItem(Object x) {*     if (markAsUnused(x))*       available.release();*   }**   // Not a particularly efficient data structure; just for demo**   protected Object[] items = ... whatever kinds of items being managed*   protected boolean[] used = new boolean[MAX_AVAILABLE];**   protected synchronized Object getNextAvailableItem() {*     for (int i = 0; i < MAX_AVAILABLE; ++i) {*       if (!used[i]) {*          used[i] = true;*          return items[i];*       }*     }*     return null; // not reached*   }**   protected synchronized boolean markAsUnused(Object item) {*     for (int i = 0; i < MAX_AVAILABLE; ++i) {*       if (item == items[i]) {*          if (used[i]) {*            used[i] = false;*            return true;*          } else*            return false;*       }*     }*     return false;*   }** }* </pre>** <p>Before obtaining an item each thread must acquire a permit from* the semaphore, guaranteeing that an item is available for use. When* the thread has finished with the item it is returned back to the* pool and a permit is returned to the semaphore, allowing another* thread to acquire that item.  Note that no synchronization lock is* held when {@link #acquire} is called as that would prevent an item* from being returned to the pool.  The semaphore encapsulates the* synchronization needed to restrict access to the pool, separately* from any synchronization needed to maintain the consistency of the* pool itself.** <p>A semaphore initialized to one, and which is used such that it* only has at most one permit available, can serve as a mutual* exclusion lock.  This is more commonly known as a <em>binary* semaphore</em>, because it only has two states: one permit* available, or zero permits available.  When used in this way, the* binary semaphore has the property (unlike many {@link Lock}* implementations), that the &quot;lock&quot; can be released by a* thread other than the owner (as semaphores have no notion of* ownership).  This can be useful in some specialized contexts, such* as deadlock recovery.** <p> The constructor for this class optionally accepts a* <em>fairness</em> parameter. When set false, this class makes no* guarantees about the order in which threads acquire permits. In* particular, <em>barging</em> is permitted, that is, a thread* invoking {@link #acquire} can be allocated a permit ahead of a* thread that has been waiting - logically the new thread places itself at* the head of the queue of waiting threads. When fairness is set true, the* semaphore guarantees that threads invoking any of the {@link* #acquire() acquire} methods are selected to obtain permits in the order in* which their invocation of those methods was processed* (first-in-first-out; FIFO). Note that FIFO ordering necessarily* applies to specific internal points of execution within these* methods.  So, it is possible for one thread to invoke* {@code acquire} before another, but reach the ordering point after* the other, and similarly upon return from the method.* Also note that the untimed {@link #tryAcquire() tryAcquire} methods do not* honor the fairness setting, but will take any permits that are* available.** <p>Generally, semaphores used to control resource access should be* initialized as fair, to ensure that no thread is starved out from* accessing a resource. When using semaphores for other kinds of* synchronization control, the throughput advantages of non-fair* ordering often outweigh fairness considerations.** <p>This class also provides convenience methods to {@link* #acquire(int) acquire} and {@link #release(int) release} multiple* permits at a time.  Beware of the increased risk of indefinite* postponement when these methods are used without fairness set true.** <p>Memory consistency effects: Actions in a thread prior to calling* a "release" method such as {@code release()}* <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>* actions following a successful "acquire" method such as {@code acquire()}* in another thread.** @since 1.5* @author Doug Lea**/public class Semaphore implements java.io.Serializable {private static final long serialVersionUID = -3222578661600680210L;/** All mechanics via AbstractQueuedSynchronizer subclass */private final Sync sync;/*** Synchronization implementation for semaphore.  Uses AQS state* to represent permits. Subclassed into fair and nonfair* versions.*/abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 1192457210091910933L;Sync(int permits) {setState(permits);}final int getPermits() {return getState();}final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;}}final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}}/*** NonFair version*/static final class NonfairSync extends Sync {private static final long serialVersionUID = -2694183684443567898L;NonfairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}}/*** Fair version*/static final class FairSync extends Sync {private static final long serialVersionUID = 2014338818796000944L;FairSync(int permits) {super(permits);}protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}}/*** Creates a {@code Semaphore} with the given number of* permits and nonfair fairness setting.** @param permits the initial number of permits available.*        This value may be negative, in which case releases*        must occur before any acquires will be granted.*/public Semaphore(int permits) {sync = new NonfairSync(permits);}/*** Creates a {@code Semaphore} with the given number of* permits and the given fairness setting.** @param permits the initial number of permits available.*        This value may be negative, in which case releases*        must occur before any acquires will be granted.* @param fair {@code true} if this semaphore will guarantee*        first-in first-out granting of permits under contention,*        else {@code false}*/public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}/*** Acquires a permit from this semaphore, blocking until one is* available, or the thread is {@linkplain Thread#interrupt interrupted}.** <p>Acquires a permit, if one is available and returns immediately,* reducing the number of available permits by one.** <p>If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* one of two things happens:* <ul>* <li>Some other thread invokes the {@link #release} method for this* semaphore and the current thread is next to be assigned a permit; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* for a permit,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** @throws InterruptedException if the current thread is interrupted*/public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}/*** Acquires a permit from this semaphore, blocking until one is* available.** <p>Acquires a permit, if one is available and returns immediately,* reducing the number of available permits by one.** <p>If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* some other thread invokes the {@link #release} method for this* semaphore and the current thread is next to be assigned a permit.** <p>If the current thread is {@linkplain Thread#interrupt interrupted}* while waiting for a permit then it will continue to wait, but the* time at which the thread is assigned a permit may change compared to* the time it would have received the permit had no interruption* occurred.  When the thread does return from this method its interrupt* status will be set.*/public void acquireUninterruptibly() {sync.acquireShared(1);}/*** Acquires a permit from this semaphore, only if one is available at the* time of invocation.** <p>Acquires a permit, if one is available and returns immediately,* with the value {@code true},* reducing the number of available permits by one.** <p>If no permit is available then this method will return* immediately with the value {@code false}.** <p>Even when this semaphore has been set to use a* fair ordering policy, a call to {@code tryAcquire()} <em>will</em>* immediately acquire a permit if one is available, whether or not* other threads are currently waiting.* This &quot;barging&quot; behavior can be useful in certain* circumstances, even though it breaks fairness. If you want to honor* the fairness setting, then use* {@link #tryAcquire(long, TimeUnit) tryAcquire(0, TimeUnit.SECONDS) }* which is almost equivalent (it also detects interruption).** @return {@code true} if a permit was acquired and {@code false}*         otherwise*/public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;}/*** Acquires a permit from this semaphore, if one becomes available* within the given waiting time and the current thread has not* been {@linkplain Thread#interrupt interrupted}.** <p>Acquires a permit, if one is available and returns immediately,* with the value {@code true},* reducing the number of available permits by one.** <p>If no permit is available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* one of three things happens:* <ul>* <li>Some other thread invokes the {@link #release} method for this* semaphore and the current thread is next to be assigned a permit; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread; or* <li>The specified waiting time elapses.* </ul>** <p>If a permit is acquired then the value {@code true} is returned.** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* to acquire a permit,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.** <p>If the specified waiting time elapses then the value {@code false}* is returned.  If the time is less than or equal to zero, the method* will not wait at all.** @param timeout the maximum time to wait for a permit* @param unit the time unit of the {@code timeout} argument* @return {@code true} if a permit was acquired and {@code false}*         if the waiting time elapsed before a permit was acquired* @throws InterruptedException if the current thread is interrupted*/public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}/*** Releases a permit, returning it to the semaphore.** <p>Releases a permit, increasing the number of available permits by* one.  If any threads are trying to acquire a permit, then one is* selected and given the permit that was just released.  That thread* is (re)enabled for thread scheduling purposes.** <p>There is no requirement that a thread that releases a permit must* have acquired that permit by calling {@link #acquire}.* Correct usage of a semaphore is established by programming convention* in the application.*/public void release() {sync.releaseShared(1);}/*** Acquires the given number of permits from this semaphore,* blocking until all are available,* or the thread is {@linkplain Thread#interrupt interrupted}.** <p>Acquires the given number of permits, if they are available,* and returns immediately, reducing the number of available permits* by the given amount.** <p>If insufficient permits are available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* one of two things happens:* <ul>* <li>Some other thread invokes one of the {@link #release() release}* methods for this semaphore, the current thread is next to be assigned* permits and the number of available permits satisfies this request; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread.* </ul>** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* for a permit,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.* Any permits that were to be assigned to this thread are instead* assigned to other threads trying to acquire permits, as if* permits had been made available by a call to {@link #release()}.** @param permits the number of permits to acquire* @throws InterruptedException if the current thread is interrupted* @throws IllegalArgumentException if {@code permits} is negative*/public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}/*** Acquires the given number of permits from this semaphore,* blocking until all are available.** <p>Acquires the given number of permits, if they are available,* and returns immediately, reducing the number of available permits* by the given amount.** <p>If insufficient permits are available then the current thread becomes* disabled for thread scheduling purposes and lies dormant until* some other thread invokes one of the {@link #release() release}* methods for this semaphore, the current thread is next to be assigned* permits and the number of available permits satisfies this request.** <p>If the current thread is {@linkplain Thread#interrupt interrupted}* while waiting for permits then it will continue to wait and its* position in the queue is not affected.  When the thread does return* from this method its interrupt status will be set.** @param permits the number of permits to acquire* @throws IllegalArgumentException if {@code permits} is negative**/public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.acquireShared(permits);}/*** Acquires the given number of permits from this semaphore, only* if all are available at the time of invocation.** <p>Acquires the given number of permits, if they are available, and* returns immediately, with the value {@code true},* reducing the number of available permits by the given amount.** <p>If insufficient permits are available then this method will return* immediately with the value {@code false} and the number of available* permits is unchanged.** <p>Even when this semaphore has been set to use a fair ordering* policy, a call to {@code tryAcquire} <em>will</em>* immediately acquire a permit if one is available, whether or* not other threads are currently waiting.  This* &quot;barging&quot; behavior can be useful in certain* circumstances, even though it breaks fairness. If you want to* honor the fairness setting, then use {@link #tryAcquire(int,* long, TimeUnit) tryAcquire(permits, 0, TimeUnit.SECONDS) }* which is almost equivalent (it also detects interruption).** @param permits the number of permits to acquire* @return {@code true} if the permits were acquired and*         {@code false} otherwise* @throws IllegalArgumentException if {@code permits} is negative*/public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();return sync.nonfairTryAcquireShared(permits) >= 0;}/*** Acquires the given number of permits from this semaphore, if all* become available within the given waiting time and the current* thread has not been {@linkplain Thread#interrupt interrupted}.** <p>Acquires the given number of permits, if they are available and* returns immediately, with the value {@code true},* reducing the number of available permits by the given amount.** <p>If insufficient permits are available then* the current thread becomes disabled for thread scheduling* purposes and lies dormant until one of three things happens:* <ul>* <li>Some other thread invokes one of the {@link #release() release}* methods for this semaphore, the current thread is next to be assigned* permits and the number of available permits satisfies this request; or* <li>Some other thread {@linkplain Thread#interrupt interrupts}* the current thread; or* <li>The specified waiting time elapses.* </ul>** <p>If the permits are acquired then the value {@code true} is returned.** <p>If the current thread:* <ul>* <li>has its interrupted status set on entry to this method; or* <li>is {@linkplain Thread#interrupt interrupted} while waiting* to acquire the permits,* </ul>* then {@link InterruptedException} is thrown and the current thread's* interrupted status is cleared.* Any permits that were to be assigned to this thread, are instead* assigned to other threads trying to acquire permits, as if* the permits had been made available by a call to {@link #release()}.** <p>If the specified waiting time elapses then the value {@code false}* is returned.  If the time is less than or equal to zero, the method* will not wait at all.  Any permits that were to be assigned to this* thread, are instead assigned to other threads trying to acquire* permits, as if the permits had been made available by a call to* {@link #release()}.** @param permits the number of permits to acquire* @param timeout the maximum time to wait for the permits* @param unit the time unit of the {@code timeout} argument* @return {@code true} if all permits were acquired and {@code false}*         if the waiting time elapsed before all permits were acquired* @throws InterruptedException if the current thread is interrupted* @throws IllegalArgumentException if {@code permits} is negative*/public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));}/*** Releases the given number of permits, returning them to the semaphore.** <p>Releases the given number of permits, increasing the number of* available permits by that amount.* If any threads are trying to acquire permits, then one* is selected and given the permits that were just released.* If the number of available permits satisfies that thread's request* then that thread is (re)enabled for thread scheduling purposes;* otherwise the thread will wait until sufficient permits are available.* If there are still permits available* after this thread's request has been satisfied, then those permits* are assigned in turn to other threads trying to acquire permits.** <p>There is no requirement that a thread that releases a permit must* have acquired that permit by calling {@link Semaphore#acquire acquire}.* Correct usage of a semaphore is established by programming convention* in the application.** @param permits the number of permits to release* @throws IllegalArgumentException if {@code permits} is negative*/public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);}/*** Returns the current number of permits available in this semaphore.** <p>This method is typically used for debugging and testing purposes.** @return the number of permits available in this semaphore*/public int availablePermits() {return sync.getPermits();}/*** Acquires and returns all permits that are immediately available.** @return the number of permits acquired*/public int drainPermits() {return sync.drainPermits();}/*** Shrinks the number of available permits by the indicated* reduction. This method can be useful in subclasses that use* semaphores to track resources that become unavailable. This* method differs from {@code acquire} in that it does not block* waiting for permits to become available.** @param reduction the number of permits to remove* @throws IllegalArgumentException if {@code reduction} is negative*/protected void reducePermits(int reduction) {if (reduction < 0) throw new IllegalArgumentException();sync.reducePermits(reduction);}/*** Returns {@code true} if this semaphore has fairness set true.** @return {@code true} if this semaphore has fairness set true*/public boolean isFair() {return sync instanceof FairSync;}/*** Queries whether any threads are waiting to acquire. Note that* because cancellations may occur at any time, a {@code true}* return does not guarantee that any other thread will ever* acquire.  This method is designed primarily for use in* monitoring of the system state.** @return {@code true} if there may be other threads waiting to*         acquire the lock*/public final boolean hasQueuedThreads() {return sync.hasQueuedThreads();}/*** Returns an estimate of the number of threads waiting to acquire.* The value is only an estimate because the number of threads may* change dynamically while this method traverses internal data* structures.  This method is designed for use in monitoring of the* system state, not for synchronization control.** @return the estimated number of threads waiting for this lock*/public final int getQueueLength() {return sync.getQueueLength();}/*** Returns a collection containing threads that may be waiting to acquire.* Because the actual set of threads may change dynamically while* constructing this result, the returned collection is only a best-effort* estimate.  The elements of the returned collection are in no particular* order.  This method is designed to facilitate construction of* subclasses that provide more extensive monitoring facilities.** @return the collection of threads*/protected Collection<Thread> getQueuedThreads() {return sync.getQueuedThreads();}/*** Returns a string identifying this semaphore, as well as its state.* The state, in brackets, includes the String {@code "Permits ="}* followed by the number of permits.** @return a string identifying this semaphore, as well as its state*/public String toString() {return super.toString() + "[Permits = " + sync.getPermits() + "]";}
}

View Code

下面我们来详细分下下Semaphore的工作原理。

一、构造函数

    public Semaphore(int permits) {sync = new NonfairSync(permits);}public Semaphore(int permits, boolean fair) {sync = fair ? new FairSync(permits) : new NonfairSync(permits);}

初始化Semaphore时需要指定共享资源的个数。Semaphore提供了两种模式:公平模式&非公平模式。如果不指定工作模式的话,默认工作在非公平模式下。后面我们将看到,两种模式的区别在于获取共享资源时的排序策略。Semaphore有三个内部类:Sync&NonfairSync&FairSync。后两个继承自Sync,Sync继承自AQS。除了序列化版本号之外,Semaphore只有一个成员变量sync,公平模式下sync初始化为FairSync,非公平模式下sync初始化为NonfairSync。

二、acquire 响应中断获取资源

Semaphore提供了两种获取资源的方式:响应中断&不响应中断。我们先来看一下响应中断的获取。

    public void acquire() throws InterruptedException {sync.acquireSharedInterruptibly(1);}

acquire方法由同步器sync调用上层AQS提供的acquireSharedInterruptibly方法获取:

    public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);}

acquireSharedInterruptibly方法先检测中断。然后调用tryAcquireShared方法试图获取共享资源。这时公平模式和非公平模式的代码执行路径发生分叉,FairSync和NonfairSync各自重写了tryAcquireShared方法。

我们先来看下非公平模式下的tryAcquireShared方法:

        protected int tryAcquireShared(int acquires) {return nonfairTryAcquireShared(acquires);}

它直接代用了父类Sync提供的nonfairTryAcquireShared方法:

        final int nonfairTryAcquireShared(int acquires) {for (;;) {int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}

注意,这里是一个CAS自旋。因为Semaphore是一个共享锁,可能有多个线程同时申请共享资源,因此CAS操作可能失败。直到成功获取返回剩余资源数目,或者发现没有剩余资源返回负值代表申请失败。有一个问题,为什么我们不在CAS操作失败后就直接返回失败呢?因为这样做虽然不会导致错误,但会降低效率:在还有剩余资源的情况下,一个线程因为竞争导致CAS失败后被放入等待序列尾,一定在队列头部有一个线程被唤醒去试图获取资源,这比直接自旋继续获取多了操作等待队列的开销。

这里“非公平”的语义体现在:如果一个线程通过nonfairTryAcquireShared成功获取了共享资源,对于此时正在等待队列中的线程来说,可能是不公平的:队列中线程先到,却没能先获取资源。

如果tryAcquireShared没能成功获取,acquireSharedInterruptibly方法调用doAcquireSharedInterruptibly方法将当前线程放入等待队列并开始自旋检测获取资源:

    private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

我们注意到,doAcquireSharedInterruptibly中,当一个线程从parkAndCheckInterrupt方法中被中断唤醒之后,直接抛出了中断异常。还记得我们分析AQS时的doAcquireShared方法吗,它在这里的处理方式是用一个局部变量interrupted记录下这个异常但不立即处理,而是等到成功获取资源之后返回这个中断标志,并在上层调用selfInterrupt方法补上中断。这正是两个方法的关键区别:是否及时响应中断。

我们再来看公平模式下的tryAcquireShared方法:

        protected int tryAcquireShared(int acquires) {for (;;) {if (hasQueuedPredecessors())return -1;int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}}

相比较非公平模式的nonfairTryAcquireShared方法,公平模式下的tryAcquireShared方法在试图获取之前做了一个判断,如果发现等对队列中有线程在等待获取资源,就直接返回-1表示获取失败。当前线程会被上层的acquireSharedInterruptibly方法调用doAcquireShared方法放入等待队列中。这正是“公平”模式的语义:如果有线程先于我进入等待队列且正在等待,就直接进入等待队列,效果便是各个线程按照申请的顺序获得共享资源,具有公平性。

三、acquireUnInterruptibly 不响应中断获取资源

    public void acquireUninterruptibly() {sync.acquireShared(1);}

acquireUnInterruptibly方法调用AQS提供的acquireShared方法:

    public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}

acquireShared方法首先试图获取资源,这与acquireSharedInterruptibly方法相比,没有先检测中断的这一步。紧接着调用doAcquireShared方法,由于这个方法我在另一篇博文AQS源码学习笔记中已经详细分析过,这里我们只关注它与doAcquireSharedInterruptibly方法的区别:

    private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}

正如刚刚说过的,区别只在线程从parkAndCheckInterrupt方法中因中断而返回时的处理:在这里它没有抛出异常,而是用一个局部变量interrupted记录下这个异常但不立即处理,而是等到成功获取资源之后返回这个中断标志,并在上层调用selfInterrupt方法补上中断。

四、acquire(int) & acquireUninterruptibly(int) 指定申请的资源数目的获取

    public void acquire(int permits) throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();sync.acquireSharedInterruptibly(permits);}public void acquireUninterruptibly(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.acquireShared(permits);}

可以看到,与不指定数目时的获取的区别仅在参数值,不再赘述。

五、release 释放资源

公平模式和非公平模式的释放资源操作是一样的:

    public void release() {sync.releaseShared(1);}public void release(int permits) {if (permits < 0) throw new IllegalArgumentException();sync.releaseShared(permits);}

调用AQS提供的releaseShared方法:

    public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}

releaseShared方法首先调用我们重写的tryReleaseShared方法试图释放资源。然后调用doReleaseShared方法唤醒队列之后的等待线程。由于在我的另一篇博文AQS源码学习笔记中已经详细分析了doReleaseShared方法,因此不再赘述。我们主要关注tryReleaseShared方法:

        protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}

这个方法也是一个CAS自旋,原因是应为Semaphore是一个共享锁,可能有多个线程同时释放资源,因此CAS操作可能失败。最后方法总会成功释放并返回true(如果不出错的话)。

六、tryAcquire & tryAcquire(timeout) 方法

    public boolean tryAcquire() {return sync.nonfairTryAcquireShared(1) >= 0;}public boolean tryAcquire(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));}public boolean tryAcquire(int permits) {if (permits < 0) throw new IllegalArgumentException();return sync.nonfairTryAcquireShared(permits) >= 0;}public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException {if (permits < 0) throw new IllegalArgumentException();return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));}

没有指定等待时间的tryAcquire调用的是nonfairTryAcquireShared方法,我们已经分析过,不再赘述。我们重点关注指定等待时长的方法。限时等待是通过调用AQS提供的tryAcquireSharedNanos方法实现的:

    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();return tryAcquireShared(arg) >= 0 ||doAcquireSharedNanos(arg, nanosTimeout);}

注意:限时等待默认都是及时响应中断的。方法开始先检测中断,然后调用tryAcquireShared方法试图获取资源,如果成功的话直接返回true,不成功则调用doAcquireSharedNanos方法:

    private boolean doAcquireSharedNanos(int arg, long nanosTimeout)throws InterruptedException {if (nanosTimeout <= 0L)return false;final long deadline = System.nanoTime() + nanosTimeout;final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return true;}}nanosTimeout = deadline - System.nanoTime();if (nanosTimeout <= 0L)return false;if (shouldParkAfterFailedAcquire(p, node) &&nanosTimeout > spinForTimeoutThreshold)LockSupport.parkNanos(this, nanosTimeout);if (Thread.interrupted())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

方法在自旋之前先计算了一个结束等待的时间节点deadline,然后便开始自旋,每次自旋都要计算一下剩余等待时间nanosTimeout,如果nanosTimeout小于等于0,说明已经到达deadline,直接返回false表示超时。

有一点值得注意,spinForTimeoutThreshold这个值规定了一个阈值,当剩余等待时间小于这个值的时候,线程将不再被park,而是一直在自旋试图获取资源。关于这个值的作用Doug Lea是这样注释的:

    /*** The number of nanoseconds for which it is faster to spin* rather than to use timed park. A rough estimate suffices* to improve responsiveness with very short timeouts.*/

我的理解是,park和unpark操作需要一定的开销,当nanosTimeout很小的时候,这个开销就相对很大了。这个阈值的设置可以让短时等待的线程一直保持自旋,可以提高短时等待的反应效率,而由于nanosTimeout很小,自旋又不会有过多的开销。

除此之外,doAcquireSharedNanos方法与不限时等待的doAcquireShared方法还有两点重要区别:①由于有等待时限,所以线程从park方法返回时我们不能确定返回的原因是中断还是超时,因此需要调用interrupted方法检测一下中断标志;②doAcquireSharedNanos方法是及时响应中断的,而doAcquireShared方法延迟处理中断。

七、drainPermits & reducePermits 修改剩余共享资源数量

Semaphore提供了“耗尽”所有剩余共享资源的操作:

    public int drainPermits() {return sync.drainPermits();}

drainPermits调用了自定义同步器Sync的同名方法:

        final int drainPermits() {for (;;) {int current = getState();if (current == 0 || compareAndSetState(current, 0))return current;}}

用CAS自旋将剩余资源清空。

我们再来看看“缩减”剩余共享资源的操作:

    protected void reducePermits(int reduction) {if (reduction < 0) throw new IllegalArgumentException();sync.reducePermits(reduction);}

首先,缩减必须是单向的,即只能减少不能增加,然后调用Sync的同名方法:

        final void reducePermits(int reductions) {for (;;) {int current = getState();int next = current - reductions;if (next > current) // underflowthrow new Error("Permit count underflow");if (compareAndSetState(current, next))return;}}

用CAS自旋在剩余共享资源上做缩减。

上述两个对共享资源数量的修改操作有两点需要注意:①是不可逆的②是对剩余资源的操作而不是全部资源,当剩余资源数目不足或已经为0时,方法就返回,正咋被占用的资源不参与。

八、其他

    public int availablePermits() {return sync.getPermits();}public boolean isFair() {return sync instanceof FairSync;}public final boolean hasQueuedThreads() {return sync.hasQueuedThreads();}public final int getQueueLength() {return sync.getQueueLength();}protected Collection<Thread> getQueuedThreads() {return sync.getQueuedThreads();}public String toString() {return super.toString() + "[Permits = " + sync.getPermits() + "]";}

这些方法比较简单,不再赘述。

总结:

Semaphore是JUC包提供的一个典型的共享锁,它通过自定义两种不同的同步器(FairSync&NonfairSync)提供了公平&非公平两种工作模式,两种模式下分别提供了限时/不限时、响应中断/不响应中断的获取资源的方法(限时获取总是及时响应中断的),而所有的释放资源的release操作是统一的。

转载于:https://www.cnblogs.com/go2sea/p/5625536.html

Java多线程之JUC包:Semaphore源码学习笔记相关推荐

  1. java毕业设计线上教学平台mybatis+源码+调试部署+系统+数据库+lw

    java毕业设计线上教学平台mybatis+源码+调试部署+系统+数据库+lw java毕业设计线上教学平台mybatis+源码+调试部署+系统+数据库+lw 本源码技术栈: 项目架构:B/S架构 开 ...

  2. java毕业设计线上花店购物商城源码+lw文档+mybatis+系统+mysql数据库+调试

    java毕业设计线上花店购物商城源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计线上花店购物商城源码+lw文档+mybatis+系统+mysql数据库+调试 本源码技术栈 ...

  3. java毕业设计线上鲜花销售系统源码+lw文档+mybatis+系统+mysql数据库+调试

    java毕业设计线上鲜花销售系统源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计线上鲜花销售系统源码+lw文档+mybatis+系统+mysql数据库+调试 本源码技术栈 ...

  4. java毕业设计线上旅行信息管理系统源码+lw文档+mybatis+系统+mysql数据库+调试

    java毕业设计线上旅行信息管理系统源码+lw文档+mybatis+系统+mysql数据库+调试 java毕业设计线上旅行信息管理系统源码+lw文档+mybatis+系统+mysql数据库+调试 本源 ...

  5. Apache log4j-1.2.17源码学习笔记

    (1)Apache log4j-1.2.17源码学习笔记 http://blog.csdn.net/zilong_zilong/article/details/78715500 (2)Apache l ...

  6. Vuex 4源码学习笔记 - 通过Vuex源码学习E2E测试(十一)

    在上一篇笔记中:Vuex 4源码学习笔记 - 做好changelog更新日志很重要(十) 我们学到了通过conventional-changelog来生成项目的Changelog更新日志,通过更新日志 ...

  7. 雷神FFMpeg源码学习笔记

    雷神FFMpeg源码学习笔记 文章目录 雷神FFMpeg源码学习笔记 读取编码并依据编码初始化内容结构 每一帧的视频解码处理 读取编码并依据编码初始化内容结构 在开始编解码视频的时候首先第一步需要注册 ...

  8. RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?

    RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 文章目录 RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 前言 项目 ...

  9. Vuex 4源码学习笔记 - Vuex是怎么与Vue结合?(三)

    在上一篇笔记中:Vuex源码学习笔记 - Vuex开发运行流程(二) 我们通过运行npm run dev命令来启动webpack,来开发Vuex,并在Vuex的createStore函数中添加了第一个 ...

最新文章

  1. R语言使用gganimate包可视化动画点直方图生成过程(dot histogram)、在数据遍历的过程中逐步在箱体内堆叠数据点形成最终的点直方图
  2. shell判断IP地址是否合法
  3. Android 联系人导入导出(VCard格式)
  4. Maven配置_01
  5. 题解 P2949 【[USACO09OPEN]工作调度Work Scheduling】
  6. 执行容器内的shell_你的Docker容器可能充满了Graboid加密蠕虫
  7. mysql connector 是什么_mysql的connector/J,和JDBC是啥关系?有啥区别?
  8. C语言高效编程的四大秘技之以空间换时间
  9. 中文和英语中主语、谓语、宾语、定语、状语、补语的定义
  10. 设置 Google Analytics(分析)全局网站统计代码
  11. Excel图表制作(二):滚动条实现动态图表
  12. rxjava背压_关于RxJava背压
  13. 被蠢人、穷人与聪明人放弃的“概率权”
  14. setInterval使用过程中报Uncaught SyntaxError: Unexpected identifier
  15. Warmup 模型训练之标配
  16. 2023版golang面试题100道(map)
  17. 使用MATLAB的EEGLAB和BCT工具箱画脑网络连接图
  18. Java基础:数据类型与变量
  19. 关于大学计算机相关专业学习路线的见解与分析
  20. Smart3D空三不过的解决办法

热门文章

  1. Algorithm I assignment Collinear
  2. Quartz2D知识点聚合案例
  3. [个人推荐]函数式编程另类指南[zz]
  4. 【收集】ADOADO.NET 读取 Oracle 数据集
  5. disruptor 介绍
  6. Activiti 简易教程
  7. C#制作、打包、签名、发布Activex全过程
  8. “机器换人”之潮涌向珠三角,蓝领工人将何去何从
  9. Linux下安全审计工具 lynis 使用说明
  10. 快速排序——Java