四、Golang进阶部分(并发)

并发编程在当前软件领域是一个非常重要的概念,随着CPU等硬件的发展,我们无一例外的想让我们的程序运行的快一点、再快一点。Go语言在语言层面天生支持并发,充分利用现代CPU的多核优势,这也是Go语言能够大范围流行的一个很重要的原因

4.1 基本概念

首先我们先来了解几个与并发编程相关的基本概念。

串行、并发与并行

串行:我们都是先读小学,小学毕业后再读初中,读完初中再读高中。

并发:同一时间段内执行多个任务(你在用微信和两个女朋友聊天)。

并行:同一时刻执行多个任务(你和你朋友都在用微信和女朋友聊天)。

进程、线程和协程

进程(process):程序在操作系统中的一次执行过程,系统进行资源分配和调度的一个独立单位。

线程(thread):操作系统基于进程开启的轻量级进程,是操作系统调度执行的最小单位。

协程(coroutine):非操作系统提供而是由用户自行创建和控制的用户态‘线程’,比线程更轻量级。

并发模型

业界将如何实现并发编程总结归纳为各式各样的并发模型,常见的并发模型有以下几种:

  • 线程&锁模型
  • Actor模型
  • CSP模型
  • Fork&Join模型

Go语言中的并发程序主要是通过基于CSP(communicating sequential processes)的goroutine和channel来实现,当然也支持使用传统的多线程共享内存的并发方式。

4.2 goroutine

Goroutine 是 Go 语言支持并发的核心,在一个Go程序中同时创建成百上千个goroutine是非常普遍的,一个goroutine会以一个很小的栈开始其生命周期,一般只需要2KB。区别于操作系统线程由系统内核进行调度, goroutine 是由Go运行时(runtime)负责调度。例如Go运行时会智能地将 m个goroutine 合理地分配给n个操作系统线程,实现类似m:n的调度机制,不再需要Go开发者自行在代码层面维护一个线程池。

Goroutine 是 Go 程序中最基本的并发执行单元。每一个 Go 程序都至少包含一个 goroutine——main goroutine,当 Go 程序启动时它会自动创建。

在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能——goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个 goroutine 去执行这个函数就可以了,就是这么简单粗暴。

go关键字

Go语言中使用 goroutine 非常简单,只需要在函数或方法调用前加上go关键字就可以创建一个 goroutine ,从而让该函数或方法在新创建的 goroutine 中执行。

go f()  // 创建一个新的 goroutine 运行函数f

匿名函数也支持使用go关键字创建 goroutine 去执行。

go func(){// ...
}()

一个 goroutine 必定对应一个函数/方法,可以创建多个 goroutine 去执行相同的函数/方法。

启动单个goroutine

启动 goroutine 的方式非常简单,只需要在调用函数(普通函数和匿名函数)前加上一个go关键字。

我们先来看一个在 main 函数中执行普通函数调用的示例。

package mainimport ("fmt"
)func hello() {fmt.Println("hello")
}func main() {hello()fmt.Println("你好")
}

将上面的代码编译后执行,得到的结果如下:

hello
你好

代码中 hello 函数和其后面的打印语句是串行的。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-SG41suTT-1673623884409)(null)]

接下来我们在调用 hello 函数前面加上关键字go,也就是启动一个 goroutine 去执行 hello 这个函数。

func main() {go hello() // 启动另外一个goroutine去执行hello函数fmt.Println("main goroutine done!")
}

将上述代码重新编译后执行,得到输出结果如下。

你好

这一次的执行结果只在终端打印了”你好”,并没有打印 hello。这是为什么呢?

其实在 Go 程序启动时,Go 程序就会为 main 函数创建一个默认的 goroutine 。在上面的代码中我们在 main 函数中使用 go 关键字创建了另外一个 goroutine 去执行 hello 函数,而此时 main goroutine 还在继续往下执行,我们的程序中此时存在两个并发执行的 goroutine。当 main 函数结束时整个程序也就结束了,同时 main goroutine 也结束了,所有由 main goroutine 创建的 goroutine 也会一同退出。也就是说我们的 main 函数退出太快,另外一个 goroutine 中的函数还未执行完程序就退出了,导致未打印出“hello”。

main goroutine 就像是《权利的游戏》中的夜王,其他的 goroutine 都是夜王转化出的异鬼,夜王一死它转化的那些异鬼也就全部GG了。

所以我们要想办法让 main 函数‘“等一等”将在另一个 goroutine 中运行的 hello 函数。其中最简单粗暴的方式就是在 main 函数中“time.Sleep”一秒钟了(这里的1秒钟只是我们为了保证新的 goroutine 能够被正常创建和执行而设置的一个值)。

按如下方式修改我们的示例代码。

package mainimport ("fmt""time"
)func hello() {fmt.Println("hello")
}func main() {go hello()fmt.Println("你好")time.Sleep(time.Second)
}

将我们的程序重新编译后再次执行,程序会在终端输出如下结果,并且会短暂停顿一会儿。

你好
hello

为什么会先打印你好呢?

这是因为在程序中创建 goroutine 执行函数需要一定的开销,而与此同时 main 函数所在的 goroutine 是继续执行的。

在上面的程序中使用time.Sleep让 main goroutine 等待 hello goroutine执行结束是不优雅的,当然也是不准确的。

Go 语言中通过sync包为我们提供了一些常用的并发原语,我们会在后面的小节单独介绍sync包中的内容。在这一小节,我们会先介绍一下 sync 包中的WaitGroup。当你并不关心并发操作的结果或者有其它方式收集并发操作的结果时,WaitGroup是实现等待一组并发操作完成的好方法。

下面的示例代码中我们在 main goroutine 中使用sync.WaitGroup来等待 hello goroutine 完成后再退出。

package mainimport ("fmt""sync"
)// 声明全局等待组变量
var wg sync.WaitGroupfunc hello() {fmt.Println("hello")wg.Done() // 告知当前goroutine完成
}func main() {wg.Add(1) // 登记1个goroutinego hello()fmt.Println("你好")wg.Wait() // 阻塞等待登记的goroutine完成
}

将代码编译后再执行,得到的输出结果和之前一致,但是这一次程序不再会有多余的停顿,hello goroutine 执行完毕后程序直接退出。

启动多个goroutine

在 Go 语言中实现并发就是这样简单,我们还可以启动多个 goroutine 。让我们再来看一个新的代码示例。这里同样使用了sync.WaitGroup来实现 goroutine 的同步。

package mainimport ("fmt""sync"
)var wg sync.WaitGroupfunc hello(i int) {defer wg.Done() // goroutine结束就登记-1fmt.Println("hello", i)
}
func main() {for i := 0; i < 10; i++ {wg.Add(1) // 启动一个goroutine就登记+1go hello(i)}wg.Wait() // 等待所有登记的goroutine都结束
}

多次执行上面的代码会发现每次终端上打印数字的顺序都不一致。这是因为10个 goroutine 是并发执行的,而 goroutine 的调度是随机的。

groutine什么时候结束?

groutine对应的函数结束了,groutine就结束了。

main函数执行完了,又main函数创建的gourtine也结束了

随机:

rand.Seed(time.Now().UnixNano()) //加随机种子(为了每一次都不同的随机)用纳秒
for i := 0; i <= 5; i++ {r1 := rand.Int()r2 := rand.Intn(10) //在0<=n<10随机fmt.Println(r1, r2)

动态栈

操作系统的线程一般都有固定的栈内存(通常为2MB),而 Go 语言中的 goroutine 非常轻量级,一个 goroutine 的初始栈空间很小(一般为2KB),所以在 Go 语言中一次创建数万个 goroutine 也是可能的。并且 goroutine 的栈不是固定的,可以根据需要动态地增大或缩小, Go 的 runtime 会自动为 goroutine 分配合适的栈空间。

goroutine调度

操作系统的线程会被操作系统内核调度时会挂起当前执行的线程并将它的寄存器内容保存到内存中,选出下一次要执行的线程并从内存中恢复该线程的寄存器信息,然后恢复执行该线程的现场并开始执行线程。从一个线程切换到另一个线程需要完整的上下文切换。因为可能需要多次内存访问,索引这个切换上下文的操作开销较大,会增加运行的cpu周期。

区别于操作系统内核调度操作系统线程,goroutine 的调度是Go语言运行时(runtime)层面的实现,是完全由 Go 语言本身实现的一套调度系统——go scheduler。它的作用是按照一定的规则将所有的 goroutine 调度到操作系统线程上执行。

在经历数个版本的迭代之后,目前 Go 语言的调度器采用的是 GPM 调度模型。

其中:

  • G:表示 goroutine,每执行一次go f()就创建一个 G,包含要执行的函数和上下文信息。
  • 全局队列(Global Queue):存放等待运行的 G。
  • P:表示 goroutine 执行所需的资源,最多有 GOMAXPROCS 个。
  • P 的本地队列:同全局队列类似,存放的也是等待运行的G,存的数量有限,不超过256个。新建 G 时,G 优先加入到 P 的本地队列,如果本地队列满了会批量移动部分 G 到全局队列。
  • M:线程想运行任务就得获取 P,从 P 的本地队列获取 G,当 P 的本地队列为空时,M 也会尝试从全局队列或其他 P 的本地队列获取 G。M 运行 G,G 执行之后,M 会从 P 获取下一个 G,不断重复下去。
  • Goroutine 调度器和操作系统调度器是通过 M 结合起来的,每个 M 都代表了1个内核线程,操作系统调度器负责把内核线程分配到 CPU 的核上执行。

单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的, goroutine 则是由Go运行时(runtime)自己的调度器调度的,完全是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身 goroutine 的超轻量级,以上种种特性保证了 goroutine 调度方面的性能。

GOMAXPROCS

Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个 OS 线程来同时执行 Go 代码。默认值是机器上的 CPU 核心数。例如在一个 8 核心的机器上,GOMAXPROCS 默认为 8。Go语言中可以通过runtime.GOMAXPROCS函数设置当前程序并发时占用的 CPU逻辑核心数。(Go1.5版本之前,默认使用的是单核心执行。Go1.5 版本之后,默认使用全部的CPU 逻辑核心数。)

4.3 channel

单纯地将函数并发执行是没有意义的。函数与函数间需要交换数据才能体现并发执行函数的意义。

虽然可以使用共享内存进行数据交换,但是共享内存在不同的 goroutine 中容易发生竞态问题。为了保证数据交换的正确性,很多并发模型中必须使用互斥量对内存进行加锁,这种做法势必造成性能问题。

Go语言采用的并发模型是CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信

如果说 goroutine 是Go程序并发的执行体,channel就是它们之间的连接。channel是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。

Go 语言中的通道(channel)是一种特殊的类型。通道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个通道都是一个具体类型的导管,也就是声明channel的时候需要为其指定元素类型。

channel类型

channel是 Go 语言中一种特有的类型。声明通道类型变量的格式如下:

var 变量名称 chan 元素类型

其中:

  • chan:是关键字
  • 元素类型:是指通道中传递元素的类型

举几个例子:

var ch1 chan int   // 声明一个传递整型的通道
var ch2 chan bool  // 声明一个传递布尔型的通道
var ch3 chan []int // 声明一个传递int切片的通道

channel零值

未初始化的通道类型变量其默认零值是nil

var ch chan int
fmt.Println(ch) // <nil>

初始化channel

声明的通道类型变量需要使用内置的make函数初始化之后才能使用。具体格式如下:

make(chan 元素类型, [缓冲大小])

其中:

  • channel的缓冲大小是可选的。

举几个例子:

ch4 := make(chan int)
ch5 := make(chan bool, 1)  // 声明一个缓冲区大小为1的通道

channel操作

通道共有发送(send)、接收(receive)和关闭(close)三种操作。而发送和接收操作都使用<-符号。

现在我们先使用以下语句定义一个通道:

ch := make(chan int)
发送

将一个值发送到通道中。

ch <- 10 // 把10发送到ch中
接收

从一个通道中接收值。

x := <- ch // 从ch中接收值并赋值给变量x
<-ch       // 从ch中接收值,忽略结果
关闭

我们通过调用内置的close函数来关闭通道。

close(ch)

**注意:**一个通道值是可以被垃圾回收掉的。通道通常由发送方执行关闭操作,并且只有在接收方明确等待通道关闭的信号时才需要执行关闭操作。它和关闭文件不一样,通常在结束操作之后关闭文件是必须要做的,但关闭通道不是必须的。

关闭后的通道有以下特点:

  1. 对一个关闭的通道再发送值就会导致 panic。
  2. 对一个关闭的通道进行接收会一直获取值直到通道为空。
  3. 对一个关闭的并且没有值的通道执行接收操作会得到对应类型的零值。
  4. 关闭一个已经关闭的通道会导致 panic。

无缓冲的通道

无缓冲的通道又称为阻塞的通道。我们来看一下如下代码片段。

func main() {ch := make(chan int)ch <- 10fmt.Println("发送成功")
}

上面这段代码能够通过编译,但是执行的时候会出现以下错误:

fatal error: all goroutines are asleep - deadlock!goroutine 1 [chan send]:
main.main().../main.go:8 +0x54

deadlock表示我们程序中的 goroutine 都被挂起导致程序死锁了。为什么会出现deadlock错误呢?

因为我们使用ch := make(chan int)创建的是无缓冲的通道,无缓冲的通道只有在有接收方能够接收值的时候才能发送成功,否则会一直处于等待发送的阶段。同理,如果对一个无缓冲通道执行接收操作时,没有任何向通道中发送值的操作那么也会导致接收操作阻塞。就像田径比赛中的4x100接力赛,想要完成交棒必须有一个能够接棒的运动员,否则只能等待。简单来说就是无缓冲的通道必须有至少一个接收方才能发送成功。

上面的代码会阻塞在ch <- 10这一行代码形成死锁,那如何解决这个问题呢?

其中一种可行的方法是创建一个 goroutine 去接收值,例如:

func recv(c chan int) {ret := <-cfmt.Println("接收成功", ret)
}func main() {ch := make(chan int)go recv(ch) // 创建一个 goroutine 从通道接收值ch <- 10fmt.Println("发送成功")
}

首先无缓冲通道ch上的发送操作会阻塞,直到另一个 goroutine 在该通道上执行接收操作,这时数字10才能发送成功,两个 goroutine 将继续执行。相反,如果接收操作先执行,接收方所在的 goroutine 将阻塞,直到 main goroutine 中向该通道发送数字10。

使用无缓冲通道进行通信将导致发送和接收的 goroutine 同步化。因此,无缓冲通道也被称为同步通道

有缓冲的通道

还有另外一种解决上面死锁问题的方法,那就是使用有缓冲区的通道。我们可以在使用 make 函数初始化通道时,可以为其指定通道的容量,例如:

func main() {ch := make(chan int, 1) // 创建一个容量为1的有缓冲区通道ch <- 10fmt.Println("发送成功")
}

只要通道的容量大于零,那么该通道就属于有缓冲的通道,通道的容量表示通道中最大能存放的元素数量。当通道内已有元素数达到最大容量后,再向通道执行发送操作就会阻塞,除非有从通道执行接收操作。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。

我们可以使用内置的len函数获取通道内元素的数量,使用cap函数获取通道的容量,虽然我们很少会这么做。

多返回值模式

当向通道中发送完数据时,我们可以通过close函数来关闭通道。当一个通道被关闭后,再往该通道发送值会引发panic,从该通道取值的操作会先取完通道中的值。通道内的值被接收完后再对通道执行接收操作得到的值会一直都是对应元素类型的零值。那我们如何判断一个通道是否被关闭了呢?

对一个通道执行接收操作时支持使用如下多返回值模式。

value, ok := <- ch

其中:

  • value:从通道中取出的值,如果通道被关闭则返回对应类型的零值。
  • ok:通道ch关闭时返回 false,否则返回 true。

下面代码片段中的f2函数会循环从通道ch中接收所有值,直到通道被关闭后退出。

func f2(ch chan int) {for {v, ok := <-chif !ok {fmt.Println("通道已关闭")break}fmt.Printf("v:%#v ok:%#v\n", v, ok)}
}func main() {ch := make(chan int, 2)ch <- 1ch <- 2close(ch)f2(ch)
}

for range接收值

通常我们会选择使用for range循环从通道中接收值,当通道被关闭后,会在通道内的所有值被接收完毕后会自动退出循环。上面那个示例我们使用for range改写后会很简洁。

func f3(ch chan int) {for v := range ch {fmt.Println(v)}
}

**注意:**目前Go语言中并没有提供一个不对通道进行读取操作就能判断通道是否被关闭的方法。不能简单的通过len(ch)操作来判断通道是否被关闭。

单向通道

在某些场景下我们可能会将通道作为参数在多个任务函数间进行传递,通常我们会选择在不同的任务函数中对通道的使用进行限制,比如限制通道在某个函数中只能执行发送或只能执行接收操作。想象一下,我们现在有ProducerConsumer两个函数,其中Producer函数会返回一个通道,并且会持续将符合条件的数据发送至该通道,并在发送完成后将该通道关闭。而Consumer函数的任务是从通道中接收值进行计算,这两个函数之间通过Processer函数返回的通道进行通信。完整的示例代码如下。

package mainimport ("fmt"
)// Producer 返回一个通道
// 并持续将符合条件的数据发送至返回的通道中
// 数据发送完成后会将返回的通道关闭
func Producer() chan int {ch := make(chan int, 2)// 创建一个新的goroutine执行发送数据的任务go func() {for i := 0; i < 10; i++ {if i%2 == 1 {ch <- i}}close(ch) // 任务完成后关闭通道}()return ch
}// Consumer 从通道中接收数据进行计算
func Consumer(ch chan int) int {sum := 0for v := range ch {sum += v}return sum
}func main() {ch := Producer()res := Consumer(ch)fmt.Println(res) // 25}

从上面的示例代码中可以看出正常情况下Consumer函数中只会对通道进行接收操作,但是这不代表不可以在Consumer函数中对通道进行发送操作。作为Producer函数的提供者,我们在返回通道的时候可能只希望调用方拿到返回的通道后只能对其进行接收操作。但是我们没有办法阻止在Consumer函数中对通道进行发送操作。

Go语言中提供了单向通道来处理这种需要限制通道只能进行某种操作的情况。

<- chan int // 只接收通道,只能接收不能发送
chan <- int // 只发送通道,只能发送不能接收

其中,箭头<-和关键字chan的相对位置表明了当前通道允许的操作,这种限制将在编译阶段进行检测。另外对一个只接收通道执行close也是不允许的,因为默认通道的关闭操作应该由发送方来完成。

我们使用单向通道将上面的示例代码进行如下改造。

// Producer2 返回一个接收通道
func Producer2() <-chan int {ch := make(chan int, 2)// 创建一个新的goroutine执行发送数据的任务go func() {for i := 0; i < 10; i++ {if i%2 == 1 {ch <- i}}close(ch) // 任务完成后关闭通道}()return ch
}// Consumer2 参数为接收通道
func Consumer2(ch <-chan int) int {sum := 0for v := range ch {sum += v}return sum
}func main() {ch2 := Producer2()res2 := Consumer2(ch2)fmt.Println(res2) // 25
}

这一次,Producer函数返回的是一个只接收通道,这就从代码层面限制了该函数返回的通道只能进行接收操作,保证了数据安全。很多读者看到这个示例可能会觉着这样的限制是多余的,但是试想一下如果Producer函数可以在其他地方被其他人调用,你该如何限制他人不对该通道执行发送操作呢?并且返回限制操作的单向通道也会让代码语义更清晰、更易读。

在函数传参及任何赋值操作中全向通道(正常通道)可以转换为单向通道,但是无法反向转换。

var ch3 = make(chan int, 1)
ch3 <- 10
close(ch3)
Consumer2(ch3) // 函数传参时将ch3转为单向通道var ch4 = make(chan int, 1)
ch4 <- 10
var ch5 <-chan int // 声明一个只接收通道ch5
ch5 = ch4          // 变量赋值时将ch4转为单向通道
<-ch5

worker pool(goroutine池)

总结

下面的表格中总结了对不同状态下的通道执行相应操作的结果。

**注意:**对已经关闭的通道再执行 close 也会引发 panic。

4.4 select多路复用

在某些场景下我们可能需要同时从多个通道接收数据。通道在接收数据时,如果没有数据可以被接收那么当前 goroutine 将会发生阻塞。你也许会写出如下代码尝试使用遍历的方式来实现从多个通道中接收值。

for{// 尝试从ch1接收值data, ok := <-ch1// 尝试从ch2接收值data, ok := <-ch2…
}

这种方式虽然可以实现从多个通道接收值的需求,但是程序的运行性能会差很多。Go 语言内置了select关键字,使用它可以同时响应多个通道的操作。

Select 的使用方式类似于之前学到的 switch 语句,它也有一系列 case 分支和一个默认的分支。每个 case 分支会对应一个通道的通信(接收或发送)过程。select 会一直等待,直到其中的某个 case 的通信操作完成时,就会执行该 case 分支对应的语句。具体格式如下:

select {case <-ch1://...
case data := <-ch2://...
case ch3 <- 10://...
default://默认操作
}

Select 语句具有以下特点。

  • 可处理一个或多个 channel 的发送/接收操作。
  • 如果多个 case 同时满足,select 会随机选择一个执行。
  • 对于没有 case 的 select 会一直阻塞,可用于阻塞 main 函数,防止退出。

下面的示例代码能够在终端打印出10以内的奇数,我们借助这个代码片段来看一下 select 的具体使用。

package mainimport "fmt"func main() {ch := make(chan int, 1)for i := 1; i <= 10; i++ {select {case x := <-ch:fmt.Println(x)case ch <- i:}}
}

上面的代码输出内容如下。

1
3
5
7
9

示例中的代码首先是创建了一个缓冲区大小为1的通道 ch,进入 for 循环后:

  • 第一次循环时 i = 1,select 语句中包含两个 case 分支,此时由于通道中没有值可以接收,所以x := <-ch 这个 case 分支不满足,而ch <- i这个分支可以执行,会把1发送到通道中,结束本次 for 循环;
  • 第二次 for 循环时,i = 2,由于通道缓冲区已满,所以ch <- i这个分支不满足,而x := <-ch这个分支可以执行,从通道接收值1并赋值给变量 x ,所以会在终端打印出 1;
  • 后续的 for 循环以此类推会依次打印出3、5、7、9。

4.5 通道误用示例

接下来,我们将展示两个因误用通道导致程序出现 bug 的代码片段,希望能够加深读者对通道操作的印象。

示例1

各位读者可以查看以下示例代码,尝试找出其中存在的问题。

// demo1 通道误用导致的bug
func demo1() {wg := sync.WaitGroup{}ch := make(chan int, 10)for i := 0; i < 10; i++ {ch <- i}close(ch)wg.Add(3)for j := 0; j < 3; j++ {go func() {for {task := <-ch// 这里假设对接收的数据执行某些操作fmt.Println(task)}wg.Done()}()}wg.Wait()
}

将上述代码编译执行后,匿名函数所在的 goroutine 并不会按照预期在通道被关闭后退出。因为task := <- ch的接收操作在通道被关闭后会一直接收到零值,而不会退出。此处的接收操作应该使用task, ok := <- ch,通过判断布尔值ok为假时退出;或者使用select 来处理通道。

示例2

各位读者阅读下方代码片段,尝试找出其中存在的问题。

// demo2 通道误用导致的bug
func demo2() {ch := make(chan string)go func() {// 这里假设执行一些耗时的操作time.Sleep(3 * time.Second)ch <- "job result"}()select {case result := <-ch:fmt.Println(result)case <-time.After(time.Second): // 较小的超时时间return}
}

上述代码片段可能导致 goroutine 泄露(goroutine 并未按预期退出并销毁)。由于 select 命中了超时逻辑,导致通道没有消费者(无接收操作),而其定义的通道为无缓冲通道,因此 goroutine 中的ch <- "job result"操作会一直阻塞,最终导致 goroutine 泄露。

4.6 并发安全和锁

有时候我们的代码中可能会存在多个 goroutine 同时操作一个资源(临界区)的情况,这种情况下就会发生竞态问题(数据竞态)。这就好比现实生活中十字路口被各个方向的汽车竞争,还有火车上的卫生间被车厢里的人竞争。

我们用下面的代码演示一个数据竞争的示例。

package mainimport ("fmt""sync"
)var (x int64wg sync.WaitGroup // 等待组
)// add 对全局变量x执行5000次加1操作
func add() {for i := 0; i < 5000; i++ {x = x + 1}wg.Done()
}func main() {wg.Add(2)go add()go add()wg.Wait()fmt.Println(x)
}

我们将上面的代码编译后执行,不出意外每次执行都会输出诸如9537、5865、6527等不同的结果。这是为什么呢?

在上面的示例代码片中,我们开启了两个 goroutine 分别执行 add 函数,这两个 goroutine 在访问和修改全局的x变量时就会存在数据竞争,某个 goroutine 中对全局变量x的修改可能会覆盖掉另一个 goroutine 中的操作,所以导致最后的结果与预期不符。

互斥锁

互斥锁是一种常用的控制共享资源访问的方法,它能够保证同一时间只有一个 goroutine 可以访问共享资源。Go 语言中使用sync包中提供的Mutex类型来实现互斥锁。

sync.Mutex提供了两个方法供我们使用。

方法名 功能
func (m *Mutex) Lock() 获取互斥锁
func (m *Mutex) Unlock() 释放互斥锁

我们在下面的示例代码中使用互斥锁限制每次只有一个 goroutine 才能修改全局变量x,从而修复上面代码中的问题。

package mainimport ("fmt""sync"
)// sync.Mutexvar (x int64wg sync.WaitGroup // 等待组m sync.Mutex // 互斥锁
)// add 对全局变量x执行5000次加1操作
func add() {for i := 0; i < 5000; i++ {m.Lock() // 修改x前加锁x = x + 1m.Unlock() // 改完解锁}wg.Done()
}func main() {wg.Add(2)go add()go add()wg.Wait()fmt.Println(x)
}

将上面的代码编译后多次执行,每一次都会得到预期中的结果——10000。

使用互斥锁能够保证同一时间有且只有一个 goroutine 进入临界区,其他的 goroutine 则在等待锁;当互斥锁释放后,等待的 goroutine 才可以获取锁进入临界区,多个 goroutine 同时等待一个锁时,唤醒的策略是随机的。

读写互斥锁

互斥锁是完全互斥的,但是实际上有很多场景是读多写少的,当我们并发的去读取一个资源而不涉及资源修改的时候是没有必要加互斥锁的,这种场景下使用读写锁是更好的一种选择。读写锁在 Go 语言中使用sync包中的RWMutex类型。

sync.RWMutex提供了以下5个方法。

方法名 功能
func (rw *RWMutex) Lock() 获取写锁
func (rw *RWMutex) Unlock() 释放写锁
func (rw *RWMutex) RLock() 获取读锁
func (rw *RWMutex) RUnlock() 释放读锁
func (rw *RWMutex) RLocker() Locker 返回一个实现Locker接口的读写锁

读写锁分为两种:读锁和写锁。当一个 goroutine 获取到读锁之后,其他的 goroutine 如果是获取读锁会继续获得锁,如果是获取写锁就会等待;而当一个 goroutine 获取写锁之后,其他的 goroutine 无论是获取读锁还是写锁都会等待。

下面我们使用代码构造一个读多写少的场景,然后分别使用互斥锁和读写锁查看它们的性能差异。

var (x       int64wg      sync.WaitGroupmutex   sync.MutexrwMutex sync.RWMutex
)// writeWithLock 使用互斥锁的写操作
func writeWithLock() {mutex.Lock() // 加互斥锁x = x + 1time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒mutex.Unlock()                    // 解互斥锁wg.Done()
}// readWithLock 使用互斥锁的读操作
func readWithLock() {mutex.Lock()                 // 加互斥锁time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒mutex.Unlock()               // 释放互斥锁wg.Done()
}// writeWithLock 使用读写互斥锁的写操作
func writeWithRWLock() {rwMutex.Lock() // 加写锁x = x + 1time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒rwMutex.Unlock()                  // 释放写锁wg.Done()
}// readWithRWLock 使用读写互斥锁的读操作
func readWithRWLock() {rwMutex.RLock()              // 加读锁time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒rwMutex.RUnlock()            // 释放读锁wg.Done()
}func do(wf, rf func(), wc, rc int) {start := time.Now()// wc个并发写操作for i := 0; i < wc; i++ {wg.Add(1)go wf()}//  rc个并发读操作for i := 0; i < rc; i++ {wg.Add(1)go rf()}wg.Wait()cost := time.Since(start)fmt.Printf("x:%v cost:%v\n", x, cost)}

我们假设每一次读操作都会耗时1ms,而每一次写操作会耗时10ms,我们分别测试使用互斥锁和读写互斥锁执行10次并发写和1000次并发读的耗时数据。

// 使用互斥锁,10并发写,1000并发读
do(writeWithLock, readWithLock, 10, 1000) // x:10 cost:1.466500951s// 使用读写互斥锁,10并发写,1000并发读
do(writeWithRWLock, readWithRWLock, 10, 1000) // x:10 cost:117.207592ms

从最终的执行结果可以看出,使用读写互斥锁在读多写少的场景下能够极大地提高程序的性能。不过需要注意的是如果一个程序中的读操作和写操作数量级差别不大,那么读写互斥锁的优势就发挥不出来。

sync.WaitGroup

在代码中生硬的使用time.Sleep肯定是不合适的,Go语言中可以使用sync.WaitGroup来实现并发任务的同步。 sync.WaitGroup有以下几个方法:

方法名 功能
func (wg * WaitGroup) Add(delta int) 计数器+delta
(wg *WaitGroup) Done() 计数器-1
(wg *WaitGroup) Wait() 阻塞直到计数器变为0

sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少。例如当我们启动了 N 个并发任务时,就将计数器值增加N。每个任务完成时通过调用 Done 方法将计数器减1。通过调用 Wait 来等待并发任务执行完,当计数器值为 0 时,表示所有并发任务已经完成。

我们利用sync.WaitGroup将上面的代码优化一下:

var wg sync.WaitGroupfunc hello() {defer wg.Done()fmt.Println("Hello Goroutine!")
}
func main() {wg.Add(1)go hello() // 启动另外一个goroutine去执行hello函数fmt.Println("main goroutine done!")wg.Wait()
}

需要注意sync.WaitGroup是一个结构体,进行参数传递的时候要传递指针。

sync.Once

在某些场景下我们需要确保某些操作即使在高并发的场景下也只会被执行一次,例如只加载一次配置文件等。

Go语言中的sync包中提供了一个针对只执行一次场景的解决方案——sync.Oncesync.Once只有一个Do方法,其签名如下:

func (o *Once) Do(f func())

**注意:**如果要执行的函数f需要传递参数就需要搭配闭包来使用。

加载配置文件示例

延迟一个开销很大的初始化操作到真正用到它的时候再执行是一个很好的实践。因为预先初始化一个变量(比如在init函数中完成初始化)会增加程序的启动耗时,而且有可能实际执行过程中这个变量没有用上,那么这个初始化操作就不是必须要做的。我们来看一个例子:

var icons map[string]image.Imagefunc loadIcons() {icons = map[string]image.Image{"left":  loadIcon("left.png"),"up":    loadIcon("up.png"),"right": loadIcon("right.png"),"down":  loadIcon("down.png"),}
}// Icon 被多个goroutine调用时不是并发安全的
func Icon(name string) image.Image {if icons == nil {loadIcons()}return icons[name]
}

多个 goroutine 并发调用Icon函数时不是并发安全的,现代的编译器和CPU可能会在保证每个 goroutine 都满足串行一致的基础上自由地重排访问内存的顺序。loadIcons函数可能会被重排为以下结果:

func loadIcons() {icons = make(map[string]image.Image)icons["left"] = loadIcon("left.png")icons["up"] = loadIcon("up.png")icons["right"] = loadIcon("right.png")icons["down"] = loadIcon("down.png")
}

在这种情况下就会出现即使判断了icons不是nil也不意味着变量初始化完成了。考虑到这种情况,我们能想到的办法就是添加互斥锁,保证初始化icons的时候不会被其他的 goroutine 操作,但是这样做又会引发性能问题。

使用sync.Once改造的示例代码如下:

var icons map[string]image.Imagevar loadIconsOnce sync.Oncefunc loadIcons() {icons = map[string]image.Image{"left":  loadIcon("left.png"),"up":    loadIcon("up.png"),"right": loadIcon("right.png"),"down":  loadIcon("down.png"),}
}// Icon 是并发安全的
func Icon(name string) image.Image {loadIconsOnce.Do(loadIcons)//有一个标志位t/freturn icons[name]
}
并发安全的单例模式

下面是借助sync.Once实现的并发安全的单例模式:

package singletonimport ("sync"
)type singleton struct {}var instance *singleton
var once sync.Oncefunc GetInstance() *singleton {once.Do(func() {instance = &singleton{}})return instance
}

sync.Once其实内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成。这样设计就能保证初始化操作的时候是并发安全的并且初始化操作也不会被执行多次。

sync.Map

Go 语言中内置的 map 不是并发安全的,请看下面这段示例代码。

package mainimport ("fmt""strconv""sync"
)var m = make(map[string]int)func get(key string) int {return m[key]
}func set(key string, value int) {m[key] = value
}func main() {wg := sync.WaitGroup{}for i := 0; i < 10; i++ {wg.Add(1)go func(n int) {key := strconv.Itoa(n)set(key, n)fmt.Printf("k=:%v,v:=%v\n", key, get(key))wg.Done()}(i)}wg.Wait()
}

将上面的代码编译后执行,会报出fatal error: concurrent map writes错误。

我们不能在多个 goroutine 中并发对内置的 map 进行读写操作,否则会存在数据竞争问题。

像这种场景下就需要为 map 加锁来保证并发的安全性了,Go语言的sync包中提供了一个开箱即用的并发安全版 map——sync.Map。开箱即用表示其不用像内置的 map 一样使用 make 函数初始化就能直接使用。同时sync.Map内置了诸如StoreLoadLoadOrStoreDeleteRange等操作方法。

方法名 功能
func (m *Map) Store(key, value interface{}) 存储key-value数据
func (m *Map) Load(key interface{}) (value interface{}, ok bool) 查询key对应的value
func (m *Map) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) 查询或存储key对应的value
func (m *Map) LoadAndDelete(key interface{}) (value interface{}, loaded bool) 查询并删除key
func (m *Map) Delete(key interface{}) 删除key
func (m *Map) Range(f func(key, value interface{}) bool) 对map中的每个key-value依次调用f

下面的代码示例演示了并发读写sync.Map

package mainimport ("fmt""strconv""sync"
)// 并发安全的map
var m = sync.Map{}func main() {wg := sync.WaitGroup{}// 对m执行20个并发的读写操作for i := 0; i < 20; i++ {wg.Add(1)go func(n int) {key := strconv.Itoa(n)m.Store(key, n)         // 存储key-valuevalue, _ := m.Load(key) // 根据key取值fmt.Printf("k=:%v,v:=%v\n", key, value)wg.Done()}(i)}wg.Wait()
}

4.7 原子操作

针对整数数据类型(int32、uint32、int64、uint64)我们还可以使用原子操作来保证并发安全,通常直接使用原子操作比使用锁操作效率更高。Go语言中原子操作由内置的标准库sync/atomic提供。

atomic包

1读取操作:

func LoadInt32(addr *int32) (val int32)

func LoadInt64(addr *int64) (val int64)

func LoadUint32(addr *uint32) (val uint32)

func LoadUint64(addr *uint64) (val uint64)

func LoadUintptr(addr *uintptr) (val uintptr)

func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)

2写入操作:

func StoreInt32(addr *int32, val int32)

func StoreInt64(addr *int64, val int64)

func StoreUint32(addr *uint32, val uint32)

func StoreUint64(addr *uint64, val uint64)

func StoreUintptr(addr *uintptr, val uintptr)

func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)

3修改操作:

func AddInt32(addr *int32, delta int32) (new int32)

func AddInt64(addr *int64, delta int64) (new int64)

func AddUint32(addr *uint32, delta uint32) (new uint32)

func AddUint64(addr *uint64, delta uint64) (new uint64)

func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)

4交换操作:

func SwapInt32(addr *int32, new int32) (old int32)

func SwapInt64(addr *int64, new int64) (old int64)

func SwapUint32(addr *uint32, new uint32) (old uint32)

func SwapUint64(addr *uint64, new uint64) (old uint64)

func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)

func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)

5比较并交换操作:

func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)

func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)

func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)

func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)

func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)

func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)

示例

我们填写一个示例来比较下互斥锁和原子操作的性能。

package mainimport ("fmt""sync""sync/atomic""time"
)type Counter interface {Inc()Load() int64
}// 普通版
type CommonCounter struct {counter int64
}func (c CommonCounter) Inc() {c.counter++
}func (c CommonCounter) Load() int64 {return c.counter
}// 互斥锁版
type MutexCounter struct {counter int64lock    sync.Mutex
}func (m *MutexCounter) Inc() {m.lock.Lock()defer m.lock.Unlock()m.counter++
}func (m *MutexCounter) Load() int64 {m.lock.Lock()defer m.lock.Unlock()return m.counter
}// 原子操作版
type AtomicCounter struct {counter int64
}func (a *AtomicCounter) Inc() {atomic.AddInt64(&a.counter, 1)//*******
}func (a *AtomicCounter) Load() int64 {return atomic.LoadInt64(&a.counter)
}func test(c Counter) {var wg sync.WaitGroupstart := time.Now()for i := 0; i < 1000; i++ {wg.Add(1)go func() {c.Inc()wg.Done()}()}wg.Wait()end := time.Now()fmt.Println(c.Load(), end.Sub(start))
}func main() {c1 := CommonCounter{} // 非并发安全test(c1)c2 := MutexCounter{} // 使用互斥锁实现并发安全test(&c2)c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高test(&c3)
}

atomic包提供了底层的原子级内存操作,对于同步算法的实现很有用。这些函数必须谨慎地保证正确使用。除了某些特殊的底层应用,使用通道或者 sync 包的函数/类型实现同步更好。

4.8 处理并发错误

我们可以在Go语言中十分便捷地开启goroutine去并发地执行任务,但是如何有效的处理并发过程中的错误则是一个很棘手的问题,本文介绍了一些处理并发错误的方法。

recover goroutine中的panic

我们知道可以在代码中使用 recover 来会恢复程序中意想不到的 panic,而 panic 只会触发当前 goroutine 中的 defer 操作。

例如在下面的示例代码中,无法在 main 函数中 recover 另一个goroutine中引发的 panic。

func f1() {defer func() {if e := recover(); e != nil {fmt.Printf("recover panic:%v\n", e)}}()// 开启一个goroutine执行任务go func() {fmt.Println("in goroutine....")// 只能触发当前goroutine中的deferpanic("panic in goroutine")}()time.Sleep(time.Second)fmt.Println("exit")
}

执行上面的 f1 函数会得到如下结果:

in goroutine....
panic: panic in goroutinegoroutine 6 [running]:
main.f1.func2()/Users/liwenzhou/workspace/github/the-road-to-learn-golang/ch12/goroutine_recover.go:20 +0x65
created by main.f1/Users/liwenzhou/workspace/github/the-road-to-learn-golang/ch12/goroutine_recover.go:17 +0x48Process finished with exit code 2

从输出结果可以看到程序并没有正常退出,而是由于 panic 异常退出了(exit code 2)。

正如上面示例演示的那样,在启用 goroutine 去执行任务的场景下,如果想要 recover goroutine中可能出现的 panic 就需要在 goroutine 中使用 recover。就像下面的 f2 函数那样。

func f2() {defer func() {if r := recover(); r != nil {fmt.Printf("recover outer panic:%v\n", r)}}()// 开启一个goroutine执行任务go func() {defer func() {if r := recover(); r != nil {fmt.Printf("recover inner panic:%v\n", r)}}()fmt.Println("in goroutine....")// 只能触发当前goroutine中的deferpanic("panic in goroutine")}()time.Sleep(time.Second)fmt.Println("exit")
}

执行 f2 函数会得到如下输出结果。

in goroutine....
recover inner panic:panic in goroutine
exit

程序中的 panic 被 recover 成功捕获,程序最终正常退出。

errgroup

在以往演示的并发示例中,我们通常像下面的示例代码那样在 go 关键字后,调用一个函数或匿名函数。

go func(){// ...
}go foo()

在之前讲解并发的代码示例中我们默认被并发的那些函数都不会返回错误,但真实的情况往往是事与愿违。

当我们想要将一个任务拆分成多个子任务交给多个 goroutine 去运行,这时我们该如何获取到子任务可能返回的错误呢?

假设我们有多个网址需要并发去获取它们的内容,这时候我们会写出类似下面的代码。

// fetchUrlDemo 并发获取url内容
func fetchUrlDemo() {wg := sync.WaitGroup{}var urls = []string{"http://pkg.go.dev","http://www.liwenzhou.com","http://www.yixieqitawangzhi.com",}for _, url := range urls {wg.Add(1)go func(url string) {defer wg.Done()resp, err := http.Get(url)if err == nil {fmt.Printf("获取%s成功\n", url)resp.Body.Close()}return // 如何将错误返回呢?}(url)}wg.Wait()// 如何获取goroutine中可能出现的错误呢?
}

执行上述fetchUrlDemo函数得到如下输出结果,由于 http://www.yixieqitawangzhi.com 是我随意编造的一个并不真实存在的 url,所以对它的 HTTP 请求会返回错误。

获取http://pkg.go.dev成功
获取http://www.liwenzhou.com成功

在上面的示例代码中,我们开启了 3 个 goroutine 分别去获取3个 url 的内容。类似这种将任务分为若干个子任务的场景会有很多,那么我们如何获取子任务中可能出现的错误呢?

errgroup 包就是为了解决这类问题而开发的,它能为处理公共任务的子任务而开启的一组 goroutine 提供同步、error 传播和基于context 的取消功能。

errgroup 包中定义了一个 Group 类型,它包含了若干个不可导出的字段。

type Group struct {cancel func()wg sync.WaitGrouperrOnce sync.Onceerr     error
}

errgroup.Group 提供了GoWait两个方法。

func (g *Group) Go(f func() error)
  • Go 函数会在新的 goroutine 中调用传入的函数f。
  • 第一个返回非零错误的调用将取消该Group;下面的Wait方法会返回该错误
func (g *Group) Wait() error
  • Wait 会阻塞直至由上述 Go 方法调用的所有函数都返回,然后从它们返回第一个非nil的错误(如果有)。

下面的示例代码演示了如何使用 errgroup 包来处理多个子任务 goroutine 中可能返回的 error。

// fetchUrlDemo2 使用errgroup并发获取url内容
func fetchUrlDemo2() error {g := new(errgroup.Group) // 创建等待组(类似sync.WaitGroup)var urls = []string{"http://pkg.go.dev","http://www.liwenzhou.com","http://www.yixieqitawangzhi.com",}for _, url := range urls {url := url // 注意此处声明新的变量// 启动一个goroutine去获取url内容g.Go(func() error {resp, err := http.Get(url)if err == nil {fmt.Printf("获取%s成功\n", url)resp.Body.Close()}return err // 返回错误})}if err := g.Wait(); err != nil {// 处理可能出现的错误fmt.Println(err)return err}fmt.Println("所有goroutine均成功")return nil
}

执行上面的fetchUrlDemo2函数会得到如下输出结果。

获取http://pkg.go.dev成功
获取http://www.liwenzhou.com成功
Get "http://www.yixieqitawangzhi.com": dial tcp: lookup www.yixieqitawangzhi.com: no such host

当子任务的 goroutine 中对http://www.yixieqitawangzhi.com 发起 HTTP 请求时会返回一个错误,这个错误会由 errgroup.Group 的 Wait 方法返回。

通过阅读下方 errgroup.Group 的 Go 方法源码,我们可以看到当任意一个函数 f 返回错误时,会通过g.errOnce.Do只将第一个返回的错误记录,并且如果存在 cancel 方法则会调用cancel。

func (g *Group) Go(f func() error) {g.wg.Add(1)go func() {defer g.wg.Done()if err := f(); err != nil {g.errOnce.Do(func() {g.err = errif g.cancel != nil {g.cancel()}})}}()
}

那么如何创建带有 cancel 方法的 errgroup.Group 呢?

答案是通过 errorgroup 包提供的 WithContext 函数。

func WithContext(ctx context.Context) (*Group, context.Context)

WithContext 函数接收一个父 context,返回一个新的 Group 对象和一个关联的子 context 对象。下面的代码片段是一个官方文档给出的示例。

package mainimport ("context""crypto/md5""fmt""io/ioutil""log""os""path/filepath""golang.org/x/sync/errgroup"
)// Pipeline demonstrates the use of a Group to implement a multi-stage
// pipeline: a version of the MD5All function with bounded parallelism from
// https://blog.golang.org/pipelines.
func main() {m, err := MD5All(context.Background(), ".")if err != nil {log.Fatal(err)}for k, sum := range m {fmt.Printf("%s:\t%x\n", k, sum)}
}type result struct {path stringsum  [md5.Size]byte
}// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(ctx context.Context, root string) (map[string][md5.Size]byte, error) {// ctx is canceled when g.Wait() returns. When this version of MD5All returns// - even in case of error! - we know that all of the goroutines have finished// and the memory they were using can be garbage-collected.g, ctx := errgroup.WithContext(ctx)paths := make(chan string)g.Go(func() error {return filepath.Walk(root, func(path string, info os.FileInfo, err error) error {if err != nil {return err}if !info.Mode().IsRegular() {return nil}select {case paths <- path:case <-ctx.Done():return ctx.Err()}return nil})})// Start a fixed number of goroutines to read and digest files.c := make(chan result)const numDigesters = 20for i := 0; i < numDigesters; i++ {g.Go(func() error {for path := range paths {data, err := ioutil.ReadFile(path)if err != nil {return err}select {case c <- result{path, md5.Sum(data)}:case <-ctx.Done():return ctx.Err()}}return nil})}go func() {g.Wait()close(c)}()m := make(map[string][md5.Size]byte)for r := range c {m[r.path] = r.sum}// Check whether any of the goroutines failed. Since g is accumulating the// errors, we don't need to send them (or check for them) in the individual// results sent on the channel.if err := g.Wait(); err != nil {return nil, err}return m, nil
}

或者这里有另外一个示例。

func GetFriends(ctx context.Context, user int64) (map[string]*User, error) {g, ctx := errgroup.WithContext(ctx)friendIds := make(chan int64)// Produceg.Go(func() error {defer close(friendIds)for it := GetFriendIds(user); ; {if id, err := it.Next(ctx); err != nil {if err == io.EOF {return nil}return fmt.Errorf("GetFriendIds %d: %s", user, err)} else {select {case <-ctx.Done():return ctx.Err()case friendIds <- id:}}}})friends := make(chan *User)// Mapworkers := int32(nWorkers)for i := 0; i < nWorkers; i++ {g.Go(func() error {defer func() {// Last one out closes shopif atomic.AddInt32(&workers, -1) == 0 {close(friends)}}()for id := range friendIds {if friend, err := GetUserProfile(ctx, id); err != nil {return fmt.Errorf("GetUserProfile %d: %s", user, err)} else {select {case <-ctx.Done():return ctx.Err()case friends <- friend:}}}return nil})}// Reduceret := map[string]*User{}g.Go(func() error {for friend := range friends {ret[friend.Name] = friend}return nil})return ret, g.Wait()
}

可惜这两个示例不太好理解。。。

4.9 Go标准库Context包

在 Go http包的Server中,每一个请求在都有一个对应的 goroutine 去处理。请求处理函数通常会启动额外的 goroutine 用来访问后端服务,比如数据库和RPC服务。用来处理一个请求的 goroutine 通常需要访问一些与请求特定的数据,比如终端用户的身份认证信息、验证相关的token、请求的截止时间。 当一个请求被取消或超时时,所有用来处理该请求的 goroutine 都应该迅速退出,然后系统才能释放这些 goroutine 占用的资源。

为什么需要Context

基本示例
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroup// 初始的例子func worker() {for {fmt.Println("worker")time.Sleep(time.Second)}// 如何接收外部命令实现退出wg.Done()
}func main() {wg.Add(1)go worker()// 如何优雅的实现结束子goroutinewg.Wait()fmt.Println("over")
}
全局变量方式
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroup
var exit bool// 全局变量方式存在的问题:
// 1. 使用全局变量在跨包调用时不容易统一
// 2. 如果worker中再启动goroutine,就不太好控制了。func worker() {for {fmt.Println("worker")time.Sleep(time.Second)if exit {break}}wg.Done()
}func main() {wg.Add(1)go worker()time.Sleep(time.Second * 3) // sleep3秒以免程序过快退出exit = true                 // 修改全局变量实现子goroutine的退出wg.Wait()fmt.Println("over")
}
通道方式
package mainimport ("fmt""sync""time"
)var wg sync.WaitGroup// 管道方式存在的问题:
// 1. 使用全局变量在跨包调用时不容易实现规范和统一,需要维护一个共用的channelfunc worker(exitChan chan struct{}) {LOOP:for {fmt.Println("worker")time.Sleep(time.Second)select {case <-exitChan: // 等待接收上级通知break LOOPdefault:}}wg.Done()
}func main() {var exitChan = make(chan struct{})wg.Add(1)go worker(exitChan)time.Sleep(time.Second * 3) // sleep3秒以免程序过快退出exitChan <- struct{}{}      // 给子goroutine发送退出信号close(exitChan)wg.Wait()fmt.Println("over")
}
官方版的方案
package mainimport ("context""fmt""sync""time"
)var wg sync.WaitGroupfunc worker(ctx context.Context) {LOOP:for {fmt.Println("worker")time.Sleep(time.Second)select {case <-ctx.Done(): // 等待上级通知break LOOPdefault:}}wg.Done()
}func main() {ctx, cancel := context.WithCancel(context.Background())wg.Add(1)go worker(ctx)time.Sleep(time.Second * 3)cancel() // 通知子goroutine结束wg.Wait()fmt.Println("over")
}

当子goroutine又开启另外一个goroutine时,只需要将ctx传入即可:

package mainimport ("context""fmt""sync""time"
)var wg sync.WaitGroupfunc worker(ctx context.Context) {go worker2(ctx)
LOOP:for {fmt.Println("worker")time.Sleep(time.Second)select {case <-ctx.Done(): // 等待上级通知break LOOPdefault:}}wg.Done()
}func worker2(ctx context.Context) {LOOP:for {fmt.Println("worker2")time.Sleep(time.Second)select {case <-ctx.Done(): // 等待上级通知break LOOPdefault:}}
}
func main() {ctx, cancel := context.WithCancel(context.Background())wg.Add(1)go worker(ctx)time.Sleep(time.Second * 3)cancel() // 通知子goroutine结束wg.Wait()fmt.Println("over")
}

Context初识

Go1.7加入了一个新的标准库context,它定义了Context类型,专门用来简化 对于处理单个请求的多个 goroutine 之间与请求域的数据、取消信号、截止时间等相关操作,这些操作可能涉及多个 API 调用。

对服务器传入的请求应该创建上下文,而对服务器的传出调用应该接受上下文。它们之间的函数调用链必须传递上下文,或者可以使用WithCancelWithDeadlineWithTimeoutWithValue创建的派生上下文。当一个上下文被取消时,它派生的所有上下文也被取消。

Context接口

context.Context是一个接口,该接口定义了四个需要实现的方法。具体签名如下:

type Context interface {Deadline() (deadline time.Time, ok bool)Done() <-chan struct{}Err() errorValue(key interface{}) interface{}
}

其中:

  • Deadline方法需要返回当前Context被取消的时间,也就是完成工作的截止时间(deadline);

  • Done方法需要返回一个Channel,这个Channel会在当前工作完成或者上下文被取消之后关闭,多次调用Done方法会返回同一个Channel;

  • Err
    

    方法会返回当前

    Context
    

    结束的原因,它只会在

    Done
    

    返回的Channel被关闭时才会返回非空的值;

    • 如果当前Context被取消就会返回Canceled错误;
    • 如果当前Context超时就会返回DeadlineExceeded错误;
  • Value方法会从Context中返回键对应的值,对于同一个上下文来说,多次调用Value 并传入相同的Key会返回相同的结果,该方法仅用于传递跨API和进程间跟请求域的数据;

Background()和TODO()

Go内置两个函数:Background()TODO(),这两个函数分别返回一个实现了Context接口的backgroundtodo。我们代码中最开始都是以这两个内置的上下文对象作为最顶层的partent context,衍生出更多的子上下文对象。

Background()主要用于main函数、初始化以及测试代码中,作为Context这个树结构的最顶层的Context,也就是根Context。

TODO(),它目前还不知道具体的使用场景,如果我们不知道该使用什么Context的时候,可以使用这个。

backgroundtodo本质上都是emptyCtx结构体类型,是一个不可取消,没有设置截止时间,没有携带任何值的Context。

With系列函数

此外,context包中还定义了四个With系列函数。

WithCancel

WithCancel的函数签名如下:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc)

WithCancel返回带有新Done通道的父节点的副本。当调用返回的cancel函数或当关闭父上下文的Done通道时,将关闭返回上下文的Done通道,无论先发生什么情况。

取消此上下文将释放与其关联的资源,因此代码应该在此上下文中运行的操作完成后立即调用cancel。

func gen(ctx context.Context) <-chan int {dst := make(chan int)n := 1go func() {for {select {case <-ctx.Done():return // return结束该goroutine,防止泄露case dst <- n:n++}}}()return dst}
func main() {ctx, cancel := context.WithCancel(context.Background())defer cancel() // 当我们取完需要的整数后调用cancelfor n := range gen(ctx) {fmt.Println(n)if n == 5 {break}}
}

上面的示例代码中,gen函数在单独的goroutine中生成整数并将它们发送到返回的通道。 gen的调用者在使用生成的整数之后需要取消上下文,以免gen启动的内部goroutine发生泄漏。

WithDeadline

WithDeadline的函数签名如下:

func WithDeadline(parent Context, deadline time.Time) (Context, CancelFunc)

返回父上下文的副本,并将deadline调整为不迟于d。如果父上下文的deadline已经早于d,则WithDeadline(parent, d)在语义上等同于父上下文。当截止日过期时,当调用返回的cancel函数时,或者当父上下文的Done通道关闭时,返回上下文的Done通道将被关闭,以最先发生的情况为准。

取消此上下文将释放与其关联的资源,因此代码应该在此上下文中运行的操作完成后立即调用cancel。

func main() {d := time.Now().Add(50 * time.Millisecond)ctx, cancel := context.WithDeadline(context.Background(), d)// 尽管ctx会过期,但在任何情况下调用它的cancel函数都是很好的实践。// 如果不这样做,可能会使上下文及其父类存活的时间超过必要的时间。defer cancel()select {case <-time.After(1 * time.Second):fmt.Println("overslept")case <-ctx.Done():fmt.Println(ctx.Err())}
}

上面的代码中,定义了一个50毫秒之后过期的deadline,然后我们调用context.WithDeadline(context.Background(), d)得到一个上下文(ctx)和一个取消函数(cancel),然后使用一个select让主程序陷入等待:等待1秒后打印overslept退出或者等待ctx过期后退出。

在上面的示例代码中,因为ctx 50毫秒后就会过期,所以ctx.Done()会先接收到context到期通知,并且会打印ctx.Err()的内容。

WithTimeout

WithTimeout的函数签名如下:

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)

WithTimeout返回WithDeadline(parent, time.Now().Add(timeout))

取消此上下文将释放与其相关的资源,因此代码应该在此上下文中运行的操作完成后立即调用cancel,通常用于数据库或者网络连接的超时控制。具体示例如下:

package mainimport ("context""fmt""sync""time"
)// context.WithTimeoutvar wg sync.WaitGroupfunc worker(ctx context.Context) {LOOP:for {fmt.Println("db connecting ...")time.Sleep(time.Millisecond * 10) // 假设正常连接数据库耗时10毫秒select {case <-ctx.Done(): // 50毫秒后自动调用break LOOPdefault:}}fmt.Println("worker done!")wg.Done()
}func main() {// 设置一个50毫秒的超时ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)wg.Add(1)go worker(ctx)time.Sleep(time.Second * 5)cancel() // 通知子goroutine结束wg.Wait()fmt.Println("over")
}
WithValue

WithValue函数能够将请求作用域的数据与 Context 对象建立关系。声明如下:

func WithValue(parent Context, key, val interface{}) Context

WithValue返回父节点的副本,其中与key关联的值为val。

仅对API和进程间传递请求域的数据使用上下文值,而不是使用它来传递可选参数给函数。

所提供的键必须是可比较的,并且不应该是string类型或任何其他内置类型,以避免使用上下文在包之间发生冲突。WithValue的用户应该为键定义自己的类型。为了避免在分配给interface{}时进行分配,上下文键通常具有具体类型struct{}。或者,导出的上下文关键变量的静态类型应该是指针或接口。

package mainimport ("context""fmt""sync""time"
)// context.WithValuetype TraceCode stringvar wg sync.WaitGroupfunc worker(ctx context.Context) {key := TraceCode("TRACE_CODE")traceCode, ok := ctx.Value(key).(string) // 在子goroutine中获取trace codeif !ok {fmt.Println("invalid trace code")}
LOOP:for {fmt.Printf("worker, trace code:%s\n", traceCode)time.Sleep(time.Millisecond * 10) // 假设正常连接数据库耗时10毫秒select {case <-ctx.Done(): // 50毫秒后自动调用break LOOPdefault:}}fmt.Println("worker done!")wg.Done()
}func main() {// 设置一个50毫秒的超时ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)// 在系统的入口中设置trace code传递给后续启动的goroutine实现日志数据聚合ctx = context.WithValue(ctx, TraceCode("TRACE_CODE"), "12512312234")wg.Add(1)go worker(ctx)time.Sleep(time.Second * 5)cancel() // 通知子goroutine结束wg.Wait()fmt.Println("over")
}

使用Context的注意事项

  • 推荐以参数的方式显示传递Context
  • 以Context作为参数的函数方法,应该把Context作为第一个参数。
  • 给一个函数方法传递Context的时候,不要传递nil,如果不知道传递什么,就使用context.TODO()
  • Context的Value相关方法应该传递请求域的必要数据,不应该用于传递可选参数
  • Context是线程安全的,可以放心的在多个goroutine中传递

客户端超时取消示例

调用服务端API时如何在客户端实现超时控制?

server端
// context_timeout/server/main.go
package mainimport ("fmt""math/rand""net/http""time"
)// server端,随机出现慢响应func indexHandler(w http.ResponseWriter, r *http.Request) {number := rand.Intn(2)if number == 0 {time.Sleep(time.Second * 10) // 耗时10秒的慢响应fmt.Fprintf(w, "slow response")return}fmt.Fprint(w, "quick response")
}func main() {http.HandleFunc("/", indexHandler)err := http.ListenAndServe(":8000", nil)if err != nil {panic(err)}
}
client端
// context_timeout/client/main.go
package mainimport ("context""fmt""io/ioutil""net/http""sync""time"
)// 客户端type respData struct {resp *http.Responseerr  error
}func doCall(ctx context.Context) {transport := http.Transport{// 请求频繁可定义全局的client对象并启用长链接// 请求不频繁使用短链接DisableKeepAlives: true,    }client := http.Client{Transport: &transport,}respChan := make(chan *respData, 1)req, err := http.NewRequest("GET", "http://127.0.0.1:8000/", nil)if err != nil {fmt.Printf("new requestg failed, err:%v\n", err)return}req = req.WithContext(ctx) // 使用带超时的ctx创建一个新的client requestvar wg sync.WaitGroupwg.Add(1)defer wg.Wait()go func() {resp, err := client.Do(req)fmt.Printf("client.do resp:%v, err:%v\n", resp, err)rd := &respData{resp: resp,err:  err,}respChan <- rdwg.Done()}()select {case <-ctx.Done()://transport.CancelRequest(req)fmt.Println("call api timeout")case result := <-respChan:fmt.Println("call server api success")if result.err != nil {fmt.Printf("call server api failed, err:%v\n", result.err)return}defer result.resp.Body.Close()data, _ := ioutil.ReadAll(result.resp.Body)fmt.Printf("resp:%v\n", string(data))}
}func main() {// 定义一个100毫秒的超时ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)defer cancel() // 调用cancel释放子goroutine资源doCall(ctx)
}

4.10 Go网络编程

互联网的核心是一系列协议,总称为”互联网协议”(Internet Protocol Suite),正是这一些协议规定了电脑如何连接和组网。我们理解了这些协议,就理解了互联网的原理。由于这些协议太过庞大和复杂,没有办法在这里一概而全,只能介绍一下我们日常开发中接触较多的几个协议。

互联网分层模型

互联网的逻辑实现被分为好几层。每一层都有自己的功能,就像建筑物一样,每一层都靠下一层支持。用户接触到的只是最上面的那一层,根本不会感觉到下面的几层。要理解互联网就需要自下而上理解每一层的实现的功能。如上图所示,互联网按照不同的模型划分会有不用的分层,但是不论按照什么模型去划分,越往上的层越靠近用户,越往下的层越靠近硬件。在软件开发中我们使用最多的是上图中将互联网划分为五个分层的模型。

接下来我们一层一层的自底向上介绍一下每一层。

物理层

我们的电脑要与外界互联网通信,需要先把电脑连接网络,我们可以用双绞线、光纤、无线电波等方式。这就叫做”实物理层”,它就是把电脑连接起来的物理手段。它主要规定了网络的一些电气特性,作用是负责传送0和1的电信号

数据链路层

单纯的0和1没有任何意义,所以我们使用者会为其赋予一些特定的含义,**规定解读电信号的方式:例如:多少个电信号算一组?每个信号位有何意义?**这就是”数据链接层”的功能,它在”物理层”的上方,确定了物理层传输的0和1的分组方式及代表的意义。早期的时候,每家公司都有自己的电信号分组方式。逐渐地,一种叫做”以太网”(Ethernet)的协议,占据了主导地位。

以太网规定,一组电信号构成一个数据包,叫做”帧”(Frame)。每一帧分成两个部分:标头(Head)和数据(Data)。其中”标头”包含数据包的一些说明项,比如发送者、接受者、数据类型等等;”数据”则是数据包的具体内容。”标头”的长度,固定为18字节。”数据”的长度,最短为46字节,最长为1500字节。因此,整个”帧”最短为64字节,最长为1518字节。如果数据很长,就必须分割成多个帧进行发送。

那么,发送者和接受者是如何标识呢?以太网规定,连入网络的所有设备都必须具有”网卡”接口。数据包必须是从一块网卡,传送到另一块网卡。网卡的地址,就是数据包的发送地址和接收地址,这叫做MAC地址。每块网卡出厂的时候,都有一个全世界独一无二的MAC地址,长度是48个二进制位,通常用12个十六进制数表示。前6个十六进制数是厂商编号,后6个是该厂商的网卡流水号。有了MAC地址,就可以定位网卡和数据包的路径了。

我们会通过ARP协议来获取接受方的MAC地址,有了MAC地址之后,如何把数据准确的发送给接收方呢?其实这里以太网采用了一种很”原始”的方式**,它不是把数据包准确送到接收方,而是向本网络内所有计算机都发送,让每台计算机读取这个包的”标头”**,找到接收方的MAC地址,然后与自身的MAC地址相比较,如果两者相同,就接受这个包,做进一步处理,否则就丢弃这个包。这种发送方式就叫做”广播”(broadcasting)。

网络层

按照以太网协议的规则我们可以依靠MAC地址来向外发送数据。理论上依靠MAC地址,你电脑的网卡就可以找到身在世界另一个角落的某台电脑的网卡了,但是这种做法有一个重大缺陷就是以太网采用广播方式发送数据包,所有成员人手一”包”,不仅效率低,而且发送的数据只能局限在发送者所在的子网络。也就是说如果两台计算机不在同一个子网络,广播是传不过去的。这种设计是合理且必要的,因为如果互联网上每一台计算机都会收到互联网上收发的所有数据包,那是不现实的。

因此,必须找到一种方法区分哪些MAC地址属于同一个子网络,哪些不是。如果是同一个子网络,就采用广播方式发送,否则就采用”路由”方式发送。这就导致了”网络层”的诞生。它的作用是引进一套新的地址,使得我们能够区分不同的计算机是否属于同一个子网络。这套地址就叫做”网络地址”,简称”网址”。

“网络层”出现以后,每台计算机有了两种地址,一种是MAC地址,另一种是网络地址。两种地址之间没有任何联系,MAC地址是绑定在网卡上的,网络地址则是网络管理员分配的。网络地址帮助我们确定计算机所在的子网络,MAC地址则将数据包送到该子网络中的目标网卡。因此,从逻辑上可以推断,必定是先处理网络地址,然后再处理MAC地址。

规定网络地址的协议,叫做IP协议。它所定义的地址,就被称为IP地址。目前,广泛采用的是IP协议第四版,简称IPv4。IPv4这个版本规定,网络地址由32个二进制位组成,我们通常习惯用分成四段的十进制数表示IP地址,从0.0.0.0一直到255.255.255.255。

根据IP协议发送的数据,就叫做IP数据包。IP数据包也分为”标头”和”数据”两个部分:”标头”部分主要包括版本、长度、IP地址等信息,”数据”部分则是IP数据包的具体内容。IP数据包的”标头”部分的长度为20到60字节,整个数据包的总长度最大为65535字节。

传输层

**有了MAC地址和IP地址,我们已经可以在互联网上任意两台主机上建立通信。但问题是同一台主机上会有许多程序都需要用网络收发数据,**比如QQ和浏览器这两个程序都需要连接互联网并收发数据,我们如何区分某个数据包到底是归哪个程序的呢?也就是说,我们还需要一个参数,表示这个数据包到底供哪个程序(进程)使用。这个参数就叫做”端口”(port),它其实是每一个使用网卡的程序的编号。每个数据包都发到主机的特定端口,所以不同的程序就能取到自己所需要的数据。

“端口”是0到65535之间的一个整数,**正好16个二进制位。0到1023的端口被系统占用,**用户只能选用大于1023的端口。有了IP和端口我们就能实现唯一确定互联网上一个程序,进而实现网络间的程序通信。

我们必须在数据包中加入端口信息,这就需要新的协议。最简单的实现叫做UDP协议,它的格式几乎就是在数据前面,加上端口号。UDP数据包,也是由”标头”和”数据”两部分组成:”标头”部分主要定义了发出端口和接收端口,”数据”部分就是具体的内容。UDP数据包非常简单,”标头”部分一共只有8个字节,总长度不超过65,535字节,正好放进一个IP数据包。

UDP协议的优点是比较简单,容易实现,但是缺点是可靠性较差,一旦数据包发出,无法知道对方是否收到。为了解决这个问题,提高网络可靠性,TCP协议就诞生了。TCP协议能够确保数据不会遗失。它的缺点是过程复杂、实现困难、消耗较多的资源。TCP数据包没有长度限制,理论上可以无限长,但是为了保证网络的效率,通常TCP数据包的长度不会超过IP数据包的长度,以确保单个TCP数据包不必再分割。

应用层

应用程序收到”传输层”的数据,接下来就要对数据进行解包。由于互联网是开放架构,数据来源五花八门,必须事先规定好通信的数据格式,否则接收方根本无法获得真正发送的数据内容。”应用层”的作用就是规定应用程序使用的数据格式,例如我们TCP协议之上常见的Email、HTTP、FTP等协议,这些协议就组成了互联网协议的应用层。

如下图所示,发送方的HTTP数据经过互联网的传输过程中会依次添加各层协议的标头信息,接收方收到数据包之后再依次根据协议解包得到数据。

socket编程

Socket是BSD UNIX的进程通信机制,通常也称作”套接字”,用于描述IP地址和端口,是一个通信链的句柄Socket可以理解为TCP/IP网络的API,它定义了许多函数或例程,程序员可以用它们来开发TCP/IP网络上的应用程序。电脑上运行的应用程序通常通过”套接字”向网络发出请求或者应答网络请求。

socket图解

Socket是应用层与TCP/IP协议族通信的中间软件抽象层。在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket后面,对用户来说只需要调用Socket规定的相关函数,让Socket去组织符合指定的协议数据然后进行通信。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gU55YeLf-1673623885908)(null)]

Go语言实现TCP通信

TCP协议

TCP/IP(Transmission Control Protocol/Internet Protocol) 即传输控制协议/网间协议,是一种面向连接(连接导向)的、可靠的、基于字节流的传输层(Transport layer)通信协议,因为是面向连接的协议,数据像水流一样传输,会存在黏包问题。

TCP服务端

一个TCP服务端可以同时连接很多个客户端,例如世界各地的用户使用自己电脑上的浏览器访问淘宝网。因为Go语言中创建多个goroutine实现并发非常方便和高效,所以我们可以每建立一次链接就创建一个goroutine去处理。

TCP服务端程序的处理流程:

  1. 监听端口
  2. 接收客户端请求建立链接
  3. 创建goroutine处理链接。

我们使用Go语言的net包实现的TCP服务端代码如下:

// tcp/server/main.go// TCP server端// 处理函数
func process(conn net.Conn) {defer conn.Close() // 关闭连接for {reader := bufio.NewReader(conn)var buf [128]byten, err := reader.Read(buf[:]) // 读取数据if err != nil {fmt.Println("read from client failed, err:", err)break}recvStr := string(buf[:n])fmt.Println("收到client端发来的数据:", recvStr)conn.Write([]byte(recvStr)) // 发送数据}
}func main() {listen, err := net.Listen("tcp", "127.0.0.1:20000")if err != nil {fmt.Println("listen failed, err:", err)return}for {conn, err := listen.Accept() // 建立连接if err != nil {fmt.Println("accept failed, err:", err)continue}go process(conn) // 启动一个goroutine处理连接}
}

将上面的代码保存之后编译成serverserver.exe可执行文件。

TCP客户端

一个TCP客户端进行TCP通信的流程如下:

  1. 建立与服务端的链接
  2. 进行数据收发
  3. 关闭链接

使用Go语言的net包实现的TCP客户端代码如下:

// tcp/client/main.go// 客户端
func main() {conn, err := net.Dial("tcp", "127.0.0.1:20000")if err != nil {fmt.Println("err :", err)return}defer conn.Close() // 关闭连接inputReader := bufio.NewReader(os.Stdin)for {input, _ := inputReader.ReadString('\n') // 读取用户输入inputInfo := strings.Trim(input, "\r\n")if strings.ToUpper(inputInfo) == "Q" { // 如果输入q就退出return}_, err = conn.Write([]byte(inputInfo)) // 发送数据if err != nil {return}buf := [512]byte{}n, err := conn.Read(buf[:])if err != nil {fmt.Println("recv failed, err:", err)return}fmt.Println(string(buf[:n]))}
}

将上面的代码编译成clientclient.exe可执行文件,先启动server端再启动client端,在client端输入任意内容回车之后就能够在server端看到client端发送的数据,从而实现TCP通信。

TCP黏包

黏包示例

服务端代码如下:

// socket_stick/server/main.gofunc process(conn net.Conn) {defer conn.Close()reader := bufio.NewReader(conn)var buf [1024]bytefor {n, err := reader.Read(buf[:])if err == io.EOF {break}if err != nil {fmt.Println("read from client failed, err:", err)break}recvStr := string(buf[:n])fmt.Println("收到client发来的数据:", recvStr)}
}func main() {listen, err := net.Listen("tcp", "127.0.0.1:30000")if err != nil {fmt.Println("listen failed, err:", err)return}defer listen.Close()for {conn, err := listen.Accept()if err != nil {fmt.Println("accept failed, err:", err)continue}go process(conn)}
}

客户端代码如下:

// socket_stick/client/main.gofunc main() {conn, err := net.Dial("tcp", "127.0.0.1:30000")if err != nil {fmt.Println("dial failed, err", err)return}defer conn.Close()for i := 0; i < 20; i++ {msg := `Hello, Hello. How are you?`conn.Write([]byte(msg))}
}

将上面的代码保存后,分别编译。先启动服务端再启动客户端,可以看到服务端输出结果如下:

收到client发来的数据: Hello, Hello. How are you?Hello, Hello. How are you?Hello, Hello. How are you?Hello, Hello. How are you?Hello, Hello. How are you?
收到client发来的数据: Hello, Hello. How are you?Hello, Hello. How are you?Hello, Hello. How are you?Hello, Hello. How are you?Hello, Hello. How are you?Hello, Hello. How are you?Hello, Hello. How are you?Hello, Hello. How are you?
收到client发来的数据: Hello, Hello. How are you?Hello, Hello. How are you?
收到client发来的数据: Hello, Hello. How are you?Hello, Hello. How are you?Hello, Hello. How are you?
收到client发来的数据: Hello, Hello. How are you?Hello, Hello. How are you?

客户端分10次发送的数据,在服务端并没有成功的输出10次,而是多条数据“粘”到了一起。

为什么会出现粘包

主要原因就是tcp数据传递模式是流模式,在保持长连接的时候可以进行多次的收和发。

“粘包”可发生在发送端也可发生在接收端:

  1. 由Nagle算法造成的发送端的粘包:Nagle算法是一种改善网络传输效率的算法。简单来说就是当我们提交一段数据给TCP发送时,TCP并不立刻发送此段数据,而是等待一小段时间看看在等待期间是否还有要发送的数据,若有则会一次把这两段数据发送出去。
  2. 接收端接收不及时造成的接收端粘包:TCP会把接收到的数据存在自己的缓冲区中,然后通知应用层取数据。当应用层由于某些原因不能及时的把TCP的数据取出来,就会造成TCP缓冲区中存放了几段数据。
解决办法

出现”粘包”的关键在于接收方不确定将要传输的数据包的大小,因此我们可以对数据包进行封包和拆包的操作。

封包:封包就是给一段数据加上包头,这样一来数据包就分为包头和包体两部分内容了(过滤非法包时封包会加入”包尾”内容)。包头部分的长度是固定的,并且它存储了包体的长度,根据包头长度固定以及包头中含有包体长度的变量就能正确的拆分出一个完整的数据包。

我们可以自己定义一个协议,比如数据包的前4个字节为包头,里面存储的是发送的数据的长度。

// socket_stick/proto/proto.go
package protoimport ("bufio""bytes""encoding/binary"
)// Encode 将消息编码
func Encode(message string) ([]byte, error) {// 读取消息的长度,转换成int32类型(占4个字节)var length = int32(len(message))var pkg = new(bytes.Buffer)// 写入消息头err := binary.Write(pkg, binary.LittleEndian, length)if err != nil {return nil, err}// 写入消息实体err = binary.Write(pkg, binary.LittleEndian, []byte(message))if err != nil {return nil, err}return pkg.Bytes(), nil
}// Decode 解码消息
func Decode(reader *bufio.Reader) (string, error) {// 读取消息的长度lengthByte, _ := reader.Peek(4) // 读取前4个字节的数据lengthBuff := bytes.NewBuffer(lengthByte)var length int32err := binary.Read(lengthBuff, binary.LittleEndian, &length)if err != nil {return "", err}// Buffered返回缓冲中现有的可读取的字节数。if int32(reader.Buffered()) < length+4 {return "", err}// 读取真正的消息数据pack := make([]byte, int(4+length))_, err = reader.Read(pack)if err != nil {return "", err}return string(pack[4:]), nil
}

接下来在服务端和客户端分别使用上面定义的proto包的DecodeEncode函数处理数据。

服务端代码如下:

// socket_stick/server2/main.gofunc process(conn net.Conn) {defer conn.Close()reader := bufio.NewReader(conn)for {msg, err := proto.Decode(reader)if err == io.EOF {return}if err != nil {fmt.Println("decode msg failed, err:", err)return}fmt.Println("收到client发来的数据:", msg)}
}func main() {listen, err := net.Listen("tcp", "127.0.0.1:30000")if err != nil {fmt.Println("listen failed, err:", err)return}defer listen.Close()for {conn, err := listen.Accept()if err != nil {fmt.Println("accept failed, err:", err)continue}go process(conn)}
}

客户端代码如下:

// socket_stick/client2/main.gofunc main() {conn, err := net.Dial("tcp", "127.0.0.1:30000")if err != nil {fmt.Println("dial failed, err", err)return}defer conn.Close()for i := 0; i < 20; i++ {msg := `Hello, Hello. How are you?`data, err := proto.Encode(msg)if err != nil {fmt.Println("encode msg failed, err:", err)return}conn.Write(data)}
}

Go语言实现UDP通信

UDP协议

UDP协议(User Datagram Protocol)中文名称是用户数据报协议,是OSI(Open System Interconnection,开放式系统互联)参考模型中一种无连接的传输层协议,不需要建立连接就能直接进行数据发送和接收,属于不可靠的、没有时序的通信,但是UDP协议的实时性比较好,通常用于视频直播相关领域。

UDP服务端

使用Go语言的net包实现的UDP服务端代码如下:

// UDP/server/main.go// UDP server端
func main() {listen, err := net.ListenUDP("udp", &net.UDPAddr{IP:   net.IPv4(0, 0, 0, 0),Port: 30000,})if err != nil {fmt.Println("listen failed, err:", err)return}defer listen.Close()for {var data [1024]byten, addr, err := listen.ReadFromUDP(data[:]) // 接收数据if err != nil {fmt.Println("read udp failed, err:", err)continue}fmt.Printf("data:%v addr:%v count:%v\n", string(data[:n]), addr, n)_, err = listen.WriteToUDP(data[:n], addr) // 发送数据if err != nil {fmt.Println("write to udp failed, err:", err)continue}}
}
UDP客户端

使用Go语言的net包实现的UDP客户端代码如下:

// UDP 客户端
func main() {socket, err := net.DialUDP("udp", nil, &net.UDPAddr{IP:   net.IPv4(0, 0, 0, 0),Port: 30000,})if err != nil {fmt.Println("连接服务端失败,err:", err)return}defer socket.Close()sendData := []byte("Hello server")_, err = socket.Write(sendData) // 发送数据if err != nil {fmt.Println("发送数据失败,err:", err)return}data := make([]byte, 4096)n, remoteAddr, err := socket.ReadFromUDP(data) // 接收数据if err != nil {fmt.Println("接收数据失败,err:", err)return}fmt.Printf("recv:%v addr:%v count:%v\n", string(data[:n]), remoteAddr, n)
}

Go标准库net/http包

Go语言内置的net/http包提供了HTTP客户端和服务端的实现。

HTTP协议

超文本传输协议(HTTP,HyperText Transfer Protocol)是互联网上应用最为广泛的一种网络传输协议,所有的WWW文件都必须遵守这个标准。设计HTTP最初的目的是为了提供一种发布和接收HTML页面的方法。

HTTP客户端

基本的HTTP/HTTPS请求

Get、Head、Post和PostForm函数发出HTTP/HTTPS请求。

resp, err := http.Get("http://example.com/")
...
resp, err := http.Post("http://example.com/upload", "image/jpeg", &buf)
...
resp, err := http.PostForm("http://example.com/form",url.Values{"key": {"Value"}, "id": {"123"}})

程序在使用完response后必须关闭回复的主体。

resp, err := http.Get("http://example.com/")
if err != nil {// handle error
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
// ...
GET请求示例

使用net/http包编写一个简单的发送HTTP请求的Client端,代码如下:

package mainimport ("fmt""io/ioutil""net/http"
)func main() {resp, err := http.Get("https://www.liwenzhou.com/")if err != nil {fmt.Printf("get failed, err:%v\n", err)return}defer resp.Body.Close()body, err := ioutil.ReadAll(resp.Body)if err != nil {fmt.Printf("read from resp.Body failed, err:%v\n", err)return}fmt.Print(string(body))
}

将上面的代码保存之后编译成可执行文件,执行之后就能在终端打印liwenzhou.com网站首页的内容了,我们的浏览器其实就是一个发送和接收HTTP协议数据的客户端,我们平时通过浏览器访问网页其实就是从网站的服务器接收HTTP数据,然后浏览器会按照HTML、CSS等规则将网页渲染展示出来。

带参数的GET请求示例

关于GET请求的参数需要使用Go语言内置的net/url这个标准库来处理。

func main() {apiUrl := "http://127.0.0.1:9090/get"// URL paramdata := url.Values{}data.Set("name", "小王子")data.Set("age", "18")u, err := url.ParseRequestURI(apiUrl)if err != nil {fmt.Printf("parse url requestUrl failed, err:%v\n", err)}u.RawQuery = data.Encode() // URL encodefmt.Println(u.String())resp, err := http.Get(u.String())if err != nil {fmt.Printf("post failed, err:%v\n", err)return}defer resp.Body.Close()b, err := ioutil.ReadAll(resp.Body)if err != nil {fmt.Printf("get resp failed, err:%v\n", err)return}fmt.Println(string(b))
}

对应的Server端HandlerFunc如下:

func getHandler(w http.ResponseWriter, r *http.Request) {defer r.Body.Close()data := r.URL.Query()fmt.Println(data.Get("name"))fmt.Println(data.Get("age"))answer := `{"status": "ok"}`w.Write([]byte(answer))
}
Post请求示例

上面演示了使用net/http包发送GET请求的示例,发送POST请求的示例代码如下:

package mainimport ("fmt""io/ioutil""net/http""strings"
)// net/http post demofunc main() {url := "http://127.0.0.1:9090/post"// 表单数据//contentType := "application/x-www-form-urlencoded"//data := "name=小王子&age=18"// jsoncontentType := "application/json"data := `{"name":"小王子","age":18}`resp, err := http.Post(url, contentType, strings.NewReader(data))if err != nil {fmt.Printf("post failed, err:%v\n", err)return}defer resp.Body.Close()b, err := ioutil.ReadAll(resp.Body)if err != nil {fmt.Printf("get resp failed, err:%v\n", err)return}fmt.Println(string(b))
}

对应的Server端HandlerFunc如下:

func postHandler(w http.ResponseWriter, r *http.Request) {defer r.Body.Close()// 1. 请求类型是application/x-www-form-urlencoded时解析form数据r.ParseForm()fmt.Println(r.PostForm) // 打印form数据fmt.Println(r.PostForm.Get("name"), r.PostForm.Get("age"))// 2. 请求类型是application/json时从r.Body读取数据b, err := ioutil.ReadAll(r.Body)if err != nil {fmt.Printf("read request.Body failed, err:%v\n", err)return}fmt.Println(string(b))answer := `{"status": "ok"}`w.Write([]byte(answer))
}
自定义Client

要管理HTTP客户端的头域、重定向策略和其他设置,创建一个Client:

client := &http.Client{CheckRedirect: redirectPolicyFunc,
}
resp, err := client.Get("http://example.com")
// ...
req, err := http.NewRequest("GET", "http://example.com", nil)
// ...
req.Header.Add("If-None-Match", `W/"wyzzy"`)
resp, err := client.Do(req)
// ...
自定义Transport

要管理代理、TLS配置、keep-alive、压缩和其他设置,创建一个Transport:

tr := &http.Transport{TLSClientConfig:    &tls.Config{RootCAs: pool},DisableCompression: true,
}
client := &http.Client{Transport: tr}
resp, err := client.Get("https://example.com")

Client和Transport类型都可以安全的被多个goroutine同时使用。出于效率考虑,应该一次建立、尽量重用。

服务端

默认的Server

ListenAndServe使用指定的监听地址和处理器启动一个HTTP服务端。处理器参数通常是nil,这表示采用包变量DefaultServeMux作为处理器。

Handle和HandleFunc函数可以向DefaultServeMux添加处理器。

http.Handle("/foo", fooHandler)
http.HandleFunc("/bar", func(w http.ResponseWriter, r *http.Request) {fmt.Fprintf(w, "Hello, %q", html.EscapeString(r.URL.Path))
})
log.Fatal(http.ListenAndServe(":8080", nil))
默认的Server示例

使用Go语言中的net/http包来编写一个简单的接收HTTP请求的Server端示例,net/http包是对net包的进一步封装,专门用来处理HTTP协议的数据。具体的代码如下:

// http serverfunc sayHello(w http.ResponseWriter, r *http.Request) {fmt.Fprintln(w, "Hello 沙河!")
}func main() {http.HandleFunc("/", sayHello)err := http.ListenAndServe(":9090", nil)if err != nil {fmt.Printf("http server failed, err:%v\n", err)return}
}

将上面的代码编译之后执行,打开你电脑上的浏览器在地址栏输入127.0.0.1:9090回车,此时就能够看到如下页面了。

自定义Server

要管理服务端的行为,可以创建一个自定义的Server:

s := &http.Server{Addr:           ":8080",Handler:        myHandler,ReadTimeout:    10 * time.Second,WriteTimeout:   10 * time.Second,MaxHeaderBytes: 1 << 20,
}
log.Fatal(s.ListenAndServe())

4.11 Go单元测试

go test工具

Go语言中的测试依赖go test命令。编写测试代码和编写普通的Go代码过程是类似的,并不需要学习新的语法、规则或工具。

go test命令是一个按照一定约定和组织的测试代码的驱动程序。在包目录内,所有以_test.go为后缀名的源代码文件都是go test测试的一部分,不会被go build编译到最终的可执行文件中。

*_test.go文件中有三种类型的函数,单元测试函数、基准测试函数和示例函数。

类型 格式 作用
测试函数 函数名前缀为Test 测试程序的一些逻辑行为是否正确
基准函数 函数名前缀为Benchmark 测试函数的性能
示例函数 函数名前缀为Example 为文档提供示例文档

go test命令会遍历所有的*_test.go文件中符合上述命名规则的函数,然后生成一个临时的main包用于调用相应的测试函数,然后构建并运行、报告测试结果,最后清理测试中生成的临时文件。

测试函数

测试函数的格式

每个测试函数必须导入testing包,测试函数的基本格式(签名)如下:

func TestName(t *testing.T){// ...
}

测试函数的名字必须以Test开头,可选的后缀名必须以大写字母开头,举几个例子:

func TestAdd(t *testing.T){ ... }
func TestSum(t *testing.T){ ... }
func TestLog(t *testing.T){ ... }

其中参数t用于报告测试失败和附加的日志信息。 testing.T的拥有的方法如下:

func (c *T) Error(args ...interface{})
func (c *T) Errorf(format string, args ...interface{})
func (c *T) Fail()
func (c *T) FailNow()
func (c *T) Failed() bool
func (c *T) Fatal(args ...interface{})
func (c *T) Fatalf(format string, args ...interface{})
func (c *T) Log(args ...interface{})
func (c *T) Logf(format string, args ...interface{})
func (c *T) Name() string
func (t *T) Parallel()
func (t *T) Run(name string, f func(t *T)) bool
func (c *T) Skip(args ...interface{})
func (c *T) SkipNow()
func (c *T) Skipf(format string, args ...interface{})
func (c *T) Skipped() bool
测试函数示例

就像细胞是构成我们身体的基本单位,一个软件程序也是由很多单元组件构成的。单元组件可以是函数、结构体、方法和最终用户可能依赖的任意东西。总之我们需要确保这些组件是能够正常运行的。单元测试是一些利用各种方法测试单元组件的程序,它会将结果与预期输出进行比较。

接下来,我们定义一个split的包,包中定义了一个Split函数,具体实现如下:

// split/split.gopackage splitimport "strings"// split package with a single split function.// Split slices s into all substrings separated by sep and
// returns a slice of the substrings between those separators.
func Split(s, sep string) (result []string) {i := strings.Index(s, sep)for i > -1 {result = append(result, s[:i])s = s[i+1:]i = strings.Index(s, sep)}result = append(result, s)return
}

在当前目录下,我们创建一个split_test.go的测试文件,并定义一个测试函数如下:

// split/split_test.gopackage splitimport ("reflect""testing"
)func TestSplit(t *testing.T) { // 测试函数名必须以Test开头,必须接收一个*testing.T类型参数got := Split("a:b:c", ":")         // 程序输出的结果want := []string{"a", "b", "c"}    // 期望的结果if !reflect.DeepEqual(want, got) { // 因为slice不能比较直接,借助反射包中的方法比较t.Errorf("expected:%v, got:%v", want, got) // 测试失败输出错误提示}
}

此时split这个包中的文件如下:

split $ ls -l
total 16
-rw-r--r--  1 liwenzhou  staff  408  4 29 15:50 split.go
-rw-r--r--  1 liwenzhou  staff  466  4 29 16:04 split_test.go

split包路径下,执行go test命令,可以看到输出结果如下:

split $ go test
PASS
ok      github.com/Q1mi/studygo/code_demo/test_demo/split       0.005s

一个测试用例有点单薄,我们再编写一个测试使用多个字符切割字符串的例子,在split_test.go中添加如下测试函数:

func TestMoreSplit(t *testing.T) {got := Split("abcd", "bc")want := []string{"a", "d"}if !reflect.DeepEqual(want, got) {t.Errorf("expected:%v, got:%v", want, got)}
}

再次运行go test命令,输出结果如下:

split $ go test
--- FAIL: TestMultiSplit (0.00s)split_test.go:20: expected:[a d], got:[a cd]
FAIL
exit status 1
FAIL    github.com/Q1mi/studygo/code_demo/test_demo/split       0.006s

这一次,我们的测试失败了。我们可以为go test命令添加-v参数,查看测试函数名称和运行时间:

split $ go test -v
=== RUN   TestSplit
--- PASS: TestSplit (0.00s)
=== RUN   TestMoreSplit
--- FAIL: TestMoreSplit (0.00s)split_test.go:21: expected:[a d], got:[a cd]
FAIL
exit status 1
FAIL    github.com/Q1mi/studygo/code_demo/test_demo/split       0.005s

这一次我们能清楚的看到是TestMoreSplit这个测试没有成功。 还可以在go test命令后添加-run参数,它对应一个正则表达式,只有函数名匹配上的测试函数才会被go test命令执行。

split $ go test -v -run="More"
=== RUN   TestMoreSplit
--- FAIL: TestMoreSplit (0.00s)split_test.go:21: expected:[a d], got:[a cd]
FAIL
exit status 1
FAIL    github.com/Q1mi/studygo/code_demo/test_demo/split       0.006s

现在我们回过头来解决我们程序中的问题。很显然我们最初的split函数并没有考虑到sep为多个字符的情况,我们来修复下这个Bug:

package splitimport "strings"// split package with a single split function.// Split slices s into all substrings separated by sep and
// returns a slice of the substrings between those separators.
func Split(s, sep string) (result []string) {i := strings.Index(s, sep)for i > -1 {result = append(result, s[:i])s = s[i+len(sep):] // 这里使用len(sep)获取sep的长度i = strings.Index(s, sep)}result = append(result, s)return
}

这一次我们再来测试一下,我们的程序。注意,当我们修改了我们的代码之后不要仅仅执行那些失败的测试函数,我们应该完整的运行所有的测试,保证不会因为修改代码而引入了新的问题。

split $ go test -v
=== RUN   TestSplit
--- PASS: TestSplit (0.00s)
=== RUN   TestMoreSplit
--- PASS: TestMoreSplit (0.00s)
PASS
ok      github.com/Q1mi/studygo/code_demo/test_demo/split       0.006s

这一次我们的测试都通过了。

测试组

我们现在还想要测试一下split函数对中文字符串的支持,这个时候我们可以再编写一个TestChineseSplit测试函数,但是我们也可以使用如下更友好的一种方式来添加更多的测试用例。

func TestSplit(t *testing.T) {// 定义一个测试用例类型type test struct {input stringsep   stringwant  []string}// 定义一个存储测试用例的切片tests := []test{{input: "a:b:c", sep: ":", want: []string{"a", "b", "c"}},{input: "a:b:c", sep: ",", want: []string{"a:b:c"}},{input: "abcd", sep: "bc", want: []string{"a", "d"}},{input: "沙河有沙又有河", sep: "沙", want: []string{"河有", "又有河"}},}// 遍历切片,逐一执行测试用例for _, tc := range tests {got := Split(tc.input, tc.sep)if !reflect.DeepEqual(got, tc.want) {t.Errorf("expected:%v, got:%v", tc.want, got)}}
}

我们通过上面的代码把多个测试用例合到一起,再次执行go test命令。

split $ go test -v
=== RUN   TestSplit
--- FAIL: TestSplit (0.00s)split_test.go:42: expected:[河有 又有河], got:[ 河有 又有河]
FAIL
exit status 1
FAIL    github.com/Q1mi/studygo/code_demo/test_demo/split       0.006s

我们的测试出现了问题,仔细看打印的测试失败提示信息:expected:[河有 又有河], got:[ 河有 又有河],你会发现[ 河有 又有河]中有个不明显的空串,这种情况下十分推荐使用%#v的格式化方式。

我们修改下测试用例的格式化输出错误提示部分:

func TestSplit(t *testing.T) {...for _, tc := range tests {got := Split(tc.input, tc.sep)if !reflect.DeepEqual(got, tc.want) {t.Errorf("expected:%#v, got:%#v", tc.want, got)}}
}

此时运行go test命令后就能看到比较明显的提示信息了:

split $ go test -v
=== RUN   TestSplit
--- FAIL: TestSplit (0.00s)split_test.go:42: expected:[]string{"河有", "又有河"}, got:[]string{"", "河有", "又有河"}
FAIL
exit status 1
FAIL    github.com/Q1mi/studygo/code_demo/test_demo/split       0.006s
子测试

看起来都挺不错的,但是如果测试用例比较多的时候,我们是没办法一眼看出来具体是哪个测试用例失败了。我们可能会想到下面的解决办法:

func TestSplit(t *testing.T) {type test struct { // 定义test结构体input stringsep   stringwant  []string}tests := map[string]test{ // 测试用例使用map存储"simple":      {input: "a:b:c", sep: ":", want: []string{"a", "b", "c"}},"wrong sep":   {input: "a:b:c", sep: ",", want: []string{"a:b:c"}},"more sep":    {input: "abcd", sep: "bc", want: []string{"a", "d"}},"leading sep": {input: "沙河有沙又有河", sep: "沙", want: []string{"河有", "又有河"}},}for name, tc := range tests {got := Split(tc.input, tc.sep)if !reflect.DeepEqual(got, tc.want) {t.Errorf("name:%s expected:%#v, got:%#v", name, tc.want, got) // 将测试用例的name格式化输出}}
}

上面的做法是能够解决问题的。同时Go1.7+中新增了子测试,我们可以按照如下方式使用t.Run执行子测试:

func TestSplit(t *testing.T) {type test struct { // 定义test结构体input stringsep   stringwant  []string}tests := map[string]test{ // 测试用例使用map存储"simple":      {input: "a:b:c", sep: ":", want: []string{"a", "b", "c"}},"wrong sep":   {input: "a:b:c", sep: ",", want: []string{"a:b:c"}},"more sep":    {input: "abcd", sep: "bc", want: []string{"a", "d"}},"leading sep": {input: "沙河有沙又有河", sep: "沙", want: []string{"河有", "又有河"}},}for name, tc := range tests {t.Run(name, func(t *testing.T) { // 使用t.Run()执行子测试got := Split(tc.input, tc.sep)if !reflect.DeepEqual(got, tc.want) {t.Errorf("expected:%#v, got:%#v", tc.want, got)}})}
}

此时我们再执行go test命令就能够看到更清晰的输出内容了:

split $ go test -v
=== RUN   TestSplit
=== RUN   TestSplit/leading_sep
=== RUN   TestSplit/simple
=== RUN   TestSplit/wrong_sep
=== RUN   TestSplit/more_sep
--- FAIL: TestSplit (0.00s)--- FAIL: TestSplit/leading_sep (0.00s)split_test.go:83: expected:[]string{"河有", "又有河"}, got:[]string{"", "河有", "又有河"}--- PASS: TestSplit/simple (0.00s)--- PASS: TestSplit/wrong_sep (0.00s)--- PASS: TestSplit/more_sep (0.00s)
FAIL
exit status 1
FAIL    github.com/Q1mi/studygo/code_demo/test_demo/split       0.006s

这个时候我们要把测试用例中的错误修改回来:

func TestSplit(t *testing.T) {...tests := map[string]test{ // 测试用例使用map存储"simple":      {input: "a:b:c", sep: ":", want: []string{"a", "b", "c"}},"wrong sep":   {input: "a:b:c", sep: ",", want: []string{"a:b:c"}},"more sep":    {input: "abcd", sep: "bc", want: []string{"a", "d"}},"leading sep": {input: "沙河有沙又有河", sep: "沙", want: []string{"", "河有", "又有河"}},}...
}

我们都知道可以通过-run=RegExp来指定运行的测试用例,还可以通过/来指定要运行的子测试用例,例如:go test -v -run=Split/simple只会运行simple对应的子测试用例。

测试覆盖率

测试覆盖率是你的代码被测试套件覆盖的百分比。通常我们使用的都是语句的覆盖率,也就是在测试中至少被运行一次的代码占总代码的比例。

Go提供内置功能来检查你的代码覆盖率。我们可以使用go test -cover来查看测试覆盖率。例如:

split $ go test -cover
PASS
coverage: 100.0% of statements
ok      github.com/Q1mi/studygo/code_demo/test_demo/split       0.005s

从上面的结果可以看到我们的测试用例覆盖了100%的代码。

Go还提供了一个额外的-coverprofile参数,用来将覆盖率相关的记录信息输出到一个文件。例如:

split $ go test -cover -coverprofile=c.out
PASS
coverage: 100.0% of statements
ok      github.com/Q1mi/studygo/code_demo/test_demo/split       0.005s

上面的命令会将覆盖率相关的信息输出到当前文件夹下面的c.out文件中,然后我们执行go tool cover -html=c.out,使用cover工具来处理生成的记录信息,该命令会打开本地的浏览器窗口生成一个HTML报告。[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-slN64e8e-1673623886615)(null)]上图中每个用绿色标记的语句块表示被覆盖了,而红色的表示没有被覆盖。

基准测试

基准测试函数格式

基准测试就是在一定的工作负载之下检测程序性能的一种方法。基准测试的基本格式如下:

func BenchmarkName(b *testing.B){// ...
}

基准测试以Benchmark为前缀,需要一个*testing.B类型的参数b,基准测试必须要执行b.N次,这样的测试才有对照性,b.N的值是系统根据实际情况去调整的,从而保证测试的稳定性。 testing.B拥有的方法如下:

func (c *B) Error(args ...interface{})
func (c *B) Errorf(format string, args ...interface{})
func (c *B) Fail()
func (c *B) FailNow()
func (c *B) Failed() bool
func (c *B) Fatal(args ...interface{})
func (c *B) Fatalf(format string, args ...interface{})
func (c *B) Log(args ...interface{})
func (c *B) Logf(format string, args ...interface{})
func (c *B) Name() string
func (b *B) ReportAllocs()
func (b *B) ResetTimer()
func (b *B) Run(name string, f func(b *B)) bool
func (b *B) RunParallel(body func(*PB))
func (b *B) SetBytes(n int64)
func (b *B) SetParallelism(p int)
func (c *B) Skip(args ...interface{})
func (c *B) SkipNow()
func (c *B) Skipf(format string, args ...interface{})
func (c *B) Skipped() bool
func (b *B) StartTimer()
func (b *B) StopTimer()
基准测试示例

我们为split包中的Split函数编写基准测试如下:

func BenchmarkSplit(b *testing.B) {for i := 0; i < b.N; i++ {Split("沙河有沙又有河", "沙")}
}

基准测试并不会默认执行,需要增加-bench参数,所以我们通过执行go test -bench=Split命令执行基准测试,输出结果如下:

split $ go test -bench=Split
goos: darwin
goarch: amd64
pkg: github.com/Q1mi/studygo/code_demo/test_demo/split
BenchmarkSplit-8        10000000               203 ns/op
PASS
ok      github.com/Q1mi/studygo/code_demo/test_demo/split       2.255s

其中BenchmarkSplit-8表示对Split函数进行基准测试,数字8表示GOMAXPROCS的值,这个对于并发基准测试很重要。10000000203ns/op表示每次调用Split函数耗时203ns,这个结果是10000000次调用的平均值。

我们还可以为基准测试添加-benchmem参数,来获得内存分配的统计数据。

split $ go test -bench=Split -benchmem
goos: darwin
goarch: amd64
pkg: github.com/Q1mi/studygo/code_demo/test_demo/split
BenchmarkSplit-8        10000000               215 ns/op             112 B/op          3 allocs/op
PASS
ok      github.com/Q1mi/studygo/code_demo/test_demo/split       2.394s

其中,112 B/op表示每次操作内存分配了112字节,3 allocs/op则表示每次操作进行了3次内存分配。 我们将我们的Split函数优化如下:

func Split(s, sep string) (result []string) {result = make([]string, 0, strings.Count(s, sep)+1)i := strings.Index(s, sep)for i > -1 {result = append(result, s[:i])s = s[i+len(sep):] // 这里使用len(sep)获取sep的长度i = strings.Index(s, sep)}result = append(result, s)return
}

这一次我们提前使用make函数将result初始化为一个容量足够大的切片,而不再像之前一样通过调用append函数来追加。我们来看一下这个改进会带来多大的性能提升:

split $ go test -bench=Split -benchmem
goos: darwin
goarch: amd64
pkg: github.com/Q1mi/studygo/code_demo/test_demo/split
BenchmarkSplit-8        10000000               127 ns/op              48 B/op          1 allocs/op
PASS
ok      github.com/Q1mi/studygo/code_demo/test_demo/split       1.423s

这个使用make函数提前分配内存的改动,减少了2/3的内存分配次数,并且减少了一半的内存分配。

性能比较函数

上面的基准测试只能得到给定操作的绝对耗时,但是在很多性能问题是发生在两个不同操作之间的相对耗时,比如同一个函数处理1000个元素的耗时与处理1万甚至100万个元素的耗时的差别是多少?再或者对于同一个任务究竟使用哪种算法性能最佳?我们通常需要对两个不同算法的实现使用相同的输入来进行基准比较测试。

性能比较函数通常是一个带有参数的函数,被多个不同的Benchmark函数传入不同的值来调用。举个例子如下:

func benchmark(b *testing.B, size int){/* ... */}
func Benchmark10(b *testing.B){ benchmark(b, 10) }
func Benchmark100(b *testing.B){ benchmark(b, 100) }
func Benchmark1000(b *testing.B){ benchmark(b, 1000) }

例如我们编写了一个计算斐波那契数列的函数如下:

// fib.go// Fib 是一个计算第n个斐波那契数的函数
func Fib(n int) int {if n < 2 {return n}return Fib(n-1) + Fib(n-2)
}

我们编写的性能比较函数如下:

// fib_test.gofunc benchmarkFib(b *testing.B, n int) {for i := 0; i < b.N; i++ {Fib(n)}
}func BenchmarkFib1(b *testing.B)  { benchmarkFib(b, 1) }
func BenchmarkFib2(b *testing.B)  { benchmarkFib(b, 2) }
func BenchmarkFib3(b *testing.B)  { benchmarkFib(b, 3) }
func BenchmarkFib10(b *testing.B) { benchmarkFib(b, 10) }
func BenchmarkFib20(b *testing.B) { benchmarkFib(b, 20) }
func BenchmarkFib40(b *testing.B) { benchmarkFib(b, 40) }

运行基准测试:

split $ go test -bench=.
goos: darwin
goarch: amd64
pkg: github.com/Q1mi/studygo/code_demo/test_demo/fib
BenchmarkFib1-8         1000000000               2.03 ns/op
BenchmarkFib2-8         300000000                5.39 ns/op
BenchmarkFib3-8         200000000                9.71 ns/op
BenchmarkFib10-8         5000000               325 ns/op
BenchmarkFib20-8           30000             42460 ns/op
BenchmarkFib40-8               2         638524980 ns/op
PASS
ok      github.com/Q1mi/studygo/code_demo/test_demo/fib 12.944s

这里需要注意的是,默认情况下,每个基准测试至少运行1秒。如果在Benchmark函数返回时没有到1秒,则b.N的值会按1,2,5,10,20,50,…增加,并且函数再次运行。

最终的BenchmarkFib40只运行了两次,每次运行的平均值只有不到一秒。像这种情况下我们应该可以使用-benchtime标志增加最小基准时间,以产生更准确的结果。例如:

split $ go test -bench=Fib40 -benchtime=20s
goos: darwin
goarch: amd64
pkg: github.com/Q1mi/studygo/code_demo/test_demo/fib
BenchmarkFib40-8              50         663205114 ns/op
PASS
ok      github.com/Q1mi/studygo/code_demo/test_demo/fib 33.849s

这一次BenchmarkFib40函数运行了50次,结果就会更准确一些了。

使用性能比较函数做测试的时候一个容易犯的错误就是把b.N作为输入的大小,例如以下两个例子都是错误的示范:

// 错误示范1
func BenchmarkFibWrong(b *testing.B) {for n := 0; n < b.N; n++ {Fib(n)}
}// 错误示范2
func BenchmarkFibWrong2(b *testing.B) {Fib(b.N)
}
重置时间

b.ResetTimer之前的处理不会放到执行时间里,也不会输出到报告中,所以可以在之前做一些不计划作为测试报告的操作。例如:

func BenchmarkSplit(b *testing.B) {time.Sleep(5 * time.Second) // 假设需要做一些耗时的无关操作b.ResetTimer()              // 重置计时器for i := 0; i < b.N; i++ {Split("沙河有沙又有河", "沙")}
}
并行测试

func (b *B) RunParallel(body func(*PB))会以并行的方式执行给定的基准测试。

RunParallel会创建出多个goroutine,并将b.N分配给这些goroutine执行, 其中goroutine数量的默认值为GOMAXPROCS。用户如果想要增加非CPU受限(non-CPU-bound)基准测试的并行性, 那么可以在RunParallel之前调用SetParallelismRunParallel通常会与-cpu标志一同使用。

func BenchmarkSplitParallel(b *testing.B) {// b.SetParallelism(1) // 设置使用的CPU数b.RunParallel(func(pb *testing.PB) {for pb.Next() {Split("沙河有沙又有河", "沙")}})
}

执行一下基准测试:

split $ go test -bench=.
goos: darwin
goarch: amd64
pkg: github.com/Q1mi/studygo/code_demo/test_demo/split
BenchmarkSplit-8                10000000               131 ns/op
BenchmarkSplitParallel-8        50000000                36.1 ns/op
PASS
ok      github.com/Q1mi/studygo/code_demo/test_demo/split       3.308s

还可以通过在测试命令后添加-cpu参数如go test -bench=. -cpu 1来指定使用的CPU数量。

Setup与TearDown

测试程序有时需要在测试之前进行额外的设置(setup)或在测试之后进行拆卸(teardown)。

TestMain

通过在*_test.go文件中定义TestMain函数来可以在测试之前进行额外的设置(setup)或在测试之后进行拆卸(teardown)操作。

如果测试文件包含函数:func TestMain(m *testing.M)那么生成的测试会先调用 TestMain(m),然后再运行具体测试。TestMain运行在主goroutine中, 可以在调用 m.Run前后做任何设置(setup)和拆卸(teardown)。退出测试的时候应该使用m.Run的返回值作为参数调用os.Exit

一个使用TestMain来设置Setup和TearDown的示例如下:

func TestMain(m *testing.M) {fmt.Println("write setup code here...") // 测试之前的做一些设置// 如果 TestMain 使用了 flags,这里应该加上flag.Parse()retCode := m.Run()                         // 执行测试fmt.Println("write teardown code here...") // 测试之后做一些拆卸工作os.Exit(retCode)                           // 退出测试
}

需要注意的是:在调用TestMain时, flag.Parse并没有被调用。所以如果TestMain 依赖于command-line标志 (包括 testing 包的标记), 则应该显示的调用flag.Parse

子测试的Setup与Teardown

有时候我们可能需要为每个测试集设置Setup与Teardown,也有可能需要为每个子测试设置Setup与Teardown。下面我们定义两个函数工具函数如下:

// 测试集的Setup与Teardown
func setupTestCase(t *testing.T) func(t *testing.T) {t.Log("如有需要在此执行:测试之前的setup")return func(t *testing.T) {t.Log("如有需要在此执行:测试之后的teardown")}
}// 子测试的Setup与Teardown
func setupSubTest(t *testing.T) func(t *testing.T) {t.Log("如有需要在此执行:子测试之前的setup")return func(t *testing.T) {t.Log("如有需要在此执行:子测试之后的teardown")}
}

使用方式如下:

func TestSplit(t *testing.T) {type test struct { // 定义test结构体input stringsep   stringwant  []string}tests := map[string]test{ // 测试用例使用map存储"simple":      {input: "a:b:c", sep: ":", want: []string{"a", "b", "c"}},"wrong sep":   {input: "a:b:c", sep: ",", want: []string{"a:b:c"}},"more sep":    {input: "abcd", sep: "bc", want: []string{"a", "d"}},"leading sep": {input: "沙河有沙又有河", sep: "沙", want: []string{"", "河有", "又有河"}},}teardownTestCase := setupTestCase(t) // 测试之前执行setup操作defer teardownTestCase(t)            // 测试之后执行testdoen操作for name, tc := range tests {t.Run(name, func(t *testing.T) { // 使用t.Run()执行子测试teardownSubTest := setupSubTest(t) // 子测试之前执行setup操作defer teardownSubTest(t)           // 测试之后执行testdoen操作got := Split(tc.input, tc.sep)if !reflect.DeepEqual(got, tc.want) {t.Errorf("expected:%#v, got:%#v", tc.want, got)}})}
}

测试结果如下:

split $ go test -v
=== RUN   TestSplit
=== RUN   TestSplit/simple
=== RUN   TestSplit/wrong_sep
=== RUN   TestSplit/more_sep
=== RUN   TestSplit/leading_sep
--- PASS: TestSplit (0.00s)split_test.go:71: 如有需要在此执行:测试之前的setup--- PASS: TestSplit/simple (0.00s)split_test.go:79: 如有需要在此执行:子测试之前的setupsplit_test.go:81: 如有需要在此执行:子测试之后的teardown--- PASS: TestSplit/wrong_sep (0.00s)split_test.go:79: 如有需要在此执行:子测试之前的setupsplit_test.go:81: 如有需要在此执行:子测试之后的teardown--- PASS: TestSplit/more_sep (0.00s)split_test.go:79: 如有需要在此执行:子测试之前的setupsplit_test.go:81: 如有需要在此执行:子测试之后的teardown--- PASS: TestSplit/leading_sep (0.00s)split_test.go:79: 如有需要在此执行:子测试之前的setupsplit_test.go:81: 如有需要在此执行:子测试之后的teardownsplit_test.go:73: 如有需要在此执行:测试之后的teardown
=== RUN   ExampleSplit
--- PASS: ExampleSplit (0.00s)
PASS
ok      github.com/Q1mi/studygo/code_demo/test_demo/split       0.006s

示例函数

示例函数的格式

go test特殊对待的第三种函数就是示例函数,它们的函数名以Example为前缀。它们既没有参数也没有返回值。标准格式如下:

func ExampleName() {// ...
}
示例函数示例

下面的代码是我们为Split函数编写的一个示例函数:

func ExampleSplit() {fmt.Println(split.Split("a:b:c", ":"))fmt.Println(split.Split("沙河有沙又有河", "沙"))// Output:// [a b c]// [ 河有 又有河]
}

为你的代码编写示例代码有如下三个用处:

  1. 示例函数能够作为文档直接使用,例如基于web的godoc中能把示例函数与对应的函数或包相关联。

  2. 示例函数只要包含了// Output:也是可以通过go test运行的可执行测试。

    split $ go test -run Example
    PASS
    ok      github.com/Q1mi/studygo/code_demo/test_demo/split       0.006s
    
  3. 示例函数提供了可以直接运行的示例代码,可以直接在golang.orggodoc文档服务器上使用Go Playground运行示例代码。下图为strings.ToUpper函数在Playground的示例函数效果。

五、Golang数据库部分

mysql存储引擎:

MyISAM:1查询速度快、2只支持表锁、不支持事务

InnoBD:1整体速度快、2支持表锁和行锁

事务特点:

ACID:

1:原子性:多个操作当做一个整体(例如,转账一个减一个加,只有成功和失败,不存在中间状态)

2:一致性:3:隔离性:4:持久性:(后面有详细介绍哦)

索引:

索引原理:B树和B+树

索引类型、索引命中、分库分表、SQL注入、慢查询优化

MYSQL主从:

MYSQL读写分离

5.1 Go操作MySQL

关系型数据库

连接

Go语言中的**database/sql**包提供了保证SQL或类SQL数据库的泛用接口,并不提供具体的数据库驱动。使用database/sql包时必须注入(至少)一个数据库驱动。

原生支持连接池,并发安全

我们常用的数据库基本上都有完整的第三方实现。例如:MySQL驱动

下载依赖
go get -u github.com/go-sql-driver/mysql
使用MySQL驱动
func Open(driverName, dataSourceName string) (*DB, error)

Open打开一个dirverName指定的数据库,dataSourceName指定数据源,一般至少包括数据库文件名和其它连接必要的信息。

import ("database/sql"_ "github.com/go-sql-driver/mysql"
)func main() {// DSN:Data Source Namedsn := "user:password@tcp(127.0.0.1:3306)/dbname"//dbname是数据库名不是连接名db, err := sql.Open("mysql", dsn)//不会校验用户名和密码是否正确,只校验参数if err != nil {panic(err)}defer db.Close()  // 注意这行代码要写在上面err判断的下面
}

思考题: 为什么上面代码中的defer db.Close()语句不应该写在if err != nil的前面呢?

初始化连接

Open函数可能只是验证其参数格式是否正确,实际上并不创建与数据库的连接。如果要检查数据源的名称是否真实有效,应该调用Ping方法。

返回的DB对象可以安全地被多个goroutine并发使用,并且维护其自己的空闲连接池。因此,Open函数应该仅被调用一次,很少需要关闭这个DB对象。

接下来,我们定义一个全局变量db,用来保存数据库连接对象。将上面的示例代码拆分出一个独立的initDB函数,只需要在程序启动时调用一次该函数完成全局变量db的初始化,其他函数中就可以直接使用全局变量db了。(注意下方的注意

// 定义一个全局对象db
var db *sql.DB// 定义一个初始化数据库的函数
func initDB() (err error) {// DSN:Data Source Namedsn := "user:password@tcp(127.0.0.1:3306)/sql_test?charset=utf8mb4&parseTime=True"// 不会校验账号密码是否正确// 注意!!!这里不要使用:=,我们是给全局变量赋值,然后在main函数中使用全局变量dbdb, err = sql.Open("mysql", dsn)if err != nil {return err}// 尝试与数据库建立连接(校验dsn是否正确)err = db.Ping()if err != nil {return err}return nil
}func main() {err := initDB() // 调用输出化数据库的函数if err != nil {fmt.Printf("init db failed,err:%v\n", err)return}
}

其中sql.DB是表示连接的数据库对象(结构体实例),它保存了连接数据库相关的所有信息。它内部维护着一个具有零到多个底层连接的连接池,它可以安全地被多个goroutine同时使用。

SetMaxOpenConns
func (db *DB) SetMaxOpenConns(n int)

SetMaxOpenConns设置与数据库建立连接的最大数目。 如果n大于0且小于最大闲置连接数,会将最大闲置连接数减小到匹配最大开启连接数的限制。 如果n<=0,不会限制最大开启连接数,默认为0(无限制)。

SetMaxIdleConns
func (db *DB) SetMaxIdleConns(n int)

SetMaxIdleConns设置连接池中的最大闲置连接数。 如果n大于最大开启连接数,则新的最大闲置连接数会减小到匹配最大开启连接数的限制。 如果n<=0,不会保留闲置连接。

CRUD

建库建表

我们先在MySQL中创建一个名为sql_test的数据库

CREATE DATABASE sql_test;

进入该数据库:

use sql_test;

执行以下命令创建一张用于测试的数据表:

CREATE TABLE `user` (`id` BIGINT(20) NOT NULL AUTO_INCREMENT,`name` VARCHAR(20) DEFAULT '',`age` INT(11) DEFAULT '0',PRIMARY KEY(`id`)
)ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
查询

为了方便查询,我们事先定义好一个结构体来存储user表的数据。

type user struct {id   intage  intname string
}
单行查询

单行查询db.QueryRow()执行一次查询,并期望返回最多一行结果(即Row)。QueryRow总是返回非nil的值,直到返回值的Scan方法被调用时,才会返回被延迟的错误。(如:未找到结果)

func (db *DB) QueryRow(query string, args ...interface{}) *Row

具体示例代码:

// 查询单条数据示例
func queryRowDemo() {sqlStr := "select id, name, age from user where id=?"var u user// 非常重要:确保QueryRow之后调用Scan方法,否则持有的数据库链接不会被释放err := db.QueryRow(sqlStr, 1).Scan(&u.id, &u.name, &u.age)if err != nil {fmt.Printf("scan failed, err:%v\n", err)return}fmt.Printf("id:%d name:%s age:%d\n", u.id, u.name, u.age)
}
多行查询

多行查询db.Query()执行一次查询,返回多行结果(即Rows),一般用于执行select命令。参数args表示query中的占位参数。

func (db *DB) Query(query string, args ...interface{}) (*Rows, error)

具体示例代码:

// 查询多条数据示例
func queryMultiRowDemo() {sqlStr := "select id, name, age from user where id > ?"rows, err := db.Query(sqlStr, 0)if err != nil {fmt.Printf("query failed, err:%v\n", err)return}// 非常重要:关闭rows释放持有的数据库链接defer rows.Close()// 循环读取结果集中的数据//只要是有吓一条就返回truefor rows.Next() {var u usererr := rows.Scan(&u.id, &u.name, &u.age)if err != nil {fmt.Printf("scan failed, err:%v\n", err)return}fmt.Printf("id:%d name:%s age:%d\n", u.id, u.name, u.age)}
}
插入数据

插入、更新和删除操作都使用Exec方法。

func (db *DB) Exec(query string, args ...interface{}) (Result, error)

Exec执行一次命令(包括查询、删除、更新、插入等),返回的Result是对已执行的SQL命令的总结。参数args表示query中的占位参数。

具体插入数据示例代码如下:

// 插入数据
func insertRowDemo() {sqlStr := "insert into user(name, age) values (?,?)"ret, err := db.Exec(sqlStr, "王五", 38)if err != nil {fmt.Printf("insert failed, err:%v\n", err)return}theID, err := ret.LastInsertId() // 新插入数据的idif err != nil {fmt.Printf("get lastinsert ID failed, err:%v\n", err)return}fmt.Printf("insert success, the id is %d.\n", theID)
}
更新数据

具体更新数据示例代码如下:

// 更新数据
func updateRowDemo() {sqlStr := "update user set age=? where id = ?"ret, err := db.Exec(sqlStr, 39, 3)if err != nil {fmt.Printf("update failed, err:%v\n", err)return}n, err := ret.RowsAffected() // 操作影响的行数if err != nil {fmt.Printf("get RowsAffected failed, err:%v\n", err)return}fmt.Printf("update success, affected rows:%d\n", n)
}
删除数据

具体删除数据的示例代码如下:

// 删除数据
func deleteRowDemo() {sqlStr := "delete from user where id = ?"ret, err := db.Exec(sqlStr, 3)if err != nil {fmt.Printf("delete failed, err:%v\n", err)return}n, err := ret.RowsAffected() // 操作影响的行数if err != nil {fmt.Printf("get RowsAffected failed, err:%v\n", err)return}fmt.Printf("delete success, affected rows:%d\n", n)
}

MySQL预处理

什么是预处理?

普通SQL语句执行过程:

  1. 客户端对SQL语句进行占位符替换得到完整的SQL语句。
  2. 客户端发送完整SQL语句到MySQL服务端
  3. MySQL服务端执行完整的SQL语句并将结果返回给客户端。

预处理执行过程:

  1. 把SQL语句分成两部分,命令部分与数据部分。
  2. 先把命令部分发送给MySQL服务端,MySQL服务端进行SQL预处理。
  3. 然后把数据部分发送给MySQL服务端,MySQL服务端对SQL语句进行占位符替换。
  4. MySQL服务端执行完整的SQL语句并将结果返回给客户端。
为什么要预处理?
  1. 优化MySQL服务器重复执行SQL的方法,可以提升服务器性能,提前让服务器编译,一次编译多次执行,节省后续编译的成本。
  2. 避免SQL注入问题。
Go实现MySQL预处理

database/sql中使用下面的Prepare方法来实现预处理操作。

func (db *DB) Prepare(query string) (*Stmt, error)

Prepare方法会先将sql语句发送给MySQL服务端,返回一个准备好的状态用于之后的查询和命令。返回值可以同时执行多个查询和命令。

查询操作的预处理示例代码如下:

// 预处理查询示例
func prepareQueryDemo() {sqlStr := "select id, name, age from user where id > ?"stmt, err := db.Prepare(sqlStr)if err != nil {fmt.Printf("prepare failed, err:%v\n", err)return}defer stmt.Close()rows, err := stmt.Query(0)if err != nil {fmt.Printf("query failed, err:%v\n", err)return}defer rows.Close()// 循环读取结果集中的数据for rows.Next() {var u usererr := rows.Scan(&u.id, &u.name, &u.age)if err != nil {fmt.Printf("scan failed, err:%v\n", err)return}fmt.Printf("id:%d name:%s age:%d\n", u.id, u.name, u.age)}
}

插入、更新和删除操作的预处理十分类似,这里以插入操作的预处理为例:

// 预处理插入示例
func prepareInsertDemo() {sqlStr := "insert into user(name, age) values (?,?)"stmt, err := db.Prepare(sqlStr)if err != nil {fmt.Printf("prepare failed, err:%v\n", err)return}defer stmt.Close()_, err = stmt.Exec("小王子", 18)if err != nil {fmt.Printf("insert failed, err:%v\n", err)return}_, err = stmt.Exec("沙河娜扎", 18)if err != nil {fmt.Printf("insert failed, err:%v\n", err)return}fmt.Println("insert success.")
}
SQL注入问题

我们任何时候都不应该自己拼接SQL语句!

这里我们演示一个自行拼接SQL语句的示例,编写一个根据name字段查询user表的函数如下:

// sql注入示例
func sqlInjectDemo(name string) {sqlStr := fmt.Sprintf("select id, name, age from user where name='%s'", name)fmt.Printf("SQL:%s\n", sqlStr)var u usererr := db.QueryRow(sqlStr).Scan(&u.id, &u.name, &u.age)if err != nil {fmt.Printf("exec failed, err:%v\n", err)return}fmt.Printf("user:%#v\n", u)
}

此时以下输入字符串都可以引发SQL注入问题:

sqlInjectDemo("xxx' or 1=1#")
sqlInjectDemo("xxx' union select * from user #")
sqlInjectDemo("xxx' and (select count(*) from user) <10 #")

**补充:**不同的数据库中,SQL语句使用的占位符语法不尽相同。

数据库 占位符语法
MySQL ?
PostgreSQL $1, $2
SQLite ?$1
Oracle :name

Go实现MySQL事务

什么是事务?

事务:一个最小的不可再分的工作单元;通常一个事务对应一个完整的业务(例如银行账户转账业务,该业务就是一个最小的工作单元),同时这个完整的业务需要执行多次的DML(insert、update、delete)语句共同联合完成。A转账给B,这里面就需要执行两次update操作。

在MySQL中只有使用了Innodb数据库引擎的数据库或表才支持事务。事务处理可以用来维护数据库的完整性,保证成批的SQL语句要么全部执行,要么全部不执行。

事务的ACID

通常事务必须满足4个条件(ACID):原子性(Atomicity,或称不可分割性)、一致性(Consistency)、隔离性(Isolation,又称独立性)、持久性(Durability)。

条件 解释
原子性 一个事务(transaction)中的所有操作,要么全部完成,要么全部不完成,不会结束在中间某个环节。事务在执行过程中发生错误,会被回滚(Rollback)到事务开始前的状态,就像这个事务从来没有执行过一样。
一致性 在事务开始之前和事务结束以后,数据库的完整性没有被破坏。这表示写入的资料必须完全符合所有的预设规则,这包含资料的精确度、串联性以及后续数据库可以自发性地完成预定的工作。
隔离性 数据库允许多个并发事务同时对其数据进行读写和修改的能力,隔离性可以防止多个事务并发执行时由于交叉执行而导致数据的不一致。事务隔离分为不同级别,包括读未提交(Read uncommitted)、读提交(read committed)、可重复读(repeatable read)和串行化(Serializable)。
持久性 事务处理结束后,对数据的修改就是永久的,即便系统故障也不会丢失。
事务相关方法

Go语言中使用以下三个方法实现MySQL中的事务操作。 开始事务

func (db *DB) Begin() (*Tx, error)

提交事务

func (tx *Tx) Commit() error

回滚事务

func (tx *Tx) Rollback() error
事务示例

下面的代码演示了一个简单的事务操作,该事物操作能够确保两次更新操作要么同时成功要么同时失败,不会存在中间状态。

// 事务操作示例
func transactionDemo() {tx, err := db.Begin() // 开启事务if err != nil {if tx != nil {tx.Rollback() // 回滚}fmt.Printf("begin trans failed, err:%v\n", err)return}sqlStr1 := "Update user set age=30 where id=?"ret1, err := tx.Exec(sqlStr1, 2)if err != nil {tx.Rollback() // 回滚fmt.Printf("exec sql1 failed, err:%v\n", err)return}affRow1, err := ret1.RowsAffected()if err != nil {tx.Rollback() // 回滚fmt.Printf("exec ret1.RowsAffected() failed, err:%v\n", err)return}sqlStr2 := "Update user set age=40 where id=?"ret2, err := tx.Exec(sqlStr2, 3)if err != nil {tx.Rollback() // 回滚fmt.Printf("exec sql2 failed, err:%v\n", err)return}affRow2, err := ret2.RowsAffected()if err != nil {tx.Rollback() // 回滚fmt.Printf("exec ret1.RowsAffected() failed, err:%v\n", err)return}fmt.Println(affRow1, affRow2)if affRow1 == 1 && affRow2 == 1 {fmt.Println("事务提交啦...")tx.Commit() // 提交事务} else {tx.Rollback()fmt.Println("事务回滚啦...")}fmt.Println("exec trans success!")
}

更强大、更好用的sqlx库

5.2 sqlx介绍

在项目中我们通常可能会使用database/sql连接MySQL数据库。sqlx可以认为是Go语言内置database/sql的超集,它在优秀的内置database/sql基础上提供了一组扩展。这些扩展中除了大家常用来查询的Get(dest interface{}, ...) errorSelect(dest interface{}, ...) error外还有很多其他强大的功能。

安装sqlx

go get github.com/jmoiron/sqlx

基本使用

连接数据库
var db *sqlx.DBfunc initDB() (err error) {dsn := "user:password@tcp(127.0.0.1:3306)/sql_test?charset=utf8mb4&parseTime=True"// 也可以使用MustConnect连接不成功就panicdb, err = sqlx.Connect("mysql", dsn)if err != nil {fmt.Printf("connect DB failed, err:%v\n", err)return}db.SetMaxOpenConns(20)db.SetMaxIdleConns(10)return
}
查询

查询单行数据示例代码如下:

// 查询单条数据示例
func queryRowDemo() {sqlStr := "select id, name, age from user where id=?"var u usererr := db.Get(&u, sqlStr, 1)if err != nil {fmt.Printf("get failed, err:%v\n", err)return}fmt.Printf("id:%d name:%s age:%d\n", u.ID, u.Name, u.Age)
}

查询多行数据示例代码如下:

// 查询多条数据示例
func queryMultiRowDemo() {sqlStr := "select id, name, age from user where id > ?"var users []usererr := db.Select(&users, sqlStr, 0)if err != nil {fmt.Printf("query failed, err:%v\n", err)return}fmt.Printf("users:%#v\n", users)
}
插入、更新和删除

sqlx中的exec方法与原生sql中的exec使用基本一致:

// 插入数据
func insertRowDemo() {sqlStr := "insert into user(name, age) values (?,?)"ret, err := db.Exec(sqlStr, "沙河小王子", 19)if err != nil {fmt.Printf("insert failed, err:%v\n", err)return}theID, err := ret.LastInsertId() // 新插入数据的idif err != nil {fmt.Printf("get lastinsert ID failed, err:%v\n", err)return}fmt.Printf("insert success, the id is %d.\n", theID)
}// 更新数据
func updateRowDemo() {sqlStr := "update user set age=? where id = ?"ret, err := db.Exec(sqlStr, 39, 6)if err != nil {fmt.Printf("update failed, err:%v\n", err)return}n, err := ret.RowsAffected() // 操作影响的行数if err != nil {fmt.Printf("get RowsAffected failed, err:%v\n", err)return}fmt.Printf("update success, affected rows:%d\n", n)
}// 删除数据
func deleteRowDemo() {sqlStr := "delete from user where id = ?"ret, err := db.Exec(sqlStr, 6)if err != nil {fmt.Printf("delete failed, err:%v\n", err)return}n, err := ret.RowsAffected() // 操作影响的行数if err != nil {fmt.Printf("get RowsAffected failed, err:%v\n", err)return}fmt.Printf("delete success, affected rows:%d\n", n)
}
NamedExec

DB.NamedExec方法用来绑定SQL语句与结构体或map中的同名字段。

func insertUserDemo()(err error){sqlStr := "INSERT INTO user (name,age) VALUES (:name,:age)"_, err = db.NamedExec(sqlStr,map[string]interface{}{"name": "七米","age": 28,})return
}
NamedQuery

DB.NamedExec同理,这里是支持查询。

func namedQuery(){sqlStr := "SELECT * FROM user WHERE name=:name"// 使用map做命名查询rows, err := db.NamedQuery(sqlStr, map[string]interface{}{"name": "七米"})if err != nil {fmt.Printf("db.NamedQuery failed, err:%v\n", err)return}defer rows.Close()for rows.Next(){var u usererr := rows.StructScan(&u)if err != nil {fmt.Printf("scan failed, err:%v\n", err)continue}fmt.Printf("user:%#v\n", u)}u := user{Name: "七米",}// 使用结构体命名查询,根据结构体字段的 db tag进行映射rows, err = db.NamedQuery(sqlStr, u)if err != nil {fmt.Printf("db.NamedQuery failed, err:%v\n", err)return}defer rows.Close()for rows.Next(){var u usererr := rows.StructScan(&u)if err != nil {fmt.Printf("scan failed, err:%v\n", err)continue}fmt.Printf("user:%#v\n", u)}
}
事务操作

对于事务操作,我们可以使用sqlx中提供的db.Beginx()tx.Exec()方法。示例代码如下:

func transactionDemo2()(err error) {tx, err := db.Beginx() // 开启事务if err != nil {fmt.Printf("begin trans failed, err:%v\n", err)return err}defer func() {if p := recover(); p != nil {tx.Rollback()panic(p) // re-throw panic after Rollback} else if err != nil {fmt.Println("rollback")tx.Rollback() // err is non-nil; don't change it} else {err = tx.Commit() // err is nil; if Commit returns error update errfmt.Println("commit")}}()sqlStr1 := "Update user set age=20 where id=?"rs, err := tx.Exec(sqlStr1, 1)if err!= nil{return err}n, err := rs.RowsAffected()if err != nil {return err}if n != 1 {return errors.New("exec sqlStr1 failed")}sqlStr2 := "Update user set age=50 where i=?"rs, err = tx.Exec(sqlStr2, 5)if err!=nil{return err}n, err = rs.RowsAffected()if err != nil {return err}if n != 1 {return errors.New("exec sqlStr1 failed")}return err
}

sqlx.In

sqlx.Insqlx提供的一个非常方便的函数。

sqlx.In的批量插入示例
表结构

为了方便演示插入数据操作,这里创建一个user表,表结构如下:

CREATE TABLE `user` (`id` BIGINT(20) NOT NULL AUTO_INCREMENT,`name` VARCHAR(20) DEFAULT '',`age` INT(11) DEFAULT '0',PRIMARY KEY(`id`)
)ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
结构体

定义一个user结构体,字段通过tag与数据库中user表的列一致。

type User struct {Name string `db:"name"`Age  int    `db:"age"`
}
bindvars(绑定变量)

查询占位符?在内部称为bindvars(查询占位符),它非常重要。你应该始终使用它们向数据库发送值,因为它们可以防止SQL注入攻击。database/sql不尝试对查询文本进行任何验证;它与编码的参数一起按原样发送到服务器。除非驱动程序实现一个特殊的接口,否则在执行之前,查询是在服务器上准备的。因此bindvars是特定于数据库的:

  • MySQL中使用?
  • PostgreSQL使用枚举的$1$2等bindvar语法
  • SQLite中?$1的语法都支持
  • Oracle中使用:name的语法

bindvars的一个常见误解是,它们用来在sql语句中插入值。它们其实仅用于参数化,不允许更改SQL语句的结构。例如,使用bindvars尝试参数化列或表名将不起作用:

// ?不能用来插入表名(做SQL语句中表名的占位符)
db.Query("SELECT * FROM ?", "mytable")// ?也不能用来插入列名(做SQL语句中列名的占位符)
db.Query("SELECT ?, ? FROM people", "name", "location")
自己拼接语句实现批量插入

比较笨,但是很好理解。就是有多少个User就拼接多少个(?, ?)

// BatchInsertUsers 自行构造批量插入的语句
func BatchInsertUsers(users []*User) error {// 存放 (?, ?) 的slicevalueStrings := make([]string, 0, len(users))// 存放values的slicevalueArgs := make([]interface{}, 0, len(users) * 2)// 遍历users准备相关数据for _, u := range users {// 此处占位符要与插入值的个数对应valueStrings = append(valueStrings, "(?, ?)")valueArgs = append(valueArgs, u.Name)valueArgs = append(valueArgs, u.Age)}// 自行拼接要执行的具体语句stmt := fmt.Sprintf("INSERT INTO user (name, age) VALUES %s",strings.Join(valueStrings, ","))_, err := DB.Exec(stmt, valueArgs...)return err
}
使用sqlx.In实现批量插入

前提是需要我们的结构体实现driver.Valuer接口:

func (u User) Value() (driver.Value, error) {return []interface{}{u.Name, u.Age}, nil
}

使用sqlx.In实现批量插入代码如下:

// BatchInsertUsers2 使用sqlx.In帮我们拼接语句和参数, 注意传入的参数是[]interface{}
func BatchInsertUsers2(users []interface{}) error {query, args, _ := sqlx.In("INSERT INTO user (name, age) VALUES (?), (?), (?)",users..., // 如果arg实现了 driver.Valuer, sqlx.In 会通过调用 Value()来展开它)fmt.Println(query) // 查看生成的querystringfmt.Println(args)  // 查看生成的args_, err := DB.Exec(query, args...)return err
}
使用NamedExec实现批量插入

注意 :该功能需1.3.1版本以上,并且1.3.1版本目前还有点问题,sql语句最后不能有空格和;,详见issues/690。

使用NamedExec实现批量插入的代码如下:

// BatchInsertUsers3 使用NamedExec实现批量插入
func BatchInsertUsers3(users []*User) error {_, err := DB.NamedExec("INSERT INTO user (name, age) VALUES (:name, :age)", users)return err
}

把上面三种方法综合起来试一下:

func main() {err := initDB()if err != nil {panic(err)}defer DB.Close()u1 := User{Name: "七米", Age: 18}u2 := User{Name: "q1mi", Age: 28}u3 := User{Name: "小王子", Age: 38}// 方法1users := []*User{&u1, &u2, &u3}err = BatchInsertUsers(users)if err != nil {fmt.Printf("BatchInsertUsers failed, err:%v\n", err)}// 方法2users2 := []interface{}{u1, u2, u3}err = BatchInsertUsers2(users2)if err != nil {fmt.Printf("BatchInsertUsers2 failed, err:%v\n", err)}// 方法3users3 := []*User{&u1, &u2, &u3}err = BatchInsertUsers3(users3)if err != nil {fmt.Printf("BatchInsertUsers3 failed, err:%v\n", err)}
}
sqlx.In的查询示例

关于sqlx.In这里再补充一个用法,在sqlx查询语句中实现In查询和FIND_IN_SET函数。即实现SELECT * FROM user WHERE id in (3, 2, 1);SELECT * FROM user WHERE id in (3, 2, 1) ORDER BY FIND_IN_SET(id, '3,2,1');

in查询

查询id在给定id集合中的数据。

// QueryByIDs 根据给定ID查询
func QueryByIDs(ids []int)(users []User, err error){// 动态填充idquery, args, err := sqlx.In("SELECT name, age FROM user WHERE id IN (?)", ids)if err != nil {return}// sqlx.In 返回带 `?` bindvar的查询语句, 我们使用Rebind()重新绑定它query = DB.Rebind(query)err = DB.Select(&users, query, args...)return
}
in查询和FIND_IN_SET函数

查询id在给定id集合的数据并维持给定id集合的顺序。

// QueryAndOrderByIDs 按照指定id查询并维护顺序
func QueryAndOrderByIDs(ids []int)(users []User, err error){// 动态填充idstrIDs := make([]string, 0, len(ids))for _, id := range ids {strIDs = append(strIDs, fmt.Sprintf("%d", id))}query, args, err := sqlx.In("SELECT name, age FROM user WHERE id IN (?) ORDER BY FIND_IN_SET(id, ?)", ids, strings.Join(strIDs, ","))if err != nil {return}// sqlx.In 返回带 `?` bindvar的查询语句, 我们使用Rebind()重新绑定它query = DB.Rebind(query)err = DB.Select(&users, query, args...)return
}

当然,在这个例子里面你也可以先使用IN查询,然后通过代码按给定的ids对查询结果进行排序。

5.3 Go操作Redis

Go语言go-redis库的基本使用。

Redis介绍

Redis是一个开源的内存数据库,Redis提供了多种不同类型的数据结构,很多业务场景下的问题都可以很自然地映射到这些数据结构上。除此之外,通过复制、持久化和客户端分片等特性,我们可以很方便地将Redis扩展成一个能够包含数百GB数据、每秒处理上百万次请求的系统。

Redis支持的数据结构

Redis支持诸如字符串(string)、哈希(hashe)、列表(list)、集合(set)、带范围查询的排序集合(sorted set)、bitmap、hyperloglog、带半径查询的地理空间索引(geospatial index)和流(stream)等数据结构。

Redis应用场景
  • 缓存系统,减轻主数据库(MySQL)的压力。
  • 计数场景,比如微博、抖音中的关注数和粉丝数。
  • 热门排行榜,需要排序的场景特别适合使用ZSET。
  • 利用 LIST 可以实现队列的功能。
  • 利用 HyperLogLog 统计UV、PV等数据。
  • 使用 geospatial index 进行地理位置相关查询。

准备Redis环境

读者可以选择在本机安装 redis 或使用云数据库,这里直接使用Docker启动一个 redis 环境,方便学习使用。

使用下面的命令启动一个名为 redis507 的 5.0.7 版本的 redis server环境。

docker run --name redis507 -p 6379:6379 -d redis:5.0.7

**注意:**此处的版本、容器名和端口号可以根据自己需要设置。

启动一个 redis-cli 连接上面的 redis server。

docker run -it --network host --rm redis:5.0.7 redis-cli

go-redis库

安装

Go 社区中目前有很多成熟的 redis client 库,比如[https://github.com/gomodule/redigo 和https://github.com/go-redis/redis,读者可以自行选择适合自己的库。本书使用 go-redis 这个库来操作 Redis 数据库。

使用以下命令下安装 go-redis 库。

go get github.com/go-redis/redis/v8
连接
普通连接模式

go-redis 库中使用 redis.NewClient 函数连接 Redis 服务器。

rdb := redis.NewClient(&redis.Options{Addr:     "localhost:6379",Password: "", // 密码DB:       0,  // 数据库PoolSize: 20, // 连接池大小
})

除此之外,还可以使用 redis.ParseURL 函数从表示数据源的字符串中解析得到 Redis 服务器的配置信息。

opt, err := redis.ParseURL("redis://<user>:<pass>@localhost:6379/<db>")
if err != nil {panic(err)
}rdb := redis.NewClient(opt)
TLS连接模式

如果使用的是 TLS 连接方式,则需要使用 tls.Config 配置。

rdb := redis.NewClient(&redis.Options{TLSConfig: &tls.Config{MinVersion: tls.VersionTLS12,// Certificates: []tls.Certificate{cert},// ServerName: "your.domain.com",},
})
Redis Sentinel模式

使用下面的命令连接到由 Redis Sentinel 管理的 Redis 服务器。

rdb := redis.NewFailoverClient(&redis.FailoverOptions{MasterName:    "master-name",SentinelAddrs: []string{":9126", ":9127", ":9128"},
})
Redis Cluster模式

使用下面的命令连接到 Redis Cluster,go-redis 支持按延迟或随机路由命令。

rdb := redis.NewClusterClient(&redis.ClusterOptions{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},// 若要根据延迟或随机路由命令,请启用以下命令之一// RouteByLatency: true,// RouteRandomly: true,
})

基本使用

执行命令

下面的示例代码演示了 go-redis 库的基本使用。

// doCommand go-redis基本使用示例
func doCommand() {ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)defer cancel()// 执行命令获取结果val, err := rdb.Get(ctx, "key").Result()fmt.Println(val, err)// 先获取到命令对象cmder := rdb.Get(ctx, "key")fmt.Println(cmder.Val()) // 获取值fmt.Println(cmder.Err()) // 获取错误// 直接执行命令获取错误err = rdb.Set(ctx, "key", 10, time.Hour).Err()// 直接执行命令获取值value := rdb.Get(ctx, "key").Val()fmt.Println(value)
}
执行任意命令

go-redis 还提供了一个执行任意命令或自定义命令的 Do 方法,特别是一些 go-redis 库暂时不支持的命令都可以使用该方法执行。具体使用方法如下。

// doDemo rdb.Do 方法使用示例
func doDemo() {ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)defer cancel()// 直接执行命令获取错误err := rdb.Do(ctx, "set", "key", 10, "EX", 3600).Err()fmt.Println(err)// 执行命令获取结果val, err := rdb.Do(ctx, "get", "key").Result()fmt.Println(val, err)
}
redis.Nil

go-redis 库提供了一个 redis.Nil 错误来表示 Key 不存在的错误。因此在使用 go-redis 时需要注意对返回错误的判断。在某些场景下我们应该区别处理 redis.Nil 和其他不为 nil 的错误。

// getValueFromRedis redis.Nil判断
func getValueFromRedis(key, defaultValue string) (string, error) {ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)defer cancel()val, err := rdb.Get(ctx, key).Result()if err != nil {// 如果返回的错误是key不存在if errors.Is(err, redis.Nil) {return defaultValue, nil}// 出其他错了return "", err}return val, nil
}

其他示例

zset示例

下面的示例代码演示了如何使用 go-redis 库操作 zset。

// zsetDemo 操作zset示例
func zsetDemo() {// keyzsetKey := "language_rank"// valuelanguages := []*redis.Z{{Score: 90.0, Member: "Golang"},{Score: 98.0, Member: "Java"},{Score: 95.0, Member: "Python"},{Score: 97.0, Member: "JavaScript"},{Score: 99.0, Member: "C/C++"},}ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)defer cancel()// ZADDerr := rdb.ZAdd(ctx, zsetKey, languages...).Err()if err != nil {fmt.Printf("zadd failed, err:%v\n", err)return}fmt.Println("zadd success")// 把Golang的分数加10newScore, err := rdb.ZIncrBy(ctx, zsetKey, 10.0, "Golang").Result()if err != nil {fmt.Printf("zincrby failed, err:%v\n", err)return}fmt.Printf("Golang's score is %f now.\n", newScore)// 取分数最高的3个ret := rdb.ZRevRangeWithScores(ctx, zsetKey, 0, 2).Val()for _, z := range ret {fmt.Println(z.Member, z.Score)}// 取95~100分的op := &redis.ZRangeBy{Min: "95",Max: "100",}ret, err = rdb.ZRangeByScoreWithScores(ctx, zsetKey, op).Result()if err != nil {fmt.Printf("zrangebyscore failed, err:%v\n", err)return}for _, z := range ret {fmt.Println(z.Member, z.Score)}
}

执行上面的函数将得到如下输出结果。

zadd success
Golang's score is 100.000000 now.
Golang 100
C/C++ 99
Java 98
Python 95
JavaScript 97
Java 98
C/C++ 99
Golang 100
扫描或遍历所有key

你可以使用KEYS prefix:* 命令按前缀获取所有 key。

vals, err := rdb.Keys(ctx, "prefix*").Result()

但是如果需要扫描数百万的 key ,那速度就会比较慢。这种场景下你可以使用Scan 命令来遍历所有符合要求的 key。

// scanKeysDemo1 按前缀查找所有key示例
func scanKeysDemo1() {ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)defer cancel()var cursor uint64for {var keys []stringvar err error// 按前缀扫描keykeys, cursor, err = rdb.Scan(ctx, cursor, "prefix:*", 0).Result()if err != nil {panic(err)}for _, key := range keys {fmt.Println("key", key)}if cursor == 0 { // no more keysbreak}}
}

Go-redis 允许将上面的代码简化为如下示例。

// scanKeysDemo2 按前缀扫描key示例
func scanKeysDemo2() {ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)defer cancel()// 按前缀扫描keyiter := rdb.Scan(ctx, 0, "prefix:*", 0).Iterator()for iter.Next(ctx) {fmt.Println("keys", iter.Val())}if err := iter.Err(); err != nil {panic(err)}
}

例如,我们可以写出一个将所有匹配指定模式的 key 删除的示例。

// delKeysByMatch 按match格式扫描所有key并删除
func delKeysByMatch(match string, timeout time.Duration) {ctx, cancel := context.WithTimeout(context.Background(), timeout)defer cancel()if p := recover(); p != nil {tx.Rollback()panic(p) // re-throw panic after Rollback} else if err != nil {fmt.Println("rollback")tx.Rollback() // err is non-nil; don't change it} else {err = tx.Commit() // err is nil; if Commit returns error update errfmt.Println("commit")}}()sqlStr1 := "Update user set age=20 where id=?"rs, err := tx.Exec(sqlStr1, 1)if err!= nil{return err}n, err := rs.RowsAffected()if err != nil {return err}if n != 1 {return errors.New("exec sqlStr1 failed")}sqlStr2 := "Update user set age=50 where i=?"rs, err = tx.Exec(sqlStr2, 5)if err!=nil{return err}n, err = rs.RowsAffected()if err != nil {return err}if n != 1 {return errors.New("exec sqlStr1 failed")}return err
}

sqlx.In

sqlx.Insqlx提供的一个非常方便的函数。

sqlx.In的批量插入示例
表结构

为了方便演示插入数据操作,这里创建一个user表,表结构如下:

CREATE TABLE `user` (`id` BIGINT(20) NOT NULL AUTO_INCREMENT,`name` VARCHAR(20) DEFAULT '',`age` INT(11) DEFAULT '0',PRIMARY KEY(`id`)
)ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8mb4;
结构体

定义一个user结构体,字段通过tag与数据库中user表的列一致。

type User struct {Name string `db:"name"`Age  int    `db:"age"`
}
bindvars(绑定变量)

查询占位符?在内部称为bindvars(查询占位符),它非常重要。你应该始终使用它们向数据库发送值,因为它们可以防止SQL注入攻击。database/sql不尝试对查询文本进行任何验证;它与编码的参数一起按原样发送到服务器。除非驱动程序实现一个特殊的接口,否则在执行之前,查询是在服务器上准备的。因此bindvars是特定于数据库的:

  • MySQL中使用?
  • PostgreSQL使用枚举的$1$2等bindvar语法
  • SQLite中?$1的语法都支持
  • Oracle中使用:name的语法

bindvars的一个常见误解是,它们用来在sql语句中插入值。它们其实仅用于参数化,不允许更改SQL语句的结构。例如,使用bindvars尝试参数化列或表名将不起作用:

// ?不能用来插入表名(做SQL语句中表名的占位符)
db.Query("SELECT * FROM ?", "mytable")// ?也不能用来插入列名(做SQL语句中列名的占位符)
db.Query("SELECT ?, ? FROM people", "name", "location")
自己拼接语句实现批量插入

比较笨,但是很好理解。就是有多少个User就拼接多少个(?, ?)

// BatchInsertUsers 自行构造批量插入的语句
func BatchInsertUsers(users []*User) error {// 存放 (?, ?) 的slicevalueStrings := make([]string, 0, len(users))// 存放values的slicevalueArgs := make([]interface{}, 0, len(users) * 2)// 遍历users准备相关数据for _, u := range users {// 此处占位符要与插入值的个数对应valueStrings = append(valueStrings, "(?, ?)")valueArgs = append(valueArgs, u.Name)valueArgs = append(valueArgs, u.Age)}// 自行拼接要执行的具体语句stmt := fmt.Sprintf("INSERT INTO user (name, age) VALUES %s",strings.Join(valueStrings, ","))_, err := DB.Exec(stmt, valueArgs...)return err
}
使用sqlx.In实现批量插入

前提是需要我们的结构体实现driver.Valuer接口:

func (u User) Value() (driver.Value, error) {return []interface{}{u.Name, u.Age}, nil
}

使用sqlx.In实现批量插入代码如下:

// BatchInsertUsers2 使用sqlx.In帮我们拼接语句和参数, 注意传入的参数是[]interface{}
func BatchInsertUsers2(users []interface{}) error {query, args, _ := sqlx.In("INSERT INTO user (name, age) VALUES (?), (?), (?)",users..., // 如果arg实现了 driver.Valuer, sqlx.In 会通过调用 Value()来展开它)fmt.Println(query) // 查看生成的querystringfmt.Println(args)  // 查看生成的args_, err := DB.Exec(query, args...)return err
}
使用NamedExec实现批量插入

注意 :该功能需1.3.1版本以上,并且1.3.1版本目前还有点问题,sql语句最后不能有空格和;,详见issues/690。

使用NamedExec实现批量插入的代码如下:

// BatchInsertUsers3 使用NamedExec实现批量插入
func BatchInsertUsers3(users []*User) error {_, err := DB.NamedExec("INSERT INTO user (name, age) VALUES (:name, :age)", users)return err
}

把上面三种方法综合起来试一下:

func main() {err := initDB()if err != nil {panic(err)}defer DB.Close()u1 := User{Name: "七米", Age: 18}u2 := User{Name: "q1mi", Age: 28}u3 := User{Name: "小王子", Age: 38}// 方法1users := []*User{&u1, &u2, &u3}err = BatchInsertUsers(users)if err != nil {fmt.Printf("BatchInsertUsers failed, err:%v\n", err)}// 方法2users2 := []interface{}{u1, u2, u3}err = BatchInsertUsers2(users2)if err != nil {fmt.Printf("BatchInsertUsers2 failed, err:%v\n", err)}// 方法3users3 := []*User{&u1, &u2, &u3}err = BatchInsertUsers3(users3)if err != nil {fmt.Printf("BatchInsertUsers3 failed, err:%v\n", err)}
}
sqlx.In的查询示例

关于sqlx.In这里再补充一个用法,在sqlx查询语句中实现In查询和FIND_IN_SET函数。即实现SELECT * FROM user WHERE id in (3, 2, 1);SELECT * FROM user WHERE id in (3, 2, 1) ORDER BY FIND_IN_SET(id, '3,2,1');

in查询

查询id在给定id集合中的数据。

// QueryByIDs 根据给定ID查询
func QueryByIDs(ids []int)(users []User, err error){// 动态填充idquery, args, err := sqlx.In("SELECT name, age FROM user WHERE id IN (?)", ids)if err != nil {return}// sqlx.In 返回带 `?` bindvar的查询语句, 我们使用Rebind()重新绑定它query = DB.Rebind(query)err = DB.Select(&users, query, args...)return
}
in查询和FIND_IN_SET函数

查询id在给定id集合的数据并维持给定id集合的顺序。

// QueryAndOrderByIDs 按照指定id查询并维护顺序
func QueryAndOrderByIDs(ids []int)(users []User, err error){// 动态填充idstrIDs := make([]string, 0, len(ids))for _, id := range ids {strIDs = append(strIDs, fmt.Sprintf("%d", id))}query, args, err := sqlx.In("SELECT name, age FROM user WHERE id IN (?) ORDER BY FIND_IN_SET(id, ?)", ids, strings.Join(strIDs, ","))if err != nil {return}// sqlx.In 返回带 `?` bindvar的查询语句, 我们使用Rebind()重新绑定它query = DB.Rebind(query)err = DB.Select(&users, query, args...)return
}

当然,在这个例子里面你也可以先使用IN查询,然后通过代码按给定的ids对查询结果进行排序。

5.3 Go操作Redis

Go语言go-redis库的基本使用。

Redis介绍

Redis是一个开源的内存数据库,Redis提供了多种不同类型的数据结构,很多业务场景下的问题都可以很自然地映射到这些数据结构上。除此之外,通过复制、持久化和客户端分片等特性,我们可以很方便地将Redis扩展成一个能够包含数百GB数据、每秒处理上百万次请求的系统。

Redis支持的数据结构

Redis支持诸如字符串(string)、哈希(hashe)、列表(list)、集合(set)、带范围查询的排序集合(sorted set)、bitmap、hyperloglog、带半径查询的地理空间索引(geospatial index)和流(stream)等数据结构。

Redis应用场景
  • 缓存系统,减轻主数据库(MySQL)的压力。
  • 计数场景,比如微博、抖音中的关注数和粉丝数。
  • 热门排行榜,需要排序的场景特别适合使用ZSET。
  • 利用 LIST 可以实现队列的功能。
  • 利用 HyperLogLog 统计UV、PV等数据。
  • 使用 geospatial index 进行地理位置相关查询。

准备Redis环境

读者可以选择在本机安装 redis 或使用云数据库,这里直接使用Docker启动一个 redis 环境,方便学习使用。

使用下面的命令启动一个名为 redis507 的 5.0.7 版本的 redis server环境。

docker run --name redis507 -p 6379:6379 -d redis:5.0.7

**注意:**此处的版本、容器名和端口号可以根据自己需要设置。

启动一个 redis-cli 连接上面的 redis server。

docker run -it --network host --rm redis:5.0.7 redis-cli

go-redis库

安装

Go 社区中目前有很多成熟的 redis client 库,比如[https://github.com/gomodule/redigo 和https://github.com/go-redis/redis,读者可以自行选择适合自己的库。本书使用 go-redis 这个库来操作 Redis 数据库。

使用以下命令下安装 go-redis 库。

go get github.com/go-redis/redis/v8
连接
普通连接模式

go-redis 库中使用 redis.NewClient 函数连接 Redis 服务器。

rdb := redis.NewClient(&redis.Options{Addr:     "localhost:6379",Password: "", // 密码DB:       0,  // 数据库PoolSize: 20, // 连接池大小
})

除此之外,还可以使用 redis.ParseURL 函数从表示数据源的字符串中解析得到 Redis 服务器的配置信息。

opt, err := redis.ParseURL("redis://<user>:<pass>@localhost:6379/<db>")
if err != nil {panic(err)
}rdb := redis.NewClient(opt)
TLS连接模式

如果使用的是 TLS 连接方式,则需要使用 tls.Config 配置。

rdb := redis.NewClient(&redis.Options{TLSConfig: &tls.Config{MinVersion: tls.VersionTLS12,// Certificates: []tls.Certificate{cert},// ServerName: "your.domain.com",},
})
Redis Sentinel模式

使用下面的命令连接到由 Redis Sentinel 管理的 Redis 服务器。

rdb := redis.NewFailoverClient(&redis.FailoverOptions{MasterName:    "master-name",SentinelAddrs: []string{":9126", ":9127", ":9128"},
})
Redis Cluster模式

使用下面的命令连接到 Redis Cluster,go-redis 支持按延迟或随机路由命令。

rdb := redis.NewClusterClient(&redis.ClusterOptions{Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},// 若要根据延迟或随机路由命令,请启用以下命令之一// RouteByLatency: true,// RouteRandomly: true,
})

基本使用

执行命令

下面的示例代码演示了 go-redis 库的基本使用。

// doCommand go-redis基本使用示例
func doCommand() {ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)defer cancel()// 执行命令获取结果val, err := rdb.Get(ctx, "key").Result()fmt.Println(val, err)// 先获取到命令对象cmder := rdb.Get(ctx, "key")fmt.Println(cmder.Val()) // 获取值fmt.Println(cmder.Err()) // 获取错误// 直接执行命令获取错误err = rdb.Set(ctx, "key", 10, time.Hour).Err()// 直接执行命令获取值value := rdb.Get(ctx, "key").Val()fmt.Println(value)
}
执行任意命令

go-redis 还提供了一个执行任意命令或自定义命令的 Do 方法,特别是一些 go-redis 库暂时不支持的命令都可以使用该方法执行。具体使用方法如下。

// doDemo rdb.Do 方法使用示例
func doDemo() {ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)defer cancel()// 直接执行命令获取错误err := rdb.Do(ctx, "set", "key", 10, "EX", 3600).Err()fmt.Println(err)// 执行命令获取结果val, err := rdb.Do(ctx, "get", "key").Result()fmt.Println(val, err)
}
redis.Nil

go-redis 库提供了一个 redis.Nil 错误来表示 Key 不存在的错误。因此在使用 go-redis 时需要注意对返回错误的判断。在某些场景下我们应该区别处理 redis.Nil 和其他不为 nil 的错误。

// getValueFromRedis redis.Nil判断
func getValueFromRedis(key, defaultValue string) (string, error) {ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)defer cancel()val, err := rdb.Get(ctx, key).Result()if err != nil {// 如果返回的错误是key不存在if errors.Is(err, redis.Nil) {return defaultValue, nil}// 出其他错了return "", err}return val, nil
}

其他示例

zset示例

下面的示例代码演示了如何使用 go-redis 库操作 zset。

// zsetDemo 操作zset示例
func zsetDemo() {// keyzsetKey := "language_rank"// valuelanguages := []*redis.Z{{Score: 90.0, Member: "Golang"},{Score: 98.0, Member: "Java"},{Score: 95.0, Member: "Python"},{Score: 97.0, Member: "JavaScript"},{Score: 99.0, Member: "C/C++"},}ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)defer cancel()// ZADDerr := rdb.ZAdd(ctx, zsetKey, languages...).Err()if err != nil {fmt.Printf("zadd failed, err:%v\n", err)return}fmt.Println("zadd success")// 把Golang的分数加10newScore, err := rdb.ZIncrBy(ctx, zsetKey, 10.0, "Golang").Result()if err != nil {fmt.Printf("zincrby failed, err:%v\n", err)return}fmt.Printf("Golang's score is %f now.\n", newScore)// 取分数最高的3个ret := rdb.ZRevRangeWithScores(ctx, zsetKey, 0, 2).Val()for _, z := range ret {fmt.Println(z.Member, z.Score)}// 取95~100分的op := &redis.ZRangeBy{Min: "95",Max: "100",}ret, err = rdb.ZRangeByScoreWithScores(ctx, zsetKey, op).Result()if err != nil {fmt.Printf("zrangebyscore failed, err:%v\n", err)return}for _, z := range ret {fmt.Println(z.Member, z.Score)}
}

执行上面的函数将得到如下输出结果。

zadd success
Golang's score is 100.000000 now.
Golang 100
C/C++ 99
Java 98
Python 95
JavaScript 97
Java 98
C/C++ 99
Golang 100
扫描或遍历所有key

你可以使用KEYS prefix:* 命令按前缀获取所有 key。

vals, err := rdb.Keys(ctx, "prefix*").Result()

但是如果需要扫描数百万的 key ,那速度就会比较慢。这种场景下你可以使用Scan 命令来遍历所有符合要求的 key。

// scanKeysDemo1 按前缀查找所有key示例
func scanKeysDemo1() {ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)defer cancel()var cursor uint64for {var keys []stringvar err error// 按前缀扫描keykeys, cursor, err = rdb.Scan(ctx, cursor, "prefix:*", 0).Result()if err != nil {panic(err)}for _, key := range keys {fmt.Println("key", key)}if cursor == 0 { // no more keysbreak}}
}

Go-redis 允许将上面的代码简化为如下示例。

// scanKeysDemo2 按前缀扫描key示例
func scanKeysDemo2() {ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)defer cancel()// 按前缀扫描keyiter := rdb.Scan(ctx, 0, "prefix:*", 0).Iterator()for iter.Next(ctx) {fmt.Println("keys", iter.Val())}if err := iter.Err(); err != nil {panic(err)}
}

例如,我们可以写出一个将所有匹配指定模式的 key 删除的示例。

// delKeysByMatch 按match格式扫描所有key并删除
func delKeysByMatch(match string, timeout time.Duration) {ctx, cancel := context.WithTimeout(context.Background(), timeout)defer cancel()iter := rdb.Scan(ctx, 0, matc

golang学习笔记(进阶篇)相关推荐

  1. Vue学习笔记进阶篇——Render函数

    本文为转载,原文:Vue学习笔记进阶篇--Render函数 基础 Vue 推荐在绝大多数情况下使用 template 来创建你的 HTML.然而在一些场景中,你真的需要 JavaScript 的完全编 ...

  2. PHP学习笔记 - 进阶篇(7)

    PHP学习笔记 - 进阶篇(7) 文件操作 读取文件内容 PHP具有丰富的文件操作函数,最简单的读取文件的函数为file_get_contents,可以将整个文件全部读取到一个字符串中. $conte ...

  3. Vue学习笔记进阶篇——多元素及多组件过渡

    本文为转载,原文:Vue学习笔记进阶篇--多元素及多组件过渡 多元素的过渡 对于原生标签可以使用 v-if/v-else.但是有一点需要注意: 当有相同标签名的元素切换时,需要通过 key 特性设置唯 ...

  4. vue-resource post php,Vue学习笔记进阶篇——vue-resource安装及使用

    简介 vue-resource是Vue.js的一款插件,它可以通过XMLHttpRequest或JSONP发起请求并处理响应.也就是说,$.ajax能做的事情,vue-resource插件一样也能做到 ...

  5. DDD学习笔记 - 进阶篇(Ⅱ)

    09 | 中台:数字转型后到底应该共享什么? 课程链接:https://time.geekbang.org/column/article/159580 中台是数字化转型的一个热门话题.继阿里提出中台概 ...

  6. 极客HTTP协议学习笔记破冰篇(1-7)

    极客HTTP协议学习笔记破冰篇(1-7) 前言 各篇章笔记链接 一.学习笔记 1.HTTP的前世今生 2.HTTP是什么 3.与HTTP相关的各种概念(上) 4.与HTTP相关的各种概念(下) 5.常 ...

  7. golang学习笔记(基础篇)

    LCY~~Golang学习笔记 一.Go语言开发环境 ##安装Go开发包以及VsCode Go开发包与vscode配置安装教程网址:https://www.liwenzhou.com/posts/Go ...

  8. C#笔记进阶篇03 抽象函数与抽象类

    C#笔记进阶篇03 抽象函数与抽象类 --本系列是基于人民邮电出版社<C#2008 C#图解教程>.清华大学出版社<C#入门经典(第五版)>两本书的自学C#笔记,如果您发现了本 ...

  9. golang学习笔记-1

    golang学习笔记-1 自学golang时,找到一篇学习资料http://golang.iswbm.com/en/latest/c01/c01_08.html 初看上面两种写法,不明就里,哪里高级了 ...

  10. 118云原生编程语言Golang学习笔记

    Golang学习笔记 文章目录 1.Go简介 1.1 简介 1.2 设计初衷 1.3 Golang vs Java 1.4 应用领域 1.5 用go语言的公司 2.Go下载和安装 2.1 开发工具 2 ...

最新文章

  1. c# 关闭软件 进程 杀死进程
  2. salesforce学习框架图
  3. JZ2440学习总结1
  4. 数码相框项目之显示一张可放大、缩小、拖拽的图片
  5. Go 开发关键技术指南 | 为什么你要选择 GO?(内含超全知识大图)
  6. 历史上今天和成语辞典 进入美国区教育类 what's hot
  7. ios上的pvr与png
  8. Ilist 和list的区别归纳总结
  9. xmp文件格式怎么导入ps?ACR预设安装方法
  10. 苹果笔记本链接刷卡打印机教程
  11. PostgreSQL pg_stats used to estimate top N freps values and explain rows
  12. tolower c语言,tolower ()在c语言中是什么意思
  13. Python用 matplotlib 工具包来绘制世界地图
  14. 通过EXCEL中的FILTERXML函数实现批量翻译
  15. GlobalSign 域名型 SSL 证书
  16. 【BUCTOJ训练: 求和(Python)】
  17. 网站页面SEO优化方案
  18. 攻防世界_江苏工匠杯_MISC_看雪看雪看雪
  19. Tableau(9):计算字段、表计算、自定义表计算
  20. argparse用于解析命令行参数

热门文章

  1. 什么是java线程?java线程模型的组成
  2. 研究生语音识别课程作业记录(一) 非特定人孤立词识别
  3. JS将秒数换算成时分秒 以及转化为年月日 时分秒
  4. html圆形图片切换,jQuery和CSS3炫酷圆形图片切换特效
  5. RBF-UKF径向基神经网络结合无迹卡尔曼滤波估计锂离子电池SOC(附MATLAB代码)
  6. Web自动化测试教程
  7. Weka安装及简单应用
  8. Hadoop笔记-01概述
  9. Github Emoji——Github表情大全
  10. 卫生间智能取纸机选型知识合集