netty server端使用自定义解码器,通过存储client连接实现主动推送消息,并发送自定义心跳包

Server端

依赖
     <dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.19.Final</version></dependency>
创建服务端
@Component
public class NettyServer {@Autowiredprivate ServiceConfiguration serviceConfiguration;private EventLoopGroup bossGroup = new NioEventLoopGroup();private EventLoopGroup workGroup = new NioEventLoopGroup();/*** 启动netty服务** @throws InterruptedException*/@PostConstructpublic void start() throws Exception {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)//SO_BACKLOG只能控制最大并发量,无法限制最大连接数.option(ChannelOption.SO_BACKLOG, 128)//端口复用.option(ChannelOption.SO_REUSEADDR, true).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//执行顺序为注册顺序的逆序//IdleStateHandler()中的参数为多长时间未读,未写,(未读且未写)触发事件socketChannel.pipeline().addLast(new IdleStateHandler(0, 0, 5, TimeUnit.SECONDS));socketChannel.pipeline().addLast("messageHandler", new MessageHandler());socketChannel.pipeline().addLast("serverHandler", new ServerHandler());}});//开启需要监听 的端口ChannelFuture future = b.bind(serviceConfiguration.getAddress(), serviceConfiguration.getClientPort()).sync();if (future.isSuccess()) {//serviceConfiguration类通过@value引入可配置的IP和端口System.out.println("IP:" + serviceConfiguration.getAddress() + "端口" + serviceConfiguration.getClientPort() + "启动监听成功");}}/*** 销毁*/@PreDestroypublic void destroy() {//syncUninterruptibly()主线程同步等待子线程结果,子线程释放后关闭bossGroup.shutdownGracefully().syncUninterruptibly();workGroup.shutdownGracefully().syncUninterruptibly();System.out.println("关闭 Netty 成功");}
ServerHandler
@Component
public class ServerHandler extends ChannelInboundHandlerAdapter {@Autowiredprivate ServiceConfiguration serviceConfiguration;/*** 获取真实连接*/public static Map<String, ChannelHandlerContext> realMap = new ConcurrentHashMap<>();/*** 获取到连接时触发*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {//监测到连接时先发送一个自定义心跳信息//通过发送自定义心跳的方法做身份验证,保存真实连接sendHeart(ctx);}/*** 断开连接触发*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("Disconnected client." + ctx.channel().remoteAddress().toString());//检测到连接关闭时移除存储的key信息String ipAddress = getContext(ctx);if (realMap.containsKey(ipAddress)) {realMap.remove(ipAddress);}ctx.fireChannelInactive();}/*** 获取数据** @param ctx 上下文* @param msg 获取的数据* @throws UnsupportedEncodingException*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {//接收到解码器返回的数据后先进行判断//无特殊需求可直接通过ctx来处理数据if (realMap.size() > 1) {//除非被恶意攻击,理论上此处不会发生。。。realMap.clear();//发送身份验证包sendHeart(ctx);} else {//解析数据byte[] bytes = (byte[]) msg;String s = getMessage(bytes);if (s.contains("heartbeats")) {//处理心跳数据System.out.println(s);HeartBeat heartBeat = JsonUtil.json2Object(s, HeartBeat.class);System.out.println(heartBeat);System.out.println("接收到的数据" + ByteCopyUtil.toHexString(bytes));if(realMap.size() != 1){//存储连接realMap.put(getContext(ctx), ctx);}} else {//处理其他格式包的业务逻辑System.out.println("接收到的数据" + ByteCopyUtil.toHexString(bytes));}}}/**异常关闭连接*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}/** 通过IdleStateHandler()定时发送自定义心跳*/@Overridepublic void userEventTriggered(ChannelHandlerContext context, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleState state = ((IdleStateEvent) evt).state();//IdleState.XX 处理事件if (state == IdleState.ALL_IDLE) {sendHeart(context);}}}/*** 获取客户端IP地址*/public String getContext(ChannelHandlerContext context) {InetSocketAddress ipSocket = (InetSocketAddress) context.channel().remoteAddress();String clientIp = ipSocket.getAddress().getHostAddress();System.out.println("客户端ip地址" + clientIp);return clientIp;}/*** 自定义发送消息方法*/public int sendMessage(String jsonObj) {//根据连接数量返回异常状态值if (realMap.size() == 1) {realMap.get(serviceConfiguration.getAddress()).writeAndFlush(SendJsonObj.getBuf(jsonObj));return 1;} else if (realMap.size() > 1) {//连接数量大于1,拒绝发送数据return -1;} else {//未获取到真实连接return 0;}}/**通用发送方法*/public void sendMessageData(String ipAddress,String jsonObj) {//通过ip地址指定要发送客户端realMap.get(ipAddress).writeAndFlush(SendJsonObj.getBuf(jsonObj));   }public void sendHeart(ChannelHandlerContext context) {//HeartBeat 自定义心跳类HeartBeat heartBeat = new HeartBeat("heartbeats");//以json的形式发送String s = JsonUtil.object2Json(heartBeat);//获取连接主动发送给对端身份验证信息(SendJsonObj类中自定义了一个身份校验包(ByteBuf))context.writeAndFlush(SendJsonObj.getBuf(s));}/**解析获取自定义包中Json字符串长度*/public int getMessageLen(byte[] bytes) {if (bytes.length > 8) {byte[] len = new byte[4];len[0] = bytes[4];len[1] = bytes[5];len[2] = bytes[6];len[3] = bytes[7];return BitConverter.toInt(len) - 17;} else {return -1;}}/**解析自定义包中的Json字符串*/public String getMessage(byte[] bytes) {int len = getMessageLen(bytes);byte[] message = new byte[len];try {for (int i = 0; i < len; i++) {message[i] = bytes[i + 16];}String s = JsonUtil.object2Json(new String(message));return s;} catch (Exception e) {e.printStackTrace();return "getMessageFail";}}
MessageHandler(自定义解码器)
@Component
public class MessageHandler extends ByteToMessageDecoder {private ByteBuf tempMessage = Unpooled.buffer();public MessageHandler() {//自定义包头this.tempMessage.writeInt(0x00000000);}/**找到并返回一个完整包*/@Overrideprotected void decode(ChannelHandlerContext context, ByteBuf input, List<Object> output) throws Exception {while (true) {int index = IndexOf(input, tempMessage);if (index == -1) {//没找到return;} else if (index != 0) {//找到了input.skipBytes(index);}if (input.readableBytes() < 8) {return;}input.markReaderIndex();int length = 0;ByteBuf b = input.skipBytes(4).readBytes(4);byte[] bs = new byte[b.readableBytes()];b.readBytes(bs);b.release();length = BitConverter.ToInt32(bs, 0);input.resetReaderIndex();//异常处理if (length < 0) {input.skipBytes(4);return;}//length长度为协议长度,不包括包头或校验位等位置if (input.readableBytes() >= (length)) {byte[] bb = new byte[length];input.readBytes(bb);output.add(bb);} else {return;}}}/**查找自定义包头*/static int IndexOf(ByteBuf haystack, ByteBuf needle) {for (int i = haystack.readerIndex(); i < needle.writerIndex(); i++) {int haystackIndex = i;int needleIndex;for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex++) {if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) {break;} else {haystackIndex++;if (haystackIndex == haystack.writerIndex() && needleIndex != 3) {return -1;}}}if (needleIndex == 4) {return i - haystack.readerIndex();}}return -1;}

Netty-Server-Hander自定义解码器-服务端主动推送相关推荐

  1. flux服务器推消息,服务端主动推送数据,除了 WebSocket 你还能想到啥?

    原标题:服务端主动推送数据,除了 WebSocket 你还能想到啥? 来自公众号: 江南一点雨 在 上篇文章 中,松哥和大家分享了 WebFlux 的基本用法,小伙伴们已经了解到使用 WebFlux ...

  2. 使用SignalR从服务端主动推送警报日志到各种终端(桌面、移动、网页)

    工作上有个业务,.Net Core WebAPI作为服务端,需要将运行过程中产生的日志分类,并实时推送到各种终端进行报警,终端有桌面(WPF).移动(Xamarin.Forms).网站(Angular ...

  3. spring集成mina,包含心跳检测,实现服务端主动推送

    服务端 1.常规的spring工程集成mina时,pom.xml中需要加入如下配置: <dependency><groupId>org.slf4j</groupId> ...

  4. 服务端主动推送数据,除了 WebSocket 你还能想到啥?

    在上篇文章中,松哥和大家分享了 WebFlux 的基本用法,小伙伴们已经了解到使用 WebFlux 我们的返回值可以是 Mono 也可以是 Flux,如果是 Flux,由于 Flux 中包含多个元素, ...

  5. 利用mochiweb让服务端主动推送数据至前端页面

    对于智能化监控软件,从wincc等国外的有相当积累的系统,以及国内一些小型的智能化集成软件,通常其监控数据通过前端绑定控件的方式,做到了实时的通讯,通过控件直接和后端服务交互.这种方式可以灵活的组态, ...

  6. 服务端如何推送消息给客户端?

    大家好,我是前端西瓜哥,今天带大家了解一下服务端如何推送消息给客户端. 有时候,我们希望服务端能够主动推送一些信息给客户端.但 HTTP 协议只能让客户端发起请求然后服务端响应,而无法让服务端主动去发 ...

  7. 基于netty搭建websocket,实现消息的主动推送

    基于netty搭建websocket,实现消息的主动推送 rpf_siwash https://www.jianshu.com/p/56216d1052d7 netty是由jboss提供的一款开源框架 ...

  8. Asp.net SignalR 实现服务端消息推送到Web端

    参考博客https://www.cnblogs.com/wintersun/p/4148223.html ASP .NET SignalR是一个ASP .NET 下的类库,可以在ASP .NET 的W ...

  9. pushlet实现单机-集群服务端消息推送

    一.什么是pushlet? 1.pushlet推送是一种将java后台数据推送到web页面的框架技术,实现了comet. 2.comet是一个用于描述客户端和服务器之间交互的术语,即使用长期保持的ht ...

最新文章

  1. 配置scp在Linux或Unix之间传输文件无需密码
  2. C#windows向窗体传递泛型类
  3. Maven编译jar出现:无法确定 T 的类型参数的异常的原因和处理方案
  4. MySql修改最大连接数的两种方式
  5. 推荐系统之业务架构总览
  6. php 调取子栏目,Dedecms 如何调取某个栏目所在的顶级栏目及顶级下的子栏目
  7. java.util.concurrent.BlockingQueue指南
  8. 中断linux命令快捷键_实用!快速操作Linux终端命令行的快捷键
  9. Mac IDEA解决Maven项目命令行报错:command not found: mvn
  10. 如何破解加密的PDF文件
  11. 计算机中计算平均数的函数是什么,Excel里怎么求平均数的?函数是什么?!excle2010怎么求平均数...
  12. 数据分析——Python内容学习【1】
  13. Windows AD域下批量分发安装软件
  14. 湛蓝.Net软件国际化工具V1.0.0.发布了,欢迎大家使用
  15. ILSpy中baml转化为xaml的改进(四)
  16. OSChina 周二乱弹 —— 怎么制作妹子面
  17. ccf-csp 2015春季真题题解
  18. 剖析云计算技术及架构(2 云存储)
  19. mysql gunzip 远程,Java 操作mysql 导入|导出 gzip|gunzip 工具类
  20. 东芝Toshiba e-STUDIO810 一体机驱动

热门文章

  1. 拼搏了3个月,工资翻了3倍多后,我哭了......
  2. 怎么打印飞书的审批单?
  3. 深入理解Java虚拟机二(类加载器和类的加载过程)
  4. 29、MAix Bit K210开发板进行目标检测
  5. opencv学习:二维浮点数离散傅里叶变换及其扩展边界优化
  6. 大工21秋《房地产开发与经营课程设计》离线作业
  7. CocosCreator之做个基于物理引擎的绳子关节
  8. 权威报告:2030年企业将全面上云,隐私计算可保障数据上云安全可信
  9. 终于拿到了阿里技术专家分享的552页大型网站架构实战
  10. Aws云服务EMR使用