Golang sync包提供了基础的异步操作方法,包括互斥锁Mutex,执行一次Once和并发等待组WaitGroup。
本文主要介绍sync包提供的这些功能的基本使用方法。

  • Mutex: 互斥锁
  • RWMutex:读写锁
  • WaitGroup:并发等待组
  • Once:执行一次
  • Cond:信号量
  • Pool:临时对象池
  • Map:自带锁的map

二. sync.Mutex

sync.Mutex称为互斥锁,常用在并发编程里面。协程是一种用户态的轻量级线程。(所以我们可以用线程的思想去理解)

互斥锁的概念: 对共享数据进行锁定,保证同一时刻只有能一个线程或者协程去操作。

注意: 互斥锁是多个线程或者协程一起去抢,抢到锁的线程或者协程去先执行,没抢到的就等待。等互斥锁使用完释放后,其他等待的线程或者协程去抢这个锁。

sync.Mutex有2个函数LockUnLock分别表示获得锁和释放锁。

func (m *Mutex) Lock()
func (m *Mutex) UnLock()

sync.Mutex初始值为UnLock状态,并且sync.Mutex常做为其它结构体的匿名变量使用。

举个例子: 我们经常使用网上支付购物东西,就会出现同一个银行账户在某一个时间既有支出也有收入,那银行就得保证我们余额准确,保证数据无误。
我们可以简单的实现银行的支出和收入来说明Mutex的使用。

type Bank struct {sync.Mutexbalance map[string]float64
}// In 收入
func (b *Bank) In(account string, value float64) {// 加锁 保证同一时间只有一个协程能访问这段代码b.Lock()defer b.Unlock()v, ok := b.balance[account]if !ok {b.balance[account] = 0.0}b.balance[account] += v
}// Out 支出
func (b *Bank) Out(account string, value float64) error {// 加锁 保证同一时间只有一个协程能访问这段代码b.Lock()defer b.Unlock()v, ok := b.balance[account]if !ok || v < value {return errors.New("account not enough balance")}b.balance[account] -= valuereturn nil
}

三. sync.RWMutex

sync.RWMutex称为读写锁是sync.Mutex的一种变种,RWMutex来自于计算机操作系统非常有名的读者写者问题。
sync.RWMutex目的是为了能够支持多个并发协程同时读取某一个资源,但只有一个并发协程能够更新资源。也就是说读和写是互斥的,写和写也是互斥的,读和读是不互斥的。

总结起来如下:

  • 当有一个协程在读的时候,所有写的协程必须等到所有读的协程结束才可以获得锁进行写操作。
  • 当有一个协程在读的时候,所有读的协程不受影响都可以进行读操作。
  • 当有一个协程在写的时候,所有读、写的协程必须等到写的协程结束才可以获得锁进行读、写操作。
  • RWMutex有5个函数,分别为读和写提供锁操作。
写操作
func (rw *RWMutex) Lock()
func (rw *RWMutex) Unlock()读操作
func (rw *RWMutex) RLock()
func (rw *RWMutex) RUnlock()RLocker()能获取读锁,然后传递给其他协程使用。
func (rw *RWMutex) RLocker() Locker

举个例子,sync.Mutex一节例子里面我们没有提供查询操作,如果用Mutex互斥锁就没有办法支持多人同时查询,所以我们使用sync.RWMutex来改写这个代码

type Bank struct {sync.RWMutexbalance map[string]float64
}func (b *Bank) In(account string, value float64) {b.Lock()defer b.Unlock()v, ok := b.balance[account]if !ok {b.balance[account] = 0.0}b.balance[account] += v
}func (b *Bank) Out(account string, value float64) error {b.Lock()defer b.Unlock()v, ok := b.balance[account]if !ok || v < value {return errors.New("account not enough balance")}b.balance[account] -= valuereturn nil
}func (b *Bank) Query(account string) float64 {b.RLock()defer b.RUnlock()v, ok := b.balance[account]if !ok {return 0.0}return v
}

sync.WaitGroup指的是等待组,在Golang并发编程里面非常常见,指的是等待一组工作完成后,再进行下一组工作

sync.WaitGroup有3个函数:

func (wg *WaitGroup) Add(delta int)  Add添加n个并发协程
func (wg *WaitGroup) Done()  Done完成一个并发协程
func (wg *WaitGroup) Wait()  Wait等待其它并发协程结束

sync.WaitGroup在Golang编程里面最常用于协程池,下面这个例子会同时启动1000个并发协程。

func main() {wg := &sync.WaitGroup{}for i := 0; i < 1000; i++ {wg.Add(1)go func() {defer func() {wg.Done()}()time.Sleep(1 * time.Second)fmt.Println("hello world ~")}()}// 等待所有协程结束wg.Wait()fmt.Println("WaitGroup all process done ~")
}

sync.WaitGroup没有办法指定最大并发协程数,在一些场景下会有问题。例如操作数据库场景下,我们不希望某一些时刻出现大量连接数据库导致数据库不可访问。所以,为了能够控制最大的并发数,推荐使用最下面的,用法和sync.WaitGroup非常类似。

下面这个例子最多只有10个并发协程,如果已经达到10个并发协程,只有某一个协程执行了Done才能启动一个新的协程。

import  "github.com/remeh/sizedwaitgroup"func main() {# 最大10个并发wg := sizedwaitgroup.New(10)for i = 0; i < 1000; i++ {wg.Add()go func() {defer func() {wg.Done()}()time.Sleep(1 * time.Second)fmt.Println("hello world ~")}()}// 等待所有协程结束wg.Wait()fmt.Println("WaitGroup all process done ~")
}

sync.Once

sync.Once指的是只执行一次的对象实现,常用来控制某些函数只能被调用一次。sync.Once的使用场景例如单例模式、系统初始化。
例如并发情况下多次调用channel的close会导致panic,解决这个问题我们可以使用sync.Once来保证close只会被执行一次。

sync.Once的结构如下所示,只有一个函数。使用变量done来记录函数的执行状态,使用sync.Mutex和sync.atomic来保证线程安全的读取done。

type Once struct {m    Mutex     #互斥锁done uint32    #执行状态
}func (o *Once) Do(f func())

举个例子,1000个并发协程情况下只有一个协程会执行到fmt.Printf,多次执行的情况下输出的内容还不一样,因为这取决于哪个协程先调用到该匿名函数。

func main() {once := &sync.Once{}for i := 0; i < 1000; i++ {go func(idx int) {once.Do(func() {time.Sleep(1 * time.Second)fmt.Printf("hello world index: %d", idx)})}(i)}time.Sleep(5 * time.Second)
}

sync.Cond

sync.Cond指的是同步条件变量,一般需要与互斥锁组合使用,本质上是一些正在等待某个条件的协程的同步机制。

// NewCond returns a new Cond with Locker l.
func NewCond(l Locker) *Cond {return &Cond{L: l}
}// A Locker represents an object that can be locked and unlocked.
type Locker interface {Lock()Unlock()
}

sync.Cond有3个函数WaitSignalBroadcast:

// Wait 等待通知
func (c *Cond) Wait()
// Signal 单播通知
func (c *Cond) Signal()
// Broadcast 广播通知
func (c *Cond) Broadcast()

举个例子,sync.Cond用于并发协程条件变量。

var sharedRsc = make(map[string]interface{})
func main() {var wg sync.WaitGroupwg.Add(2)m := sync.Mutex{}c := sync.NewCond(&m)go func() {// this go routine wait for changes to the sharedRscc.L.Lock()for len(sharedRsc) == 0 {c.Wait()}fmt.Println(sharedRsc["rsc1"])c.L.Unlock()wg.Done()}()go func() {// this go routine wait for changes to the sharedRscc.L.Lock()for len(sharedRsc) == 0 {c.Wait()}fmt.Println(sharedRsc["rsc2"])c.L.Unlock()wg.Done()}()// this one writes changes to sharedRscc.L.Lock()sharedRsc["rsc1"] = "foo"sharedRsc["rsc2"] = "bar"c.Broadcast()c.L.Unlock()wg.Wait()
}

sync.Pool

sync.Pool指的是临时对象池,Golang和Java具有GC机制,因此很多开发者基本上都不会考虑内存回收问题,不像C++很多时候开发需要自己回收对象。
Gc是一把双刃剑,带来了编程的方便但同时也增加了运行时开销,使用不当可能会严重影响程序的性能,因此性能要求高的场景不能任意产生太多的垃圾。
sync.Pool正是用来解决这类问题的,Pool可以作为临时对象池来使用,不再自己单独创建对象,而是从临时对象池中获取出一个对象。

sync.Pool有2个函数Get和Put,Get负责从临时对象池中取出一个对象,Put用于结束的时候把对象放回临时对象池中。

func (p *Pool) Get() interface{}
func (p *Pool) Put(x interface{})

看一个官方例子:

var bufPool = sync.Pool{New: func() interface{} {return new(bytes.Buffer)},
}func timeNow() time.Time {return time.Unix(1136214245, 0)
}func Log(w io.Writer, key, val string) {// 获取临时对象,没有的话会自动创建b := bufPool.Get().(*bytes.Buffer)b.Reset()b.WriteString(timeNow().UTC().Format(time.RFC3339))b.WriteByte(' ')b.WriteString(key)b.WriteByte('=')b.WriteString(val)w.Write(b.Bytes())// 将临时对象放回到 Pool 中bufPool.Put(b)
}func main() {Log(os.Stdout, "path", "/search?q=flowers")
}

从上面的例子我们可以看到创建一个Pool对象并不能指定大小,所以sync.Pool的缓存对象数量是没有限制的(只受限于内存),那sync.Pool是如何控制缓存临时对象数的呢?

sync.Pool在init的时候注册了一个poolCleanup函数,它会清除所有的pool里面的所有缓存的对象,该函数注册进去之后会在每次Gc之前都会调用,因此sync.Pool缓存的期限只是两次Gc之间这段时间。正因Gc的时候会清掉缓存对象,所以不用担心pool会无限增大的问题。

正因为如此sync.Pool适合用于缓存临时对象,而不适合用来做持久保存的对象池(连接池等)。

sync.Map

Go在1.9版本之前自带的map对象是不具有并发安全的,很多时候我们都得自己封装支持并发安全的Map结构,如下所示给map加个读写锁sync.RWMutex。

type MapWithLock struct {sync.RWMutexM map[string]Kline
}

sync.Map总共5个函数,用法和原生的map差不多:

// 查询一个key
func (m *Map) Load(key interface{}) (value interface{}, ok bool)
// 设置key value
func (m *Map) Store(key, value interface{})
// 如果key存在则返回key对应的value,否则设置key value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool)
// 删除一个key
func (m *Map) Delete(key interface{})
// 遍历map,仍然是无序的
func (m *Map) Range(f func(key, value interface{}) bool)

为了能够控制最大的并发数:

// Based upon sync.WaitGroup, SizedWaitGroup allows to start multiple
// routines and to wait for their end using the simple API.// SizedWaitGroup adds the feature of limiting the maximum number of
// concurrently started routines. It could for example be used to start
// multiples routines querying a database but without sending too much
// queries in order to not overload the given database.
//
// Rémy Mathieu © 2016
package sizedwaitgroupimport ("context""math""sync"
)// SizedWaitGroup has the same role and close to the
// same API as the Golang sync.WaitGroup but adds a limit of
// the amount of goroutines started concurrently.
type SizedWaitGroup struct {Size intcurrent chan struct{}wg      sync.WaitGroup
}// New creates a SizedWaitGroup.
// The limit parameter is the maximum amount of
// goroutines which can be started concurrently.
func New(limit int) SizedWaitGroup {size := math.MaxInt32 // 2^32 - 1if limit > 0 {size = limit}return SizedWaitGroup{Size: size,current: make(chan struct{}, size),wg:      sync.WaitGroup{},}
}// Add increments the internal WaitGroup counter.
// It can be blocking if the limit of spawned goroutines
// has been reached. It will stop blocking when Done is
// been called.
//
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Add() {s.AddWithContext(context.Background())
}// AddWithContext increments the internal WaitGroup counter.
// It can be blocking if the limit of spawned goroutines
// has been reached. It will stop blocking when Done is
// been called, or when the context is canceled. Returns nil on
// success or an error if the context is canceled before the lock
// is acquired.
//
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) AddWithContext(ctx context.Context) error {select {case <-ctx.Done():return ctx.Err()case s.current <- struct{}{}:break}s.wg.Add(1)return nil
}// Done decrements the SizedWaitGroup counter.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Done() {<-s.currents.wg.Done()
}// Wait blocks until the SizedWaitGroup counter is zero.
// See sync.WaitGroup documentation for more information.
func (s *SizedWaitGroup) Wait() {s.wg.Wait()
}
package sizedwaitgroupimport ("context""sync/atomic""testing"
)func TestWait(t *testing.T) {swg := New(10)var c uint32for i := 0; i < 10000; i++ {swg.Add()go func(c *uint32) {defer swg.Done()atomic.AddUint32(c, 1)}(&c)}swg.Wait()if c != 10000 {t.Fatalf("%d, not all routines have been executed.", c)}
}func TestThrottling(t *testing.T) {var c uint32swg := New(4)if len(swg.current) != 0 {t.Fatalf("the SizedWaitGroup should start with zero.")}for i := 0; i < 10000; i++ {swg.Add()go func(c *uint32) {defer swg.Done()atomic.AddUint32(c, 1)if len(swg.current) > 4 {t.Fatalf("not the good amount of routines spawned.")return}}(&c)}swg.Wait()
}func TestNoThrottling(t *testing.T) {var c uint32swg := New(0)if len(swg.current) != 0 {t.Fatalf("the SizedWaitGroup should start with zero.")}for i := 0; i < 10000; i++ {swg.Add()go func(c *uint32) {defer swg.Done()atomic.AddUint32(c, 1)}(&c)}swg.Wait()if c != 10000 {t.Fatalf("%d, not all routines have been executed.", c)}
}func TestAddWithContext(t *testing.T) {ctx, cancelFunc := context.WithCancel(context.TODO())swg := New(1)if err := swg.AddWithContext(ctx); err != nil {t.Fatalf("AddContext returned error: %v", err)}cancelFunc()if err := swg.AddWithContext(ctx); err != context.Canceled {t.Fatalf("AddContext returned non-context.Canceled error: %v", err)}}
# SizedWaitGroup[![GoDoc](https://godoc.org/github.com/remeh/sizedwaitgroup?status.svg)](https://godoc.org/github.com/remeh/sizedwaitgroup)`SizedWaitGroup` has the same role and API as `sync.WaitGroup` but it adds a limit of the amount of goroutines started concurrently.`SizedWaitGroup` adds the feature of limiting the maximum number of concurrently started routines. It could for example be used to start multiples routines querying a database but without sending too much queries in order to not overload the given database.# Example```go
package mainimport ("fmt""math/rand""time""github.com/remeh/sizedwaitgroup"
)func main() {rand.Seed(time.Now().UnixNano())// Typical use-case:// 50 queries must be executed as quick as possible// but without overloading the database, so only// 8 routines should be started concurrently.swg := sizedwaitgroup.New(8)for i := 0; i < 50; i++ {swg.Add()go func(i int) {defer swg.Done()query(i)}(i)}swg.Wait()
}func query(i int) {fmt.Println(i)ms := i + 500 + rand.Intn(500)time.Sleep(time.Duration(ms) * time.Millisecond)
}
```# LicenseMIT# CopyrightRémy Mathieu © 2016

Golang 并发之锁相关推荐

  1. Golang之自旋锁

    Golang之自旋锁 目录 Golang之自旋锁 自旋锁 golang实现自旋锁 可重入的自旋锁和不可重入的自旋锁 自旋锁的其他变种 1. TicketLock TicketLock主要解决的是公平性 ...

  2. golang的乐观锁与悲观锁

    golang的乐观锁与悲观锁 基本概念 基本概念 乐观锁和悲观锁是两种思想,用于解决并发场景下的数据竞争问题. 乐观锁:乐观锁在操作数据时非常乐观,认为别人不会同时修改数据.因此乐观锁不会上锁,只是在 ...

  3. golang实现无锁队列

    golang实现无锁队列 locklessqueue.go //locklessqueue.go package locklessimport ("sync/atomic" )ty ...

  4. golang windows 判断锁屏

    golang windows 判断是否锁屏: package osapiimport ("syscall""unsafe""github.com/lx ...

  5. 深入理解 golang 的互斥锁

    How to implement Golang Mutex golang 是如何实现互斥锁的 在开始之前,我们需要知道锁实现的几种方式. # 信号量 操作系统中有 P 和 V 操作.P 操作是将信号量 ...

  6. golang mutex互斥锁分析

    互斥锁:没有读锁写锁之分,同一时刻,只能有一个gorutine获取一把锁 数据结构设计: type Mutex struct {state int32 // 将一个32位整数拆分为 当前阻塞的goro ...

  7. Java高并发之锁优化

    本文主要讲并行优化的几种方式, 其结构如下: 锁优化 减少锁的持有时间 例如避免给整个方法加锁 1 public synchronized void syncMethod(){ 2 othercode ...

  8. 问题 | golang开发之go.mod的使用方法

    文章目录 go mod之坑 背景 项目 mod实战 主程序调用同目录里面的包 主程序调用其他目录的包 重要提醒 问答FAQs 1. 有些包由于特定网络原因无法访问怎么办? 2. 公司通过 gitlab ...

  9. Golang 开发之Cobra初探

    一 背景 在云原生如日中天的当下,相信很多人对Kubernetes/etcd等都有所听闻,当我们看其源码或对其进行二次开发的时候,可以发现其均使用了一个命令行程序库Cobra,其是一个用来编写命令行的 ...

  10. 利用redis实现golang的分布式锁

    go使用redis锁 基于Redis的SetNX方法,创建并使用redis锁 曾经在一便文档中,有一句话,引发的我的思考:如果公司内已有可以使用的ZooKeeper.etcd或者Redis集群,那么就 ...

最新文章

  1. Java项目:潜艇大战项目(java+swing)
  2. centos7 virtualbox使用internal network 内网模式
  3. R语言关联规则挖掘数据集预览、分析、筛选:项目数的分布形态(分位数、密度图)、itemFrequency函数统计每一项目在所有事务中出现的次数、最常发生的项目、数据筛选(交易的集合项目大于1)
  4. RedHat7/CentOS7 压缩解压命令汇总
  5. Python学习笔记(六)if判断语句
  6. 释疑のABAP输入框字符自动变成大写问题
  7. Linux系统学习----前言
  8. Windows 11 上大招!正式支持安卓!
  9. android app的签名,Android APP的签名
  10. “新闻”频道“最新更新”有问题吗?
  11. 【层级多标签文本分类】融合标签层级结构的文本分类
  12. js自执行函数前加个分号是什么意思?
  13. 挺进商用车自动驾驶,德赛西威与MAXIEYE联合发布“九逵计划”
  14. 11.1 项目风险管理
  15. 对多个Excel表中的数据进行合并计算
  16. 静态时序分析 Static Timing Analysis 教程
  17. 解决:SpringBoot使用@Value读取application.properties中文乱码
  18. C#实现毫秒级计时器
  19. 20221125使用PR2023自动识别obs-studio录屏生成的MKV视频的字幕
  20. 2W字!详解20道Redis经典面试题!(珍藏版)

热门文章

  1. yarn : 无法加载文件 C:\Users\Emily\AppData\Roaming\npm\yarn.ps1,因为在此系统上禁止运行脚本。
  2. 从看脸到读心:深度理解人的视觉技术走到哪了?
  3. 12、Urban Radiance Fields
  4. Ubuntu18中添加中文输入法
  5. 在职阿里3年,一个27岁女软件测试工程师的心声
  6. 中国开源走向第二梯队!
  7. C语言求乘方、幂数、取余
  8. 酒店如何实现上网认证的呢
  9. Android主题色设为透明
  10. linux 下的无线网络配置,详解在LINUX环境下怎样设置无线网络配置