运行时信号量机制 semaphore

前言

最近在看源码,发现好多地方用到了这个semaphore。

本文是在go version go1.13.15 darwin/amd64上进行的

作用是什么

下面是官方的描述

// Semaphore implementation exposed to Go.

// Intended use is provide a sleep and wakeup

// primitive that can be used in the contended case

// of other synchronization primitives.

// Thus it targets the same goal as Linux's futex,

// but it has much simpler semantics.

//

// That is, don't think of these as semaphores.

// Think of them as a way to implement sleep and wakeup

// such that every sleep is paired with a single wakeup,

// even if, due to races, the wakeup happens before the sleep.

// 具体的用法是提供 sleep 和 wakeup 原语

// 以使其能够在其它同步原语中的竞争情况下使用

// 因此这里的 semaphore 和 Linux 中的 futex 目标是一致的

// 只不过语义上更简单一些

//

// 也就是说,不要认为这些是信号量

// 把这里的东西看作 sleep 和 wakeup 实现的一种方式

// 每一个 sleep 都会和一个 wakeup 配对

// 即使在发生 race 时,wakeup 在 sleep 之前时也是如此

上面提到了和futex作用一样,关于futex

futex(快速用户区互斥的简称)是一个在Linux上实现锁定和构建高级抽象锁如信号量和POSIX互斥的基本工具

Futex 由一块能够被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值能够通过汇编语言调用CPU提供的原子操作指令来增加或减少,并且一个进程可以等待直到那个值变成正数。Futex 的操作几乎全部在用户空间完成;只有当操作结果不一致从而需要仲裁时,才需要进入操作系统内核空间执行。这种机制允许使用 futex 的锁定原语有非常高的执行效率:由于绝大多数的操作并不需要在多个进程之间进行仲裁,所以绝大多数操作都可以在应用程序空间执行,而不需要使用(相对高代价的)内核系统调用。

go中的semaphore作用和futex目标一样,提供sleep和wakeup原语,使其能够在其它同步原语中的竞争情况下使用。当一个goroutine需要休眠时,将其进行集中存放,当需要wakeup时,再将其取出,重新放入调度器中。

例如在读写锁的实现中,读锁和写锁之前的相互阻塞唤醒,就是通过sleep和wakeup实现,当有读锁存在的时候,新加入的写锁通过semaphore阻塞自己,当前面的读锁完成,在通过semaphore唤醒被阻塞的写锁。

写锁

// 获取互斥锁

// 阻塞等待所有读操作结束(如果有的话)

func (rw *RWMutex) Lock() {

...

// 原子的修改readerCount的值,直接将readerCount减去rwmutexMaxReaders

// 说明,有写锁进来了,这在上面的读锁中也有体现

r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders

// 当r不为0说明,当前写锁之前有读锁的存在

// 修改下readerWait,也就是当前写锁需要等待的读锁的个数

if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {

// 阻塞当前写锁

runtime_SemacquireMutex(&rw.writerSem, false, 0)

}

...

}

通过runtime_SemacquireMutex对当前写锁进行sleep

读锁释放

// 减少读操作计数,即readerCount--

// 唤醒等待写操作的协程(如果有的话)

func (rw *RWMutex) RUnlock() {

...

// 首先通过atomic的原子性使readerCount-1

// 1.若readerCount大于0, 证明当前还有读锁, 直接结束本次操作

// 2.若readerCount小于0, 证明已经没有读锁, 但是还有因为读锁被阻塞的写锁存在

if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {

// 尝试唤醒被阻塞的写锁

rw.rUnlockSlow(r)

}

...

}

func (rw *RWMutex) rUnlockSlow(r int32) {

...

// readerWait--操作,如果readerWait--操作之后的值为0,说明,写锁之前,已经没有读锁了

// 通过writerSem信号量,唤醒队列中第一个阻塞的写锁

if atomic.AddInt32(&rw.readerWait, -1) == 0 {

// 唤醒一个写锁

runtime_Semrelease(&rw.writerSem, false, 1)

}

}

写锁处理完之后,调用runtime_Semrelease来唤醒sleep的写锁

几个主要的方法

在go/src/sync/runtime.go中,定义了这几个方法

// Semacquire等待*s > 0,然后原子递减它。

// 它是一个简单的睡眠原语,用于同步

// library and不应该直接使用。

func runtime_Semacquire(s *uint32)

// SemacquireMutex类似于Semacquire,用来阻塞互斥的对象

// 如果lifo为true,waiter将会被插入到队列的头部

// skipframes是跟踪过程中要省略的帧数,从这里开始计算

// runtime_SemacquireMutex's caller.

func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

// Semrelease会自动增加*s并通知一个被Semacquire阻塞的等待的goroutine

// 它是一个简单的唤醒原语,用于同步

// library and不应该直接使用。

// 如果handoff为true, 传递信号到队列头部的waiter

// skipframes是跟踪过程中要省略的帧数,从这里开始计算

// runtime_Semrelease's caller.

func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

具体的实现是在go/src/runtime/sema.go中

//go:linkname sync_runtime_Semacquire sync.runtime_Semacquire

func sync_runtime_Semacquire(addr *uint32) {

semacquire1(addr, false, semaBlockProfile, 0)

}

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease

func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {

semrelease1(addr, handoff, skipframes)

}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex

func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {

semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)

}

如何实现

sudog 缓存

semaphore的实现使用到了sudog,我们先来看下

sudog 是运行时用来存放处于阻塞状态的goroutine的一个上层抽象,是用来实现用户态信号量的主要机制之一。 例如当一个goroutine因为等待channel的数据需要进行阻塞时,sudog会将goroutine及其用于等待数据的位置进行记录, 并进而串联成一个等待队列,或二叉平衡树。

// sudogs are allocated from a special pool. Use acquireSudog and

// releaseSudog to allocate and free them.

type sudog struct {

// 以下字段受hchan保护

g *g

// isSelect 表示 g 正在参与一个 select, so

// 因此 g.selectDone 必须以 CAS 的方式来获取wake-up race.

isSelect bool

next *sudog

prev *sudog

elem unsafe.Pointer // 数据元素(可能指向栈)

// 以下字段不会并发访问。

// 对于通道,waitlink只被g访问。

// 对于信号量,所有字段(包括上面的字段)

// 只有当持有一个semroot锁时才被访问。

acquiretime int64

releasetime int64

ticket uint32

parent *sudog //semaRoot 二叉树

waitlink *sudog // g.waiting 列表或 semaRoot

waittail *sudog // semaRoot

c *hchan // channel

}

sudog的获取和归还,遵循以下策略:

1、获取,首先从per-P缓存获取,对于per-P缓存,如果per-P缓存为空,则从全局池抓取一半,然后取出per-P缓存中的最后一个;

2、归还,归还到per-P缓存,如果per-P缓存满了,就把per-P缓存的一半归还到全局缓存中,然后归还sudog到per-P缓存中。

acquireSudog

1、如果per-P缓存的内容没达到长度的一般,则会从全局额缓存中抓取一半;

2、然后返回把per-P缓存中最后一个sudog返回,并且置空;

// go/src/runtime/proc.go

//go:nosplit

func acquireSudog() *sudog {

// Delicate dance: 信号量的实现调用acquireSudog,然后acquireSudog调用new(sudog)

// new调用malloc, malloc调用垃圾收集器,垃圾收集器在stopTheWorld调用信号量

// 通过在new(sudog)周围执行acquirem/releasem来打破循环

// acquirem/releasem在new(sudog)期间增加m.locks,防止垃圾收集器被调用。

// 获取当前 g 所在的 m

mp := acquirem()

// 获取p的指针

pp := mp.p.ptr()

if len(pp.sudogcache) == 0 {

lock(&sched.sudoglock)

// 首先,尝试从中央缓存获取一批数据。

for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {

s := sched.sudogcache

sched.sudogcache = s.next

s.next = nil

pp.sudogcache = append(pp.sudogcache, s)

}

unlock(&sched.sudoglock)

// 如果中央缓存中没有,新分配

if len(pp.sudogcache) == 0 {

pp.sudogcache = append(pp.sudogcache, new(sudog))

}

}

// 取缓存中最后一个

n := len(pp.sudogcache)

s := pp.sudogcache[n-1]

pp.sudogcache[n-1] = nil

// 将刚取出的在缓存中移除

pp.sudogcache = pp.sudogcache[:n-1]

if s.elem != nil {

throw("acquireSudog: found s.elem != nil in cache")

}

releasem(mp)

return s

}

releaseSudog

1、如果per-P缓存满了,就归还per-P缓存一般的内容到全局缓存;

2、然后将回收的sudog放到per-P缓存中。

// go/src/runtime/proc.go

//go:nosplit

func releaseSudog(s *sudog) {

if s.elem != nil {

throw("runtime: sudog with non-nil elem")

}

if s.isSelect {

throw("runtime: sudog with non-false isSelect")

}

if s.next != nil {

throw("runtime: sudog with non-nil next")

}

if s.prev != nil {

throw("runtime: sudog with non-nil prev")

}

if s.waitlink != nil {

throw("runtime: sudog with non-nil waitlink")

}

if s.c != nil {

throw("runtime: sudog with non-nil c")

}

gp := getg()

if gp.param != nil {

throw("runtime: releaseSudog with non-nil gp.param")

}

// 避免重新安排到另一个P

mp := acquirem() // avoid rescheduling to another P

pp := mp.p.ptr()

// 如果缓存满了

if len(pp.sudogcache) == cap(pp.sudogcache) {

// 将本地高速缓存的一半传输到中央高速缓存

var first, last *sudog

for len(pp.sudogcache) > cap(pp.sudogcache)/2 {

n := len(pp.sudogcache)

p := pp.sudogcache[n-1]

pp.sudogcache[n-1] = nil

pp.sudogcache = pp.sudogcache[:n-1]

if first == nil {

first = p

} else {

last.next = p

}

last = p

}

lock(&sched.sudoglock)

last.next = sched.sudogcache

sched.sudogcache = first

unlock(&sched.sudoglock)

}

// 归还sudog到`per-P`缓存中

pp.sudogcache = append(pp.sudogcache, s)

releasem(mp)

}

semaphore

// go/src/runtime/sema.go

// 用于sync.Mutex的异步信号量。

// semaRoot拥有一个具有不同地址(s.elem)的sudog平衡树。

// 每个sudog都可以依次(通过s.waitlink)指向一个列表,在相同地址上等待的其他sudog。

// 对具有相同地址的sudog内部列表进行的操作全部为O(1)。顶层semaRoot列表的扫描为O(log n),

// 其中,n是阻止goroutines的不同地址的数量,通过他们散列到给定的semaRoot。

type semaRoot struct {

lock mutex

// waiters的平衡树的根节点

treap *sudog

// waiters的数量,读取的时候无所

nwait uint32

}

// Prime to not correlate with any user patterns.

const semTabSize = 251

var semtable [semTabSize]struct {

root semaRoot

pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte

}

poll_runtime_Semacquire/sync_runtime_SemacquireMutex

// go/src/runtime/sema.go

//go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire

func poll_runtime_Semacquire(addr *uint32) {

semacquire1(addr, false, semaBlockProfile, 0)

}

//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex

func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {

semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)

}

func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {

// 判断这个goroutine,是否是m上正在运行的那个

gp := getg()

if gp != gp.m.curg {

throw("semacquire not on the G stack")

}

// *addr -= 1

if cansemacquire(addr) {

return

}

// 增加等待计数

// 再试一次 cansemacquire 如果成功则直接返回

// 将自己作为等待者入队

// 休眠

// (等待器描述符由出队信号产生出队行为)

// 获取一个sudog

s := acquireSudog()

root := semroot(addr)

t0 := int64(0)

s.releasetime = 0

s.acquiretime = 0

s.ticket = 0

if profile&semaBlockProfile != 0 && blockprofilerate > 0 {

t0 = cputicks()

s.releasetime = -1

}

if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {

if t0 == 0 {

t0 = cputicks()

}

s.acquiretime = t0

}

for {

lock(&root.lock)

// 添加我们自己到nwait来禁用semrelease中的"easy case"

atomic.Xadd(&root.nwait, 1)

// 检查cansemacquire避免错过唤醒

if cansemacquire(addr) {

atomic.Xadd(&root.nwait, -1)

unlock(&root.lock)

break

}

// 任何在 cansemacquire 之后的 semrelease 都知道我们在等待(因为设置了 nwait),因此休眠

// 队列将s添加到semaRoot中被阻止的goroutine中

root.queue(addr, s, lifo)

// 将当前goroutine置于等待状态并解锁锁。

// 通过调用goready(gp),可以使goroutine再次可运行。

goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)

if s.ticket != 0 || cansemacquire(addr) {

break

}

}

if s.releasetime > 0 {

blockevent(s.releasetime-t0, 3+skipframes)

}

// 归还sudog

releaseSudog(s)

}

func cansemacquire(addr *uint32) bool {

for {

v := atomic.Load(addr)

if v == 0 {

return false

}

if atomic.Cas(addr, v, v-1) {

return true

}

}

}

sync_runtime_Semrelease

// go/src/runtime/sema.go

//go:linkname sync_runtime_Semrelease sync.runtime_Semrelease

func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) {

semrelease1(addr, handoff, skipframes)

}

func semrelease1(addr *uint32, handoff bool, skipframes int) {

root := semroot(addr)

atomic.Xadd(addr, 1)

// Easy case:没有等待者

// 这个检查必须发生在xadd之后,以避免错过唤醒

if atomic.Load(&root.nwait) == 0 {

return

}

// Harder case: 找到等待者,并且唤醒

lock(&root.lock)

if atomic.Load(&root.nwait) == 0 {

// 该计数已被另一个goroutine占用,

// 因此无需唤醒其他goroutine。

unlock(&root.lock)

return

}

// 搜索一个等待着然后将其唤醒

s, t0 := root.dequeue(addr)

if s != nil {

atomic.Xadd(&root.nwait, -1)

}

unlock(&root.lock)

if s != nil { // 可能会很慢,因此先解锁

acquiretime := s.acquiretime

if acquiretime != 0 {

mutexevent(t0-acquiretime, 3+skipframes)

}

if s.ticket != 0 {

throw("corrupted semaphore ticket")

}

if handoff && cansemacquire(addr) {

s.ticket = 1

}

// goready(s.g, 5)

// 标记 runnable,等待被重新调度

readyWithTime(s, 5+skipframes)

}

}

摘自"同步原语"的一段总结

这一对 semacquire 和 semrelease 理解上可能不太直观。 首先,我们必须意识到这两个函数一定是在两个不同的 M(线程)上得到执行,否则不会出现并发,我们不妨设为 M1 和 M2。 当 M1 上的 G1 执行到 semacquire1 时,如果快速路径成功,则说明 G1 抢到锁,能够继续执行。但一旦失败且在慢速路径下 依然抢不到锁,则会进入 goparkunlock,将当前的 G1 放到等待队列中,进而让 M1 切换并执行其他 G。 当 M2 上的 G2 开始调用 semrelease1 时,只是单纯的将等待队列的 G1 重新放到调度队列中,而当 G1 重新被调度时(假设运气好又在 M1 上被调度),代码仍然会从 goparkunlock 之后开始执行,并再次尝试竞争信号量,如果成功,则会归还 sudog。

参考

到此这篇关于go中semaphore(信号量)源码解读的文章就介绍到这了,更多相关go中semaphore源码内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

Linux 信号量 源码,一文读懂go中semaphore(信号量)源码相关推荐

  1. 一文读懂Java中File类、字节流、字符流、转换流

    一文读懂Java中File类.字节流.字符流.转换流 第一章 递归:File类: 1.1:概述 java.io.File 类是文件和目录路径名的抽象表示,主要用于文件和目录的创建.查找和删除等操作. ...

  2. 一文读懂信息安全中的恶意代码、病毒、木马、蠕虫......

    一文读懂信息安全中的恶意代码.病毒.木马.蠕虫...... 病毒:破坏计算机功能或数据,以破坏为主,传染其他程序的方式是通过修改其他程序来把自身或其变种复制进去完成的,典型的熊猫烧香 蠕虫:通过网络的 ...

  3. 一文读懂SpringBoot中的事件机制

    一文读懂SpringBoot中的事件机制?针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法. 要"监听"事件,我们总是 ...

  4. 一文读懂机器学习中的模型偏差

    一文读懂机器学习中的模型偏差 http://blog.sina.com.cn/s/blog_cfa68e330102yz2c.html 在人工智能(AI)和机器学习(ML)领域,将预测模型参与决策过程 ...

  5. java中date类型如何赋值_一文读懂java中的Reference和引用类型

    简介 java中有值类型也有引用类型,引用类型一般是针对于java中对象来说的,今天介绍一下java中的引用类型.java为引用类型专门定义了一个类叫做Reference.Reference是跟jav ...

  6. 一文读懂密码学中的证书

    一文读懂密码学中的证书 之前的文章中,我们讲到了数字签名,数字签名的作用就是防止篡改和伪装,并且能够防止否认.但是要正确运用数字签名技术还有一个非常大的前提,那就是用来验证签名的公钥必须真正的属于发送 ...

  7. 前端面试必会 | 一文读懂 JavaScript 中的 this 关键字

    this 是一个令无数 JavaScript 编程者又爱又恨的知识点.它的重要性毋庸置疑,然而真正想掌握它却并非易事.希望本文可以帮助大家理解 this. JavaScript 中的 this Jav ...

  8. 带你一文读懂Javascript中ES6的Symbol

    带你一文读懂Javascript中ES6的Symbol 前言 基础类型 Symbol Symbol.for 与 Symbol.keyFor Symbol.iterator Symbol.search ...

  9. 一文读懂机器学习中奇异值分解SVD

    点击上方"小白学视觉",选择加"星标"或"置顶" 重磅干货,第一时间送达 目录: 矩阵分解 1.1 矩阵分解作用 1.2 矩阵分解的方法一文 ...

最新文章

  1. 国学大师文怀沙郑州演讲 称骂河南就是骂娘
  2. windows下使用webpack的完美解决方案
  3. Exp7 网络欺诈防范
  4. 23种设计模式UML图
  5. Android 性能优化工具
  6. PHP初学者头疼问题总结
  7. 微信 oauth授权2
  8. 了解Go编译处理(一)—— go tool
  9. 于NXP芯片第一次无法进入CAN中断的问题
  10. Ghost Win7删除桌面IE图标
  11. UVM-- Sequencer和driver
  12. 笔记本CPU低压和标压有什么区别?
  13. 入门数学(二)素数,质因数分解
  14. [论文笔记] Oriented R-CNN 阅读笔记
  15. Socks 正向代理 - Srelay
  16. 【干货】气体分析仪与气体检测仪的区别
  17. Go使用redigo实现简单分布式锁
  18. ajax布林德,丹尼·布林德
  19. BatBot智慧电力(运维)云平台
  20. TPLINK路由器端口数量太少不够用怎么办

热门文章

  1. 初识Docker-Docker的安装
  2. Nginx负载均衡的原理及流程分析
  3. RabbitMQ工作线程代码
  4. MyBatis创建SqlSession-有没有更好的拿到SqlSessionTemplate 的方法?
  5. Spring 的前世今生
  6. 垂直拆分后,遇到瓶颈,数据水平拆分
  7. Redis中的淘汰策略
  8. 配置Bean的作用域对象
  9. 字符串中的编码解码问题
  10. 前端整合图片上传组件