文章目录

  • 第13章 显示锁
    • 13.1 Lock与ReentrantLock
      • 13.1.1 轮询锁和定时锁
      • 13.1.2 可中断的锁获取操作
      • 13.1.3 非块结构的加锁
    • 13.2 性能考虑因素
    • 13.3 公平性
    • 13.4 在synchronized和ReentrantLock之间选择
    • 13.5 读-写锁
    • 小结
  • 第14章 构建自定义的同步工具
    • 14.1 状态依赖性的管理】
      • 14.1.1 示例:将前提条件的失败传递给调用者
      • 14.1.2 示例 通过轮询与休眠来实现简单的阻塞
      • 14.1.3 条件队列
    • 14.2 使用条件队列
      • 14.2.1 条件队列
      • 14.2.2 过早唤醒
      • 14.2.3 丢失的信号
      • 14.2.4 通知
      • 14.2.5 示例 阀门类
      • 14.2.6 字类的安全问题
      • 14.2.7 封装条件队列
      • 14.2.8 入口协议与出口协议
    • 14.3 显示的Condition对象
    • 14.4 Synchronizer剖析
    • 14.5 AbstractQueueSynchronizer
    • 14.6 java.util.concurrent同步器类中的AQS
      • 14.6.1 ReentrantLock
      • 14.6.2 Semaphore与CountDownLatch
      • 14.6.3 FutureTask
      • 14.6.4 ReentrantReadWriteLock
    • 小结
  • 第15章 原子变量与非阻塞同步机制
    • 15.1 锁的劣势
    • 15.2 硬件对并发的支持
      • 15.2.1 比较并交换
      • 15.2.2 非阻塞的计数器
      • 15.2.3 JVM对CAS的支持
    • 15.3 原子变量类
      • 15.3.1 原子变量类是一种“更好的volatile”
      • 15.3.2 性能比较: 锁与原子变量
    • 15.4 非阻塞算法
      • 15.4.1 非阻塞的栈
      • 15.4.2 非阻塞的链表
      • 15.4.3 原子的域更新器
      • 15.4.4 ABA问题
    • 小结
  • 第16章 Java内存模型
    • 16.1 什么是内存模型,为什么需要它
      • 16.1.1 平台的内存模型
      • 16.1.2 重排序
      • 16.1.3 Java内存模型简介
      • 16.1.4 借助同步
    • 16.2 发布
      • 16.2.1 不安全的发布
      • 16.2.2 安全的发布
      • 16.2.3 安全初始化模式
      • 16.2.4 双重检查枷锁
    • 16.3 初始化过程中的安全性
    • 小结

第13章 显示锁

在Java 5.0之前,在协调对共享对象的访问时可以使用的机制只有synchronized和volatile。Java 5.0增加了一种新的机制:ReentrantLock。与之前提到过的机制相反,ReentrantLock并不是一种替代内置加锁的方法,而是当内置加锁机制不适用时,作为一种可选择的高级功能。

13.1 Lock与ReentrantLock

在程序清单13-1给出的Lock接口中定义了一组抽象的加锁操作。与内置加锁机制不同的是,Lock提供了一种无条件的、可轮询的、定时的以及可中断的锁获取操作,所有加锁和解锁的方法都是显式的。在Lock的实现中必须提供与内部锁相同的内存可见性语义,但在加锁语义、调度算法、顺序保证以及性能特性等方面可以有所不同。(第14章将介绍Lock.newCondition。)
   程序清单13-1 Lock接口

public interface Lock {void lock();void lockInterruptibly() throws InterruptedException;boolean tryLock();boolean tryLock(long time, TimeUnit unit) throws InterruptedException;void unlock();Condition newCondition();
}

ReentrantLock实现了Lock接口,并提供了与synchronized相同的互斥性和内存可见性。在获取ReentrantLock时,有着与进入同步代码块相同的内存语义,在释放ReentrantLock时,同样有着与退出同步代码块相同的内存语义。(3.1节以及第16章介绍内存可见性。)此外,与synchronized一样,ReentrantLock还提供了可重入的加锁语义(请参见2.3.2节)。ReentrantLock支持在Lock接口中定义的所有获取锁模式,并且与synchronized相比,它还为处理锁的不可用性问题提供了更高的灵活性。
   为什么要创建一种与内置锁如此相似的新加锁机制?在大多数情况下,内置锁都能很好地工作,但在功能上存在一些局限性,例如,无法中断一个正在等待获取锁的线程,或者无法在请求获取一个锁时无限地等待下去。内置锁必须在获取该锁的代码块中释放,这就简化了编码工作,并且与异常处理操作实现了很好的交互,但却无法实现非阻塞结构的加锁规则。这些都是使用synchronized的原因,但在某些情况下,一种更灵活的加锁机制通常能提供更好的活跃性或性能。
   程序清单13-2给出了Lock接口的标准使用形式。这种形式比使用内置锁复杂一些:必须在finally块中释放锁。否则,如果在被保护的代码中抛出了异常,那么这个锁永远都无法释放。当使用加锁时,还必须考虑在try块中抛出异常的情况,如果可能使对象处于某种不一致的状态,那么就需要更多的try-catch或try-finally代码块。(当使用某种形式的加锁时,包括内置锁,都应该考虑在出现异常时的情况。)
   程序清单13-2 使用ReentrantLock来保护对象状态

Lock lock = new ReentrantLock();
lock.lock();
try {// 更新对象状态// 捕获异常,并在必要时恢复不变性条件
} finally { lock.unlock();
}

如果没有使用finally来释放Lock,那么相当于启动了一个定时炸弹。当“炸弹爆炸”时,将很难追踪到最初发生错误的位置,因为没有记录应该释放锁的位置和时间。这就是ReentrantLock不能完全替代synchronized的原因:它更加“危险”,因为当程序的执行控制离开被保护的代码块时,不会自动清除锁。虽然在finally块中释放锁并不困难,但也可能忘记。

13.1.1 轮询锁和定时锁

可定时的与可轮询的锁获取模式是由tryLock方法实现的,与无条件的锁获取模式相比,它具有更完善的错误恢复机制。在内置锁中,死锁是一个严重的问题,恢复程序的唯一方法是重新启动程序,而防止死锁的唯一方法就是在构造程序时避免出现不一致的锁顺序。可定时的与可轮询的锁提供了另一种选择:避免死锁的发生。
   如果不能获得所有需要的锁,那么可以使用可定时的或可轮询的锁获取方式,从而使你重新获得控制权,它会释放已经获得的锁,然后重新尝试获取所有锁(或者至少会将这个失败记录到日志,并采取其他措施)。程序清单13-3给出了另一种方法来解决10.1.2节中动态顺序死锁的问题:使用tryLock来获取两个锁,如果不能同时获得,那么就回退并重新尝试。在休眠时间中包括固定部分和随机部分,从而降低发生活锁的可能性。如果在指定时间内不能获得所有需要的锁,那么transferMoney将返回一个失败状态,从而使该操作平缓地失败。(请参见[CPJ 2.5.1.2]和[CPJ 2.5.1.3]了解更多使用可轮询的锁来避免死锁的示例。)
   程序清单13-3 通过tryLock来避免锁顺序死锁

public boolean transferMoney(Account fromAcct, Account toAcct, BigDecimal amount,long timeout, TimeUnit unit) {long fixeDelay = getFixedDelayComponentNanos(timeout,unit);long randMod = getFixedDelayComponentNanos(timeout,unit);long stopTime = System.nanoTime()+unit.toNanos(timeout);while (true) {if (fromAcct.lock.tryLock()) {try {if (toAcct.lock.tryLock()) {try {if (fromAcct.getBalance().compareTo(amount)<0) {throw new InsufficientResourcesException();} else {fromAcct.debit(amount);toAcct.credit(amount);return true;}} finally {toAcct.lock.unlock();}}} catch (Exception ex) {ex.printStackTrace();} finally {fromAcct.lock.unlock();}}if (System.nanoTime()<stopTime) {return false;}NANOSECONdS.sleep(fixeDelay+rnd.nextLong()%randMod);}
}

在实现具有时间限制的操作时,定时锁同样非常有用(请参见6.3.7节)。当在带有时间限制的操作中调用了一个阻塞方法时,它能根据剩余时间来提供一个时限。如果操作不能在指定的时间内给出结果,那么就会使程序提前结束。当使用内置锁时,在开始请求锁后,这个操作将无法取消,因此内置锁很难实现带有时间限制的操作。
   在程序清单6-17的旅游门户网站示例中,为询价的每个汽车租赁公司都创建了一个独立的任务。询价操作包含某种基于网络的请求机制,例如Web服务请求。但在询价操作中同样可能需要实现对紧缺资源的独占访问,例如通向公司的直连通信线路。
   9.5节介绍了确保对资源进行串行访问的方法:一个单线程的Executor。另一种方法是使用一个独占锁来保护对资源的访问。程序清单13-4试图在Lock保护的共享通信线路上发送一条消息,如果不能在指定时间内完成,代码就会失败。定时的tryLock能够在这种带有时间限制的操作中实现独占加锁行为。
   程序清单13-4 带有时间限制的加锁

public boolean trysendOnShareLine(String message,long timeout,TimeUnit unit) {long nanosToLock = unit.toNanos(timeout)-estimeatedNanostoSend(message);;if (!lock.tryLock(nanosToLock,NANOSECONDS)) {return false;}try {return trysendOnShareLine(message);} finally {lock.unlock();}
}

13.1.2 可中断的锁获取操作

正如定时的锁获取操作能在带有时间限制的操作中使用独占锁,可中断的锁获取操作同样能在可取消的操作中使用加锁。7.1.6节给出了几种不能响应中断的机制,例如请求内置锁。这些不可中断的阻塞机制将使得实现可取消的任务变得复杂。lockInterruptibly方法能够在获得锁的同时保持对中断的响应,并且由于它包含在Lock中,因此无须创建其他类型的不可中断阻塞机制。
   可中断的锁获取操作的标准结构比普通的锁获取操作略微复杂一些,因为需要两个try块。(如果在可中断的锁获取操作中抛出了InterruptedException,那么可以使用标准的try-finally加锁模式。)在程序清单13-5中使用了lockInterruptibly来实现程序清单13-4中的sendOnSharedLine,以便在一个可取消的任务中调用它。定时的tryLock同样能响应中断,因此当需要实现一个定时的和可中断的锁获取操作时,可以使用tryLock方法。
   程序清单13-5 可中断的锁获取操作

public boolean sendOnShareLine(String message) {lock.lockInterruptibly();try {return cancellableSendOnShareLine(message);} finally {lock.unlock();}
}
private boolean cancellableSendOnShareLine(String message){// ...
}

对lockInterruptibly的理解:

class LockTest {private Lock lock = new ReentrantLock();public void doBussiness() {String name = Thread.currentThread().getName();try {System.out.println(name+"开始获取锁");lock.lockInterruptibly();System.out.println(name+"得到锁");for (int i = 0; i < 5; i++) {Thread.sleep(1000);System.out.println(name+":"+i);}} catch (Exception ex) {System.out.println(name+"被中断");System.out.println(name+"做些别的事情");} finally {try {lock.unlock();System.out.println(name+"释放锁");} catch (Exception ex) {System.out.println(name+"没有得到锁的线程运行结束");}}}public static void main(String[] args) throws InterruptedException {LockTest lockTest = new LockTest();Thread to = new Thread(lockTest::doBussiness,"t0");Thread t1 = new Thread(lockTest::doBussiness,"t1");to.start();Thread.sleep(10);t1.start();Thread.sleep(100);// 线程t1没有得到锁,中断t1的等待t1.interrupt();}
}

运行结果:

t0开始获取锁
t0得到锁
t1开始获取锁
t1被中断
t1做些别的事情
t1没有得到锁的线程运行结束
t0:0
t0:1
t0:2
t0:3
t0:4
t0释放锁

13.1.3 非块结构的加锁

在内置锁中,锁的获取和释放等操作都是基于代码块的——释放锁的操作总是与获取锁的操作处于同一个代码块,而不考虑控制权如何退出该代码块。自动的锁释放操作简化了对程序的分析,避免了可能的编码错误,但有时侯需要更灵活的加锁规则。
   在第11章中,我们看到了通过降低锁的粒度可以提高代码的可伸缩性。锁分段技术在基于散列的容器中实现了不同的散列链,以便使用不同的锁。我们可以通过采用类似的原则来降低链表中锁的粒度,即为每个链表节点使用一个独立的锁,使不同的线程能独立地对链表的不同部分进行操作。每个节点的锁将保护链接指针以及在该节点中存储的数据,因此当遍历或修改链表时,我们必须持有该节点上的这个锁,直到获得了下一个节点的锁,只有这样,才能释放前一个节点上的锁。在[CPJ 2.5.1.4]中介绍了使用这项技术的一个示例,并称之为连锁式加锁(Hand-Over-Hand Locking)或者锁耦合(Lock Coupling)。

13.2 性能考虑因素

当把ReentrantLock添加到Java 5.0时,它能比内置锁提供更好的竞争性能。对于同步原语来说,竞争性能是可伸缩性的关键要素:如果有越多的资源被耗费在锁的管理和调度上,那么应用程序得到的资源就越少。锁的实现方式越好,将需要越少的系统调用和上下文切换,并且在共享内存总线上的内存同步通信量也越少,而一些耗时的操作将占用应用程序的计算资源。
   Java 6使用了改进后的算法来管理内置锁,与在ReentrantLock中使用的算法类似,该算法有效地提高了可伸缩性。图13-1给出了在Java 5.0和Java 6版本中,内置锁与ReentrantLock之间的性能差异,测试程序的运行环境是4路的Opteron系统,操作系统为Solaris。图中的曲线表示在某个JVM版本中ReentrantLock相对于内置锁的“加速比”。在Java 5.0中,ReentrantLock能提供更高的吞吐量,但在Java 6中,二者的吞吐量非常接近【这张曲线图中没有给出的信息是:Java 5.0和Java 6之间的可伸缩性差异是源于内置锁的改进,而不是ReentrantLock。】。这里使用了与11.5节相同的测试程序,而这次比较的是通过一个HashMap在由内置锁保护以及由ReentrantLock保护的情况下的吞吐量。

   在Java 5.0中,当从单线程(无竞争)变化到多线程时,内置锁的性能将急剧下降,而ReentrantLock的性能下降则更为平缓,因而它具有更好的可伸缩性。但在Java 6中,情况就完全不同了,内置锁的性能不会由于竞争而急剧下降,并且两者的可伸缩性也基本相当。
   图13-1的曲线图告诉我们,像“X比Y更快”这样的表述大多是短暂的。性能和可伸缩性对于具体平台等因素都较为敏感,例如CPU、处理器数量、缓存大小以及JVM特性等,所有这些因素都可能会随着时间而发生变化。【当开始写作本书时,ReentrantLock似乎是解决锁的可伸缩性的最终手段。但不到一年的时间,内置锁在可伸缩性上已经获得了极大的提升。性能不仅是一个在不断变化的指标,而且变化得非常快。】
   性能是一个不断变化的指标,如果在昨天的测试基准中发现X比Y更快,那么在今天就可能已经过时了。

13.3 公平性

在ReentrantLock的构造函数中提供了两种公平性选择:创建一个非公平的锁(默认)或者一个公平的锁。在公平的锁上,线程将按照它们发出请求的顺序来获得锁,但在非公平的锁上,则允许“插队”:当一个线程请求非公平的锁时,如果在发出请求的同时该锁的状态变为可用,那么这个线程将跳过队列中所有的等待线程并获得这个锁。(在Semaphore中同样可以选择采用公平的或非公平的获取顺序。)非公平的ReentrantLock并不提倡“插队”行为,但无法防止某个线程在合适的时候进行“插队”。在公平的锁中,如果有另一个线程持有这个锁或者有其他线程在队列中等待这个锁,那么新发出请求的线程将被放入队列中。在非公平的锁中,只有当锁被某个线程持有时,新发出请求的线程才会被放入队列中【即使对于公平锁而言,可轮询的tryLock仍然会“插队”。】
   我们为什么不希望所有的锁都是公平的?毕竟,公平是一种好的行为,而不公平则是一种不好的行为,对不对?当执行加锁操作时,公平性将由于在挂起线程和恢复线程时存在的开销而极大地降低性能。在实际情况中,统计上的公平性保证——确保被阻塞的线程能最终获得锁,通常已经够用了,并且实际开销也小得多。有些算法依赖于公平的排队算法以确保它们的正确性,但这些算法并不常见。在大多数情况下,非公平锁的性能要高于公平锁的性能。
   图13-2给出了Map的性能测试,并比较由公平的以及非公平的ReentrantLock包装的HashMap的性能,测试程序在一个4路的Opteron系统上运行,操作系统为Solaris,在绘制结果曲线时采用了对数缩放比例【ConcurrentHashMap的曲线在4个线程和8个线程之间的变动非常大。这些变动大多是测量噪声,噪声的可能来源包括:与元素散列码之间的偶然交互,线程的调度,重新调整映射集合的大小,垃圾回收或其他内存系统的作用,以及操作系统在测试用例运行时执行一些周期性的辅助任务。实际上,在性能测试中存在着各种各样的变动因素,而这些因素通常不需要进行控制。我们不要人为地使曲线变得平滑,因为在现实世界的性能测试中同样会存在各种各样的噪声。】从图中可以看出,公平性把性能降低了约两个数量级。不必要的话,不要为公平性付出代价。

   在激烈竞争的情况下,非公平锁的性能高于公平锁的性能的一个原因是:在恢复一个被挂起的线程与该线程真正开始运行之间存在着严重的延迟。假设线程A持有一个锁,并且线程B请求这个锁。由于这个锁已被线程A持有,因此B将被挂起。当A释放锁时,B将被唤醒,因此会再次尝试获取锁。与此同时,如果C也请求这个锁,那么C很可能会在B被完全唤醒之前获得、使用以及释放这个锁。这样的情况是一种“双赢”的局面:B获得锁的时刻并没有推迟,C更早地获得了锁,并且吞吐量也获得了提高。
   当持有锁的时间相对较长,或者请求锁的平均时间间隔较长,那么应该使用公平锁。在这些情况下,“插队”带来的吞吐量提升(当锁处于可用状态时,线程却还处于被唤醒的过程中)则可能不会出现。
   与默认的ReentrantLock一样,内置加锁并不会提供确定的公平性保证,但在大多数情况下,在锁实现上实现统计上的公平性保证已经足够了。Java语言规范并没有要求JVM以公平的方式来实现内置锁,而在各种JVM中也没有这样做。ReentrantLock并没有进一步降低锁的公平性,而只是使一些已经存在的内容更明显。

13.4 在synchronized和ReentrantLock之间选择

ReentrantLock在加锁和内存上提供的语义与与内置锁相同,此外它还提供了一些其他功能,包括定时的锁等待、可中断的锁等待、公平性,以及实现非块结构的加锁。ReentrantLock在性能上似乎优于内置锁,其中在Java 6中略有胜出,而在Java 5.0中则是远远胜出。那么为什么不放弃synchronized,并在所有新的并发代码中都使用ReentrantLock?事实上有些作者已经建议这么做,将synchronized作为一种“遗留”结构,但这会将好事情变坏。
   与显式锁相比,内置锁仍然具有很大的优势。内置锁为许多开发人员所熟悉,并且简洁紧凑,而且在许多现有的程序中都已经使用了内置锁——如果将这两种机制混合使用,那么不仅容易令人困惑,也容易发生错误。ReentrantLock的危险性比同步机制要高,如果忘记在finally块中调用unlock,那么虽然代码表面上能正常运行,但实际上已经埋下了一颗定时炸弹,并很有可能伤及其他代码。仅当内置锁不能满足需求时,才可以考虑使用ReentrantLock。
   在一些内置锁无法满足需求的情况下,ReentrantLock可以作为一种高级工具。当需要一些高级功能时才应该使用ReentrantLock,这些功能包括:可定时的、可轮询的与可中断的锁获取操作,公平队列,以及非块结构的锁。否则,还是应该优先使用synchronized。
   在Java 5.0中,内置锁与ReentrantLock相比还有另一个优点:在线程转储中能给出在哪些调用帧中获得了哪些锁,并能够检测和识别发生死锁的线程。JVM并不知道哪些线程持有ReentrantLock,因此在调试使用ReentrantLock的线程的问题时,将起不到帮助作用。Java 6解决了这个问题,它提供了一个管理和调试接口,锁可以通过该接口进行注册,从而与ReentrantLocks相关的加锁信息就能出现在线程转储中,并通过其他的管理接口和调试接口来访问。与synchronized相比,这些调试消息是一种重要的优势,即便它们大部分都是临时性消息,线程转储中的加锁能给很多程序员带来帮助。ReentrantLock的非块结构特性仍然意味着,获取锁的操作不能与特定的栈帧关联起来,而内置锁却可以。
   未来更可能会提升synchronized而不是ReentrantLock的性能。因为synchronized是JVM的内置属性,它能执行一些优化,例如对线程封闭的锁对象的锁消除优化,通过增加锁的粒度来消除内置锁的同步(请参见11.3.2节),而如果通过基于类库的锁来实现这些功能,则可能性不大。除非将来需要在Java 5.0上部署应用程序,并且在该平台上确实需要ReentrantLock包含的可伸缩性,否则就性能方面来说,应该选择synchronized而不是ReentrantLock。

13.5 读-写锁

ReentrantLock实现了一种标准的互斥锁:每次最多只有一个线程能持有ReentrantLock。但对于维护数据的完整性来说,互斥通常是一种过于强硬的加锁规则,因此也就不必要地限制了并发性。互斥是一种保守的加锁策略,虽然可以避免“写/写”冲突和“写/读”冲突,但同样也避免了“读/读”冲突。在许多情况下,数据结构上的操作都是“读操作”——虽然它们也是可变的并且在某些情况下被修改,但其中大多数访问操作都是读操作。此时,如果能够放宽加锁需求,允许多个执行读操作的线程同时访问数据结构,那么将提升程序的性能。只要每个线程都能确保读取到最新的数据,并且在读取数据时不会有其他的线程修改数据,那么就不会发生问题。在这种情况下就可以使用读/写锁:一个资源可以被多个读操作访问,或者被一个写操作访问,但两者不能同时进行。
   在程序清单13-6的ReadWriteLock中暴露了两个Lock对象,其中一个用于读操作,而另一个用于写操作。要读取由ReadWriteLock保护的数据,必须首先获得读取锁,当需要修改ReadWriteLock保护的数据时,必须首先获得写入锁。尽管这两个锁看上去是彼此独立的,但读取锁和写入锁只是读-写锁对象的不同视图。
   程序清单13-6 ReadWriteLock接口

public interface ReadWriteLock {Lock readLock();Lock writeLock();
}

在读-写锁实现的加锁策略中,允许多个读操作同时进行,但每次只允许一个写操作。与Lock一样,ReadWriteLock可以采用多种不同的实现方式,这些方式在性能、调度保证、获取优先性、公平性以及加锁语义等方面可能有所不同。
   读-写锁是一种性能优化措施,在一些特定的情况下能实现更高的并发性。在实际情况中,对于在多处理器系统上被频繁读取的数据结构,读-写锁能够提高性能。而在其他情况下,读-写锁的性能比独占锁的性能要略差一些,这是因为它们的复杂性更高。如果要判断在某种情况下使用读-写锁是否会带来性能提升,最好对程序进行分析。由于ReadWriteLock使用Lock来实现锁的读-写部分,因此如果分析结果表明读-写锁没有提高性能,那么可以很容易地将读-写锁换为独占锁。
   在读取锁和写入锁之间的交互可以采用多种实现方式。ReadWriteLock中的一些可选实现包括:
   释放优先。当一个写入操作释放写入锁时,并且队列中同时存在读线程和写线程,那么应该优先选择读线程,写线程,还是最先发出请求的线程?
   读线程插队。如果锁是由读线程持有,但有写线程正在等待,那么新到达的读线程能否立即获得访问权,还是应该在写线程后面等待?如果允许读线程插队到写线程之前,那么将提高并发性,但却可能造成写线程发生饥饿问题。
   重入性。读取锁和写入锁是否是可重入的?
   降级。如果一个线程持有写入锁,那么它能否在不释放该锁的情况下获得读取锁?这可能会使得写入锁被“降级”为读取锁,同时不允许其他写线程修改被保护的资源。
   升级。读取锁能否优先于其他正在等待的读线程和写线程而升级为一个写入锁?在大多数的读-写锁实现中并不支持升级,因为如果没有显式的升级操作,那么很容易造成死锁。(如果两个读线程试图同时升级为写入锁,那么二者都不会释放读取锁。)
   ReentrantReadWriteLock为这两种锁都提供了可重入的加锁语义。与ReentrantLock类似,ReentrantReadWriteLock在构造时也可以选择是一个非公平的锁(默认)还是一个公平的锁。在公平的锁中,等待时间最长的线程将优先获得锁。如果这个锁由读线程持有,而另一个线程请求写入锁,那么其他读线程都不能获得读取锁,直到写线程使用完并且释放了写入锁。在非公平的锁中,线程获得访问许可的顺序是不确定的。写线程降级为读线程是可以的,但从读线程升级为写线程则是不可以的(这样做会导致死锁)。
   与ReentrantLock类似的是,ReentrantReadWriteLock中的写入锁只能有唯一的所有者,并且只能由获得该锁的线程来释放。在Java 5.0中,读取锁的行为更类似于一个Semaphore而不是锁,它只维护活跃的读线程的数量,而不考虑它们的标识。在Java 6中修改了这个行为:记录哪些线程已经获得了读者锁。【做出这种修改的一个原因是:在Java 5.0的锁实现中,无法区别一个线程是首次请求读取锁,还是可重入锁请求,从而可能使公平的读-写锁发生死锁。】
   当锁的持有时间较长并且大部分操作都不会修改被守护的资源时,那么读-写锁能提高并发性。在程序清单13-7的ReadWriteMap中使用了ReentrantReadWriteLock来包装Map,从而使它能在多个读线程之间被安全地共享,并且仍然能避免“读-写”或“写-写”冲突【ReadWriteMap并没有实现Map,因为实现一些方法(例如entrySet和values)是非常困难的,况且“简单”的方法通常已经足够了。】。在现实中,ConcurrentHashMap的性能已经很好了,因此如果只需要一个并发的基于散列的映射,那么就可以使用ConcurrentHashMap来代替这种方法,但如果需要对另一种Map实现(例如LinkedHashMap)提供并发性更高的访问,那么可以使用这项技术。
   程序清单13-7 用读-写锁来包装Map

class ReadWriteMap<K,V> {private final Map<K,V> map;private final ReadWriteLock lock = new ReentrantReadWriteLock();private final Lock r = lock.readLock();private final Lock w = lock.writeLock();public ReadWriteLock(Map<K,V> map) {this.map=map;}public V put(K key,V value) {w.lock();try {return map.put(key, value);} finally {w.unlock();}}// 对remove,putAll(),clear等方法执行相同等操作public V get(Object key) {r.lock();try {return map.get(key);} finally {r.unlock();}}// 对其他只读的Map方法执行相同的操作
}

图13-3给出了分别用ReentrantLock和ReadWriteLock来封装ArrayList的吞吐量比较,测试程序在4路的Opteron系统上运行,操作系统为Solaris。这里使用的测试程序与本书使用的Map性能测试基本类似——每个操作随机地选择一个值并在容器中查找这个值,并且只有少量的操作会修改这个容器中的内容。

小结

与内置锁相比,显式的Lock提供了一些扩展功能,在处理锁的不可用性方面有着更高的灵活性,并且对队列行有着更好的控制。但ReentrantLock不能完全替代synchronized,只有在synchronized无法满足需求时,才应该使用它。
读-写锁允许多个读线程并发地访问被保护的对象,当访问以读取操作为主的数据结构时,它能提高程序的可伸缩性。

第14章 构建自定义的同步工具

类库包含了许多存在状态依赖性的类,例如FutureTask、Semaphore和BlockingQueue等。在这些类的一些操作中有着基于状态的前提条件,例如,不能从一个空的队列中删除元素,或者获取一个尚未结束的任务的计算结果,在这些操作可以执行之前,必须等到队列进入“非空”状态,或者任务进入“已完成”状态。
   创建状态依赖类的最简单方法通常是在类库中现有状态依赖类的基础上进行构造。例如,在第8章的ValueLatch中就采用了这种方法,其中使用了一个CountDownLatch来提供所需的阻塞行为。但如果类库没有提供你需要的功能,那么还可以使用Java语言和类库提供的底层机制来构造自己的同步机制,包括内置的条件队列、显式的Condition对象以及AbstractQueuedSynchronizer框架。本章将介绍实现状态依赖性的各种选择,以及在使用平台提供的状态依赖性机制时需要遵守的各项规则。

14.1 状态依赖性的管理】

在单线程程序中调用一个方法时,如果某个基于状态的前提条件未得到满足(例如“连接池必须非空”),那么这个条件将永远无法成真。因此,在编写顺序程序中的类时,要使得这些类在它们的前提条件未被满足时就失败。但在并发程序中,基于状态的条件可能会由于其他线程的操作而改变:一个资源池可能在几条指令之前还是空的,但现在却变为非空的,因为另一个线程可能会返回一个元素到资源池。对于并发对象上依赖状态的方法,虽然有时候在前提条件不满足的情况下不会失败,但通常有一种更好的选择,即等待前提条件变为真。
   依赖状态的操作可以一直阻塞直到可以继续执行,这比使它们先失败再实现起来要更为方便且更不易出错。内置的条件队列可以使线程一直阻塞,直到对象进入某个进程可以继续执行的状态,并且当被阻塞的线程可以执行时再唤醒它们。我们将在14.2节介绍条件队列的详细内容,但为了突出高效的条件等待机制的价值,我们将首先介绍如何通过轮询与休眠等方式来(勉强地)解决状态依赖性问题。
   可阻塞的状态依赖操作的形式如程序清单14-1所示。这种加锁模式有些不同寻常,因为锁是在操作的执行过程中被释放与重新获取的。构成前提条件的状态变量必须由对象的锁来保护,从而使它们在测试前提条件的同时保持不变。如果前提条件尚未满足,就必须释放锁,以便其他线程可以修改对象的状态,否则,前提条件就永远无法变成真。在再次测试前提条件之前,必须重新获得锁。
   程序清单14-1 可阻塞的状态依赖操作的结构

accquire lock on object state
while( precondtion doesnot hold) {release lockwait util precondition might holdoptionally fail if interrupted or timeout expirseaccquire lock
}
persorm action
release lock

在生产者-消费者的设计中经常会使用像ArrayBlockingQueue这样的有界缓存。在有界缓存提供的put和take操作中都包含有一个前提条件:不能从空缓存中获取元素,也不能将元素放入已满的缓存中。当前提条件未满足时,依赖状态的操作可以抛出一个异常或返回一个错误状态(使其成为调用者的一个问题),也可以保持阻塞直到对象进入正确的状态。
   在生产者-消费者的设计中经常会使用像ArrayBlockingQueue这样的有界缓存。在有界缓存提供的put和take操作中都包含有一个前提条件:不能从空缓存中获取元素,也不能将元素放入已满的缓存中。当前提条件未满足时,依赖状态的操作可以抛出一个异常或返回一个错误状态(使其成为调用者的一个问题),也可以保持阻塞直到对象进入正确的状态。
   程序清单14-2 有界缓存实现的基类

@ThreadSafe
abstract class BaseBoundBuffer<V> {@GuardedBy("this") private final V[] buf;@GuardedBy("this")private int tail;@GuardedBy("this")private int head;@GuardedBy("this")private int count;protected BaseBoundBuffer(int capacity) {this.buf = (V[]) new Object[capacity];}protected synchronized final void doPut(V v) {buf[tail] = v;if (++tail == buf.length) {tail = 0;}++count;}protected synchronized final V doTake() {V v = buf[head];buf[head] = null;if (++head == buf.length) {head = 0;}--count;return v;}public synchronized final boolean isFull() {return count == buf.length;}public synchronized final boolean ifEmpty() {return count == 0;}
}

14.1.1 示例:将前提条件的失败传递给调用者

程序清单14-3的GrumpyBoundedBuffer是第一个简单的有界缓存实现。put和take方法都进行了同步以确保实现对缓存状态的独占访问,因为这两个方法在访问缓存时都采用“先检查再运行”的逻辑策略。
   程序清单14-3 当不满足前提条件时,有界缓存不会执行相应的操作

@ThreadSafe
public class GrumpyBoundedBuffer<V> extends BaseBoundBuffer<V> {public GrumpyBoundedBuffer(int capacity) {super(capacity);}public synchronized void put(V v) {if (isFull()) {throw new BufferFullException();}doPut(v);}public synchronized V take() {if (ifEmpty()) {throw new BufferEmptyException();}return doTake();}
}

尽管这种方法实现起来很简单,但使用起来却并非如此。异常应该用于发生异常条件的情况中[EJItem 39]。“缓存已满”并不是有界缓存的一个异常条件,就像“红灯”并不表示交通信号灯出现了异常。在实现缓存时得到的简化(使调用者管理状态依赖性)并不能抵消在使用时存在的复杂性,因为现在调用者必须做好捕获异常的准备,并且在每次缓存操作时都需要重试【如果将状态依赖性交给调用者管理,那么将导致一些功能无法实现,例如维持FIFO顺序,由于迫使调用者重试,因此失去了“谁先到达”的信息。】程序清单14-4给出了对take的调用——并不是很漂亮,尤其是当程序中有许多地方都调用put和take方法时。
   程序清单14-4 调用GrumpyBoundedBuffer的代码

while (true) {try {V item = buffer.take();// 对于item执行一些操作break;} catch (Exception ex) {Thread.sleep(SLEEP_GRANULARITY);}
}

程序清单14-4中的客户代码不是实现重试的唯一方式。调用者可以不进入休眠状态,而直接重新调用take方法,这种方法被称为忙等待或自旋等待。如果缓存的状态在很长一段时间内都不会发生变化,那么使用这种方法就会消耗大量的CPU时间。但是,调用者也可以进入休眠状态来避免消耗过多的CPU时间,但如果缓存的状态在刚调用完sleep就立即发生变化,那么将不必要地休眠一段时间。因此,客户代码必须要在二者之间进行选择:要么容忍自旋导致的CPU时钟周期浪费,要么容忍由于休眠而导致的低响应性。(除了忙等待与休眠之外,还有一种选择就是调用Thread.yield,这相当于给调度器一个提示:现在需要让出一定的时间使另一个线程运行。假如正在等待另一个线程执行工作,那么如果选择让出处理器而不是消耗完整个CPU调度时间片,那么可以使整体的执行过程变快。)

14.1.2 示例 通过轮询与休眠来实现简单的阻塞

程序清单14-5中的SleepyBoundedBuffer尝试通过put和take方法来实现一种简单的“轮询与休眠”重试机制,从而使调用者无须在每次调用时都实现重试逻辑。如果缓存为空,那么take将休眠并直到另一个线程在缓存中放入一些数据;如果缓存是满的,那么put将休眠并直到另一个线程从缓存中移除一些数据,以便有空间容纳新的数据。这种方法将前提条件的管理操作封装起来,并简化了对缓存的使用——这正是朝着正确的改进方向迈出了一步。
   程序清单14-5 使用简单阻塞实现的有界缓存

@ThreadSafe
public class SleepyBoundedBuffer<V> extends BaseBoundedBuffer<V> {public SleepyBoundedBuffer(int size) { super(size); }  public void put(V v) throws InterruptedException {while (true) {synchronized (this) {if (!isFull()) {doPut(v);return;}}Thread.sleep(SLEEP_GRANULARITY);}}  public V take() throws InterruptedException {while (true) {synchronized (this) {if (!isEmpty()) {return doTake();}}Thread.sleep(SLEEP_GRANULARITY);}}
}

SleepyBoundedBuffer的实现远比之前的实现复杂【与白雪公主中其他5个小矮人的名字相对应的其他有界缓存留给读者来实现,尤其是SneezyBoundedBuffer。(白雪公主中7个小矮人的名字分别为Sleepy, Grumpy, Sneezy, Doc,Bashful, Happy和Dopey,其中与Sleepy和Grumpy对应的BoundedBuffer已经实现了。)】缓存代码必须在持有缓存锁的时候才能测试相应的状态条件,因为表示状态条件的变量是由缓存锁保护的。如果测试失败,那么当前执行的线程将首先释放锁并休眠一段时间,从而使其他线程能够访问缓存。【通常,如果线程在休眠或者被阻塞时持有一个锁,那么这通常是一种不好的做法,因为只要线程不释放这个锁,有些条件(缓存为满/空)就永远无法为真。】当线程醒来时,它将重新请求锁并再次尝试执行操作,因而线程将反复地在休眠以及测试状态条件等过程之间进行切换,直到可以执行操作为止。
   从调用者的角度看,这种方法能很好地运行,如果某个操作可以执行,那么就立即执行,否则就阻塞,调用者无须处理失败和重试。要选择合适的休眠时间间隔,就需要在响应性与CPU使用率之间进行权衡。休眠的间隔越小,响应性就越高,但消耗的CPU资源也越高。图14-1给出了休眠间隔对响应性的影响:在缓存中出现可用空间的时刻与线程醒来并再次检查的时刻之间可能存在延迟。

   SleepyBoundedBuffer对调用者提出了一个新的需求:处理InterruptedException。当一个方法由于等待某个条件变成真而阻塞时,需要提供一种取消机制(请参见第7章)。与大多数具备良好行为的阻塞库方法一样,SleepyBoundedBuffer通过中断来支持取消,如果该方法被中断,那么将提前返回并抛出InterruptedException。
   这种通过轮询与休眠来实现阻塞操作的过程需要付出大量的努力。如果存在某种挂起线程的方法,并且这种方法能够确保当某个条件成真时线程立即醒来,那么将极大地简化实现工作。这正是条件队列实现的功能。

14.1.3 条件队列

条件队列就好像烤面包机中通知“面包已烤好”的铃声。如果你注意听着铃声,那么当面包烤好后可以立刻得到通知,然后放下手头的事情(或者先把手头的事情做完,例如先看完报纸)开始品尝面包。如果没有听见铃声(可能出去拿报纸了),那么会错过通知信息,但回到厨房时还可以观察烤面包机的状态,如果已经烤好,那么就取出面包,如果还未烤好,就再次留意铃声。
   “条件队列”这个名字来源于:它使得一组线程(称之为等待线程集合)能够通过某种方式来等待特定的条件变成真。传统队列的元素是一个个数据,而与之不同的是,条件队列中的元素是一个个正在等待相关条件的线程。
   正如每个Java对象都可以作为一个锁,每个对象同样可以作为一个条件队列,并且Object中的wait、notify和notifyAll方法就构成了内部条件队列的API。对象的内置锁与其内部条件队列是相互关联的,要调用对象X中条件队列的任何一个方法,必须持有对象X上的锁。这是因为“等待由状态构成的条件”与“维护状态一致性”这两种机制必须被紧密地绑定在一起:只有能对状态进行检查时,才能在某个条件上等待,并且只有能修改状态时,才能从条件等待中释放另一个线程。
   Object.wait会自动释放锁,并请求操作系统挂起当前线程,从而使其他线程能够获得这个锁并修改对象的状态。当被挂起的线程醒来时,它将在返回之前重新获取锁。从直观上来理解,调用wait意味着“我要去休息了,但当发生特定的事情时唤醒我”,而调用通知方法就意味着“特定的事情发生了”。
   在程序清单14-6的BoundedBuffer中使用了wait和notifyAll来实现一个有界缓存。这比使用“休眠”的有界缓存更简单,并且更高效(当缓存状态没有发生变化时,线程醒来的次数将更少),响应性也更高(当发生特定状态变化时将立即醒来)。这是一个较大的改进,但要注意:与使用“休眠”的有界缓存相比,条件队列并没有改变原来的语义。它只是在多个方面进行了优化:CPU效率、上下文切换开销和响应性等。如果某个功能无法通过“轮询和休眠”来实现,那么使用条件队列也无法实现。【这并非完全正确;一个公平的条件队列可以确保线程按照顺序从等待集合中释放。与内置锁相同,内置条件队列并不提供公平的排队操作,而在显式的Condition却可以提供公平或非公平的排队操作。】但条件队列使得在表达和管理状态依赖性时更加简单和高效。
   程序清单14-6 使用条件队列实现的有界缓存

@ThreadSafe
public class BoundedBuffer<V> extends BaseBoundedBuffer<V> {// 条件谓词,not-full (!isFull())// 条件谓词,not-empty (!isEmpty())public BoundedBuffer(int size) { super(size); }// 阻塞并直到: not-fullpublic synchronized void put(V v) throws InterruptedException {while (isFull()) {wait();}doPut(v);notifyAll();}// 阻塞并直到: not-emptypublic synchronized V take() throws InterruptedException {while (isEmpty()) {wait();}V v = doTake();notifyAll();return v;}
}

最终,BoundedBuffer变得足够好了,不仅简单易用,而且实现了明晰的状态依赖性管理【14.3节的ConditionBoundedBuffer还要更好,因为它能使用单一通知方法而不是notifyAll,因此效率更高】。在产品的正式版本中还应包括限时版本的put和take,这样当阻塞操作不能在预计时间内完成时,可以因超时而返回。通过使用定时版本的Object.wait,可以很容易实现这些方法。

14.2 使用条件队列

条件队列使构建高效以及高可响应性的状态依赖类变得更容易,但同时也很容易被不正确地使用。虽然许多规则都能确保正确地使用条件队列,但在编译器或系统平台上却并没有强制要求遵循这些规则。(这也是为什么要尽量基于LinkedBlockingQueue、Latch、Semaphore和FutureTask等类来构造程序的原因之一,如果能避免使用条件队列,那么实现起来将容易许多。)

14.2.1 条件队列

要想正确地使用条件队列,关键是找出对象在哪个条件谓词上等待。条件谓词将在等待与通知等过程中导致许多困惑,因为在API中没有对条件谓词进行实例化的方法,并且在Java语言规范或JVM实现中也没有任何信息可以确保正确地使用它们。事实上,在Java语言规范或Javadoc中根本就没有直接提到它。但如果没有条件谓词,条件等待机制将无法发挥作用。
   条件谓词是使某个操作成为状态依赖操作的前提条件。在有界缓存中,只有当缓存不为空时,take方法才能执行,否则必须等待。对take方法来说,它的条件谓词就是“缓存不为空”,take方法在执行之前必须首先测试该条件谓词。同样,put方法的条件谓词是“缓存不满”。条件谓词是由类中各个状态变量构成的表达式。BaseBoundedBuffer在测试“缓存不为空”时将把count与0进行比较,在测试“缓存不满”时将把count与缓存的大小进行比较。
   将与条件队列相关联的条件谓词以及在这些条件谓词上等待的操作都写入文档。
   在条件等待中存在一种重要的三元关系,包括加锁、wait方法和一个条件谓词。在条件谓词中包含多个状态变量,而状态变量由一个锁来保护,因此在测试条件谓词之前必须先持有这个锁。锁对象与条件队列对象(即调用wait和notify等方法所在的对象)必须是同一个对象。
   在BoundedBuffer中,缓存的状态是由缓存锁保护的,并且缓存对象被用做条件队列。take方法将获取请求缓存锁,然后对条件谓词(即缓存为非空)进行测试。如果缓存非空,那么它会移除第一个元素。之所以能这样做,是因为take此时仍然持有保护缓存状态的锁。
   如果条件谓词不为真(缓存为空),那么take必须等待并直到另一个线程在缓存中放入一个对象。take将在缓存的内置条件队列上调用wait方法,这需要持有条件队列对象上的锁。这是一种谨慎的设计,因为take方法已经持有在测试条件谓词时(并且如果条件谓词为真,那么在同一个原子操作中修改缓存的状态)需要的锁。wait方法将释放锁,阻塞当前线程,并等待直到超时,然后线程被中断或者通过一个通知被唤醒。在唤醒进程后,wait在返回前还要重新获取锁。当线程从wait方法中被唤醒时,它在重新请求锁时不具有任何特殊的优先级,而要与任何其他尝试进入同步代码块的线程一起正常地在锁上进行竞争。
   每一次wait调用都会隐式地与特定的条件谓词关联起来。当调用某个特定条件谓词的wait时,调用者必须已经持有与条件队列相关的锁,并且这个锁必须保护着构成条件谓词的状态变量。

14.2.2 过早唤醒

虽然在锁、条件谓词和条件队列之间的三元关系并不复杂,但wait方法的返回并不一定意味着线程正在等待的条件谓词已经变成真了。
   虽然在锁、条件谓词和条件队列之间的三元关系并不复杂,但wait方法的返回并不一定意味着线程正在等待的条件谓词已经变成真了。【在Tim(Tim Peierls,本书的合著者之一)的厨房里就是这种情况,当听到一个铃声时,可能有很多设备正在发出声音,此时必须检查烤面包机、微波炉、咖啡机和其他设备来判断是哪一个设备发出的铃声。】另外,wait方法还可以“假装”返回,而不是由于某个线程调用了notify。【继续以“早餐”为例,这就好比烤面包机的线路连接有问题,有时候当面包还未烤好时,铃声就响起来了。】
   当执行控制重新进入调用wait的代码时,它已经重新获取了与条件队列相关联的锁。现在条件谓词是不是已经变为真了?或许。在发出通知的线程调用notifyAll时,条件谓词可能已经变成真,但在重新获取锁时将再次变为假。在线程被唤醒到wait重新获取锁的这段时间里,可能有其他线程已经获取了这个锁,并修改了对象的状志。或者,条件谓词从调用wait起根本就没有变成真。你并不知道另一个线程为什么调用notify或notifyAll,也许是因为与同一条件队列相关的另一个条件谓词变成了真。“一个条件队列与多个条件谓词相关”是一种很常见的情况——在BoundedBuffer中使用的条件队列与“非满”和“非空”两个条件谓词相关。【线程可能同时在“非满”与“非空”这两个条件谓词上等待。当生产者/消费者的数量超过缓存的容量时,就会出现这种情况。】
   基于所有这些原因,每当线程从wait中唤醒时,都必须再次测试条件谓词,如果条件谓词不为真,那么就继续等待(或者失败)。由于线程在条件谓词不为真的情况下也可以反复地醒来,因此必须在一个循环中调用wait,并在每次迭代中都测试条件谓词。程序清单14-7给出了条件等待的标准形式。
   程序清单14-7 状态依赖方法的标准形式

 void stateDependentMethod() throws InterruptedException {// 必须通过一个锁来保护条件谓词synchronized (lock) {while (!conditionPredicate()) {lock.wait();}// 现在对象处于合适的状态}}

当使用条件等待时(例如Object.wait或Condition.await):

  • 通常都有一个条件谓词——包括一些对象状态的测试,线程在执行前必须首先通过这些测试。
  • 在调用wait之前测试条件谓词,并且从wait中返回时再次进行测试。
  • 在一个循环中调用wait。
  • 确保使用与条件队列相关的锁来保护构成条件谓词的各个状态变量。
  • 当调用wait、notify或notifyAll等方法时,一定要持有与条件队列相关的锁。
  • 在检查条件谓词之后以及开始执行相应的操作之前,不要释放锁。

14.2.3 丢失的信号

第10章曾经讨论过活跃性故障,例如死锁和活锁。另一种形式的活跃性故障是丢失的信号。丢失的信号是指:线程必须等待一个已经为真的条件,但在开始等待之前没有检查条件谓词。现在,线程将等待一个已经发过的事件。这就好比在启动了烤面包机后出去拿报纸,当你还在屋外时烤面包机的铃声响了,但你没有听到,因此还会坐在厨房的桌子前等着烤面包机的铃声。你可能会等待很长的时间【为了摆脱等待,其他人也不得不开始烤面包,从而使情况变得更糟,当铃声响起时,还要与别人争论这个面包是属于谁的。】通知并不像你涂在面包上的果酱,它没有“黏附性”。如果线程A通知了一个条件队列,而线程B随后在这个条件队列上等待,那么线程B将不会立即醒来,而是需要另一个通知来唤醒它。像上述程序清单中警示之类的编码错误(例如,没有在调用wait之前检测条件谓词)就会导致信号的丢失。如果按照程序清单14-7的方式来设计条件等待,那么就不会发生信号丢失的问题。

14.2.4 通知

到目前为止,我们介绍了条件等待的前一半内容:等待。另一半内容是通知。在有界缓存中,如果缓存为空,那么在调用take时将阻塞。在缓存变为非空时,为了使take解除阻塞,必须确保在每条使缓存变为非空的代码路径中都发出一个通知。在BoundedBuffer中,只有一条代码路径,即在put方法之后。因此,put在成功地将一个元素添加到缓存后,将调用notifyAll。同样,take在移除一个元素后也将调用notifyAll,向任何正在等待“不为满”条件的线程发出通知:缓存已经不满了。
   每当在等待一个条件时,一定要确保在条件谓词变为真时通过某种方式发出通知。
   在条件队列API中有两个发出通知的方法,即notify和notifyAll。无论调用哪一个,都必须持有与条件队列对象相关联的锁。在调用notify时,JVM会从这个条件队列上等待的多个线程中选择一个来唤醒,而调用notifyAll则会唤醒所有在这个条件队列上等待的线程。由于在调用notify或notifyAll时必须持有条件队列对象的锁,而如果这些等待中线程此时不能重新获得锁,那么无法从wait返回,因此发出通知的线程应该尽快地释放锁,从而确保正在等待的线程尽可能快地解除阻塞。
   由于多个线程可以基于不同的条件谓词在同一个条件队列上等待,因此如果使用notify而不是notifyAll,那么将是一种危险的操作,因为单一的通知很容易导致类似于信号丢失的问题。
   在BoundedBuffer中很好地说明为什么在大多数情况下应该优先选择notifyAll而不是单个的notify。这里的条件队列用于两个不同的条件谓词:“非空”和“非满”。假设线程A在条件队列上等待条件谓词PA,同时线程B在同一个条件队列上等待条件谓词PB。现在,假设PB变成真,并且线程C执行一个notify:JVM将从它拥有的众多线程中选择一个并唤醒。如果选择了线程A,那么它被唤醒,并且看到PA尚未变成真,因此将继续等待。同时,线程B本可以开始执行,却没有被唤醒。这并不是严格意义上的“丢失信号”,而更像一种“被劫持的”信号,但导致的问题是相同的:线程正在等待一个已经(或者本应该)发生过的信号。

只有同时满足以下两个条件时,才能用单一的notify而不是notifyAll:

  • 所有等待线程的类型都相同。只有一个条件谓词与条件队列相关,并且每个线程在从wait返回后将执行相同的操作。
  • 单进单出。在条件变量上的每次通知,最多只能唤醒一个线程来执行。
       BoundedBuffer满足“单进单出”的条件,但不满足“所有等待线程的类型都相同”的条件,因此正在等待的线程可能在等待“非满”,也可能在等待“非空”。例如第5章的TestHarness中使用的“开始阀门”闭锁(单个事件释放一组线程)并不满足“单进单出”的需求,因为这个“开始阀门”将使得多个线程开始执行。
       由于大多数类并不满足这些需求,因此普遍认可的做法是优先使用notifyAll而不是notify。虽然notifyAll可能比notify更低效,但却更容易确保类的行为是正确的。
       有些开发人员并不赞同这种“普遍认可的做法”。当只有一个线程可以执行时,如果使用notifyAll,那么将是低效的,这种低效情况带来的影响有时候很小,但有时侯却非常大。如果有10个线程在一个条件队列上等待,那么调用notifyAll将唤醒每个线程,并使得它们在锁上发生竞争。然后,它们中的大多数或者全部又都回到休眠状态。因而,在每个线程执行一个事件的同时,将出现大量的上下文切换操作以及发生竞争的锁获取操作。(最坏的情况是,在使用notifyAll时将导致O(n2)次唤醒操作,而实际上只需要n次唤醒操作就足够了)。这是“性能考虑因素与安全性考虑因素相互矛盾”的另一种情况。
       在BoundedBuffer的put和take方法中采用的通知机制是保守的:每当将一个对象放入缓存或者从缓存中移走一个对象时,就执行一次通知。我们可以对其进行优化:首先,仅当缓存从空变为非空,或者从满转为非满时,才需要释放一个线程。并且,仅当put或take影响到这些状态转换时,才发出通知。这也被称为“条件通知(Conditional Notification)。虽然“条件通知”可以提升性能,但却很难正确地实现(而且还会使子类的实现变得复杂),因此在使用时应该谨慎。程序清单14-8给出了如何在BoundedBuffer.put中使用“条件通知”。
       程序清单14-8 在BoundedBuffer.put中使用条件通知
 public synchronized void put(V v) throws InterruptedException {while (isFull()) {wait();}boolean wasEmpty = isEmpty();doPut(v);if (wasEmpty) {notifyAll();}}

单次通知和条件通知都属于优化措施。通常,在使用这些优化措施时,应该遵循“首选使程序正确地执行,然后才使其运行得更快”这个原则。如果不正确地使用这些优化措施,那么很容易在程序中引入奇怪的活跃性故障。

14.2.5 示例 阀门类

在第5章的TestHarness中使用的“开始阀门闭锁”在初始化时指定的参数为1,从而创建了一个二元闭锁:它只有两种状态,即初始状态和结束状态。闭锁能阻止线程通过开始阀门,并直到阀门被打开,此时所有的线程都可以通过该阀门。虽然闭锁机制通常都能满足需求,但在某些情况下存在一个缺陷:按照这种方式构造的阀门在打开后无法重新关闭。
   通过使用条件等待,可以很容易地开发一个可重新关闭的ThreadGate类,如程序清单14-9所示。ThreadGate可以打开和关闭阀门,并提供一个await方法,该方法能一直阻塞直到阀门被打开。在open方法中使用了notifyAll,这是因为这个类的语义不满足单次通知的“单进单出”测试。
   程序清单14-9 使用wait和notifyAll来实现可重新关闭的阀门

@ThreadSafe
public class ThreadGate {// 条件谓词 opened-since(n) (isOpen || generation > n)@GuardedBy("this") private boolean isOpen;@GuardedBy("this") private int generation;public synchronized void close() {isOpen = false;}public synchronized void open() {++generation;isOpen = true;notifyAll();}// 阻塞直到: opened-since(generation on entry)public synchronized void await() throws InterruptedException {int arrivalGeneration = generation;while (! isOpen && arrivalGeneration == generation) {wait();}}
}

在await中使用的条件谓词比测试isOpen复杂得多。这种条件谓词是必需的,因为如果当阀门打开时有N个线程正在等待它,那么这些线程都应该被允许执行。然而,如果阀门在打开后又非常快速地关闭了,并且await方法只检查isOpen,那么所有线程都可能无法释放:当所有线程收到通知时,将重新请求锁并退出wait,而此时的阀门可能已经再次关闭了。因此,在ThreadGate中使用了一个更复杂的条件谓词:每次阀门关闭时,递增一个“Generation”计数器,如果阀门现在是打开的,或者阀门自从该线程到达后就一直是打开的,那么线程就可以通过await。
   由于ThreadGate只支持等待打开阀门,因此它只在open中执行通知。要想既支持“等待打开”又支持“等待关闭”,那么ThreadGate必须在open和close中都进行通知。这很好地说明了为什么在维护状态依赖的类时是非常困难的——当增加一个新的状态依赖操作时,可能需要对多条修改对象的代码路径进行改动,才能正确地执行通知。

14.2.6 字类的安全问题

在使用条件通知或单次通知时,一些约束条件使得子类化过程变得更加复杂[CPJ 3.3.3.3]。要想支持子类化,那么在设计类时需要保证:如果在实施子类化时违背了条件通知或单次通知的某个需求,那么在子类中可以增加合适的通知机制来代表基类。
   对于状态依赖的类,要么将其等待和通知等协议完全向子类公开(并且写入正式文档),要么完全阻止子类参与到等待和通知等过程中。(这是对“要么围绕着继承来设计和文档化,要么禁止使用继承”这条规则的一种扩展[EJ Item 15]。)当设计一个可被继承的状态依赖类时,至少需要公开条件队列和锁,并且将条件谓词和同步策略都写入文档。此外,还可能需要公开一些底层的状态变量。(最糟糕的情况是,一个状态依赖的类虽然将其状态向子类公开,但却没有将相应的等待和通知等协议写入文档,这就类似于一个类虽然公开了它的状态变量,但却没有将其不变性条件写入文档。)
   另外一种选择就是完全禁止子类化,例如将类声明为final类型,或者将条件队列、锁和状态变量等隐藏起来,使子类看不见它们。否则,如果子类破坏了在基类中使用notify的方式,那么基类需要修复这种破坏。考虑一个无界的可阻塞栈,当栈为空时,pop操作将阻塞,但push操作通常可以执行。这就满足了使用单次通知的需求。如果在这个类中使用了单次通知,并且在其一个子类中添加了一个阻塞的“弹出两个连续元素”方法,那么就会出现两种类型的等待线程:等待弹出一个元素的线程和等待弹出两个元素的线程。但如果基类将条件队列公开出来,并且将使用该条件队列的协议也写入文档,那么子类就可以将push方法改写为执行notifyAll,从而重新确保安全性。

14.2.7 封装条件队列

通常,我们应该把条件队列封装起来,因而除了使用条件队列的类,就不能在其他地方访问它。否则,调用者会自以为理解了在等待和通知上使用的协议,并且采用一种违背设计的方式来使用条件队列。(除非条件队列对象对于你无法控制的代码来说是不可访问的,否则就不可能要求在单次通知中的所有等待线程都是同一类型的。如果外部代码错误地在条件队列上等待,那么可能通知协议,并导致一个“被劫持的”信号。)
   不幸的是,这条建议——将条件队列对象封装起来,与线程安全类的最常见设计模式并不一致,在这种模式中建议使用对象的内置锁来保护对象自身的状态。在BoundedBuffer中给出了这种常见的模式,即缓存对象自身既是锁,又是条件队列。然而,可以很容易将BoundedBuffer重新设计为使用私有的锁对象和条件队列,唯一的不同之处在于,新的BoundedBuffer不再支持任何形式的客户端加锁。

14.2.8 入口协议与出口协议

Wellings(Wellings,2004)通过“入口协议和出口协议(Entry and Exit Protocols)”来描述wait和notify方法的正确使用。对于每个依赖状态的操作,以及每个修改其他操作依赖状态的操作,都应该定义一个入口协议和出口协议。入口协议就是该操作的条件谓词,出口协议则包括,检查被该操作修改的所有状态变量,并确认它们是否使某个其他的条件谓词变为真,如果是,则通知相关的条件队列。
   在AbstractQueuedSynchronizer(java.util.concurrent包中大多数依赖状态的类都是基于这个类构建的)中使用出口协议(请参见14.4节)。这个类并不是由同步器类执行自己的通知,而是要求同步器方法返回一个值来表示该类的操作是否已经解除了一个或多个等待线程的阻塞。这种明确的API调用需求使得更难以“忘记”在某些状态转换发生时进行通知。

14.3 显示的Condition对象

第13章曾介绍过,在某些情况下,当内置锁过于灵活时,可以使用显式锁。正如Lock是一种广义的内置锁,Condition(参见程序清单14-10)也是一种广义的内置条件队列。
   程序清单14-10 Condition接口

public interface Condition {void await() throws InterruptedException;void awaitUninterruptibly();long awaitNanos(long nanosTimeout) throws InterruptedException;boolean await(long time, TimeUnit unit) throws InterruptedException;boolean awaitUntil(Date deadline) throws InterruptedException;void signal();void signalAll();
}

内置条件队列存在一些缺陷。每个内置锁都只能有一个相关联的条件队列,因而在像BoundedBuffer这种类中,多个线程可能在同一个条件队列上等待不同的条件谓词,并且在最常见的加锁模式下公开条件队列对象。这些因素都使得无法满足在使用notifyAll时所有等待线程为同一类型的需求。如果想编写一个带有多个条件谓词的并发对象,或者想获得除了条件队列可见性之外的更多控制权,就可以使用显式的Lock和Condition而不是内置锁和条件队列,这是一种更灵活的选择。
   一个Condition和一个Lock关联在一起,就像一个条件队列和一个内置锁相关联一样。要创建一个Condition,可以在相关联的Lock上调用Lock.newCondition方法。正如Lock比内置加锁提供了更为丰富的功能,Condition同样比内置条件队列提供了更丰富的功能:在每个锁上可存在多个等待、条件等待可以是可中断的或不可中断的、基于时限的等待,以及公平的或非公平的队列操作。
   与内置条件队列不同的是,对于每个Lock,可以有任意数量的Condition对象。Condition对象继承了相关的Lock对象的公平性,对于公平的锁,线程会依照FIFO顺序从Condition.await中释放。
   特别注意:在Condition对象中,与wait、notify和notifyAll方法对应的分别是await、signal和signalAll。但是,Condition对Object进行了扩展,因而它也包含wait和notify方法。一定要确保使用正确的版本——await和signal。
   程序清单14-11给出了有界缓存的另一种实现,即使用两个Condition,分别为notFull和notEmpty,用于表示“非满”与“非空”两个条件谓词。当缓存为空时,take将阻塞并等待notEmpty,此时put向notEmpty发送信号,可以解除任何在take中阻塞的线程。
   程序清单14-11 使用显式条件变量的有界缓存

@ThreadSafe
public  class ConditionBoundedBuffer<T> {protected final Lock lock = new ReentrantLock();// 条件谓词 notFull (count < items.length)private final Condition notFull = lock.newCondition();// 条件谓词 notEmpty (count > 0)private final Condition notEmpty = lock.newCondition();@GuardedBy("lock")private final T[] items = (T[]) new Object[10];@GuardedBy("lock")private int tail, head, count;// 阻塞并直到: notFullpublic void put(T x) throws InterruptedException {lock.lock();try {while (count == items.length) {notFull.await();}items[tail] = x;if (++tail == items.length) {tail = 0;}++count;notEmpty.signal();} finally {lock.unlock();}}// 阻塞并直到: notEmptypublic T take() throws InterruptedException {lock.lock();try {while (count == 0) {notEmpty.await();}T x = items[head];items[head] = null;if (++head == items.length) {head = 0;}--count;notFull.signal();return x;} finally {lock.unlock();}}
}

ConditionBoundedBuffer的行为和BoundedBuffer相同,但它对条件队列的使用方式更容易理解——在分析使用多个Condition的类时,比分析一个使用单一内部队列加多个条件谓词的类简单得多。通过将两个条件谓词分开并放到两个等待线程集中,Condition使其更容易满足单次通知的需求。signal比signalAll更高效,它能极大地减少在每次缓存操作中发生的上下文切换与锁请求的次数。
   与内置锁和条件队列一样,当使用显式的Lock和Condition时,也必须满足锁、条件谓词和条件变量之间的三元关系。在条件谓词中包含的变量必须由Lock来保护,并且在检查条件谓词以及调用await和signal时,必须持有Lock对象【ReentrantLock要求在调用signal或signalAll时应该持有Lock,但在Lock的具体实现中,在构造Condition时也可以不满足这个需求。】
   在使用显式的Condition和内置条件队列之间进行选择时,与在ReentrantLock和synchronized之间进行选择是一样的:如果需要一些高级功能,例如使用公平的队列操作或者在每个锁上对应多个等待线程集,那么应该优先使用Condition而不是内置条件队列。(如果需要ReentrantLock的高级功能,并且已经使用了它,那么就已经做出了选择。)

14.4 Synchronizer剖析

在ReentrantLock和Semaphore这两个接口之间存在许多共同点。这两个类都可以用做一个“阀门”,即每次只允许一定数量的线程通过,并当线程到达阀门时,可以通过(在调用lock或acquire时成功返回),也可以等待(在调用lock或acquire时阻塞),还可以取消(在调用tryLock或tryAcquire时返回“假”,表示在指定的时间内锁是不可用的或者无法获得许可)。而且,这两个接口都支持可中断的、不可中断的以及限时的获取操作,并且也都支持等待线程执行公平或非公平的队列操作。
   列出了这种共性后,你或许会认为Semaphore是基于ReentrantLock实现的,或者认为ReentrantLock实际上是带有一个许可的Semaphore。这些实现方式都是可行的,一个很常见的练习就是,证明可以通过锁来实现计数信号量(如程序清单14-12中的SemaphoreOnLock所示),以及可以通过计数信号量来实现锁。
   程序清单14-12 使用Lock来实现信号量

@ThreadSafe
public class SemaphoreOnLock {private final Lock lock = new ReentrantLock();// 条件谓词 permitsAvailable (permits > 0)private final Condition permitsAvailable = lock.newCondition();@GuardedBy("lock") private int permits;SemaphoreOnLock(int initialPermits) throws InterruptedException {lock.lock();try {while (permits <= 0) {permitsAvailable.await();}--permits;} finally {lock.unlock();}}// 阻塞并直到 permitsAvailablepublic void acquire() throws InterruptedException {lock.lock();try {while (permits <= 0) {permitsAvailable.await();}--permits;} finally {lock.unlock();}}public void release() {lock.lock();try {++permits;permitsAvailable.signal();} finally {lock.unlock();}}
}

事实上,它们在实现时都使用了一个共同的基类,即AbstractQueuedSynchronizer(AQS),这个类也是其他许多同步类的基类。AQS是一个用于构建锁和同步器的框架,许多同步器都可以通过AQS很容易并且高效地构造出来。不仅ReentrantLock和Semaphore是基于AQS构建的,还包括CountDownLatch、ReentrantReadWriteLock、SynchronousQueue【在Java 6中将基于AQS的SynchronousQueue替换为一个(可伸缩性更高的)非阻塞的版本。】和FutureTask。
   AQS解决了在实现同步器时涉及的大量细节问题,例如等待线程采用FIFO队列操作顺序。在不同的同步器中还可以定义一些灵活的标准来判断某个线程是应该通过还是需要等待。
   基于AQS来构建同步器能带来许多好处。它不仅能极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题(这是在没有使用AQS来构建同步器时的情况)。在SemaphoreOnLock中,获取许可的操作可能在两个时刻阻塞——当锁保护信号量状态时,以及当许可不可用时。在基于AQS构建的同步器中,只可能在一个时刻发生阻塞,从而降低上下文切换的开销,并提高吞吐量。在设计AQS时充分考虑了可伸缩性,因此java.util.concurrent中所有基于AQS构建的同步器都能获得这个优势。

14.5 AbstractQueueSynchronizer

大多数开发者都不会直接使用AQS,标准同步器类的集合能够满足绝大多数情况的需求。但如果能了解标准同步器类的实现方式,那么对于理解它们的工作原理是非常有帮助的。
   在基于AQS构建的同步器类中,最基本的操作包括各种形式的获取操作和释放操作。获取操作是一种依赖状态的操作,并且通常会阻塞。当使用锁或信号量时,“获取”操作的含义就很直观,即获取的是锁或者许可,并且调用者可能会一直等待直到同步器类处于可被获取的状态。在使用CountDownLatch时,“获取”操作意味着“等待并直到闭锁到达结束状态”,而在使用FutureTask时,则意味着“等待并直到任务已经完成”。“释放”并不是一个可阻塞的操作,当执行“释放”操作时,所有在请求时被阻塞的线程都会开始执行。
   如果一个类想成为状态依赖的类,那么它必须拥有一些状态。AQS负责管理同步器类中的状态,它管理了一个整数状态信息,可以通过getState, setState以及compareAndSetState等protected类型方法来进行操作。这个整数可以用于表示任意状态。例如,ReentrantLock用它来表示所有者线程已经重复获取该锁的次数,Semaphore用它来表示剩余的许可数量,FutureTask用它来表示任务的状态(尚未开始、正在运行、已完成以及已取消)。在同步器类中还可以自行管理一些额外的状态变量,例如,ReentrantLock保存了锁的当前所有者的信息,这样就能区分某个获取操作是重入的还是竞争的。
   程序清单14-13给出了AQS中的获取操作与释放操作的形式。根据同步器的不同,获取操作可以是一种独占操作(例如ReentrantLock),也可以是一个非独占操作(例如Semaphore和CountDownLatch)。一个获取操作包括两部分。首先,同步器判断当前状态是否允许获得操作,如果是,则允许线程执行,否则获取操作将阻塞或失败。这种判断是由同步器的语义决定的。例如,对于锁来说,如果它没有被某个线程持有,那么就能被成功地获取,而对于闭锁来说,如果它处于结束状态,那么也能被成功地获取。
  程序清单14-13 AQS中获取操作和释放操作的标准形式

boolean acquire() throws InterruptedException {while (当前状态不允许获取操作) {if (需要阻塞获取请求) {如果当前线程不在队列中,则将其插入队列阻塞当前线程} else {返回失败}可能更新同步器的状态如果线程位于队列中,则将其移出队列返回成功}
}
void release() {更新同步器的状态if (新的状态允许某个被阻塞的线程获取成功) {解除队列中一个或多个线程的阻塞状态}
}

其次,就是更新同步器的状态,获取同步器的某个线程可能会对其他线程能否也获取该同步器造成影响。例如,当获取一个锁后,锁的状态将从“未被持有”变成“已被持有”,而从Semaphore中获取一个许可后,将把剩余许可的数量减1。然而,当一个线程获取闭锁时,并不会影响其他线程能否获取它,因此获取闭锁的操作不会改变闭锁的状态。
   如果某个同步器支持独占的获取操作,那么需要实现一些保护方法,包括tryAcquire、tryRelease和isHeldExclusively等,而对于支持共享获取的同步器,则应该实现tryAcquire-Shared和tryReleaseShared等方法。AQS中的accuire、acquireShared、release和releaseShared等方法都将调用这些方法在子类中带有前缀try的版本来判断某个操作是否能执行。在同步器的子类中,可以根据其获取操作和释放操作的语义,使用getState、setState以及compareAndSetState来检查和更新状态,并通过返回的状态值来告知基类“获取”或“释放”同步器的操作是否成功。例如,如果tryAcquireShared返回一个负值,那么表示获取操作失败,返回零值表示同步器通过独占方式被获取,返回正值则表示同步器通过非独占方式被获取。对于tryRelease和tryReleaseShared方法来说,如果释放操作使得所有在获取同步器时被阻塞的线程恢复执行,那么这两个方法应该返回true。
   为了使支持条件队列的锁(例如ReentrantLock)实现起来更简单,AQS还提供了一些机制来构造与同步器相关联的条件变量。
   一个简单的闭锁
   程序清单14-14中的OneShotLatch是一个使用AQS实现的二元闭锁。它包含两个公有方法:await和signal,分别对应获取操作和释放操作。起初,闭锁是关闭的,任何调用await的线程都将阻塞并直到闭锁被打开。当通过调用signal打开闭锁时,所有等待中的线程都将被释放,并且随后到达闭锁的线程也被允许执行。

@ThreadSafe
public class OneShotLatch {private final Sync sync = new Sync();public void signal() { sync.releaseShared(0); }public void await() throws InterruptedException {sync.acquireSharedInterruptibly(0); }private class Sync extends AbstractQueuedSynchronizer {protected int tryAcquireShared(int ignored) {// 如果闭锁是开的(state == 1), 那么这个操作将成功,否则失败return (getState() == 1) ? 1 : -1;}protected boolean tryReleaseShared(int ignored) {setState(1); // 现在打开闭锁return true; // 现在其他的线程可以获取该闭锁}}
}

在OneShotLatch中,AQS状态用来表示闭锁状态——关闭(0)或者打开(1)。await方法调用AQS的acquireSharedInterruptibly,然后接着调用OneShotLatch中的tryAcquireShared方法。在tryAcquireShared的实现中必须返回一个值来表示该获取操作能否执行。如果之前已经打开了闭锁,那么tryAcquireShared将返回成功并允许线程通过,否则就会返回一个表示获取操作失败的值。acquireSharedInterruptibly方法在处理失败的方式,是把这个线程放入等待线程队列中。类似地,signal将调用releaseShared,接下来又会调用tryReleaseShared。在tryReleaseShared中将无条件地把闭锁的状态设置为打开,(通过返回值)表示该同步器处于完全被释放的状态。因而AQS让所有等待中的线程都尝试重新请求该同步器,并且由于tryAcquireShared将返回成功,因此现在的请求操作将成功。
   OneShotLatch是一个功能全面的、可用的、性能较好的同步器,并且仅使用了大约20多行代码就实现了。当然,它缺少了一些有用的特性,例如限时的请求操作以及检查闭锁的状态,但这些功能实现起来同样很容易,因为AQS提供了限时版本的获取方法,以及一些在常见检查中使用的辅助方法。
   oneShotLatch也可以通过扩展AQS来实现,而不是将一些功能委托给AQS,但这种做法并不合理[EJItem 14],原因有很多。这样做将破坏OneShotLatch接口(只有两个方法)的简洁性,并且虽然AQS的公共方法不允许调用者破坏闭锁的状态,但调用者仍可以很容易地误用它们。java.util.concurrent中的所有同步器类都没有直接扩展AQS,而是都将它们的相应功能委托给私有的AQS子类来实现。

14.6 java.util.concurrent同步器类中的AQS

java.util.concurrent中的许多可阻塞类,例如ReentrantLock、Semaphore、ReentrantRead-WriteLock、CountDownLatch、SynchronousQueue和FutureTask等,都是基于AQS构建的。我们快速地浏览一下每个类是如何使用AQS的,不需要过于地深入了解细节(在JDK的下载包中包含了源代码【也可以从http://gee.cs.oswego.edu/dl/concurrency-interest获得,只是存在一些许可限制。】

14.6.1 ReentrantLock

ReentrantLock只支持独占方式的获取操作,因此它实现了tryAcquire、tryRelease和isHeldExclusively,程序清单14-15给出了非公平版本的tryAcquire。ReentrantLock将同步状态用于保存锁获取操作的次数,并且还维护一个owner变量来保存当前所有者线程的标识符,只有在当前线程刚刚获取到锁,或者正要释放锁的时候,才会修改这个变量【由于受保护的状态操作方法具有volatile类型的内存读写语义,同时ReentrantLock只是在调用getState之后才会读取owner域,并且只有在调用setState之前才会写入owner,因此ReentrantLock可以拥有同步状态的内存语义,因此避免了进一步的同步(请参见16.1.4节)。】在tryRelease中检查owner域,从而确保当前线程在执行unlock操作之前已经获取了锁:在tryAcquire中将使用这个域来区分获取操作是重入的还是竞争的。
   程序清单14-15 基于非公平的ReentrantLock实现tryAcquire

protected boolean tryAcquire(int ignored) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (compareAndSetState(0, 1)) {owner = current;return true;}} else if (current == owner) {setState(c + 1);return true;}return false;
}

当一个线程尝试获取锁时,tryAcquire将首先检查锁的状态。如果锁未被持有,那么它将尝试更新锁的状态以表示锁已经被持有。由于状态可能在检查后被立即修改,因此tryAcquire使用compareAndSetState来原子地更新状态,表示这个锁已经被占有,并确保状态在最后一次检查以后就没有被修改过。(请参见15.3节中对compareAndSet的描述)。如果锁状态表明它已经被持有,并且如果当前线程是锁的拥有者,那么获取计数会递增,如果当前线程不是锁的拥有者,那么获取操作将失败。
   ReentrantLock还利用了AQS对多个条件变量和多个等待线程集的内置支持。Lock.newCondition将返回一个新的ConditionObject实例,这是AQS的一个内部类。

14.6.2 Semaphore与CountDownLatch

Semaphore将AQS的同步状态用于保存当前可用许可的数量。tryAcquireShared方法(请参见程序清单14-16)首先计算剩余许可的数量,如果没有足够的许可,那么会返回一个值表示获取操作失败。如果还有剩余的许可,那么tryAcquireShared会通过compareAndSetState以原子方式来降低许可的计数。如果这个操作成功(这意味着许可的计数自从上一次读取后就没有被修改过),那么将返回一个值表示获取操作成功。在返回值中还包含了表示其他共享获取操作能否成功的信息,如果成功,那么其他等待的线程同样会解除阻塞。
   程序清单14-16 Semaphore中的tryAcquireShared与tryReleaseShared

protected int tryAcquireShared(int acquires) {while (true) {int available = getState();int remaining = available - acquires;if (remaining < 0 || compareAndSetState(available, remaining)) {return remaining;}}
}
protected boolean tryReleaseShared(int release) {while (true) {int p = getState();if (compareAndSetState(p, p + release)) {return true;}}
}

当没有足够的许可,或者当tryAcquireShared可以通过原子方式来更新许可的计数以响应获取操作时,while循环将终止。虽然对compareAndSetState的调用可能由于与另一个线程发生竞争而失败(请参见15.3节),并使其重新尝试,但在经过了一定次数的重试操作以后,在这两个结束条件中有一个会变为真。同样,tryReleaseShared将增加许可计数,这可能会解除等待中线程的阻塞状态,并且不断地重试直到更新操作成功。tryReleaseShared的返回值表示在这次释放操作中解除了其他线程的阻塞
   CountDownLatch使用AQS的方式与Semaphore很相似:在同步状态中保存的是当前的计数值。countDown方法调用release,从而导致计数值递减,并且当计数值为零时,解除所有等待线程的阻塞。await调用acquire,当计数器为零时,acquire将立即返回,否则将阻塞。

14.6.3 FutureTask

初看上去,FutureTask甚至不像一个同步器,但Future.get的语义非常类似于闭锁的语义——如果发生了某个事件(由FutureTask表示的任务执行完成或被取消),那么线程就可以恢复执行,否则这些线程将停留在队列中并直到该事件发生。
   在FutureTask中,AQS同步状态被用来保存任务的状态,例如,正在运行、已完成或已取消。FutureTask还维护一些额外的状态变量,用来保存计算结果或者抛出的异常。此外,它还维护了一个引用,指向正在执行计算任务的线程(如果它当前处于运行状态),因而如果任务取消,该线程就会中断。

14.6.4 ReentrantReadWriteLock

ReadWriteLock接口表示存在两个锁:一个读取锁和一个写入锁,但在基于AQS实现的ReentrantReadWriteLock中,单个AQS子类将同时管理读取加锁和写入加锁。Reentrant-ReadWriteLock使用了一个16位的状态来表示写入锁的计数,并且使用了另一个16位的状态来表示读取锁的计数。在读取锁上的操作将使用共享的获取方法与释放方法,在写入锁上的操作将使用独占的获取方法与释放方法。
   AQS在内部维护一个等待线程队列,其中记录了某个线程请求的是独占访问还是共享访问。在ReentrantReadWriteLock中,当锁可用时,如果位于队列头部的线程执行写入操作,那么线程会得到这个锁,如果位于队列头部的线程执行读取访问,那么队列中在第一个写入线程之前的所有线程都将获得这个锁。【这种机制并不允许选择读取线程优先或写入线程优先等策略,在某些读写锁实现中也采用了这种方式。因此,要么AQS的等待队列不能是一个FIFO队列,要么使用两个队列。然而,在实际中很少需要这么严格的排序策略。如果非公平版本的ReentrantReadWriteLock无法提供足够的活跃性,那么公平版本的ReentrantReadWriteLock通常会提供令人满意的排序保证,并且能确保读取线程和写入线程不会发生饥饿问题。】

小结

要实现一个依赖状态的类——如果没有满足依赖状态的前提条件,那么这个类的方法必须阻塞,那么最好的方式是基于现有的库类来构建,例如Semaphore.BlockingQueue或CountDownLatch,如第8章的ValueLatch所示。然而,有时候现有的库类不能提供足够的功能,在这种情况下,可以使用内置的条件队列、显式的Condition对象或者AbstractQueuedSynchronizer来构建自己的同步器。内置条件队列与内置锁是紧密绑定在一起的,这是因为管理状态依赖性的机制必须与确保状态一致性的机制关联起来。同样,显式的Condition与显式的Lock也是紧密地绑定到一起的,并且与内置条件队列相比,还提供了一个扩展的功能集,包括每个锁对应于多个等待线程集,可中断或不可中断的条件等待,公平或非公平的队列操作,以及基于时限的等待。

第15章 原子变量与非阻塞同步机制

在java.util.concurrent包的许多类中,例如Semaphore和ConcurrentLinkedQueue,都提供了比synchronized机制更高的性能和可伸缩性。本章将介绍这种性能提升的主要来源:原子变量和非阻塞的同步机制。
  近年来,在并发算法领域的大多数研究都侧重于非阻塞算法,这种算法用底层的原子机器指令(例如比较并交换指令)代替锁来确保数据在并发访问中的一致性。非阻塞算法被广泛地用于在操作系统和JVM中实现线程/进程调度机制、垃圾回收机制以及锁和其他并发数据结构。
  与基于锁的方案相比,非阻塞算法在设计和实现上都要复杂得多,但它们在可伸缩性和活跃性上却拥有巨大的优势。由于非阻塞算法可以使多个线程在竞争相同的数据时不会发生阻塞,因此它能在粒度更细的层次上进行协调,并且极大地减少调度开销。而且,在非阻塞算法中不存在死锁和其他活跃性问题。在基于锁的算法中,如果一个线程在休眠或自旋的同时持有一个锁,那么其他线程都无法执行下去,而非阻塞算法不会受到单个线程失败的影响。从Java 5.0开始,可以使用原子变量类(例如AtomicInteger和AtomicReference)来构建高效的非阻塞算法。
  即使原子变量没有用于非阻塞算法的开发,它们也可以用做一种“更好的volatile类型变量”。原子变量提供了与volatile类型变量相同的内存语义,此外还支持原子的更新操作,从而使它们更加适用于实现计数器、序列发生器和统计数据收集等,同时还能比基于锁的方法提供更高的可伸缩性。

15.1 锁的劣势

通过使用一致的锁定协议来协调对共享状态的访问,可以确保无论哪个线程持有守护变量的锁,都能采用独占方式来访问这些变量,并且对变量的任何修改对随后获得这个锁的其他线程都是可见的。
  现代的许多JVM都对非竞争锁获取和锁释放等操作进行了极大的优化,但如果有多个线程同时请求锁,那么JVM就需要借助操作系统的功能。如果出现了这种情况,那么一些线程将被挂起并且在稍后恢复运行【当线程在锁上发生竞争时,智能的JVM不一定会挂起线程,而是根据之前获取操作中对锁的持有时间长短来判断是使此线程挂起还是自旋等待。】。当线程恢复执行时,必须等待其他线程执行完它们的时间片以后,才能被调度执行。在挂起和恢复线程等过程中存在着很大的开销,并且通常存在着较长时间的中断。如果在基于锁的类中包含有细粒度的操作(例如同步容器类,在其大多数方法中只包含了少量操作),那么当在锁上存在着激烈的竞争时,调度开销与工作开销的比值会非常高。
  与锁相比,volatile变量是一种更轻量级的同步机制,因为在使用这些变量时不会发生上下文切换或线程调度等操作。然而,volatile变量同样存在一些局限:虽然它们提供了相似的可见性保证,但不能用于构建原子的复合操作。因此,当一个变量依赖其他的变量时,或者当变量的新值依赖于旧值时,就不能使用volatile变量。这些都限制了volatile变量的使用,因此它们不能用来实现一些常见的工具,例如计数器或互斥体(mutex)。【虽然理论上可以基于volatile的语义来构造互斥体和其他同步器,但在实际情况中很难实现。】
  例如,虽然自增操作(++i)看起来像一个原子操作,但事实上它包含了3个独立的操作——获取变量的当前值,将这个值加1,然后再写入新值。为了确保更新操作不被丢失,整个的读-改-写操作必须是原子的。到目前为止,我们实现这种原子操作的唯一方式就是使用锁定方法,如第2章的Counter所示。
  Counter是线程安全的,并且在没有竞争的情况下能运行得很好。但在竞争的情况下,其性能会由于上下文切换的开销和调度延迟而降低。如果锁的持有时间非常短,那么当在不恰当的时间请求锁时,使线程休眠将付出很高的代价。
  锁定还存在其他一些缺点。当一个线程正在等待锁时,它不能做任何其他事情。如果一个线程在持有锁的情况下被延迟执行(例如发生了缺页错误、调度延迟,或者其他类似情况),那么所有需要这个锁的线程都无法执行下去。如果被阻塞线程的优先级较高,而持有锁的线程优先级较低,那么这将是一个严重的问题——也被称为优先级反转(Priority Inversion)。即使高优先级的线程可以抢先执行,但仍然需要等待锁被释放,从而导致它的优先级会降至低优先级线程的级别。如果持有锁的线程被永久地阻塞(例如由于出现了无限循环,死锁,活锁或者其他的活跃性故障),所有等待这个锁的线程就永远无法执行下去。
  即使忽略这些风险,锁定方式对于细粒度的操作(例如递增计数器)来说仍然是一种高开销的机制。在管理线程之间的竞争时应该有一种粒度更细的技术,类似于volatile变量的机制,同时还要支持原子的更新操作。幸运的是,在现代的处理器中提供了这种机制。

15.2 硬件对并发的支持

独占锁是一项悲观技术——它假设最坏的情况(如果你不锁门,那么捣蛋鬼就会闯入并搞得一团糟),并且只有在确保其他线程不会造成干扰(通过获取正确的锁)的情况下才能执行下去。
  对于细粒度的操作,还有另外一种更高效的方法,也是一种乐观的方法,通过这种方法可以在不发生干扰的情况下完成更新操作。这种方法需要借助冲突检查机制来判断在更新过程中是否存在来自其他线程的干扰,如果存在,这个操作将失败,并且可以重试(也可以不重试)。这种乐观的方法就好像一句谚语:“原谅比准许更容易得到”,其中“更容易”在这里相当于“更高效”。
  在针对多处理器操作而设计的处理器中提供了一些特殊指令,用于管理对共享数据的并发访问。在早期的处理器中支持原子的测试并设置(Test-and-Set),获取并递增(Fetch-and-Increment)以及交换(Swap)等指令,这些指令足以实现各种互斥体,而这些互斥体又可以实现一些更复杂的并发对象。现在,几乎所有的现代处理器中都包含了某种形式的原子读-改-写指令,例如比较并交换(Compare-and-Swap)或者关联加载/条件存储(Load-Linked/Store-Conditional)。操作系统和JVM使用这些指令来实现锁和并发的数据结构,但在Java 5.0之前,在Java类中还不能直接使用这些指令。

15.2.1 比较并交换

在大多数处理器架构(包括IA32和Sparc)中采用的方法是实现一个比较并交换(CAS)指令。(在其他处理器中,例如PowerPC,采用一对指令来实现相同的功能:关联加载与条件存储。)CAS包含了3个操作数——需要读写的内存位置V、进行比较的值A和拟写入的新值B。当且仅当V的值等于A时,CAS才会通过原子方式用新值B来更新V的值,否则不会执行任何操作。无论位置V的值是否等于A,都将返回V原有的值。(这种变化形式被称为比较并设置,无论操作是否成功都会返回。)CAS的含义是:“我认为V的值应该为A,如果是,那么将V的值更新为B,否则不修改并告诉V的值实际为多少”。CAS是一项乐观的技术,它希望能成功地执行更新操作,并且如果有另一个线程在最近一次检查后更新了该变量,那么CAS能检测到这个错误。程序清单15-1中的SimulatedCAS说明了CAS语义(而不是实现或性能)。
  程序清单15-1 模拟CAS操作

@ThreadSafe
public class SimulatedCAS {@GuardedBy("this") private int value;public synchronized int get() { return value; }public synchronized int compareAndSwap(int expectedValue, int newValue) {int oldValue = value;if (oldValue == expectedValue) {value = newValue;}return oldValue;}public synchronized boolean compareAndSet(int expectedValue, int newValue) {return expectedValue == compareAndSwap(expectedValue, newValue);}
}

当多个线程尝试使用CAS同时更新同一个变量时,只有其中一个线程能更新变量的值,而其他线程都将失败。然而,失败的线程并不会被挂起(这与获取锁的情况不同:当获取锁失败时,线程将被挂起),而是被告知在这次竞争中失败,并可以再次尝试。由于一个线程在竞争CAS时失败不会阻塞,因此它可以决定是否重新尝试,或者执行一些恢复操作,也或者不执行任何操作。【如果在CAS失败时不执行任何操作,那么是一种明智的做法。在非阻塞算法中,例如15.4.2节中的链表队列算法,当CAS失败时,意味着其他线程已经完成了你想要执行的操作。】这种灵活性就大大减少了与锁相关的活跃性风险(尽管在一些不常见的情况下仍然存在活锁风险——请参见10.3.3节。
  CAS的典型使用模式是:首先从V中读取值A,并根据A计算新值B,然后再通过CAS以原子方式将V中的值由A变成B(只要在这期间没有任何线程将V的值修改为其他值)。由于CAS能检测到来自其他线程的干扰,因此即使不使用锁也能够实现原子的读-改-写操作序列。

15.2.2 非阻塞的计数器

程序清单15-2中的CasCounter使用CAS实现了一个线程安全的计数器。递增操作采用了标准形式——读取旧的值,根据它计算出新值(加1),并使用CAS来设置这个新值。如果CAS失败,那么该操作将立即重试。通常,反复地重试是一种合理的策略,但在一些竞争很激烈的情况下,更好的方式是在重试之前首先等待一段时间或者回退,从而避免造成活锁问题。
  程序清单15-2 基于CAS实现的非阻塞计数器

@ThreadSafe
public class CasCounter {private SimulatedCAS value;public int getValue() { return value.get(); }public int increment() {int v;do {v = value.get();}while (v != value.compareAndSwap(v, v + 1));return v + 1;}
}

CasCounter不会阻塞,但如果其他线程同时更新计数器,那么会多次执行重试操作【理论上,如果其他线程在每次竞争CAS时总是获胜,那么这个线程每次都会重试,但在实际中,很少发生这种类型的饥饿问题。】。(在实际情况中,如果仅需要一个计数器或序列生成器,那么可以直接使用AtomicInteger或AtomicLong,它们能提供原子的递增方法以及其他算术方法。)
  初看起来,基于CAS的计数器似乎比基于锁的计数器在性能上更差一些,因为它需要执行更多的操作和更复杂的控制流,并且还依赖看似复杂的CAS操作。但实际上,当竞争程度不高时,基于CAS的计数器在性能上远远超过了基于锁的计数器,而在没有竞争时甚至更高。如果要快速获取无竞争的锁,那么至少需要一次CAS操作再加上与其他锁相关的操作,因此基于锁的计数器即使在最好的情况下也会比基于CAS的计数器在一般情况下能执行更多的操作。由于CAS在大多数情况下都能成功执行(假设竞争程度不高),因此硬件能够正确地预测while循环中的分支,从而把复杂控制逻辑的开销降至最低。
  虽然Java语言的锁定语法比较简洁,但JVM和操作在管理锁时需要完成的工作却并不简单。在实现锁定时需要遍历JVM中一条非常复杂的代码路径,并可能导致操作系统级的锁定、线程挂起以及上下文切换等操作。在最好的情况下,在锁定时至少需要一次CAS,因此虽然在使用锁时没有用到CAS,但实际上也无法节约任何执行开销。另一方面,在程序内部执行CAS时不需要执行JVM代码、系统调用或线程调度操作。在应用级上看起来越长的代码路径,如果加上JVM和操作系统中的代码调用,那么事实上却变得更短。CAS的主要缺点是,它将使调用者处理竞争问题(通过重试、回退、放弃),而在锁中能自动处理竞争问题(线程在获得锁之前将一直阻塞)。【事实上,CAS最大的缺陷在于难以围绕着CAS正确地构建外部算法。】
  CAS的性能会随着处理器数量的不同而变化很大。在单CPU系统中,CAS通常只需要很少的时钟周期,因为不需要处理器之间的同步。在编写本书时,非竞争的CAS在多CPU系统中需要10到150个时钟周期的开销。CAS的执行性能不仅在不同的体系架构之间变化很大,甚至在相同处理器的不同版本之间也会发生改变。生产厂商迫于竞争的压力,在接下来的几年内还会继续提高CAS的性能。一个很管用的经验法则是:在大多数处理器上,在无竞争的锁获取和释放的“快速代码路径”上的开销,大约是CAS开销的两倍。

15.2.3 JVM对CAS的支持

那么,Java代码如何确保处理器执行CAS操作?在Java 5.0之前,如果不编写明确的代码,那么就无法执行CAS。在Java 5.0中引入了底层的支持,在int、long和对象的引用等类型上都公开了CAS操作,并且JVM把它们编译为底层硬件提供的最有效方法。在支持CAS的平台上,运行时把它们编译为相应的(多条)机器指令。在最坏的情况下,如果不支持CAS指令,那么JVM将使用自旋锁。在原子变量类(例如java.util.concurrent.atomic中的AtomicXxx)中使用了这些底层的JVM支持为数字类型和引用类型提供一种高效的CAS操作,而在java.util.concurrent中的大多数类在实现时则直接或间接地使用了这些原子变量类。

15.3 原子变量类

原子变量比锁的粒度更细,量级更轻,并且对于在多处理器系统上实现高性能的并发代码来说是非常关键的。原子变量将发生竞争的范围缩小到单个变量上,这是你获得的粒度最细的情况(假设算法能够基于这种细粒度来实现)。更新原子变量的快速(非竞争)路径不会比获取锁的快速路径慢,并且通常会更快,而它的慢速路径肯定比锁的慢速路径快,因为它不需要挂起或重新调度线程。在使用基于原子变量而非锁的算法中,线程在执行时更不易出现延迟,并且如果遇到竞争,也更容易恢复过来。
  原子变量类相当于一种泛化的volatile变量,能够支持原子的和有条件的读-改-写操作。AtomicInteger表示一个int类型的值,并提供了get和set方法,这些Volatile类型的int变量在读取和写入上有着相同的内存语义。它还提供了一个原子的compareAndSet方法(如果该方法成功执行,那么将实现与读取/写入一个volatile变量相同的内存效果),以及原子的添加、递增和递减等方法。AtomicInteger表面上非常像一个扩展的Counter类,但在发生竞争的情况下能提供更高的可伸缩性,因为它直接利用了硬件对并发的支持。
  共有12个原子变量类,可分为4组:标量类(Scalar)、更新器类、数组类以及复合变量类。最常用的原子变量就是标量类:AtomicInteger、AtomicLong、AtomicBoolean以及AtomicReference。所有这些类都支持CAS,此外,AtornicInteger和AtomicLong还支持算术运算。(要想模拟其他基本类型的原子变量,可以将short或byte等类型与int类型进行转换,以及使用floatToIntBits或doubleToLongBits来转换浮点数。)
  原子数组类(只支持Integer、Long和Reference版本)中的元素可以实现原子更新。原子数组类为数组的元素提供了volatile类型的访问语义,这是普通数组所不具备的特性——volatile类型的数组仅在数组引用上具有volatile语义,而在其元素上则没有。(15.4.3节和15.4.4节将讨论其他类型的原子变量。)
  尽管原子的标量类扩展了Number类,但并没有扩展一些基本类型的包装类,例如Integer或Long。事实上,它们也不能进行扩展:基本类型的包装类是不可修改的,而原子变量类是可修改的。在原子变量类中同样没有重新定义hashCode或equals方法,每个实例都是不同的。与其他可变对象相同,它们也不宜用做基于散列的容器中的键值。

15.3.1 原子变量类是一种“更好的volatile”

在3.4.2节中,我们使用了一个指向不可变对象的volatile引用来原子地更新多个状态变量。这个示例依赖于“先检查再运行”,但这种特殊的情况下,竞争是无害的,因为我们并不关心是否会偶尔地丢失更新操作。而在大多数情况下,这种“先检查再运行”不会是无害的,并且可能破坏数据的一致性。例如,第4章中的NumberRange既不能使用指向不可变对象的volatile引用来安全地实现上界和下界,也不能使用原子的整数来保存这两个边界。由于有一个不变性条件限制了两个数值,并且它们无法在同时更新时还维持该不变性条件,因此如果在数值范围类中使用volatile引用或者多个原子整数,那么将出现不安全的“先检查再运行”操作序列。
  可以将OneValueCache中的技术与原子引用结合起来,并且通过对指向不可变对象(其中保存了下界和上界)的引用进行原子更新以避免竞态条件。在程序清单15-3的CasNumber-Range中使用了AtomicReference和IntPair来保存状态,并通过使用compare-AndSet,使它在更新上界或下界时能避免NumberRange的竞态条件。
  程序清单15-3 通过CAS来维持包含多个变量的不变性条件

@ThreadSafe
public class CasNumberRange {@Immutableprivate static class IntPair {final int lower; // 不变性条件 lower <= upperfinal int upper;}private final AtomicReference<IntPair> values = new AtomicReference<IntPair>(new IntPair(0,0));public int getLower() { return values.get().lower; }public int getUpper() { return values.get().upper; }public void setLower(int i) {while (true) {IntPair oldV = values.get();if (i > oldV.upper) {throw new IllegalArgumentException("Can't set lower to " + i + " > upper.");}IntPair newV = new IntPair(i, oldV.upper);if (values.compareAndSet(oldV, newV)) {return;}}}// 对setUpper采用类似对方法
}

15.3.2 性能比较: 锁与原子变量

为了说明锁和原子变量之间的可伸缩性差异,我们构造了一个测试基准,其中将比较伪随机数字生成器(PRNG)的几种不同实现。在PRNG中,在生成下一个随机数字时需要用到上一个数字,所以在PRNG中必须记录前一个数值并将其作为状态的一部分。
  程序清单15-4和程序清单15-5给出了线程安全的PRNG的两种实现,一种使用Reentrant-Lock,另一种使用AtomicInteger。测试程序将反复调用它们,在每次迭代中将生成一个随机数字(在此过程中将读取并修改共享的seed状态),并执行一些仅在线程本地数据上执行的“繁忙”迭代。这种方式模拟了一些典型操作,以及一些在共享状态以及线程本地状态上的操作。
  程序清单15-4 基于ReentrantLock实现的随机数生成器

@ThreadSafe
public class ReentrantLockPseudoRandom extends PseudoRandom {private final Lock lock = new ReentrantLock(false);private int seed;ReentrantLockPseudoRandom(int seed) { this.seed = seed; }public int nextInt(int n) {lock.lock();try {int s = seed;seed = calculateNext(s);int remainder = s % n;return remainder > 0 ? remainder : remainder + n;} finally {lock.unlock();}}
}

程序清单15-5 基于AtomicInteger实现的随机数生成器

@ThreadSafe
public class AtomicPesudoRandom extends PseudoRandom {private AtomicInteger seed;AtomicPesudoRandom(int seed) { this.seed = new AtomicInteger(seed); }public int nextInt(int n) {while (true) {int s = seed.get();int nextSeed = calculateNext(s);if (seed.compareAndSet(s, nextSeed)) {int remainder = s % n;return remainder > 0 ? remainder : remainder + n;}}}
}

图15-1和图15-2给出了在每次迭代中工作量较低以及适中情况下的吞吐量。如果线程本地的计算量较少,那么在锁和原子变量上的竞争将非常激烈。如果线程本地的计算量较多,那么在锁和原子变量上的竞争会降低,因为在线程中访问锁和原子变量的频率将降低。
  从这些图中可以看出,在高度竞争的情况下,锁的性能将超过原子变量的性能,但在更真实的竞争情况下,原子变量的性能将超过锁的性能【这个结论在其他领域同样成立:当交通拥堵时,交通信号灯能够实现更高的吞吐量,而在低拥堵时,环岛能实现更高的吞吐量。在以太网中使用的竞争机制在低通信流量的情况下运行得很好,但在高通信流量的情况下,令牌环网络中的令牌传递机制则表现得更好。】。这是因为锁在发生竞争时会挂起线程,从而降低了CPU的使用率和共享内存总线上的同步通信量。(这类似于在生产者-消费者设计中的可阻塞生产者,它能降低消费者上的工作负载,使消费者的处理速度赶上生产者的处理速度。)另一方面,如果使用原子变量,那么发出调用的类负责对竞争进行管理。与大多数基于CAS的算法一样,AtomicPseudoRandom在遇到竞争时将立即重试,这通常是一种正确的方法,但在激烈竞争环境下却导致了更多的竞争。

  在批评AtomicPseudoRandom写得太糟糕或者原子变量比锁更糟糕之前,应该意识到图15-1中竞争级别过高而有些不切实际:任何一个真实的程序都不会除了竞争锁或原子变量,其他什么工作都不做。在实际情况中,原子变量在可伸缩性上要高于锁,因为在应对常见的竞争程度时,原子变量的效率会更高。
  锁与原子变量在不同竞争程度上的性能差异很好地说明了各自的优势和劣势。在中低程度的竞争下,原子变量能提供更高的可伸缩性,而在高强度的竞争下,锁能够更有效地避免竞争。(在单CPU的系统上,基于CAS的算法在性能上同样会超过基于锁的算法,因为CAS在单CPU的系统上通常能执行成功,只有在偶然情况下,线程才会在执行读-改-写的操作过程中被其他线程抢占执行。)
  在图15-1和图15-2中都包含了第三条曲线,它是一个使用ThreadLocal来保存PRNG状态的PseudoRandom。这种实现方法改变了类的行为,即每个线程都只能看到自己私有的伪随机数字序列,而不是所有线程共享同一个随机数序列,这说明了,如果能够避免使用共享状态,那么开销将会更小。我们可以通过提高处理竞争的效率来提高可伸缩性,但只有完全消除竞争,才能实现真正的可伸缩性。

15.4 非阻塞算法

在基于锁的算法中可能会发生各种活跃性故障。如果线程在持有锁时由于阻塞I/O,内存页缺失或其他延迟而导致推迟执行,那么很可能所有线程都不能继续执行下去。如果在某种算法中,一个线程的失败或挂起不会导致其他线程也失败或挂起,那么这种算法就被称为非阻塞算法。如果在算法的每个步骤中都存在某个线程能够执行下去,那么这种算法也被称为无锁(Lock-Free)算法。如果在算法中仅将CAS用于协调线程之间的操作,并且能正确地实现,那么它既是一种无阻塞算法,又是一种无锁算法。无竞争的CAS通常都能执行成功,并且如果有多个线程竞争同一个CAS,那么总会有一个线程在竞争中胜出并执行下去。在非阻塞算法中通常不会出现死锁和优先级反转问题(但可能会出现饥饿和活锁问题,因为在算法中会反复地重试)。到目前为止,我们已经看到了一个非阻塞算法:CasCounter。在许多常见的数据结构中都可以使用非阻塞算法,包括栈、队列、优先队列以及散列表等,而要设计一些新的这种数据结构,最好还是由专家们来完成。

15.4.1 非阻塞的栈

在实现相同功能的前提下,非阻塞算法通常比基于锁的算法更为复杂。创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性。在链式容器类(例如队列)中,有时候无须将状态转换操作表示为对节点链接的修改,也无须使用AtomicReference来表示每个必须采用原子操作来更新的链接。
  栈是最简单的链式数据结构:每个元素仅指向一个元素,并且每个元素也只被一个元素引用。在程序清单15-6的ConcurrentStack中给出了如何通过原子引用来构建栈的示例。栈是由Node元素构成的一个链表,其中栈顶作为根节点,并且在每个元素中都包含了一个值以及指向下一个元素的链接。push方法创建一个新的节点,该节点的next域指向当前的栈顶,然后使用CAS把这个新节点放入栈顶。如果在开始插入节点时,位于栈顶的节点没有发生变化,那么CAS就会成功,如果栈顶节点发生了变化(例如由于其他线程在本线程开始之前插入或移除了元素),那么CAS将会失败,而push方法会根据栈的当前状态来更新节点,并且再次尝试。无论哪种情况,在CAS执行完成后,后栈仍会处于一致的状态。
   程序清单15-6 使用Treiber算法(Treiber,1986)构造的非阻塞栈

@ThreadSafe
public class ConcurrentStack<E> {AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();public void push(E item) {Node<E> newHead = new Node<E>(item);Node<E> oldHead;do {oldHead = top.get();newHead.next = oldHead;} while (! top.compareAndSet(oldHead, newHead));}public E pop() {Node<E> oldHead, newHead;do {oldHead = top.get();if (oldHead == null) {  return null; } newHead = oldHead.next;} while (! top.compareAndSet(oldHead, newHead));return oldHead.item;}private static class Node<E> {public final E item;public Node<E> next;public Node(E item) { this.item = item; } }
}

在CasCounter和ConcurrentStack中说明了非阻塞算法的所有特性:某项工作的完成具有不确定性,必须重新执行。在ConcurrentStack中,当构造表示新元素的Node时,我们希望当把这个新节点压入到栈时,其next引用的值仍然是正确的,同时也准备好在发生竞争的情况下重新尝试。
  在像ConcurrentStack这样的非阻塞算法中都能确保线程安全性,因为compareAndSet像锁定机制一样,既能提供原子性,又能提供可见性。当一个线程需要改变栈的状态时,将调用compareAndSet,这个方法与写入volaitle类型的变量有着相同的内存效果。当线程检查栈的状态时,将在同一个AtomicReference上调用get方法,这个方法与读取volaitle类型的变量有着相同的内存效果。因此,一个线程执行的任何修改结构都可以安全地发布给其他正在查看状态的线程。并且,这个栈是通过compareAndSet来修改的,因此将采用原子操作来更新top的引用,或者在发现存在其他线程干扰的情况下,修改操作将失败。

15.4.2 非阻塞的链表

到目前为止,我们已经看到了两个非阻塞算法,计数器和栈,它们很好地说明了CAS的基本使用模式:在更新某个值时存在不确定性,以及在更新失败时重新尝试。构建非阻塞算法的技巧在于:将执行原子修改的范围缩小到单个变量上。这在计数器中很容易实现,在栈中也很简单,但对于一些更复杂的数据结构,例如队列、散列表或树,则要复杂得多。
  链接队列比栈更为复杂,因为它必须支持对头节点和尾结点的快速访问。因此,它需要单独维护的头指针和尾指针。有两个指针指向位于尾部的节点:当前最后一个元素的next指针,以及尾节点。当成功地插入一个新元素时,这两个指针都需要采用原子操作来更新。初看起来,这个操作无法通过原子变量来实现。在更新这两个指针时需要不同的CAS操作,并且如果第一个CAS成功,但第二个CAS失败,那么队列将处于不一致的状态。而且,即使这两个CAS都成功了,那么在执行这两个CAS之间,仍可能有另一个线程会访问这个队列。因此,在为链接队列构建非阻塞算法时,需要考虑到这两种情况
  我们需要使用一些技巧。第一个技巧是,即使在一个包含多个步骤的更新操作中,也要确保数据结构总是处于一致的状态。这样,当线程B到达时,如果发现线程A正在执行更新,那么线程B就可以知道有一个操作已部分完成,并且不能立即开始执行自己的更新操作。然后,B可以等待(通过反复检查队列的状态)并直到A完成更新,从而使两个线程不会相互干扰。
  虽然这种方法能够使不同的线程“轮流”访问数据结构,并且不会造成破坏,但如果一个线程在更新操作中失败了,那么其他的线程都无法再访问队列。要使得该算法成为一个非阻塞算法,必须确保当一个线程失败时不会妨碍其他线程继续执行下去。因此,第二个技巧是,如果当B到达时发现A正在修改数据结构,那么在数据结构中应该有足够多的信息,使得B能完成A的更新操作。如果B“帮助”A完成了更新操作,那么B可以执行自己的操作,而不用等待A的操作完成。当A恢复后再试图完成其操作时,会发现B已经替它完成了。
  在程序清单15-7的LinkedQueue中给出了Michael-Scott提出的非阻塞链接队列算法中的插入部分(Michael and Scott,1996),在ConcurrentLinkedQueue中使用的正是该算法。在许多队列算法中,空队列通常都包含一个“哨兵(Sentinel)节点”或者“哑(Dummy)节点”,并且头节点和尾节点在初始化时都指向该哨兵节点。尾节点通常要么指向哨兵节点(如果队列为空),即队列的最后一个元素,要么(当有操作正在执行更新时)指向倒数第二个元素。图15-3给出了一个处于正常状态(或者说稳定状态)的包含两个元素的队列。

@ThreadSafe
public class LinkedQueue<E> {private static class Node<E> {final E item;final AtomicReference<Node<E>> next;public Node(E item, Node<E> next) {this.item = item;this.next = new AtomicReference<>(next);}}private final Node<E> dummy = new Node<E>(null, null);private final AtomicReference<Node<E>> head = new AtomicReference<>(dummy);private final AtomicReference<Node<E>> tail = new AtomicReference<>(dummy);public boolean put(E item) {Node<E> newNode = new Node<>(item, null);while (true) {Node<E> curTail = tail.get();Node<E> tailNext = curTail.next.get();if (curTail == tail.get()) {if (tailNext != null) {// 队列处于中间状态, 推进尾节点tail.compareAndSet(curTail, tailNext);} else {// 处于稳定状态, 尝试进入新节点if (curTail.next.compareAndSet(null, newNode)) {// 插入成功, 尝试推进尾节点tail.compareAndSet(curTail, newNode);return true;}}}}}
}

当插入一个新的元素时,需要更新两个指针。首先更新当前最后一个元素的next指针,将新节点链接到列表队尾,然后更新尾节点,将其指向这个新元素。在这两个操作之间,队列处于一种中间状态,如图15-4所示。在第二次更新完成后,队列将再次处于稳定状态,如图15-5所示。
  实现这两个技巧时的关键点在于:当队列处于稳定状态时,尾节点的next域将为空,如果队列处于中间状态,那么tail.next将为非空。因此,任何线程都能够通过检查tail.next来获取队列当前的状态。而且,当队列处于中间状态时,可以通过将尾节点向前移动一个节点,从而结束其他线程正在执行的插入元素操作,并使得队列恢复为稳定状态。【在(Michael and Scott,1996)以及(Herlihy and Shavit,2006)中都给出了对算法正确性的完整分析。】

  LinkedQueue.put方法在插入新元素之前,将首先检查队列是否处于中间状态(步骤A)。如果是,那么有另一个线程正在插入元素(在步骤C和D之间)。此时当前线程不会等待其他线程执行完成,而是帮助它完成操作,并将尾节点向前推进一个节点(步骤B)。然后,它将重复执行这种检查,以免另一个线程已经开始插入新元素,并继续推进尾节点,直到它发现队列处于稳定状态之后,才会开始执行自己的插入操作。
  由于步骤C中的CAS将把新节点链接到队列尾部,因此如果两个线程同时插入元素,那么这个CAS将失败。在这样的情况下,并不会造成破坏:不会发生任何变化,并且当前的线程只需重新读取尾节点并再次重试。如果步骤C成功了,那么插入操作将生效,第二个CAS(步骤D)被认为是一个“清理操作”,因为它既可以由执行插入操作的线程来执行,也可以由其他任何线程来执行。如果步骤D失败,那么执行插入操作的线程将返回,而不是重新执行CAS,因为不再需要重试——另一个线程已经在步骤B中完成了这个工作。这种方式能够工作,因为在任何线程尝试将一个新节点插入到队列之前,都会首先通过检查tail.next是否非空来判断是否需要清理队列。如果是,它首先会推进尾节点(可能需要执行多次),直到队列处于稳定状态。

15.4.3 原子的域更新器

程序清单15-7说明了在ConcurrentLinkedQueue中使用的算法,但在实际的实现中略有区别。在ConcurrentLinkedQueue中没有使用原子引用来表示每个Node,而是使用普通的volatile类型引用,并通过基于反射的AtomicReferenceFieldUpdater来进行更新,如程序清单15-8所示。
  程序清单15-8 在ConcurrentLinkedQueue中使用原子的域更新器

@ThreadSafe
public class Node<E> {private final E item;private volatile Node<E> next;public Node(E item) { this.item = item; }private static AtomicReferenceFieldUpdater<Node, Node> nextUpdater= AtomicReferenceFieldUpdater.newUpdater(Node.class,Node.class,"next");
}

原子的域更新器类表示现有volatile域的一种基于反射的“视图”,从而能够在已有的volatile域上使用CAS。在更新器类中没有构造函数,要创建一个更新器对象,可以调用newUpdater工厂方法,并制定类和域的名字。域更新器类没有与某个特定的实例关联在一起,因而可以更新目标类的任意实例中的域。更新器类提供的原子性保证比普通原子类更弱一些,因为无法保证底层的域不被直接修改——compareAndSet以及其他算术方法只能确保其他使用原子域更新器方法的线程的原子性。
  在ConcurrentLinkedQueue中,使用nextUpdater的compareAndSet方法来更新Node的next域。这个方法有点繁琐,但完全是为了提升性能。对于一些频繁分配并且生命周期短暂的对象,例如队列的链接节点,如果能去掉每个Node的AtomicReference创建过程,那么将极大地降低插入操作的开销。然而,几乎在所有情况下,普通原子变量的性能都很不错,只有在很少的情况下才需要使用原子的域更新器。(如果在执行原子更新的同时还需要维持现有类的串行化形式,那么原子的域更新器将非常有用。)

15.4.4 ABA问题

ABA问题是一种异常现象:如果在算法中的节点可以被循环使用,那么在使用“比较并交换”指令时就可能出现这种问题(主要在没有垃圾回收机制的环境中)。在CAS操作中将判断“V的值是否仍然为A?”,并且如果是的话就继续执行更新操作。在大多数情况下,包括本章给出的示例,这种判断是完全足够的。然而,有时候还需要知道“自从上次看到V的值为A以来,这个值是否发生了变化?”在某些算法中,如果V的值首先由A变成B,再由B变成A,那么仍然被认为是发生了变化,并需要重新执行算法中的某些步骤。
  如果在算法中采用自己的方式来管理节点对象的内存,那么可能出现ABA问题。在这种情况下,即使链表的头节点仍然指向之前观察到的节点,那么也不足以说明链表的内容没有发生改变。如果通过垃圾回收器来管理链表节点仍然无法避免ABA问题,那么还有一个相对简单的解决方案:不是更新某个引用的值,而是更新两个值,包括一个引用和一个版本号。即使这个值由A变为B,然后又变为A,版本号也将是不同的。AtomicStampedReference(以及AtomicMarkableReference)支持在两个变量上执行原子的条件更新。AtomicStampedReference将更新一个“对象-引用”二元组,通过在引用上加上“版本号”,从而避免【在实际中,无论如何,理论上计数器都应该这样包装。】ABA问题。类似地,AtomicMarkableReference将更新一个“对象引用-布尔值”二元组,在某些算法中将通过这种二元组使节点保存在链表中同时又将其标记为“已删除的节点”。【许多处理器都提供了各种二元的CAS(CAS2或CASX)操作,用于对一些“指针-整数”二元组进行操作,从而使这种操作的效率得到极大提高。从Java 6开始,在AtomicStampedReference并没有使用这种CAS(即使平台支持这种操作)。(这种二元CAS与DCAS不同,DCAS能在内存的两个互不相关的位置上执行操作,而在编写本书时,还没有处理器实现了DCAS。)】

小结

非阻塞算法通过底层的并发原语(例如比较并交换而不是锁)来维持线程的安全性。这些底层的原语通过原子变量类向外公开,这些类也用做一种“更好的volatile变量”,从而为整数和对象引用提供原子的更新操作。
  非阻塞算法在设计和实现时非常困难,但通常能够提供更高的可伸缩性,并能更好地防止活跃性故障的发生。在JVM从一个版本升级到下一个版本的过程中,并发性能的主要提升都来自于(在JVM内部以及平台类库中)对非阻塞算法的使用。

第16章 Java内存模型

本书中,我们尽可能地避开了Java内存模型(JMM)的底层细节,而将重点放在一些高层设计问题,例如安全发布,同步策略的规范以及一致性等。它们的安全性都来自于JMM,并且当你理解了这些机制的工作原理后,就能更容易地使用它们。本章将介绍Java内存模型的底层需求以及所提供的保证,此外还将介绍在本书给出的一些高层设计原则背后的原理。

16.1 什么是内存模型,为什么需要它

假设一个线程为变量aVariable赋值:aVariable=3; 内存模型需要解决这个问题:“在什么条件下,读取aVariable的线程将看到这个值为3?”这听起来似乎是一个愚蠢的问题,但如果缺少同步,那么将会有许多因素使得线程无法立即甚至永远,看到另一个线程的操作结果。在编译器中生成的指令顺序,可以与源代码中的顺序不同,此外编译器还会把变量保存在寄存器而不是内存中;处理器可以采用乱序或并行等方式来执行指令;缓存可能会改变将写入变量提交到主内存的次序;而且,保存在处理器本地缓存中的值,对于其他处理器是不可见的。这些因素都会使得一个线程无法看到变量的最新值,并且会导致其他线程中的内存操作似乎在乱序执行——如果没有使用正确的同步。
  在单线程环境中,我们无法看到所有这些底层技术,它们除了提高程序的执行速度外,不会产生其他影响。Java语言规范要求JVM在线程中维护一种类似串行的语义:只要程序的最终结果与在严格串行环境中执行的结果相同,那么上述所有操作都是允许的。这确实是一件好事情,因为在最近几年中,计算性能的提升在很大程度上要归功于这些重新排序措施。当然,时钟频率的提供同样提升了性能,此外还有不断提升的并行性——采用流水线的超标量执行单元,动态指令调度,猜测执行以及完备的多级缓存。随着处理变得越来越强大,编译器也在不断地改进:通过对指令重新排序来实现优化执行,以及使用成熟的全局寄存器分配算法。由于时钟频率越来越难以提高,因此许多处理器制造厂商都开始转而生产多核处理器,因为能够提高的只有硬件并行性。
  在多线程环境中,维护程序的串行性将导致很大的性能开销。对于并发应用程序中的线程来说,它们在大部分时间里都执行各自的任务,因此在线程之间的协调操作只会降低应用程序的运行速度,而不会带来任何好处。只有当多个线程要共享数据时,才必须协调它们之间的操作,并且JVM依赖程序通过同步操作来找出这些协调操作将在何时发生。
  JMM规定了JVM必须遵循一组最小保证,这组保证规定了对变量的写入操作在何时将对于其他线程可见。JMM在设计时就在可预测性和程序的易于开发性之间进行了权衡,从而在各种主流的处理器体系架构上能实现高性能的JVM。如果你不了解在现代处理器和编译器中使用的程序性能提升措施,那么在刚刚接触JMM的某些方面时会感到困惑。

16.1.1 平台的内存模型

在共享内存的多处理器体系架构中,每个处理器都拥有自己的缓存,并且定期地与主内存进行协调。在不同的处理器架构中提供了不同级别的缓存一致性(Cache Coherence),其中一部分只提供最小的保证,即允许不同的处理器在任意时刻从同一个存储位置上看到不同的值。操作系统、编译器以及运行时(有时甚至包括应用程序)需要弥合这种在硬件能力与线程安全需求之间的差异。
  要想确保每个处理器都能在任意时刻知道其他处理器正在进行的工作,将需要非常大的开销。在大多数时间里,这种信息是不必要的,因此处理器会适当放宽存储一致性保证,以换取性能的提升。在架构定义的内存模型中将告诉应用程序可以从内存系统中获得怎样的保证,此外还定义了一些特殊的指令(称为内存栅栏或栅栏),当需要共享数据时,这些指令就能实现额外的存储协调保证。为了使Java开发人员无须关心不同架构上内存模型之间的差异,Java还提供了自己的内存模型,并且JVM通过在适当的位置上插入内存栅栏来屏蔽在JMM与底层平台内存模型之间的差异。
  程序执行一种简单假设:想象在程序中只存在唯一的操作执行顺序,而不考虑这些操作在何种处理器上执行,并且在每次读取变量时,都能获得在执行序列中(任何处理器)最近一次写入该变量的值。这种乐观的模型就被称为串行一致性。软件开发人员经常会错误地假设存在串行一致性,但在任何一款现代多处理器架构中都不会提供这种串行一致性,JMM也是如此。冯·诺伊曼模型这种经典的串行计算模型,只能近似描述现代多处理器的行为。
  在现代支持共享内存的多处理器(和编译器)中,当跨线程共享数据时,会出现一些奇怪的情况,除非通过使用内存栅栏来防止这些情况的发生。幸运的是,Java程序不需要指定内存栅栏的位置,而只需通过正确地使用同步来找出何时将访问共享状态。

16.1.2 重排序

在第2章中介绍竞态条件和原子性故障时,我们使用了交互图来说明:在没有充分同步的程序中,如果调度器采用不恰当的方式来交替执行不同线程的操作,那么将导致不正确的结果。更糟的是,JMM还使得不同线程看到的操作执行顺序是不同的,从而导致在缺乏同步的情况下,要推断操作的执行顺序将变得更加复杂。各种使操作延迟或者看似乱序执行的不同原因,都可以归为重排序。
  在程序清单16-1的PossibleReordering中说明了,在没有正确同步的情况下,即使要推断最简单的并发程序的行为也很困难。很容易想象PossibleReordering是如何输出(1,0)或(0,1)或(1,1)的:线程A可以在线程B开始之前就执行完成,线程B也可以在线程A开始之前执行完成,或者二者的操作交替执行。但奇怪的是,PossibleReordering还可以输出(0,0)。由于每个线程中的各个操作之间不存在数据流依赖性,因此这些操作可以乱序执行。(即使这些操作按照顺序执行,但在将缓存刷新到主内存的不同时序中也可能出现这种情况,从线程B的角度看,线程A中的赋值操作可能以相反的次序执行。)图16-1给出了一种可能由重排序导致的交替执行方式,在这种情况中会输出(0,0)。
  程序清单16-1 如果在程序中没有包含足够的同步,那么可能产生奇怪的结果(不要这么做)

public class PossibleReordering {static int x = 0, y = 0, a = 0, b = 0;public static void main(String[] args) throws InterruptedException {Thread one = new Thread(() -> {  a = 1; x = b; });Thread other = new Thread(() -> { b = 1; y = a; });one.start(); other.start();one.join();  other.join();System.out.println("(" + x + "," + y + ")");}
}


  PossibleReordering是一个简单程序,但要列举出它所有可能的结果却非常困难。内存级的重排序会使程序的行为变得不可预测。如果没有同步,那么推断出执行顺序将是非常困难的,而要确保在程序中正确地使用同步却是非常容易的。同步将限制编译器、运行时和硬件对内存操作重排序的方式,从而在实施重排序时不会破坏JMM提供的可见性保证【在大多数主流的处理器架构中,内存模型都非常强大,使得读取volatile变量的性能与读取非volatile变量的性能大致相当。】

16.1.3 Java内存模型简介

Java内存模型是通过各种操作来定义的,包括对变量的读/写操作,监视器的加锁和释放操作,以及线程的启动和合并操作。JMM为程序中所有的操作定义了一个偏序关系[插图],称之为Happens-Before。要想保证执行操作B的线程看到操作A的结果(无论A和B是否在同一个线程中执行),那么在A和B之间必须满足Happens-Before关系。如果两个操作之间缺乏Happens-Before关系,那么JVM可以对它们任意地重排序。
  当一个变量被多个线程读取并且至少被一个线程写入时,如果在读操作和写操作之间没有依照Happens-Before来排序,那么就会产生数据竞争问题。在正确同步的程序中不存在数据竞争,并会表现出串行一致性,这意味着程序中的所有操作都会按照一种固定的和全局的顺序执行。
Happens-Before的规则包括:

  • 程序顺序规则。如果程序中操作A在操作B之前,那么在线程中A操作将在B操作之前执行。
  • 监视器锁规则。在监视器锁上的解锁操作必须在同一个监视器锁上的加锁操作之前执行
  • volatile变量规则。对volatile变量的写入操作必须在对该变量的读操作之前执行
  • 线程启动规则。在线程上对Thread.Start的调用必须在该线程中执行任何操作之前执行。
  • 线程结束规则。线程中的任何操作都必须在其他线程检测到该线程已经结束之前执行,或者从Thread.join中成功返回,或者在调用Thread.isAlive时返回false。
  • 中断规则。当一个线程在另一个线程上调用interrupt时,必须在被中断线程检测到interrupt调用之前执行(通过抛出InterruptedException,或者调用isInterrupted和interrupted)。
  • 终结器规则。对象的构造函数必须在启动该对象的终结器之前执行完成。
  • 传递性。如果操作A在操作B之前执行,并且操作B在操作C之前执行,那么操作A必须在操作C之前执行。
      虽然这些操作只满足偏序关系,但同步操作,如锁的获取与释放等操作,以及volatile变量的读取与写入操作,都满足全序关系。因此,在描述Happens-Before关系时,就可以使用“后续的锁获取操作”和“后续的volatile变量读取操作”等表达术语。
      图16-2给出了当两个线程使用同一个锁进行同步时,在它们之间的Happens-Before关系。在线程A内部的所有操作都按照它们在源程序中的先后顺序来排序,在线程B内部的操作也是如此。由于A释放了锁M,并且B随后获得了锁M,因此A中所有在释放锁之前的操作,也就位于B中请求锁之后的所有操作之前。如果这两个线程是在不同的锁上进行同步的,那么就不能推断它们之间的动作顺序,因为在这两个线程的操作之间并不存在Happens-Before关系。

16.1.4 借助同步

由于Happens-Before的排序功能很强大,因此有时候可以“借助(Piggyback)”现有同步机制的可见性属性。这需要将Happens-Before的程序顺序规则与其他某个顺序规则(通常是监视器锁规则或者volatile变量规则)结合起来,从而对某个未被锁保护的变量的访问操作进行排序。这项技术由于对语句的顺序非常敏感,因此很容易出错。它是一项高级技术,并且只有当需要最大限度地提升某些类(例如ReentrantLock)的性能时,才应该使用这项技术。
  在FutureTask的保护方法AbstractQueuedSynchronizer中说明了如何使用这种“借助”技术。AQS维护了一个表示同步器状态的整数,FutureTask用这个整数来保存任务的状态:正在运行,已完成和已取消。但FutureTask还维护了其他一些变量,例如计算的结果。当一个线程调用set来保存结果并且另一个线程调用get来获取该结果时,这两个线程最好按照Happens-Before进行排序。这可以通过将执行结果的引用声明为volatile类型来实现,但利用现有的同步机制可以更容易地实现相同的功能。
  FutureTask在设计时能够确保,在调用tryAcquireShared之前总能成功地调用tryRelease-Shared。tryReleaseShared会写入一个volatile类型的变量,而tryAcquireShared将读取这个变量。程序清单16-2给出了innerSet和innerGet等方法,在保存和获取result时将调用这些方法。由于innerSet将在调用releaseShared(这又将调用tryReleaseShared)之前写入result,并且innerGet将在调用acquireShared(这又将调用tryReleaseShared)之后读取result,因此将程序顺序规则与volatile变量规则结合在一起,就可以确保innerSet中的写入操作在innerGet中的读取操作之前执行。
  程序清单16-2 说明如何借助同步的FutureTask的内部类

// FutureTask的内部类
private final class Sync extends AbstractQueueedSynchronizer {private static final int RUNNING = 1, RAN = 2, CANCELLED = 4;private V result; private Exception exception;void innerSet(V v) {while (true) {int s = getState();if (ranOrCancelled(s)) {return;}if (compareAndSetState(s, RAN)) {break;}}result = v; releaseShared(0);done();}void innerGet() throws InterruptedException, ExecutionException {acquireSharedInterruptibly(0);if (getState() == CANNELED) {throw new CancellationException();}if (exception != null) {throw new ExecutionException(exception());}return result;}
}

之所以将这项技术称为“借助”,是因为它使用了一种现有的Happens-Before顺序来确保对象X的可见性,而不是专门为了发布X而创建一种Happens-Before顺序。
  在FutureTask中使用的“借助”技术很容易出错,因此要谨慎使用。但在某些情况下,这种“借助”技术是非常合理的。例如,当某个类在其规范中规定它的各个方法之间必须遵循一种Happens-Before关系,基于BlockingQueue实现的安全发布就是一种“借助”。如果一个线程将对象置入队列并且另一个线程随后获取这个对象,那么这就是一种安全发布,因为在BlockingQueue的实现中包含有足够的内部同步来确保入列操作在出列操作之前执行。
在类库中提供的其他Happens-Before排序包括:

  • 将一个元素放入一个线程安全容器的操作将在另一个线程从该容器中获得这个元素的操作之前执行。
  • 在CountDownLatch上的倒数操作将在线程从闭锁上的await方法中返回之前执行。
  • Future表示的任务的所有操作将在从Future.get中返回之前执行。
  • 向Executor提交一个Runnable或Callable的操作将在任务开始执行之前执行。
  • 一个线程到达CyclicBarrier或Exchanger的操作将在其他到达该栅栏或交换点的线程被释放之前执行。如果CyclicBarrier使用一个栅栏操作,那么到达栅栏的操作将在栅栏操作之前执行,而栅栏操作又会在线程从栅栏中释放之前执行。

16.2 发布

第3章介绍了如何安全地或者不正确地发布一个对象。对于其中介绍的各种安全技术,它们的安全性都来自于JMM提供的保证,而造成不正确发布的真正原因,就是在“发布一个共享对象”与“另一个线程访问该对象”之间缺少一种Happens-Before排序。

16.2.1 不安全的发布

当缺少Happens-Before关系时,就可能出现重排序问题,这就解释了为什么在没有充分同步的情况下发布一个对象会导致另一个线程看到一个只被部分构造的对象(请参见3.5节)。在初始化一个新的对象时需要写入多个变量,即新对象中的各个域。同样,在发布一个引用时也需要写入一个变量,即新对象的引用。如果无法确保发布共享引用的操作在另一个线程加载该共享引用之前执行,那么对新对象引用的写入操作将与对象中各个域的写入操作重排序(从使用该对象的线程的角度来看)。在这种情况下,另一个线程可能看到对象引用的最新值,但同时也将看到对象的某些或全部状态中包含的是无效值,即一个被部分构造对象。
  错误的延迟初始化将导致不正确的发布,如程序清单16-3所示。初看起来,在程序中存在的问题只有在2.2.2节中介绍的竞态条件问题。在某些特定条件下,例如当Resource的所有实例都相同时,你或许会忽略这些问题(以及在多次创建Resource实例时存在的低效率问题)。然而,即使不考虑这些问题,UnsafeLazyInitialization仍然是不安全的,因为另一个线程可能看到对部分构造的Resource实例的引用。
  程序清单16-3 不安全的延迟初始化(不要这么做)

@NotThreadSafe
public class UnsafeLazyInitialization {private static Resource resource;public static Resource getInstance() {if (resource == null) {return new Resource(); // 不安全的发布}return resource;}
}

假设线程A是第一个调用getInstance的线程。它将看到resource为null,并且初始化一个新的Resource,然后将resource设置为执行这个新实例。当线程B随后调用getInstance,它可能看到resource的值为非空,因此使用这个已经构造好的Resource。最初这看不出任何问题,但线程A写入resource的操作与线程B读取resource的操作之间不存在Happens-Before关系。在发布对象时存在数据竞争问题,因此B并不一定能看到Resource的正确状态。
  当新分配一个Resource时,Resource的构造函数将把新实例中的各个域由默认值(由Object构造函数写入的)修改为它们的初始值。由于在两个线程中都没有使用同步,因此线程B看到的线程A中的操作顺序,可能与线程A执行这些操作时的顺序并不相同。因此,即使线程A初始化Resource实例之后再将resource设置为指向它,线程B仍可能看到对resource的写入操作将在对Resource各个域的写入操作之前发生。因此,线程B就可能看到一个被部分构造的Resource实例,该实例可能处于无效状态,并在随后该实例的状态可能出现无法预料的变化。
  除了不可变对象以外,使用被另一个线程初始化的对象通常都是不安全的,除非对象的发布操作是在使用该对象的线程开始使用之前执行。

16.2.2 安全的发布

第3章介绍的安全发布常用模式可以确保被发布对象对于其他线程是可见的,因为它们保证发布对象的操作将在使用对象的线程开始使用该对象的引用之前执行。如果线程A将X放入BlockingQueue(并且随后没有线程修改它),线程B从队列中获取X,那么可以确保B看到的X与A放入的X相同。这是因为在BlockingQueue的实现中有足够的内部同步确保了put方法在take方法之前执行。同样,通过使用一个由锁保护共享变量或者使用共享的volatile类型变量,也可以确保对该变量的读取操作和写入操作按照Happens-Before关系来排序。
  事实上,Happens-Before比安全发布提供了更强可见性与顺序保证。如果将X从A安全地发布到B,那么这种安全发布可以保证X状态的可见性,但无法保证A访问的其他变量的状态可见性。然而,如果A将X置入队列的操作在线程B从队列中获取X的操作之前执行,那么B不仅能看到A留下的X状态(假设线程A或其他线程都没有对X再进行修改),而且还能看到A在移交X之前所做的任何操作(再次注意同样的警告)。【JMM确保B至少可以看到A写入的最新值,而对于随后写入的值,B可能看到也可能看不到。】
  既然JMM已经提供了这种更强大的Happens-Before关系,那么为什么还要介绍@GuardedBy和安全发布呢?与内存写入操作的可见性相比,从转移对象的所有权以及对象公布等角度来看,它们更符合大多数的程序设计。Happens-Before排序是在内存访问级别上操作的,它是一种“并发级汇编语言”,而安全发布的运行级别更接近程序设计。

16.2.3 安全初始化模式

有时候,我们需要推迟一些高开销的对象初始化操作,并且只有当使用这些对象时才进行初始化,但我们也看到了在误用延迟初始化时导致的问题。在程序清单16-4中,通过将getResource方法声明为synchronized,可以修复UnsafeLazyInitialization中的问题。由于getInstance的代码路径很短(只包括一个判断预见和一个预测分支),因此如果getInstance没有被多个线程频繁调用,那么在SafeLazyInitialization上不会存在激烈的竞争,从而能提供令人满意的性能。
  程序清单16-4 线程安全的延迟初始化

@ThreadSafe
public class SafeLazyInitialization {private static Resource resource;public synchronized static Resource getInstance() {if (resource == null) {source = new Resource();}return resource;}
}

在初始器中采用了特殊的方式来处理静态域(或者在静态初始化代码块中初始化的值[JPL 2.2.1和2.5.3]),并提供了额外的线程安全性保证。静态初始化器是由JVM在类的初始化阶段执行,即在类被加载后并且被线程使用之前。由于JVM将在初始化期间获得一个锁[JLS 12.4.2],并且每个线程都至少获取一次这个锁以确保这个类已经加载,因此在静态初始化期间,内存写入操作将自动对所有线程可见。因此无论是在被构造期间还是被引用时,静态初始化的对象都不需要显式的同步。然而,这个规则仅适用于在构造时的状态,如果对象是可变的,那么在读线程和写线程之间仍然需要通过同步来确保随后的修改操作是可见的,以及避免数据破坏。
  如程序清单16-5所示,通过使用提前初始化(Eager Initialization),避免了在每次调用SafeLazyInitialization中的getInstance时所产生的同步开销。通过将这项技术和JVM的延迟加载机制结合起来,可以形成一种延迟初始化技术,从而在常见的代码路径中不需要同步。在程序清单16-6的“延迟初始化占位(Holder)类模式”[EJ Item 48]中使用了一个专门的类来初始化Resource。JVM将推迟ResourceHolder的初始化操作,直到开始使用这个类时才初始化[JLS 12.4.1],并且由于通过一个静态初始化来初始化Resource,因此不需要额外的同步。当任何一个线程第一次调用getResource时,都会使ResourceHolder被加载和被初始化,此时静态初始化器将执行Resource的初始化操作。
  程序清单16-5 提前初始化

@ThreadSafe
public class Eagerinitialization {private static Resource resource = new Resource();public static Resource getInstance() { return resouce; }
}

程序清单16-6 延长初始化占位类模式

@ThreadSafe
public class ResourceFactory {private static class ResourceHolder {public static Resource resource = new Resource();}public static Resource getResource() {return ResourceHolder.resource;}
}

16.2.4 双重检查枷锁

在任何一本介绍并发的书中都会讨论声名狼藉的双重检查加锁(DCL),如程序清单16-7所示。在早期的JVM中,同步(甚至是无竞争的同步)都存在着巨大的性能开销。因此,人们想出了许多“聪明的(或者至少看上去聪明)”技巧来降低同步的影响,有些技巧很好,但也有些技巧是不好的,甚至是糟糕的,DCL就属于“糟糕”的一类。
  程序清单16-7 双重检查加锁(不要这么做)

@NotThreadSafe
public class DoubleCheckedLocking {private static Resource resource;public static Resource getInstance() {if (resource == null) {synchronized(DoubleCheckedLocking.class) {if (resource == null) {resource = new Resource();}}}return resource;}
}

由于早期的JVM在性能上存在一些有待优化的地方,因此延迟初始化经常被用来避免不必要的高开销操作,或者降低程序的启动时间。在编写正确的延迟初始化方法中需要使用同步。但在当时,同步不仅执行速度很慢,并且更重要的是,开发人员还没有完全理解同步的含义:虽然人们能很好地理解了“独占性”的含义,但却没有很好地理解“可见性”的含义。
  DCL声称能实现两全其美——在常见代码路径上的延迟初始化中不存在同步开销。它的工作原理是,首先检查是否在没有同步的情况下需要初始化,如果resource引用不为空,那么就直接使用它。否则,就进行同步并再次检查Resource是否被初始化,从而保证只有一个线程对共享的Resource执行初始化。在常见的代码路径中——获取一个已构造好的Resource引用,并没有使用同步。这就是问题所在:在16.2.1节中介绍过,线程可能看到一个仅被部分构造的Resource。
  DCL的真正问题在于:当在没有同步的情况下读取一个共享对象时,可能发生的最糟糕事情只是看到一个失效值(在这种情况下是一个空值),此时DCL方法将通过在持有锁的情况下再次尝试来避免这种风险。然而,实际情况远比这种情况糟糕——线程可能看到引用的当前值,但对象的状态值却是失效的,这意味着线程可以看到对象处于无效或错误的状态。
  在JMM的后续版本(Java 5.0以及更高的版本)中,如果把resource声明为volatile类型,那么就能启用DCL,并且这种方式对性能的影响很小,因为volatile变量读取操作的性能通常只是略高于非volatile变量读取操作的性能。然而,DCL的这种使用方法已经被广泛地废弃了——促使该模式出现的驱动力(无竞争同步的执行速度很慢,以及JVM启动时很慢)已经不复存在,因而它不是一种高效的优化措施。延迟初始化占位类模式能带来同样的优势,并且更容易理解。

16.3 初始化过程中的安全性

如果能确保初始化过程的安全性,那么就可以使得被正确构造的不可变对象在没有同步的情况下也能安全地在多个线程之间共享,而不管它们是如何发布的,甚至通过某种数据竞争来发布。(这意味着,如果Resource是不可变的,那么UnsafeLazyInitialization实际上是安全的。)
  如果不能确保初始化的安全性,那么当在发布或线程中没有使用同步时,一些本应为不可变对象(例如String)的值将会发生改变。安全性架构依赖于String的不可变性,如果缺少了初始化安全性,那么可能会导致一个安全漏洞,从而使恶意代码绕过安全检查。
  初始化安全性将确保,对于被正确构造的对象,所有线程都能看到由构造函数为对象给各个final域设置的正确值,而不管采用何种方式来发布对象。而且,对于可以通过被正确构造对象中某个final域到达的任意变量(例如某个final数组中的元素,或者由一个final域引用的HashMap的内容)将同样对于其他线程是可见的。【这仅仅适用于那些在构造过程中从对象的final域出发可以到达的对象。】
  对于含有final域的对象,初始化安全性可以防止对对象的初始引用被重排序到构造过程之前。当构造函数完成时,构造函数对final域的所有写入操作,以及对通过这些域可以到达的任何变量的写入操作,都将被“冻结”,并且任何获得该对象引用的线程都至少能确保看到被冻结的值。对于通过final域可到达的初始变量的写入操作,将不会与构造过程后的操作一起被重排序。
  初始化安全性意味着,程序清单16-8的SafeStates可以安全地发布,即便通过不安全的延迟初始化,或者在没有同步的情况下将SafeStates的引用放到一个公有的静态域,或者没有使用同步以及依赖于非线程安全的HashSet。
  程序清单16-8 不可变对象的初始化安全性

@ThreadSafe
public class SafeStates {private final Map<String, String> states;public SafeStates() {states = new HashMap<>();states.put("alaska", "AK");states.put("alabama", "AL");// ... states.put("wyoming", "WY");}public String getAbbreviation(String s) {return states.get(s);}
}

然而,许多对SafaStates的细微修改都可能破坏它的线程安全性。如果states不是final类型,或者存在除构造函数以外的其他方法能修改states,那么初始化安全性将无法确保在缺少同步的情况下安全地访问SafeStates。如果在SafeStates中还有其他的非final域,那么其他线程仍然可能看到这些域上的不正确的值。这也导致了对象在构造过程中逸出,从而使初始化安全性的保证无效。
  初始化安全性只能保证通过final域可达的值从构造过程完成时开始的可见性。对于通过非final域可达的值,或者在构成过程完成后可能改变的值,必须采用同步来确保可见性。

小结

Java内存模型说明了某个线程的内存操作在哪些情况下对于其他线程是可见的。其中包括确保这些操作是按照一种Happens-Before的偏序关系进行排序,而这种关系是基于内存操作和同步操作等级别来定义的。如果缺少充足的同步,那么当线程访问共享数据时,会发生一些非常奇怪的问题。然而,如果使用第2章与第3章介绍的更高级规则,例如@GuardedBy和安全发布,那么即使不考虑Happens-Before的底层细节,也能确保线程安全性。

《Java并发编程实战》【第四部分 高级主题】相关推荐

  1. aqs clh java_【Java并发编程实战】—– AQS(四):CLH同步队列

    在[Java并发编程实战]-–"J.U.C":CLH队列锁提过,AQS里面的CLH队列是CLH同步锁的一种变形. 其主要从双方面进行了改造:节点的结构与节点等待机制.在结构上引入了 ...

  2. 《Java 并发编程实战》--读书笔记

    Java 并发编程实战 注: 极客时间<Java 并发编程实战>–读书笔记 GitHub:https://github.com/ByrsH/Reading-notes/blob/maste ...

  3. Java并发编程实战————Executor框架与任务执行

    引言 本篇博客介绍通过"执行任务"的机制来设计应用程序时需要掌握的一些知识.所有的内容均提炼自<Java并发编程实战>中第六章的内容. 大多数并发应用程序都是围绕&qu ...

  4. Java并发编程实战_不愧是领军人物!这种等级的“Java并发编程宝典”谁能撰写?...

    前言 大家都知道并发编程技术就是在同一个处理器上同时的去处理多个任务,充分的利用到处理器的每个核心,最大化的发挥处理器的峰值性能,这样就可以避免我们因为性能而产生的一些问题. 大厂的核心负载肯定是非常 ...

  5. 视频教程-Java并发编程实战-Java

    Java并发编程实战 2018年以超过十倍的年业绩增长速度,从中高端IT技术在线教育行业中脱颖而出,成为在线教育领域一匹令人瞩目的黑马.咕泡学院以教学培养.职业规划为核心,旨在帮助学员提升技术技能,加 ...

  6. 【极客时间】《Java并发编程实战》学习笔记

    目录: 开篇词 | 你为什么需要学习并发编程? 内容来源:开篇词 | 你为什么需要学习并发编程?-极客时间 例如,Java 里 synchronized.wait()/notify() 相关的知识很琐 ...

  7. Java并发编程实战之互斥锁

    文章目录 Java并发编程实战之互斥锁 如何解决原子性问题? 锁模型 Java synchronized 关键字 Java synchronized 关键字 只能解决原子性问题? 如何正确使用Java ...

  8. Java并发编程实战笔记2:对象的组合

    设计线程安全的类 在设计现车让安全类的过程之中,需要包含以下三步: 找出构成对象状态的所有变量 找出约束状态变量的不变性条件 建立对象状态的并发访问策略 实例封闭 通过封闭机制与合适的加锁策略结合起来 ...

  9. java 多线程缓存_[Java教程]【JAVA并发编程实战】12、使用condition实现多线程下的有界缓存先进先出队列...

    [Java教程][JAVA并发编程实战]12.使用condition实现多线程下的有界缓存先进先出队列 0 2016-11-29 17:00:10 package cn.study.concurren ...

  10. Java并发编程实战————恢复中断

    中断是一种协作机制,一个线程不能强制其他线程停止正在执行的操作而去执行其他操作. 什么是中断状态? 线程类有一个描述自身是否被中断了的boolean类型的状态,可以通过调用 .isInterrupte ...

最新文章

  1. 凶猛的飞禽 超跑奥迪
  2. 这是一篇“团队”博客
  3. installation of igraph for python2.7
  4. Java的org.apache.commons.lang3.StringUtils
  5. sql2008安装时提示参数不能为空_Java Validation API,实现参数的合法性校验
  6. java cxf 双向通讯_CXF 在spring boot 2 发布多个服务
  7. PS特效:图像碎片化
  8. Exchange Server 2010 SP3部署
  9. spring boot系列教程2--从helloworld开始
  10. 浅析foreach原理
  11. 聊聊springboot2的embeded container的配置改动 1
  12. python遥感图像开发小软件_遥感影像深度学习标注软件的开发要点
  13. matlab中m文件的命名规则 转,Matlab中m文件命名规则
  14. python pyplot颜色_matplotlib制图——颜色和样式
  15. Chapter 3 Raster Images
  16. STM32MP157C-DK2 开机测试
  17. React---关于useCallback和useMemo的详解
  18. 2023,VC投资的分水岭
  19. 【JavaWeb】9—监听器
  20. 微服务架构实践心得小结

热门文章

  1. GJB 150.10军用设备环境试验方法霉菌试验
  2. 软件dfmea_最全最专业解析!详解DFMEA新版六步法~fmea软件
  3. 数据库常用的sql语句大全--sql
  4. 《那些年啊,那些事——一个程序员的奋斗史》四
  5. 【MFC开发(9)】列表控件List Box
  6. PHP源码分析-hex2bin函数源码分析
  7. iperf 服务端发送数据_iperf3使用方法详解
  8. 如何安装JAVASE平台
  9. android+解锁工具,安卓手机解锁助手 (A Unlock Tool)
  10. C语言 - 汉诺塔详解(超详细)