目录

环境准备及说明

MessagePack 编解码器

MessagePack 编码器

MessagePack 解码器

POJO  User

Netty 网络通信

服务端

客户端

运行测试


环境准备及说明

如果是导入二进制开发包,则如下所示:

需要开发包的可以参考《 MessagePack 开发入门详解》。

如果是 Maven 项目,则添加如下依赖:

     <!-- https://mvnrepository.com/artifact/io.netty/netty-all --><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.30.Final</version></dependency><!-- https://mvnrepository.com/artifact/org.msgpack/msgpack --><dependency><groupId>org.msgpack</groupId><artifactId>msgpack</artifactId><version>0.6.12</version></dependency><!-- https://mvnrepository.com/artifact/org.javassist/javassist --><dependency><groupId>org.javassist</groupId><artifactId>javassist</artifactId><version>3.24.0-GA</version></dependency>

1)netty-all:是 Netty 开发包

2)msgpack:是 Messagepck 序列化开发包

3)javassist:是 msgpack 自己的依赖包

本文示例项目结构如下:

1)User:网络传输的 POJO 对象,注意:序列化 POJO 必须加 org.msgpack.annotation.Message 注解:@Message

2)echo:包中为 netty 通信的客户端与服务端

3)messagepack:包中为 MessagePack 编解码器

特别提醒:

1)虽然 MessagePack 用于序列化对象,但是普通 String、Integer 等等同样也是对象,所以照样可以传输普通的字符串等消息
2)需要序列化的 POJO 对象上必须加上 org.msgpack.annotation.Message 注解:@Message,否则传输会失败,而且也不报错,很难排查
3)MessagePack 序列化对象后的消息,经过发送后,接收端 channelRead(ChannelHandlerContext ctx, Object msg)
    3.1)即使发送的是 User 对象,接收端的 msg 也不能进行 User user = (User)msg 强转,否则客户端会被强制断开连接
    3.2)如果发送的是 User 对象,接收端可以转为 List<Object> objects = (List<Object>) msg,list 中的元素对应 User 的属性值
    3.3)如果发送的不是 POJO 对象,而是简单的 String 对象,则不能转为 List<Object>,否则客户端也会被强制断开

MessagePack 编解码器

利用 Netty 的编解码框架可以非常方便的集成第三方序列化框架,Netty 预集成了几种常用的编解码框架,用户也可以根据自己项目的实际情况集成其它编解码框架,或者进行自定义。

MessagePack 编码器

package com.example.messagepack;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;/*** Created by Administrator on 2018/11/25 0025.* MessagePack 编码器 —— 继承 Netty 的 MessageToByteEncoder,比重写方法*/
public class MsgpackEncoder extends MessageToByteEncoder<Object> {/*** 重写方法,负责将 Object 类型的 POJO 对象编码为 byte 数组,然后写入 ByteBuf 中** @param channelHandlerContext* @param o* @param byteBuf* @throws Exception*/@Overrideprotected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) throws Exception {MessagePack messagePack = new MessagePack();/** 序列化对象*/byte[] raw = messagePack.write(o);byteBuf.writeBytes(raw);}
}

MessagePack 解码器

package com.example.messagepack;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;import java.util.List;/*** Created by Administrator on 2018/11/25 0025.* MessagePack 解码器 - 继承 Netty 的 MessageToMessageDecoder,并重写方法*/
public class MsgpackDecoder extends MessageToMessageDecoder<ByteBuf> {/*** 重写方法,首先从数据报 byteBuf 中获取需要解码的 byte 数组,* 然后调用 MessagePack 的 read 方法将其反序列化为 Object 对象,将解码后的对象加入到解码列表 list 中,* 这样就完成了 MessagePack 的解码操作** @param channelHandlerContext* @param byteBuf* @param list* @throws Exception*/@Overrideprotected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {int length = byteBuf.readableBytes();byte[] array = new byte[length];byteBuf.getBytes(byteBuf.readerIndex(), array, 0, length);MessagePack messagePack = new MessagePack();list.add(messagePack.read(array));}
}

POJO  User

package com.example.domain;import org.msgpack.annotation.Message;import java.util.Date;/*** Created by Administrator on 2018/11/25 0025.* 用户 实体* 需要序列化的 POJO 对象上必须加上 org.msgpack.annotation.Message 注解:@Message*/
@Message
public class User {private Integer pId;private String pName;private Date birthday;private Boolean isMarry;public Date getBirthday() {return birthday;}public void setBirthday(Date birthday) {this.birthday = birthday;}public Integer getpId() {return pId;}public void setpId(Integer pId) {this.pId = pId;}public String getpName() {return pName;}public void setpName(String pName) {this.pName = pName;}public Boolean getIsMarry() {return isMarry;}public void setIsMarry(Boolean isMarry) {this.isMarry = isMarry;}@Overridepublic String toString() {return "User{" +"birthday=" + birthday +", pId=" + pId +", pName='" + pName + '\'' +", isMarry=" + isMarry +'}';}
}

Netty 网络通信

首先模拟的情况是:客户端连接上服务器后,给服务器连发消息,服务器接收后会将原信息进回复,同时会解决 TCP 粘包与拆包。会使用 Netty 的 LengthFieldPrepender、LengthFieldBasedFrameDecoder 编解码器处理半包消息,不会出现 TCP 粘包/拆包。

服务端

EchoServer 内容如下:

package com.example.echo;import com.example.messagepack.MsgpackDecoder;
import com.example.messagepack.MsgpackEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;/*** Created by Administrator on 2018/11/11 0011.* Echo 服务器*/
public class EchoServer {public static void main(String[] args) {int port = 9898;new EchoServer().bind(port);}public void bind(int port) {/*** interface EventLoopGroup extends EventExecutorGroup extends ScheduledExecutorService extends ExecutorService* 配置服务端的 NIO 线程池,用于网络事件处理,实质上他们就是 Reactor 线程组* bossGroup 用于服务端接受客户端连接,workerGroup 用于进行 SocketChannel 网络读写*/EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {/** ServerBootstrap 是 Netty 用于启动 NIO 服务端的辅助启动类,用于降低开发难度* */final ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)// 设置TCP连接超时时间.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000).handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {System.out.println(Thread.currentThread().getName() + ",服务器初始化通道...");/*** 为了处理半包消息,添加如下两个 Netty 内置的编解码器* LengthFieldPrepender:前置长度域编码器——放在MsgpackEncoder编码器前面* LengthFieldBasedFrameDecoder:长度域解码器——放在MsgpackDecoder解码器前面* 关于 长度域编解码器处理半包消息,本文不做详细讲解,会有专门篇章进行说明*/ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));ch.pipeline().addLast("MessagePack encoder", new MsgpackEncoder());ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));ch.pipeline().addLast("MessagePack Decoder", new MsgpackDecoder());ch.pipeline().addLast(new EchoServerHandler());}});/**服务器启动辅助类配置完成后,调用 bind 方法绑定监听端口,调用 sync 方法同步等待绑定操作完成*/ChannelFuture f = b.bind(port).sync();System.out.println(Thread.currentThread().getName() + ",服务器开始监听端口,等待客户端连接.........");/**下面会进行阻塞,等待服务器连接关闭之后 main 方法退出,程序结束* */f.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {/**优雅退出,释放线程池资源*/bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

EchoServerHandler 内容如下:

package com.example.echo;import com.example.domain.User;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;/*** Created by Administrator on 2017/5/16.* ChannelInboundHandlerAdapter extends ChannelHandlerAdapter 用于对网络事件进行读写操作*/
public class EchoServerHandler extends ChannelInboundHandlerAdapter {/*** 因为多线程,所以使用原子操作类来进行计数*/private static AtomicInteger atomicInteger = new AtomicInteger();/*** 收到客户端消息,自动触发** @param ctx* @param msg* @throws Exception*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println((atomicInteger.addAndGet(1)) + "--->" + Thread.currentThread().getName() + ",The server receive  order : " + msg);/*** 如果传输的是 POJO 对象,则可以转成 List<Object>* list 中的每一个元素都是发送来的 POJO 对象的属性值* 注意:如果对方传输只是简单的 String 对象,则不能强转为 List<Object>*//* List<Object> objects = (List<Object>) msg;for (Object obj : objects) {System.out.println("属性:" + obj);}*//*** 服务端接收到客户端发送来的数据后,再回发给客户端*/ctx.writeAndFlush(msg);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("-----客户端关闭:" + ctx.channel().remoteAddress());/**当发生异常时,关闭 ChannelHandlerContext,释放和它相关联的句柄等资源 */ctx.close();}
}

客户端

EchoClient 内容如下:

package com.example.echo;import com.example.messagepack.MsgpackDecoder;
import com.example.messagepack.MsgpackEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;/*** Created by Administrator on 2017/5/16.* Echo 客户端*/
public class EchoClient {/*** 使用 2 个线程模拟 2 个客户端** @param args*/public static void main(String[] args) {for (int i = 0; i < 2; i++) {new Thread(new MyThread()).start();}}static class MyThread implements Runnable {@Overridepublic void run() {connect("192.168.1.20", 9898);}public void connect(String host, int port) {/**配置客户端 NIO 线程组/池*/EventLoopGroup group = new NioEventLoopGroup();try {/**Bootstrap 与 ServerBootstrap 都继承(extends)于 AbstractBootstrap* 创建客户端辅助启动类,并对其配置,与服务器稍微不同,这里的 Channel 设置为 NioSocketChannel* 然后为其添加 Handler,这里直接使用匿名内部类,实现 initChannel 方法* 作用是当创建 NioSocketChannel 成功后,在进行初始化时,将它的ChannelHandler设置到ChannelPipeline中,用于处理网络I/O事件*/Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)// 设置TCP连接超时时间.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {System.out.println(Thread.currentThread().getName() + ",客户端初始化管道...");/*** 为了处理半包消息,添加如下两个 Netty 内置的编解码器* LengthFieldPrepender:前置长度域编码器——放在MsgpackEncoder编码器前面* LengthFieldBasedFrameDecoder:长度域解码器——放在MsgpackDecoder解码器前面* 关于 长度域编解码器处理半包消息,本文不做详细讲解,会有专门篇章进行说明*/ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));ch.pipeline().addLast("MessagePack encoder", new MsgpackEncoder());ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65535, 0, 2, 0, 2));ch.pipeline().addLast("MessagePack Decoder", new MsgpackDecoder());ch.pipeline().addLast(new EchoClientHandler());}});/**connect:发起异步连接操作,调用同步方法 sync 等待连接成功*/ChannelFuture channelFuture = b.connect(host, port).sync();System.out.println(Thread.currentThread().getName() + ",客户端发起异步连接..........");/**等待客户端链路关闭*/channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();} finally {/**优雅退出,释放NIO线程组*/group.shutdownGracefully();}}}
}

EchoClientHandler 内容如下:

package com.example.echo;import com.example.domain.User;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;/*** Created by Administrator on 2017/5/17.* 用于对网络事件进行读写操作*/
public class EchoClientHandler extends ChannelInboundHandlerAdapter {/*** 因为 Netty 采用线程池,所以这里使用原子操作类来进行计数*/private static AtomicInteger atomicInteger = new AtomicInteger();/*** 当客户端和服务端 TCP 链路建立成功之后,Netty 的 NIO 线程会调用 channelActive 方法*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {/*** 多余 数组、List、Set、Map 等,对立面的元素逐个进行发送,则对方也是逐个接收* 否则如果直接发送 数组、List、Set、Map 等,则对方会统一接收* 注意:因为使用LengthFieldPrepender、LengthFieldBasedFrameDecoder编解码器处理半包消息* 所以这里连续发送也不会出现 TCP 粘包/拆包*/List<User> users = getUserArrayData();for (User user : users) {ctx.writeAndFlush(user);}ctx.writeAndFlush("我是普通的字符串消息" + Thread.currentThread().getName());}/*** 当服务端返回应答消息时,channelRead 方法被调用,从 Netty 的 ByteBuf 中读取并打印应答消息*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {System.out.println((atomicInteger.addAndGet(1)) + "---" + Thread.currentThread().getName() + ",Server return Message:" + msg);}/*** 当发生异常时,打印异常 日志,释放客户端资源*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {/**释放资源*/ctx.close();}/*** 设置网络传输的 POJO 对象数组/列表** @return*/public List<User> getUserArrayData() {User[] users = new User[5];User loopUser = null;for (int i = 0; i < 5; i++) {loopUser = new User();loopUser.setpId(i + 1);loopUser.setpName("华安" + Thread.currentThread().getName());loopUser.setIsMarry(true);loopUser.setBirthday(new Date());users[i] = loopUser;}return Arrays.asList(users);}
}

运行测试

先运行服务器,再运行客户端。

Netty 整合 MessagePack 序列化框架 + LengthFieldBasedFrameDecoder 自定义解码器相关推荐

  1. netty编解码器与序列化框架分析

    netty编解码器分析 编码(Encode)也称为序列化(serialization),它将对象序列化为字节数组,用于网络传输.数据持久化或者其它用途. 反之,解码(Decode)也称为反序列化(de ...

  2. netty实战-自定义解码器处理半包消息

    概述 在李林锋的Netty系列之Netty编解码框架分析中介绍了各种解码器,也推荐组合 LengthFieldBasedFrameDecoder ByteToMessageDecoder 这两个解码器 ...

  3. Netty源码分析系列之常用解码器(下)——LengthFieldBasedFrameDecoder

    扫描下方二维码或者微信搜索公众号菜鸟飞呀飞,即可关注微信公众号,Spring源码分析和Java并发编程文章. 前言 在上一篇文章中分析了三个比较简单的解码器,今天接着分析最后一个常用的解码器:Leng ...

  4. spring整合各种RPC框架(netty、dubbo、dubbox、gRPC、Motan)-续(Motan)

    继上文 :spring整合各种RPC框架(netty.dubbo.dubbox.gRPC.Motan)-续(gRPC) Motan相关介绍? Motan是新浪微博开源的一套基于java开发的RPC框架 ...

  5. java基础巩固-宇宙第一AiYWM:为了维持生计,手写RPC~Version07(RPC原理、序列化框架们、网络协议框架们 、RPC 能帮助我们做什么呢、RPC异常排查:ctrl+F搜超时)整起

    上次Version06说到了咱们手写迷你版RPC的大体流程, 对咱们的迷你版RPC的大体流程再做几点补充: 为什么要封装网络协议,别人说封装好咱们就要封装?Java有这个特性那咱就要用?好像是这样.看 ...

  6. Netty使用kryo序列化传输对象

    Netty使用kryo序列化传输对象 横渡 Netty使用kryo序列化传输对象 - 简书参考文章:https://blog.csdn.net/eguid_1/article/details/7931 ...

  7. [强烈推荐] 新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析

    新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析 1.引言 Netty 是一个广受欢迎的异步事件驱动的Java开源网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端. 本文基 ...

  8. Spring笔记 整合SSM[Struts2框架] 万神小栈

    一 spring概述 1.1 web项目开发中的耦合度问题 微信小程序搜索 万神小栈 更多资源等你发现! 如果文章对你有帮助别忘了点赞加关注喔~ 在servlet中需要调用service中的方法,则需 ...

  9. websocket性能低?教你使用netty整合websocket(二)——实现点对点聊天(客户端与客户端通信)

    前提 了解如何实现客户端和服务端通讯 上一篇博客--SpringBoot+Netty整合websocket(一)--客户端和服务端通讯 实现点对点聊天 后端 1.建立服务端WebSocketNetty ...

  10. Netty整合Disruptor实战

    1.Netty实现服务端与客户端数据传输 1).依赖 <dependency><groupId>io.netty</groupId><artifactId&g ...

最新文章

  1. [Git/Github] ubuntu 14.0 下github 配置
  2. WINRAR 命令行语法
  3. Pthread:POSIX Threads Programming
  4. 基于dsp_builder的算法在FPGA上的实现(转自https://www.cnblogs.com/sunev/archive/2012/11/17/2774836.html)...
  5. centos7 tomcat_centos7中利用logrotate工具切割日志,以tomcat日志为例
  6. 文献记录(part55)--基于分布式非负矩阵分解的大规模主题社区挖掘
  7. java中使用es的dsl_基于DSL的基于图论的Java程序中输入图数据的方法
  8. 第十期:快来了解这五种热门的开发技能
  9. python定义栈_Python栈实现
  10. python2.7个3.7之间传输文件_关于将python2.7转换为python3.7的说明,python27,python37,散记...
  11. java 父类_java 调用父类的父类
  12. 怎么做c语言的子程序,哪位师傅知道51单片机怎样编写子程序?C语言的。在主程序里调...
  13. 手机鸿蒙系统体验,首位!魅族Lipro智能家居宣布接入鸿蒙系统,魅族手机还会远吗?...
  14. 旋转区域_高空旋转雾化机雾桩应用场所、高压喷雾立杆式降尘设备,高压微雾除尘系统原理以及优势...
  15. 免费讲座:数据库工程实施中的性能保证
  16. 计算机单机游戏c0005错误,堡垒之夜Epic Games Launcher错误怎么办错误解决方法介绍...
  17. 明解C语言(入门篇)第十章
  18. 机器学习算法初识—二分k均值算法
  19. C语言-编写函数isprime(int a),用来判断自变量a是否为素数。若是素数,函数返回整数1,否则返回0。
  20. seowhy论坛 seo技术 seo搜索引擎排名 seo交流学习 网站快速收录操作方法

热门文章

  1. 孙鑫VC学习笔记:第十一讲 (六) 图形重绘方法二 利用元文件
  2. 【大数据部落】r语言使用rjags R2jags建立贝叶斯模型
  3. mysql职业要求_运维职业要求
  4. 多标签图像分类任务的评价方法——mAP
  5. python的sorted用法
  6. SQLServer数据库写操作报错String or binary data would be truncated问题解决
  7. 20170910算法工程师在线笔试之求第n个丑数
  8. linux基础(三)——yum的使用
  9. VMware12安装虚拟机教程、Ubuntu16.04安装教程
  10. 2021-08-13servlet 原理及注意事项