前言

通过之前的学习,我们知道使用netty在客户端和服务端之间很容易发送一个文本类的消息,但在实际应用中这肯定是不够用的,像java中使用最多的对象这种形式的数据,在客户端和服务端通信的时候也必然会涉及,那么netty作为通信的框架,也会有相应的支持

其实在所有的通信过程中,不管是普通的文本消息还是对象消息甚至是其他格式的数据,在传输过程中,对方解析的时候就会涉及到序列化和反序列化的问题,同样,使用netty在进行客户端与服务端消息传输时,如果发送对象类的消息,就需要借助序列化和反序列化对应的组件,下面介绍两种常用的方式,使用protostuff和marshalling分别发送对象消息

方式一,使用protostuff

序列化方式有很多中,JDK自带的方式,JSON方式,google的Protobuf,google原生的protobuffer使用起来相当麻烦,首先要写.proto文件,然后编译.proto文件,生成对应的.java文件,试了一次,发现真的很麻烦;总结来说,Protobuf是一种序列化工具,相对于其他的序列化组件,Protobuf功能更加丰富和强大,下面开始整合过程,

1、导入pom依赖

     <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>5.0.0.Alpha2</version><!-- <version>4.1.24.Final</version> --></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>1.7.25</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-classic</artifactId><version>1.1.7</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-core</artifactId><version>1.1.7</version></dependency><dependency><groupId>ch.qos.logback</groupId><artifactId>logback-access</artifactId><version>1.1.7</version></dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-api</artifactId></dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-core</artifactId></dependency><dependency><groupId>io.protostuff</groupId><artifactId>protostuff-runtime</artifactId></dependency>

2、构造一个简单的对象
由于是模拟,这里只弄了两个简单的属性,使用了toString方法,方便后面看效果

public class Person implements Serializable {private static final long  SerialVersionUID = 1L;private int id;private String name;public int getId() {return id;}public String getName() {return name;}public void setId(int id) {this.id = id;}public void setName(String name) {this.name = name;}public Person(int id, String name) {this.id = id;this.name = name;}public Person() {}@Overridepublic String toString() {return "Person{" +"id=" + id +", name='" + name + '\'' +'}';}
}

3、netty服务端server
通过之前的篇章的学习,想必大家都了解了netty编程的风格和特点了吧,这里就直接上代码了

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;public class NettyServer {public static void main(String[] args) throws Exception{EventLoopGroup boss = new NioEventLoopGroup(1);EventLoopGroup worker = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 128).handler(new LoggingHandler(LogLevel.TRACE))//自定义ServerInitialize,方便后续代码维护.childHandler(new ServerInitialize());  ChannelFuture future = bootstrap.bind(8899).sync();System.out.println("server start in port:8899");future.channel().closeFuture().sync();boss.shutdownGracefully();worker.shutdownGracefully();}}

4、server端自定义ServerInitialize
网上有很多资料喜欢把ServerInitialize这个类直接写到server类中,虽然效果是一样的,但从代码的优雅性和可维护角度看并不是很好,而且官方也不建议这么做,最好做成一个类去继承ChannelInitializer,在这个类里面添加各种自定义处理逻辑,复合netty的编程特点

public class ServerInitialize extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new LengthFieldBasedFrameDecoder(10240, 0, 4, 0, 4));pipeline.addLast(new ProtostuffDecoder());pipeline.addLast(new ProtoStuffServerHandler());}}

这个LengthFieldBasedFrameDecoder中的相关参数,可以根据实际传输对象的大小进行自定义设置,ProtostuffDecoder为服务端对象编码的工具类,代码如下,

//用将ByteBuf中的数据转换成Person对象
public class ProtostuffDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {Schema<Person> schema = RuntimeSchema.getSchema(Person.class);Person person = schema.newMessage();byte[] array = new byte[in.readableBytes()];in.readBytes(array);ProtobufIOUtil.mergeFrom(array, person, schema);out.add(person);}}

简答解释一下这个逻辑,其实这里做对象的解析是由两步构成的,如下圈起来的部分:
1、服务器端先用LengthFieldBasedFrameDecoder进行解码,获取到一个完整的ByteBuf消息;
2、然后ProtostuffDecoder解码器将上一步解码出来的消息,转换成一个Person对象,也就是ProtostuffDecoder 工具类要做的事情

由于netty的handler的处理逻辑是链式的,在真正实现自己的handler逻辑之前,前两步已经把对象进行反序列化了,那么在自己的handler逻辑中就可以直接使用了,这也是为什么将自定义的handler放在最后的原因

4、server端自定义handler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class ProtoStuffServerHandler extends SimpleChannelInboundHandler {private static Logger logger = LoggerFactory.getLogger(ProtoStuffServerHandler.class);private int counter = 0;@Overrideprotected void messageReceived(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {Person person = (Person) msg;logger.info("当前是第[{}]次获取到客户端发送过来的person对象[{}].", ++counter, person.toString());}/** 当发生了异常时,次方法调用 */@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {logger.error("error:", cause);ctx.close();}}

这里面的逻辑比较简单,只是单纯的接收数据,打印数据,在真实业务中,接收到数据后,可以进行其他的业务逻辑处理,这里不继续延伸了,以上就是服务端的全部代码,下面看客户端代码部分

5、客户端server

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) throws Exception {EventLoopGroup group = new NioEventLoopGroup(1);Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ClientInitialize());ChannelFuture future = bootstrap.connect("localhost", 8899).sync();System.out.println("client connect server.");future.channel().closeFuture().sync();group.shutdownGracefully();}}

5、客户端自定义ClientInitialize

public class ClientInitialize extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new LengthFieldPrepender(4));pipeline.addLast(new ProtostuffEncoder());pipeline.addLast(new ProtostuffClientHandler());}}

在这个类里面,客户端是发送对象数据的一方,因此需要一个方法,将对象进行编码,即ProtostuffEncoder,

//用于将Person对象编码成字节数组
public class ProtostuffEncoder extends MessageToByteEncoder<Person> {@Overrideprotected void encode(ChannelHandlerContext ctx, Person msg, ByteBuf out) throws Exception {LinkedBuffer buffer = LinkedBuffer.allocate(1024);Schema<Person> schema = RuntimeSchema.getSchema(Person.class);byte[] array = ProtobufIOUtil.toByteArray(msg, schema, buffer);out.writeBytes(array);}}

6、客户端自定义handler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class ProtostuffClientHandler extends SimpleChannelInboundHandler {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Person person;for (int i = 0; i < 10; i++) {person = new Person();person.setId(i);person.setName("张三丰" + i);ctx.writeAndFlush(person);}}@Overrideprotected void messageReceived(ChannelHandlerContext channelHandlerContext, Object o) {}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("客户端发生异常:" + cause);ctx.close();}
}

为了演示效果,在channelActive这个回调方法中,即客户端连接服务端建立通信的channel的时候就直接发送10条数据,当然也可以在messageReceived这个里面编写,以上是客户端的所有代码,下面我们启动程序看一下效果,

启动服务端,

启动客户端,

启动完毕客户端,再看服务端的控制台,可以看到,服务端成功收到了客户端发送过来的对象消息

方式二,使用Marshalling发送对象消息

过程和上传差不多,
1、导入依赖

 <dependency><groupId>org.jboss.marshalling</groupId><artifactId>jboss-marshalling-serial</artifactId><version>2.0.0.Beta2</version></dependency>

2、Marshalling编解码工具类

import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;
import io.netty.handler.codec.marshalling.*;
import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;/*** 对象编解码工具类*/
public class MarshallingCodeCFactory {/*** 创建Jboss Marshalling解码器MarshallingDecoder* @return MarshallingDecoder*/public static MarshallingDecoder buildMarshallingDecoder() {//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");//创建了MarshallingConfiguration对象,配置了版本号为5final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);//根据marshallerFactory和configuration创建providerUnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);return decoder;}/*** 创建Jboss Marshalling编码器MarshallingEncoder* @return MarshallingEncoder*/public static MarshallingEncoder buildMarshallingEncoder() {final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");final MarshallingConfiguration configuration = new MarshallingConfiguration();configuration.setVersion(5);MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组MarshallingEncoder encoder = new MarshallingEncoder(provider);return encoder;}}

这个工具类将会被客户端或服务端在任何地方使用,即在传输对象的方法里均可以使用

3、服务端server

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer {public static void main(String[] args) throws Exception{EventLoopGroup boss = new NioEventLoopGroup();EventLoopGroup worker = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(boss, worker).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024).childHandler(new ServerInitialize());ChannelFuture future = bootstrap.bind(8888).sync();System.out.println("server start in port:8888");future.channel().closeFuture().sync();boss.shutdownGracefully();worker.shutdownGracefully();}}

4、服务端自定义ServerInitialize

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;public class ServerInitialize extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(MarshallingCodeCFactory.buildMarshallingDecoder());pipeline.addLast(MarshallingCodeCFactory.buildMarshallingEncoder());pipeline.addLast(new ServerHandler());}}

这里面可能有些小伙伴不太理解,服务端为什么要把编解码的两个方法都要加上了,因为在实际使用中,服务端也有可能要发对象消息给客户端使用

5、服务端自定义handler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;public class ServerHandler extends SimpleChannelInboundHandler {/*** 接收到消息的时候触发* @param channelHandlerContext* @param msg*/@Overrideprotected void messageReceived(ChannelHandlerContext channelHandlerContext, Object msg) {Person person = (Person) msg;System.out.println("收到了客户端发过来的对象消息:" + person.toString());channelHandlerContext.channel().writeAndFlush("已经确认收到消息了");}/*** 抛出异常的时候触发* @param ctx* @param cause* @throws Exception*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);ctx.close();}
}

6、客户端server

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient {public static void main(String[] args) throws Exception {EventLoopGroup workerGroup = new NioEventLoopGroup();//负责处理任务Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) {socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());//添加处理器socketChannel.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());//添加处理器socketChannel.pipeline().addLast(new ClientHandler());}}).option(ChannelOption.SO_BACKLOG, 1024).option(ChannelOption.SO_KEEPALIVE, true);Person request = new Person();request.setName("xxxx");request.setId(20);ChannelFuture f = bootstrap.connect("127.0.0.1",8888).sync();f.channel().writeAndFlush(request);f.channel().closeFuture().sync();workerGroup.shutdownGracefully();}}

7、客户端handler

public class ClientHandler extends SimpleChannelInboundHandler {@Overrideprotected void messageReceived(ChannelHandlerContext channelHandlerContext, Object msg) {String message = (String)msg;System.out.println("来自服务端的消息:" + message);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {super.exceptionCaught(ctx, cause);ctx.close();}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Person person = new Person();person.setId(111);person.setName("张三丰");ctx.writeAndFlush(person);}
}

到这里,客户端和服务端的所有代码就全部编写完了,下面运行一下程序

启动服务端,

启动客户端,这个里面,由于服务端先启动的,客户端只要一启动就立即发送了消息,

再看服务端的控制台,也成功收到了两个对象的消息,同时回应给客户端的确认消息也在客户端的控制台上打印了出来

综上,就是本篇关于使用netty发送对象消息的两种方式的全部内容,希望对看到的小伙伴们有用,最后,感谢观看!

netty发送对象消息相关推荐

  1. 006_Topic消息模式发送对象消息

    1. 新建一个名为JMSActiveMQObjectMessage的Java项目, 同时拷入相关jar包 2. 编辑User.java package com.jms.activemq.tom;imp ...

  2. 使用Java实现发送微信消息(附源码)_此程序在手再也不怕对象跟你闹了

    使用Java实现发送微信消息(附源码)_此程序在手再也不怕对象跟你闹了 此程序在手再也不怕女朋友跟你闹了!!!!自从有了女朋友比如:早安.晚安之类的问候语可不能断,但是也难免有时候会忘记那么该咋么办呢 ...

  3. gprs发送信号对方如何接收_和接收缓冲区比较:Netty发送缓冲区是如何设计的,why?...

    点击上方蓝字关注我吧! 本篇文章大概3300字,阅读时间大约10分钟 前面文章,透彻分析了Netty的接收缓冲区优化的套路和实现细节,以及写数据和刷新数据的宏观流程和细节: 从源码出发:在宏观上把握N ...

  4. 微信小程序发送模板消息,php发送模板消息

    微信小程序开发交流qq群   173683895    承接微信小程序开发.扫码加微信. formId 在安卓系统是纯数字,在IOS系统是一串加密字符,如图: 发送模板消息(服务通知)效果图: 前端 ...

  5. Android发送短消息程序的总结

    1.1      准备的基础知识 1.1.1          PendingIntent类 pendingIntent字面意义:等待的,未决定的Intent. 要得到一个pendingIntent对 ...

  6. 微信公众账号开发-发送模板消息

    内容概要 本篇文章主要叙述如何在微信公众帐号上实现"发送模板消息开发"功能.包含json的封装与解析. 何谓模板消息 为了保证用户不受到骚扰,在开发者出现需要主动提醒.通知用户时, ...

  7. Runtime-消息发送和消息转发

    消息发送 消息发送举例:下面这个OC代码 [person read:book]; 会被编译成: objc_msgSend(person, @selector(read:), book); objc_m ...

  8. 004_Queue消息模式发送文本消息

    1. 新建一个名为JMSActiveMQTextMessage的Java项目, 同时拷入相关jar包 2. 相关jar包可以在apache-activemq-5.16.1\lib目录下找到 3. 编辑 ...

  9. ActiveMQ 部署及发送接收消息

    ActiveMQ 部署及发送接收消息 一.           下载 下载地址:http://activemq.apache.org/ 我这里使用的版本为当前最新5.8.0. 下载版本有Windows ...

最新文章

  1. window远程桌面连接centos7
  2. servlet.xml 出现 Referenced file contains errors(http://.......)
  3. Eclipse无法找到Java EE Tools选项问题解决方案
  4. FPGA大数据之我认为的明天
  5. 判断字符串NSString是否是整形或者浮点型
  6. python array 使用创建10万浮点数
  7. 软件配置管理(四)代码味道与重构
  8. linux 从光盘安装数据,LINUX访问光盘数据
  9. vscode 结束_21 个VSCode 快捷键,让代码更快,更有趣
  10. Java—这把线程池ThreadPoolExecutor操作,你学会了吗?
  11. WinForm+ADO.net应用(二)+ 例子源码
  12. VMware Workstation 16.2 Pro for Windows SLIC 2.6 Unlocker
  13. android屏幕大小字体大小,Android字体大小自适应不同分辨率的解决办法
  14. android 腾讯x5浏览器,【Android Web】腾讯X5浏览器的集成与常见问题
  15. 【计算机网络】Socket编程
  16. jquery-weui扩展功能Picker实现营业时间选择
  17. 闭环控制和PID在闭环控制中的作用以及程序编写
  18. RGB、YUY2、YUYV、YVYU、UYVY、AYUV格式详解
  19. java网店系统_关于java网店系统的404页面 你知道多少
  20. python字符串知识点_python字符串的相关知识点

热门文章

  1. sysctl修改内核参数
  2. POJ 2431 Expedition 优先队列
  3. 博客真的停止维护了,找我请@我。
  4. MCSE第六课-DHCP
  5. 前端基础7:a标签常用方法和元素居中方式,响应式@media
  6. Jarvis Oj Pwn 学习笔记-level3
  7. JQ js选择节点操作
  8. 手机端页面自适应解决方案-rem布局
  9. Python将迁移到GitHub
  10. Android之Fragment