Mutex锁分为normal模式和starvation模式。一开始默认处于normal模式。在normal模式中,每个新加入竞争锁行列的协程都会直接参与到锁的竞争当中来,而处于starvation模式时,所有所有新进入的协程都会直接被放入等待队列中挂起,直到其所在队列之前的协程全部执行完毕。

在normal模式中协程的挂起等待时间如果大于某个值,就会进入starvation模式。

type Mutex struct {state int32sema  uint32
}

其中,state用来保存mutex的状态量,低一位表示是否上锁,低二位表示当前锁对象是否被唤醒,低三位表示该锁是否处于staration状态,而后几位表示当前正被该锁阻塞的协程数。而sema则是作为信号量来作为阻塞的依据。

Lock()方法进行加锁。

func (m *Mutex) Lock() {// Fast path: grab unlocked mutex.if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {if race.Enabled {race.Acquire(unsafe.Pointer(m))}return}var waitStartTime int64starving := falseawoke := falseiter := 0old := m.statefor {// Don't spin in starvation mode, ownership is handed off to waiters// so we won't be able to acquire the mutex anyway.if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {// Active spinning makes sense.// Try to set mutexWoken flag to inform Unlock// to not wake other blocked goroutines.if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}runtime_doSpin()iter++old = m.statecontinue}new := old// Don't try to acquire starving mutex, new arriving goroutines must queue.if old&mutexStarving == 0 {new |= mutexLocked}if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift}// The current goroutine switches mutex to starvation mode.// But if the mutex is currently unlocked, don't do the switch.// Unlock expects that starving mutex has waiters, which will not// be true in this case.if starving && old&mutexLocked != 0 {new |= mutexStarving}if awoke {// The goroutine has been woken from sleep,// so we need to reset the flag in either case.if new&mutexWoken == 0 {throw("sync: inconsistent mutex state")}new &^= mutexWoken}if atomic.CompareAndSwapInt32(&m.state, old, new) {if old&(mutexLocked|mutexStarving) == 0 {break // locked the mutex with CAS}// If we were already waiting before, queue at the front of the queue.queueLifo := waitStartTime != 0if waitStartTime == 0 {waitStartTime = runtime_nanotime()}runtime_SemacquireMutex(&m.sema, queueLifo)starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNsold = m.stateif old&mutexStarving != 0 {// If this goroutine was woken and mutex is in starvation mode,// ownership was handed off to us but mutex is in somewhat// inconsistent state: mutexLocked is not set and we are still// accounted as waiter. Fix that.if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")}delta := int32(mutexLocked - 1<<mutexWaiterShift)if !starving || old>>mutexWaiterShift == 1 {// Exit starvation mode.// Critical to do it here and consider wait time.// Starvation mode is so inefficient, that two goroutines// can go lock-step infinitely once they switch mutex// to starvation mode.delta -= mutexStarving}atomic.AddInt32(&m.state, delta)break}awoke = trueiter = 0} else {old = m.state}}if race.Enabled {race.Acquire(unsafe.Pointer(m))}
}

一开始会直接通过cas将原本值为0(也就是当前没任何协程占用锁)的state赋为1,表示这个锁已经有人加锁。如果成功,表示这是当前锁第一次加锁并且加锁成功,那么可以直接返回。

如果之前加锁失败,也就是刚刚的cas操作失败,那么说明就需要等待锁的释放,首先判断是否已经加锁并处于normal模式,将原先锁的state与1和4相或的结果相与,如果与1相等,则说明此时处于normal模式并且已经加锁,而后判断当前协程是否可以自旋。如果可以自旋,则通过右移三位判断是否还有协程正在等待这个锁,如果有,并通过低2位判断是否该所处于被唤醒状态,如果并没有,则将其状态量设为被唤醒的状态,之后进行自旋,直到该协程自旋数量达到上限,或者当前锁被解锁,或者当前锁已经处于starvation模式。

if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {// Active spinning makes sense.// Try to set mutexWoken flag to inform Unlock// to not wake other blocked goroutines.if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {awoke = true}runtime_doSpin()iter++old = m.statecontinue
}

在超过自旋数量上限或者当前锁已经解锁或者当前锁已经处于starvation模式,那么就在循环中进入下面的部分。

new := old
// Don't try to acquire starving mutex, new arriving goroutines must queue.
if old&mutexStarving == 0 {new |= mutexLocked
}
if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift
}
// The current goroutine switches mutex to starvation mode.
// But if the mutex is currently unlocked, don't do the switch.
// Unlock expects that starving mutex has waiters, which will not
// be true in this case.
if starving && old&mutexLocked != 0 {new |= mutexStarving
}
if awoke {// The goroutine has been woken from sleep,// so we need to reset the flag in either case.if new&mutexWoken == 0 {throw("sync: inconsistent mutex state")}new &^= mutexWoken
}
if atomic.CompareAndSwapInt32(&m.state, old, new) {if old&(mutexLocked|mutexStarving) == 0 {break // locked the mutex with CAS}// If we were already waiting before, queue at the front of the queue.queueLifo := waitStartTime != 0if waitStartTime == 0 {waitStartTime = runtime_nanotime()}runtime_SemacquireMutex(&m.sema, queueLifo)starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNsold = m.stateif old&mutexStarving != 0 {// If this goroutine was woken and mutex is in starvation mode,// ownership was handed off to us but mutex is in somewhat// inconsistent state: mutexLocked is not set and we are still// accounted as waiter. Fix that.if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")}delta := int32(mutexLocked - 1<<mutexWaiterShift)if !starving || old>>mutexWaiterShift == 1 {// Exit starvation mode.// Critical to do it here and consider wait time.// Starvation mode is so inefficient, that two goroutines// can go lock-step infinitely once they switch mutex// to starvation mode.delta -= mutexStarving}atomic.AddInt32(&m.state, delta)break}awoke = trueiter = 0
} else {old = m.state
}

首先,如果此时还是由于别的协程的占用无法获得锁或者处于starvation模式,都在其state加8表示有新的协程正在处于等待状态。并且如果之前由于自旋而将该锁唤醒,那么此时将其低二位的状态量赋值为0。之后判断starving是否为true,如果为true说明在上一次的循环中,锁需要被定义为starvation模式,那么在这里就将相应的状态量低三位设置为1表示进入starvation模式。

之后尝试通过cas将新的state状态量赋值给state,如果失败,则重新获得其 state在下一步循环重新重复上述的操作。如果成功,首先判断已经阻塞时间,如果为零,则从现在开始记录。

之后通过runtime_SemacquireMutex()通过信号量将当前协程阻塞。

上述runtime_SemacquireMutex()方法的具体实现在了sema.go中。

func sync_runtime_SemacquireMutex(addr *uint32, lifo bool) {semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile)
}func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags) {gp := getg()if gp != gp.m.curg {throw("semacquire not on the G stack")}// Easy case.if cansemacquire(addr) {return}// Harder case:// increment waiter count// try cansemacquire one more time, return if succeeded// enqueue itself as a waiter// sleep// (waiter descriptor is dequeued by signaler)s := acquireSudog()root := semroot(addr)t0 := int64(0)s.releasetime = 0s.acquiretime = 0s.ticket = 0if profile&semaBlockProfile != 0 && blockprofilerate > 0 {t0 = cputicks()s.releasetime = -1}if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {if t0 == 0 {t0 = cputicks()}s.acquiretime = t0}for {lock(&root.lock)// Add ourselves to nwait to disable "easy case" in semrelease.atomic.Xadd(&root.nwait, 1)// Check cansemacquire to avoid missed wakeup.if cansemacquire(addr) {atomic.Xadd(&root.nwait, -1)unlock(&root.lock)break}// Any semrelease after the cansemacquire knows we're waiting// (we set nwait above), so go to sleep.root.queue(addr, s, lifo)goparkunlock(&root.lock, "semacquire", traceEvGoBlockSync, 4)if s.ticket != 0 || cansemacquire(addr) {break}}if s.releasetime > 0 {blockevent(s.releasetime-t0, 3)}releaseSudog(s)
}

首先,在上述的方法中,首先通过semroot()方法根据传入的地址获得semRoot,其具体操作如下。

func semroot(addr *uint32) *semaRoot {return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}

将传入的Mutex的信号量sema的地址右移三位并与251取余,得到的新地址来得到semRoot,做到将semRoot通过信号量sema来与相应的Mutex绑定的目的。

semRoot的结构如下。

type semaRoot struct {lock  mutextreap *sudog // root of balanced tree of unique waiters.nwait uint32 // Number of waiters. Read w/o the lock.
}

其中的mutex与之前的Mutex无关,只是一个简单的uintptr来简单的实现并发的线程安全的功能。Treap则是其中平衡二叉树的根节点,nwait则表示证在平衡二叉树阻塞的协程数量。

此时,会对信号量sema的值进行判断,如果为0,则继续,否则尝试减1并返回。

而后通过semRoot中的mutex进行加锁,这里的锁实现很简单,简单来说实则只是对互斥信号量的cas操作。

之后给semRoot的nwait加一,表示新的协程进入等待。

之后通过queue()方法正式将目标协程放入平衡二叉树中等待。

对于这个节点,首先设置该节点中保存的协程为当前协程,并保存当前信号量地址。

首先,如果是第一次根据新的信号量而要加入的节点,那么会直接加入到平衡二叉树中,这颗二叉树中节点的位置通过信号量的地址作为排序的依据,然后插入。

s.ticket = fastrand() | 1
s.parent = last
*pt = s// Rotate up into tree according to ticket (priority).
for s.parent != nil && s.parent.ticket > s.ticket {if s.parent.prev == s {root.rotateRight(s.parent)} else {if s.parent.next != s {panic("semaRoot queue")}root.rotateLeft(s.parent)}
}

如果不是第一次的插入,那么首先根据信号量的地址从平衡二叉树根节点开始寻找对应的信号量地址所绑定的节点,通过大小确定寻找的左儿子节点或者右儿子节点,直到找到。

找到之后,之前在将协程准备阻塞之前会判断以等待时间,如果不为0,说明该协程已经进入过该平衡二叉树。那么将新生成的节点取代原本节点在平衡二叉树的位置,并将老节点放置在该信号量绑定节点的等待队列的头部。如果是第一次,那么只需要将新的节点放在等待队列的末尾。

var last *sudog
pt := &root.treap
for t := *pt; t != nil; t = *pt {if t.elem == unsafe.Pointer(addr) {// Already have addr in list.if lifo {// Substitute s in t's place in treap.*pt = ss.ticket = t.tickets.acquiretime = t.acquiretimes.parent = t.parents.prev = t.prevs.next = t.nextif s.prev != nil {s.prev.parent = s}if s.next != nil {s.next.parent = s}// Add t first in s's wait list.s.waitlink = ts.waittail = t.waittailif s.waittail == nil {s.waittail = t}t.parent = nilt.prev = nilt.next = nilt.waittail = nil} else {// Add s to end of t's wait list.if t.waittail == nil {t.waitlink = s} else {t.waittail.waitlink = s}t.waittail = ss.waitlink = nil}return}last = tif uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) {pt = &t.prev} else {pt = &t.next}
}

将当次阻塞加入平衡二叉树中队列之后,就可以先将semRoot中的mutex解锁,并将当前协程挂起。

回到Mutex的Lock()中,当之前调用方法将协程挂起后,如果协程被唤醒,那么就会继续下面的流程。

starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
old = m.state
if old&mutexStarving != 0 {// If this goroutine was woken and mutex is in starvation mode,// ownership was handed off to us but mutex is in somewhat// inconsistent state: mutexLocked is not set and we are still// accounted as waiter. Fix that.if old&(mutexLocked|mutexWoken) != 0 || old>>mutexWaiterShift == 0 {throw("sync: inconsistent mutex state")}delta := int32(mutexLocked - 1<<mutexWaiterShift)if !starving || old>>mutexWaiterShift == 1 {// Exit starvation mode.// Critical to do it here and consider wait time.// Starvation mode is so inefficient, that two goroutines// can go lock-step infinitely once they switch mutex// to starvation mode.delta -= mutexStarving}atomic.AddInt32(&m.state, delta)break
}

如果这里协程阻塞而挂起的时间超过了默认值,那么就会将starve设置为true,就会在下一次的循环中将该锁这是为starvation模式。如果已经是这个模式,那么就会将状态量的等待数减1,并判断当前如果已经没有等待的协程,就没有必要继续维持starvation模式,同时也没必要继续执行该循环(当前只有一个协程在占用锁)。

解锁通过Unlock()方法。

func (m *Mutex) Unlock() {if race.Enabled {_ = m.staterace.Release(unsafe.Pointer(m))}// Fast path: drop lock bit.new := atomic.AddInt32(&m.state, -mutexLocked)if (new+mutexLocked)&mutexLocked == 0 {throw("sync: unlock of unlocked mutex")}if new&mutexStarving == 0 {old := newfor {// If there are no waiters or a goroutine has already// been woken or grabbed the lock, no need to wake anyone.// In starvation mode ownership is directly handed off from unlocking// goroutine to the next waiter. We are not part of this chain,// since we did not observe mutexStarving when we unlocked the mutex above.// So get off the way.if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}// Grab the right to wake someone.new = (old - 1<<mutexWaiterShift) | mutexWokenif atomic.CompareAndSwapInt32(&m.state, old, new) {runtime_Semrelease(&m.sema, false)return}old = m.state}} else {// Starving mode: handoff mutex ownership to the next waiter.// Note: mutexLocked is not set, the waiter will set it after wakeup.// But mutex is still considered locked if mutexStarving is set,// so new coming goroutines won't acquire it.runtime_Semrelease(&m.sema, true)}
}

解锁首先直接将第一位状态量变为0,表示已经解锁。然后根据模式,如果处于normal模式,根据状态量当前是否有协程等待,或者已经有协程已经在自旋等待锁,那么就可以直接结束。否则,就通过runtime_Semrelease()方法尝试唤醒挂起的协程。在runtime_Semrelease()中与之前对应,通过dequeue()方法将寻找到的二叉树节点,也就是循环队列的头部取出,节点中保存的协程作为要唤醒的协程。但是,这里唤醒的携程并不一定会立即获取锁,锁的获取仍旧需要竞争。

而如果处于starvation模式,那么会直接通过runtime_Semrelease()方法尝试唤醒挂起的协程,这里唤醒的协程必定持有锁。

golang 1.10 mutex互斥锁源码相关推荐

  1. golang RWMutex读写互斥锁源码分析

    针对Golang 1.9的sync.RWMutex进行分析,与Golang 1.10基本一样除了将panic改为了throw之外其他的都一样. RWMutex是读写互斥锁.锁可以由任意数量的读取器或单 ...

  2. Linux线程同步(三)---互斥锁源码分析

    先给自己打个广告,本人的微信公众号:嵌入式Linux江湖,主要关注嵌入式软件开发,股票基金定投,足球等等,希望大家多多关注,有问题可以直接留言给我,一定尽心尽力回答大家的问题. 一 源码分析 1.li ...

  3. Mutex:互斥锁源码解读

    Mutex count++操作问题 count++是非原子操作,所以有并发问题 什么是原子操作 所谓原子操作是指不会被线程调度机制打断的操作:这种操作一旦开始,就一直运行到结束,中间不会有任何 con ...

  4. golang sync.Mutex 互斥锁 使用实例

    实例: var mutex sync.Mutex //互斥锁 func printer(str string){mutex.Lock() //加锁defer mutex.Unlock() //解锁fo ...

  5. 从零单排之golang:mutex使用及源码详解

    mutex(互斥锁)详解:互斥锁是一个值类型,实现了locker接口,所以使用的时候需要注意参数的传递,它的底层嵌套了linux的信号量(Semaphore),每次操作其实就是PV操作 type Mu ...

  6. mysql5.1编译安装centos7_CentOS7下 Nginx1.13.5 + PHP7.1.10 + MySQL5.7.19 源码编译安装

    在CentOS7下 Nginx1.13.5 + PHP7.1.10 + MySQL5.7.19 源码编译安装过程记录. 一.安装Nginx 1.安装依赖扩展 # yum -y install wget ...

  7. Linux系统编程:使用mutex互斥锁和条件变量实现多个生成者和消费者模型

    实现代码 如题,使用mutex互斥锁和条件变量实现多个生成者和消费者模型. 直接上代码,需要线程中的互斥锁和条件变量的相关知识进行支撑.这里就不细说了呀,代码中有一定的注释. #include < ...

  8. 最新全自动更新采集影视带10套模板PHP源码+功能多

    正文: 全自动更新采集影视带10套模板PHP源码,一个全自动采集影视的网站程序,里面包含了十种影视模板. 支持放到二级目录.里面有教程,怎么修改接口啥的,具体自己下载研究吧. 程序: wwhegu.l ...

  9. Go (Golang) 工具之copyright 添加 | go源码添加授权头

    文章目录 Go (Golang) 工具之copyright 添加 | go源码添加版权 什么是addlicense addlicense安装和使用 Go (Golang) 工具之copyright 添 ...

最新文章

  1. ValueError: do_handshake_on_connect should not be specified for non-blocking sockets
  2. 网络编程项目(聊天室项目)
  3. ClassPathResource详解
  4. 按钮打开Activity
  5. 【AI识人】OpenPose:实时多人2D姿态估计 | 附视频测试及源码链接
  6. Python功能使用学习笔记(4)--链接数据库
  7. 看云|专注于文档在线创作、协作、分享和托管
  8. CM,AOKP系统没有Logcat 解决办法
  9. [CC2642r1] ble5 stacks 蓝牙协议栈 介绍和理解 TI协议栈下载
  10. 自问自答学ArrayList,看这篇就够了,详解问答
  11. ensp使用web登录防火墙
  12. C语言实现shell
  13. TESTTESTTESTTESTTESTTEST
  14. php ajax 实现三级省市区联动
  15. Android studio占用C盘资源的解决方法
  16. 打开Spring-Ioc的新世界大门
  17. 查询快递物流筛选出被拦截单号标色记号
  18. PADS PCB如何设计邮票孔
  19. The 15th Chinese Northeast Collegiate H - Loneliness(思维,构造)
  20. python 三方库字典

热门文章

  1. 诗与远方:无题(九十)
  2. SpringBoot实战:整合Redis、mybatis,封装RedisUtils工具类等(附源码)
  3. Docker小结(五)
  4. node php环境变量配置,关于NodeJS、NPM安装配置步骤(windows版本) 以及环境变量的介绍...
  5. cmd删除txt部分文字_Python识别图片中的文字
  6. redis相关(搭建和数据落盘)
  7. 第一周冲刺第二天博客
  8. WEB站点服务器安全配置
  9. 诺微联盟催生智能手机的三足鼎立
  10. 101 LINQ Samples