6行代码解决golang TCP粘包

转自:https://studygolang.com/articles/12483

什么是TCP粘包问题以及为什么会产生TCP粘包,本文不加讨论。本文使用golang的bufio.Scanner来实现自定义协议解包。

协议数据包定义

本文模拟一个日志服务器,该服务器接收客户端传到的数据包并显示出来

type Package struct {Version        [2]byte // 协议版本,暂定V1Length         int16   // 数据部分长度Timestamp      int64   // 时间戳HostnameLength int16   // 主机名长度Hostname       []byte  // 主机名TagLength      int16   // 标签长度Tag            []byte  // 标签Msg            []byte  // 日志数据
}

协议定义部分没有什么好讲的,根据具体的业务逻辑定义即可。

数据打包

由于TCP协议是语言无关的协议,所以直接把协议数据包结构体发送到TCP连接中也是不可能的,只能发送字节流数据,所以需要自己实现数据编码。所幸golang提供了binary来帮助我们实现网络字节编码。

func (p *Package) Pack(writer io.Writer) error {var err errorerr = binary.Write(writer, binary.BigEndian, &p.Version)err = binary.Write(writer, binary.BigEndian, &p.Length)err = binary.Write(writer, binary.BigEndian, &p.Timestamp)err = binary.Write(writer, binary.BigEndian, &p.HostnameLength)err = binary.Write(writer, binary.BigEndian, &p.Hostname)err = binary.Write(writer, binary.BigEndian, &p.TagLength)err = binary.Write(writer, binary.BigEndian, &p.Tag)err = binary.Write(writer, binary.BigEndian, &p.Msg)return err
}

Pack方法的输出目标为io.Writer,有利于接口扩展,只要实现了该接口即可编码数据写入。binary.BigEndian是字节序,本文暂时不讨论,有需要的读者可以自行查找资料研究。

数据解包

解包需要将TCP数据包解析到结构体中,接下来会讲为什么需要添加几个数据无关的长度字段。

func (p *Package) Unpack(reader io.Reader) error {var err errorerr = binary.Read(reader, binary.BigEndian, &p.Version)err = binary.Read(reader, binary.BigEndian, &p.Length)err = binary.Read(reader, binary.BigEndian, &p.Timestamp)err = binary.Read(reader, binary.BigEndian, &p.HostnameLength)p.Hostname = make([]byte, p.HostnameLength)err = binary.Read(reader, binary.BigEndian, &p.Hostname)err = binary.Read(reader, binary.BigEndian, &p.TagLength)p.Tag = make([]byte, p.TagLength)err = binary.Read(reader, binary.BigEndian, &p.Tag)p.Msg = make([]byte, p.Length-8-2-p.HostnameLength-2-p.TagLength)err = binary.Read(reader, binary.BigEndian, &p.Msg)return err
}

由于主机名、标签这种数据是不固定长度的,所以需要两个字节来标识数据长度,否则读取的时候只知道一个总的数据长度是无法区分主机名、标签名、日志数据的。

数据包的粘包问题解决

上文只是解决了编码/解码问题,前提是收到的数据包没有产生粘包问题,解决粘包就是要正确分割字节流中的数据。一般有以下做法:

  1. 定长分隔(每个数据包最大为该长度) 缺点是数据不足时会浪费传输资源
  2. 特定字符分隔(如rn) 缺点是如果正文中有rn就会导致问题
  3. 在数据包中添加长度字段(本文采用的)

golang提供了bufio.Scanner来解决粘包问题。

scanner := bufio.NewScanner(reader) // reader为实现了io.Reader接口的对象,如net.Conn
scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {if !atEOF && data[0] == 'V' { // 由于我们定义的数据包头最开始为两个字节的版本号,所以只有以V开头的数据包才处理if len(data) > 4 { // 如果收到的数据>4个字节(2字节版本号+2字节数据包长度)length := int16(0)binary.Read(bytes.NewReader(data[2:4]), binary.BigEndian, &length) // 读取数据包第3-4字节(int16)=>数据部分长度if int(length)+4 <= len(data) { // 如果读取到的数据正文长度+2字节版本号+2字节数据长度不超过读到的数据(实际上就是成功完整的解析出了一个包)return int(length) + 4, data[:int(length)+4], nil}}}return
})
// 打印接收到的数据包
for scanner.Scan() {scannedPack := new(Package)scannedPack.Unpack(bytes.NewReader(scanner.Bytes()))log.Println(scannedPack)
}

本文的核心就在于scanner.Split方法,该方法用来解析TCP数据包

完整源码

package mainimport ("bufio""bytes""encoding/binary""fmt""io""log""os""time"
)type Package struct {Version        [2]byte // 协议版本Length         int16   // 数据部分长度Timestamp      int64   // 时间戳HostnameLength int16   // 主机名长度Hostname       []byte  // 主机名TagLength      int16   // Tag长度Tag            []byte  // TagMsg            []byte  // 数据部分长度
}func (p *Package) Pack(writer io.Writer) error {var err errorerr = binary.Write(writer, binary.BigEndian, &p.Version)err = binary.Write(writer, binary.BigEndian, &p.Length)err = binary.Write(writer, binary.BigEndian, &p.Timestamp)err = binary.Write(writer, binary.BigEndian, &p.HostnameLength)err = binary.Write(writer, binary.BigEndian, &p.Hostname)err = binary.Write(writer, binary.BigEndian, &p.TagLength)err = binary.Write(writer, binary.BigEndian, &p.Tag)err = binary.Write(writer, binary.BigEndian, &p.Msg)return err
}
func (p *Package) Unpack(reader io.Reader) error {var err errorerr = binary.Read(reader, binary.BigEndian, &p.Version)err = binary.Read(reader, binary.BigEndian, &p.Length)err = binary.Read(reader, binary.BigEndian, &p.Timestamp)err = binary.Read(reader, binary.BigEndian, &p.HostnameLength)p.Hostname = make([]byte, p.HostnameLength)err = binary.Read(reader, binary.BigEndian, &p.Hostname)err = binary.Read(reader, binary.BigEndian, &p.TagLength)p.Tag = make([]byte, p.TagLength)err = binary.Read(reader, binary.BigEndian, &p.Tag)p.Msg = make([]byte, p.Length-8-2-p.HostnameLength-2-p.TagLength)err = binary.Read(reader, binary.BigEndian, &p.Msg)return err
}func (p *Package) String() string {return fmt.Sprintf("version:%s length:%d timestamp:%d hostname:%s tag:%s msg:%s",p.Version,p.Length,p.Timestamp,p.Hostname,p.Tag,p.Msg,)
}func main() {hostname, err := os.Hostname()if err != nil {log.Fatal(err)}pack := &Package{Version:        [2]byte{'V', '1'},Timestamp:      time.Now().Unix(),HostnameLength: int16(len(hostname)),Hostname:       []byte(hostname),TagLength:      4,Tag:            []byte("demo"),Msg:            []byte(("现在时间是:" + time.Now().Format("2006-01-02 15:04:05"))),}pack.Length = 8 + 2 + pack.HostnameLength + 2 + pack.TagLength + int16(len(pack.Msg))buf := new(bytes.Buffer)// 写入四次,模拟TCP粘包效果pack.Pack(buf)pack.Pack(buf)pack.Pack(buf)pack.Pack(buf)// scannerscanner := bufio.NewScanner(buf)scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) {if !atEOF && data[0] == 'V' {if len(data) > 4 {length := int16(0)binary.Read(bytes.NewReader(data[2:4]), binary.BigEndian, &length)if int(length)+4 <= len(data) {return int(length) + 4, data[:int(length)+4], nil}}}return})for scanner.Scan() {scannedPack := new(Package)scannedPack.Unpack(bytes.NewReader(scanner.Bytes()))log.Println(scannedPack)}if err := scanner.Err(); err != nil {log.Fatal("无效数据包")}
}

写在最后

golang作为一门强大的网络编程语言,实现自定义协议是非常重要的,实际上实现自定义协议也不是很难,以下几个步骤:

  1. 数据包编码
  2. 数据包解码
  3. 处理TCP粘包问题
  4. 断线重连(可以使用心跳实现)(非必须)

本文引用自我自己的博客golang解决TCP粘包问题

golang中tcp socket粘包问题和处理

http://www.01happy.com/golang-tcp-socket-adhere/

深入理解 Go 标准库之 bufio.Scanner

yujiahaol68 · 2017-12-10 02:49:26 · 4966 次点击 · 预计阅读时间 11 分钟 · 大约5小时之前 开始浏览

这是一个创建于 2017-12-10 02:49:26 的文章,其中的信息可能已经有所发展或是发生改变。

众所周知,带缓冲的 IO 标准库 一直是 Go 中优化读写操作的利器。对于写操作来说,在被发送到 socket 或硬盘之前,IO 缓冲区 提供了一个临时存储区来存放数据,缓冲区存储的数据达到一定容量后才会被"释放"出来进行下一步存储,这种方式大大减少了写操作或是最终的系统调用被触发的次数,这无疑会在频繁使用系统资源的时候节省下巨大的系统开销。而对于读操作来说,缓冲 IO 意味着每次操作能够读取更多的数据,既减少了系统调用的次数,又通过以块为单位读取硬盘数据来更高效地使用底层硬件。本文会更加侧重于讲解 bufio 包中的 Scanner 扫描器模块,它的主要作用是把数据流分割成一个个标记并除去它们之间的空格。

"foo  bar   baz"

如果我们只想得到上面字符串中的单词,那么扫描器能帮我们按顺序检索出 "foo","bar" 和 "baz" 这三个单词( 查看源码 )

package mainimport ("bufio""fmt""strings"
)func main() {input := "foo  bar   baz"scanner := bufio.NewScanner(strings.NewReader(input))scanner.Split(bufio.ScanWords)for scanner.Scan() {fmt.Println(scanner.Text())}
}

输出结果:

foo
bar
baz

Scanner 扫描器读取数据流的时候会使用带缓冲区的 IO,并接受 io.Reader 作为参数。

如果你需要在内存中处理字符串或者是 bytes 切片,可以首先考虑使用 bytes.Split 或是 strings.Split 这样的工具集,当处理这些流数据时,bytes 或是 strings 标准库中的方法可能是最简单可靠的。

在底层,扫描器使用缓冲不断存储数据,当缓冲区非空或者是读到文件的末尾时 (EOF) split 函数会被调用,目前我们介绍了一个预定义好的 split 函数,但根据下面的函数签名来看,它的用途可能更加广泛。

func(data []byte, atEOF bool) (advance int, token []byte, err error)

目前为止,我们知道 Split 函数会在读数据的时候被调用,从返回值来看,它的执行应该有 3 种不同情况。

1. 需要补充更多的数据

这表示传入的数据还不足以生成一个字符流的标记,当返回的值分别是 0, nil, nil 的时候,扫描器会尝试读取更多的数据,如果缓冲区已满,那么缓冲区会在任何读取操作前自动扩容为原来的两倍,让我们来仔细看一下这个过程 查看源码

package mainimport ("bufio""fmt""strings"
)func main() {input := "abcdefghijkl"scanner := bufio.NewScanner(strings.NewReader(input))split := func(data []byte, atEOF bool) (advance int, token []byte, err error) {fmt.Printf("%t\t%d\t%s\n", atEOF, len(data), data)return 0, nil, nil}scanner.Split(split)buf := make([]byte, 2)scanner.Buffer(buf, bufio.MaxScanTokenSize)for scanner.Scan() {fmt.Printf("%s\n", scanner.Text())}
}

输出结果:

false    2    ab
false    4    abcd
false    8    abcdefgh
false    12    abcdefghijkl
true    12    abcdefghijkl

上例中的 split 函数可以说是简单且极其贪婪的 -- 总是请求更多的数据, Scanner 尝试读取更多的数据的同时会保证缓冲区拥有足够的空间来存放这些数据。在上面的例子中,我们将缓冲区的大小设置为 2。

buf := make([]byte, 2)
scanner.Buffer(buf, bufio.MaxScanTokenSize)

在 split 函数第一次被调用后,scanner 会倍增缓冲区的容量,读取更多的数据,然后再次调用 split 函数。在第二次调用之后增长倍数仍然保持不变,通过观察输出结果可以发现第一次调用 split 得到大小为 2 的切片,然后是 4、8,最后到 12,因为没有更多的数据了。

缓冲区的默认大小是 4096 个字节。

在这值得我们来讨论一下 atEOF 这个参数,通过这个参数我们能够在 split 函数中判断是否还有数据可供使用,它能够在达到数据末尾 (EOF) 或者是读取出错的时候触发为真,一旦任何上述情况发生, scanner 将拒绝读取任何东西,像这样的 flag 标志可被用来抛出异常(因其不完整的字符标记),最终会导致 scanner.Split() 在调用的时候返回 false 并终止整个进程。异常可以通过 Err方法来取得。

package mainimport ("bufio""errors""fmt""strings"
)func main() {input := "abcdefghijkl"scanner := bufio.NewScanner(strings.NewReader(input))split := func(data []byte, atEOF bool) (advance int, token []byte, err error) {fmt.Printf("%t\t%d\t%s\n", atEOF, len(data), data)if atEOF {return 0, nil, errors.New("bad luck")}return 0, nil, nil}scanner.Split(split)buf := make([]byte, 12)scanner.Buffer(buf, bufio.MaxScanTokenSize)for scanner.Scan() {fmt.Printf("%s\n", scanner.Text())}if scanner.Err() != nil {fmt.Printf("error: %s\n", scanner.Err())}
}

输出结果:

false    12    abcdefghijkl
true    12    abcdefghijkl
error: bad luck

atEOF 参数同时也能够用于处理那些遗留在缓冲区中的数据,其中一个预定义的 split 函数逐行扫描输入反映了 这种行为 ,例如我们这样输入下面这些单词时

foo
bar
baz

因为在行末并没有 \n 字符,因此当 ScanLines 无法找到新一行的字符时,它就会返回剩余的字符来作为最后的字符标记 (查看源码)

package mainimport ("bufio""fmt""strings"
)func main() {input := "foo\nbar\nbaz"scanner := bufio.NewScanner(strings.NewReader(input))// 事实上这里并不需要传入 ScanLines 因为这原本就是标准库默认的 split 函数scanner.Split(bufio.ScanLines)for scanner.Scan() {fmt.Println(scanner.Text())}
}

输出结果:

foo
bar
baz

2. 已找到字符标记(token)

当 split 函数能够检测到 标记 时,就会发生这种情况。它返回在缓冲区中向前移动的字符数和 标记 本身。返回两个值的原因在于 标记 向前移动的距离不总是等于字节个数。假设输入为 "foo foo foo" ,当我们的目标只是找到其中的单词 ( 扫描单词 ) 时,split 函数会跳过它们之间的空格。

(4, "foo")
(4, "foo")
(3, "foo")

让我们通过一个具体的例子看一下,下面的这个函数将只会寻找连续的 foo 串, 查看源码

package mainimport ("bufio""bytes""fmt""io""strings"
)func main() {input := "foofoofoo"scanner := bufio.NewScanner(strings.NewReader(input))split := func(data []byte, atEOF bool) (advance int, token []byte, err error) {if bytes.Equal(data[:3], []byte{'f', 'o', 'o'}) {return 3, []byte{'F'}, nil}if atEOF {return 0, nil, io.EOF}return 0, nil, nil}scanner.Split(split)for scanner.Scan() {fmt.Printf("%s\n", scanner.Text())}
}

输出结果:

F
F
F

3. 报错

如果 split 函数返回了错误那么扫描器就会停止工作,查看源码

package mainimport ("bufio""errors""fmt""strings"
)func main() {input := "abcdefghijkl"scanner := bufio.NewScanner(strings.NewReader(input))split := func(data []byte, atEOF bool) (advance int, token []byte, err error) {return 0, nil, errors.New("bad luck")}scanner.Split(split)for scanner.Scan() {fmt.Printf("%s\n", scanner.Text())}if scanner.Err() != nil {fmt.Printf("error: %s\n", scanner.Err())}
}

输出结果:

error: bad luck

然而,其中有一种特殊的错误并不会使扫描器立即停止工作。

ErrFinalToken

扫描器给信号(signal) 提供了一个叫做 最终标记 的选项,这是一个不会打破循环(扫描过程依然返回真)的特殊标记,但随后的一系列调用会使扫描动作立刻终止。

func (s *Scanner) Scan() bool {if s.done {return false}...

在 Go 语言官方 issue #11836 中提供了一种方法使得当发现特殊标记时也能够立即停止扫描。查看源码

package mainimport ("bufio""bytes""fmt""strings"
)func split(data []byte, atEOF bool) (advance int, token []byte, err error) {advance, token, err = bufio.ScanWords(data, atEOF)if err == nil && token != nil && bytes.Equal(token, []byte{'e', 'n', 'd'}) {return 0, []byte{'E', 'N', 'D'}, bufio.ErrFinalToken}return
}func main() {input := "foo end bar"scanner := bufio.NewScanner(strings.NewReader(input))scanner.Split(split)for scanner.Scan() {fmt.Println(scanner.Text())}if scanner.Err() != nil {fmt.Printf("Error: %s\n", scanner.Err())}
}

输出结果:

foo
END

io.EOF 和 ErrFinalToken 类型的错误都不被认为是真的起作用的错误 -- Err 方法会在任何这两个错误出现并停止扫描器时仍然返回 nil

最大标记大小 / ErrTooLong

默认情况下,缓冲区的最大长度应该小于 64 * 1024 个字节,这意味着找到的标记不能大于这个限制。

package mainimport ("bufio""fmt""strings"
)func main() {input := strings.Repeat("x", bufio.MaxScanTokenSize)scanner := bufio.NewScanner(strings.NewReader(input))for scanner.Scan() {fmt.Println(scanner.Text())}if scanner.Err() != nil {fmt.Println(scanner.Err())}
}

上面的程序会打印出 bufio.Scanner: token too long ,我们可以通过 Buffer 方法来自定义缓冲区的长度,在上文第一小节中这个方法有出现过,但我们这次会举一个更切题的例子,查看源码

buf := make([]byte, 10)
input := strings.Repeat("x", 20)
scanner := bufio.NewScanner(strings.NewReader(input))
scanner.Buffer(buf, 20)for scanner.Scan() {fmt.Println(scanner.Text())
}if scanner.Err() != nil {fmt.Println(scanner.Err())
}

输出结果:

bufio.Scanner: token too long

防止死循环

几年前 issue #8672 被提出,解决方案是加多一段代码,通过判断 atEOF 为真且缓冲区为空来确定 split 函数可以被调用,而现有的代码可能会进入死循环。

package mainimport ("bufio""bytes""fmt""strings"
)func main() {input := "foo|bar"scanner := bufio.NewScanner(strings.NewReader(input))split := func(data []byte, atEOF bool) (advance int, token []byte, err error) {if i := bytes.IndexByte(data, '|'); i >= 0 {return i + 1, data[0:i], nil}if atEOF {return len(data), data[:len(data)], nil}return 0, nil, nil}scanner.Split(split)for scanner.Scan() {if scanner.Text() != "" {fmt.Println(scanner.Text())}}
}

split 函数假设当 atEOF 为真就能够安全地使用剩余的缓冲作为标记,这引发了 issue #8672 被修复之后的另一个问题: 因为缓冲区可以为空,所以当返回 (0, [], nil) 时 split 函数并不能增加缓冲区的大小, issue #9020 发现了此种情况下的 panic ,查看源码

foo
bar
panic: bufio.Scan: 100 empty tokens without progressing

当我第一次阅读有关 Scanner 或是 SplitFunc 的文档时我并没能弄明白在所有情况下它们是如何工作的,即便是阅读源代码也帮助甚微,因为 Scan 看上去真的很复杂,希望这篇文章能够帮助其他人更好地理清这块的细节。


via: https://medium.com/golangspec/in-depth-introduction-to-bufio-scanner-in-golang-55483bb689b4

作者:Michał Łowicki  译者:yujiahaol68  校对:rxcai polaris1119

本文由 GCTT 原创编译,Go语言中文网 荣誉推出

golang解决TCP粘包问题相关推荐

  1. golang 解决 TCP 粘包问题

    什么是 TCP 粘包问题以及为什么会产生 TCP 粘包,本文不加讨论.本文使用 golang 的 bufio.Scanner 来实现自定义协议解包. 协议数据包定义 本文模拟一个日志服务器,该服务器接 ...

  2. Golang解决TCP粘包拆包问题

    协议定义 报文长度(4字节) 报文内容[]byte 服务端代码 package mainimport ("encoding/binary""fmt"" ...

  3. c#解决TCP“粘包”问题

    c#解决TCP"粘包"问题 参考文章: (1)c#解决TCP"粘包"问题 (2)https://www.cnblogs.com/wangjun8868/p/71 ...

  4. Linux socket编程(一):客户端服务端通信、解决TCP粘包

    一.服务端程序 服务端程序工作流程: 创建socket →\rightarrow→ 绑定监听的IP地址和端口 →\rightarrow→ 监听客户端连接 →\rightarrow→ 接受/发送数据.对 ...

  5. Netty解决TCP粘包/拆包导致的半包读写问题

    一.TCP粘包/拆包问题说明 TCP是个"流"协议,就是没有界限的一串数据.TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包拆分,所以在业务上认为,一 ...

  6. netty解决TCP粘包/拆包导致的半包读写问题的三种方案

    解决方案一:LineBasedFrameDecoder+StringDecoder来解决TCP的粘包/拆包问题 只需要在客户端和服务端加上45.46两行代码并且在发送消息的时候加上换行符即可解决TCP ...

  7. 《精通并发与Netty》学习笔记(13 - 解决TCP粘包拆包(一)概念及实例演示)

    一.粘包/拆包概念 TCP是一个"流"协议,所谓流,就是没有界限的一长串二进制数据.TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的 ...

  8. swoole 解决tcp粘包问题

    Tcp粘包问题 tcp在发送数据的时候因为存在数据缓存的关系,对于数据在发送的时候在 短时间内 如果连续发送很多小的数据的时候就会有可能一次性一起发送,还有就是对于大的数据就会分开连续发送多次 Tcp ...

  9. 6行代码解决golang TCP粘包

    什么是TCP粘包问题以及为什么会产生TCP粘包,本文不加讨论.本文使用golang的bufio.Scanner来实现自定义协议解包. 协议数据包定义 本文模拟一个日志服务器,该服务器接收客户端传到的数 ...

最新文章

  1. Qt——模态、非模态
  2. mysql5.6更改datadir数据存储目录
  3. js 点击最后一个 和倒数第二个_期货及期权品种的最后交易日,您都了解吗?...
  4. 将本地SHH文件导入SourceTree配置
  5. Windows系统下,Android Studio的安装
  6. 计算两个数的乘积java编写_Java模拟计算机的整数乘积计算功能示例
  7. ubuntu for win10 里运行apache+php
  8. python3 写入excel_Python3 读、写Excel文件的操作方法
  9. 计算机不支持此接口,Windows10提示不支持此接口的解决方法
  10. 各银行支付/各种支付平台/php对接支付接口心得/php h5支付接口对接
  11. numpy的stack大白话解释
  12. Python3.7出现RuntimeError: generator raised StopIteration异常
  13. 一款不错的手机端视频剪辑软件
  14. [附源码]Python计算机毕业设计超市商品管理系统
  15. 组装办公室用计算机,(需要组装一批电脑,用于办公。多运用于普通办公软件,WORD、EXCEL、PPT、PS等。 要求实惠,可用集成显卡。)组装电脑excle模板...
  16. win10删除文件提示源文件路经太长无法删除解决办法
  17. HDU_1709 The Balence (生成函数)
  18. 【看表情包学Linux】Redirect 重定向 | 时间相关指令 | 文件查找 | 打包与压缩
  19. Android按钮扩大动效,Android按钮动效UI设计教程
  20. 布局数据存储,中国电子云意在何为?

热门文章

  1. python图像对比_python+PIL实现图片对比(一)
  2. 测试类什么时候初始化
  3. visionpro加载toolblock 和保存toolblock
  4. eigen之eigen中文文档
  5. opencv之对图像中的点做几何变换
  6. 计算机html二级难度,计算机二级考试越来越难的实锤!真实数据告诉你到底难在哪里?...
  7. 201703-1-分蛋糕
  8. Linux命令解释之setfacl,getfacl,chattr
  9. 将本地 jar 安装到本地 maven 仓库
  10. caas k8s主控节点如何查询_k8s中部署prometheus监控告警系统prometheus系列文章第一篇...