文章目录

  • 知识点
    • 并发
    • 并行
    • 进程
    • 线程
    • 协程
    • 通信模型
      • CSP communicating sequential process
      • share memory
    • 线程模型
      • 1. 用户级线程模型M:1
      • 2. 内核级线程模型1:1
      • 3. 两级线程模型M:N
  • 调度 GPM
    • 调度策略
    • 源码分析
    • 触发调度
      • 线程启动
      • 协程执行结束
      • 主动挂起
      • 系统调用
      • 协作式调度
      • 系统监控
  • 协程 goroutine
    • 对比
    • 状态转移
    • 源码分析
    • 使用
  • 通道 channel
    • 源码分析
    • 使用
  • 同步 sync
    • 互斥锁 mutex
      • 锁模式
      • 锁状态
      • 上锁
      • 解锁
    • 读写锁 RWMutex

知识点

并发

多线程程序在一个cpu上调度交替时间运行

并行

多线程程序在多个cpu上同段时间运行

进程

程序执行过程,系统分配资源和调度的基本单位

线程

cpu运算调度的最小单位(KSE,Kernal Scheduling Entity),进程至少有一个主线程

协程

主线程开启的逻辑上轻量级线程,独立栈空间,共享堆空间,调度由用户空间控制

通信模型

CSP communicating sequential process

顺序通信进程模型,使用通道收发减少协程间共享内存带来的锁开销。

share memory

共享内存进行线程间通信,需要利用内存屏障(锁)保证内存的多线程共享内存可见一致性。

线程模型

1. 用户级线程模型M:1

M个用户线程映射到1个内核线程(KSE),用户空间上进行线程管理调度

优点:
1. 用户线程切换上下文比内核线程模态切换性能更高
缺点:
1. 内核线程被某个用户线程阻塞后(如IO)会阻塞整个程序
2. 本质单内核线程,需要多进程才能充分使用多核CPU资源

2. 内核级线程模型1:1

用户线程1:1映射到内核线程(KSE),内核调度器进行管理调度

优点:
1. 多线程充分使用多核CPU资源,不会因单个用户线程阻塞影响程序
缺点:
1. 每个用户线程都需要创建一个内核线程,创建线程的开销大,性能低。
2. 多进程使用多核CPU开销大。

3. 两级线程模型M:N

M个用户线程映射到N个内核线程(KSE),用户空间上进行用户线程管理调度,内核空间由内核调度内核线程。

优点:
1. 大部分调度切换发生在用户空间,开销小性能高
2. 充分利用多核cpu,没有线程阻塞影响整个程序风险
缺点:
1. 依赖用户空间调度器优化

调度 GPM

使用两级线程模型改进

  1. M(Main Thread): go封装的虚拟用户线程,对应1个内核线程(KSE)

    1. 启动程序后的编号为0的主线程M0。

      • 在全局变量runtime.m0,不需要在heap分配
      • 负责执行初始化调度器,启动第一个G,之后与其他M一样
    2. M绑定有效P,M会启动一个内核线程KSE进入调度循环,处理P管理的G。
    3. LRQ空,(work stealing机制),M窃取G,详见Shedule。
    4. G阻塞,当前M释放P进入睡眠,(hand off机制),P没有空闲M可用会创建M,新M接管P。
    5. 没有的P需要空闲M则回收M。
    6. M默认上限10000,debug.SetMaxThreads()设置,实际由P决定。
  2. P(Processor):go封装的"CPU",P的数量即为协程最大并行数量

    1. P最大数量由GOMAXPROCS配置,默认为CPU核心数,启动程序后初始化所有P。
    2. 处理器
      1. G执行环境(Context)
      2. 内存分配(mcahe)
      3. 本地队列(LRQ) 最大256
    3. LRQ满则放至G的全局队列(GRQ)
  3. G(Goroutine):go封装的调度单元

    1. go func 关键字创建G
    2. 协程结构体
      1. 运行堆栈
      2. 状态
      3. 函数及参数
    3. G绑定P才能被调度执行
    4. G0是每一个启动的M第一个创建的G
      1. G0深度参与调度G

      2. 调度或者系统调用时M切换G0,使用G0的栈空间调度

  4. Sched:go封装的调度器

    1. 调度器

      1. G的全局队列(GRQ)
      2. M的全局队列
      3. 调度器状态

调度策略

  1. 复用线程

    1. 作用: 避免频繁的创建、销毁线程,而是对线程的复用,减小开销。
    2. work stealing机制
      当M无可运行的G时,尝试从其他M绑定的P偷取G,而不是销毁线程。
    3. hand off机制
      当M因为G进行系统调用阻塞时,线程释放绑定的P,把P转移给其他空闲的M执行。
  2. 利用并行

    1. 作用: 提高运行效率
    2. P = GOMAXPROCS <= CPU 核 M>=P。
  3. 抢占

    1. 作用: 保证公平,G超时,防止其他G饿死
    2. G最多占用CPU 10ms,切换g0进入调度循环。
    • _Prunning下的g,多半是死循环导致,会放置到全局队列
    • _Psyscall下的g,多半是阻塞性系统调用引起的
  4. 全局G队列

    1. 作用: 本地G队列相当于全局G队列的缓存,用于放置无法消化的G
    2. work stealing机制/抢占均使用到

源码分析

G

详见goroutine

M

// runtime2.go/runtime.m
type m struct {// G相关g0   *g   //持有调度栈的goroutine(深度参与M调度)  curg *g   //当前M运行的用户goroutine// P相关p             puintptr   //当前M正在运行的处理器nextp         puintptr   //暂存的处理器oldp          puintptr   //执行系统调用之前处理器// M相关spinning      bool       // 自旋状态// 信号处理gsignal       *g   // signal-handling g//其他字段...
}// ------- M 初始化 ----------
// Pre-allocated ID may be passed as 'id', or omitted by passing -1.
func mcommoninit(mp *m, id int64) {_g_ := getg()// g0 stack won't make sense for user (and is not necessary unwindable).if _g_ != _g_.m.g0 {callers(1, mp.createstack[:])}lock(&sched.lock)if id >= 0 {mp.id = id} else {mp.id = mReserveID()}mp.fastrand[0] = uint32(int64Hash(uint64(mp.id), fastrandseed))mp.fastrand[1] = uint32(int64Hash(uint64(cputicks()), ^fastrandseed))if mp.fastrand[0]|mp.fastrand[1] == 0 {mp.fastrand[1] = 1}mpreinit(mp)if mp.gsignal != nil {mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard}// 添加到 allm 中,从而当它刚保存到寄存器或本地线程存储时候 GC 不会释放 g.mmp.alllink = allm// NumCgoCall() 会在没有使用 schedlock 时遍历 allm,等价于 allm = mpatomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))unlock(&sched.lock)//其他代码...
}

P

// runtime2.go/runtime.p
const (// P status 状态// 处理器没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空_Pidle = iota// 被线程 M 持有,并且正在执行用户代码或者调度器_Prunning// 没有执行用户代码,当前线程陷入系统调用_Psyscall// 被线程 M 持有,当前处理器由于垃圾回收被停止_Pgcstop// 当前处理器已经不被使用_Pdead
)
// runtime2.go/runtime.p
type p struct {// P 自身id       int32status   uint32// M 相关m           muintptr    //当前持有P的M(nil 则表示 idle)mcache      *mcache     //pcache      pageCache// G 相关lock     mutex  runqhead uint32         //本地队列头runqtail uint32         //本地队列尾runq     [256]guintptr  //P本地G队列 256 缓存友好runnext guintptr        //下一个需要执行的G 局部性原理,为了尽量保证新创建G尽量在同一个P中//其他字段...
}

schedt

// runtime/runtime2.go/runtime.schedt
type schedt struct {// G相关lock mutexrunq     gQueue //  全局G队列,链表runqsize int32gFree struct {        // 有效 dead G 的全局缓存.lock    mutexstack   gList   // 包含栈的 GsnoStack gList // 没有栈的 Gsn       int32}// P相关pidle      puintptr   // 空闲 p 链表npidle     uint32 // 空闲 p 数量// M相关nmspinning uint32   // 自旋状态的 M 的数量// 其他代码...
}//--------------------- 调度器启动 ---------------------
// runtime/proc.go/schedinit
func schedinit() {// 其他代码..._g_ := getg()             // g0// 其他代码...sched.maxmcount = 10000  // 线程默认上限数mcommoninit(_g_.m, -1)   // 初始化M// 其他代码...procs := ncpu           // 初始化Pif n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {procs = n}if procresize(procs) != nil {throw("unknown runnable goroutine during bootstrap")}// 其他代码...
}//--------------------- 调度循环 ---------------------
//  1.P每61循环,尝试获取平分全局G
//  2.P尝试P的runnext局部优先获取G
//  3.P本地G尝试获取G
//  4.P尝试从全局G平分G
//  5.P尝试从网络轮询器获取G
//  6.P尝试从其他P窃取1半G到自己本地G队列
// runtime/proc.go/schedule
func schedule() {_g_ := getg()// 其他代码...top:// 其他代码...var gp *gvar inheritTime bool// 其他代码...if gp == nil {// 1. 每执行61次循环看一下全局队列。为了保证公平,避免全局队列一直没有得到执行。if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {lock(&sched.lock)gp = globrunqget(_g_.m.p.ptr(), 1)unlock(&sched.lock)}}if gp == nil {// 2. 先尝试从p的runnext和本地队列查找Ggp, inheritTime = runqget(_g_.m.p.ptr())}if gp == nil {// 3. 本地g先找// 4. 仍找不到,去全局队列中查找G// 5. 找不到,去网络轮询器中查找G// 6. 找不到,去其他g中窃取Ggp, inheritTime = findrunnable()}// 其他代码...//调用execute,继续调度循环execute(gp, inheritTime)
}//--------------------- 运行G ---------------------
// 1. 准备G的
// 2. 调度M执行
// runtime/proc.go/execute
func execute(gp *g, inheritTime bool) {// 1. 准备G的_g_ := getg()_g_.m.curg = gpgp.m = _g_.mcasgstatus(gp, _Grunnable, _Grunning)gp.waitsince = 0gp.preempt = falsegp.stackguard0 = gp.stack.lo + _StackGuardif !inheritTime {_g_.m.p.ptr().schedtick++}// 其他代码...// 2.调度M执行gogo(&gp.sched)
}//--------------------- 全局队列G ---------------------
// 1.窃取G = 全局G/P+1
// 2.窃取G <= P/2 = 128
// 3.更新P的本地队列
// runtime/proc.go/globrunqget
func globrunqget(_p_ *p, max int32) *g {assertLockHeld(&sched.lock)if sched.runqsize == 0 {return nil}// 1. 计算窃取g个数 = 全局g数量/p数量+1n := sched.runqsize/gomaxprocs + 1if n > sched.runqsize {n = sched.runqsize}if max > 0 && n > max {n = max}// 1.2 窃取g个数最大为当前g的一半即128个if n > int32(len(_p_.runq))/2 {n = int32(len(_p_.runq)) / 2}// 2. 窃取g到本地队列,返回窃取的第1个sched.runqsize -= ngp := sched.runq.pop()n--for ; n > 0; n-- {gp1 := sched.runq.pop()// 1.2. 更新p的本地队列runqput(_p_, gp1, false)}return gp
}//--------------------- worksteal机制 ---------------------
// 1.本地G找
// 2.全局G平分
// 3.轮询器中查找G
// 4.窃取其他P一半G
//   a.4次尝试
//   b.P选择使用伪随机保证公平
//   c.最后一次尝试,如果发现定时器则尝试本地查找,否则进入新的调度循环
// runtime/proc.go/findrunnable
func findrunnable() (gp *g, inheritTime bool) {_g_ := getg()top:_p_ := _g_.m.p.ptr()// 其他代码...// 1.本地G找if gp, inheritTime := runqget(_p_); gp != nil {return gp, inheritTime}// 2.全局G窃取if sched.runqsize != 0 {lock(&sched.lock)gp := globrunqget(_p_, 0)unlock(&sched.lock)if gp != nil {return gp, false}}// 3.轮询器中查找Gif netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {if list := netpoll(0); !list.empty() { // non-blockinggp := list.pop()injectglist(&list)casgstatus(gp, _Gwaiting, _Grunnable)if trace.enabled {traceGoUnpark(gp, 0)}return gp, false}}// 4.窃取其他P一半gprocs := uint32(gomaxprocs)ranTimer := false// If number of spinning M's >= number of busy P's, block.// This is necessary to prevent excessive CPU consumption// when GOMAXPROCS>>1 but the program parallelism is low.if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {goto stop}if !_g_.m.spinning {_g_.m.spinning = trueatomic.Xadd(&sched.nmspinning, 1)}const stealTries = 4 // 4.1尝试4次偷取for i := 0; i < stealTries; i++ {stealTimersOrRunNextG := i == stealTries-1for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {if sched.gcwaiting != 0 {goto top}p2 := allp[enum.position()]if _p_ == p2 {continue}// 第四次循环检查定时器if stealTimersOrRunNextG && timerpMask.read(enum.position()) {tnow, w, ran := checkTimers(p2, now)now = tnowif w != 0 && (pollUntil == 0 || w < pollUntil) {pollUntil = w}if ran {// 如果存在定时器,大概率本地有g,从新获取一次本地Gif gp, inheritTime := runqget(_p_); gp != nil {return gp, inheritTime}// 没有则从新进入循环ranTimer = true}}// 4.2 p2不空闲,则窃取Gif !idlepMask.read(enum.position()) {if gp := runqsteal(_p_, p2, stealTimersOrRunNextG); gp != nil {return gp, false}}}}if ranTimer {// 重新进入调度循环goto top}stop:// 其他代码...}//--------------------- 窃取g ---------------------
// runtime/proc.go/runqsteal
func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {t := _p_.runqtail// 1. 窃取P2g到当前p本地队列n := runqgrab(p2, &_p_.runq, t, stealRunNextG)if n == 0 {return nil}// 2. 窃取1个直接返回n--gp := _p_.runq[(t+n)%uint32(len(_p_.runq))].ptr()if n == 0 {return gp}// 2.1 多个需要处理锁h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumersif t-h+n >= uint32(len(_p_.runq)) {throw("runqsteal: runq overflow")}atomic.StoreRel(&_p_.runqtail, t+n) // store-release, makes the item available for consumptionreturn gp
}//--------------------- 窃取g ---------------------
//  1. 计算窃取G数量  n = n - n/2
//  2. n = 0 如果偷取达到四次且有定时器等,尝试获取runnext
//  3. n > 当前P的G队列,溢出从新计算
//  4. 拷贝G到当前P的G队列
// runtime/proc.go/runqgrab
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {for {h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumerst := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producern := t - h       // 计算本地队列有多少Gn = n - n/2      // 取一半的Gif n == 0 {if stealRunNextG {// Try to steal from _p_.runnext.if next := _p_.runnext; next != 0 {if _p_.status == _Prunning {if GOOS != "windows" {usleep(3)} else {osyield()}}if !_p_.runnext.cas(next, 0) {continue}batch[batchHead%uint32(len(batch))] = nextreturn 1}}return 0}if n > uint32(len(_p_.runq)/2) { // read inconsistent h and tcontinue}for i := uint32(0); i < n; i++ {g := _p_.runq[(h+i)%uint32(len(_p_.runq))]batch[(batchHead+i)%uint32(len(batch))] = g}if atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consumereturn n}}
}

触发调度

  • 线程启动
  • 协程执行结束
  • 主动挂起
  • 系统调用
  • 协作式调度
  • 系统监控

线程启动

runtime/proc.go/mstart - > runtime/proc.go/mstart1

// 线程启动时会触发此函数
// runtime/proc.go/mstart
func mstart() {_g_ := getg()osStack := _g_.stack.lo == 0if osStack {size := _g_.stack.hiif size == 0 {size = 8192 * sys.StackGuardMultiplier}_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))_g_.stack.lo = _g_.stack.hi - size + 1024}_g_.stackguard0 = _g_.stack.lo + _StackGuard_g_.stackguard1 = _g_.stackguard0mstart1()if mStackIsSystemAllocated() {osStack = true}mexit(osStack)
}// runtime/proc.go/mstart1
func mstart1() {_g_ := getg()if _g_ != _g_.m.g0 {throw("bad runtime·mstart")}save(getcallerpc(), getcallersp())asminit()minit()if _g_.m == &m0 {mstartm0()}if fn := _g_.m.mstartfn; fn != nil {fn()}if _g_.m != &m0 {acquirep(_g_.m.nextp.ptr())_g_.m.nextp = 0}schedule()  // 触发调度循环
}

协程执行结束

runtime/proc.go/goexit0

// runtime/proc.go/goexit0
func goexit0(gp *g) {_g_ := getg()// 1.修改G 为_Gdead状态casgstatus(gp, _Grunning, _Gdead)  if isSystemGoroutine(gp, false) {atomic.Xadd(&sched.ngsys, -1)}// 2.清理G内容gp.m = nillocked := gp.lockedm != 0gp.lockedm = 0_g_.m.lockedg = 0gp.preemptStop = falsegp.paniconfault = falsegp._defer = nil // should be true already but just in case.gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.gp.writebuf = nilgp.waitreason = 0gp.param = nilgp.labels = nilgp.timer = nilif gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {assistWorkPerByte := float64frombits(atomic.Load64(&gcController.assistWorkPerByte))scanCredit := int64(assistWorkPerByte * float64(gp.gcAssistBytes))atomic.Xaddint64(&gcController.bgScanCredit, scanCredit)gp.gcAssistBytes = 0}// 3.M和G解除dropg()if GOARCH == "wasm" { // no threads yet on wasmgfput(_g_.m.p.ptr(), gp)schedule() // never returns}if _g_.m.lockedInt != 0 {print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")throw("internal lockOSThread error")}// 4.将G放入P的缓存队列gfput(_g_.m.p.ptr(), gp)if locked {if GOOS != "plan9" { // See golang.org/issue/22227.gogo(&_g_.m.g0.sched)} else {_g_.m.lockedExt = 0}}schedule()  // 触发调度循环
}

主动挂起

runtime/proc.go/gopark - > runtime/proc.go/park_m

// runtime/proc.go/gopark
// 当前G暂停
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {//其他代码...mp := acquirem()gp := mp.curgstatus := readgstatus(gp)//其他代码...mp.waitlock = lockmp.waitunlockf = unlockfgp.waitreason = reasonmp.waittraceev = traceEvmp.waittraceskip = traceskipreleasem(mp)mcall(park_m)
}// runtime/proc.go/park_m
// 1.当前G切换为Gwiting
// 2.移除G和M的关系
// 3.触发调度
func park_m(gp *g) {_g_ := getg()//其他代码...casgstatus(gp, _Grunning, _Gwaiting)dropg()//其他代码...schedule() // 触发调度循环
}
// runtime/proc.go/goready
// g唤醒
func goready(gp *g, traceskip int) {systemstack(func() {ready(gp, traceskip, true)})
}// runtime/proc.go/ready
// 1.当前G切换为_Grunnable
// 2.放入当前运行队列中
// 3.唤醒
func ready(gp *g, traceskip int, next bool) {//其他代码...status := readgstatus(gp)_g_ := getg()//其他代码...// 1.当前G切换为_Grunnablecasgstatus(gp, _Gwaiting, _Grunnable)// 2.放入当前运行队列中runqput(_g_.m.p.ptr(), gp, next)// 3.唤醒wakep()//其他代码...
}

系统调用

runtime/proc.go/exitsyscall -> runtime/proc.go/exitsyscall0

// runtime/proc.go/exitsyscall
//系统调用结束后会调用此函数
// 1.为当前G重新分配资源
// 2.切换调度器G0 触发 调度
func exitsyscall() {_g_ := getg()//其他代码...// 1.为当前G重新分配资源oldp := _g_.m.oldp.ptr()_g_.m.oldp = 0// 1.2 如果原有P处于_Psyscall,通过wirep将G于原有处理取重新关联// 1.3 如果没有,调度存在限制处理器会acquirp获取闲置处理器于当前G关联if exitsyscallfast(oldp) {//其他代码..._g_.m.p.ptr().syscalltick++// We need to cas the status and scan before resuming...casgstatus(_g_, _Gsyscall, _Grunning)//其他代码...return}//其他代码...//2.如果没有合适的P 切换G0调度mcall(exitsyscall0)//其他代码..._g_.m.p.ptr().syscalltick++_g_.throwsplit = false
}// 如果没有合适的P 切换G0调度
// runtime/proc.go/exitsyscall0
func exitsyscall0(gp *g) {_g_ := getg()// 1. 切换G 为_Grunnable 状态casgstatus(gp, _Gsyscall, _Grunnable)// 2. 移除M和G的关联dropg()lock(&sched.lock)var _p_ *p// 3. 获取闲置处理器Pif schedEnabled(_g_) {_p_ = pidleget()}// 3. 没有限制处理器则放入全局队列if _p_ == nil {globrunqput(gp)} else if atomic.Load(&sched.sysmonwait) != 0 {atomic.Store(&sched.sysmonwait, 0)notewakeup(&sched.sysmonnote)}unlock(&sched.lock)// 4. 获取成功 直接执行if _p_ != nil {acquirep(_p_)execute(gp, false) // Never returns.}if _g_.m.lockedg != 0 {stoplockedm()execute(gp, false) // Never returns.}stopm()schedule() // 触发调度循环
}
// runtime/proc.go/exitsyscall
// 系统调用前会运行此函数
func entersyscall() {reentersyscall(getcallerpc(), getcallersp())
}// runtime/proc.go/reentersyscall
// 完成系统调用准备工作
func reentersyscall(pc, sp uintptr) {_g_ := getg()_g_.m.locks++             _g_.stackguard0 = stackPreempt        // 禁止其他M抢占_g_.throwsplit = true                 // 禁止当前函数栈分裂 增长save(pc, sp)                          // 保存当前程序计数器PC和栈指针SP内容_g_.syscallsp = sp_g_.syscallpc = pccasgstatus(_g_, _Grunning, _Gsyscall) // 将G状态改为 _Gsyscall//其他代码..._g_.m.syscalltick = _g_.m.p.ptr().syscalltick_g_.sysblocktraced = truepp := _g_.m.p.ptr()              pp.m = 0                              // PM暂时分离_g_.m.oldp.set(pp)                    // M记录旧P        _g_.m.p = 0                           // M解除Patomic.Store(&pp.status, _Psyscall)   // P状态改为_Psyscallif sched.gcwaiting != 0 {systemstack(entersyscall_gcwait)save(pc, sp)}_g_.m.locks--
}

协作式调度

runtime/proc.go/Gosched - >runtime/proc.go/gosched_m->runtime/proc.go/goschedImpl

// runtime/proc.go/Gosched
func Gosched() {//其他代码...// g0mcall(gosched_m)
}// runtime/proc.go/gosched_m
// g0运行
func gosched_m(gp *g) {//其他代码...goschedImpl(gp)
}// runtime/proc.go/goschedImpl
// 1. 将G状态改为 _Grunnable
// 2. 移除M和G的关联
// 3. 将G放置全局队列
func goschedImpl(gp *g) {//其他代码...// 1. 将G状态改为 _Grunnablecasgstatus(gp, _Grunning, _Grunnable)// 2. 移除M和G的关联dropg()lock(&sched.lock)// 3. 将G放置全局队列globrunqput(gp)unlock(&sched.lock)schedule()  // 触发调度循环
}

系统监控

  • 独立M上运行
  • 抢占P/G
  • 触发GC
  • 清理堆span
// runtime/proc.go/sysmon
// 系统监控在一个独立的 m 上运行
// 总是在没有 P 的情况下运行,因此不能出现写屏障
//go:nowritebarrierrec
func sysmon() {lock(&sched.lock)sched.nmsys++ // 不计入死锁的系统 m 的数量checkdead()   // 死锁检查unlock(&sched.lock)atomic.Store(&sched.sysmonStarting, 0)lasttrace := int64(0)idle := 0 // how many cycles in succession we had not wokeup somebodydelay := uint32(0)for {if idle == 0 { // 每次启动先休眠 20usdelay = 20} else if idle > 50 { // 1ms 后就翻倍休眠时间delay *= 2}if delay > 10*1000 { // 增加到 10msdelay = 10 * 1000}usleep(delay) // 休眠mDoFixup()// 如果在 STW,则暂时休眠now := nanotime()if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {lock(&sched.lock)if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {syscallWake := falsenext, _ := timeSleepUntil()if next > now {atomic.Store(&sched.sysmonwait, 1)unlock(&sched.lock)// 确保 wake-up 周期足够小从而进行正确的采样sleep := forcegcperiod / 2if next-now < sleep {sleep = next - now}shouldRelax := sleep >= osRelaxMinNSif shouldRelax {osRelax(true)}syscallWake = notetsleep(&sched.sysmonnote, sleep)mDoFixup()if shouldRelax {osRelax(false)}lock(&sched.lock)atomic.Store(&sched.sysmonwait, 0)noteclear(&sched.sysmonnote)}if syscallWake {idle = 0delay = 20}}unlock(&sched.lock)}lock(&sched.sysmonlock)now = nanotime()// 需要时触发 libc interceptorif *cgo_yield != nil {asmcgocall(*cgo_yield, nil)}// 如果超过 10ms 没有 poll,则 poll 一下网络lastpoll := int64(atomic.Load64(&sched.lastpoll))if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))list := netpoll(0) // 非阻塞,返回 Goroutine 列表if !list.empty() {// 需要在插入 g 列表前减少空闲锁住的 m 的数量(假装有一个正在运行)// 否则会导致这些情况:// injectglist 会绑定所有的 p,但是在它开始 M 运行 P 之前,另一个 M 从 syscall 返回,// 完成运行它的 G ,注意这时候没有 work 要做,且没有其他正在运行 M 的死锁报告。incidlelocked(-1)injectglist(&list)incidlelocked(1)}}mDoFixup()if GOOS == "netbsd" {if next, _ := timeSleepUntil(); next < now {startm(nil, false)}}if atomic.Load(&scavenge.sysmonWake) != 0 {wakeScavenger()}// 抢夺在 syscall 中阻塞的 P、运行时间过长的 Gif retake(now) != 0 {idle = 0} else {idle++}// 检查是否需要强制触发 GCif t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {lock(&forcegc.lock)forcegc.idle = 0var list gListlist.push(forcegc.g)injectglist(&list)unlock(&forcegc.lock)}if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {lasttrace = nowschedtrace(debug.scheddetail > 0)}unlock(&sched.sysmonlock)}
}// runtime/proc.go/retake
// 抢占P
func retake(now int64) uint32 {n := 0// 防止 allp 数组发生变化,除非我们已经 STW,此锁将完全没有人竞争lock(&allpLock)for i := 0; i < len(allp); i++ {_p_ := allp[i]if _p_ == nil {continue}pd := &_p_.sysmonticks := _p_.statussysretake := falseif s == _Prunning || s == _Psyscall {// 如果 G 运行时时间太长则进行抢占t := int64(_p_.schedtick)if int64(pd.schedtick) != t {pd.schedtick = uint32(t)pd.schedwhen = now} else if pd.schedwhen+forcePreemptNS <= now {// 对于 syscall 的情况,因为 M 没有与 P 绑定,// preemptone() 不工作preemptone(_p_)sysretake = true}}// 对阻塞在系统调用上的 P 进行抢占if s == _Psyscall {// 如果已经超过了一个系统监控的 tick(20us),则从系统调用中抢占 Pt := int64(_p_.syscalltick)if !sysretake && int64(pd.syscalltick) != t {pd.syscalltick = uint32(t)pd.syscallwhen = nowcontinue}// 一方面,在没有其他 work 的情况下,我们不希望抢夺 P// 另一方面,因为它可能阻止 sysmon 线程从深度睡眠中唤醒,所以最终我们仍希望抢夺 P  if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {continue}// 解除 allpLock,从而可以获取 sched.lockunlock(&allpLock)// 在 CAS 之前需要减少空闲 M 的数量(假装某个还在运行)// 否则发生抢夺的 M 可能退出 syscall 然后再增加 nmidle ,进而发生死锁// 这个过程发生在 stoplockedm 中incidlelocked(-1)if atomic.Cas(&_p_.status, s, _Pidle) { // 将 P 设为 idle,从而交于其他 M 使用if trace.enabled {traceGoSysBlock(_p_)traceProcStop(_p_)}n++_p_.syscalltick++handoffp(_p_)}incidlelocked(1)lock(&allpLock)}}unlock(&allpLock)return uint32(n)
}// runtime/proc.go/preemptone
func preemptone(_p_ *p) bool {// 检查 M 与 P 是否绑定mp := _p_.m.ptr()if mp == nil || mp == getg().m {return false}gp := mp.curgif gp == nil || gp == mp.g0 {return false}// 将 G 标记为抢占gp.preempt = true// 一个 Goroutine 中的每个调用都会通过比较当前栈指针和 gp.stackgard0// 来检查栈是否溢出。// 设置 gp.stackgard0 为 StackPreempt 来将抢占转换为正常的栈溢出检查。gp.stackguard0 = stackPreempt// 请求该 P 的异步抢占if preemptMSupported && debug.asyncpreemptoff == 0 {_p_.preempt = truepreemptM(mp)}return true
}

// todo 抢占

协程 goroutine

对比

  1. 内存上:

    1. 内核线程通常2MB栈内存
    2. 协程通常2KB内存(根据需要自动调整)。
  2. 调度上:

    1. 协程使用M:N线程模型,m个协程分配到n个内核线程上,用户态runtime均匀调度,调度切换成本低,重分利用硬件资源。
    2. 用户线程:内核线程KSE=1:1,由os内核调度,调度切换成本高。

状态转移

// runtime2.go/runtime.g
// defined constants
const (// G status G状态// 刚刚被分配并且还没有被初始化_Gidle = iota // 0// 没有执行代码,没有栈的所有权,存储在运行队列中_Grunnable // 1// 可以用户执行代码,拥有栈的所有权,被赋予了内核线程 M 和处理器 P_Grunning // 2// 正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上_Gsyscall // 3// 由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上_Gwaiting // 4// _Gmoribund_unused is currently unused, but hardcoded in gdb// scripts._Gmoribund_unused // 5//    没有被使用,没有执行代码,可能有分配的栈。可能刚被初始化或者刚退出_Gdead // 6// _Genqueue_unused is currently unused._Genqueue_unused // 7// 栈正在被拷贝,没有执行代码,不在运行队列上_Gcopystack // 8// 由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒_Gpreempted // 9// GC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在_Gscan          = 0x1000_Gscanrunnable  = _Gscan + _Grunnable  // 0x1001_Gscanrunning   = _Gscan + _Grunning   // 0x1002_Gscansyscall   = _Gscan + _Gsyscall   // 0x1003_Gscanwaiting   = _Gscan + _Gwaiting   // 0x1004_Gscanpreempted = _Gscan + _Gpreempted // 0x1009
)

源码分析

// runtime2.go/runtime.g
// G goroutine结构体
// 存储goroutine运行数据
type g struct {//栈相关stack       stack   //当前goroutine内存范围[stack.lo,stack.hi)stackguard0 uintptr //用于抢占式调度//抢占调度preempt       bool // 抢占信号preemptStop   bool // 抢占时将状态修改成 `_Gpreempted`preemptShrink bool // 在同步安全点收缩栈//defer/panic栈_panic       *_panic // 最内侧的panic结构体链表_defer       *_defer // 最内侧的延迟函数结构体链表//调度相关m              *m      //当前goroutine占用的线程 可能为空sched          gobuf   //goroutine调度相关数据,保存g的上下文atomicstatus   uint32  //goroutine的状态goid           int64   //goroutine的idgopc           uintptr //goroutine的入口函数startpc        uintptr //goroutine函数的地址//其他字段...
}// runtime2.go/runtime.g.gobuf
// goroutine调度相关数据
// 调度切换上下文,栈指针和程序计数器用来保持或者恢复寄存器的值
type gobuf struct {sp   uintptr    // 栈指针pc   uintptr    // 运行到的程序位置g    guintptr   // 持有runtine.gobuf的goroutineret  sys.Uintreg //系统调用返回值//其他字段...
}// runtime2.go/runtime.g.stack
// goroutine栈
type stack struct{lo uintptr   // 栈的下界内存地址hi uintptr   // 栈的上界内存地址
}//-------------------------- 创建G ---------------------------
// go 关键字 映射runtime.newproc()
// 1.创建G
// 2.放置本地队列
// runtime/proc.go/newproc
func newproc(siz int32, fn *funcval) {argp := add(unsafe.Pointer(&fn), sys.PtrSize)gp := getg()pc := getcallerpc()systemstack(func() {// 1.创建Gnewg := newproc1(fn, argp, siz, gp, pc)_p_ := getg().m.p.ptr()// 2.放置本地队列runqput(_p_, newg, true)if mainStarted {wakep()}})
}// ------------------------- 放置G ---------------------------
// 1. next为true 设置G为P的runnext
// 2. next为false 设置G为本地队列
// 2.1 如果P的本地队列没有空间则放全局队列
// runtime/proc.go/runqput
func runqput(_p_ *p, gp *g, next bool) {if randomizeScheduler && next && fastrand()%2 == 0 {next = false}// 1. next为true 设置G为P的runnextif next {retryNext:oldnext := _p_.runnextif !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {goto retryNext}if oldnext == 0 {return}gp = oldnext.ptr()}// 2. next为false 设置G为本地队列
retry:h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumerst := _p_.runqtailif t-h < uint32(len(_p_.runq)) {_p_.runq[t%uint32(len(_p_.runq))].set(gp)atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumptionreturn}// 2.1 如果P的本地队列没有空间则放全局队列if runqputslow(_p_, gp, h, t) {return}goto retry
}// ------------------------- 全局队列放置G ---------------------------
// 1.计算放置全局队列的个数
// 2.取本地队列一半+1的G到全局队列
// runtime/proc.go/runqputslow
func runqputslow(_p_ *p, gp *g, h, t uint32) bool {var batch [len(_p_.runq)/2 + 1]*g// 1.计算放置全局队列的个数// n = n /2 + 1n := t - hn = n / 2if n != uint32(len(_p_.runq)/2) {throw("runqputslow: queue is not full")}for i := uint32(0); i < n; i++ {batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()}if !atomic.CasRel(&_p_.runqhead, h, h+n) { // cas-release, commits consumereturn false}batch[n] = gpif randomizeScheduler {for i := uint32(1); i <= n; i++ {j := fastrandn(i + 1)batch[i], batch[j] = batch[j], batch[i]}}// 2.取本地队列一半+1的G到全局队列for i := uint32(0); i < n; i++ {batch[i].schedlink.set(batch[i+1])}var q gQueueq.head.set(batch[0])q.tail.set(batch[n])// Now put the batch on global queue.lock(&sched.lock)globrunqputbatch(&q, int32(n+1))unlock(&sched.lock)return true
}//-------------------------- 创建G ---------------------------
// 1.获取G的结构体
//   a.获取空闲G
//   b.创建新的G
// 2.拷贝fn argp参数到G的栈中
// 3.重置G的结构体参数。例如 指针/程序计数器/状态
// runtime/proc.go/newproc
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {_g_ := getg()//其他代码...siz := nargsiz = (siz + 7) &^ 7//其他代码...// 1. 创建G_p_ := _g_.m.p.ptr()// 1.1 获取空闲Gnewg := gfget(_p_)if newg == nil {// 1.2 创建新的Gnewg = malg(_StackMin)casgstatus(newg, _Gidle, _Gdead)allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.}//其他代码...// 2.拷贝fn argp参数到G的栈中totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frametotalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlignsp := newg.stack.hi - totalSizespArg := sp//其他代码...if narg > 0 {memmove(unsafe.Pointer(spArg), argp, uintptr(narg))//其他代码...}// 3.重置G的结构体参数。例如 指针/程序计数器/状态memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))newg.sched.sp = spnewg.stktopsp = spnewg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same functionnewg.sched.g = guintptr(unsafe.Pointer(newg))gostartcallfn(&newg.sched, fn)newg.gopc = callerpcnewg.ancestors = saveAncestors(callergp)newg.startpc = fn.fnif _g_.m.curg != nil {newg.labels = _g_.m.curg.labels}if isSystemGoroutine(newg, false) {atomic.Xadd(&sched.ngsys, +1)}casgstatus(newg, _Gdead, _Grunnable)if _p_.goidcache == _p_.goidcacheend {_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)_p_.goidcache -= _GoidCacheBatch - 1_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch}newg.goid = int64(_p_.goidcache)_p_.goidcache++//其他代码...return newg
}//-------------------------- 获取空闲G ---------------------------
// 从p中获取空闲G复用
// 1. 处理器P不存在空闲G且调取器存在空闲 调度器中获取空闲G到P
// 2. 获取P中空闲G
// runtime/proc.go/gfget
func gfget(_p_ *p) *g {retry:// 1. 调度器中获取空闲G到Pif _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {lock(&sched.gFree.lock)for _p_.gFree.n < 32 {gp := sched.gFree.stack.pop()if gp == nil {gp = sched.gFree.noStack.pop()if gp == nil {break}}sched.gFree.n--_p_.gFree.push(gp)_p_.gFree.n++}unlock(&sched.gFree.lock)goto retry}// 2. 获取P中空闲Ggp := _p_.gFree.pop()if gp == nil {return nil}_p_.gFree.n--if gp.stack.lo == 0 {systemstack(func() {gp.stack = stackalloc(_FixedStack)})gp.stackguard0 = gp.stack.lo + _StackGuard} else {if raceenabled {racemalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)}if msanenabled {msanmalloc(unsafe.Pointer(gp.stack.lo), gp.stack.hi-gp.stack.lo)}}return gp
}//-------------------------- 创建G ---------------------------
// 初始化G
// 1. 创建G结构体
// 2. 申请栈内存
// runtime/proc.go/malg
func malg(stacksize int32) *g {//1. 创建G结构体newg := new(g)if stacksize >= 0 {stacksize = round2(_StackSystem + stacksize)systemstack(func() {//2. 申请栈内存newg.stack = stackalloc(uint32(stacksize))})newg.stackguard0 = newg.stack.lo + _StackGuardnewg.stackguard1 = ^uintptr(0)*(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0}return newg
}

使用

//关键字go + 方法调用开启协程
go func([params ...])([values...])//例子
//随机打印0~9
for i:=0;i<10;i++{go func(int a){fmt.Println(a)}(i)
}// 随机显示 0~9

通道 channel

  1. 先入先出队列FIFO,线程安全,锁保护。

  2. channel零值为nil,使用前必须make创建

  3. 双向通道可以转换为单向通道,反之不行。

动作\状态 close nil 无缓冲未读 无缓冲未写 缓冲满了 缓冲未满
接收 - 阻塞 正常 阻塞 正常 正常
发送 panic 阻塞 阻塞 正常 阻塞 正常
close panic panic 正常读 零值 正常读 正常读

源码分析

// runtime.go/runtime.hchan
//维护协程之间通信同步队列,使用互斥锁保证同步
type hchan struct { // chan相关closed   uint32   // channel是否关闭elemtype *_type   // channel能够收发的原属类型// 环形bufqcount   uint    //channel中元素个数dataqsiz uint    //channel中循环队列长度buf      unsafe.Pointer  //channel中缓冲区数据数据指针elemsize uint16   //channel能够收发的元素大小sendx    uint    //channel中发送操作处理到的位置recvx    uint    //channel中接收操作处理到的位置lock     mutex   //channel锁// G相关recvq    waitq   //channel缓冲不足阻塞的接收队列(双向链表)sendq    waitq   //channel缓冲不足阻塞的发送队列(双向链表)}// runtime.go/runtime.hchan.waitq
// 存储等待列表中的goroutine
type waitq struct {first *sudog   // 头last  *sudog   // 尾
}// runtime.go/runtime.hchan.waitq.sudog
// 双向链表
type sudog struct{g *g                // 当前Gnext *sudog         // 下一个Gprev *sudog         // 上一个Gelem unsafe.Pointer // 指向信号c *hchan            // 当前通道// 其他代码...
}// ----------------创建通道 -----------------

使用

//声明
var ch chan int//初始化 make(chan type,[buffer])
//buffer不存在 为无缓冲 发送和接收为同步
//buffer存在 为有缓冲 发送和接收为异步
ch :=make(chan struct{})
ch1 :=make(chan struct{},1)
var ch2 chan<-struct{}=ch  //只写
var ch3 <-chan struct{}=ch //只读//写通道数据
ch2<-struct{}{}
//读通道数据
<-ch3//遍历通道数据
for i,ok:=range ch {//i 为接收数据//ok 是否还有数据
}//关闭通道,关闭后无法写入否则panic
close(ch)

同步 sync

互斥锁 mutex

锁模式

  1. 正常模式。非公平锁,锁的初始模式

    1. goroutine在CAS4次后未获得锁进入锁的等待队列(FIFO)
    2. 被唤醒goroutine和新goroutine进行CAS争抢锁,大概率失败
    3. 等待队列中goroutine超过1ms没有获取锁则切换饥饿模式
  2. 饥饿模式。公平锁
    1. 释放锁的goroutine直接交给等待队列中队首goroutine
    2. 新goroutine直接加入等待队列队位
    3. 队尾goroutine获取锁或者等待时间小于1ms获取所择切换正常模式

锁状态

const (mutexLocked = 1 << iota   // 锁定mutexWoken                // 唤醒mutexStarving             // 饥饿mutexWaiterShift = iota   // 锁等待位starvationThresholdNs = 1e6 // 1ms
)// 互斥锁
// sync/mutex.go
type Mutex struct {// 29bit mutexWaiterShift  等待锁的协程数,判断是否需要释放信号量// 1bit  mutexStarving    1 表示饥饿// 1bit  mutexWoken       1 表示有协程被唤醒// 1bit  mutexLocked      1 表示锁定state int32              // 锁状态// 该地址是唯一的 取hash值可以在semtable中获取的semroot// semroot中 sulog保存了g的队列sema  uint32             // 信号量
}

上锁

// sync/mutex.go
func (m *Mutex) Lock() {// 1.获取锁if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {// 其他代码....return}// 2.锁失败 m.lockSlow()
}// 2.锁失败后处理
func (m *Mutex) lockSlow() {var waitStartTime int64starving := falseawoke := falseiter := 0old := m.statefor {// 1.符合自旋条件 且 正常模式 进行CAS自旋争抢// 饥饿模式跳过自旋争抢if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {// 1.2.主动自旋尝试唤醒if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 && atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {// 1.2.1 设置唤醒标志位awoke = true}// 1.3.调用系统自旋逻辑// 资源被锁// cpu核数>1// 该g自旋次数<4// 空闲p// 当前p本地g队列空,只有当前gruntime_doSpin() // 30次PASEiter++ old = m.statecontinue}// 2.计算互斥锁状态new := old// 2.1 如果是正常模式,尝试标记获取锁if old&mutexStarving == 0 {new |= mutexLocked}// 2.2 如果已锁或者饥饿模式,等待位+1if old&(mutexLocked|mutexStarving) != 0 {new += 1 << mutexWaiterShift}// 2.3 如果等待超过1ms 尝试转为饥饿模式if starving && old&mutexLocked != 0 {new |= mutexStarving}// 2.4 如果之前唤醒锁 尝试重置标志位if awoke {// 其他代码....new &^= mutexWoken}// 3.尝试获取锁// 获取锁成功if atomic.CompareAndSwapInt32(&m.state, old, new) {if old&(mutexLocked|mutexStarving) == 0 {// 3.1 CAS获取锁成功 放行break }queueLifo := waitStartTime != 0if waitStartTime == 0 {waitStartTime = runtime_nanotime()}// 3.2 系统通过信号量等待释放  // 等待锁的持有者唤醒 runtime_SemacquireMutex(&m.sema, queueLifo, 1)// 3.2.1 计算等待时间释放进入饥饿模式starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNsold = m.state// 3.2.2 饥饿模式处理if old&mutexStarving != 0 {...// 3.2.2.1 修改锁标志位delta := int32(mutexLocked - 1<<mutexWaiterShift)// 3.2.2.2 是否退出饥饿模式// g的执行等待时间<1ms// 等待队列只有当前gif !starving || old>>mutexWaiterShift == 1 {delta -= mutexStarving}atomic.AddInt32(&m.state, delta)break}awoke = trueiter = 0} else {// 获取锁失败 继续cas循环old = m.state}}// 其他代码....
}

解锁

// sync/mutex.go
func (m *Mutex) Unlock() {// 其他代码...// 1.轻松解锁new := atomic.AddInt32(&m.state, -mutexLocked)if new != 0 {// 2.其他解锁逻辑m.unlockSlow(new)}
}func (m *Mutex) unlockSlow(new int32) {if (new+mutexLocked)&mutexLocked == 0 {throw("sync: unlock of unlocked mutex")}// 1. 正常模式if new&mutexStarving == 0 {old := newfor {// 1.1 没有其他锁状态直接返回if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {return}// 1.2 有等待者 移交锁的所有权和唤醒new = (old - 1<<mutexWaiterShift) | mutexWokenif atomic.CompareAndSwapInt32(&m.state, old, new) {runtime_Semrelease(&m.sema, false, 1)return}old = m.state}} else {// 2. 饥饿模式// 尝试唤醒下一个runtime_Semrelease(&m.sema, true, 1)}
}

读写锁 RWMutex

待续

参考

golang修养之路

go语言设计与实现

golang 并发编程相关推荐

  1. golang并发编程goroutine+channel(一)

    go语言的设计初衷除了在不影响程序性能的情况下减少复杂度,另一个目的是在当今互联网大量运算下,如何让程序的并发性能和代码可读性达到极致.go语言的并发关键词 "go" go dos ...

  2. ​Golang 并发编程指南

    分享 Golang 并发基础库,扩展以及三方库的一些常见问题.使用介绍和技巧,以及对一些并发库的选择和优化探讨. go 原生/扩展库 提倡的原则 不要通过共享内存进行通信;相反,通过通信来共享内存. ...

  3. Golang 并发编程之同步原语

    当提到并发编程.多线程编程时,我们往往都离不开『锁』这一概念,Go 语言作为一个原生支持用户态进程 Goroutine 的语言,也一定会为开发者提供这一功能,锁的主要作用就是保证多个线程或者 Goro ...

  4. Golang并发编程组件

    选择channel和互斥量 我们一般根据以下原则选择: Y N Y N Y N Y N 性能要求很高的临界区 使用传统的锁 转让数据所有权 使用channel 保护结构的内部状态 协调多个逻辑片段 协 ...

  5. Golang并发编程-GPM协程调度模型原理及组成分析

    文章目录 一.操作系统的进程和线程模型 1.1.基础知识 1.2.KST/ULT 二.Golang的GPM协程调度模型 三.M的结构及对应关系 四.P的结构及状态转换 五.G的结构及状态转换 六.GP ...

  6. Golang并发编程进程通信channel了解及简单使用

    概念及作用 channel是一个数据类型,用于实现同步,用于两个协程之间交换数据.goroutine奉行通过通信来共享内存,而不是共享内存来通信. 引用类型channel是CSP模式的具体实现,用于多 ...

  7. Golang并发编程入门教程

    时间单位 1S = 1000ms 1ms = 1000us 1us = 1000ns 并发与并行 并行: 借助多核 cpu 实现. (真 并行) 并发: 宏观:用户体验上,程序在并行执行. 微观:多个 ...

  8. Go语言自学系列 | golang并发编程之原子变量的引入

    视频来源:B站<golang入门到项目实战 [2021最新Go语言教程,没有废话,纯干货!持续更新中...]> 一边学习一边整理老师的课程内容及试验笔记,并与大家分享,侵权即删,谢谢支持! ...

  9. golang并发编程-04-通道-02-定时器、断续器

    文章目录 1. 定时器 1.1 time.NewTimer 1.2 <-time.After() 1.3 停止 1.4 定时器重置 2 断续器 2.1 断续器使用 2.2 断续器中断 1. 定时 ...

最新文章

  1. 攻击技术还原:维基解密是如何遭到黑客攻击的?
  2. HttpWatch工具简介及使用技巧(转载)
  3. rewrite 伪静态,地址重写
  4. python连接池框架_python pymysql 连接池
  5. CF986B Petr and Permutations 思维
  6. SQL2000 MD5加密
  7. Java进阶day03继承
  8. 复现原文(二):Single-cell RNA sequencing of human
  9. bzoj 1003: [ZJOI2006]物流运输
  10. 模拟计算机用英语怎么说,电脑里的 属性 英语怎么说
  11. 力行《促进大数据发展行动纲要》 普元数据治理解决方案出炉
  12. shell脚本合集2
  13. abap SD 定价公式(例程,即Formula)
  14. Win10怎么去除桌面快捷方式图标左下角的小箭头
  15. 学习网络的几本好书推荐
  16. 计算机cpu近几年价格,CPU性能过剩的福利 十年老电脑还能再战几年?
  17. 微信小程序 之wx.previewImage图片预览(单张图片预览)
  18. 如何查找计算机中的视频文件,win7系统快速搜索查找电脑里的视频文件的操作方法...
  19. java必会单词_java必会的英语单词
  20. find the longest of the shortest HDU - 1595

热门文章

  1. (转)为什么BAT干不掉海康威视?——关于人工智能的思考
  2. 插入100万条有随机姓名,随机电话等字段的数据最快需要几秒?
  3. 水果FL studio20.8中文汉化翻译语言
  4. 星空主题设计理念_星空规划设计说明
  5. windows编译skia的注意事项
  6. 机器视觉中的前景和背景是什么意思
  7. ei capitan mysql_退出MySQL而不重新启动El Capitan
  8. 信息学奥赛一本通 珍珠
  9. flink中的事件时间和水印
  10. 编程基础篇:有两个矩阵a和b,均为2行3列。求两个矩阵之和。(C++重载)