我们知道各种并发框架如CountDownLatch、CyclicBarrier和Semaphore是基于AQS (AbstractQueuedSynchronizer)框架实现的,AQS框架借助于两个类:

  • Unsafe(提供CAS操作) //JDK9以后引入了VarHandle变量句柄,代替了Unsafe
  • LockSupport(提供park/unpark操作)

而LockSupport的park和unpark的实现是依赖于Unsafe类的prak和unpark的。重载方法中可以传入一个blocker对象,在dump线程时能获得更多的信息,用于问题排查或系统监控。

那么Unsafe类是个什么东西呢?Unsafe的全限定名是sun.misc.Unsafe。在源码的注释中我们可以看到:执行低级、不安全操作的方法集合。从名字就知道它是不安全的,它的功能有很多,比如:volatile的读写一个变量的field,有序的写一个变量的field,直接内存操作:申请内存,释放内存,CAS的修改变量等。

本文主要说说park和unpark方法。最简单的理解:park阻塞一个线程,unpark唤醒一个线程。

核心设计原理:“许可”。park方法本质是消费许可,如果没有可消费的许可,那么就阻塞当前线程,一直等待,直到阻塞线程的unpark方法被其他线程调用,然后消费许可,当前线程被唤醒,继续执行。unpark方法本质是生产许可,一个线程刚创建出来,然后运行,此时是没有许可的,所以unpark方法可以在park方法前调用。下次park方法调用时,直接消费许可,线程不用阻塞等待许可。许可最多只有一个,连续多次调用unpark只能生产一个许可。

park方法有几种重载的形式,可以设置等待时间,等待时间可以设置为绝对的或者相对的,超过等待时间,线程会自动被唤醒。

底层实现原理:

在Linux系统下,是用的Posix线程库pthread中的mutex(互斥量),condition(条件变量)来实现的。
mutex和condition保护了一个_counter的变量,简单点说:当park时,这个变量被设置为0,当unpark时,这个变量被设置为1。

park源码跟踪

park的声明形式有一下两大块

一部分多了一个Object参数,作为blocker,另外的则没有。blocker的好处在于,在诊断问题的时候能够知道park的原因

推荐使用带有Object的park操作

park函数作用
park用于挂起当前线程,如果许可可用,会立马返回,并消费掉许可。

  • park(Object): 恢复的条件为 1:线程调用了unpark; 2:其它线程中断了线程;3:发生了不可预料的事情
  • parkNanos(Object blocker, long nanos):恢复的条件为 1:线程调用了unpark; 2:其它线程中断了线程;3:发生了不可预料的事情;4:过期时间到了
  • parkUntil(Object blocker, long deadline):恢复的条件为 1:线程调用了unpark; 2:其它线程中断了线程;3:发生了不可预料的事情;4:指定的deadLine已经到了

以park的源码为例

public static void park(Object blocker) {//获取当前线程Thread t = Thread.currentThread();//记录当前线程阻塞的原因,底层就是unsafe.putObject,就是把对象存储起来setBlocker(t, blocker);//执行parkunsafe.park(false, 0L);//线程恢复后,去掉阻塞原因setBlocker(t, null);
}

从源码可以看到真实的实现均在 unsafe

unsafe.park
核心实现如下

JavaThread* thread=JavaThread::thread_from_jni_environment(env);
...
thread->parker()->park(isAbsolute != 0, time);

就是获取java线程的parker对象,然后执行它的park方法。Parker的定义如下

class Parker : public os::PlatformParker {
private://表示许可volatile int _counter ; Parker * FreeNext ;JavaThread * AssociatedWith ; // Current association
public:Parker() : PlatformParker() {//初始化_counter_counter       = 0 ; FreeNext       = NULL ;AssociatedWith = NULL ;}
protected:~Parker() { ShouldNotReachHere(); }
public:void park(bool isAbsolute, jlong time);void unpark();// Lifecycle operators  static Parker * Allocate (JavaThread * t) ;static void Release (Parker * e) ;
private:static Parker * volatile FreeList ;static volatile int ListLock ;};

它继承了os::PlatformParker,内置了一个volatitle的 _counter。PlatformParker则是在不同的操作系统中有不同的实现,以linux为例

class PlatformParker : public CHeapObj {protected://互斥变量类型pthread_mutex_t _mutex [1] ; //条件变量类型pthread_cond_t  _cond  [1] ;public:        ~PlatformParker() { guarantee (0, "invariant") ; }public:PlatformParker() {int status;//初始化条件变量,使用    pthread_cond_t之前必须先执行初始化status = pthread_cond_init (_cond, NULL);assert_status(status == 0, status, "cond_init”);// 初始化互斥变量,使用    pthread_mutex_t之前必须先执行初始化status = pthread_mutex_init (_mutex, NULL);assert_status(status == 0, status, "mutex_init");}
}

上述代码均为POSIX线程接口使用,所以pthread指的也就是posixThread

parker实现如下

void Parker::park(bool isAbsolute, jlong time) {if (_counter > 0) {//已经有许可了,用掉当前许可_counter = 0 ;//使用内存屏障,确保 _counter赋值为0(写入操作)能够被内存屏障之后的读操作获取内存屏障事前的结果,也就是能够正确的读到0OrderAccess::fence();//立即返回return ;}Thread* thread = Thread::current();assert(thread->is_Java_thread(), "Must be JavaThread");JavaThread *jt = (JavaThread *)thread;if (Thread::is_interrupted(thread, false)) {// 线程执行了中断,返回return;}if (time < 0 || (isAbsolute && time == 0) ) { //时间到了,或者是代表绝对时间,同时绝对时间是0(此时也是时间到了),直接返回,java中的parkUtil传的就是绝对时间,其它都不是return;}if (time > 0) {//传入了时间参数,将其存入absTime,并解析成absTime->tv_sec(秒)和absTime->tv_nsec(纳秒)存储起来,存的是绝对时间unpackTime(&absTime, isAbsolute, time);}//进入safepoint region,更改线程为阻塞状态ThreadBlockInVM tbivm(jt);if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {//如果线程被中断,或者是在尝试给互斥变量加锁的过程中,加锁失败,比如被其它线程锁住了,直接返回return;}
//这里表示线程互斥变量锁成功了int status ;if (_counter > 0)  {// 有许可了,返回_counter = 0;//对互斥变量解锁status = pthread_mutex_unlock(_mutex);assert (status == 0, "invariant") ;OrderAccess::fence();return;}#ifdef ASSERT// Don't catch signals while blocked; let the running threads have the signals.
// (This allows a debugger to break into the running thread.)  //debug用
sigset_t oldsigs;sigset_t* allowdebug_blocked = os::Linux::allowdebug_blocked_signals();pthread_sigmask(SIG_BLOCK, allowdebug_blocked, &oldsigs);
#endif
//将java线程所拥有的操作系统线程设置成 CONDVAR_WAIT状态 ,表示在等待某个条件的发生
OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
//将java的_suspend_equivalent参数设置为truejt->set_suspend_equivalent();// cleared by handle_special_suspend_equivalent_condition() or java_suspend_self()if (time == 0) {//把调用线程放到等待条件的线程列表上,然后对互斥变量解锁,(这两是原子操作),这个时候线程进入等待,当它返回时,互斥变量再次被锁住。//成功返回0,否则返回错误编号status = pthread_cond_wait (_cond, _mutex) ;} else {//同pthread_cond_wait,只是多了一个超时,如果超时还没有条件出现,那么重新获取胡吃两然后返回错误码 ETIMEDOUTstatus = os::Linux::safe_cond_timedwait (_cond, _mutex, &absTime) ;if (status != 0 && WorkAroundNPTLTimedWaitHang) {//WorkAroundNPTLTimedWaitHang 是JVM的运行参数,默认为1//去除初始化pthread_cond_destroy (_cond) ;
//重新初始化pthread_cond_init    (_cond, NULL);}}assert_status(status == 0 || status == EINTR ||status == ETIME || status == ETIMEDOUT,status, "cond_timedwait");#ifdef ASSERTpthread_sigmask(SIG_SETMASK, &oldsigs, NULL);
#endif//等待结束后,许可被消耗,改为0  _counter = 0 ;
//释放互斥量的锁status = pthread_mutex_unlock(_mutex) ;assert_status(status == 0, status, "invariant") ;// If externally suspended while waiting, re-suspend if (jt->handle_special_suspend_equivalent_condition()) {jt->java_suspend_self();}
//加入内存屏障指令OrderAccess::fence();
}

从park的实现可以看到

  1. 无论是什么情况返回,park方法本身都不会告知调用方返回的原因,所以调用的时候一般都会去判断返回的场景,根据场景做不同的处理
  2. 线程的等待与挂起、唤醒等等就是使用的POSIX的线程API
  3. park的许可通过原子变量_count实现,当被消耗时,_count为0,只要拥有许可,就会立即返回

OrderAccess::fence();
在linux中实现原理如下


inline void OrderAccess::fence() {if (os::is_MP()) {
#ifdef AMD64// 没有使用mfence,因为mfence有时候性能差于使用 locked addl__asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
#else    __asm__ volatile ("lock; addl $0,0(%%esp)" : : : "cc", "memory");
#endif  }
}

内存重排序网上的验证

ThreadBlockInVM tbivm(jt)

这属于C++新建变量的语法,也就是调用构造函数新建了一个变量,变量名为tbivm,参数为jt。类的实现为

class ThreadBlockInVM : public ThreadStateTransition {public:ThreadBlockInVM(JavaThread *thread): ThreadStateTransition(thread) {// Once we are blocked vm expects stack to be walkable    thread->frame_anchor()->make_walkable(thread);//把线程由运行状态转成阻塞状态trans_and_fence(_thread_in_vm, _thread_blocked);}...
};

_thread_in_vm 表示线程当前在VM中执行,_thread_blocked表示线程当前阻塞了,他们是globalDefinitions.hpp中定义的枚举

//这个枚举是用来追踪线程在代码的那一块执行,用来给 safepoint code使用,有4种重要的类型,_thread_new/_thread_in_native/_thread_in_vm/_thread_in_Java。形如xxx_trans的状态都是中间状态,表示线程正在由一种状态变成另一种状态,这种方式使得 safepoint code在处理线程状态时,不需要对线程进行挂起,使得safe point code运行更快,而给定一个状态,通过+1就可以得到他的转换状态
enum JavaThreadState {_thread_uninitialized     =  0, // should never happen (missing initialization)
_thread_new               =  2, // just starting up, i.e., in process of being initialized
_thread_new_trans         =  3, // corresponding transition state (not used, included for completeness)
_thread_in_native         =  4, // running in native code  . This is a safepoint region, since all oops will be in jobject handles
_thread_in_native_trans   =  5, // corresponding transition state
_thread_in_vm             =  6, // running in VM
_thread_in_vm_trans       =  7, // corresponding transition state
_thread_in_Java           =  8, //  Executing either interpreted or compiled Java code running in Java or in stub code
_thread_in_Java_trans     =  9, // corresponding transition state (not used, included for completeness)
_thread_blocked           = 10, // blocked in vm
_thread_blocked_trans     = 11, // corresponding transition state
_thread_max_state         = 12  // maximum thread state+1 - used for statistics allocation
};

父类ThreadStateTransition中定义trans_and_fence如下

void trans_and_fence(JavaThreadState from, JavaThreadState to) { transition_and_fence(_thread, from, to);} //_thread即构造函数传进来de thread
// transition_and_fence must be used on any thread state transition
// where there might not be a Java call stub on the stack, in
// particular on Windows where the Structured Exception Handler is
// set up in the call stub. os::write_memory_serialize_page() can
// fault and we can't recover from it on Windows without a SEH in
// place.
//transition_and_fence方法必须在任何线程状态转换的时候使用
static inline void transition_and_fence(JavaThread *thread, JavaThreadState from, JavaThreadState to) {assert(thread->thread_state() == from, "coming from wrong thread state");assert((from & 1) == 0 && (to & 1) == 0, "odd numbers are transitions states");
//标识线程转换中thread->set_thread_state((JavaThreadState)(from + 1));// 设置内存屏障,确保新的状态能够被VM 线程看到
if (os::is_MP()) {if (UseMembar) {// Force a fence between the write above and read below     OrderAccess::fence();} else {// Must use this rather than serialization page in particular on Windows      InterfaceSupport::serialize_memory(thread);}}if (SafepointSynchronize::do_call_back()) {SafepointSynchronize::block(thread);}
//线程状态转换成最终的状态,对待这里的场景就是阻塞thread->set_thread_state(to);CHECK_UNHANDLED_OOPS_ONLY(thread->clear_unhandled_oops();)
}

操作系统线程状态的一般取值

在osThread中给定了操作系统线程状态的大致取值,它本身是依据平台而定

enum ThreadState {ALLOCATED,                    // Memory has been allocated but not initialized
INITIALIZED,                  // The thread has been initialized but yet started
RUNNABLE,                     // Has been started and is runnable, but not necessarily running
MONITOR_WAIT,                 // Waiting on a contended monitor lock
CONDVAR_WAIT,                 // Waiting on a condition variable
OBJECT_WAIT,                  // Waiting on an Object.wait() call
BREAKPOINTED,                 // Suspended at breakpoint
SLEEPING,                     // Thread.sleep()
ZOMBIE                        // All done, but not reclaimed yet
};

unpark 源码追踪
实现如下

void Parker::unpark() {int s, status ;//给互斥量加锁,如果互斥量已经上锁,则阻塞到互斥量被解锁
//park进入wait时,_mutex会被释放status = pthread_mutex_lock(_mutex);assert (status == 0, "invariant") ; //存储旧的_counters = _counter;
//许可改为1,每次调用都设置成发放许可_counter = 1;if (s < 1) {//之前没有许可if (WorkAroundNPTLTimedWaitHang) {//默认执行 ,释放信号,表明条件已经满足,将唤醒等待的线程status = pthread_cond_signal (_cond) ;assert (status == 0, "invariant") ;//释放锁status = pthread_mutex_unlock(_mutex);assert (status == 0, "invariant") ;} else {status = pthread_mutex_unlock(_mutex);assert (status == 0, "invariant") ;status = pthread_cond_signal (_cond) ;assert (status == 0, "invariant") ;}} else {//一直有许可,释放掉自己加的锁,有许可park本身就返回了pthread_mutex_unlock(_mutex);assert (status == 0, "invariant") ;}
}

从源码可知unpark本身就是发放许可,并通知等待的线程,已经可以结束等待了

总结:

LockSupport的park和unpark方法相比于Synchronize的wait和notify,notifyAll方法:

1.更简单,不需要获取锁,能直接阻塞线程。

2.更直观,以thread为操作对象更符合阻塞线程的直观定义;

3.更精确,可以准确地唤醒某一个线程(notify随机唤醒一个线程,notifyAll唤醒所有等待的线程);

4.更灵活 ,unpark方法可以在park方法前调用。

LockSupport的park和unpark的原理相关推荐

  1. Java LockSupport以及park、unpark方法源码深度解析

    介绍了JUC中的LockSupport阻塞工具以及park.unpark方法的底层原理,从Java层面深入至JVM层面. 文章目录 1 LockSupport的概述 2 LockSupport的特征和 ...

  2. LockSupport 的 park 和 unpark 以及线程中断对 park 的影响

    park() Thread t1 = new Thread(() -> {System.out.println("t1 park");LockSupport.park(); ...

  3. java lock park_java并发编程-LockSupport中park与unpark基本使用与原理简单分析

    文章目录 java并发编程原理之---park与unpark 基本使用 情况一,先park再unpark,代码举例与分析 情况二,先unpark再park,代码举例与分析 特点 原理之park &am ...

  4. LockSupport的park和unpark

    LockSupport是JDK中比较底层的类,用来创建锁和其他同步工具类的基本线程阻塞原语. Java锁和同步器框架的核心AQS:AbstractQueuedSynchronizer,就是通过调用Lo ...

  5. LockSupport 以及 park、unpark 方法

    一.LockSupport 是 jsr 166 中新增的 juc 工具类. LockSupport 类主要用于创建锁和其他同步类来实现线程阻塞. 这个类与他使用的每个线程进行关联, 如果可用就立即 p ...

  6. LockSupport的park/unpark分析

    https://blog.csdn.net/u013978512/article/details/120011860?spm=1001.2014.3001.5501 这篇文章我们讲了AQS的实现过程, ...

  7. c语言goord函数,park、unpark、ord 函数使用方法(转)

    park,unpark,ord这3个函数,在我们工作中,用到它们的估计不多. 我在最近一个工作中,因为通讯需要用到二进制流,然后接口用php接收.当时在处理时候,查阅不少资料.因为它们使用确实比较少, ...

  8. Java并发学习(五)-LockSupport里面的park和unpark

    学习AQS源码时候,发现当判断队列需要入队挂起时,都是调用LockSupport里面的park和unpark方法,例如: //park并且检查是否中断 private final boolean pa ...

  9. LockSupport park和unpark

    前言 在上一篇文章线程池返回值Future中,源码分析线程池结果获取阻塞的原因. LockSupport.unpark(t); LockSupport.parkNanos(this, nanos);或 ...

最新文章

  1. 解决Word出错--一打开就反复重启的问题
  2. 【待继续研究】建模-听说你的坏样本不太够
  3. 上海应用物理所计算机,【中国科学报】上海应用物理所建立组合学原理DNA计算器原型...
  4. 第四范式团队KDD Cup世界冠军方案详解:解密共享出行场景中的优化问题
  5. 矩阵维度必须一致_如何从看得懂到会使用矩阵思维
  6. Unity(一)必然事件
  7. 浏览器兼容之JavaScript篇——已在IE、FF、Chrome测试
  8. JS的jsonp是什么?5分钟学会jsonp跨域请求
  9. 【数据库中间件】分布式组件 - ClusterDB-Client
  10. 深入理解前端跨域问题的解决方案——前端面试
  11. spotlight on mysql--安装以及简介
  12. 兼容移动端的 Web 档案馆可视化管理系统
  13. 初始MySQL数据库
  14. 跟论文作者要源码和数据集的邮件怎么写
  15. R语言 图片识别文字 PNG JPG图片转文字 OCR tesseract包
  16. 验证方法学的历史及比较
  17. matlab左侧栏没了,AI软件左侧的工具栏不见了没有了怎么显示出来
  18. Invalid HTTP method: PATCH executing PATCH
  19. ubuntu软件的卸载
  20. 草柴返利APP:淘宝天猫满减店铺优惠券领取入口怎么免费领淘宝天猫粉丝福利购大额内部隐藏优惠券?

热门文章

  1. 关于我们 | ApacheCN(apache中文网)
  2. 【前端词典】分享 8 个有趣且实用的 API
  3. laravel下视图间共享数据
  4. ccs dat数据 matlab,详解CCS中的.dat文件
  5. python如何群控手机_python调用adb脚本来实现群控安卓手机初探
  6. 群控系统服务器掉线,手机群控系统安装调试问题汇总以及解决办法
  7. 四旋翼动力学和仿真翻译(Quadcopter Dynamics and Simulation)
  8. java的io安卓能用吗_阳光沙滩-用java写了一个Socket.IO的服务端,可是用安卓一直连接不上怎么办;...
  9. 使用 Flutter 开发 Google Translate 程序
  10. 大华硬盘录像机、网络摄像机、 网络硬盘录像机外网远程设置DDNS方法