概述

我们前面讲过 操作系统的信号量,以及 golang中的Mutex原理解析,就抛出了一个问题,操作系统的信号量的管理对象是线程,而 Mutex 中使用的信号量是针对协程的,那么这就意味着golang需要重新实现一套基于协程的信号量,随着对golang源码的研究,我发现golang的 runtime 就像一个微型的操作系统,功能非常强大。

go version go1.18.3 windows/amd64

// src/runtime/sema.go// Semaphore implementation exposed to Go.
// Intended use is provide a sleep and wakeup
// primitive that can be used in the contended case
// of other synchronization primitives.
// Thus it targets the same goal as Linux's futex,
// but it has much simpler semantics.
//
// That is, don't think of these as semaphores.
// Think of them as a way to implement sleep and wakeup
// such that every sleep is paired with a single wakeup,
// even if, due to races, the wakeup happens before the sleep.
//
// See Mullender and Cox, ``Semaphores in Plan 9,''
// https://swtch.com/semaphore.pdf

具体的用法是提供 sleep 和 wakeup 原语
以使其能够在其它同步原语中的竞争情况下使用
因此这里的 semaphore 和 Linux 中的 futex 目标是一致的
只不过语义上更简单一些

也就是说,不要认为这些是信号量
把这里的东西看作 sleep 和 wakeup 实现的一种方式
每一个 sleep 都会和一个 wakeup 配对
即使在发生 race 时,wakeup 在 sleep 之前时也是如此

上面提到了和futex作用一样,关于futex

futex(快速用户区互斥的简称)是一个在Linux上实现锁定和构建高级抽象锁如信号量和POSIX互斥的基本工具。

Futex 由一块能够被多个进程共享的内存空间(一个对齐后的整型变量)组成;这个整型变量的值能够通过汇编语言调用CPU提供的原子操作指令来增加或减少,并且一个进程可以等待直到那个值变成正数。Futex 的操作几乎全部在用户空间完成;只有当操作结果不一致从而需要仲裁时,才需要进入操作系统内核空间执行。这种机制允许使用 futex 的锁定原语有非常高的执行效率:由于绝大多数的操作并不需要在多个进程之间进行仲裁,所以绝大多数操作都可以在应用程序空间执行,而不需要使用(相对高代价的)内核系统调用。

go中的semaphore作用和futex目标一样,提供sleepwakeup原语,使其能够在其它同步原语中的竞争情况下使用。当一个goroutine需要休眠时,将其进行集中存放,当需要wakeup时,再将其取出,重新放入调度器中。

主要源码

// src/sync/runtime.go// SemacquireMutex is like Semacquire, but for profiling contended Mutexes.
// If lifo is true, queue waiter at the head of wait queue.
// skipframes is the number of frames to omit during tracing, counting from
// runtime_SemacquireMutex's caller.
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)// ----------------------------------------------------------------// src/runtime/sema.gotype semaRoot struct {lock  mutextreap *sudog // root of balanced tree of unique waiters.nwait uint32 // Number of waiters. Read w/o the lock.
}// Prime to not correlate with any user patterns.
const semTabSize = 251var semtable [semTabSize]struct {root semaRootpad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}//go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex
func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) {semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes)
}func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) {// 获取当前协程gp := getg()if gp != gp.m.curg {throw("semacquire not on the G stack")}// Easy case.if cansemacquire(addr) {return}// Harder case:// increment waiter count//    try cansemacquire one more time, return if succeeded//  enqueue itself as a waiter//    sleep// (waiter descriptor is dequeued by signaler)s := acquireSudog()root := semroot(addr)t0 := int64(0)s.releasetime = 0s.acquiretime = 0s.ticket = 0if profile&semaBlockProfile != 0 && blockprofilerate > 0 {t0 = cputicks()s.releasetime = -1}if profile&semaMutexProfile != 0 && mutexprofilerate > 0 {if t0 == 0 {t0 = cputicks()}s.acquiretime = t0}for {lockWithRank(&root.lock, lockRankRoot)// Add ourselves to nwait to disable "easy case" in semrelease.atomic.Xadd(&root.nwait, 1)// Check cansemacquire to avoid missed wakeup.if cansemacquire(addr) {atomic.Xadd(&root.nwait, -1)unlock(&root.lock)break}// Any semrelease after the cansemacquire knows we're waiting// (we set nwait above), so go to sleep.root.queue(addr, s, lifo)goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)if s.ticket != 0 || cansemacquire(addr) {break}}if s.releasetime > 0 {blockevent(s.releasetime-t0, 3+skipframes)}releaseSudog(s)
}func cansemacquire(addr *uint32) bool {for {v := atomic.Load(addr)if v == 0 {return false}if atomic.Cas(addr, v, v-1) {return true}}
}

cansemacquire(),此函数通过原子操作来修改和判断信号量的值。此处加载的包是runtime/internal/atomic,对应的函数。

//go:noescape
func Cas(ptr *uint32, old, new uint32) bool// src/runtime/internal/atomic/atomic_amd64.s// bool Cas(int32 *val, int32 old, int32 new)
// Atomically:
//  if(*val == old){//      *val = new;
//      return 1;
//  } else
//      return 0;
TEXT ·Cas(SB),NOSPLIT,$0-17MOVQ ptr+0(FP), BXMOVL  old+8(FP), AXMOVL  new+12(FP), CXLOCKCMPXCHGL CX, 0(BX)SETEQ  ret+16(FP)RET

addr 为 uint32 类型,那么atomic.Cas(addr, v, v-1)最低只能将其值修改到0,如果v-1 < 0,那么就会返回 false,并放弃修改,这就实现了比较和修改的原子化。

golang中的信号量没有做初始化,默认值是0,那么在阅读函数cansemacquire()的时候肯定会有疑惑。实际上,在充分理解了 Mutex 和 RWMutex 源码之后才会知道,golang中不对 sema 做初始化,它们的使用规范是先释放信号量,再获取信号量,如果还不理解可以看看golang中的Mutex原理解析。

这里的Easy case 和 Harder case就是Fast path 和 slow path,golang源码中对于循环代码块都喜欢这个干。

skipframe 参数是用作trace跟踪性能分析用的,包括releasetimeacquiretime

数据结构

看到这里要先停下来搞清楚semtable, semaRoot, sudug的关系。

addr 为一个信号量的地址,在一个程序中可能存在多个信号量,那么这些 addr 会被放入 semtable 数组中,采用取模的方式,semtable 长度为251,在声明的时候就做了初始化,每一个元素中包含一个 semaRoot,而 semaRoot 中包含一个平衡二叉数结构,用来存储着竞争信号量的协程 sudug。

func semroot(addr *uint32) *semaRoot {return &semtable[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}// sudog represents a g in a wait list, such as for sending/receiving
// on a channel.
//
// sudog is necessary because the g ↔ synchronization object relation
// is many-to-many. A g can be on many wait lists, so there may be
// many sudogs for one g; and many gs may be waiting on the same
// synchronization object, so there may be many sudogs for one object.
//
// sudogs are allocated from a special pool. Use acquireSudog and
// releaseSudog to allocate and free them.
type sudog struct {// The following fields are protected by the hchan.lock of the// channel this sudog is blocking on. shrinkstack depends on// this for sudogs involved in channel ops.g *gnext *sudogprev *sudogelem unsafe.Pointer // data element (may point to stack)// The following fields are never accessed concurrently.// For channels, waitlink is only accessed by g.// For semaphores, all fields (including the ones above)// are only accessed when holding a semaRoot lock.acquiretime int64releasetime int64ticket      uint32// isSelect indicates g is participating in a select, so// g.selectDone must be CAS'd to win the wake-up race.isSelect bool// success indicates whether communication over channel c// succeeded. It is true if the goroutine was awoken because a// value was delivered over channel c, and false if awoken// because c was closed.success boolparent   *sudog // semaRoot binary treewaitlink *sudog // g.waiting list or semaRootwaittail *sudog // semaRootc        *hchan // channel
}

取模的过程肯定会存在冲突,类似于哈希冲突,因此不同的 addr 可能会被定位到同一个 semaRoot,那么在操作 semaRoot 的时候依然还需要带上 addr 参数,并将 addr 参数填充到 sudug 的 elem 字段,比如root.queue(addr) 和 root.dequeue(addr)操作。

lifo为后进先出模式,fifo为先进先出。

sudug 的结构比较丰富,即可以通过它来构造一个平衡二叉树(parent, prev, next),又可以构造一个单向链表(waitlink, waittail),并且可以同时存在。二叉树的查找是为了满足多个 addr 通过取模后落到了同一个位置,提高查询效率,二叉树的每一个节点都意味着不同的 addr,所以相同的 addr 进来之后发现在二叉树上存在这个 addr 的节点,那么就会作为单向链表节点挂在这个节点下面。

sudug 的waittail都指向链表的最后一个元素。

关于sleep和wakeup协程

与线程的挂起和唤醒原理类似,在前面成功的将协程加入到 semaRoot 之后,只需要将协程的状态设置为 Gwaiting 就可以实现挂起,而唤醒的过程是将其移出 semaRoot ,修改状态,加入到就绪队列。

// src/runtime/sema.go
func readyWithTime(s *sudog, traceskip int) {if s.releasetime != 0 {s.releasetime = cputicks()}goready(s.g, traceskip)
}// src/runtime/proc.go// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling goready(gp).
func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) {gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}func goready(gp *g, traceskip int) {systemstack(func() {ready(gp, traceskip, true)})
}
// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
func Gosched() {checkTimeouts()mcall(gosched_m)
}// goyield is like Gosched, but it:
// - emits a GoPreempt trace event instead of a GoSched trace event
// - puts the current G on the runq of the current P instead of the globrunq
func goyield() {checkTimeouts()mcall(goyield_m)
}

readyWithTime() 把 sudog 对应的 g 唤醒,并且放到P本地队列的下一个执行位置。

goyield()是调度控制,让出执行权,并放到P本地队列的的队尾,并不会挂起。

runtime.Gosched()是调度控制,让出执行权,并放到全局队列的的队尾,并不会挂起。

关于lock

在对二叉树做操作的时候肯定是要加锁的,显然这个锁是要加在 semaRoot 上的,而采用 semtable 分散化在一定程度上可以降低锁的粒度。

golang通过 sema 来实现 sync.Mutex,然后在实现 sema 的时候又用了 mutex,那么这里的 mutex 是什么呢?

相关函数

// Mutual exclusion locks.  In the uncontended case,
// as fast as spin locks (just a few user-level instructions),
// but on the contention path they sleep in the kernel.
// A zeroed Mutex is unlocked (no need to initialize each lock).
// Initialization is helpful for static lock ranking, but not required.
type mutex struct {// Empty struct if lock ranking is disabled, otherwise includes the lock ranklockRankStruct// Futex-based impl treats it as uint32 key,// while sema-based impl as M* waitm.// Used to be a union, but unions break precise GC.key uintptr
}goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes)
lockWithRank(&root.lock, lockRankRoot)
unlock(&root.lock)
// src/runtime/lock_sema.gofunc lock2(l *mutex) {gp := getg()if gp.m.locks < 0 {throw("runtime·lock: lock count")}gp.m.locks++// Speculative grab for lock.if atomic.Casuintptr(&l.key, 0, locked) {return}semacreate(gp.m)// On uniprocessor's, no point spinning.// On multiprocessors, spin for ACTIVE_SPIN attempts.spin := 0if ncpu > 1 {spin = active_spin}
Loop:for i := 0; ; i++ {v := atomic.Loaduintptr(&l.key)if v&locked == 0 {// Unlocked. Try to lock.if atomic.Casuintptr(&l.key, v, v|locked) {return}i = 0}if i < spin {procyield(active_spin_cnt)} else if i < spin+passive_spin {osyield()} else {// Someone else has it.// l->waitm points to a linked list of M's waiting// for this lock, chained through m->nextwaitm.// Queue this M.for {gp.m.nextwaitm = muintptr(v &^ locked)if atomic.Casuintptr(&l.key, v, uintptr(unsafe.Pointer(gp.m))|locked) {break}v = atomic.Loaduintptr(&l.key)if v&locked == 0 {continue Loop}}if v&locked != 0 {// Queued. Wait.semasleep(-1)i = 0}}}
}//go:nowritebarrier
// We might not be holding a p in this code.
func unlock2(l *mutex) {gp := getg()var mp *mfor {v := atomic.Loaduintptr(&l.key)if v == locked {if atomic.Casuintptr(&l.key, locked, 0) {break}} else {// Other M's are waiting for the lock.// Dequeue an M.mp = muintptr(v &^ locked).ptr()if atomic.Casuintptr(&l.key, v, uintptr(mp.nextwaitm)) {// Dequeued an M.  Wake it.semawakeup(mp)break}}}gp.m.locks--if gp.m.locks < 0 {throw("runtime·unlock: lock count")}if gp.m.locks == 0 && gp.preempt { // restore the preemption request in case we've cleared it in newstackgp.stackguard0 = stackPreempt}
}

golang中能同时并行执行的G的个数其实就是逻辑CPU的个数,也就是GMP模型中的M个数,此时每一个M上正在运行一个G,而这些G同时都在抢 mutex 来操作二叉树,通过源码可以大致判断出,此处是直接对M加的锁,通过atomic.Casuintptr(&l.key, 0, 1)来限制只能是第一个G能操作成功,从而能获得锁,其他的G则要继续往下走,先是自旋一定次数获取锁,还是不行的话就调用操作系统的信号量来对线程M进行阻塞,自然G也就没法执行了,要知道,这个锁只发生在对二叉树的操作前后,时间很短,当然如果要抢锁的G过多肯定会造成M被锁的时间变长。

golang中的信号量的实现原理相关推荐

  1. Golang中panic与recover的实现原理

    今天我们讲讲golang中panic异常,以及recover对异常的捕获,由于panic.recover.defer之间非常亲密,所以今天就放在一起讲解,这里会涉及到一些defer的知识,有兴趣可以看 ...

  2. Golang 中的 Goroutine 调度原理与 Chanel 通信

    简介   在 Go 中,每一个并发的活动称为一个 Goroutine 或者 协程.当一个程序启动时,只有一个 Goroutine 来调用 main 函数,称之为 主Goroutine.新的 Gorou ...

  3. Golang中select的实现原理

    前言 select是Golang在语言层面提供的多路IO复用的机制.与switch语句稍微有点相似,也会有case和最后的default选择支.每一个case代表一个通信操作(在某个channel上进 ...

  4. Golang中的自动伸缩和自防御设计

    Raygun服务由许多活动组件构成,每个组件用于特定的任务.其中一个模块是用Golang编写的,负责对iOS崩溃报告进行处理.简而言之,它接受本机iOS崩溃报告,查找相关的dSYM文件,并生成开发者可 ...

  5. go语言os.exit(1)_在Golang中各种永远阻塞的姿势

    在Golang中各种永远阻塞的姿势 Go的运行时的当前设计,假定程序员自己负责检测何时终止一个goroutine以及何时终止该程序. 可以通过调用os.Exit或从main()函数的返回来以正常方式终 ...

  6. golang 函数传多个参数_关于Golang中方法参数的传递

    结构体声明 为了说明函数以及方法调用的过程,这里先定义一个struct,在下面的描述中会使用到它. type Person struct { Name string Age uint16 } 普通函数 ...

  7. Golang中各种永远阻塞的方法

    在Golang中各种永远阻塞的姿势 Go的运行时的当前设计,假定程序员自己负责检测何时终止一个goroutine以及何时终止该程序. 可以通过调用os.Exit或从main()函数的返回来以正常方式终 ...

  8. C#笔记20:多线程之线程同步中的信号量AutoResetEvent和ManualResetEvent

    C#笔记20:多线程之线程同步中的信号量AutoResetEvent和ManualResetEvent 本章概要: 1:终止状态和非终止状态 2:AutoResetEvent和ManualResetE ...

  9. golang中的包管理工具——govendor和godep简单学习

    为什么用vendor目录 依赖问题 我们知道,一个工程稍大一点,通常会依赖各种各样的包.而Go使用统一的GOPATH管理依赖包,且每个包仅保留一个版本.而不同的依赖包由各自的版本工具独立管理,所以当所 ...

最新文章

  1. jquery源码解析:jQuery数据缓存机制详解2
  2. 十一、深入JavaScript的定时器(七)
  3. 继Science发文后,Nature也发文评论曹雪涛“误用图片”调查结果
  4. python下载之后无法启动_安装后启动时,适用于Python的Eric IDE崩溃
  5. Python-进程相关概念
  6. pcl中ransac提取直线_多目标跟踪中的相机运动模型
  7. linux通过操作界面和命令行的方式查看ip地址、mac地址
  8. 数据库索引设计与优化pdf
  9. 公众号开发 单独 给某个用户 推送消息_校区学生会微信公众平台征稿启事
  10. android 滤镜录制,Android Camera 实时滤镜
  11. 深信服技术认证之Openstack云平台使用入门
  12. MySQL建库建表语句
  13. MySql的基石——索引
  14. linux命令中cd / 和 cd ~ 是什么意思?
  15. 身份证ocr识别开源方案_多因素身份验证的开源替代方案:privacyIDEA
  16. 让div占据父元素剩下的所有位置
  17. 移动开发者大会 -- 后感
  18. C语言将信息保存到文件中
  19. html文件bak,轻松解包MIUI小米备份bak文件 还原出明文数据
  20. 教你如何轻松搞定云上打印管理

热门文章

  1. Oracle 12安装教程
  2. 2022-2028全球与中国修枝剪刀片市场现状及未来发展趋势
  3. BCC-funccount
  4. kali2021更新源(最全的更新源)
  5. 如何从正确的原理图生成PCB图
  6. 易课寄在线购课系统开发笔记(二十九)--完成用户登录功能
  7. 关于CPU科普,这篇说得最详细
  8. 自制计算机考试系统,出试卷的软件
  9. 【ShoppingPeeker】-基于Webkit内核的爬虫蜘蛛引擎 ShoppingWebCrawler的姊妹篇-可视化任务Web管理...
  10. HTML5期末大作业:茶叶文化网站设计——茶叶(10页) HTML+CSS+JavaScript 文化主题 dw茶叶网页设计 web前端大作业 web课程设计网页规划与设计 dw学生网页设计