Synchronized锁升级底层原理
思考问题
- 首先请您思考下面的问题:
- Synchronized锁同步机制性能不好嘛?
- 一个对象天生对应一个monitor锁吗?
- 为什么说synchronized是非公平锁?
synchronized字节码
- 使用java反编译,javap -c -p -v class文件
- 使用jclasslib插件,更加方便快捷
public synchronized int getAge(){return 18 ; } //synchronized使用在实例方法上标记为ACC_SYNCHRONIZED,如果是类方法ACC_STATIC public synchronized int getAge(); descriptor: ()I flags: ACC_PUBLIC, ACC_SYNCHRONIZED Code:stack=1, locals=1, args_size=10: bipush 182: ireturnpublic String setName(){synchronized (this){return "shiq";} }public java.lang.String setName();Code:0: aload_0 //一个局部变量下标0加载到操作栈即将this加载到操作数栈1: dup //复制2: astore_1 //数值从操作数栈存储到局部变量表3: monitorenter //对操作数栈顶元素加锁即this4: ldc #2 // String shiq6: aload_17: monitorexit8: areturn9: astore_210: aload_111: monitorexit12: aload_213: athrowException table: //方法异常表,保证能够释放锁from to target type4 8 9 any9 12 9 any//如果将上方替换成反编译后 synchronized (SynchronizedTest.class)0: ldc #2 // class SynchronizedTest2: dup3: astore_14: monitorenter //0行表示将一个常量池中位置为2的常量加载到操作数栈,查询常量池#2 = Class #24 // SynchronizedTest
- 以上我们可以看到synchronized修饰this表示对象本身加锁,修饰class表示对类加锁
对象内存构成
- 一个对象的内存构成有对象头,实例数据,对齐填充
oop层级
- oopDesc是对象类的顶层基类,每个Java Object 在 JVM 内部都有一个 native 的 C++ 对象 oop/oopDesc 与之对应。源码位置 openjdk\hotspot\src\share\vm\oops\oop.hpp
- 每个对象头由两部分组成,klass pointer和Mark Word和Array length(只有是数组才会有)源码在vm/oops/oop.hpp中定义的OopDes
- _mark表示对象标记、属于markOop类型,也就是接下来要讲解的Mark World,它记录了对象和锁有关的信息
- Klass Pointer对象指向它的类元数据的指针,虚拟机通过这个指针来确定对象是哪个类的实例。即下方的_metadata表示类元信息,类元信息存储的是对象指向它的类元数据(Klass)的首地址,其中Klass表示普通指针、 _compressed_klass表示压缩类指针
- 实例数据:按对象头为基址做相对偏移后操作 obj_field_addr(offset偏移量)
- 对齐填充 :java默认的8字节对齐规则,如果占位不足要补齐
class oopDesc {friend class VMStructs;private:volatile markOop _mark; markOop:Mark Word标记字段union _metadata {Klass* _klass; 对象类型元数据的指针narrowKlass _compressed_klass;} _metadata;public:markOop mark() const { return _mark; } //返回Mark Word数值markOop* mark_addr() const { return (markOop*) &_mark; } //返回一个地址值为_markpublic:// Need this as public for garbage collection.template <class T> T* obj_field_addr(int offset) const;}
oopDes结构图:
对象头中Mark World
- markOopDesc 位于openjdk\hotspot\src\share\vm\oops\markOop.hpp 继承oopDesc
//mark Word 在markOopDesc中有注释
// 32 bits:
// --------
// hash:25 ------------>| age:4 biased_lock:1 lock:2 (normal object)
// JavaThread*:23 epoch:2 age:4 biased_lock:1 lock:2 (biased object)
// size:32 ------------------------------------------>| (CMS free block)
// PromotedObject*:29 ---------->| promo_bits:3 ----->| (CMS promoted object)
//
// 64 bits:
// --------
// unused:25 hash:31 -->| unused:1 age:4 biased_lock:1 lock:2 (normal object)
// JavaThread*:54 epoch:2 unused:1 age:4 biased_lock:1 lock:2 (biased object)
// PromotedObject*:61 --------------------->| promo_bits:3 ----->| (CMS promoted object)
// size:64 ----------------------------------------------------->| (CMS free block)
/**
通过以上可知是个32bit数值,不太可能是指针
*/
//观察其中的方法 ,是否是重量级锁monitor_value = 2 => 锁标记为 10 , 如果value()为重量级锁标记则***10 & 10 != 0成立
//因此value()函数代表的即为当前obj的mark Word数值
bool has_monitor() const {return ((value() & monitor_value) != 0);}private:// Conversionuintptr_t value() const { return (uintptr_t) this; }//是否设置了偏向标志 biased_lock_mask_in_place = 3 bool has_bias_pattern() const {return (mask_bits(value(), biased_lock_mask_in_place) == biased_lock_pattern);}
- mark word 图解
- 以上分析也可以很容易理解了markOopDesc中函数monitor: 返回一个ObjectMonitor指针对象 这个ObjectMonitor 其实就是对象监视器
ObjectMonitor* monitor() const {assert(has_monitor(), "check"); //断定为重量级锁// Use xor instead of &~ to provide one extra tag-bit check.return (ObjectMonitor*) (value() ^ monitor_value); //将Mark Word值转化成指针,指向的就是obj的ObjectMonitor锁对象}
- 用于返回一个ObjectMonitor锁对象,方法在vm/runtime/synchronizer.cpp中
锁入口
- 无论是synchronized代码块和synchronized方法,其底层获取锁的逻辑都是一样的。我们以代码块的monitorentor和monitorexit为例
- monitorenter指令在HotSpot的中有两处地方对monitorenter指令进行解析:一个是在bytecodeInterpreter.cpp ,另一个是在templateTable_x86_64.cpp
- 前者是JVM字节码解释器,有C++书写,优点是实现相对简单且容易理解,缺点是执行慢;
- 后者是模板解释器:对每个指令都写了一段对应的汇编代码,启动时将每个指令与对应汇编代码入口绑定,效率极高但理解不易;
偏向锁
- 首先理解两个类BasicObjectLock,BasicLock BasicObjectLock将特定的Java对象与BasicLock关联。
class BasicObjectLock VALUE_OBJ_CLASS_SPEC {friend class VMStructs;private:BasicLock _lock; //BasicLock对象oop _obj; //与上方_lock关联的java对象public:oop obj() const { return _obj; }void set_obj(oop obj) { _obj = obj; }BasicLock* lock() { return &_lock; }class BasicLock VALUE_OBJ_CLASS_SPEC {friend class VMStructs;private:volatile markOop _displaced_header; //mark word字段public:markOop displaced_header() const { return _displaced_header; }void set_displaced_header(markOop header) { _displaced_header = header; }void print_on(outputStream* st) const;// move a basic lock (used during deoptimizationvoid move_to(oop obj, BasicLock* dest);static int displaced_header_offset_in_bytes(){ return offset_of(BasicLock, _displaced_header); }
};
- BasicLock中的markOop其实就是偏向锁中保存当前mark word字段使用的
偏向锁加锁
- 偏向锁入口函数_monitorenter字节码
CASE(_monitorenter): {//当前锁对象oop lockee = STACK_OBJECT(-1);BasicObjectLock* limit = istate->monitor_base();BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();BasicObjectLock* entry = NULL;while (most_recent != limit ) {if (most_recent->obj() == NULL) entry = most_recent;else if (most_recent->obj() == lockee) break;most_recent++;}if (entry != NULL) { //找到entry即可用的最高位的Lock Recordentry->set_obj(lockee); //Lock Record的obj指向锁对象int success = false; //标记位: 用于确定是否需要升级锁为轻量级锁uintptr_t epoch_mask_in_place = (uintptr_t)markOopDesc::epoch_mask_in_place;markOop mark = lockee->mark();intptr_t hash = (intptr_t) markOopDesc::no_hash;/**1. 可以使用偏向锁*/if (mark->has_bias_pattern()) {uintptr_t thread_ident;uintptr_t anticipated_bias_locking_value;thread_ident = (uintptr_t)istate->thread();anticipated_bias_locking_value =(((uintptr_t)lockee->klass()->prototype_header() | thread_ident) ^ (uintptr_t)mark) &~((uintptr_t) markOopDesc::age_mask_in_place);/*** 1.2 : 当前偏向线程是自己,什么都不做,设置success = true,防止进入下方锁升级*/if (anticipated_bias_locking_value == 0) {// already biased towards this thread, nothing to doif (PrintBiasedLockingStatistics) {(* BiasedLocking::biased_lock_entry_count_addr())++;}success = true;}//批量重偏向 klass()->prototype_header不支持偏向锁else if ((anticipated_bias_locking_value & markOopDesc::biased_lock_mask_in_place) != 0) {// try revoke biasmarkOop header = lockee->klass()->prototype_header();if (hash != markOopDesc::no_hash) {header = header->copy_set_hash(hash);}if (Atomic::cmpxchg_ptr(header, lockee->mark_addr(), mark) == mark) {if (PrintBiasedLockingStatistics)(*BiasedLocking::revoked_lock_entry_count_addr())++;}}/*** 1.3 当前偏向锁的epoch失效,则重新偏向*/else if ((anticipated_bias_locking_value & epoch_mask_in_place) !=0) {// 1.3.1 构造一个偏向当前线程切epoch为class最新的mark wordmarkOop new_header = (markOop) ( (intptr_t) lockee->klass()->prototype_header() | thread_ident);if (hash != markOopDesc::no_hash) {new_header = new_header->copy_set_hash(hash);}// 1.3.2 CAS 替换 mark wordif (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), mark) == mark) {if (PrintBiasedLockingStatistics)(* BiasedLocking::rebiased_lock_entry_count_addr())++;}else {/*** /替换ThreadID失败,有两种可能:1. 撤销原有偏向ID,重新设置2. 有竞争,升级锁,当下方success=true 所以在monitorenter中升级的*/CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);}success = true;}else {/*** 1.1 当前无锁或 1.3 当前偏向其他线程*/markOop header = (markOop) ((uintptr_t) mark & ((uintptr_t)markOopDesc::biased_lock_mask_in_place |(uintptr_t)markOopDesc::age_mask_in_place |epoch_mask_in_place));if (hash != markOopDesc::no_hash) {header = header->copy_set_hash(hash);}/*** 1. 构造一个偏向当前线程的mark Word2. 设置首位的Lock Record lock即BasicLock为线程ID默认 0xdeaddead3. 使用CAS替换操作*/markOop new_header = (markOop) ((uintptr_t) header | thread_ident);// debugging hintDEBUG_ONLY(entry->lock()->set_displaced_header((markOop) (uintptr_t) 0xdeaddead);)if (Atomic::cmpxchg_ptr((void*)new_header, lockee->mark_addr(), header) == header) {if (PrintBiasedLockingStatistics)(* BiasedLocking::anonymously_biased_lock_entry_count_addr())++;}else {/*** 替换ThreadID失败,有两种可能:1. 撤销原有偏向ID,重新设置2. 有竞争,升级锁,当下方success=true 所以在monitorenter中升级的*/CALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);}success = true;}}
- 偏向锁示意图:
- 整个过程示意图:
偏向锁撤销
- 在获取偏向锁的过程因为不满足条件导致要将锁对象改为非偏向锁状态
- 调取过程在 InterpreterRuntime::monitorenter -> 开启JVM偏向锁 ObjectSynchronizer::fast_enter -> java线程 BiasedLocking::revoke_and_rebias
static BiasedLocking::Condition revoke_bias(oop obj, bool allow_rebias, bool is_bulk, JavaThread* requesting_thread, JavaThread** biased_locker) {markOop mark = obj->mark();//如果不是偏向模式,返回if (!mark->has_bias_pattern()) {return BiasedLocking::NOT_BIASED;}//创建两个mark word,一个是匿名偏向模式(101),一个是无锁模式(001)uint age = mark->age();markOop biased_prototype = markOopDesc::biased_locking_prototype()->set_age(age);markOop unbiased_prototype = markOopDesc::prototype()->set_age(age);...JavaThread* biased_thread = mark->biased_locker();if (biased_thread == NULL) {// 匿名偏向。当调用锁对象的hashcode()方法可能会导致走到这个逻辑// 如果不允许重偏向,则将对象的mark word设置为无锁模式if (!allow_rebias) {obj->set_mark(unbiased_prototype);}return BiasedLocking::BIAS_REVOKED;}//0. 判断偏向线程是否还存活bool thread_is_alive = false;//0.1 如果当前线程就是偏向线程if (requesting_thread == biased_thread) {thread_is_alive = true;} else {// 0.2 遍历当前jvm的所有线程,如果能找到,则说明偏向的线程还存活for (JavaThread* cur_thread = Threads::first(); cur_thread != NULL; cur_thread = cur_thread->next()) {if (cur_thread == biased_thread) {thread_is_alive = true; //偏向线程还在break;}}}/*** 1. 如果偏向线程不存在了*/if (!thread_is_alive) {// 1.1 允许重偏向则将对象mark word设置为匿名偏向状态101,否则设置为无锁状态001if (allow_rebias) {obj->set_mark(biased_prototype);} else {obj->set_mark(unbiased_prototype);}return BiasedLocking::BIAS_REVOKED;}/*** 2. 如果偏向线程存在,则遍历该线程栈中所有的Lock Record*/GrowableArray<MonitorInfo*>* cached_monitor_info = get_or_compute_monitor_info(biased_thread); //获取线程中的栈 All Lock RecordBasicLock* highest_lock = NULL;for (int i = 0; i < cached_monitor_info->length(); i++) { //从上向下遍历MonitorInfo* mon_info = cached_monitor_info->at(i);/*** 2.1 判断当前Lock Record 的 owner是否指向锁对象,如果指向说明当前线程在偏向,这个时候需要升级为轻量级锁啦*/if (TraceBiasedLocking && Verbose) {tty->print_cr(" mon_info->owner (" PTR_FORMAT ") == obj (" PTR_FORMAT ")",p2i((void *) mon_info->owner()),p2i((void *) obj));}/** 2.2 低位lock 直接修改偏向线程栈中的Lock Record。为了处理锁重入的case,在这里将Lock Record的Displaced Mark Word设置为null,* 第一个Lock Record会在下面的代码中再处理*/markOop mark = markOopDesc::encode((BasicLock*) NULL);highest_lock = mon_info->lock(); //highest_lock->set_displaced_header(mark);} else {...}}/*** 2.3 对高位进行处理,此时Displaced Mark Word即lock不能在设置成null*/if (highest_lock != NULL) {// 2.3.1 修改第一个Lock Record为unbiased_prototype无锁状态,highest_lock->set_displaced_header(unbiased_prototype);//2.3.2 将锁对象obj obj的mark word设置为指向该Lock Record的指针obj->release_set_mark(markOopDesc::encode(highest_lock));assert(!obj->mark()->has_bias_pattern(), "illegal mark state: stack lock used bias bit");if (TraceBiasedLocking && (Verbose || !is_bulk)) {tty->print_cr(" Revoked bias of currently-locked object");}} else {/*** 3. 走到这里说明偏向线程已经不在同步块中了*/if (allow_rebias) {//3.1 设置为匿名偏向状态obj->set_mark(biased_prototype);} else {// 3.2 设置成无锁状态obj->set_mark(unbiased_prototype);}}
- 查看偏向的线程是否存活,如果已经不存活了,则直接撤销偏向锁。JVM维护了一个集合存放所有存活的线程,通过遍历该集合判断某个线程是否存活。
- 偏向的线程是否还在同步块中,如果不在了,则撤销偏向锁。我们回顾一下偏向锁的加锁流程:每次进入同步块(即执行monitorenter)的时候都会以从高往低的顺序在栈中找到第一个可用的Lock Record,将其obj字段指向锁对象。每次解锁(即执行monitorexit)的时候都会将最低的一个相关Lock Record移除掉。所以可以通过遍历线程栈中的Lock Record来判断线程是否还在同步块中。
- 将偏向线程所有相关Lock Record的Displaced Mark Word设置为null,然后将最高位的Lock Record的Displaced Mark Word 设置为无锁状态,最高位的Lock Record也就是第一次获得锁时的Lock Record(这里的第一次是指重入获取锁时的第一次),然后将对象头指向最高位的Lock Record,这里不需要用CAS指令,因为是在safepoint。 执行完后,就升级成了轻量级锁。原偏向线程的所有Lock Record都已经变成轻量级锁的状态。
偏向锁的释放
- 释放过程相对比较简单
CASE(_monitorexit): {oop lockee = STACK_OBJECT(-1);CHECK_NULL(lockee);// derefing's lockee ought to provoke implicit null check// find our monitor slotBasicObjectLock* limit = istate->monitor_base();BasicObjectLock* most_recent = (BasicObjectLock*) istate->stack_base();// 从低往高遍历栈的Lock Recordwhile (most_recent != limit ) {// 如果Lock Record关联的是该锁对象if ((most_recent)->obj() == lockee) {BasicLock* lock = most_recent->lock();markOop header = lock->displaced_header();// 释放Lock Recordmost_recent->set_obj(NULL);// 如果是偏向模式,仅仅释放Lock Record就好了。否则要走轻量级锁or重量级锁的释放流程if (!lockee->mark()->has_bias_pattern()) {bool call_vm = UseHeavyMonitors;// header!=NULL说明不是重入,则需要将Displaced Mark Word CAS到对象头的Mark Wordif (header != NULL || call_vm) {if (call_vm || Atomic::cmpxchg_ptr(header, lockee->mark_addr(), lock) != lock) {// CAS失败或者是重量级锁则会走到这里,先将obj还原,然后调用monitorexit方法most_recent->set_obj(lockee);CALL_VM(InterpreterRuntime::monitorexit(THREAD, most_recent), handle_exception);}}}//执行下一条命令UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);}//处理下一条Lock Recordmost_recent++;}// Need to throw illegal monitor state exceptionCALL_VM(InterpreterRuntime::throw_illegal_monitor_state_exception(THREAD), handle_exception);ShouldNotReachHere();
}
- 偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动去释放偏向锁。
- 偏向锁的撤销,需要等待全局安全点,它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态,撤销偏向锁后恢复到未锁定(标志位为“01”)或轻量级锁(标志位为“00”)的状态
- -XX:+PrintGCApplicationStoppedTime -XX:+PrintSafepointStatistics -XX:PrintSafepointStatisticsCount=1 查看停顿时间详细信息
- JDK1.6以后默认开启偏向锁UseBiasedLocking,对于高并发提升效率-XX:-UseBiasedLocking
轻量级锁
轻量级锁加锁
- 入口函数同上
CASE(_monitorenter): {oop lockee = STACK_OBJECT(-1);...if (entry != NULL) {...// 上面省略的代码中如果CAS操作失败也会调用到InterpreterRuntime::monitorenterif (!success) {// 构建一个无锁状态的Displaced Mark WordmarkOop displaced = lockee->mark()->set_unlocked();// 设置到Lock Record中去entry->lock()->set_displaced_header(displaced);bool call_vm = UseHeavyMonitors;if (call_vm || Atomic::cmpxchg_ptr(entry, lockee->mark_addr(), displaced) != displaced) {// 如果CAS替换不成功,代表锁对象不是无锁状态,这时候判断下是不是锁重入// Is it simple recursive case?if (!call_vm && THREAD->is_lock_owned((address) displaced->clear_lock_bits())) {entry->lock()->set_displaced_header(NULL);} else {// CAS操作失败则调用monitorenterCALL_VM(InterpreterRuntime::monitorenter(THREAD, entry), handle_exception);}}}UPDATE_PC_AND_TOS_AND_CONTINUE(1, -1);} else {istate->set_msg(more_monitors);UPDATE_PC_AND_RETURN(0); // Re-execute} }
- 构建一个无锁的mark Work 设置到Lock Record 中lock中去
- CAS替换锁对象的Mark word为指向1中的Lock Record,成功轻量级锁完成
- 如果失败,两种情况
- 是否是锁重入,如果是则设置lock为null
- 否则,调取monitorenter 自旋锁或者升级锁
- 过程示意图
轻量级解锁
- 解锁过程由下往上查找当前线程的Lock Record 中owner为锁对象后判断BasicLock
- 如果BasicLock == null ,表示一个锁重入,当前线程还在同步块中,不做操作
- 如果BasicLock有值,即为锁对象指向的Display Mark Word ,则表示这时候需要真正释放锁,将DisPlay Mark Word替换回锁对象的Mark Word中去,如果失败,则调用InterpreterRuntime::monitorexit
- 总结:轻量级锁解锁时,把复制的对象头替换回去(cas)如果替换成功(就是要把无所的状态放回去给对象头,之后锁继续被拿还是轻量级锁,但是如果锁已经是重量级锁了,那么就失败,之后锁就是重量级的锁了),锁结束,之后别的线程来拿还是轻量级锁,如果失败,说明已有竞争,释放锁,此时把对象头设为重量级锁,并notify 唤醒其他等待线程。
重量级锁
重量级锁加锁
- 入口函数,当存在竞争时,调用ObjectSynchronizer::slow_enter() 最后一行
//需要膨胀为重量级锁,膨胀前,设置Displaced Mark Word为一个特殊值,代表该锁正在用一个重量级锁的monitorlock->set_displaced_header(markOopDesc::unused_mark());//先调用inflate膨胀为重量级锁,该方法返回一个ObjectMonitor对象,然后调用其enter方法ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
- 锁膨胀,即获取一个ObjectMonitor对象inflate()
//结构体如下objectMonitor.hpp中定义
ObjectMonitor::ObjectMonitor() { _header = NULL; //对象头的Mark Word_count = 0; _waiters = 0, _recursions = 0; //线程的重入次数_object = NULL; _owner = NULL; //标识拥有该monitor的线程_WaitSet = NULL; //等待线程组成的双向循环链表,_WaitSet是第一个节点_WaitSetLock = 0 ; _Responsible = NULL ; _succ = NULL ; _cxq = NULL ; //多线程竞争锁进入时的单向链表FreeNext = NULL ; _EntryList = NULL ; //_owner从该双向循环链表中唤醒线程结点,_EntryList是第一个节点_SpinFreq = 0 ; _SpinClock = 0 ; OwnerIsThread = 0 ;
}
inflate膨胀
- inflate膨胀有四种情况:
- Inflated(重量级锁状态) - 直接返回
- INFLATING(膨胀中) - 忙等待直到膨胀完成
- Stack-locked(轻量级锁状态) - 膨胀
- Neutral(无锁状态) - 膨胀
- Inflated 重量级锁状态: 直接返回
/*** 1. 已经是重量级锁了,直接返回obj ->mark word指向的monitor对象*/if (mark->has_monitor()) {ObjectMonitor * inf = mark->monitor() ;return inf ;}
- INFLATING 其他线程正在膨胀中,则当前线程等待
/*** 2. 有其他线程正在膨胀中,这个时候忙等待后continue*/if (mark == markOopDesc::INFLATING()) {TEVENT (Inflate: spin while INFLATING) ;//3.1 在该方法中会进行spin/yield/park等操作完成自旋动作ReadStableMark(object) ;continue ;}
- Stack-locked(轻量级锁状态升级) - 膨胀
/**
* 3. 轻量级锁升级
*/
if (mark->has_locker()) {// 3.1 omAlloc从当前线程的Self->omFreeList的 ObjectMonitor数组中获取,成功加入全局中管理,否则从全局中获取ObjectMonitor * m = omAlloc (Self) ;// 初始化ObjectMonitor对象m->Recycle();m->_Responsible = NULL ;m->OwnerIsThread = 0 ;m->_recursions = 0 ;m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // Consider: maintain by type/class// 将锁对象的mark word设置为INFLATING (0)状态markOop cmp = (markOop) Atomic::cmpxchg_ptr (markOopDesc::INFLATING(), object->mark_addr(), mark) ;if (cmp != mark) {omRelease (Self, m, true) ;continue ; // Interference -- just retry}// 栈中的displaced mark wordmarkOop dmw = mark->displaced_mark_helper() ;// monitor中的header设置为Displaced mark Wordm->set_header(dmw) ;//将原有的Lock Record设置给monitor的ownerm->set_owner(mark->locker());//monitor的obj设置锁对象m->set_object(object);//将锁对象头设置为重量级锁状态object->release_set_mark(markOopDesc::encode(m));...return m ;
}
- 无锁状态
/*** 4. 无锁状态*///获取一个ObjectMonitor对象ObjectMonitor * m = omAlloc (Self) ;// 初始化操作m->Recycle();m->set_header(mark); //设置header为mark Wordm->set_owner(NULL); //Lock Record不存在m->set_object(object);m->OwnerIsThread = 1 ;m->_recursions = 0 ;m->_Responsible = NULL ;m->_SpinDuration = ObjectMonitor::Knob_SpinLimit ; // consider: keep metastats by type/class//用CAS替换对象头的mark word为重量级锁状态if (Atomic::cmpxchg_ptr (markOopDesc::encode(m), object->mark_addr(), mark) != mark) {m->set_object (NULL) ;m->set_owner (NULL) ;m->OwnerIsThread = 0 ;m->Recycle() ;omRelease (Self, m, true) ;m = NULL ;continue ;}...return m ;
}
获取锁->enter(THREAD)
- 在ObjectMonitor::enter()四种情况
- 无锁直接可以获取到
- 重入情况
- 轻量级升级情况
- 重量级锁竞争情况
void ATTR ObjectMonitor::enter(TRAPS) {Thread * const Self = THREAD ;void * cur ;/*** 1. owner为null代表无锁状态,如果能CAS设置成功,则当前线程直接获得锁*/cur = Atomic::cmpxchg_ptr (Self, &_owner, NULL) ;if (cur == NULL) {...return ;}/*** 2. 如果是重入的情况*/if (cur == Self) {// TODO-FIXME: check for integer overflow! BUGID 6557169._recursions ++ ;return ;}/*** 3. 当前线程是之前持有轻量级锁的线程。由轻量级锁膨胀且第一次调用enter方* 法,那cur是指向Lock Record的指针*/if (Self->is_lock_owned ((address)cur)) {assert (_recursions == 0, "internal state error");// 重入计数重置为1_recursions = 1 ;// 设置owner字段为当前线程(之前owner是指向Lock Record的指针)_owner = Self ;OwnerIsThread = 1 ;return ;}...// 在调用系统的同步操作之前,先尝试自旋获得锁if (Knob_SpinEarly && TrySpin (Self) > 0) {...//自旋的过程中获得了锁,则直接返回Self->_Stalled = 0 ;return ;}...{ .../*** 4. 重量级锁竞争情况*/for (;;) {jt->set_suspend_equivalent();// 在该方法中调用系统同步操作,真正的竞争锁EnterI (THREAD) ;...}Self->set_current_pending_monitor(NULL);}...}
重量级锁竞争
- 首先分析objectMonitor中的元素: cxq(下图中的ContentionList),EntryList ,WaitSet,owner, 前三个是由ObjectWaiter的链表结构, owner是当前持有锁的线程
- ObjectWaiter结构
class ObjectWaiter : public StackObj {public:enum TStates { TS_UNDEF, TS_READY, TS_RUN, TS_WAIT, TS_ENTER, TS_CXQ } ;enum Sorted { PREPEND, APPEND, SORTED } ;ObjectWaiter * volatile _next;ObjectWaiter * volatile _prev;Thread* _thread;jlong _notifier_tid;ParkEvent * _event;volatile int _notified ;volatile TStates TState ;Sorted _Sorted ; // List placement dispositionbool _active ; // Contention monitoring is enabledpublic:ObjectWaiter(Thread* thread); //对应获取锁的线程void wait_reenter_begin(ObjectMonitor *mon);void wait_reenter_end(ObjectMonitor *mon);
};
- 重量级加锁过程
void ATTR ObjectMonitor::EnterI (TRAPS) {Thread * Self = THREAD ;...// 尝试获得锁if (TryLock (Self) > 0) {...return ;}DeferredInitialize () ;// 自旋if (TrySpin (Self) > 0) {...return ;}...// 将线程封装成node节点ObjectWaiter中ObjectWaiter node(Self) ;Self->_ParkEvent->reset() ;node._prev = (ObjectWaiter *) 0xBAD ;node.TState = ObjectWaiter::TS_CXQ ;/*** 1. 将node节点插入到_cxq队列的头部,cxq是一个单向链表*/ObjectWaiter * nxt ;for (;;) {node._next = nxt = _cxq ;if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;// CAS失败的话 再尝试获得锁,这样可以降低插入到_cxq队列的频率if (TryLock (Self) > 0) {...return ;}}// SyncFlags默认为0,如果没有其他等待的线程,则将_Responsible设置为自己if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;}TEVENT (Inflated enter - Contention) ;int nWakeups = 0 ;int RecheckInterval = 1 ;for (;;) {if (TryLock (Self) > 0) break ;assert (_owner != Self, "invariant") ;...// park selfif (_Responsible == Self || (SyncFlags & 1)) {// 当前线程是_Responsible时,调用的是带时间参数的parkTEVENT (Inflated enter - park TIMED) ;Self->_ParkEvent->park ((jlong) RecheckInterval) ;// Increase the RecheckInterval, but clamp the value.RecheckInterval *= 8 ;if (RecheckInterval > 1000) RecheckInterval = 1000 ;} else {//否则直接调用park挂起当前线程TEVENT (Inflated enter - park UNTIMED) ;Self->_ParkEvent->park() ;}if (TryLock(Self) > 0) break ;...if ((Knob_SpinAfterFutile & 1) && TrySpin (Self) > 0) break ;.../*** 2. 在当前线程释放锁时,_succ会被设置为EntryList或_cxq中的一个线程:表示假定继承人* _succ = Knob_SuccEnabled ? Wakee->_thread : NULL ; 在ExitEpilog中设置* ExitEpilog(w) 调用w = _cxq 或者 w = _EntryList */if (_succ == Self) _succ = NULL ;// Invariant: after clearing _succ a thread *must* retry _owner before parking.OrderAccess::fence() ;}// 走到这里说明已经获得锁了assert (_owner == Self , "invariant") ;assert (object() != NULL , "invariant") ;// 将当前线程的node从cxq或EntryList中移除UnlinkAfterAcquire (Self, &node) ;if (_succ == Self) _succ = NULL ;if (_Responsible == Self) {_Responsible = NULL ;OrderAccess::fence();}...return ;
}
- 当一个线程尝试获得锁时,如果该锁已经被占用,则会将该线程封装成一个ObjectWaiter对象插入到cxq的队列的队首,然后调用park函数挂起当前线程。
- 当线程释放锁时,会从cxq或EntryList中挑选一个线程唤醒,被选中的线程叫做Heir presumptive即假定继承人(Ready Thread),假定继承人被唤醒后会尝试获得锁,但synchronized是非公平的,所以假定继承人不一定能获得锁;
- 如果线程获得锁后调用Object#wait方法,则会将线程加入到WaitSet中
- 当被Object#notify唤醒后,会将线程从WaitSet移动到cxq或EntryList中去。
- 调用一个锁对象的wait或notify方法时,如当前锁的状态是偏向锁或轻量级锁则会先膨胀成重量级锁。
重量级锁解锁
- 解锁包括: 1. 当前是可重入锁 2. 正常释放锁
- 可重入锁
// 重入计数器还不为0,则计数器-1后返回
if (_recursions != 0) {_recursions--; // this is simple recursive enterTEVENT (Inflated exit - recursive) ;return ;
}
- 正常释放锁,可能需要唤醒其他线程
if (Knob_ExitPolicy == 0) { //Knob_ExitPolicy默认为0/*** 1. synchronized非公平锁原因: 当前线程先释放锁,这时如果有其他线程进入同步块则能获得锁,不管刚刚上面的假定继承人及_succ是否存在,因此它是非公平锁*/OrderAccess::release_store_ptr (&_owner, NULL) ; // drop the lock//内存屏障,是否需要唤醒继任者OrderAccess::storeload() ; //See if we need to wake a successor /*** 2. 如果没有等待线程_EntryList和_cxq均为空,或者_succ假定继承人存在,这个时候是没有竞争的,可以运行,不需要唤醒,直接返回就好了*/if ((intptr_t(_EntryList)|intptr_t(_cxq)) == 0 || _succ != NULL) {TEVENT (Inflated exit - simple egress) ;return ;}TEVENT (Inflated exit - complex egress) ;/*** 3. 走到这里表示,当前_EntryList ,_cxq不为空,且没有假定继承人,需要唤醒操作了* 而要执行之后的操作就需要重新获得锁,即设置_owner为当前线程*/if (Atomic::cmpxchg_ptr (THREAD, &_owner, NULL) != NULL) {return ;}.../*** 4. 根据不同的QMode执行不同的唤醒策略,默认为0*/ObjectWaiter * w = NULL ;// code 4:根据QMode的不同会有不同的唤醒策略,默认为0int QMode = Knob_QMode ;if (QMode == 2 && _cxq != NULL) {/*** QMode == 2 : cxq中的线程有更高优先级,直接唤醒cxq的队首线程,返回了,最终唤醒的就是这个队首线程_succ == w*/w = _cxq ;assert (w != NULL, "invariant") ;assert (w->TState == ObjectWaiter::TS_CXQ, "Invariant") ;ExitEpilog (Self, w) ;return ;}f (QMode == 3 && _cxq != NULL) {/*** QMode == 3: 将cxq中的元素插入到EntryList的末尾,继续向下执行程序*/...}if (QMode == 4 && _cxq != NULL) {/*** QMode == 4: 将cxq插入到EntryList的队首*/...}/*** 默认QMode == 0 : 什么也不做*//*** 3. 如果_EntryList不为null ,取队首w ,唤醒ObjectWaiter对象的线程,然后立即返回*/w = _EntryList ;if (w != NULL) {assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;ExitEpilog (Self, w) ;return ;}/*** 4. 如果EntryList的首元素为空,就将cxq的所有元素放入到EntryList中* 然后再从EntryList中取出来队首元素执行ExitEpilog方法,然后立即返回** 使用w保存_cxq队列后,将_cxq设置null*/w = _cxq ;if (w == NULL) continue ;// Drain _cxq into EntryList - bulk transfer.// First, detach _cxq.// The following loop is tantamount to: w = swap (&cxq, NULL)for (;;) {assert (w != NULL, "Invariant") ;//下方用到w替代_cxqObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;if (u == w) break ;w = u ;}if (QMode == 1) {// QMode == 1 : 将cxq中的元素转移到EntryList,并反转顺序...} else {// QMode == 0 or QMode == 2‘// 将cxq中的元素转移到EntryList_EntryList = w ;ObjectWaiter * q = NULL ;ObjectWaiter * p ;for (p = w ; p != NULL ; p = p->_next) {guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;p->TState = ObjectWaiter::TS_ENTER ;p->_prev = q ;q = p ;}//如果当前有个新增的假定继承人,所以不需要当前线程去唤醒,以减少上下文切换的比率if (_succ != NULL) continue;/*** 4.1 将上方有cxq转移到_EntryList首位唤醒即可*/w = _EntryList ;if (w != NULL) {guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;ExitEpilog (Self, w) ;return ;}//ExitEpilog用于唤醒线程操作
void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {assert (_owner == Self, "invariant") ;/*** 假定继承人的设置,查看调用方法在解锁exit中设置w = _EntryList* 只有QMode == 2 时 w = _cxq 中取*/_succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;ParkEvent * Trigger = Wakee->_event ;Wakee = NULL ;//如果有其他线程进来,将会直接运行,不会唤醒操作了OrderAccess::release_store_ptr (&_owner, NULL) ;OrderAccess::fence() ; if (SafepointSynchronize::do_call_back()) {TEVENT (unpark before SAFEPOINT) ;}DTRACE_MONITOR_PROBE(contended__exit, this, object(), Self);Trigger->unpark() ; //唤醒线程// Maintain stats and report events to JVMTIif (ObjectMonitor::_sync_Parks != NULL) {ObjectMonitor::_sync_Parks->inc() ;}
}
- 释放锁,这个时刻其他的线程能获取到锁,synchronized是非公平锁的原因
- 如果当前没有等待的线程或succ != null 有一个假定继承人存在(可以运行它) ,则直接返回就好了,因为不需要唤醒其他线程。
- 当前线程重新获得锁,因为之后要操作cxq和EntryList队列以及唤醒线程
- 根据QMode的不同,会执行不同的唤醒策略: 默认为0
- QMode = 2且cxq非空:取cxq队列队首的ObjectWaiter对象,调用ExitEpilog方法,该方法会唤醒ObjectWaiter对象的线程,然后立即返回,后面的代码不会执行了;
- QMode = 3且cxq非空:把cxq队列插入到EntryList的尾部;
- QMode = 4且cxq非空:把cxq队列插入到EntryList的头部;
- QMode = 0:默认,暂时什么都不做,继续往下看;
- 如果EntryList的首元素非空,就取出来调用ExitEpilog方法,该方法会唤醒ObjectWaiter对象的线程,然后立即返回;
- 如果EntryList的首元素为空,就将cxq的所有元素放入到EntryList中,然后再从EntryList中取出来队首元素执行ExitEpilog方法,然后立即返回;
wait ,notify唤醒操作
- wait将当前Thread构造ObjectWaiter后通过AddWaiter加入_waitSet队列
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {// 从waitSet头部添加if (_WaitSet == NULL) {_WaitSet = node;node->_prev = node;node->_next = node;} else {ObjectWaiter* head = _WaitSet ;ObjectWaiter* tail = head->_prev;assert(tail->_next == head, "invariant check");tail->_next = node;head->_prev = node;node->_next = head;node->_prev = tail;}
}
- notify唤醒wait
//获取等待队列中一个ObjectWaiter
ObjectWaiter * iterator = DequeueWaiter() ; //也是从头部取值呀
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {// dequeue the very first waiterObjectWaiter* waiter = _WaitSet;if (waiter) {DequeueSpecificWaiter(waiter);}return waiter;
}...int Policy = Knob_MoveNotifyee ; //默认为2ObjectWaiter * List = _EntryList ;if (Policy == 2) { // prepend to cxq// 如果_EntryList为null,将唤醒的加入_EntryList很大几率运行if (List == NULL) {iterator->_next = iterator->_prev = NULL ;_EntryList = iterator ;} else { //否则加入_cxq队首,等待_EntryList为null后加入运行,QMode =默认0时iterator->TState = ObjectWaiter::TS_CXQ ;for (;;) {ObjectWaiter * Front = _cxq ;iterator->_next = Front ;if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {break ;}}}
}
- 加入等待队列从队首添加,取出也是从队首,notify只唤醒一个添加到_cxq或者_EntryList中,notifyAll()逻辑一致,只是使用for循环获取waitSet中所有数据添加而已
思考
- 对于下方代码,请您分析其执行顺序:
public class SyncDemo {public static void main(String[] args) throws InterruptedException {SyncDemo syncDemo = new SyncDemo();syncDemo.startThreadA();Thread.sleep(100);syncDemo.startThreadB();Thread.sleep(100);syncDemo.startThreadC();}final Object lock = new Object();public void startThreadA() {new Thread(() -> {synchronized (lock) {System.out.println("ThreadA get lock");try {Thread.sleep(500);} catch (Exception e) {}System.out.println("ThreadA release lock");}}, "thread-A").start();}public void startThreadB() {new Thread(() -> {synchronized (lock) {System.out.println("ThreadB get lock");}}, "thread-B").start();}public void startThreadC() {new Thread(() -> {synchronized (lock) {System.out.println("ThreadC get lock");}}, "thread-C").start();}
}
//最终打印结果一定是 A -> C -> B
下载链接
- jdk8u版本下载 点击左侧zip
- 锁状态 https://blog.csdn.net/u014590757/article/details/79717549
Synchronized锁升级底层原理相关推荐
- synchronized 锁的底层原理
线程A想要获取这个对象,就去找到该对象的monitor, 看看owner是否为空,如果为空说明该对象没有被锁住,并将自己的线程ID设置进去,并count++,如果owner不为空,则将其线程ID放到w ...
- synchronized锁升级过程及其实现原理
本文链接:https://blog.csdn.net/wangyy130/article/details/106495180 问:为什么会有锁升级的过程呢 答:在java6以前synchronized ...
- 22-10-14 西安 spring循环依赖、对象内存布局、synchronized锁升级
关于锁升级参考了周阳老师在b站的juc视频,阳哥讲的很好 尚硅谷2022版JUC并发编程(对标阿里P6-P7)_哔哩哔哩_bilibili spring循环依赖 1.循环依赖问题 什么是循环依赖 默认 ...
- Synchronized锁升级:无锁-> 偏向锁 -> 轻量级锁 -> 重量级锁
一. 概述 1. Synchronized锁升级的原因 用锁能够实现数据的安全性,但是会带来性能下降.无锁能够基于线程并行提升程序性能,但是会带来安全性下降. 2. Synchronized锁升级的过 ...
- Synchronized锁升级、降级
多线程中锁的升级 synchronized锁升级原理:在锁对象的对象头里面有一个threadid字段,在第一次访问的时候threadid为空,jvm 让其持有偏向锁,并将threadid 设置为其线程 ...
- Synchronized 锁升级机制
在 JDK 早期的版本,synchronized 锁的效率是非常低的,它的效率远低于 lock 锁,但是 sychronized 毕竟是 java 的关键词,它不应该就此淘汰.所以在 JDK1.6 中 ...
- 【MySQL进阶】MySQL事务隔离与锁机制底层原理万字总结(建议收藏!!)
[MySQL进阶]MySQL事务隔离与锁机制底层原理万字总结(建议收藏!!) 参考资料: 美团技术团队:Innodb中事务隔离级别和锁的关系 数据库的锁,到底锁的是什么? 阿里面试:说说一致性读实现原 ...
- synchronized 锁升级过程
synchronized 锁升级过程就是其优化的核心:偏向锁 -> 轻量级锁 -> 重量级锁 class Test{private static final Object object = ...
- synchronized锁升级过程详解
32位: 64位: 无锁: 1001001110000101111101010101110 HashCode:1237514926 十进制:1237514926 二进制:0100100 1100001 ...
最新文章
- leetcode-55 跳跃游戏
- python可以使用二维元组吗_python中读入二维csv格式的表格方法详解(以元组/列表形式表示)...
- 聚类分析与相关算法(Kmeans等)详解
- 【渝粤教育】 国家开放大学2020年春季 2134成本会计 参考试题
- Swiftfox:极速的冲浪体验
- linux 多个cpu使用率,统计多台linux的CPU使用率
- eclipse 国内镜像高速下载
- 九大神招,让Python里数据分析神器Jupyter,完美升华
- __name__ == '__main__'的用法
- 真香无疑了!新iPhone抢断货,国内最受欢迎的颜色是它
- JAVA简单选择排序算法原理及实现
- 随想录(cloud 网络库)
- PMP考试-风险管理专项突破(第六版)
- 精品化游戏《热血征途》掀起网页游戏大变革
- 合批/批量渲染 (Batch)、实例化Instancing
- 数据同步工具ETL-kettle使用
- GIS小知识-由对GCJ-02的疑问引出的坐标系相关概念
- Anomalies,Factors,andMultiFactorModels
- MAC使用技巧之苹果电脑新手最容易犯的20个错误
- 推荐一款全能测试开发神器!1分钟快速上手!
热门文章
- 华为鸿蒙os尝鲜,华为鸿蒙os尝鲜版
- SwiftUI iOS 精品完成项目之宠物展示与领养App MVVM(教程含源码)
- 电脑开机后报bootsafe.sys丢失,报0x00000098状态码
- 2张图教你认识世界人口分布
- 机器学习笔记(杂) oecd_bli_2015.csv TOT
- Spring解决跨域问题方案
- 云集微店怎么做 我的第一份生意经
- access sql 取余_国家计算机二级ACCESS函数总结
- 扇形图形用html,如何用css画扇形?
- 机器自动翻译古文拼音 - 宋词 - 桂枝香 金陵怀古 王安石