java 网络 序列化_Java网络通信基础系列-Netty序列化
一.Netty序列化介绍
序列化管理操作:Java原生实现(性能比较差)、JSON(Restful)、MessagePack、Marshalling、AVRO、....
netty本身直接支持有原生的 Java序列化操作,直接配置已有的程序类即可
MessagePack:类似于JSON,但是要比JSON传输的更加小巧同时速度也快。它定义一个自己的压缩算法,例如:boolean只有true和false,但是有了MP就可以通过0和1描述了;
Marshalling:使用JBoss实现的序列化处理操作,是基于传统的Java序列化形式的一种升级。
JSON:是一种标准做法,但是JSON需要清楚的问题是传输体积要大,但是传输的信息明确,本次使用FastJSON操作。
二.实例
1.公共netty类
Member.java
packagecom.bijian.vo;
importjava.io.Serializable;
public class Member implementsSerializable {
privateString mid ;
privateString name ;
privateInteger age ;
privateDouble salary ;
publicMember() {}
publicMember(String mid,String name,Integer age,Double salary) {
this.mid =mid ;
this.name =name ;
this.age =age ;
this.salary =salary ;
}
publicString getMid() {
returnmid;
}
public voidsetMid(String mid) {
this.mid =mid;
}
publicString getName() {
returnname;
}
public voidsetName(String name) {
this.name =name;
}
publicInteger getAge() {
returnage;
}
public voidsetAge(Integer age) {
this.age =age;
}
publicDouble getSalary() {
returnsalary;
}
public voidsetSalary(Double salary) {
this.salary =salary;
}
@Override
publicString toString() {
return "Member{" +
"mid='" + mid + '\'' +
", name='" + name + '\'' +
", age=" + age +
", salary=" + salary +
'}';
}
}
EchoServerHandler.java
packagecom.bijian.http.server.server.handler;
importcom.bijian.vo.Member;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.ChannelInboundHandlerAdapter;
importio.netty.util.ReferenceCountUtil;
/**
* 处理Echo的操作方式,其中ChannelInboundHandlerAdapter是针对于数据输入的处理
* Netty是基于NIO的一种开发框架的封装,这里面和AIO是没有任何关系的。
*/
public class EchoServerHandler extendsChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {
try{
System.out.println(msg.getClass() + " **************");
Member member =(Member) msg ;
System.err.println("{服务器}" +member);
member.setName("【ECHO】" +member.getName());
ctx.writeAndFlush(member); // 回应的输出操作
} finally{
ReferenceCountUtil.release(msg) ; // 释放缓存
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {
cause.printStackTrace();
ctx.close() ;
}
}
EchoClientHandler.java
packagecom.bijian.http.server.client.handler;
importcom.bijian.vo.Member;
importio.netty.channel.ChannelHandlerContext;
importio.netty.channel.ChannelInboundHandlerAdapter;
importio.netty.util.ReferenceCountUtil;
/**
* 需要进行数据的读取操作,服务器端处理完成的数据信息会进行读取
*/
public class EchoClientHandler extendsChannelInboundHandlerAdapter {
private static final int REPEAT = 500;// 消息重复发送次数
@Override
public void channelActive(ChannelHandlerContext ctx) throwsException {
Member member = new Member(); // 现在直接进行对象的发送
member.setMid("xiaoli");
member.setName("小李老师");
member.setAge(16);
member.setSalary(1.1);
ctx.writeAndFlush(member) ; // 直接进行对象实例发送
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {
// 只要服务器端发送完成信息之后,都会执行此方法进行内容的输出操作
try{
Member member =(Member) msg ;
System.out.println(member); // 输出服务器端的响应内容
} finally{
ReferenceCountUtil.release(msg); // 释放缓存
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {
cause.printStackTrace();
ctx.close();
}
}
EchoServerMain.java
packagecom.bijian.http.server.server.main;
importcom.bijian.http.server.server.EchoServer;
public classEchoServerMain {
public static void main(String[] args) throwsException {
newEchoServer().run();
}
}
EchoClientMain.java
packagecom.bijian.http.server.client.main;
importcom.bijian.http.server.client.EchoClient;
public classEchoClientMain {
public static void main(String[] args) throwsException {
newEchoClient().run();
}
}
2.使用Java原生的序列化程序管理实现对象网络传输
项目结构如下:
EchoServer.java
packagecom.bijian.http.server.server;
importcom.bijian.http.server.server.handler.EchoServerHandler;
importcom.bijian.info.HostInfo;
importio.netty.bootstrap.ServerBootstrap;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelOption;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.SocketChannel;
importio.netty.channel.socket.nio.NioServerSocketChannel;
importio.netty.handler.codec.LengthFieldBasedFrameDecoder;
importio.netty.handler.codec.LengthFieldPrepender;
importio.netty.handler.codec.serialization.ClassResolvers;
importio.netty.handler.codec.serialization.ObjectDecoder;
importio.netty.handler.codec.serialization.ObjectEncoder;
/**
* 实现了基础的线程池与网络连接的配置项
*/
public classEchoServer {
public void run() throws Exception { // 进行服务器端的启动处理
// 线程池是提升服务器性能的重要技术手段,利用定长的线程池可以保证核心线程的有效数量
// 在Netty之中线程池的实现分为两类:主线程池(接收客户端连接)、工作线程池(处理客户端连接)
EventLoopGroup bossGroup = new NioEventLoopGroup(10); // 创建接收线程池
EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 创建工作线程池
System.out.println("服务器启动成功,监听端口为:" +HostInfo.PORT);
try{
// 创建一个服务器端的程序类进行NIO启动,同时可以设置Channel
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 服务器端
// 设置要使用的线程池以及当前的Channel类型
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
// 接收到信息之后需要进行处理,于是定义子处理器
serverBootstrap.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throwsException {
socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled((this.getClass().getClassLoader())))) ;
socketChannel.pipeline().addLast(newObjectEncoder()) ;
socketChannel.pipeline().addLast(new EchoServerHandler()); // 追加了处理器
}
});
// 可以直接利用常亮进行TCP协议的相关配置
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// ChannelFuture描述的时异步回调的处理操作
ChannelFuture future =serverBootstrap.bind(HostInfo.PORT).sync();
future.channel().closeFuture().sync();// 等待Socket被关闭
} finally{
workerGroup.shutdownGracefully() ;
bossGroup.shutdownGracefully() ;
}
}
}
EchoClient.java
packagecom.bijian.http.server.client;
importcom.bijian.http.server.client.handler.EchoClientHandler;
importcom.bijian.info.HostInfo;
importio.netty.bootstrap.Bootstrap;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelOption;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.SocketChannel;
importio.netty.channel.socket.nio.NioSocketChannel;
importio.netty.handler.codec.LengthFieldBasedFrameDecoder;
importio.netty.handler.codec.LengthFieldPrepender;
importio.netty.handler.codec.serialization.ClassResolvers;
importio.netty.handler.codec.serialization.ObjectDecoder;
importio.netty.handler.codec.serialization.ObjectEncoder;
public classEchoClient {
public void run() throwsException {
// 1、如果现在客户端不同,那么也可以不使用多线程模式来处理;
// 在Netty中考虑到代码的统一性,也允许你在客户端设置线程池
EventLoopGroup group = new NioEventLoopGroup(); // 创建一个线程池
try{
Bootstrap client = new Bootstrap(); // 创建客户端处理程序
client.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true) // 允许接收大块的返回数据
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throwsException {
socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled((this.getClass().getClassLoader())))) ;
socketChannel.pipeline().addLast(newObjectEncoder()) ;
socketChannel.pipeline().addLast(new EchoClientHandler()); // 追加了处理器
}
});
ChannelFuture channelFuture =client.connect(HostInfo.HOST_NAME, HostInfo.PORT).sync();
channelFuture.channel().closeFuture().sync(); // 关闭连接
} finally{
group.shutdownGracefully();
}
}
}
先启动EchoServerMain.java,再启动EchoClientMain.java,运行效果如下所示:
3.MessagePack实现传输序列化
项目结构如下所示:
pom.xml中引入msgpack
org.msgpack
msgpack
Member.java增加@Message注解
package com.bijian.vo;
import org.msgpack.annotation.Message;
import java.io.Serializable;
@Message
public class Member implements Serializable {
private String mid ;
private String name ;
private Integer age ;
private Double salary ;
public Member() {}
public Member(String mid,String name,Integer age,Double salary) {
this.mid = mid ;
this.name = name ;
this.age = age ;
this.salary = salary ;
}
public String getMid() {
return mid;
}
public void setMid(String mid) {
this.mid = mid;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public Double getSalary() {
return salary;
}
public void setSalary(Double salary) {
this.salary = salary;
}
@Override
public String toString() {
return "Member{" +
"mid='" + mid + '\'' +
", name='" + name + '\'' +
", age=" + age +
", salary=" + salary +
'}';
}
}
测试类MessagePackDemoA.java
package com.bijian.msgpack;
import com.bijian.vo.Member;
import org.msgpack.MessagePack;
import org.msgpack.template.Templates;
import java.util.ArrayList;
import java.util.List;
public class MessagePackDemoA {
public static void main(String[] args) throws Exception {
List allMembers = new ArrayList() ;
for(int x = 0 ; x < 10 ; x ++) {
Member member = new Member() ;
member.setMid("MLDN - " + x);
member.setName("Hello - " + x);
member.setAge(10);
member.setSalary(1.1);
allMembers.add(member) ;
}
MessagePack msgPack = new MessagePack() ;
byte data [] = msgPack.write(allMembers) ;
System.out.println(data.length);
{ // 将数据解析回来
List all = msgPack.read(data, Templates.tList(msgPack.lookup(Member.class))) ;
System.out.println(all);
}
}
}
运行测试类结果如下:
MessagePackDecoder.java
package com.bijian.netty.serious;
import com.bijian.vo.Member;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import org.msgpack.MessagePack;
import java.util.List;
public class MessagePackDecoder extends MessageToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List list) throws Exception {
int len = msg.readableBytes(); // 获取读取的数据长度
byte[] data = new byte[len]; // 准备读取数据的空间
msg.getBytes(msg.readerIndex(), data, 0, len); // 读取数据
MessagePack msgPack = new MessagePack() ;
System.out.println(msgPack.read(data));
// list.add(msgPack.read(data)) ;
list.add(msgPack.read(data,msgPack.lookup(Member.class))) ;
}
}
MessagePackEncoder.java
package com.bijian.netty.serious;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import org.msgpack.MessagePack;
public class MessagePackEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception {
MessagePack msgPack = new MessagePack() ;
byte [] raw = msgPack.write(msg) ; // 进行对象的编码操作
byteBuf.writeBytes(raw) ;
}
}
EchoServerHandler.java
packagecom.bijian.netty.server.handlerimportcom.bijian.vo.Member;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandlerAdapter;importio.netty.util.ReferenceCountUtil;/*** 处理Echo的操作方式,其中ChannelInboundHandlerAdapter是针对于数据输入的处理
* Netty是基于NIO的一种开发框架的封装,这里面和AIO是没有任何关系的。*/
public class EchoServerHandler extendsChannelInboundHandlerAdapter {
@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {try{
System.out.println(msg.getClass()+ " **************");
Member member=(Member) msg ;
System.err.println("{服务器}" +member);
member.setName("【ECHO】" +member.getName());
ctx.writeAndFlush(member);//回应的输出操作
} finally{
ReferenceCountUtil.release(msg) ;//释放缓存
}
}
@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {
cause.printStackTrace();
ctx.close() ;
}
}
EchoServer.java
package com.bijian.http.server.server;
import com.bijian.http.server.server.handler.EchoServerHandler;
import com.bijian.info.HostInfo;
import com.bijian.netty.serious.MessagePackDecoder;
import com.bijian.netty.serious.MessagePackEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
/**
* 实现了基础的线程池与网络连接的配置项
*/
public class EchoServer {
public void run() throws Exception { // 进行服务器端的启动处理
// 线程池是提升服务器性能的重要技术手段,利用定长的线程池可以保证核心线程的有效数量
// 在Netty之中线程池的实现分为两类:主线程池(接收客户端连接)、工作线程池(处理客户端连接)
EventLoopGroup bossGroup = new NioEventLoopGroup(10); // 创建接收线程池
EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 创建工作线程池
System.out.println("服务器启动成功,监听端口为:" + HostInfo.PORT);
try {
// 创建一个服务器端的程序类进行NIO启动,同时可以设置Channel
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 服务器端
// 设置要使用的线程池以及当前的Channel类型
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
// 接收到信息之后需要进行处理,于是定义子处理器
serverBootstrap.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,4)) ;
socketChannel.pipeline().addLast(new MessagePackDecoder()) ;
socketChannel.pipeline().addLast(new LengthFieldPrepender(4)) ; // 与属性个数保持一致
socketChannel.pipeline().addLast(new MessagePackEncoder()) ;
socketChannel.pipeline().addLast(new EchoServerHandler()); // 追加了处理器
}
});
// 可以直接利用常亮进行TCP协议的相关配置
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// ChannelFuture描述的时异步回调的处理操作
ChannelFuture future = serverBootstrap.bind(HostInfo.PORT).sync();
future.channel().closeFuture().sync();// 等待Socket被关闭
} finally {
workerGroup.shutdownGracefully() ;
bossGroup.shutdownGracefully() ;
}
}
}
EchoClient.java
package com.bijian.http.server.client;
import com.bijian.http.server.client.handler.EchoClientHandler;
import com.bijian.info.HostInfo;
import com.bijian.netty.serious.MessagePackDecoder;
import com.bijian.netty.serious.MessagePackEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.serialization.ClassResolvers;
import io.netty.handler.codec.serialization.ObjectDecoder;
import io.netty.handler.codec.serialization.ObjectEncoder;
public class EchoClient {
public void run() throws Exception {
// 1、如果现在客户端不同,那么也可以不使用多线程模式来处理;
// 在Netty中考虑到代码的统一性,也允许你在客户端设置线程池
EventLoopGroup group = new NioEventLoopGroup(); // 创建一个线程池
try {
Bootstrap client = new Bootstrap(); // 创建客户端处理程序
client.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true) // 允许接收大块的返回数据
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,4)) ;
socketChannel.pipeline().addLast(new MessagePackDecoder()) ;
socketChannel.pipeline().addLast(new LengthFieldPrepender(4)) ; // 与属性个数保持一致
socketChannel.pipeline().addLast(new MessagePackEncoder()) ;
socketChannel.pipeline().addLast(new EchoClientHandler()); // 追加了处理器
}
});
ChannelFuture channelFuture = client.connect(HostInfo.HOST_NAME, HostInfo.PORT).sync();
channelFuture.channel().closeFuture().sync(); // 关闭连接
} finally {
group.shutdownGracefully();
}
}
}
MessagePack实现传输序列化与使用Java原生的序列化程序管理实现对象网络传输项目的变化如下:
先启动EchoServerMain.java,再启动EchoClientMain.java,运行效果如下所示:
4.Marshalling序列化实例
项目结构如下所示:
Member.java中的@Message注解去掉
MarshallingCodeFactory.java
packagecom.bijian.netty.serious;
import io.netty.handler.codec.marshalling.*;
importorg.jboss.marshalling.MarshallerFactory;
importorg.jboss.marshalling.Marshalling;
importorg.jboss.marshalling.MarshallingConfiguration;
public classMarshallingCodeFactory {
public staticMarshallingDecoder buildMarshallingDecoder() {
MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial") ; // 获取原始的JDK序列化
MarshallingConfiguration configuration = newMarshallingConfiguration() ;
UnmarshallerProvider provider = newDefaultUnmarshallerProvider(marshallerFactory,configuration) ;
int maxSize = 1024 << 2 ; // 设置单个对象的最大长度
MarshallingDecoder decoder = newMarshallingDecoder(provider,maxSize) ;
returndecoder ;
}
public staticMarshallingEncoder buildMarshallingEncoder() {
MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial") ; // 获取原始的JDK序列化
MarshallingConfiguration configuration = newMarshallingConfiguration() ;
MarshallerProvider provider = newDefaultMarshallerProvider(marshallerFactory,configuration) ;
MarshallingEncoder encoder = newMarshallingEncoder(provider) ;
returnencoder ;
}
}
EchoServer.java
packagecom.bijian.http.server.server;
importcom.bijian.http.server.server.handler.EchoServerHandler;
importcom.bijian.info.HostInfo;
importcom.bijian.netty.serious.MarshallingCodeFactory;
importio.netty.bootstrap.ServerBootstrap;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelOption;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.SocketChannel;
importio.netty.channel.socket.nio.NioServerSocketChannel;
importio.netty.handler.codec.LengthFieldBasedFrameDecoder;
importio.netty.handler.codec.LengthFieldPrepender;
/**
* 实现了基础的线程池与网络连接的配置项
*/
public classEchoServer {
public void run() throws Exception { // 进行服务器端的启动处理
// 线程池是提升服务器性能的重要技术手段,利用定长的线程池可以保证核心线程的有效数量
// 在Netty之中线程池的实现分为两类:主线程池(接收客户端连接)、工作线程池(处理客户端连接)
EventLoopGroup bossGroup = new NioEventLoopGroup(10); // 创建接收线程池
EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 创建工作线程池
System.out.println("服务器启动成功,监听端口为:" +HostInfo.PORT);
try{
// 创建一个服务器端的程序类进行NIO启动,同时可以设置Channel
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 服务器端
// 设置要使用的线程池以及当前的Channel类型
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
// 接收到信息之后需要进行处理,于是定义子处理器
serverBootstrap.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throwsException {
socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder()) ;
socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder()) ;
socketChannel.pipeline().addLast(new EchoServerHandler()); // 追加了处理器
}
});
// 可以直接利用常亮进行TCP协议的相关配置
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// ChannelFuture描述的时异步回调的处理操作
ChannelFuture future =serverBootstrap.bind(HostInfo.PORT).sync();
future.channel().closeFuture().sync();// 等待Socket被关闭
} finally{
workerGroup.shutdownGracefully() ;
bossGroup.shutdownGracefully() ;
}
}
}
EchoClient.java
packagecom.bijian.http.server.client;
importcom.bijian.http.server.client.handler.EchoClientHandler;
importcom.bijian.info.HostInfo;
importcom.bijian.netty.serious.MarshallingCodeFactory;
importio.netty.bootstrap.Bootstrap;
importio.netty.channel.ChannelFuture;
importio.netty.channel.ChannelInitializer;
importio.netty.channel.ChannelOption;
importio.netty.channel.EventLoopGroup;
importio.netty.channel.nio.NioEventLoopGroup;
importio.netty.channel.socket.SocketChannel;
importio.netty.channel.socket.nio.NioSocketChannel;
importio.netty.handler.codec.LengthFieldBasedFrameDecoder;
importio.netty.handler.codec.LengthFieldPrepender;
public classEchoClient {
public void run() throwsException {
// 1、如果现在客户端不同,那么也可以不使用多线程模式来处理;
// 在Netty中考虑到代码的统一性,也允许你在客户端设置线程池
EventLoopGroup group = new NioEventLoopGroup(); // 创建一个线程池
try{
Bootstrap client = new Bootstrap(); // 创建客户端处理程序
client.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true) // 允许接收大块的返回数据
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throwsException {
socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder()) ;
socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder()) ;
socketChannel.pipeline().addLast(new EchoClientHandler()); // 追加了处理器
}
});
ChannelFuture channelFuture =client.connect(HostInfo.HOST_NAME, HostInfo.PORT).sync();
channelFuture.channel().closeFuture().sync(); // 关闭连接
} finally{
group.shutdownGracefully();
}
}
}
Marshalling序列化与MessagePack实现传输序列化的变化如下:
echo-netty下的pom.xml
echo-netty下的pom.xml完整内容如下:
echo
cn.mldn
1.0
4.0.0
echo-netty
jar
io.netty
netty-all
cn.mldn
echo-util
junit
junit
test
org.jboss.marshalling
jboss-marshalling
org.jboss.marshalling
jboss-marshalling-serial
工程下的pom.xml
pom.xml完整内容如下:
4.0.0
cn.mldn
echo
1.0
echo-base
echo-util
echo-netty
netty
http://www.example.com
pom
UTF-8
1.8
3.7.0
4.12
4.1.31.Final
1.0
0.6.12
2.0.6.Final
cn.mldn
echo-util
${echo.version}
io.netty
netty-all
${netty.version}
org.jboss.marshalling
jboss-marshalling
${jboss-marshalling.version}
org.jboss.marshalling
jboss-marshalling-serial
${jboss-marshalling.version}
org.msgpack
msgpack
${msgpack.version}
junit
junit
${junit.version}
test
org.apache.maven.plugins
maven-compiler-plugin
${maven-compiler-plugin.version}
${jdk.version}
${jdk.version}
先启动EchoServerMain.java,再启动EchoClientMain.java,运行效果如下所示:
5.JSON序列化实例
项目结构如下:
JSONDecoder.java
package com.bijian.http.server.serious;
import com.bijian.vo.Member;
import com.alibaba.fastjson.JSON;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
public class JSONDecoder extends MessageToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List list) throws Exception {
int len = msg.readableBytes(); // 可以用的数据长度
byte data[] = new byte[len];
msg.getBytes(msg.readerIndex(), data, 0, len);
list.add(JSON.parseObject(new String(data)).toJavaObject(Member.class)) ;
}
}
JSONEncoder.java
package com.bijian.http.server.serious;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
public class JSONEncoder extends MessageToByteEncoder {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf out) throws Exception {
byte data [] = JSONObject.toJSONString(msg).getBytes() ;
out.writeBytes(data) ;
}
}
EchoServer.java
package com.bijian.http.server.server;
import com.bijian.http.server.server.handler.EchoServerHandler;
import com.bijian.info.HostInfo;
import com.bijian.http.server.serious.JSONDecoder;
import com.bijian.http.server.serious.JSONEncoder;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
/**
* 实现了基础的线程池与网络连接的配置项
*/
public class EchoServer {
public void run() throws Exception { // 进行服务器端的启动处理
// 线程池是提升服务器性能的重要技术手段,利用定长的线程池可以保证核心线程的有效数量
// 在Netty之中线程池的实现分为两类:主线程池(接收客户端连接)、工作线程池(处理客户端连接)
EventLoopGroup bossGroup = new NioEventLoopGroup(10); // 创建接收线程池
EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 创建工作线程池
System.out.println("服务器启动成功,监听端口为:" + HostInfo.PORT);
try {
// 创建一个服务器端的程序类进行NIO启动,同时可以设置Channel
ServerBootstrap serverBootstrap = new ServerBootstrap(); // 服务器端
// 设置要使用的线程池以及当前的Channel类型
serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);
// 接收到信息之后需要进行处理,于是定义子处理器
serverBootstrap.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,4)) ;
socketChannel.pipeline().addLast(new JSONDecoder()) ;
socketChannel.pipeline().addLast(new LengthFieldPrepender(4)) ;
socketChannel.pipeline().addLast(new JSONEncoder()) ;
socketChannel.pipeline().addLast(new EchoServerHandler()); // 追加了处理器
}
});
// 可以直接利用常亮进行TCP协议的相关配置
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
// ChannelFuture描述的时异步回调的处理操作
ChannelFuture future = serverBootstrap.bind(HostInfo.PORT).sync();
future.channel().closeFuture().sync();// 等待Socket被关闭
} finally {
workerGroup.shutdownGracefully() ;
bossGroup.shutdownGracefully() ;
}
}
}
EchoClient.java
package com.bijian.http.server.client;
import com.bijian.http.server.client.handler.EchoClientHandler;
import com.bijian.info.HostInfo;
import com.bijian.http.server.serious.JSONDecoder;
import com.bijian.http.server.serious.JSONEncoder;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
public class EchoClient {
public void run() throws Exception {
// 1、如果现在客户端不同,那么也可以不使用多线程模式来处理;
// 在Netty中考虑到代码的统一性,也允许你在客户端设置线程池
EventLoopGroup group = new NioEventLoopGroup(); // 创建一个线程池
try {
Bootstrap client = new Bootstrap(); // 创建客户端处理程序
client.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true) // 允许接收大块的返回数据
.handler(new ChannelInitializer() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));
socketChannel.pipeline().addLast(new JSONDecoder());
socketChannel.pipeline().addLast(new LengthFieldPrepender(4));
socketChannel.pipeline().addLast(new JSONEncoder());
socketChannel.pipeline().addLast(new EchoClientHandler()); // 追加了处理器
}
});
ChannelFuture channelFuture = client.connect(HostInfo.HOST_NAME, HostInfo.PORT).sync();
channelFuture.channel().closeFuture().sync(); // 关闭连接
} finally {
group.shutdownGracefully();
}
}
}
echo-netty下的pom.xml
工程下pom.xml内容修改如下:
先启动EchoServerMain.java,再启动EchoClientMain.java,效果如下:
java 网络 序列化_Java网络通信基础系列-Netty序列化相关推荐
- java nio 客户端_Java网络编程:Netty框架学习(二)---Java NIO,实现简单的服务端客户端消息传输...
概述 上篇中已经讲到Java中的NIO类库,Java中也称New IO,类库的目标就是要让Java支持非阻塞IO,基于这个原因,更多的人喜欢称Java NIO为非阻塞IO(Non-Block IO), ...
- java 编程原理_Java网络编程 -- 网络编程基础原理
Hello,今天记录下 Java网络编程 --> 网络编程基础原理. 一起学习,一起进步.继续沉淀,慢慢强大.希望这文章对您有帮助.若有写的不好的地方,欢迎评论给建议哈! 初写博客不久,我是杨展 ...
- java 网络实验_java网络聊天室实验
使用java写的一个网络聊天小程序,采用UDP协议.将左下角的ip改为目标地址即可发送信息.作为java网络编程入门的一个参考.最后一幅图为程序中所使用图片,来自仙剑奇侠传五的壁纸. 1.[文件] C ...
- java 复制对象_Java程序员必备:序列化全方位解析
前言 相信大家日常开发中,经常看到Java对象"implements Serializable".那么,它到底有什么用呢?本文从以下几个角度来解析序列这一块知识点~ 什么是Java ...
- java bean 序列化_JAVA bean为何要实现序列化
简而言之:序列化,就是为了在不同时间或不同平台的JVM之间共享实例对象.即序列化出于两个原因:①.用于持久化到磁盘上:②.用于作为数据流在网络上传输. 所谓的Serializable,就是java提供 ...
- Java基础通信_Java网络通信基础编程(必看篇)
方式一:同步阻塞方式(BIO): 服务器端(Server): package com.ietree.basicskill.socket.mode1; import java.io.IOExceptio ...
- java https 网络爬虫_Java 网络爬虫,就是这么的简单
这是 Java 网络爬虫系列文章的第一篇,如果你还不知道 Java 网络爬虫系列文章,请参看 学 Java 网络爬虫,需要哪些基础知识.第一篇是关于 Java 网络爬虫入门内容,在该篇中我们以采集虎扑 ...
- java爬虫入门_Java 网络爬虫新手入门详解
这是 Java 网络爬虫系列文章的第一篇,如果你还不知道 Java 网络爬虫系列文章,请参看Java 网络爬虫基础知识入门解析.第一篇是关于 Java 网络爬虫入门内容,在该篇中我们以采集虎扑列表新闻 ...
- java实现套接字网络编程_Java网络编程(一)Socket套接字
一.基础知识 1.TCP:传输控制协议. 2.UDP:用户数据报协议. 二.IP地址封装 1.InetAddress类的常用方法 getLocalHost() 返回本地主机的InetAddress对象 ...
- java网络编程_Java网络编程进阶:通过JSSE创建安全的数据通信
小编说:本文作者孙卫琴,知名IT作家和Java专家.本文将通过一个范例向大家介绍JSSE是如何实现安全的网络通信的. 在网络上,信息在由源主机到目标主机的传输过程中会经过其他计算机.一般情况下,中间的 ...
最新文章
- 百度Apollo 3.5是如何设计Cyber RT计算框架的?
- 简书非官方大数据(一)
- servlet乱码问题
- Softmax和softmax loss的理解
- 微信公众平台消息接口开发(6)电话号码链接与网址链接
- Vertica数据库系列:这几天踩过时间函数的坑
- 【PyTorch】Trick集锦
- 《机械工程测试技术基础》教学大纲
- 四分之一波长传输线应用举例
- JAVA爬虫天眼查、启信宝...企业信息查询网站
- 代码坏味道与重构之数据泥团和基本类型偏执
- 软件测试岗位具体是做什么的?
- 同一台虚拟服务器多个域名,一台服务器上可以配置多个域名
- 今天看到一篇文章,收藏了很多大牛的博客
- 基于SpringBoot+MyBatis的餐饮点餐系统
- ChatGPT聊天app(基于autojs)
- 怎样去掉“交互式服务对话框检测”提示对话框
- Sqlserver 中临时表和全局临时表
- 未转变者DLC皮肤在服务器,未转变者 steam上 最新的版本 肿么联机?肿么创建房间...
- iloc和loc的区别
热门文章
- 轻触开关式三功能手电筒3-5W驱动芯片AH3301
- Linux 修改 host
- Chrome浏览器取证分析
- 牛!各大公司【薪资待遇】一览,我心动了
- JAVA钓鱼游戏_java如何实现纸牌游戏之小猫钓鱼算法
- python- 小猫钓鱼纸牌游戏
- 开机自检(POST)
- 计算机文件丢失系统无法启动,windows7文件丢失无法启动怎么修复_win7系统显示文件丢失无法启动修复方法-win7之家...
- web文件管理系统_实用开源项目,基于Web的文件管理系统——DocSys
- 2021-03-16PyCharm3.0默认快捷键(翻译的)PyCharm Default Keymap