synchronized的用法和实现原理

  • synchronized实现线程同步的用法和实现原理

不足

  • synchronized在线程同步的使用方面,优点是使用简单,可以自动加锁和解锁,但是也存在一些不足:

    1. synchronized是阻塞的,不支持非阻塞,中断和超时退出特性;
    2. synchronized是互斥锁,不支持多个线程对资源的共享访问,如多个读线程进行并发读;
    3. 当多个方法共享多个monitor时,要注意使用synchronized加锁的顺序,否则容易产生死锁;
    4. synchronized只支持基于monitor这个对象来进行线程之间条件化通信,即多个线程只能基于一个monitor的wait,notify,notifyAll来进行线程之间的通信,不够灵活,如读写线程之间无法区分;同时由于只有monitor对象的一个等待队列,故所有等待线程均处于该等待队列中,无法区分出是生产者线程还是消费者线程,所以当需要唤醒某个线程时,需要调用notifyAll来唤醒所有线程,否则如果使用notify则可能会出现假死,如本来想唤醒生产者线程,而notify了一个消费者线程,则此时系统就出现“假死”了;
    5. synchronized基于操作系统的Metux Lock来实现,线程之间的切换需要进行上下文切换,成本较高,性能较低。
  • 所以为了解决以上问题,在JDK1.5中提供了Lock和Condition接口来实现synchronized和监视器monitor的功能,核心实现为ReentrantLock。

ReentrantLock:可重入锁

设计目的

  • ReentrantLock也是一个可重入的互斥锁,跟synchronized提供一样的线程同步功能,但是比synchronized更加灵活,即优化了以上所说的synchronized的不足。

  • ReentrantLock是基于AQS来实现线程同步功能的:

    • AQS提供了线程等待队列的实现,ReentrantLock自定义线程同步状态state来实现互斥锁的功能即可,即state等于0,表示当前没有任何线程占用这个锁,state大于0,表示当前存在线程占用这个锁,且该占用线程每访问一个使用该锁同步的方法,则state递增1,实现可重入,这个实现逻辑是跟synchronized一样的;
    • AQS是通过使用自旋和UNSAFE提供的CAS硬件级别的原子操作来对线程等待队列进行增删节点来实现线程的切换的,整个过程为无锁操作,即不需要依赖于操作系统的Metux Lock来实现,故不需要进行线程上下文切换,提高了性能。
  • 在使用方面,与synchronized的自动加锁解锁不同的是,ReentrantLock是需要在应用代码中显式进行加锁和解锁操作的,通常需要结合try-finally来实现,避免异常是无法解锁,如下:

    class X {private final ReentrantLock lock = new ReentrantLock();// ...public void m() {lock.lock();  // block until condition holdstry {// ... method body} finally {lock.unlock()}}
    }
    

实现

  • ReentrantLock基于AQS实现,故需要使用一个内部类来实现AQS接口,提供Syn同步锁的功能。在AQS实现类中,主要是需要实现tryAcquire方法定义是否可以成功获取锁,实现tryRelease方法定义释放锁。
  • 对于获取锁tryAcquire,ReentrantLock提供了公平和非公平锁两个实现,默认为非公平锁,公平的含义是根据线程请求获取锁的先后顺序来获取锁,即利用了FIFO队列的特性;非公平的含义是每个请求获取锁的线程在需要锁时,先请求一下是否可以获取锁,如果无法获取,则再放入FIFO队列中。
请求获取锁tryAcquire
  • 公平版本:

    /*** Fair version of tryAcquire.  Don't grant access unless* recursive call or no waiters or is first.*/
    // 判断当前线程是否可以访问共享资源,返回true或false,
    // 在AQS的acquire方法中定义了模板实现,即调用了tryAcquire,
    // 该方法返回false,则在acquire中调用acquireQueued方法将当前线程节点放入等待队列中
    protected final boolean tryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {if (!hasQueuedPredecessors() &&compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 递增state,实现可重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0)throw new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
    }
    
  • 非公平版本:

    /*** Sync object for non-fair locks*/
    static final class NonfairSync extends Sync {private static final long serialVersionUID = 7316153563782823691L;/*** Performs lock.  Try immediate barge, backing up to normal* acquire on failure.*/final void lock() {// 先尝试获取,实现非公平if (compareAndSetState(0, 1))setExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}protected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}
    }// 在基类Sync中定义,Sync继承AbstractQueuedSynchronizer
    /*** Performs non-fair tryLock.  tryAcquire is implemented in* subclasses, but both need nonfair try for trylock method.*/
    final boolean nonfairTryAcquire(int acquires) {final Thread current = Thread.currentThread();int c = getState();if (c == 0) {// 先尝试获取,失败返回false则入队,实现非公平if (compareAndSetState(0, acquires)) {setExclusiveOwnerThread(current);return true;}}// 递增state,实现可重入else if (current == getExclusiveOwnerThread()) {int nextc = c + acquires;if (nextc < 0) // overflowthrow new Error("Maximum lock count exceeded");setState(nextc);return true;}return false;
    }
    

释放锁tryRelease

  • 在基类Sync中定义,Sync继承AbstractQueuedSynchronizer:递减state直到0

    protected final boolean tryRelease(int releases) {// 递减stateint c = getState() - releases;if (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;// state等于0,则释放锁if (c == 0) {free = true;setExclusiveOwnerThread(null);}setState(c);return free;
    }
    

各个版本的lock加锁

  1. 阻塞加锁直到获取锁为止,与synchronized语义一样,不支持中断、超时:

    /*** Acquires the lock.** <p>Acquires the lock if it is not held by another thread and returns* immediately, setting the lock hold count to one.** <p>If the current thread already holds the lock then the hold* count is incremented by one and the method returns immediately.** <p>If the lock is held by another thread then the* current thread becomes disabled for thread scheduling* purposes and lies dormant until the lock has been acquired,* at which time the lock hold count is set to one.*/
    public void lock() {sync.lock();
    }
    
  2. 阻塞可中断版本:

    public void lockInterruptibly() throws InterruptedException {sync.acquireInterruptibly(1);
    }
    
  3. 非阻塞版本:非阻塞,非公平,即使使用的是公平锁,能获取锁则返回true,否则返回false。

    public boolean tryLock() {return sync.nonfairTryAcquire(1);
    }
    
  4. 阻塞可超时版本:阻塞指定时间,若在该指定时间到达之后,还没获取锁,则返回false:

    public boolean tryLock(long timeout, TimeUnit unit)throws InterruptedException {return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    

基于Condition实现生产者消费者模型

  • Condition在Lock体系设计中,用于实现与synchronized中monitor对象所提供的wait,notify,notifyAll相同的语义,对应的方法分别为await,signal,signalAll。在此基础上进行的优化是:一个Lock可以对应多个Condition,每个Condition对应一个条件化线程等待队列,而在synchronized中只能使用monitor这一个Condition。
  • 一个Lock支持多个Condition的好处是:可以将等待线程进行分类,即每个Condition对应一个条件化线程等待队列,而不是全部放在一个条件化线程等待队列,这样每个Condition在条件满足时,可以调用signal或者signalAll来通知该Condition对应的条件化线程等待队列的线程,而不是所有线程:
    • 这样可以在一定程度上优化性能,特别是signalAll,只需通知对应的条件化线程等待队列即可,让这个线程子集去竞争Lock锁,其他Condition的条件化等待队列中的线程继续休眠;
    • 其次对于signal的调用,可以“精确”通知到该Condition对应的条件化线程等待队列中的一个线程,从而避免了在synchronized中的可能出现“假死”的问题:如在生产者消费者模型中,当生产者往数据队列放入数据后,基于Condition的实现中,可以通知和唤醒消费者线程等待队列的一个线程去数据队列读取数据了,而在synchronized的实现中,由于没有对线程进行区分,故可能通知到线程等待队列的一个生产者线程,如果此时数据队列满了,则该生产者线程被唤醒后发现数据队列还是满了,则继续休眠,此后则没有生产者线程来通知消费者线程消费数据了,整个生产者消费者体系就“假死”了,即生产者无法填充数据,消费者不知道有数据可读继续休眠,所以在synchronized中通常需要使用notifyAll来避免这种情况发生,唤醒所有线程去竞争锁。
    • 所以有了Condition之后,只需要调用condition的signal就看准确唤醒对应某个线程,如生产者线程或者消费者线程,而不需要调用signalAll方法,像synchronized一样调用monitor对象的notifyAll唤醒所有线程,从而提高了性能。具体可以参考LinkedBlockingQueue的实现源码:JDK1.8源码分析:阻塞队列LinkedBlockingQueue与BlockingDeque(双端)的设计与实现

await等待的实现

  • synchronized的wait的语义为:线程占有锁,发现条件不满足,释放锁,线程阻塞休眠,进入条件化等待队列,等待其他线程notify唤醒。

  • Lock的await的调用也是跟synchronized的wait一样,首先对应的线程需要获取Lock锁进入同步代码,即如果方法没有先调用如lock.lock()获取锁的情况下,调用await了,则会抛IllegalMonitorStateException异常。

  • await对synchronized的wait的语义实现如下:将当前线程放入条件化等待队列,然后释放锁,在while循环内阻塞休眠,直到被放到AQS同步队列了,这时说明条件满足了,可以去竞争获取锁了,通过调用acquireQueued去竞争获取锁。如果获取锁成功了,则真正从await返回。

    /*** Implements interruptible condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,*      throwing IllegalMonitorStateException if it fails.* <li> Block until signalled or interrupted.* <li> Reacquire by invoking specialized version of*      {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/
    public final void await() throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();// 进入条件化等待队列Node node = addConditionWaiter();// 释放锁int savedState = fullyRelease(node);int interruptMode = 0;// 自旋检查是否被放到同步队列了while (!isOnSyncQueue(node)) {// 阻塞休眠线程LockSupport.park(this);// 在等待队列中被中断,则退出等待if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}// 被其他线程通过signal唤醒了,则放到同步队列合适的位置,// 等待重新获取锁,从而从该await方法返回,继续执行if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();// 唤醒被获取锁之后,再检查是否之前被中断过,补上中断if (interruptMode != 0)reportInterruptAfterWait(interruptMode);
    }
    
  • 在应用代码中,await通常需要在while循环中检查条件是否满足,只有对应线程被唤醒,获取锁成功,然后再在while循环检查条件是否满足,如果满足,则继续执行,因为此时只有当前线程占有锁,不会出现并发修改导致条件不满足,如下为LinkedBlockingQueue的put的实现:

    public void put(E e) throws InterruptedException {if (e == null) throw new NullPointerException();// Note: convention in all put/take/etc is to preset local var// holding count negative to indicate failure unless set.int c = -1;Node<E> node = new Node<E>(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;putLock.lockInterruptibly();try {/** Note that count is used in wait guard even though it is* not protected by lock. This works because count can* only decrease at this point (all other puts are shut* out by lock), and we (or some other waiting put) are* signalled if it ever changes from capacity. Similarly* for all other uses of count in other wait guards.*/// 阻塞直到数据队列存在空间可以存放数据while (count.get() == capacity) {notFull.await();}enqueue(node);c = count.getAndIncrement();// 可能同时存在多个生产者在写,故通知下一个生产者继续写if (c + 1 < capacity)notFull.signal();} finally {putLock.unlock();}// 可能之前为空,故写数据进去之后分空了,故通知等待非空的消费者来读取if (c == 0)signalNotEmpty();
    }
    
各个版本的await等待条件满足
  1. 可中断阻塞等待

    /*** Implements interruptible condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,*      throwing IllegalMonitorStateException if it fails.* <li> Block until signalled or interrupted.* <li> Reacquire by invoking specialized version of*      {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/
    public final void await() throws InterruptedException
    
  2. 可中断,可超时阻塞等待:分别为基于纳秒,指定日期,自定义时间单位的版本

    /*** Implements timed condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,*      throwing IllegalMonitorStateException if it fails.* <li> Block until signalled, interrupted, or timed out.* <li> Reacquire by invoking specialized version of*      {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/
    public final long awaitNanos(long nanosTimeout)throws InterruptedException/*** Implements absolute timed condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,*      throwing IllegalMonitorStateException if it fails.* <li> Block until signalled, interrupted, or timed out.* <li> Reacquire by invoking specialized version of*      {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* <li> If timed out while blocked in step 4, return false, else true.* </ol>*/
    public final boolean awaitUntil(Date deadline) throws InterruptedException/*** Implements timed condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,*      throwing IllegalMonitorStateException if it fails.* <li> Block until signalled, interrupted, or timed out.* <li> Reacquire by invoking specialized version of*      {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* <li> If timed out while blocked in step 4, return false, else true.* </ol>*/
    public final boolean await(long time, TimeUnit unit) throws InterruptedException
    

signal通知的实现

  • signal主要是当前占有锁正在执行的线程,在条件满足时,通知和唤醒该Condition对应的条件化等待队列的一个线程,让该线程去竞争获取锁,然后继续执行。

  • Condition的signal实现主要是将Condition对应的条件化等待队列的头结点移到到AQS的同步队列中,具体为移动到同步队列的尾部,这样这个节点对应的线程就可以去竞争锁了。如下:

    /*** Moves the longest-waiting thread, if one exists, from the* wait queue for this condition to the wait queue for the* owning lock.** @throws IllegalMonitorStateException if {@link #isHeldExclusively}*         returns {@code false}*/
    public final void signal() {if (!isHeldExclusively())throw new IllegalMonitorStateException();// 条件化等待队列的头结点Node first = firstWaiter;if (first != null)doSignal(first);
    }/*** Removes and transfers nodes until hit non-cancelled one or* null. Split out from signal in part to encourage compilers* to inline the case of no waiters.* @param first (non-null) the first node on condition queue*/
    private void doSignal(Node first) {do {if ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;first.nextWaiter = null;} while (!transferForSignal(first) &&(first = firstWaiter) != null);
    }/*** Transfers a node from a condition queue onto sync queue.* Returns true if successful.* @param node the node* @return true if successfully transferred (else the node was* cancelled before signal)*/
    final boolean transferForSignal(Node node) {/** If cannot change waitStatus, the node has been cancelled.*/// 将节点的waitStatus从CONDITION修改为0,// 0是添加到同步队列的节点的初始化状态if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;/** Splice onto queue and try to set waitStatus of predecessor to* indicate that thread is (probably) waiting. If cancelled or* attempt to set waitStatus fails, wake up to resync (in which* case the waitStatus can be transiently and harmlessly wrong).*/// 将该节点添加到同步队列尾部,// 并将前置节点的waitStatus设置为SIGNAL,// 从而使得该前置节点知道后面有节点在排队等待获取锁// 在应用代码中,当执行完signal之后,一般接下来的代码为lock.unlock()释放锁,// unlock内部调用release方法,从这个同步队列唤醒下一个执行的线程,// 所以当前这个刚刚被移到同步队列的线程就可能是这个执行的线程了。Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;
    }
    
  • 在使用方面,signal不需要在while循环中,因为调用signal的线程是当前占有锁,正在执行的线程。

为什么await,signal,signalAll需要在获取lock锁的前提下调用?

  • 与monitor对象的wait,notify,notifyAll需要在synchronized同步的方法或者方法块内执行一样,Condition的await,signal,signalAll需要在获取lock锁的前提下调用,否则会抛IllegalMonitorStateException一次:因为Condition的条件化等待队列中的线程在唤醒时,是被移动到Lock的同步队列中,然后与其他同步队列中的线程一样竞争获取Lock锁,故需要在获取lock锁的前提下才能调用。

JDK1.8源码分析:可重入锁ReentrantLock和Condition的实现原理相关推荐

  1. 精尽 Redisson 源码分析 —— 可重入分布式锁 ReentrantLock

    1. 概述 在 Redisson 中,提供了 8 种分布锁的实现,具体我们可以在 <Redisson 文档 -- 分布式锁和同步器> 中看到.绝大数情况下,我们使用可重入锁(Reentra ...

  2. 【JUC】JDK1.8源码分析之ArrayBlockingQueue(三)

    一.前言 在完成Map下的并发集合后,现在来分析ArrayBlockingQueue,ArrayBlockingQueue可以用作一个阻塞型队列,支持多任务并发操作,有了之前看源码的积累,再看Arra ...

  3. 【集合框架】JDK1.8源码分析之HashMap(一)

    转载自  [集合框架]JDK1.8源码分析之HashMap(一) 一.前言 在分析jdk1.8后的HashMap源码时,发现网上好多分析都是基于之前的jdk,而Java8的HashMap对之前做了较大 ...

  4. 闲聊AQS面试和源码解读---可重入锁、LockSupport、CAS;从ReentrantLock源码来看公平锁与非公平锁、AQS到底是怎么用CLH队列来排队的?

    AQS原理可谓是JUC面试中的重灾区之一,今天我们就来一起看看AQS到底是什么? 这里我先整理了一些JUC面试最常问的问题? 1.Synchronized 相关问题以及可重入锁 ReentrantLo ...

  5. 【集合框架】JDK1.8源码分析HashSet LinkedHashSet(八)

    一.前言 分析完了List的两个主要类之后,我们来分析Set接口下的类,HashSet和LinkedHashSet,其实,在分析完HashMap与LinkedHashMap之后,再来分析HashSet ...

  6. Java 重入锁 ReentrantLock 原理分析

    1.简介 可重入锁ReentrantLock自 JDK 1.5 被引入,功能上与synchronized关键字类似.所谓的可重入是指,线程可对同一把锁进行重复加锁,而不会被阻塞住,这样可避免死锁的产生 ...

  7. Java多线程系列——深入重入锁ReentrantLock

    简述 ReentrantLock 是一个可重入的互斥(/独占)锁,又称为"独占锁". ReentrantLock通过自定义队列同步器(AQS-AbstractQueuedSychr ...

  8. java中多线程reentlock_Java多线程系列——深入重入锁ReentrantLock

    简述 ReentrantLock 是一个可重入的互斥(/独占)锁,又称为"独占锁". ReentrantLock通过自定义队列同步器(AQS-AbstractQueuedSychr ...

  9. 并发编程-19AQS同步组件之重入锁ReentrantLock、 读写锁ReentrantReadWriteLock、Condition

    文章目录 J.U.C脑图 ReentrantLock概述 ReentrantLock 常用方法 synchronized 和 ReentrantLock的比较 ReentrantLock示例 读写锁R ...

最新文章

  1. 【GStreamer】gstreamer工具详解之:gst-inspect-1.0
  2. 基带工程师是做什么的_【思唯网络学院】网络工程师认证可以用来做什么?
  3. 符号常量和变量有什么区别_“变量”和“常量”,计算机程序中的那个“量”是什么“量”...
  4. 多线程计算0-100 0-200 的和
  5. mysql游标事例_MySQL游标语法实例
  6. bootstrap 输入错误提示_win7系统提示explorer.exe应用程序错误怎么办
  7. 《零基础》MySQL WHERE 子句(十三)
  8. request获取各种路径记录
  9. hdu-1542 Atlantis(离散化+线段树+扫描线算法)
  10. Windows 安装JDK
  11. matplotlib.pyplot库解析
  12. GIS招聘 | 云南省自然资源厅所属事业单位
  13. C#:实现一个将字符串转换为整数的方法
  14. 华为老员工谈华为终端的来龙去脉
  15. 表情识别(七)--面部表情识别阶段综述(2018.4)
  16. shiroFilter生命周期
  17. 任意多边形的面积计算
  18. mysqlbinlog 加-v -vv 的区别
  19. 计算机专业新老生交流会ppt,新老生交流会.ppt
  20. 广州大学学生实验报告,进程控制与进程通信

热门文章

  1. 【数学建模】多元回归分析模型(评价与决策)
  2. 制作pve引导盘---U盘安装Proxmox VE(一)
  3. (7)centos7 同步服务器时间
  4. C#京东模拟注册~滑块轨迹算法~EID~密码RSA加密
  5. 2022危化品企业双重预防机制数字化建设成为迫切任务
  6. python中成语接龙游戏_python——成语接龙小游戏
  7. 小米红米6Pro解BL锁教程申请BootLoader解锁教程
  8. 简易版人生重开模拟器开发(python版)
  9. 《Python与硬件项目案例》— 基于Python与指纹模块AS608的指纹识别签到考勤系统(下篇)(期末大作业、课程设计、毕业设计、结课项目)
  10. 塑造成功性格的15种方法