##

Netty实战 IM即时通讯系统(九)实现客户端登录

零、 目录

  1. IM系统简介
  • Netty 简介
  • Netty 环境配置
  • 服务端启动流程
  • 客户端启动流程
  • 实战: 客户端和服务端双向通信
  • 数据传输载体ByteBuf介绍
  • 客户端与服务端通信协议编解码
  • 实现客户端登录
  • 实现客户端与服务端收发消息
  • pipeline与channelHandler
  • 构建客户端与服务端pipeline
  • 拆包粘包理论与解决方案
  • channelHandler的生命周期
  • 使用channelHandler的热插拔实现客户端身份校验
  • 客户端互聊原理与实现
  • 群聊的发起与通知
  • 群聊的成员管理(加入与退出,获取成员列表)
  • 群聊消息的收发及Netty性能优化
  • 心跳与空闲检测
  • 总结
  • 扩展

一、 登录流程

  1. 从上图中我们可以看到 , 客户端连接上服务端之后

    1. 客户端会构建一个登录请求对象 , 然后通过编码把请求对象编码为ByteBuf , 写到服务端
    2. 服务端接收到ByteBuf之后 , 首先通过解码把ByteBuf 解码为登录请求响应 , 然后进行校验
    3. 服务端校验通过之后 , 构造一个登录响应对象 , 依然经过编码 , 然后回写到客户端
    4. 客户端收到服务端响应之后解码ByteBuf , 能拿到登录响应之后 , 判断是否登录成功

二、 代码框架

 /*** 实现客户端登录* * @author outman*/public class Test_10_实现客户端登录 {public static void main(String[] args) {// 启动服务端Test_10_server.start(8000);// 启动客户端Test_10_client.start("127.0.0.1", 8000, 5);}}/*** 客户端* * @author outman*/class Test_10_client {/*** 客户端启动* * @param ip*            连接ip* @param port*            服务端端口* @param maxRetry*            最大重试次数*/public static void start(String ip, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加 客户端处理逻辑}});// 连接服务端connect(bootstrap, ip, port, maxRetry);}/*** @desc 连接服务端* @param bootstrap* @param ip* @param port* @param maxRetry* @param retryIndex*            重试计数*/private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) {bootstrap.connect(ip, port).addListener(future -> {int[] finalRetryIndex;// 初始化 重连计数if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 判断连接状态if (future.isSuccess()) {System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】成功");} else if (maxRetry <= 0) {System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败,达到重连最大次数放弃重连");} else {// 重连使用退避算法int delay = 1 << finalRetryIndex[0];System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败," + delay + "秒后执行重试");bootstrap.config().group().schedule(() -> {connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}}/*** 服务端* * @author outman*/class Test_10_server {/*** @desc 服务端启动* @param port*/public static void start(int port) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加服务端处理逻辑}});// 绑定端口bind(serverBootstrap, port);}/*** @desc 自动绑定递增并启动服务端* @param serverBootstrap* @param port*/private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if (future.isSuccess()) {System.out.println("服务端:" + new Date() + "绑定端口【" + port + "】成功");} else {System.out.println("服务端:" + new Date() + "绑定端口【" + port + "】失败,执行递增绑定");bind(serverBootstrap, port + 1);}});}}/*** 客户端处理逻辑* * @author outman*/class Test_10_clientHandler extends ChannelInboundHandlerAdapter {/*** 连接成功时触发*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有数据可读时触发*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception {}}/*** 服务端处理逻辑* * @author outman*/class Test_10_serverHandler extends ChannelInboundHandlerAdapter {/*** 连接成功时触发*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有数据可读时触发*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object obj) throws Exception {}}/*** 数据包抽象类* * @author outman*/@Dataabstract class Test_10_Packet {// 协议版本号private byte version = 1;// 获取指定标识public abstract byte getCommand();// 指令集合public interface Command {// 登录指令public static final byte LOGIN_REQUEST = 1;// 登陆响应指令public static final byte LOGIN_RESPONSE = 2;}}/*** 序列化抽象接口* * @author outman*/interface Test_10_Serializer {// 获取序列化算法标识byte getSerializerAlgorithm();// 序列化算法标识集合interface SerializerAlgorithm {// JSON 序列化算法标识public static final byte JSONSerializerAlgrothm = 1;}// 默认的序列化算法public Test_10_Serializer DEFAULT = new Test_10_JSONSerializer();// 序列化byte[] enSerialize(ByteBuf byteBuf, Test_10_Packet packet);// 反序列化<T>T deSerialize(byte[] bs, Class<T> clazz);}/*** 数据包编解码类* * @author outman*/class Test_10_PacketCodec {// 魔数private static final int MAGIC_NUMBER = 0x12345678;// 单例public static Test_10_PacketCodec INSTANCE = new Test_10_PacketCodec();// 注册 序列化类private Class[] serializerArray = new Class[] { Test_10_JSONSerializer.class };// 注册抽象数据包类private Class[] packetArray = new Class[] { Test_10_LoginRequestPacket.class, Test_10_LoginResponsePacket.class };// 序列化算法标识 和对应的序列化类映射private static Map<Byte, Class<? super Test_10_Serializer>> serializerMap;// 指令标识和对应的数据包抽象类映射private static Map<Byte, Class<? super Test_10_Packet>> packetMap;// 初始化 两个映射private Test_10_PacketCodec() {serializerMap = new HashMap<>();Arrays.asList(serializerArray).forEach(clazz -> {try {Method method = clazz.getMethod("getSerializerAlgorithm");byte serializerAlgorthm = (byte) method.invoke((Test_10_Serializer)clazz.newInstance());serializerMap.put(serializerAlgorthm, clazz);} catch (Exception e) {e.printStackTrace();}});packetMap = new HashMap<>();Arrays.asList(packetArray).forEach(clazz -> {try {Method method = clazz.getMethod("getCommand");method.setAccessible(true);byte command = (byte) method.invoke((Test_10_Packet)clazz.newInstance());packetMap.put(command, clazz);} catch (Exception e) {e.printStackTrace();}});}// 编码public ByteBuf enCode(ByteBuf byteBuf, Test_10_Packet packet) {// 序列化数据包byte[] bs = Test_10_Serializer.DEFAULT.enSerialize(byteBuf, packet);// 写入魔数byteBuf.writeInt(MAGIC_NUMBER);// 写入协议版本号byteBuf.writeByte(packet.getVersion());// 写入指令标识byteBuf.writeByte(packet.getCommand());// 写入序列化算法标识byteBuf.writeByte(Test_10_Serializer.DEFAULT.getSerializerAlgorithm());// 写入数据长度byteBuf.writeInt(bs.length);// 写入数据byteBuf.writeBytes(bs);return byteBuf;}// 解码public Test_10_Packet deCode(ByteBuf byteBuf) throws Exception {// 跳过魔数校验byteBuf.skipBytes(4);// 跳过版本号校验byteBuf.skipBytes(1);// 获取指令标识byte command = byteBuf.readByte();// 获取序列化算法标识byte serializerAlgorthm = byteBuf.readByte();// 获取数据长度int len = byteBuf.readInt();// 获取数据byte[] bs = new byte[len];byteBuf.readBytes(bs);// 获取对应的序列化算法类Test_10_Serializer serializer = getSerializer(serializerAlgorthm);// 获取对应的数据包类Test_10_Packet packet = getPacket(command);if(serializer != null && packet != null) {//反序列化数据包return serializer.deSerialize(bs, packet.getClass());}else {throw new RuntimeException("没有找到对应的序列化实现或数据包实现");}}private static Test_10_Packet getPacket(byte command) throws Exception {return (Test_10_Packet) packetMap.get(command).newInstance();}private static Test_10_Serializer getSerializer(byte serializerAlgorthm) throws Exception {return (Test_10_Serializer) serializerMap.get(serializerAlgorthm).newInstance();}}/*** 登录请求数据包实体类* * @author outman*/@Dataclass Test_10_LoginRequestPacket extends Test_10_Packet {private int userId ;private String userName;private String password;@Overridepublic byte getCommand() {return Command.LOGIN_REQUEST;}}/*** 登录响应数据包实体类* * @author outman*/@Dataclass Test_10_LoginResponsePacket extends Test_10_Packet {private int code;private String msg;@Overridepublic byte getCommand() {return Command.LOGIN_RESPONSE;}/*** 响应码集合* */interface Code{// 成功的响应码public static final int SUCCESS= 10000;// 失败的响应码public static final int FAIL = 10001;}}/*** Json序列化实现类* * @author outman*/class Test_10_JSONSerializer implements Test_10_Serializer {@Overridepublic byte getSerializerAlgorithm() {return SerializerAlgorithm.JSONSerializerAlgrothm;}@Overridepublic byte[] enSerialize(ByteBuf byteBuf, Test_10_Packet packet) {return JSONObject.toJSONBytes(packet);}@Overridepublic <T>T deSerialize(byte[] bs, Class<T> clazz) {return JSONObject.parseObject(bs, clazz);}}
  1. 这个代码框架中 已经写好了 服务端、 客户端启动连接 。 通信协议 、 数据包编解码的逻辑 , 剩下的客户端、服务端业务处理逻辑 , 我们边学边写, 现在你可以把代码框架粘贴到你的编辑器中

二、 逻辑处理器

  1. 接下来我们分别实现一下上述四个过程 , 开始之前 , 我们回顾一下客户端与服务端的启动流程 , 客户端启动的时候 , 我们会在引导类BootStrap里配置客户端处理逻辑 , 本小节中我们的客户端业务处理逻辑叫做Test_10_clientHandler

     /*** 客户端处理逻辑* * @author outman*/class Test_10_clientHandler extends ChannelInboundHandlerAdapter {/*** 连接成功时触发*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有数据可读时触发*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {}}
    
  2. 我们在客户端启动的时候 , 给客户端引导类配置这个逻辑处理器 , 这样Netty中事件相关的回调就会回调我们的Test_10_clientHandler

     bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加 客户端处理逻辑ch.pipeline().addLast(new Test_10_clientHandler());}});
    
  3. 同样 我们给服务端引导类页配置一个逻辑处理器

     serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加服务端处理逻辑ch.pipeline().addLast(new Test_10_serverHandler());}});
    
  4. 接下来我们主要围绕这两个Handler来编写客户端登录相关的处理逻辑

三、 客户端发送登录请求

  1. 客户端处理登录请求

    1. 我们实现在客户端连接上服务端之后 , 立即登录。 在客户端和服务端连接成功时 , Netty 会回调Test_10_clientHandler 的channelActive(ChannelHandlerContext ctx) 方法 , 我们在这里写 请求登录的逻辑(我们事先在 Test_10_LoginRequestPacket 中添加了三个属性 , Test_10_LoginRequestPacket 类上的@Data 注解是lombok 提供的 ,让我们不用写setter/getter)

        /*** 连接成功时触发*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端:"+new Date()+"开始登陆");// 创建登陆对象Test_10_LoginRequestPacket loginRequestPacket = new Test_10_LoginRequestPacket();// 随机取ID  1~999 loginRequestPacket.setUserId((int)(Math.random()*1000)+1);loginRequestPacket.setUserName("outman");loginRequestPacket.setPassword("123456");// 编码ByteBuf byteBuf = Test_10_PacketCodec.INSTANCE.enCode(ctx.alloc().buffer(), loginRequestPacket);// 写出数据ctx.channel().writeAndFlush(byteBuf);}
      
    2. 写数据的时候 , 我们通过ctx.channel() 获取到当前连接(Netty对连接 的抽象为channel , 后面小节会分析) , 然后调用了writeAndFlush() 方法 就能把二进制数据写到服务端

  2. 服务端处理登录请求

     /*** 服务端处理逻辑* * @author outman*/class Test_10_serverHandler extends ChannelInboundHandlerAdapter {/*** 连接成功时触发*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有数据可读时触发*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object obj) throws Exception {ByteBuf byteBuf = (ByteBuf) obj;// 解码Test_10_Packet packet = Test_10_PacketCodec.INSTANCE.deCode(byteBuf);// 根据指令执行对应的处理逻辑switch (packet.getCommand() ) {case Test_10_Packet.Command.LOGIN_REQUEST:Test_10_LoginRequestPacket loginRequestPacket = (Test_10_LoginRequestPacket) packet;// 校验成功System.out.println("服务端:"+new Date()+"【"+loginRequestPacket.getUserName()+"】 登陆成功");break;default:System.out.println("服务端:"+new Date()+"收到未知的指令【"+packet.getCommand()+"】");break;}}}
    
    1. 我们在服务端引导类 ServerBootstrap 添加了逻辑处理器Test_10_serverHandler 之后 , Netty 在收到数据之后会回调channelRead() , 这里第二个参数msg , 在我们这个场景中 , 可以直接强转为ByteBuf , 为什么Netty不直接把这个参数类型定义为ByteBuf? , 我们在后面的小节会分析到
    2. 拿到ByteBuf 之后 , 首先要做的事情就是解码 , 解码出的java数据包对象 , 然后判断如果是登陆请求数据包, 就进行登录逻辑的处理这里我们假设所有的登录请求都是成功的 , 接下来, 我们来告诉客户端他登陆成功的好消息。

四、 服务端发送登录响应

  1. 服务端发送登录响应

     /*** 服务端处理逻辑* * @author outman*/class Test_10_serverHandler extends ChannelInboundHandlerAdapter {/*** 连接成功时触发*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有数据可读时触发*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object obj) throws Exception {ByteBuf byteBuf = (ByteBuf) obj;// 解码Test_10_Packet packet = Test_10_PacketCodec.INSTANCE.deCode(byteBuf);// 根据指令执行对应的处理逻辑switch (packet.getCommand() ) {case Test_10_Packet.Command.LOGIN_REQUEST:Test_10_LoginRequestPacket loginRequestPacket = (Test_10_LoginRequestPacket) packet;// 模拟校验成功System.out.println("服务端:"+new Date()+"【"+loginRequestPacket.getUserName()+"】 登陆成功");// 给服务端响应Test_10_LoginResponsePacket loginResponsePacket = new Test_10_LoginResponsePacket();loginResponsePacket.setCode(Code.SUCCESS);loginResponsePacket.setMsg("登陆成功!");// 编码byteBuf = Test_10_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket);//写出数据ctx.channel().writeAndFlush(byteBuf);break;default:System.out.println("服务端:"+new Date()+"收到未知的指令【"+packet.getCommand()+"】");break;}}}
    
    1. 这段逻辑仍然时候服务端逻辑处理器Test_10_serverHandler的channelRead 方法中 , 我们构造一个登录响应包Test_10_LoginResponsePacket , 然后在校验成功和失败时分别设置标志位 , 接下来调用编码器把java对象编码成ByteBuf , 然后调用writeAndFlush 把数据包写给客户端
  2. 客户端处理登录响应

     /*** 有数据可读时触发*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;// 数据包解码Test_10_Packet packet= Test_10_PacketCodec.INSTANCE.deCode(byteBuf);//根据不同的指令选择对应的处理逻辑switch (packet.getCommand()) {case Test_10_Packet.Command.LOGIN_RESPONSE:Test_10_LoginResponsePacket loginResponsePacket = (Test_10_LoginResponsePacket) packet;System.out.println("客户端:"+new Date() +"收到服务端响应【"+loginResponsePacket.getMsg()+"】");break;default:break;}}
    
    1. 客户端拿到数据之后 , 调用Test_10_PacketCodec 进行解码操作 , 然后我们打印出服务端的响应内容
  3. 执行结果

  4. 完整代码

     /*** 实现客户端登录* * @author outman*/public class Test_10_实现客户端登录 {public static void main(String[] args) {// 启动服务端Test_10_server.start(8000);// 启动客户端Test_10_client.start("127.0.0.1", 8000, 5);}}/*** 客户端* * @author outman*/class Test_10_client {/*** 客户端启动* * @param ip*            连接ip* @param port*            服务端端口* @param maxRetry*            最大重试次数*/public static void start(String ip, int port, int maxRetry) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加 客户端处理逻辑ch.pipeline().addLast(new Test_10_clientHandler());}});// 连接服务端connect(bootstrap, ip, port, maxRetry);}/*** @desc 连接服务端* @param bootstrap* @param ip* @param port* @param maxRetry* @param retryIndex*            重试计数*/private static void connect(Bootstrap bootstrap, String ip, int port, int maxRetry, int... retryIndex) {bootstrap.connect(ip, port).addListener(future -> {int[] finalRetryIndex;// 初始化 重连计数if (retryIndex.length == 0) {finalRetryIndex = new int[] { 0 };} else {finalRetryIndex = retryIndex;}// 判断连接状态if (future.isSuccess()) {System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】成功");} else if (maxRetry <= 0) {System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败,达到重连最大次数放弃重连");} else {// 重连使用退避算法int delay = 1 << finalRetryIndex[0];System.out.println("客户端:" + new Date() + "连接【" + ip + ":" + port + "】失败," + delay + "秒后执行重试");bootstrap.config().group().schedule(() -> {connect(bootstrap, ip, port, maxRetry - 1, finalRetryIndex[0] + 1);}, delay, TimeUnit.SECONDS);}});}}/*** 服务端* * @author outman*/class Test_10_server {/*** @desc 服务端启动* @param port*/public static void start(int port) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {// 添加服务端处理逻辑ch.pipeline().addLast(new Test_10_serverHandler());}});// 绑定端口bind(serverBootstrap, port);}/*** @desc 自动绑定递增并启动服务端* @param serverBootstrap* @param port*/private static void bind(ServerBootstrap serverBootstrap, int port) {serverBootstrap.bind(port).addListener(future -> {if (future.isSuccess()) {System.out.println("服务端:" + new Date() + "绑定端口【" + port + "】成功");} else {System.out.println("服务端:" + new Date() + "绑定端口【" + port + "】失败,执行递增绑定");bind(serverBootstrap, port + 1);}});}}/*** 客户端处理逻辑* * @author outman*/class Test_10_clientHandler extends ChannelInboundHandlerAdapter {/*** 连接成功时触发*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端:"+new Date()+"开始登陆");// 创建登陆对象Test_10_LoginRequestPacket loginRequestPacket = new Test_10_LoginRequestPacket();// 随机取ID  1~999 loginRequestPacket.setUserId((int)(Math.random()*1000)+1);loginRequestPacket.setUserName("outman");loginRequestPacket.setPassword("123456");// 编码ByteBuf byteBuf = Test_10_PacketCodec.INSTANCE.enCode(ctx.alloc().buffer(), loginRequestPacket);// 写出数据ctx.channel().writeAndFlush(byteBuf);}/*** 有数据可读时触发*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;// 数据包解码Test_10_Packet packet= Test_10_PacketCodec.INSTANCE.deCode(byteBuf);//根据不同的指令选择对应的处理逻辑switch (packet.getCommand()) {case Test_10_Packet.Command.LOGIN_RESPONSE:Test_10_LoginResponsePacket loginResponsePacket = (Test_10_LoginResponsePacket) packet;System.out.println("客户端:"+new Date() +"收到服务端响应【"+loginResponsePacket.getMsg()+"】");break;default:break;}}}/*** 服务端处理逻辑* * @author outman*/class Test_10_serverHandler extends ChannelInboundHandlerAdapter {/*** 连接成功时触发*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {}/*** 有数据可读时触发*/@Overridepublic void channelRead(ChannelHandlerContext ctx , Object obj) throws Exception {ByteBuf byteBuf = (ByteBuf) obj;// 解码Test_10_Packet packet = Test_10_PacketCodec.INSTANCE.deCode(byteBuf);// 根据指令执行对应的处理逻辑switch (packet.getCommand() ) {case Test_10_Packet.Command.LOGIN_REQUEST:Test_10_LoginRequestPacket loginRequestPacket = (Test_10_LoginRequestPacket) packet;// 模拟校验成功System.out.println("服务端:"+new Date()+"【"+loginRequestPacket.getUserName()+"】 登陆成功");// 给服务端响应Test_10_LoginResponsePacket loginResponsePacket = new Test_10_LoginResponsePacket();loginResponsePacket.setCode(Code.SUCCESS);loginResponsePacket.setMsg("登陆成功!");// 编码byteBuf = Test_10_PacketCodec.INSTANCE.enCode(byteBuf, loginResponsePacket);//写出数据ctx.channel().writeAndFlush(byteBuf);break;default:System.out.println("服务端:"+new Date()+"收到未知的指令【"+packet.getCommand()+"】");break;}}}/*** 数据包抽象类* * @author outman*/@Dataabstract class Test_10_Packet {// 协议版本号private byte version = 1;// 获取指定标识public abstract byte getCommand();// 指令集合public interface Command {// 登录指令public static final byte LOGIN_REQUEST = 1;// 登陆响应指令public static final byte LOGIN_RESPONSE = 2;}}/*** 序列化抽象接口* * @author outman*/interface Test_10_Serializer {// 获取序列化算法标识byte getSerializerAlgorithm();// 序列化算法标识集合interface SerializerAlgorithm {// JSON 序列化算法标识public static final byte JSONSerializerAlgrothm = 1;}// 默认的序列化算法public Test_10_Serializer DEFAULT = new Test_10_JSONSerializer();// 序列化byte[] enSerialize(ByteBuf byteBuf, Test_10_Packet packet);// 反序列化<T>T deSerialize(byte[] bs, Class<T> clazz);}/*** 数据包编解码类* * @author outman*/class Test_10_PacketCodec {// 魔数private static final int MAGIC_NUMBER = 0x12345678;// 单例public static Test_10_PacketCodec INSTANCE = new Test_10_PacketCodec();// 注册 序列化类private Class[] serializerArray = new Class[] { Test_10_JSONSerializer.class };// 注册抽象数据包类private Class[] packetArray = new Class[] { Test_10_LoginRequestPacket.class, Test_10_LoginResponsePacket.class };// 序列化算法标识 和对应的序列化类映射private static Map<Byte, Class<? super Test_10_Serializer>> serializerMap;// 指令标识和对应的数据包抽象类映射private static Map<Byte, Class<? super Test_10_Packet>> packetMap;// 初始化 两个映射private Test_10_PacketCodec() {serializerMap = new HashMap<>();Arrays.asList(serializerArray).forEach(clazz -> {try {Method method = clazz.getMethod("getSerializerAlgorithm");byte serializerAlgorthm = (byte) method.invoke((Test_10_Serializer)clazz.newInstance());serializerMap.put(serializerAlgorthm, clazz);} catch (Exception e) {e.printStackTrace();}});packetMap = new HashMap<>();Arrays.asList(packetArray).forEach(clazz -> {try {Method method = clazz.getMethod("getCommand");method.setAccessible(true);byte command = (byte) method.invoke((Test_10_Packet)clazz.newInstance());packetMap.put(command, clazz);} catch (Exception e) {e.printStackTrace();}});}// 编码public ByteBuf enCode(ByteBuf byteBuf, Test_10_Packet packet) {// 序列化数据包byte[] bs = Test_10_Serializer.DEFAULT.enSerialize(byteBuf, packet);// 写入魔数byteBuf.writeInt(MAGIC_NUMBER);// 写入协议版本号byteBuf.writeByte(packet.getVersion());// 写入指令标识byteBuf.writeByte(packet.getCommand());// 写入序列化算法标识byteBuf.writeByte(Test_10_Serializer.DEFAULT.getSerializerAlgorithm());// 写入数据长度byteBuf.writeInt(bs.length);// 写入数据byteBuf.writeBytes(bs);return byteBuf;}// 解码public Test_10_Packet deCode(ByteBuf byteBuf) throws Exception {// 跳过魔数校验byteBuf.skipBytes(4);// 跳过版本号校验byteBuf.skipBytes(1);// 获取指令标识byte command = byteBuf.readByte();// 获取序列化算法标识byte serializerAlgorthm = byteBuf.readByte();// 获取数据长度int len = byteBuf.readInt();// 获取数据byte[] bs = new byte[len];byteBuf.readBytes(bs);// 获取对应的序列化算法类Test_10_Serializer serializer = getSerializer(serializerAlgorthm);// 获取对应的数据包类Test_10_Packet packet = getPacket(command);if(serializer != null && packet != null) {//反序列化数据包return serializer.deSerialize(bs, packet.getClass());}else {throw new RuntimeException("没有找到对应的序列化实现或数据包实现");}}private static Test_10_Packet getPacket(byte command) throws Exception {return (Test_10_Packet) packetMap.get(command).newInstance();}private static Test_10_Serializer getSerializer(byte serializerAlgorthm) throws Exception {return (Test_10_Serializer) serializerMap.get(serializerAlgorthm).newInstance();}}/*** 登录请求数据包实体类* * @author outman*/@Dataclass Test_10_LoginRequestPacket extends Test_10_Packet {private int userId ;private String userName;private String password;@Overridepublic byte getCommand() {return Command.LOGIN_REQUEST;}}/*** 登录响应数据包实体类* * @author outman*/@Dataclass Test_10_LoginResponsePacket extends Test_10_Packet {private int code;private String msg;@Overridepublic byte getCommand() {return Command.LOGIN_RESPONSE;}/*** 响应码集合* */interface Code{// 成功的响应码public static final int SUCCESS= 10000;// 失败的响应码public static final int FAIL = 10001;}}/*** Json序列化实现类* * @author outman*/class Test_10_JSONSerializer implements Test_10_Serializer {@Overridepublic byte getSerializerAlgorithm() {return SerializerAlgorithm.JSONSerializerAlgrothm;}@Overridepublic byte[] enSerialize(ByteBuf byteBuf, Test_10_Packet packet) {return JSONObject.toJSONBytes(packet);}@Overridepublic <T>T deSerialize(byte[] bs, Class<T> clazz) {return JSONObject.parseObject(bs, clazz);}}
    

五、 总结

  1. 本小节我们梳理了客户端登录的基本流程 , 然后结合上一小节的编解码逻辑 , 我们使用Netty 完成了完整的客户端登录流程。

六、 思考

  1. 客户端登录成功或失败之后 , 如何把成功或者失败的标识绑定在客户端的连接上 ? 服务端又是怎样有效的避免客户端重新登录的?

    1. 答: 给channel设置attr自定义属性 , 可以把登录标识绑定在连接上
  2. 客户端NioEventLoopGroup不用释放吗?
    1. 答: 不用 , 程序关闭之后 , 所有的线程都自动关闭了

Netty实战 IM即时通讯系统(九)实现客户端登录相关推荐

  1. Netty实战 IM即时通讯系统(十)实现客户端和服务端收发消息

    Netty实战 IM即时通讯系统(十)实现客户端和服务端收发消息 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 客户端启动流程 实战: 客户端和服务端双向通信 数据 ...

  2. Netty实战 IM即时通讯系统(十二)构建客户端与服务端pipeline

    Netty实战 IM即时通讯系统(十二)构建客户端与服务端pipeline 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 客户端启动流程 实战: 客户端和服务端双向 ...

  3. Netty实战 IM即时通讯系统(十一)pipeline与channelHandler

    Netty实战 IM即时通讯系统(十一)pipeline与channelHandler 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 客户端启动流程 实战: 客户端 ...

  4. Netty实战 IM即时通讯系统(八)服务端和客户端通信协议编解码

    Netty实战 IM即时通讯系统(八)服务端和客户端通信协议编解码 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 客户端启动流程 实战: 客户端和服务端双向通信 数 ...

  5. Netty实战 IM即时通讯系统(七)数据传输载体ByteBuf介绍

    ## Netty实战 IM即时通讯系统(七)数据传输载体ByteBuf介绍 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 客户端启动流程 实战: 客户端和服务端双向 ...

  6. Netty实战 IM即时通讯系统(六)实战: 客户端和服务端双向通信

    ## Netty实战 IM即时通讯系统(六)实战: 客户端和服务端双向通信 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载 ...

  7. Netty实战 IM即时通讯系统(五)客户端启动流程

    ## Netty实战 IM即时通讯系统(五)客户端启动流程 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf ...

  8. Netty实战 IM即时通讯系统(四)服务端启动流程

    ## Netty实战 IM即时通讯系统(四)服务端启动流程 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf ...

  9. Netty实战 IM即时通讯系统(三)Netty环境配置

    ## Netty实战 IM即时通讯系统(三)Netty环境配置 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteB ...

最新文章

  1. kvm虚拟服务器备份,OpenStack KVM虚拟机实例磁盘的备份脚本
  2. PCL点云库学习笔记 点云的欧式聚类
  3. 危害网站关键词优化的因素如何避免?
  4. Spring 框架 详解 (四)------IOC装配Bean(注解方式)
  5. python类定义中、对象字符串的特殊方法是_python中自定义类对象json字符串化的方法_python json转字符串、...
  6. 10分钟读懂人工智能、机器学习到底有什么关系
  7. python from import什么意思_Python 引用From import介绍
  8. 今天会议的召开,和你有关系吗?
  9. 中国医学不能走西方道路
  10. opencv之绘制带箭头的线段---arrowedLine
  11. Ubuntu 12.04下NFS安装配置
  12. 理光打印机服务器响应错误,理光网络打印机服务器设置
  13. XPS Viewer 无法设置权限账户 - 无法激活此计算机上的任何权限管理账户
  14. php 有道翻译api,php有道翻译api调用方法实例
  15. 利用python对股票商誉进行排名分析,防止踩雷
  16. SAP采购定价过程-条件技术介绍
  17. Illegal base64 character 20
  18. Win10任务栏假死问题解决方案
  19. word中图片不显示怎么办
  20. java javascript数组_浅谈javascript和java中的数组

热门文章

  1. mybatis 一对一与一对多collection和association的使用
  2. JSON表单提交(ajax异步刷新)
  3. Xcode 升级后,cocoaPod 问题
  4. stm8s103k3 周期 捕获_STM8S103K3 - 主流基本型系列8位MCU,具有8 KB Flash、16 MHz CPU和集成EEPROM - STMicroelectronics...
  5. python日志_python日志处理
  6. Java黑皮书课后题第10章:**10.25(新的字符串split方法)String类中的split方法会返回一个字符串数组,该数组是由分隔符分隔开的子串构成的
  7. C语言学习之企业发放的奖金根据利润提成。利润I低于或等于100000元的,奖金可提成10%;
  8. 9.找出1000以内的完数,所谓完数是指该数的各因子之和等于该数,如:6 = 1+2+3。
  9. 【bzoj4264】小C找朋友
  10. 以太坊solidity编程常见错误(不定期更新)