嗨,大家好!我叫 Sergey Kamardin,是 Mail.Ru 的一名开发人员。

这篇文章是关于我们如何用 Go 开发高负载的 WebSocket 服务器。

如果您熟悉 WebSockets,但对 Go 知之甚少,我希望您仍然会发现这篇文章在性能优化的思想和技术方面很有趣。

一、简介

为了定义我们故事的上下文,应该就我们为什么需要这个服务器说几句话。

Mail.Ru 有很多有状态的系统。用户电子邮件存储就是其中之一。有多种方法可以跟踪系统内的状态变化以及系统事件。这主要是通过定期系统轮询或有关其状态更改的系统通知来实现的。

这两种方式各有利弊。但在邮件方面,用户收到新邮件的速度越快越好。

邮件轮询每秒涉及大约 50,000 个 HTTP 查询,其中 60% 返回 304 状态,这意味着邮箱没有变化。

因此,为了减少服务器上的负载并加快向用户发送邮件的速度,决定通过编写发布者-订阅者服务器(也称为总线、消息代理或事件-频道),一方面接收有关状态更改的通知,另一方面接收此类通知的订阅。

之前:

现在:

第一个方案显示了它以前的样子。浏览器定期轮询 API 并询问存储(邮箱服务)更改。

第二个方案描述了新的架构。浏览器与通知 API 建立 WebSocket 连接,通知 API 是总线服务器的客户端。收到新电子邮件后,Storage 会向 Bus (1) 和 Bus 向其订阅者 (2) 发送有关它的通知。API 确定发送接收到的通知的连接,并将其发送到用户的浏览器 (3)。

所以今天我们将讨论 API 或 WebSocket 服务器。展望未来,我会告诉您服务器将有大约 300 万个在线连接。

2.惯用方式

让我们看看如何在没有任何优化的情况下使用普通的 Go 功能来实现我们服务器的某些部分。

在我们继续之前net/http,让我们谈谈我们将如何发送和接收数据。位于WebSocket 协议之上的数据(例如 JSON 对象)在下文中将被称为数据包

让我们开始实现Channel包含通过 WebSocket 连接发送和接收此类数据包的逻辑的结构。

2.1. 通道结构

<span style="color:var(--gray85)"><code class="language-go">// Packet represents application level data.
type Packet struct {...
}// Channel wraps user connection.
type Channel struct {conn net.Conn    // WebSocket connection.send chan Packet // Outgoing packets queue.
}func NewChannel(conn net.Conn) *Channel {c := &Channel{conn: conn,send: make(chan Packet, N),}go c.reader()go c.writer()return c
}</code></span>

WebSocket 通道实现。

我想提请您注意两个读写 goroutines 的发布。每个 goroutine 都需要自己的内存堆栈,根据操作系统和 Go 版本,其初始大小可能为 2 到 8 KB 。

对于上面提到的 300 万个在线连接数,我们将需要24 GB 的内存(堆栈为 4 KB)用于所有连接。这还没有为Channel结构、传出数据包ch.send和其他内部字段分配内存。

2.2. I/O 协程

我们来看看“reader”的实现:

<span style="color:var(--gray85)"><code class="language-go">func (c *Channel) reader() {// We make a buffered read to reduce read syscalls.buf := bufio.NewReader(c.conn)for {pkt, _ := readPacket(buf)c.handle(pkt)}
}</code></span>

频道的阅读 goroutine。

这里我们使用bufio.Reader来减少read()系统调用的数量并读取buf缓冲区大小允许的数量。在无限循环中,我们期待新数据的到来。请记住这句话:期待新数据的到来。我们稍后会回到他们身边。

我们将把传入数据包的解析和处理放在一边,因为这对于我们将要讨论的优化并不重要。但是,buf现在值得我们注意:默认情况下,它是 4 KB,这意味着我们的连接还有12 GB的内存。“作家”也有类似的情况:

<span style="color:var(--gray85)"><code class="language-go">func (c *Channel) writer() {// We make buffered write to reduce write syscalls. buf := bufio.NewWriter(c.conn)for pkt := range c.send {_ := writePacket(buf, pkt)buf.Flush()}
}</code></span>

Channel 的编写 goroutine。

我们遍历传出数据包通道c.send并将它们写入缓冲区。正如我们细心的读者已经猜到的,这是我们 300 万个连接的另外 4 KB 和12 GB内存。

2.3. HTTP

我们已经有了一个简单的Channel实现,现在我们需要获得一个 WebSocket 连接来使用。由于我们还在惯用方式标题下,让我们按照相应的方式进行操作。

注意:如果你不知道 WebSocket 是如何工作的,应该提到客户端通过一种叫做 Upgrade 的特殊 HTTP 机制切换到 WebSocket 协议。升级请求成功处理后,服务器和客户端使用 TCP 连接交换二进制 WebSocket 帧。这里介绍一下连接内部的框架结构。

<span style="color:var(--gray85)"><code class="language-go">import ("net/http""some/websocket"
)http.HandleFunc("/v1/ws", func(w http.ResponseWriter, r *http.Request) {conn, _ := websocket.Upgrade(r, w)ch := NewChannel(conn)//...
})</code></span>

升级到 WebSocket 的惯用方式。

请注意,http.ResponseWriterbufio.Readerbufio.Writer(均具有 4 KB 缓冲区)进行内存分配以进行*http.Request初始化和进一步的响应写入。

无论使用何种WebSocket 库,在成功响应Upgrade 请求后,服务器都会收到I/O 缓冲区以及responseWriter.Hijack()调用后的TCP 连接。

提示:在某些情况下,go:linkname可以通过调用将缓冲区返回到sync.Pool内部。net/httpnet/http.putBufio{Reader,Writer}

因此,我们需要另外24 GB的内存用于 300 万个连接。

因此,总共72 GB的内存用于尚未执行任何操作的应用程序!

3. 优化

让我们回顾一下我们在介绍部分讨论过的内容,并记住用户连接的行为。切换到 WebSocket 后,客户端发送一个包含相关事件的数据包,或者换句话说,订阅事件。然后(不考虑诸如 之类的技术消息ping/pong),客户端可能在整个连接生命周期内不发送任何其他消息。

连接寿命可能会持续几秒到几天。

因此,对于大多数的时间我们Channel.reader()Channel.writer()正在等待数据的处理用于接收或发送。与它们一起等待的是每个 4 KB 的 I/O 缓冲区。

现在很明显,某些事情可以做得更好,不是吗?

3.1. 网络民意调查

你还记得Channel.reader(),落实预期的新的数据来通过被锁在conn.Read()里面的电话bufio.Reader.Read()?如果连接中有数据,Go 运行时“唤醒”我们的 goroutine 并允许它读取下一个数据包。之后,goroutine 在等待新数据时再次被锁定。让我们看看 Go 运行时是如何理解 goroutine 必须被“唤醒”的。

如果我们查看conn.Read() 实现,我们将看到其中的net.netFD.Read() 调用:

<span style="color:var(--gray85)"><code class="language-go">// net/fd_unix.gofunc (fd *netFD) Read(p []byte) (n int, err error) {//...for {n, err = syscall.Read(fd.sysfd, p)if err != nil {n = 0if err == syscall.EAGAIN {if err = fd.pd.waitRead(); err == nil {continue}}}//...break}//...
}</code></span>

Go 内部关于非阻塞读取。

Go 在非阻塞模式下使用套接字。EAGAIN 表示套接字中没有数据并且不会在从空套接字读取时被锁定,操作系统将控制权返回给我们。

我们看到read()来自连接文件描述符的系统调用。如果 read 返回EAGAIN 错误,运行时会进行pollDesc.waitRead() 调用:

<span style="color:var(--gray85)"><code class="language-go">// net/fd_poll_runtime.gofunc (pd *pollDesc) waitRead() error {return pd.wait('r')
}func (pd *pollDesc) wait(mode int) error {res := runtime_pollWait(pd.runtimeCtx, mode)//...
}</code></span>

转到有关 netpoll 使用的内部信息。

如果我们深入挖掘,我们会看到在 Linux 中使用epoll和在 BSD 中使用kqueue实现了 netpoll 。为什么不对我们的连接使用相同的方法?我们可以分配一个读取缓冲区并仅在真正需要时启动读取 goroutine:当套接字中有真正可读的数据时。

在github.com/golang/go上,有导出netpoll函数的问题。

3.2. 摆脱 goroutines

假设我们有Go 的netpoll 实现。现在我们可以避免Channel.reader()使用内部缓冲区启动goroutine,并订阅连接中可读数据的事件:

<span style="color:var(--gray85)"><code class="language-go">ch := NewChannel(conn)// Make conn to be observed by netpoll instance.
poller.Start(conn, netpoll.EventRead, func() {// We spawn goroutine here to prevent poller wait loop// to become locked during receiving packet from ch.go Receive(ch)
})// Receive reads a packet from conn and handles it somehow.
func (ch *Channel) Receive() {buf := bufio.NewReader(ch.conn)pkt := readPacket(buf)c.handle(pkt)
}</code></span>

使用网络轮询。

使用 更容易,Channel.writer()因为我们可以运行 goroutine 并仅在我们要发送数据包时分配缓冲区:

<span style="color:var(--gray85)"><code class="language-go">func (ch *Channel) Send(p Packet) {if c.noWriterYet() {go ch.writer()}ch.send <- p
}</code></span>

仅在需要时启动编写器 goroutine。

请注意,我们不处理操作系统EAGAINwrite()系统调用时返回的情况。我们在这种情况下依赖 Go 运行时,因为这种服务器实际上很少见。尽管如此,如果需要,它可以以相同的方式处理。

ch.send(一个或多个)读取传出数据包后,写入器将完成其操作并释放 goroutine 堆栈和发送缓冲区。

完美的!通过去除两个连续运行的 goroutine 中的堆栈和 I/O 缓冲区,我们节省了48 GB

3.3. 资源控制

大量的连接不仅涉及高内存消耗。在开发服务器时,我们经历了重复的竞争条件和死锁,通常伴随着所谓的自我 DDoS——应用程序客户端猖獗地尝试连接到服务器从而进一步破坏它的情况。

例如,如果由于某种原因我们突然无法处理ping/pong消息,但空闲连接的处理程序继续关闭这些连接(假设连接断开因此没有提供数据),客户端似乎每 N 秒失去一次连接并尝试重新连接而不是等待事件。

如果锁定或过载的服务器刚刚停止接受新连接,并且它之前的平衡器(例如,nginx)将请求传递给下一个服务器实例,那就太好了。

此外,无论服务器负载如何,如果所有客户端突然出于任何原因(可能是由于错误原因)都想向我们发送数据包,则之前保存的48 GB将再次使用,因为我们实际上会回到初始状态每个连接的 goroutine 和缓冲区。

协程池

我们可以使用 goroutine 池限制同时处理的数据包数量。这是这种池的幼稚实现的样子:

<span style="color:var(--gray85)"><code class="language-go">package gopoolfunc New(size int) *Pool {return &Pool{work: make(chan func()),sem:  make(chan struct{}, size),}
}func (p *Pool) Schedule(task func()) error {select {case p.work <- task:case p.sem <- struct{}{}:go p.worker(task)}
}func (p *Pool) worker(task func()) {defer func() { <-p.sem }for {task()task = <-p.work}
}</code></span>

goroutine 池的简单实现。

现在我们的代码netpoll如下:

<span style="color:var(--gray85)"><code class="language-go">pool := gopool.New(128)poller.Start(conn, netpoll.EventRead, func() {// We will block poller wait loop when// all pool workers are busy.pool.Schedule(func() {Receive(ch)})
})</code></span>

处理 goroutine 池中的轮询事件。

所以现在我们不仅在套接字中出现可读数据时读取数据包,而且在第一次有机会占用池中的空闲 goroutine 时读取数据包。

同样,我们将更改Send()

<span style="color:var(--gray85)"><code class="language-go">pool := gopool.New(128)func (ch *Channel) Send(p Packet) {if c.noWriterYet() {pool.Schedule(ch.writer)}ch.send <- p
}</code></span>

重用编写 goroutine。

而不是go ch.writer(),我们想写入重用的 goroutine 之一。因此,对于一个Ngoroutine池,我们可以保证在N同时处理请求和到达时N + 1我们不会分配N + 1缓冲区用于读取。该够程池还允许我们限制Accept()Upgrade()新的连接,以避免与DDoS攻击大多数情况。

3.4. 零拷贝升级

让我们稍微偏离一下 WebSocket 协议。如前所述,客户端使用 HTTP 升级请求切换到 WebSocket 协议。这是它的样子:

<span style="color:var(--gray85)"><code class="language-http">GET /ws HTTP/1.1
Host: mail.ru
Connection: Upgrade
Sec-Websocket-Key: A3xNe7sEB9HixkmBhVrYaA==
Sec-Websocket-Version: 13
Upgrade: websocketHTTP/1.1 101 Switching Protocols
Connection: Upgrade
Sec-Websocket-Accept: ksu0wXWG+YmkVx+KQR2agP0cQn4=
Upgrade: websocket</code></span>

HTTP 升级示例。

也就是说,在我们的例子中,我们只需要 HTTP 请求及其标头来切换到 WebSocket 协议。这些知识和里面存储的内容http.Request表明,为了优化,我们可能会在处理 HTTP 请求时拒绝不必要的分配和复制,并放弃标准net/http服务器。

例如,http.Request包含一个具有同名 Header 类型的字段,通过将数据从连接复制到值字符串,无条件填充所有请求头。想象一下有多少额外的数据可以保存在这个字段中,例如对于一个大尺寸的 Cookie 标头。

但是拿什么作为回报呢?

WebSocket 实现

不幸的是,在我们的服务器优化时存在的所有库都允许我们只对标准net/http服务器进行升级。此外,这(两个)库中的任何一个都无法使用上述所有读写优化。为了使这些优化起作用,我们必须有一个相当低级的 API 来处理 WebSocket。为了重用缓冲区,我们需要 procotol 函数看起来像这样:

<span style="color:var(--gray85)"><code class="language-go">func ReadFrame(io.Reader) (Frame, error)
func WriteFrame(io.Writer, Frame) error</code></span>

如果我们有一个带有这样 API 的库,我们可以从连接中读取数据包,如下所示(数据包写入看起来是一样的):

<span style="color:var(--gray85)"><code class="language-go">// getReadBuf, putReadBuf are intended to
// reuse *bufio.Reader (with sync.Pool for example).
func getReadBuf(io.Reader) *bufio.Reader
func putReadBuf(*bufio.Reader)// readPacket must be called when data could be read from conn.
func readPacket(conn io.Reader) error {buf := getReadBuf()defer putReadBuf(buf)buf.Reset(conn)frame, _ := ReadFrame(buf)parsePacket(frame.Payload)//...
}</code></span>

预期的 WebSocket 实现 API。

简而言之,是时候创建我们自己的库了。

github.com/gobwas/ws

从意识形态上讲,该ws库的编写是为了不将其协议操作逻辑强加给用户。所有读写方法都接受标准io.Readerio.Writer接口,这使得使用或不使用缓冲或任何其他 I/O 包装器成为可能。

除了来自标准的升级请求net/httpws支持零拷贝升级,升级请求的处理和切换到WebSocket,无需内存分配或复制。ws.Upgrade()接受io.ReadWriternet.Conn实现这个接口)。换句话说,我们可以使用标准net.Listen()并将接收到的连接从ln.Accept()立即转移到ws.Upgrade()。该库可以复制任何请求数据以供将来在应用程序中使用(例如,Cookie验证会话)。

下面是升级请求处理的基准:标准net/http服务器与net.Listen()零拷贝升级:

<span style="color:var(--gray85)"><code class="language-http">BenchmarkUpgradeHTTP    5156 ns/op    8576 B/op    9 allocs/op
BenchmarkUpgradeTCP     973 ns/op     0 B/op       0 allocs/op</code></span>

切换到ws零拷贝升级救了我们另一个24 GB -通过在请求处理分配给I / O缓冲的空间net/http处理程序。

3.5. 概括

让我们来构建我告诉过你的优化。

  • 内部带有缓冲区的读取 goroutine 很昂贵。解决方案:netpoll (epoll, kqueue); 重用缓冲区。
  • 内部带有缓冲区的写入 goroutine 很昂贵。解决方案:必要时启动goroutine;重用缓冲区。
  • 随着连接风暴,netpoll 将无法工作。解决方案:重用 goroutines 并限制它们的数量。
  • net/http不是处理升级到 WebSocket 的最快方法。解决方案:在裸 TCP 连接上使用零拷贝升级。

这就是服务器代码的样子:

<span style="color:var(--gray85)"><code class="language-go">import ("net""github.com/gobwas/ws"
)ln, _ := net.Listen("tcp", ":8080")for {// Try to accept incoming connection inside free pool worker.// If there no free workers for 1ms, do not accept anything and try later.// This will help us to prevent many self-ddos or out of resource limit cases.err := pool.ScheduleTimeout(time.Millisecond, func() {conn := ln.Accept()_ = ws.Upgrade(conn)// Wrap WebSocket connection with our Channel struct.// This will help us to handle/send our app's packets.ch := NewChannel(conn)// Wait for incoming bytes from connection.poller.Start(conn, netpoll.EventRead, func() {// Do not cross the resource limits.pool.Schedule(func() {// Read and handle incoming packet(s).ch.Recevie()})})})if err != nil {   time.Sleep(time.Millisecond)}
}</code></span>

带有 netpoll、goroutine 池和零拷贝升级的示例 WebSocket 服务器。

4。结论

过早的优化是编程中所有邪恶(或至少大部分)的根源。唐纳德·克努斯

当然,上述优化是相关的,但并非在所有情况下。比如如果空闲资源(内存、CPU)和在线连接数的比例比较高,那么优化可能就没有意义了。但是,您可以从了解改进的地方和内容中受益匪浅。

感谢您的关注!

5. 参考文献

  • https://github.com/mailru/easygo
  • https://github.com/gobwas/ws
  • https://github.com/gobwas/ws-examples
  • https://github.com/gobwas/httphead
  • 这篇文章的俄文版

一百万个 WebSockets 和 Go相关推荐

  1. Go 语言优秀资源整理,为项目落地加速

    指导原则 简单性 复杂性把可读的程序变得不可读,复杂性终结了很多软件项目. 可读性 代码是给人看的,代码阅读时长远超编写.程序必须可维护,那可读是第一步. 生产率 拥有众多的工具集和基础库,可以很简单 ...

  2. go学习资料以及开源代码

    Go 语言方面的大牛,或者优秀 Go 项目的组织 mattn - 写了数百个 Go 项目,盛产优质项目 Unknwon - gogs/macaron 等项目作者,<The Way to Go&g ...

  3. 使用四种框架分别实现百万websocket常连接的服务器--转

    原文地址:http://colobu.com/2015/05/22/implement-C1000K-servers-by-spray-netty-undertow-and-node-js/#Nett ...

  4. 使用四种框架分别实现百万websocket常连接的服务器

    著名的 C10K 问题提出的时候, 正是 2001 年.这篇文章可以说是高性能服务器开发的一个标志性文档,它讨论的就是单机为1万个连接提供服务这个问题,当时因为硬件和软件的**,单机1万还是 一个非常 ...

  5. python每秒20个请求_使用Python每秒百万个请求

    python每秒20个请求 by Paweł Piotr Przeradowski 通过PawełPiotr Przeradowski 使用Python每秒百万个请求 (A million reque ...

  6. codeblock socket 编译错误_在 Go 中使用 Websockets 和 Socket.IO

    注 - 本教程是使用 Go 1.9 版和 googollee/go-socket.io 编写的 Websockets 我觉得非常有趣,在应用程序之间通信中使用标准 RESTful API 方案之外,它 ...

  7. 百万 Go TCP 连接的思考: epoll方式减少资源占用

    强烈推荐: 鸟窝 https://colobu.com/ 百万 Go TCP 连接的思考: epoll方式减少资源占用 前几天 Eran Yanay 在 Gophercon Israel 分享了一个讲 ...

  8. 行走的Offer收割机,首次公布Java10W字面经,Github访问量破百万

    面试,难还是不难?最终结果好还是不好?取决于面试者的底蕴(气场+技能).心态和认知以及沟通技巧.而一些主流的大型互联网公司面试(阿里巴巴.京东.美团.滴滴)更是需要你在面试时展现出自己的能力,从而获得 ...

  9. 如何构建无服务器 WebSockets 平台

    如何将 WebSockets 集成到您的堆栈中 您可以通过为客户端设置专用的 WebSocket 服务器来连接和接收更新来交付事件驱动的架构.然而,这种架构有几个缺点,包括需要管理和扩展服务器以及从该 ...

最新文章

  1. 【ACM】奇怪的回文数
  2. C#编译器选项(目标平台)
  3. Android中selector的使用
  4. mysql双主数据一致性_mysql双主复制的主备数据一致性知多少
  5. SFB 项目经验-57-Skype for business-录音系统-你拥有吗(模拟线路)
  6. centos 7 网络设置与图像化界面下载
  7. 团队任务2:冲刺前的准备
  8. Redmi K40系列要做旗舰“焊门员”:生死看淡 不服就焊
  9. 【Java】HashMap 和 Hashtable 的 6 个区别
  10. java怎么从数据库中查询_java – 从数据库中检索的实体与查询中的情况相同
  11. 单变量微分、导数与链式法则
  12. 关于HTML条件注释你可能不知道的一些事儿
  13. [IT新应用]无线投影技术
  14. 新建android模拟器无法拨号 真机可以拨号,Android模拟器相关操作设置
  15. 知网HTML阅读是什么,HTML – 屏幕阅读器究竟是什么?我应该如何处理我的网站?...
  16. python-普通pdf的添加水印
  17. PageHelper关闭count语句优化
  18. 源码必须会丨一个bug的解决过程,让你明白阅读源码的重要性!
  19. PMP报考流程以及注意事项
  20. rp3399之mipi接口ov4689摄像头驱动

热门文章

  1. Dockerfile构建mysql镜像,并初始化数据库数据
  2. Lattice CrossLinkNx LIFCL-40应用连载3-使用RISC-V软核
  3. 利用stress-ng压测来理解linux平均负载
  4. vue3 布局样式的原理
  5. unity图片变成马赛克如何取像素并改变颜色_像素和分辨率到底是什么?它们之间的关系是什么?| 手机相机的像素越高越好吗?...
  6. 阿里云安骑士性能特点与使用场景!
  7. FPGA基于VGA显示字符及图片
  8. pytorch - K折交叉验证过程说明及实现
  9. 做网站或者网站改版的费用是多少
  10. 火车头如何把标题加html标签,火车头采集中内容页及标签Xpath可视化提取功能的使用...