什么是信号量

信号量是并发编程中常见的一种同步机制,在需要控制访问资源的线程数量时就会用到信号量,关于什么是信号量这个问题,我引用一下维基百科对信号量的解释,大家就明白了。

信号量的概念是计算机科学家 Dijkstra (Dijkstra算法的发明者)提出来的,广泛应用在不同的操作系统中。系统中,会给每一个进程一个信号量,代表每个进程当前的状态,未得到控制权的进程,会在特定的地方被迫停下来,等待可以继续进行的信号到来。

如果信号量是一个任意的整数,通常被称为计数信号量(Counting semaphore),或一般信号量(general semaphore);如果信号量只有二进制的0或1,称为二进制信号量(binary semaphore)。在linux系统中,二进制信号量(binary semaphore)又称互斥锁(Mutex)

计数信号量具备两种操作动作,称为V(signal())与P(wait())(即部分参考书常称的“PV操作”)。V操作会增加信号量S的数值,P操作会减少它。

运行方式:

  1. 初始化信号量,给与它一个非负数的整数值。

  2. 运行P(wait()),信号量S的值将被减少。企图进入临界区的进程,需要先运行P(wait())。当信号量S减为负值时,进程会被阻塞住,不能继续;当信号量S不为负值时,进程可以获准进入临界区。

  3. 运行V(signal()),信号量S的值会被增加。结束离开临界区的进程,将会运行V(signal())。当信号量S不为负值时,先前被阻塞住的其他进程,将可获准进入临界区。

我们一般用信号量保护一组资源,比如数据库连接池、一组客户端的连接等等。每次获取资源时都会将信号量中的计数器减去对应的数值,在释放资源时重新加回来。当遇到信号量资源不够时尝试获取的线程就会进入休眠,等待其他线程释放归还信号量。如果信号量是只有0和1的二进位信号量,那么,它的 P/V 就和互斥锁的 Lock/Unlock 一样了。

Go语言中的信号量表示

Go 内部使用信号量来控制goroutine的阻塞和唤醒,比如互斥锁sync.Mutex结构体定义的第二个字段就是一个信号量。

type Mutex struct {state int32sema  uint32
}

信号量的PV操作在Go内部是通过下面这几个底层函数实现的

func runtime_Semacquire(s *uint32)
func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)
func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

上面几个函数都是Go语言内部使用的,我们不能在编程时直接使用。不过Go 语言的扩展并发原语包中提供了带权重的信号量 semaphore.Weighted

使用信号量前,需先在项目里安装golang.org/x/sync/

安装方法:go get -u golang.org/x/sync

我们可以按照不同的权重对资源的访问进行管理,这个结构体对外提供了四个方法:

  • semaphore.NewWeighted 用于创建新的信号量,通过参数(n int64) 指定信号量的初始值。

  • semaphore.Weighted.Acquire 阻塞地获取指定权重的资源,如果当前没有空闲资源,就会陷入休眠等待;相当于 P 操作,你可以一次获取多个资源,如果没有足够多的资源,调用者就会被阻塞。它的第一个参数是 Context,这就意味着,你可以通过 Context 增加超时或者 cancel 的机制。如果是正常获取了资源,就返回 nil;否则,就返回ctx.Err(),信号量不改变。

  • semaphore.Weighted.Release 用于释放指定权重的资源;相当于 V 操作,可以将 n 个资源释放,返还给信号量。

  • semaphore.Weighted.TryAcquire 非阻塞地获取指定权重的资源,如果当前没有空闲资源,就会直接返回 false

在Go编程里使用信号量

在实际应用Go语言开发程序时,有哪些场景适合使用信号量呢?在需要控制访问资源的线程数量时就会需要信号量,我来举个例子帮助你理解。假设我们有一组要抓取的页面,资源有限最多允许我们同时执行三个抓取任务,当同时有三个抓取任务在执行时,在执行完一个抓取任务后才能执行下一个排队等待的任务。当然这个问题用Channel也能解决,不过这次我们使用Go提供的信号量原语来解决这个问题,代码如下:

package mainimport ("context""fmt""sync""time""golang.org/x/sync/semaphore"
)func doSomething(u string) {// 模拟抓取任务的执行fmt.Println(u)time.Sleep(2 * time.Second)
}const (Limit  = 3 // 同時并行运行的goroutine上限Weight = 1 // 每个goroutine获取信号量资源的权重
)func main() {urls := []string{"http://www.example.com","http://www.example.net","http://www.example.net/foo","http://www.example.net/bar","http://www.example.net/baz",}s := semaphore.NewWeighted(Limit)var w sync.WaitGroupfor _, u := range urls {w.Add(1)go func(u string) {s.Acquire(context.Background(), Weight)doSomething(u)s.Release(Weight)w.Done()}(u)}w.Wait()fmt.Println("All Done")
}

Go语言信号量的实现原理

Go语言扩展库中的信号量是使用互斥锁和List 实现的。互斥锁实现其它字段的保护,而 List 实现了一个等待队列,等待者的通知是通过 Channel 的通知机制实现的。

信号量的数据结构

我们来看一下信号量semaphore.Weighted的数据结构:

type Weighted struct {size    int64         // 最大资源数cur     int64         // 当前已被使用的资源mu      sync.Mutex    // 互斥锁,对字段的保护waiters list.List     // 等待队列
}
  • size字段用来记录信号量拥有的最大资源数。

  • cur标识当前已被使用的资源数。

  • mu是一个互斥锁用来提供对其他字段的临界区保护。

  • waiters表示申请资源时由于可使用资源不够而陷入阻塞等待的调用者列表。

Acquire请求信号量资源

Acquire方法会监控资源是否可用,而且还要检测传递进来的context.Context对象是否发送了超时过期或者取消的信号,我们来看一下它的代码实现:


func (s *Weighted) Acquire(ctx context.Context, n int64) error {s.mu.Lock()// 如果恰好有足够的资源,也没有排队等待获取资源的goroutine,// 将cur加上n后直接返回if s.size-s.cur >= n && s.waiters.Len() == 0 {s.cur += ns.mu.Unlock()return nil}// 请求的资源数大于能提供的最大的资源数// 这个任务处理不了,走错误处理逻辑if n > s.size {s.mu.Unlock()// 依赖ctx的状态返回,否则一直等待<-ctx.Done()return ctx.Err()}// 现存资源不够, 需要把调用者加入到等待队列中// 创建了一个ready chan,以便被通知唤醒ready := make(chan struct{})w := waiter{n: n, ready: ready}elem := s.waiters.PushBack(w)s.mu.Unlock()// 等待select {case <-ctx.Done(): // context的Done被关闭err := ctx.Err()s.mu.Lock()select {case <-ready: // 如果被唤醒了,忽略ctx的状态err = nildefault: // 通知waiterisFront := s.waiters.Front() == elems.waiters.Remove(elem)// 通知其它的waiters,检查是否有足够的资源if isFront && s.size > s.cur {s.notifyWaiters()}}s.mu.Unlock()return errcase <-ready: // 等待者被唤醒了return nil}}

如果调用者请求不到信号量的资源就会被加入等待者列表里,这里等待者列表的结构体定义是:

type waiter struct {n     int64ready chan<- struct{} // 当调用者可以获取到信号量资源时, close调这个chan
}

包含了两个字段,调用者请求的资源数,以及一个ready 通道。ready通道会在调用者可以被重新唤醒的时候被close调,从而起到通知正在阻塞读取ready通道的等待者的作用。

NotifyWaiters 通知等待者

notifyWaiters方法会逐个检查队列里等待的调用者,如果现存资源够等待者请求的数量n,或者是没有等待者了,就返回:

func (s *Weighted) notifyWaiters() {for {next := s.waiters.Front()if next == nil {break // 没有等待者了,直接返回}w := next.Value.(waiter)if s.size-s.cur < w.n {// 如果现有资源不够队列头调用者请求的资源数,就退出所有等待者会继续等待// 这里还是按照先入先出的方式处理是为了避免饥饿break}s.cur += w.ns.waiters.Remove(next)close(w.ready)}}

notifyWaiters方法是按照先入先出的方式唤醒调用者。当释放 100 个资源的时候,如果第一个等待者需要 101 个资源,那么,队列中的所有等待者都会继续等待,即使队列后面有的等待者只需要 1 个资源。这样做的目的是避免饥饿,否则的话,资源可能总是被那些请求资源数小的调用者获取,这样一来,请求资源数巨大的调用者,就没有机会获得资源了。

Release归还信号量资源

Release方法就很简单了,它将当前计数值减去释放的资源数 n,并调用notifyWaiters方法,尝试唤醒等待队列中的调用者,看是否有足够的资源被获取。


func (s *Weighted) Release(n int64) {s.mu.Lock()s.cur -= nif s.cur < 0 {s.mu.Unlock()panic("semaphore: released more than held")}s.notifyWaiters()s.mu.Unlock()
}

总结

Go语言中信号量有时候也会被Channel类型所取代,因为一个 buffered chan 也可以代表 n 个资源。不过既然Go语言通过golang.orgx/sync扩展库对外提供了semaphore.Weight这一种信号量实现,遇到使用信号量的场景时还是尽量使用官方提供的实现。在使用的过程中我们需要注意以下的几个问题:

  • AcquireTryAcquire方法都可以用于获取资源,前者会阻塞地获取信号量。后者会非阻塞地获取信号量,如果获取不到就返回false

  • Release归还信号量后,会以先进先出的顺序唤醒等待队列中的调用者。如果现有资源不够处于等待队列前面的调用者请求的资源数,所有等待者会继续等待。

  • 如果一个goroutine申请较多的资源,由于上面说的归还后唤醒等待者的策略,它可能会等待比较长的时间。

推荐阅读

  • 用GoModules管理项目依赖的方法和经验总结

  • 深入理解StatefulSet,用Kubernetes编排有状态应用

- END -

关注公众号,每周分享给你一个进阶知识

并发编程-信号量的使用方法和其实现原理相关推荐

  1. pv原语模拟实现_并发编程信号量的使用方法和其实现原理

    什么是信号量 信号量是并发编程中常见的一种同步机制,在需要控制访问资源的线程数量时就会用到信号量,关于什么是信号量这个问题,我引用一下维基百科对信号量的解释,大家就明白了. 信号量的概念是计算机科学家 ...

  2. Java并发编程-信号量

    Semaphore 直译是信号量,它的功能比较好理解,就是通过构造函数设定一个数量的许可,然后通过 acquire 方法获得许可,release 方法释放许可.它还有 tryAcquire 和 acq ...

  3. java queue 线程安全_java并发编程之线程安全方法

    线程安全的实现方法,包含如下方式 一, 互斥同步 使用互斥锁的方式. 举个栗子 synchronized,最常用的同步实现方案, ReentrantLock,java并发包中工具,后续介绍. 互斥同步 ...

  4. Java并发编程的基础-interrupt方法

    当其他线程通过调用当前线程的interrupt方法,表示向当前线程打个招呼,告诉他可以中断线程的执行了,至于什么时候中断,取决于当前线程自己. 线程通过检查资深是否被中断来进行相应,可以通过isInt ...

  5. 学习笔记(29):Python网络编程并发编程-信号量

    立即学习:https://edu.csdn.net/course/play/24458/296446?utm_source=blogtoedu 信号量(了解):也是一把锁semaphore 1. fr ...

  6. Java并发编程,3分分钟深入分析volatile的实现原理

    volatile原理 volatile简介 Java内存模型告诉我们,各个线程会将共享变量从主内存中拷贝到工作内存,然后执行引擎会基于工作内存中的数据进行操作处理. 线程在工作内存进行操作后何时会写到 ...

  7. Python并发编程——paramiko远程控制的模块、病毒攻击原理、dll注入、

    文章目录 paramiko模块 作业 攻击原理解析 一.什么是dll 二.为何要有dll 什么是dll注入: 什么时候需要dll注入 dll注入的方法 使用SetWindowsHookEx函数对应用程 ...

  8. 【并发编程学习篇】FutureCompletableFuture的使用与原理剖析

    一.Callable&Future&FutureTask介绍 直接继承Thread或者实现Runnable接口都可以创建线程,但是这两种方法都有一个问题就是:没有返回值,也就是不能获取 ...

  9. 并发编程之 源码剖析 线程池 实现原理

    前言 在上一篇文章中我们介绍了线程池的使用,那么现在我们有个疑问:线程池到底是怎么实现的?毕竟好奇是人类的天性.那我们今天就来看看吧,扒开 他的源码,一探究竟. 1. 从 Demo 入手 上图是个最简 ...

最新文章

  1. 如何检查电脑是否安装了python-python-如何检查安装了scikit的nltk版本?
  2. 安全系列------web环境搭建组合
  3. swagger报错 java.lang.NumberFormatException: For input string: ““
  4. Real to Int
  5. 一文了解OOM及解决方案,你还看不明白?
  6. 误打误撞的模板字符串
  7. SpringBoot2.0.3 + SpringSecurity5.0.6 + vue 前后端分离认证授权
  8. matlab中电感元件,中性点经消弧线圈及其并电阻接地系统的MATLAB仿真
  9. PyTorch系列入门到精通——梯度消失与爆炸,损失函数
  10. linux重定向串口打印到telnet
  11. 李开复:21世纪最需要的7种人才
  12. 深度学习2-keras模型训练
  13. 数据库插中文变问号,Mybatis存储数据乱码,linux服务器上MySQL数据库乱码
  14. LayaBox---Dialog弹窗
  15. Sonic开源的云真机测试平台搭建记录
  16. java 继承作用_理解java的三大特性之继承
  17. 电脑qq游戏程序更改计算机,电脑怎么把qq游戏快捷到桌面
  18. MacBook Pro 中/英大写键盘灯不亮 解决方法
  19. 这些成人世界的“黑话”,你能听懂多少?
  20. 免费python编程自学网站-可以免费自学编程的12个网站

热门文章

  1. Oracle之批量生成数据
  2. jquery2.1.1 checkbox
  3. HTTP电脑发送短信接口调用示例
  4. LNMT/LAMT实现动静分离、负载均衡和会话保持
  5. vim环境设置和自动对齐
  6. 运营是一个产品价值传递的过程,互联网营销
  7. 【补充一则】身份证校验的c#代码
  8. 99%学习前端开发都会遇到的问题,百分之百都没绝对意识
  9. 五款提高工作效率的在线工具【神器】
  10. 关于PHP的错误机制总结