一.Netty序列化介绍

序列化管理操作:Java原生实现(性能比较差)、JSON(Restful)、MessagePack、Marshalling、AVRO、....

netty本身直接支持有原生的 Java序列化操作,直接配置已有的程序类即可

MessagePack:类似于JSON,但是要比JSON传输的更加小巧同时速度也快。它定义一个自己的压缩算法,例如:boolean只有true和false,但是有了MP就可以通过0和1描述了;

Marshalling:使用JBoss实现的序列化处理操作,是基于传统的Java序列化形式的一种升级。

JSON:是一种标准做法,但是JSON需要清楚的问题是传输体积要大,但是传输的信息明确,本次使用FastJSON操作。

二.实例

1.公共netty类

Member.java

packagecom.bijian.vo;

importjava.io.Serializable;

public class Member implementsSerializable {

privateString mid ;

privateString name ;

privateInteger age ;

privateDouble salary ;

publicMember() {}

publicMember(String mid,String name,Integer age,Double salary) {

this.mid =mid ;

this.name =name ;

this.age =age ;

this.salary =salary ;

}

publicString getMid() {

returnmid;

}

public voidsetMid(String mid) {

this.mid =mid;

}

publicString getName() {

returnname;

}

public voidsetName(String name) {

this.name =name;

}

publicInteger getAge() {

returnage;

}

public voidsetAge(Integer age) {

this.age =age;

}

publicDouble getSalary() {

returnsalary;

}

public voidsetSalary(Double salary) {

this.salary =salary;

}

@Override

publicString toString() {

return "Member{" +

"mid='" + mid + '\'' +

", name='" + name + '\'' +

", age=" + age +

", salary=" + salary +

'}';

}

}

EchoServerHandler.java

packagecom.bijian.http.server.server.handler;

importcom.bijian.vo.Member;

importio.netty.channel.ChannelHandlerContext;

importio.netty.channel.ChannelInboundHandlerAdapter;

importio.netty.util.ReferenceCountUtil;

/**

* 处理Echo的操作方式,其中ChannelInboundHandlerAdapter是针对于数据输入的处理

* Netty是基于NIO的一种开发框架的封装,这里面和AIO是没有任何关系的。

*/

public class EchoServerHandler extendsChannelInboundHandlerAdapter {

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {

try{

System.out.println(msg.getClass() + " **************");

Member member =(Member) msg ;

System.err.println("{服务器}" +member);

member.setName("【ECHO】" +member.getName());

ctx.writeAndFlush(member); // 回应的输出操作

} finally{

ReferenceCountUtil.release(msg) ; // 释放缓存

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {

cause.printStackTrace();

ctx.close() ;

}

}

EchoClientHandler.java

packagecom.bijian.http.server.client.handler;

importcom.bijian.vo.Member;

importio.netty.channel.ChannelHandlerContext;

importio.netty.channel.ChannelInboundHandlerAdapter;

importio.netty.util.ReferenceCountUtil;

/**

* 需要进行数据的读取操作,服务器端处理完成的数据信息会进行读取

*/

public class EchoClientHandler extendsChannelInboundHandlerAdapter {

private static final int REPEAT = 500;// 消息重复发送次数

@Override

public void channelActive(ChannelHandlerContext ctx) throwsException {

Member member = new Member(); // 现在直接进行对象的发送

member.setMid("xiaoli");

member.setName("小李老师");

member.setAge(16);

member.setSalary(1.1);

ctx.writeAndFlush(member) ; // 直接进行对象实例发送

}

@Override

public void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {

// 只要服务器端发送完成信息之后,都会执行此方法进行内容的输出操作

try{

Member member =(Member) msg ;

System.out.println(member); // 输出服务器端的响应内容

} finally{

ReferenceCountUtil.release(msg); // 释放缓存

}

}

@Override

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {

cause.printStackTrace();

ctx.close();

}

}

EchoServerMain.java

packagecom.bijian.http.server.server.main;

importcom.bijian.http.server.server.EchoServer;

public classEchoServerMain {

public static void main(String[] args) throwsException {

newEchoServer().run();

}

}

EchoClientMain.java

packagecom.bijian.http.server.client.main;

importcom.bijian.http.server.client.EchoClient;

public classEchoClientMain {

public static void main(String[] args) throwsException {

newEchoClient().run();

}

}

2.使用Java原生的序列化程序管理实现对象网络传输

项目结构如下:

EchoServer.java

packagecom.bijian.http.server.server;

importcom.bijian.http.server.server.handler.EchoServerHandler;

importcom.bijian.info.HostInfo;

importio.netty.bootstrap.ServerBootstrap;

importio.netty.channel.ChannelFuture;

importio.netty.channel.ChannelInitializer;

importio.netty.channel.ChannelOption;

importio.netty.channel.EventLoopGroup;

importio.netty.channel.nio.NioEventLoopGroup;

importio.netty.channel.socket.SocketChannel;

importio.netty.channel.socket.nio.NioServerSocketChannel;

importio.netty.handler.codec.LengthFieldBasedFrameDecoder;

importio.netty.handler.codec.LengthFieldPrepender;

importio.netty.handler.codec.serialization.ClassResolvers;

importio.netty.handler.codec.serialization.ObjectDecoder;

importio.netty.handler.codec.serialization.ObjectEncoder;

/**

* 实现了基础的线程池与网络连接的配置项

*/

public classEchoServer {

public void run() throws Exception { // 进行服务器端的启动处理

// 线程池是提升服务器性能的重要技术手段,利用定长的线程池可以保证核心线程的有效数量

// 在Netty之中线程池的实现分为两类:主线程池(接收客户端连接)、工作线程池(处理客户端连接)

EventLoopGroup bossGroup = new NioEventLoopGroup(10); // 创建接收线程池

EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 创建工作线程池

System.out.println("服务器启动成功,监听端口为:" +HostInfo.PORT);

try{

// 创建一个服务器端的程序类进行NIO启动,同时可以设置Channel

ServerBootstrap serverBootstrap = new ServerBootstrap(); // 服务器端

// 设置要使用的线程池以及当前的Channel类型

serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);

// 接收到信息之后需要进行处理,于是定义子处理器

serverBootstrap.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel socketChannel) throwsException {

socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled((this.getClass().getClassLoader())))) ;

socketChannel.pipeline().addLast(newObjectEncoder()) ;

socketChannel.pipeline().addLast(new EchoServerHandler()); // 追加了处理器

}

});

// 可以直接利用常亮进行TCP协议的相关配置

serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);

serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

// ChannelFuture描述的时异步回调的处理操作

ChannelFuture future =serverBootstrap.bind(HostInfo.PORT).sync();

future.channel().closeFuture().sync();// 等待Socket被关闭

} finally{

workerGroup.shutdownGracefully() ;

bossGroup.shutdownGracefully() ;

}

}

}

EchoClient.java

packagecom.bijian.http.server.client;

importcom.bijian.http.server.client.handler.EchoClientHandler;

importcom.bijian.info.HostInfo;

importio.netty.bootstrap.Bootstrap;

importio.netty.channel.ChannelFuture;

importio.netty.channel.ChannelInitializer;

importio.netty.channel.ChannelOption;

importio.netty.channel.EventLoopGroup;

importio.netty.channel.nio.NioEventLoopGroup;

importio.netty.channel.socket.SocketChannel;

importio.netty.channel.socket.nio.NioSocketChannel;

importio.netty.handler.codec.LengthFieldBasedFrameDecoder;

importio.netty.handler.codec.LengthFieldPrepender;

importio.netty.handler.codec.serialization.ClassResolvers;

importio.netty.handler.codec.serialization.ObjectDecoder;

importio.netty.handler.codec.serialization.ObjectEncoder;

public classEchoClient {

public void run() throwsException {

// 1、如果现在客户端不同,那么也可以不使用多线程模式来处理;

// 在Netty中考虑到代码的统一性,也允许你在客户端设置线程池

EventLoopGroup group = new NioEventLoopGroup(); // 创建一个线程池

try{

Bootstrap client = new Bootstrap(); // 创建客户端处理程序

client.group(group).channel(NioSocketChannel.class)

.option(ChannelOption.TCP_NODELAY, true) // 允许接收大块的返回数据

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel socketChannel) throwsException {

socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled((this.getClass().getClassLoader())))) ;

socketChannel.pipeline().addLast(newObjectEncoder()) ;

socketChannel.pipeline().addLast(new EchoClientHandler()); // 追加了处理器

}

});

ChannelFuture channelFuture =client.connect(HostInfo.HOST_NAME, HostInfo.PORT).sync();

channelFuture.channel().closeFuture().sync(); // 关闭连接

} finally{

group.shutdownGracefully();

}

}

}

先启动EchoServerMain.java,再启动EchoClientMain.java,运行效果如下所示:

3.MessagePack实现传输序列化

项目结构如下所示:

pom.xml中引入msgpack

org.msgpack

msgpack

Member.java增加@Message注解

package com.bijian.vo;

import org.msgpack.annotation.Message;

import java.io.Serializable;

@Message

public class Member implements Serializable {

private String mid ;

private String name ;

private Integer age ;

private Double salary ;

public Member() {}

public Member(String mid,String name,Integer age,Double salary) {

this.mid = mid ;

this.name = name ;

this.age = age ;

this.salary = salary ;

}

public String getMid() {

return mid;

}

public void setMid(String mid) {

this.mid = mid;

}

public String getName() {

return name;

}

public void setName(String name) {

this.name = name;

}

public Integer getAge() {

return age;

}

public void setAge(Integer age) {

this.age = age;

}

public Double getSalary() {

return salary;

}

public void setSalary(Double salary) {

this.salary = salary;

}

@Override

public String toString() {

return "Member{" +

"mid='" + mid + '\'' +

", name='" + name + '\'' +

", age=" + age +

", salary=" + salary +

'}';

}

}

测试类MessagePackDemoA.java

package com.bijian.msgpack;

import com.bijian.vo.Member;

import org.msgpack.MessagePack;

import org.msgpack.template.Templates;

import java.util.ArrayList;

import java.util.List;

public class MessagePackDemoA {

public static void main(String[] args) throws Exception {

List allMembers = new ArrayList() ;

for(int x = 0 ; x < 10 ; x ++) {

Member member = new Member() ;

member.setMid("MLDN - " + x);

member.setName("Hello - " + x);

member.setAge(10);

member.setSalary(1.1);

allMembers.add(member) ;

}

MessagePack msgPack = new MessagePack() ;

byte data [] = msgPack.write(allMembers) ;

System.out.println(data.length);

{ // 将数据解析回来

List all = msgPack.read(data, Templates.tList(msgPack.lookup(Member.class))) ;

System.out.println(all);

}

}

}

运行测试类结果如下:

MessagePackDecoder.java

package com.bijian.netty.serious;

import com.bijian.vo.Member;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToMessageDecoder;

import org.msgpack.MessagePack;

import java.util.List;

public class MessagePackDecoder extends MessageToMessageDecoder {

@Override

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List list) throws Exception {

int len = msg.readableBytes(); // 获取读取的数据长度

byte[] data = new byte[len]; // 准备读取数据的空间

msg.getBytes(msg.readerIndex(), data, 0, len); // 读取数据

MessagePack msgPack = new MessagePack() ;

System.out.println(msgPack.read(data));

// list.add(msgPack.read(data)) ;

list.add(msgPack.read(data,msgPack.lookup(Member.class))) ;

}

}

MessagePackEncoder.java

package com.bijian.netty.serious;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

import org.msgpack.MessagePack;

public class MessagePackEncoder extends MessageToByteEncoder {

@Override

protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf byteBuf) throws Exception {

MessagePack msgPack = new MessagePack() ;

byte [] raw = msgPack.write(msg) ; // 进行对象的编码操作

byteBuf.writeBytes(raw) ;

}

}

EchoServerHandler.java

packagecom.bijian.netty.server.handlerimportcom.bijian.vo.Member;importio.netty.channel.ChannelHandlerContext;importio.netty.channel.ChannelInboundHandlerAdapter;importio.netty.util.ReferenceCountUtil;/*** 处理Echo的操作方式,其中ChannelInboundHandlerAdapter是针对于数据输入的处理

* Netty是基于NIO的一种开发框架的封装,这里面和AIO是没有任何关系的。*/

public class EchoServerHandler extendsChannelInboundHandlerAdapter {

@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throwsException {try{

System.out.println(msg.getClass()+ " **************");

Member member=(Member) msg ;

System.err.println("{服务器}" +member);

member.setName("【ECHO】" +member.getName());

ctx.writeAndFlush(member);//回应的输出操作

} finally{

ReferenceCountUtil.release(msg) ;//释放缓存

}

}

@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throwsException {

cause.printStackTrace();

ctx.close() ;

}

}

EchoServer.java

package com.bijian.http.server.server;

import com.bijian.http.server.server.handler.EchoServerHandler;

import com.bijian.info.HostInfo;

import com.bijian.netty.serious.MessagePackDecoder;

import com.bijian.netty.serious.MessagePackEncoder;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.handler.codec.LengthFieldPrepender;

import io.netty.handler.codec.serialization.ClassResolvers;

import io.netty.handler.codec.serialization.ObjectDecoder;

import io.netty.handler.codec.serialization.ObjectEncoder;

/**

* 实现了基础的线程池与网络连接的配置项

*/

public class EchoServer {

public void run() throws Exception { // 进行服务器端的启动处理

// 线程池是提升服务器性能的重要技术手段,利用定长的线程池可以保证核心线程的有效数量

// 在Netty之中线程池的实现分为两类:主线程池(接收客户端连接)、工作线程池(处理客户端连接)

EventLoopGroup bossGroup = new NioEventLoopGroup(10); // 创建接收线程池

EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 创建工作线程池

System.out.println("服务器启动成功,监听端口为:" + HostInfo.PORT);

try {

// 创建一个服务器端的程序类进行NIO启动,同时可以设置Channel

ServerBootstrap serverBootstrap = new ServerBootstrap(); // 服务器端

// 设置要使用的线程池以及当前的Channel类型

serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);

// 接收到信息之后需要进行处理,于是定义子处理器

serverBootstrap.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,4)) ;

socketChannel.pipeline().addLast(new MessagePackDecoder()) ;

socketChannel.pipeline().addLast(new LengthFieldPrepender(4)) ; // 与属性个数保持一致

socketChannel.pipeline().addLast(new MessagePackEncoder()) ;

socketChannel.pipeline().addLast(new EchoServerHandler()); // 追加了处理器

}

});

// 可以直接利用常亮进行TCP协议的相关配置

serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);

serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

// ChannelFuture描述的时异步回调的处理操作

ChannelFuture future = serverBootstrap.bind(HostInfo.PORT).sync();

future.channel().closeFuture().sync();// 等待Socket被关闭

} finally {

workerGroup.shutdownGracefully() ;

bossGroup.shutdownGracefully() ;

}

}

}

EchoClient.java

package com.bijian.http.server.client;

import com.bijian.http.server.client.handler.EchoClientHandler;

import com.bijian.info.HostInfo;

import com.bijian.netty.serious.MessagePackDecoder;

import com.bijian.netty.serious.MessagePackEncoder;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.handler.codec.LengthFieldPrepender;

import io.netty.handler.codec.serialization.ClassResolvers;

import io.netty.handler.codec.serialization.ObjectDecoder;

import io.netty.handler.codec.serialization.ObjectEncoder;

public class EchoClient {

public void run() throws Exception {

// 1、如果现在客户端不同,那么也可以不使用多线程模式来处理;

// 在Netty中考虑到代码的统一性,也允许你在客户端设置线程池

EventLoopGroup group = new NioEventLoopGroup(); // 创建一个线程池

try {

Bootstrap client = new Bootstrap(); // 创建客户端处理程序

client.group(group).channel(NioSocketChannel.class)

.option(ChannelOption.TCP_NODELAY, true) // 允许接收大块的返回数据

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,4)) ;

socketChannel.pipeline().addLast(new MessagePackDecoder()) ;

socketChannel.pipeline().addLast(new LengthFieldPrepender(4)) ; // 与属性个数保持一致

socketChannel.pipeline().addLast(new MessagePackEncoder()) ;

socketChannel.pipeline().addLast(new EchoClientHandler()); // 追加了处理器

}

});

ChannelFuture channelFuture = client.connect(HostInfo.HOST_NAME, HostInfo.PORT).sync();

channelFuture.channel().closeFuture().sync(); // 关闭连接

} finally {

group.shutdownGracefully();

}

}

}

MessagePack实现传输序列化与使用Java原生的序列化程序管理实现对象网络传输项目的变化如下:

先启动EchoServerMain.java,再启动EchoClientMain.java,运行效果如下所示:

4.Marshalling序列化实例

项目结构如下所示:

Member.java中的@Message注解去掉

MarshallingCodeFactory.java

packagecom.bijian.netty.serious;

import io.netty.handler.codec.marshalling.*;

importorg.jboss.marshalling.MarshallerFactory;

importorg.jboss.marshalling.Marshalling;

importorg.jboss.marshalling.MarshallingConfiguration;

public classMarshallingCodeFactory {

public staticMarshallingDecoder buildMarshallingDecoder() {

MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial") ; // 获取原始的JDK序列化

MarshallingConfiguration configuration = newMarshallingConfiguration() ;

UnmarshallerProvider provider = newDefaultUnmarshallerProvider(marshallerFactory,configuration) ;

int maxSize = 1024 << 2 ; // 设置单个对象的最大长度

MarshallingDecoder decoder = newMarshallingDecoder(provider,maxSize) ;

returndecoder ;

}

public staticMarshallingEncoder buildMarshallingEncoder() {

MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial") ; // 获取原始的JDK序列化

MarshallingConfiguration configuration = newMarshallingConfiguration() ;

MarshallerProvider provider = newDefaultMarshallerProvider(marshallerFactory,configuration) ;

MarshallingEncoder encoder = newMarshallingEncoder(provider) ;

returnencoder ;

}

}

EchoServer.java

packagecom.bijian.http.server.server;

importcom.bijian.http.server.server.handler.EchoServerHandler;

importcom.bijian.info.HostInfo;

importcom.bijian.netty.serious.MarshallingCodeFactory;

importio.netty.bootstrap.ServerBootstrap;

importio.netty.channel.ChannelFuture;

importio.netty.channel.ChannelInitializer;

importio.netty.channel.ChannelOption;

importio.netty.channel.EventLoopGroup;

importio.netty.channel.nio.NioEventLoopGroup;

importio.netty.channel.socket.SocketChannel;

importio.netty.channel.socket.nio.NioServerSocketChannel;

importio.netty.handler.codec.LengthFieldBasedFrameDecoder;

importio.netty.handler.codec.LengthFieldPrepender;

/**

* 实现了基础的线程池与网络连接的配置项

*/

public classEchoServer {

public void run() throws Exception { // 进行服务器端的启动处理

// 线程池是提升服务器性能的重要技术手段,利用定长的线程池可以保证核心线程的有效数量

// 在Netty之中线程池的实现分为两类:主线程池(接收客户端连接)、工作线程池(处理客户端连接)

EventLoopGroup bossGroup = new NioEventLoopGroup(10); // 创建接收线程池

EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 创建工作线程池

System.out.println("服务器启动成功,监听端口为:" +HostInfo.PORT);

try{

// 创建一个服务器端的程序类进行NIO启动,同时可以设置Channel

ServerBootstrap serverBootstrap = new ServerBootstrap(); // 服务器端

// 设置要使用的线程池以及当前的Channel类型

serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);

// 接收到信息之后需要进行处理,于是定义子处理器

serverBootstrap.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel socketChannel) throwsException {

socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder()) ;

socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder()) ;

socketChannel.pipeline().addLast(new EchoServerHandler()); // 追加了处理器

}

});

// 可以直接利用常亮进行TCP协议的相关配置

serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);

serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

// ChannelFuture描述的时异步回调的处理操作

ChannelFuture future =serverBootstrap.bind(HostInfo.PORT).sync();

future.channel().closeFuture().sync();// 等待Socket被关闭

} finally{

workerGroup.shutdownGracefully() ;

bossGroup.shutdownGracefully() ;

}

}

}

EchoClient.java

packagecom.bijian.http.server.client;

importcom.bijian.http.server.client.handler.EchoClientHandler;

importcom.bijian.info.HostInfo;

importcom.bijian.netty.serious.MarshallingCodeFactory;

importio.netty.bootstrap.Bootstrap;

importio.netty.channel.ChannelFuture;

importio.netty.channel.ChannelInitializer;

importio.netty.channel.ChannelOption;

importio.netty.channel.EventLoopGroup;

importio.netty.channel.nio.NioEventLoopGroup;

importio.netty.channel.socket.SocketChannel;

importio.netty.channel.socket.nio.NioSocketChannel;

importio.netty.handler.codec.LengthFieldBasedFrameDecoder;

importio.netty.handler.codec.LengthFieldPrepender;

public classEchoClient {

public void run() throwsException {

// 1、如果现在客户端不同,那么也可以不使用多线程模式来处理;

// 在Netty中考虑到代码的统一性,也允许你在客户端设置线程池

EventLoopGroup group = new NioEventLoopGroup(); // 创建一个线程池

try{

Bootstrap client = new Bootstrap(); // 创建客户端处理程序

client.group(group).channel(NioSocketChannel.class)

.option(ChannelOption.TCP_NODELAY, true) // 允许接收大块的返回数据

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel socketChannel) throwsException {

socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingEncoder()) ;

socketChannel.pipeline().addLast(MarshallingCodeFactory.buildMarshallingDecoder()) ;

socketChannel.pipeline().addLast(new EchoClientHandler()); // 追加了处理器

}

});

ChannelFuture channelFuture =client.connect(HostInfo.HOST_NAME, HostInfo.PORT).sync();

channelFuture.channel().closeFuture().sync(); // 关闭连接

} finally{

group.shutdownGracefully();

}

}

}

Marshalling序列化与MessagePack实现传输序列化的变化如下:

echo-netty下的pom.xml

echo-netty下的pom.xml完整内容如下:

echo

cn.mldn

1.0

4.0.0

echo-netty

jar

io.netty

netty-all

cn.mldn

echo-util

junit

junit

test

org.jboss.marshalling

jboss-marshalling

org.jboss.marshalling

jboss-marshalling-serial

工程下的pom.xml

pom.xml完整内容如下:

4.0.0

cn.mldn

echo

1.0

echo-base

echo-util

echo-netty

netty

http://www.example.com

pom

UTF-8

1.8

3.7.0

4.12

4.1.31.Final

1.0

0.6.12

2.0.6.Final

cn.mldn

echo-util

${echo.version}

io.netty

netty-all

${netty.version}

org.jboss.marshalling

jboss-marshalling

${jboss-marshalling.version}

org.jboss.marshalling

jboss-marshalling-serial

${jboss-marshalling.version}

org.msgpack

msgpack

${msgpack.version}

junit

junit

${junit.version}

test

org.apache.maven.plugins

maven-compiler-plugin

${maven-compiler-plugin.version}

${jdk.version}

${jdk.version}

先启动EchoServerMain.java,再启动EchoClientMain.java,运行效果如下所示:

5.JSON序列化实例

项目结构如下:

JSONDecoder.java

package com.bijian.http.server.serious;

import com.bijian.vo.Member;

import com.alibaba.fastjson.JSON;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToMessageDecoder;

import java.util.List;

public class JSONDecoder extends MessageToMessageDecoder {

@Override

protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf msg, List list) throws Exception {

int len = msg.readableBytes(); // 可以用的数据长度

byte data[] = new byte[len];

msg.getBytes(msg.readerIndex(), data, 0, len);

list.add(JSON.parseObject(new String(data)).toJavaObject(Member.class)) ;

}

}

JSONEncoder.java

package com.bijian.http.server.serious;

import com.alibaba.fastjson.JSONObject;

import io.netty.buffer.ByteBuf;

import io.netty.channel.ChannelHandlerContext;

import io.netty.handler.codec.MessageToByteEncoder;

public class JSONEncoder extends MessageToByteEncoder {

@Override

protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, ByteBuf out) throws Exception {

byte data [] = JSONObject.toJSONString(msg).getBytes() ;

out.writeBytes(data) ;

}

}

EchoServer.java

package com.bijian.http.server.server;

import com.bijian.http.server.server.handler.EchoServerHandler;

import com.bijian.info.HostInfo;

import com.bijian.http.server.serious.JSONDecoder;

import com.bijian.http.server.serious.JSONEncoder;

import io.netty.bootstrap.ServerBootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioServerSocketChannel;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.handler.codec.LengthFieldPrepender;

/**

* 实现了基础的线程池与网络连接的配置项

*/

public class EchoServer {

public void run() throws Exception { // 进行服务器端的启动处理

// 线程池是提升服务器性能的重要技术手段,利用定长的线程池可以保证核心线程的有效数量

// 在Netty之中线程池的实现分为两类:主线程池(接收客户端连接)、工作线程池(处理客户端连接)

EventLoopGroup bossGroup = new NioEventLoopGroup(10); // 创建接收线程池

EventLoopGroup workerGroup = new NioEventLoopGroup(20); // 创建工作线程池

System.out.println("服务器启动成功,监听端口为:" + HostInfo.PORT);

try {

// 创建一个服务器端的程序类进行NIO启动,同时可以设置Channel

ServerBootstrap serverBootstrap = new ServerBootstrap(); // 服务器端

// 设置要使用的线程池以及当前的Channel类型

serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);

// 接收到信息之后需要进行处理,于是定义子处理器

serverBootstrap.childHandler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,0,4,0,4)) ;

socketChannel.pipeline().addLast(new JSONDecoder()) ;

socketChannel.pipeline().addLast(new LengthFieldPrepender(4)) ;

socketChannel.pipeline().addLast(new JSONEncoder()) ;

socketChannel.pipeline().addLast(new EchoServerHandler()); // 追加了处理器

}

});

// 可以直接利用常亮进行TCP协议的相关配置

serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);

serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

// ChannelFuture描述的时异步回调的处理操作

ChannelFuture future = serverBootstrap.bind(HostInfo.PORT).sync();

future.channel().closeFuture().sync();// 等待Socket被关闭

} finally {

workerGroup.shutdownGracefully() ;

bossGroup.shutdownGracefully() ;

}

}

}

EchoClient.java

package com.bijian.http.server.client;

import com.bijian.http.server.client.handler.EchoClientHandler;

import com.bijian.info.HostInfo;

import com.bijian.http.server.serious.JSONDecoder;

import com.bijian.http.server.serious.JSONEncoder;

import io.netty.bootstrap.Bootstrap;

import io.netty.channel.ChannelFuture;

import io.netty.channel.ChannelInitializer;

import io.netty.channel.ChannelOption;

import io.netty.channel.EventLoopGroup;

import io.netty.channel.nio.NioEventLoopGroup;

import io.netty.channel.socket.SocketChannel;

import io.netty.channel.socket.nio.NioSocketChannel;

import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

import io.netty.handler.codec.LengthFieldPrepender;

public class EchoClient {

public void run() throws Exception {

// 1、如果现在客户端不同,那么也可以不使用多线程模式来处理;

// 在Netty中考虑到代码的统一性,也允许你在客户端设置线程池

EventLoopGroup group = new NioEventLoopGroup(); // 创建一个线程池

try {

Bootstrap client = new Bootstrap(); // 创建客户端处理程序

client.group(group).channel(NioSocketChannel.class)

.option(ChannelOption.TCP_NODELAY, true) // 允许接收大块的返回数据

.handler(new ChannelInitializer() {

@Override

protected void initChannel(SocketChannel socketChannel) throws Exception {

socketChannel.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 0, 4, 0, 4));

socketChannel.pipeline().addLast(new JSONDecoder());

socketChannel.pipeline().addLast(new LengthFieldPrepender(4));

socketChannel.pipeline().addLast(new JSONEncoder());

socketChannel.pipeline().addLast(new EchoClientHandler()); // 追加了处理器

}

});

ChannelFuture channelFuture = client.connect(HostInfo.HOST_NAME, HostInfo.PORT).sync();

channelFuture.channel().closeFuture().sync(); // 关闭连接

} finally {

group.shutdownGracefully();

}

}

}

echo-netty下的pom.xml

工程下pom.xml内容修改如下:

先启动EchoServerMain.java,再启动EchoClientMain.java,效果如下:

java 网络 序列化_Java网络通信基础系列-Netty序列化相关推荐

  1. java nio 客户端_Java网络编程:Netty框架学习(二)---Java NIO,实现简单的服务端客户端消息传输...

    概述 上篇中已经讲到Java中的NIO类库,Java中也称New IO,类库的目标就是要让Java支持非阻塞IO,基于这个原因,更多的人喜欢称Java NIO为非阻塞IO(Non-Block IO), ...

  2. java 编程原理_Java网络编程 -- 网络编程基础原理

    Hello,今天记录下 Java网络编程 --> 网络编程基础原理. 一起学习,一起进步.继续沉淀,慢慢强大.希望这文章对您有帮助.若有写的不好的地方,欢迎评论给建议哈! 初写博客不久,我是杨展 ...

  3. java 网络实验_java网络聊天室实验

    使用java写的一个网络聊天小程序,采用UDP协议.将左下角的ip改为目标地址即可发送信息.作为java网络编程入门的一个参考.最后一幅图为程序中所使用图片,来自仙剑奇侠传五的壁纸. 1.[文件] C ...

  4. java 复制对象_Java程序员必备:序列化全方位解析

    前言 相信大家日常开发中,经常看到Java对象"implements Serializable".那么,它到底有什么用呢?本文从以下几个角度来解析序列这一块知识点~ 什么是Java ...

  5. java bean 序列化_JAVA bean为何要实现序列化

    简而言之:序列化,就是为了在不同时间或不同平台的JVM之间共享实例对象.即序列化出于两个原因:①.用于持久化到磁盘上:②.用于作为数据流在网络上传输. 所谓的Serializable,就是java提供 ...

  6. Java基础通信_Java网络通信基础编程(必看篇)

    方式一:同步阻塞方式(BIO): 服务器端(Server): package com.ietree.basicskill.socket.mode1; import java.io.IOExceptio ...

  7. java https 网络爬虫_Java 网络爬虫,就是这么的简单

    这是 Java 网络爬虫系列文章的第一篇,如果你还不知道 Java 网络爬虫系列文章,请参看 学 Java 网络爬虫,需要哪些基础知识.第一篇是关于 Java 网络爬虫入门内容,在该篇中我们以采集虎扑 ...

  8. java爬虫入门_Java 网络爬虫新手入门详解

    这是 Java 网络爬虫系列文章的第一篇,如果你还不知道 Java 网络爬虫系列文章,请参看Java 网络爬虫基础知识入门解析.第一篇是关于 Java 网络爬虫入门内容,在该篇中我们以采集虎扑列表新闻 ...

  9. java实现套接字网络编程_Java网络编程(一)Socket套接字

    一.基础知识 1.TCP:传输控制协议. 2.UDP:用户数据报协议. 二.IP地址封装 1.InetAddress类的常用方法 getLocalHost() 返回本地主机的InetAddress对象 ...

  10. java网络编程_Java网络编程进阶:通过JSSE创建安全的数据通信

    小编说:本文作者孙卫琴,知名IT作家和Java专家.本文将通过一个范例向大家介绍JSSE是如何实现安全的网络通信的. 在网络上,信息在由源主机到目标主机的传输过程中会经过其他计算机.一般情况下,中间的 ...

最新文章

  1. 百度Apollo 3.5是如何设计Cyber RT计算框架的?
  2. 简书非官方大数据(一)
  3. servlet乱码问题
  4. Softmax和softmax loss的理解
  5. 微信公众平台消息接口开发(6)电话号码链接与网址链接
  6. Vertica数据库系列:这几天踩过时间函数的坑
  7. 【PyTorch】Trick集锦
  8. 《机械工程测试技术基础》教学大纲
  9. 四分之一波长传输线应用举例
  10. JAVA爬虫天眼查、启信宝...企业信息查询网站
  11. 代码坏味道与重构之数据泥团和基本类型偏执
  12. 软件测试岗位具体是做什么的?
  13. 同一台虚拟服务器多个域名,一台服务器上可以配置多个域名
  14. 今天看到一篇文章,收藏了很多大牛的博客
  15. 基于SpringBoot+MyBatis的餐饮点餐系统
  16. ChatGPT聊天app(基于autojs)
  17. 怎样去掉“交互式服务对话框检测”提示对话框
  18. Sqlserver 中临时表和全局临时表
  19. 未转变者DLC皮肤在服务器,未转变者 steam上 最新的版本 肿么联机?肿么创建房间...
  20. iloc和loc的区别

热门文章

  1. 轻触开关式三功能手电筒3-5W驱动芯片AH3301
  2. Linux 修改 host
  3. Chrome浏览器取证分析
  4. 牛!各大公司【薪资待遇】一览,我心动了
  5. JAVA钓鱼游戏_java如何实现纸牌游戏之小猫钓鱼算法
  6. python- 小猫钓鱼纸牌游戏
  7. 开机自检(POST)
  8. 计算机文件丢失系统无法启动,windows7文件丢失无法启动怎么修复_win7系统显示文件丢失无法启动修复方法-win7之家...
  9. web文件管理系统_实用开源项目,基于Web的文件管理系统——DocSys
  10. 2021-03-16PyCharm3.0默认快捷键(翻译的)PyCharm Default Keymap