文章目录

  • 07 接头暗语:如何利用 Netty 实现自定义协议通信?
    • 通信协议设计
      • 1. 魔数
      • 2. 协议版本号
      • 3. 序列化算法
      • 4. 报文类型
      • 5. 长度域字段
      • 6. 请求数据
      • 7. 状态
      • 8. 保留字段
    • Netty 如何实现自定义通信协议
      • 抽象编码类
      • 抽象解码类
    • 通信协议实战

07 接头暗语:如何利用 Netty 实现自定义协议通信?

通信协议设计

所谓协议,就是通信双方事先商量好的接口暗语,在 TCP 网络编程中,发送方和接收方的数据包格式都是二进制,发送方将对象转化成二进制流发送给接收方,接收方获得二进制数据后需要知道如何解析成对象,所以协议是双方能够正常通信的基础

目前市面上已经有不少通用的协议,例如 HTTP、HTTPS、JSON-RPC、FTP、IMAP、Protobuf 等。通用协议兼容性好,易于维护,各种异构系统之间可以实现无缝对接。如果在满足业务场景以及性能需求的前提下,推荐采用通用协议的方案。相比通用协议,自定义协议主要有以下优点。

  • 极致性能:通用的通信协议考虑了很多兼容性的因素,必然在性能方面有所损失。
  • 扩展性:自定义的协议相比通用协议更好扩展,可以更好地满足自己的业务需求。
  • 安全性:通用协议是公开的,很多漏洞已经很多被黑客攻破。自定义协议更加安全,因为黑客需要先破解自定义协议的内容。

一个完备的网络协议需要具备的基本要素:

1. 魔数

魔数是通信双方协商的一个暗号,通常采用固定的几个字节表示。魔数的作用是防止任何人随便向服务器的端口上发送数据服务端在接收到数据时会解析出前几个固定字节的魔数,然后做正确性比对。如果和约定的魔数不匹配,则认为是非法数据,可以直接关闭连接或者采取其他措施以增强系统的安全防护。魔数的思想在压缩算法、Java Class 文件等场景中都有所体现,例如 Class 文件开头就存储了魔数 0xCAFEBABE,在加载 Class 文件时首先会验证魔数的正确性。

2. 协议版本号

随着业务需求的变化,协议可能需要对结构或字段进行改动,不同版本的协议对应的解析方法也是不同的。所以在生产级项目中强烈建议预留协议版本号这个字段。

3. 序列化算法

序列化算法字段表示数据发送方应该采用何种方法将请求的对象转化为二进制,以及如何再将二进制转化为对象,如 JSON、Hessian、Java 自带序列化等。

4. 报文类型

在不同的业务场景中,报文可能存在不同的类型。例如在 RPC 框架中有请求、响应、心跳等类型的报文,在 IM 即时通信的场景中有登陆、创建群聊、发送消息、接收消息、退出群聊等类型的报文。

5. 长度域字段

长度域字段代表请求数据的长度,接收方根据长度域字段获取一个完整的报文。

6. 请求数据

请求数据通常为序列化之后得到的二进制流,每种请求数据的内容是不一样的。

7. 状态

状态字段用于标识请求是否正常。一般由被调用方设置。例如一次 RPC 调用失败,状态字段可被服务提供方设置为异常状态。

8. 保留字段

保留字段是可选项,为了应对协议升级的可能性,可以预留若干字节的保留字段,以备不时之需。

通用的协议示例:

+---------------------------------------------------------------+| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte  |+---------------------------------------------------------------+| 状态 1byte |        保留字段 4byte     |      数据长度 4byte     | +---------------------------------------------------------------+|                   数据内容 (长度不定)                          |+---------------------------------------------------------------+

Netty 如何实现自定义通信协议

Netty 常用编码器类型:

  • MessageToByteEncoder 对象编码成字节流;
  • MessageToMessageEncoder 一种消息类型编码成另外一种消息类型。

Netty 常用解码器类型:

  • ByteToMessageDecoder/ReplayingDecoder 将字节流解码为消息对象;
  • MessageToMessageDecoder 将一种消息类型解码为另外一种消息类型。

编解码器可以分为一次编解码器二次编解码器一次解码器用于解决 TCP 拆包/粘包问题,按协议解析后得到的字节数据。而二次解码器,转化为所需的对象。编码器的过程与解码器的相反。

  • 一次编解码器:MessageToByteEncoder/ByteToMessageDecoder。
  • 二次编解码器:MessageToMessageEncoder/MessageToMessageDecoder。

抽象编码类


通过抽象编码类的继承图可以看出,编码类是 ChannelOutboundHandler 的抽象类实现,具体操作的是 Outbound 出站数据。

  • MessageToByteEncoder

MessageToByteEncoder 用于将对象编码成字节流,MessageToByteEncoder 提供了唯一的 encode 抽象方法,只需要实现 encode 方法即可完成自定义编码。

encode() 方法是在什么时候被调用的呢?查看 MessageToByteEncoder 的核心源码片段,如下所示:

write 方法:

    @Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {ByteBuf buf = null;try {if (acceptOutboundMessage(msg)) { // 1. 消息类型是否匹配@SuppressWarnings("unchecked")I cast = (I) msg;buf = allocateBuffer(ctx, cast, preferDirect); // 2. 分配 ByteBuf 资源try {encode(ctx, cast, buf); // 3. 执行 encode 方法完成数据编码} finally {ReferenceCountUtil.release(cast);}if (buf.isReadable()) {ctx.write(buf, promise); // 4. 向后传递写事件} else {buf.release();ctx.write(Unpooled.EMPTY_BUFFER, promise);}buf = null;} else {ctx.write(msg, promise); }} catch (EncoderException e) {throw e;} catch (Throwable e) {throw new EncoderException(e);} finally {if (buf != null) {buf.release();}}}

acceptOutboundMessage 方法(判断消息类型是否匹配):

/*** Returns {@code true} if the given message should be handled. If {@code false} it will be passed to the next* {@link ChannelOutboundHandler} in the {@link ChannelPipeline}.*/public boolean acceptOutboundMessage(Object msg) throws Exception {return matcher.match(msg);}

allocateBuffer 方法(分配 ByteBuf 资源):

/*** Allocate a {@link ByteBuf} which will be used as argument of {@link #encode(ChannelHandlerContext, I, ByteBuf)}.* Sub-classes may override this method to return {@link ByteBuf} with a perfect matching {@code initialCapacity}.*/protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,boolean preferDirect) throws Exception {if (preferDirect) {return ctx.alloc().ioBuffer();} else {return ctx.alloc().heapBuffer();}}

encode 方法(完成数据编码):

/*** Encode a message into a {@link ByteBuf}. This method will be called for each written message that can be handled* by this encoder.** @param ctx           the {@link ChannelHandlerContext} which this {@link MessageToByteEncoder} belongs to* @param msg           the message to encode* @param out           the {@link ByteBuf} into which the encoded message will be written* @throws Exception    is thrown if an error occurs*/protected abstract void encode(ChannelHandlerContext ctx, I msg, ByteBuf out) throws Exception;

MessageToByteEncoder 重写了 ChanneOutboundHandler 的 write() 方法,其主要逻辑分为以下几个步骤:

  1. acceptOutboundMessage 判断是否有匹配的消息类型,如果匹配需要执行编码流程,如果不匹配直接继续传递给下一个 ChannelOutboundHandler;
  2. 分配 ByteBuf 资源,默认使用堆外内存;
  3. 调用子类实现的 encode 方法完成数据编码,一旦消息被成功编码,会通过调用 ReferenceCountUtil.release(cast) 自动释放;
  4. 如果 ByteBuf 可读,说明已经成功编码得到数据,然后写入 ChannelHandlerContext 交到下一个节点;如果 ByteBuf 不可读,则释放 ByteBuf 资源,向下传递空的 ByteBuf 对象。

编码器实现不需要关注拆包/粘包问题。如下代码,展示了如何将字符串类型的数据写入到 ByteBuf 实例,ByteBuf 实例将传递给 ChannelPipeline 链表中的下一个 ChannelOutboundHandler。

public class StringToByteEncoder extends MessageToByteEncoder<String> {@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, String data, ByteBuf byteBuf) throws Exception {byteBuf.writeBytes(data.getBytes());}
}
  • MessageToMessageEncoder

MessageToMessageEncoder 与 MessageToByteEncoder 类似,同样只需要实现 encode 方法与 MessageToByteEncoder 不同的是,MessageToMessageEncoder 是将一种格式的消息转换为另外一种格式的消息。其中第二个 Message 所指的可以是任意一个对象,如果该对象是 ByteBuf 类型,那么基本上和 MessageToByteEncoder 的实现原理是一致的。此外 MessageToByteEncoder 的输出结果是对象列表,编码后的结果属于中间对象最终仍然会转化成 ByteBuf 进行传输

MessageToMessageEncoder 常用的实现子类有 StringEncoder、LineEncoder、Base64Encoder 等。

以 StringEncoder 为例,源码示例如下:将 CharSequence 类型(String、StringBuilder、StringBuffer 等)转换成 ByteBuf 类型,结合 StringDecoder 可以直接实现 String 类型数据的编解码。

/** Copyright 2012 The Netty Project** The Netty Project licenses this file to you under the Apache License,* version 2.0 (the "License"); you may not use this file except in compliance* with the License. You may obtain a copy of the License at:**   http://www.apache.org/licenses/LICENSE-2.0** Unless required by applicable law or agreed to in writing, software* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the* License for the specific language governing permissions and limitations* under the License.*/
package io.netty.handler.codec.string;import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.MessageToMessageEncoder;import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.List;/*** Encodes the requested {@link String} into a {@link ByteBuf}.* A typical setup for a text-based line protocol in a TCP/IP socket would be:* <pre>* {@link ChannelPipeline} pipeline = ...;** // Decoders* pipeline.addLast("frameDecoder", new {@link LineBasedFrameDecoder}(80));* pipeline.addLast("stringDecoder", new {@link StringDecoder}(CharsetUtil.UTF_8));** // Encoder* pipeline.addLast("stringEncoder", new {@link StringEncoder}(CharsetUtil.UTF_8));* </pre>* and then you can use a {@link String} instead of a {@link ByteBuf}* as a message:* <pre>* void channelRead({@link ChannelHandlerContext} ctx, {@link String} msg) {*     ch.write("Did you say '" + msg + "'?\n");* }* </pre>*/
@Sharable
public class StringEncoder extends MessageToMessageEncoder<CharSequence> {// TODO Use CharsetEncoder instead.private final Charset charset;/*** Creates a new instance with the current system character set.*/public StringEncoder() {this(Charset.defaultCharset());}/*** Creates a new instance with the specified character set.*/public StringEncoder(Charset charset) {if (charset == null) {throw new NullPointerException("charset");}this.charset = charset;}@Overrideprotected void encode(ChannelHandlerContext ctx, CharSequence msg, List<Object> out) throws Exception {if (msg.length() == 0) {return;}out.add(ByteBufUtil.encodeString(ctx.alloc(), CharBuffer.wrap(msg), charset));}
}

抽象解码类

解码类是 ChanneInboundHandler 的抽象类实现,操作的是 Inbound 入站数据。解码器实现的难度要远大于编码器,因为解码器需要考虑拆包/粘包问题。由于接收方有可能没有接收到完整的消息,所以解码框架需要对入站的数据做缓冲操作,直至获取到完整的消息。

  • 抽象解码类 ByteToMessageDecoder。

查看 ByteToMessageDecoder 定义的抽象方法:

decodeLast 方法:

/*** Is called one last time when the {@link ChannelHandlerContext} goes in-active. Which means the* {@link #channelInactive(ChannelHandlerContext)} was triggered.** By default this will just call {@link #decode(ChannelHandlerContext, ByteBuf, List)} but sub-classes may* override this for some special cleanup operation.*/protected void decodeLast(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.isReadable()) {// Only call decode() if there is something left in the buffer to decode.// See https://github.com/netty/netty/issues/4386decodeRemovalReentryProtection(ctx, in, out);}}

decode 方法:

/*** Decode the from one {@link ByteBuf} to an other. This method will be called till either the input* {@link ByteBuf} has nothing to read when return from this method or till nothing was read from the input* {@link ByteBuf}.** @param ctx           the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to* @param in            the {@link ByteBuf} from which to read data* @param out           the {@link List} to which decoded messages should be added* @throws Exception    is thrown if an error occurs*/protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

decode() 是用户必须实现的抽象方法,在该方法在调用时需要传入接收的数据 ByteBuf,及用来添加编码后消息的 List。由于 TCP 粘包问题,ByteBuf 中可能包含多个有效的报文,或者不够一个完整的报文。Netty 会重复回调 decode() 方法,直到没有解码出新的完整报文可以添加到 List 当中,或者 ByteBuf 没有更多可读取的数据为止。 如果此时 List 的内容不为空,那么会传递给 ChannelPipeline 中的下一个ChannelInboundHandler。

为什么抽象解码器要比编码器多一个 decodeLast() 方法呢?因为 decodeLast 在 Channel 关闭后会被调用一次,主要用于处理 ByteBuf 最后剩余的字节数据。 Netty 中 decodeLast 的默认实现只是简单调用了 decode() 方法。如果有特殊的业务需求,则可以通过重写 decodeLast() 方法扩展自定义逻辑。

ByteToMessageDecoder 还有一个抽象子类是 ReplayingDecoder。它封装了缓冲区的管理,在读取缓冲区数据时,无须再对字节长度进行检查。因为如果没有足够长度的字节数据,ReplayingDecoder 将终止解码操作。ReplayingDecoder 的性能相比直接使用 ByteToMessageDecoder 要慢,大部分情况下并不推荐使用 ReplayingDecoder。

  • 抽象解码类 MessageToMessageDecoder。

MessageToMessageDecoder 与 ByteToMessageDecoder 作用类似,都是将一种消息类型的编码成另外一种消息类型。不同的是 MessageToMessageDecoder 并不会对数据报文进行缓存,它主要用作转换消息模型。比较推荐的做法是使用 ByteToMessageDecoder 解析 TCP 协议,解决拆包/粘包问题。解析得到有效的 ByteBuf 数据,然后传递给后续的 MessageToMessageDecoder 做数据对象的转换,具体流程如下图所示:

通信协议实战

在实现协议解码器之前,首先需要清楚一个问题:如何判断 ByteBuf 是否存在完整的报文?最常用的做法就是通过读取消息长度 dataLength 进行判断。如果 ByteBuf 的可读数据长度小于 dataLength,说明 ByteBuf 还不够获取一个完整的报文。在该协议前面的消息头部分包含了魔数、协议版本号、数据长度等固定字段,共 14 个字节。固定字段长度和数据长度可以作为判断消息完整性的依据,具体解码器实现逻辑示例如下:

/*+---------------------------------------------------------------+| 魔数 2byte | 协议版本号 1byte | 序列化算法 1byte | 报文类型 1byte  |+---------------------------------------------------------------+| 状态 1byte |        保留字段 4byte     |      数据长度 4byte     | +---------------------------------------------------------------+|                   数据内容 (长度不定)                          |+---------------------------------------------------------------+*/@Override
public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {// 判断 ByteBuf 可读取字节if (in.readableBytes() < 14) { return;}in.markReaderIndex(); // 标记 ByteBuf 读指针位置in.skipBytes(2); // 跳过魔数in.skipBytes(1); // 跳过协议版本号byte serializeType = in.readByte();in.skipBytes(1); // 跳过报文类型in.skipBytes(1); // 跳过状态字段in.skipBytes(4); // 跳过保留字段int dataLength = in.readInt();if (in.readableBytes() < dataLength) {in.resetReaderIndex(); // 重置 ByteBuf 读指针位置return;}byte[] data = new byte[dataLength];in.readBytes(data);SerializeService serializeService = getSerializeServiceByType(serializeType);Object obj = serializeService.deserialize(data);if (obj != null) {out.add(obj);}
}

07 接头暗语:如何利用 Netty 实现自定义协议通信?相关推荐

  1. 07 | 接头暗语:如何利用 Netty 实现自定义协议通信?

    既然是网络编程,自然离不开通信协议,应用层之间通信需要实现各种各样的网络协议.在项目开发的过程中,我们就需要去构建满足自己业务场景的应用层协议.在上节课中我们介绍了如何使用网络协议解决 TCP 拆包/ ...

  2. Netty实现自定义协议

    关于协议,使用最为广泛的是HTTP协议,但是在一些服务交互领域,其使用则相对较少,主要原因有三方面: HTTP协议会携带诸如header和cookie等信息,其本身对字节的利用率也较低,这使得HTTP ...

  3. C++实现的Socket接口实现自定义协议通信

    资源下载地址:https://download.csdn.net/download/sheziqiong/85836435 资源下载地址:https://download.csdn.net/downl ...

  4. 读取串口数据_自定义串口通信的相关问题整理

    串口通信是常见的通信方式,串口接口是大部分工控器件标配的通信接口.在项目开发的过程中,也经常遇到进行串口通信的处理.这里就串口通信的部分问题分享给大家. 1.TTL.RS232.RS422.RS458 ...

  5. 物联网架构成长之路(35)-利用Netty解析物联网自定义协议

    一.前言 前面博客大部分介绍了基于EMQ中间件,通信协议使用的是MQTT,而传输的数据为纯文本数据,采用JSON格式.这种方式,大部分一看就知道是熟悉Web开发.软件开发的人喜欢用的方式.由于我也是做 ...

  6. android爬楼梯动画,[转载]利用PPT的自定义动画功能制作爬楼梯的动态效果

    利用PPT的自定义动画功能制作爬楼梯的动态效果 职场中,PPT报告总是少不了的,如何能让PPT与众不同.华丽而不失分寸呢?PPT中的自定义动画是一个不错的选择.我很喜欢逛一些PPT,有时看见不错的动画 ...

  7. 【Netty】利用Netty实现心跳检测和重连机制

    一.前言 心跳机制是定时发送一个自定义的结构体(心跳包),让对方知道自己还活着,以确保连接的有效性的机制.   我们用到的很多框架都用到了心跳检测,比如服务注册到 Eureka Server 之后会维 ...

  8. android之利用surfaceView实现自定义水印相机

    android之利用surfaceView实现自定义水印相机 知识点 1.自定义相机+预览相机 2.截屏拍照加水印 3.关于不使用intent来传输图片 4.关于大家说要demo的,因为这里是项目里头 ...

  9. 物联网嵌入式 校园噪声监测系统 ESP8266 STM32 LM386声音传感器 NETTY自定义协议

    目录 一.想法及需求 1.1最初设想 1.2需求分析 1.3方案设计 二.硬件 2.1器材选型 2.2原理图解释 2.3PCB绘制 2.4焊接及成品 三.软件 3.1NETTY自定义协议的TCP服务器 ...

最新文章

  1. 面试高频题:Hash一致性算法是如何解决数据倾斜问题的?
  2. 使用批处理复制并以时间规则重命名文件
  3. list转map stream_advancedday10可变参数不可变集合及Stream流
  4. Lunar New Year and Cross Counting
  5. MySQL笔记——JDBC入门
  6. spring mvc 工作流程
  7. spring-boot(2)--环境搭建
  8. [Python] 维度交换函数:transpose(m,n,r)和permute(m,n,r)
  9. [算法]海量数据问题之一
  10. nodejs如何实现ajax,nodejs + express怎么实现Ajax方式及其简单功能
  11. 计算机科学导论大一论文,《计算机科学导论论文.doc
  12. XP侧边栏(XP桌面秀)
  13. 完美国际单机版 服务器修改,绝心完美136开服教程EL修改
  14. IMX6Q的DDR3初始化配置
  15. python除以10取整_python中整数除以整数的结果是取整数
  16. LogicFlow 边的绘制与交互
  17. 高一c语言期末试题,江苏省海安高中2020-2021学年高一上学期期中考试信息技术试题 Word版含答案...
  18. 什么是SSR服务端渲染
  19. 智能个性化推荐系统设计
  20. 《MVC》——ViewData、ViewBag、TempData、model

热门文章

  1. 一级域名怎么申请二级域名
  2. oracle中integer最大值,integer表示的最大整数
  3. 计量大学计算机学院,计算机科学与技术
  4. 武汉理工大学新思维研究生英语课文翻译和课后习题答案(1~12单元)
  5. 计算机表格单元格合并,excel表格数据拆分和合并单元格-excel中如何将已经合并的单元格拆分,并将该单元格......
  6. Swift5.1 语言参考(三) 类型
  7. 模块学习4:(1)通过MQTT协议和电信云平台的通信(内附MQTT协议V3.1.1的原版和中文参考资料)
  8. 基础零信任服务相关软件的安装和调试
  9. vijos 、洛谷 —— 珠心算测验(java实现)
  10. SI 539 网站开发(二):week6