关于协议,使用最为广泛的是HTTP协议,但是在一些服务交互领域,其使用则相对较少,主要原因有三方面:

  • HTTP协议会携带诸如header和cookie等信息,其本身对字节的利用率也较低,这使得HTTP协议比较臃肿,在承载相同信息的情况下,HTTP协议将需要发送更多的数据包;

  • HTTP协议是基于TCP的短连接,其在每次请求和响应的时候都需要进行三次握手和四次挥手,由于服务的交互设计一般都要求能够承载高并发的请求,因而HTTP协议这种频繁的握手和挥手动作会极大的影响服务之间交互的效率;

  • 服务之间往往有一些根据其自身业务特性所独有的需求,而HTTP协议无法很好的服务于这些业务需求。

基于上面的原因,一般的服务之间进行交互时都会使用自定义协议,常见的框架,诸如dubbo,kafka,zookeeper都实现了符合其自身业务需求的协议,本文主要讲解如何使用Netty实现一款自定义的协议。

1. 协议规定

所谓协议,其本质其实就是定义了一个将数据转换为字节,或者将字节转换为数据的一个规范。一款自定义协议,其一般包含两个部分:消息头和消息体。消息头的长度一般是固定的,或者说是可确定的,其定义了此次消息的一些公有信息,比如当前服务的版本,消息的sessionId,消息的类型等等;消息体则主要是此次消息所需要发送的内容,一般在消息头的最后一定的字节中保存了当前消息的消息体的长度。下面是我们为当前自定义协议所做的一些规定:上述协议定义中,我们除了定义常用的请求和响应消息类型以外,还定义了Ping和Pong消息。Ping和Pong消息的作用一般是,在服务处于闲置状态达到一定时长,比如2s时,客户端服务会向服务端发送一个Ping消息,则会返回一个Pong消息,这样才表示客户端与服务端的连接是完好的。如果服务端没有返回相应的消息,客户端就会关闭与服务端的连接或者是重新建立与服务端的连接。这样的优点在于可以防止突然会产生的客户端与服务端的大量交互。

2. 协议实现

通过上面的定义其实我们可以发现,所谓协议,就是定义了一个规范,基于这个规范,我们可以将消息转换为相应的字节流,然后经由TCP传输到目标服务,目标服务则也基于该规范将字节流转换为相应的消息,这样就达到了相互交流的目的。这里面最重要的主要是如何基于该规范将消息转换为字节流或者将字节流转换为消息。这一方面,Netty为我们提供了ByteToMessageDecoder和MessageToByteEncoder用于进行消息和字节流的相互转换。首先我们定义了如下消息实体:

public class Message {  private int magicNumber;  private byte mainVersion;  private byte subVersion;  private byte modifyVersion;  private String sessionId;  private MessageTypeEnum messageType;  private Map attachments = new HashMap<>();  private String body;  public MapgetAttachments() {    return Collections.unmodifiableMap(attachments);  }  public void setAttachments(Map attachments) {    this.attachments.clear();    if (null != attachments) {      this.attachments.putAll(attachments);    }  }  public void addAttachment(String key, String value) {    attachments.put(key, value);  }  // getter and setter...}

上述消息中,我们将协议中所规定的各个字段都进行了定义,并且定义了一个标志消息类型的枚举MessageTypeEnum,如下是该枚举的源码:

public enum MessageTypeEnum {  REQUEST((byte)1), RESPONSE((byte)2), PING((byte)3), PONG((byte)4), EMPTY((byte)5);  private byte type;  MessageTypeEnum(byte type) {    this.type = type;  }  public int getType() {    return type;  }  public static MessageTypeEnum get(byte type) {    for (MessageTypeEnum value : values()) {      if (value.type == type) {        return value;      }    }    throw new RuntimeException("unsupported type: " + type);  }}

上述主要是定义了描述自定义协议相关的实体属性,对于消息的编码,本质就是依据上述协议方式将消息实体转换为字节流,如下是转换字节流的代码:

public class MessageEncoder extends MessageToByteEncoder<Message> {  @Override  protected void encode(ChannelHandlerContext ctx, Message message, ByteBuf out) {    // 这里会判断消息类型是不是EMPTY类型,如果是EMPTY类型,则表示当前消息不需要写入到管道中    if (message.getMessageType() != MessageTypeEnum.EMPTY) {      out.writeInt(Constants.MAGIC_NUMBER); // 写入当前的魔数      out.writeByte(Constants.MAIN_VERSION); // 写入当前的主版本号      out.writeByte(Constants.SUB_VERSION); // 写入当前的次版本号      out.writeByte(Constants.MODIFY_VERSION); // 写入当前的修订版本号      if (!StringUtils.hasText(message.getSessionId())) {        // 生成一个sessionId,并将其写入到字节序列中        String sessionId = SessionIdGenerator.generate();        message.setSessionId(sessionId);        out.writeCharSequence(sessionId, Charset.defaultCharset());      }      out.writeByte(message.getMessageType().getType()); // 写入当前消息的类型      out.writeShort(message.getAttachments().size()); // 写入当前消息的附加参数数量      message.getAttachments().forEach((key, value) -> {        Charset charset = Charset.defaultCharset();        out.writeInt(key.length()); // 写入键的长度        out.writeCharSequence(key, charset); // 写入键数据        out.writeInt(value.length()); // 希尔值的长度        out.writeCharSequence(value, charset); // 写入值数据      });      if (null == message.getBody()) {        out.writeInt(0); // 如果消息体为空,则写入0,表示消息体长度为0      } else {        out.writeInt(message.getBody().length());        out.writeCharSequence(message.getBody(), Charset.defaultCharset());      }    }  }}

对于消息的解码,其过程与上面的消息编码方式基本一致,主要是基于协议所规定的将字节流数据转换为消息实体数据。如下是其转换过程:

public class MessageDecoder extends ByteToMessageDecoder {  @Override  protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List out) throws Exception {    Message message = new Message();    message.setMagicNumber(byteBuf.readInt());  // 读取魔数    message.setMainVersion(byteBuf.readByte()); // 读取主版本号    message.setSubVersion(byteBuf.readByte()); // 读取次版本号    message.setModifyVersion(byteBuf.readByte()); // 读取修订版本号    CharSequence sessionId = byteBuf.readCharSequence(        Constants.SESSION_ID_LENGTH, Charset.defaultCharset()); // 读取sessionId    message.setSessionId((String)sessionId);    message.setMessageType(MessageTypeEnum.get(byteBuf.readByte())); // 读取当前的消息类型    short attachmentSize = byteBuf.readShort(); // 读取附件长度    for (short i = 0; i < attachmentSize; i++) {      int keyLength = byteBuf.readInt(); // 读取键长度和数据      CharSequence key = byteBuf.readCharSequence(keyLength, Charset.defaultCharset());      int valueLength = byteBuf.readInt(); // 读取值长度和数据      CharSequence value = byteBuf.readCharSequence(valueLength, Charset.defaultCharset());      message.addAttachment(key.toString(), value.toString());    }    int bodyLength = byteBuf.readInt(); // 读取消息体长度和数据    CharSequence body = byteBuf.readCharSequence(bodyLength, Charset.defaultCharset());    message.setBody(body.toString());    out.add(message);  }}
如此,我们自定义消息与字节流的相互转换工作已经完成。对于消息的处理,
主要是要根据消息的不同类型,对消息进行相应的处理,比如对于request类型消息
,要写入响应数据,对于ping消息,要写入pong消息作为回应。下面我们通过定义
Netty handler的方式实现对消息的处理:
// 服务端消息处理器public class ServerMessageHandler extends SimpleChannelInboundHandler<Message> {  // 获取一个消息处理器工厂类实例  private MessageResolverFactory resolverFactory = MessageResolverFactory.getInstance();  @Override  protected void channelRead0(ChannelHandlerContext ctx, Message message) throws Exception {    Resolver resolver = resolverFactory.getMessageResolver(message); // 获取消息处理器    Message result = resolver.resolve(message); // 对消息进行处理并获取响应数据    ctx.writeAndFlush(result); // 将响应数据写入到处理器中  }  @Override  public void channelRegistered(ChannelHandlerContext ctx) throws Exception {    resolverFactory.registerResolver(new RequestMessageResolver()); // 注册request消息处理器    resolverFactory.registerResolver(new ResponseMessageResolver());// 注册response消息处理器    resolverFactory.registerResolver(new PingMessageResolver()); // 注册ping消息处理器    resolverFactory.registerResolver(new PongMessageResolver()); // 注册pong消息处理器  }}
// 客户端消息处理器public class ClientMessageHandler extends ServerMessageHandler {  // 创建一个线程,模拟用户发送消息  private ExecutorService executor = Executors.newSingleThreadExecutor();  @Override  public void channelActive(ChannelHandlerContext ctx) throws Exception {    // 对于客户端,在建立连接之后,在一个独立线程中模拟用户发送数据给服务端    executor.execute(new MessageSender(ctx));  }  /**   * 这里userEventTriggered()主要是在一些用户事件触发时被调用,这里我们定义的事件是进行心跳检测的   * ping和pong消息,当前触发器会在指定的触发器指定的时间返回内如果客户端没有被读取消息或者没有写入   * 消息到管道,则会触发当前方法   */  @Override  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {    if (evt instanceof IdleStateEvent) {      IdleStateEvent event = (IdleStateEvent) evt;      if (event.state() == IdleState.READER_IDLE) {        // 一定时间内,当前服务没有发生读取事件,也即没有消息发送到当前服务来时,        // 其会发送一个Ping消息到服务器,以等待其响应Pong消息        Message message = new Message();        message.setMessageType(MessageTypeEnum.PING);        ctx.writeAndFlush(message);      } else if (event.state() == IdleState.WRITER_IDLE) {        // 如果当前服务在指定时间内没有写入消息到管道,则关闭当前管道        ctx.close();      }    }  }
 private static final class MessageSender implements Runnable {    private static final AtomicLong counter = new AtomicLong(1);    private volatile ChannelHandlerContext ctx;    public MessageSender(ChannelHandlerContext ctx) {      this.ctx = ctx;    }    @Override    public void run() {      try {        while (true) {          // 模拟随机发送消息的过程          TimeUnit.SECONDS.sleep(new Random().nextInt(3));          Message message = new Message();          message.setMessageType(MessageTypeEnum.REQUEST);          message.setBody("this is my " + counter.getAndIncrement() + " message.");          message.addAttachment("name", "xufeng");          ctx.writeAndFlush(message);        }      } catch (InterruptedException e) {        e.printStackTrace();      }    }  }}
上述代码中,由于客户端和服务端需要处理的消息类型是完全一样的,因而客户端
处理类继承了服务端处理类。但是对于客户端而言,其还需要定时向服务端发送心
跳消息,用于检测客户端与服务器的连接是否健在,因而客户端还会实现
userEventTriggered()方法,在该方法中定时向服务器发送心跳消息。
userEventTriggered()方法主要是在客户端被闲置一定时间后,其会根据其读取或者
写入消息的限制时长来选择性的触发读取或写入事件。

上述实现中,我们看到,对于具体类型消息的处理,我们是通过一个工厂类来获取对应的消息处理器,然后处理相应的消息,下面我们该工厂类的代码:

public final class MessageResolverFactory {  // 创建一个工厂类实例  private static final MessageResolverFactory resolverFactory = new MessageResolverFactory();  private static final List resolvers = new CopyOnWriteArrayList<>();  private MessageResolverFactory() {}  // 使用单例模式实例化当前工厂类实例  public static MessageResolverFactory getInstance() {    return resolverFactory;  }  public void registerResolver(Resolver resolver) {    resolvers.add(resolver);  }  // 根据解码后的消息,在工厂类处理器中查找可以处理当前消息的处理器  public Resolver getMessageResolver(Message message) {    for (Resolver resolver : resolvers) {      if (resolver.support(message)) {        return resolver;      }    }    throw new RuntimeException("cannot find resolver, message type: " + message.getMessageType());  }}

上述工厂类比较简单,主要就是通过单例模式获取一个工厂类实例,然后提供一个根据具体消息来查找其对应的处理器的方法。下面我们来看看各个消息处理器的代码:

// request类型的消息public class RequestMessageResolver implements Resolver {  private static final AtomicInteger counter = new AtomicInteger(1);  @Override  public boolean support(Message message) {    return message.getMessageType() == MessageTypeEnum.REQUEST;  }  @Override  public Message resolve(Message message) {    // 接收到request消息之后,对消息进行处理,这里主要是将其打印出来    int index = counter.getAndIncrement();    System.out.println("[trx: " + message.getSessionId() + "]"        + index + ". receive request: " + message.getBody());    System.out.println("[trx: " + message.getSessionId() + "]"        + index + ". attachments: " + message.getAttachments());    // 处理完成后,生成一个响应消息返回    Message response = new Message();    response.setMessageType(MessageTypeEnum.RESPONSE);    response.setBody("nice to meet you too!");    response.addAttachment("name", "xufeng");    response.addAttachment("hometown", "wuhan");    return response;  }}
// 响应消息处理器public class ResponseMessageResolver implements Resolver {  private static final AtomicInteger counter = new AtomicInteger(1);  @Override  public boolean support(Message message) {    return message.getMessageType() == MessageTypeEnum.RESPONSE;  }  @Override  public Message resolve(Message message) {    // 接收到对方服务的响应消息之后,对响应消息进行处理,这里主要是将其打印出来    int index = counter.getAndIncrement();    System.out.println("[trx: " + message.getSessionId() + "]"        + index + ". receive response: " + message.getBody());    System.out.println("[trx: " + message.getSessionId() + "]"        + index + ". attachments: " + message.getAttachments());    // 响应消息不需要向对方服务再发送响应,因而这里写入一个空消息    Message empty = new Message();    empty.setMessageType(MessageTypeEnum.EMPTY);    return empty;  }}
// ping消息处理器public class PingMessageResolver implements Resolver {  @Override  public boolean support(Message message) {    return message.getMessageType() == MessageTypeEnum.PING;  }  @Override  public Message resolve(Message message) {    // 接收到ping消息后,返回一个pong消息返回    System.out.println("receive ping message: " + System.currentTimeMillis());    Message pong = new Message();    pong.setMessageType(MessageTypeEnum.PONG);    return pong;  }}
// pong消息处理器public class PongMessageResolver implements Resolver {  @Override  public boolean support(Message message) {    return message.getMessageType() == MessageTypeEnum.PONG;  }  @Override  public Message resolve(Message message) {    // 接收到pong消息后,不需要进行处理,直接返回一个空的message    System.out.println("receive pong message: " + System.currentTimeMillis());    Message empty = new Message();    empty.setMessageType(MessageTypeEnum.EMPTY);    return empty;  }}

如此,对于自定义协议的消息处理过程已经完成,下面则是使用用Netty实现的客户端与服务端代码:

// 服务端public class Server {  public static void main(String[] args) {    EventLoopGroup bossGroup = new NioEventLoopGroup();    EventLoopGroup workerGroup = new NioEventLoopGroup();    try {      ServerBootstrap bootstrap = new ServerBootstrap();      bootstrap.group(bossGroup, workerGroup)          .channel(NioServerSocketChannel.class)          .option(ChannelOption.SO_BACKLOG, 1024)          .handler(new LoggingHandler(LogLevel.INFO))          .childHandler(new ChannelInitializer() {            @Override            protected void initChannel(SocketChannel ch) throws Exception {              ChannelPipeline pipeline = ch.pipeline();               // 添加用于处理粘包和拆包问题的处理器              pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));              pipeline.addLast(new LengthFieldPrepender(4));              // 添加自定义协议消息的编码和解码处理器              pipeline.addLast(new MessageEncoder());              pipeline.addLast(new MessageDecoder());              // 添加具体的消息处理器              pipeline.addLast(new ServerMessageHandler());            }          });      ChannelFuture future = bootstrap.bind(8585).sync();      future.channel().closeFuture().sync();    } catch (InterruptedException e) {      e.printStackTrace();    } finally {      bossGroup.shutdownGracefully();      workerGroup.shutdownGracefully();    }  }}
public class Client {  public static void main(String[] args) {    NioEventLoopGroup group = new NioEventLoopGroup();    Bootstrap bootstrap = new Bootstrap();    try {      bootstrap.group(group)          .channel(NioSocketChannel.class)          .option(ChannelOption.TCP_NODELAY, Boolean.TRUE)          .handler(new ChannelInitializer() {            @Override            protected void initChannel(SocketChannel ch) throws Exception {              ChannelPipeline pipeline = ch.pipeline();              // 添加用于解决粘包和拆包问题的处理器              pipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));              pipeline.addLast(new LengthFieldPrepender(4));              // 添加用于进行心跳检测的处理器              pipeline.addLast(new IdleStateHandler(1, 2, 0));              // 添加用于根据自定义协议将消息与字节流进行相互转换的处理器              pipeline.addLast(new MessageEncoder());              pipeline.addLast(new MessageDecoder());              // 添加客户端消息处理器              pipeline.addLast(new ClientMessageHandler());            }          });      ChannelFuture future = bootstrap.connect("127.0.0.1", 8585).sync();      future.channel().closeFuture().sync();    } catch (InterruptedException e) {      e.printStackTrace();    } finally {      group.shutdownGracefully();    }  }}

运行上述代码之后,我们可以看到客户端和服务器分别打印了如下数据:

// 客户端receive pong message: 1555123429356[trx: d05024d2]1. receive response: nice to meet you too![trx: d05024d2]1. attachments: {hometown=wuhan, name=xufeng}[trx: 66ee1438]2. receive response: nice to meet you too!// 服务器receive ping message: 1555123432279[trx: f582444f]4. receive request: this is my 4 message.[trx: f582444f]4. attachments: {name=xufeng}

3. 小结

本文首先将自定义协议与HTTP协议进行了对比,阐述了自定义协议的一些优点。然后定义了一份自定义协议,并且讲解了协议中各个字节的含义。最后通过Netty对自定义协议进行了实现,并且实现了基于自定义协议的心跳功能。

charming丶

my.oschina.net/zhangxufeng/blog/3043768

  1. 收藏了近800G古籍资料,发送【古籍】获取下载地址。

  2. 更多资料请戳【搜索】菜单,里面可自行搜索下载。

自定义协议_面试官:Netty如何实现自定义协议?尽量详细点相关推荐

  1. 引用计数器法 可达性分析算法_面试官:你说你熟悉jvm?那你讲一下并发的可达性分析...

    持续输出原创文章,点击蓝字关注我吧 上面这张图是我还是北漂的时候,在鼓楼附近的胡同里面拍的. 那天刚刚下完雨,路过这个地方的时候,一瞬间就被这五颜六色的门板和自行车给吸引了,于是拍下了这张图片.看到这 ...

  2. qps是什么意思_面试官:说说你之前负责的系统,QPS 能达到多少?

    被面试官经常问到之前开发的系统接口 QPS 能达到多少,经常给不出一个数值,支支吾吾,导致整体面试效果降低? 原因基本是一些公司中,做完功能测试就完了,压根不会有性能测试这一步,或者说并发量较少,没有 ...

  3. .jar中没有主清单属性_面试官问:为什么SpringBoot的 jar 可以直接运行?

    点击上方蓝色字体,选择"设为星标" 优质文章,及时送达 来源 | https://urlify.cn/uQvIna SpringBoot提供了一个插件spring-boot-mav ...

  4. 307 跳转会携带请求方法吗_面试官:GET和POST两种基本请求方法有什么区别

    点击上方蓝色"后端面试那些事儿",选择"设为星标" 学最好的别人,做最好的我们 来源:r6d.cn/j26B GET和POST是HTTP请求的两种基本方法,要说 ...

  5. jar打包 剔除第三方依赖以及它的依赖_面试官:为什么Spring Boot的jar可以直接运行?...

    来源:Gormat's Notes fangjian0423.github.io/2017/05/31/springboot-executable-jar/ Spring Boot Loader抽象的 ...

  6. java执行sql文件_面试官问你MyBatis SQL是如何执行的?把这篇文章甩给他

    初识 MyBatis MyBatis 是第一个支持自定义 SQL.存储过程和高级映射的类持久框架.MyBatis 消除了大部分 JDBC 的样板代码.手动设置参数以及检索结果.MyBatis 能够支持 ...

  7. redis为什么是单线程_面试官:Redis单线程为什么执行效率这么高?

    点击上方☝Java编程技术乐园,轻松关注!及时获取有趣有料的技术文章 做一个积极的人 编码.改bug.提升自己 我有一个乐园,面向编程,春暖花开! 上一篇回顾: 面试官:Redis为什么设计成单线程的 ...

  8. mysql怎么用_面试官都是这样发问的,连环冲锋炮,看你怎么抵挡(上)

    本内容来源于和尚 16 年毕业的学长,先在 58,后阿里,如今准备跳槽了,以下内容为他的最近面试经历 我最近从大厂离职之后在合肥呆了个把月,之前已经准备了半个多月,从7月底开始投简历面试,目前是jav ...

  9. firefox 接受post 不完整_面试官想听到的GET和POST两种基本请求方法的区别

    前言 HTTP定义了与服务器交互的不同方法,最常用的方法有四种Put,Delete.post,get,即增删改查. 1.Get,它用于获取信息,它只是获取.查询数据,也就是说它不会修改服务器上的数据, ...

  10. 如何把class里的vector结构体memcpy出来_面试官:请说出线程安全的 ArrayList 有哪些,除了Vector...

    以下环境是 JDK 1.8 ArrayList 的初始容量 面试官:你看过 ArrayList 的源码? Python 小星:看过 面试官:那你说下ArrayList 的初始容量是多少? Python ...

最新文章

  1. 原创 人物志|山东省临沭县 - 一位身残志坚的奋斗青年 - 吴忠军
  2. Android自定义组合控件
  3. java解惑你知道多少_Java解惑
  4. git submodule 删除及更新URL
  5. 无法从“std::_Binder std::_Unforced,SOCKET ,LPSOCKADDR,unsigned int ”转换为“int”
  6. 廉洁修身论文2000字_自主招生论文发表要求【期刊论文】自主招生论文发表要求...
  7. 问懵逼:请站在 JVM 角度谈谈 Java 的锁?
  8. php文件上传前端页面样式,HTML实现美化上传文件样式
  9. 类路径是什么意思_甲状腺结节4a类严重吗,是什么意思?怎么治疗需要手术吗?一文解答...
  10. Java 算法 寂寞的数
  11. 15种基础的可以直接使用的CSS3样式
  12. 在C语言中,SetConsoleTextAttribute(参数1,参数2)是设置控制台窗口字体颜色和背景颜色的函数。GetStdHandle(参数)函数用于获得句柄
  13. 基础知识及命令(1)
  14. 网络工程师必备工具之超级终端
  15. angular 单击和双击事件分开
  16. 麻辣江湖服务器正在维护,7月18日例行维护更新公告
  17. 在matlab中开根号,请问,在matlab里面如果输入开方号(根号)?如9的开方怎么写?...
  18. L2-029 特立独行的幸福 (25 分)
  19. PyTorch搭建LSTM实现多变量输入多变量输出时间序列预测(多任务学习)
  20. 焊接技巧 -- 拖焊

热门文章

  1. OpenSource.com 评出 2014 年十佳开源软件
  2. 实现简单的python爬虫功能
  3. springboot 指定 logback_Spring Boot日志框架实战解析
  4. 利用CURL修改页面内容
  5. dx绘制2d图像_【3D建模】聊聊2D动画软件
  6. Layer:layui.util.timeAgo 使用
  7. YAF 接口 2016-10-27
  8. javaweb中真分页案例
  9. XCode应该是从11.4开始支持Sandbox
  10. Fffmpeg:从AVFrame中由YUV获取RGB