开发者(KaiFaX)

面向全栈工程师的开发者
专注于前端、Java/Python/Go/PHP的技术社区

来源 | blog.csdn.net/weixin_44912855

学过 Netty 的都知道,Netty 对 NIO 进行了很好的封装,简单的 API,庞大的开源社区。深受广大程序员喜爱。基于此本文分享一下基础的 netty 使用。实战制作一个 Netty + websocket 的消息推送小栗子。

netty服务器

@Component
public class NettyServer {static final Logger log = LoggerFactory.getLogger(NettyServer.class);/*** 端口号*/@Value("${webSocket.netty.port:8888}")int port;EventLoopGroup bossGroup;EventLoopGroup workGroup;@AutowiredProjectInitializer nettyInitializer;@PostConstructpublic void start() throws InterruptedException {new Thread(() -> {bossGroup = new NioEventLoopGroup();workGroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();// bossGroup辅助客户端的tcp连接请求, workGroup负责与客户端之前的读写操作bootstrap.group(bossGroup, workGroup);// 设置NIO类型的channelbootstrap.channel(NioServerSocketChannel.class);// 设置监听端口bootstrap.localAddress(new InetSocketAddress(port));// 设置管道bootstrap.childHandler(nettyInitializer);// 配置完成,开始绑定server,通过调用sync同步方法阻塞直到绑定成功ChannelFuture channelFuture = null;try {channelFuture = bootstrap.bind().sync();log.info("Server started and listen on:{}", channelFuture.channel().localAddress());// 对关闭通道进行监听channelFuture.channel().closeFuture().sync();} catch (InterruptedException e) {e.printStackTrace();}}).start();}/*** 释放资源*/@PreDestroypublic void destroy() throws InterruptedException {if (bossGroup != null) {bossGroup.shutdownGracefully().sync();}if (workGroup != null) {workGroup.shutdownGracefully().sync();}}
}

Netty配置

管理全局Channel以及用户对应的channel(推送消息)

public class NettyConfig {/*** 定义全局单利channel组 管理所有channel*/private static volatile ChannelGroup channelGroup = null;/*** 存放请求ID与channel的对应关系*/private static volatile ConcurrentHashMap<String, Channel> channelMap = null;/*** 定义两把锁*/private static final Object lock1 = new Object();private static final Object lock2 = new Object();public static ChannelGroup getChannelGroup() {if (null == channelGroup) {synchronized (lock1) {if (null == channelGroup) {channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);}}}return channelGroup;}public static ConcurrentHashMap<String, Channel> getChannelMap() {if (null == channelMap) {synchronized (lock2) {if (null == channelMap) {channelMap = new ConcurrentHashMap<>();}}}return channelMap;}public static Channel getChannel(String userId) {if (null == channelMap) {return getChannelMap().get(userId);}return channelMap.get(userId);}
}

管道配置

@Component
public class ProjectInitializer extends ChannelInitializer<SocketChannel> {/*** webSocket协议名*/static final String WEBSOCKET_PROTOCOL = "WebSocket";/*** webSocket路径*/@Value("${webSocket.netty.path:/webSocket}")String webSocketPath;@AutowiredWebSocketHandler webSocketHandler;@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {// 设置管道ChannelPipeline pipeline = socketChannel.pipeline();// 流水线管理通道中的处理程序(Handler),用来处理业务// webSocket协议本身是基于http协议的,所以这边也要使用http编解码器pipeline.addLast(new HttpServerCodec());pipeline.addLast(new ObjectEncoder());// 以块的方式来写的处理器pipeline.addLast(new ChunkedWriteHandler());pipeline.addLast(new HttpObjectAggregator(8192));pipeline.addLast(new WebSocketServerProtocolHandler(webSocketPath, WEBSOCKET_PROTOCOL, true, 65536 * 10));// 自定义的handler,处理业务逻辑pipeline.addLast(webSocketHandler);}
}

自定义handler

@Component
@ChannelHandler.Sharable
public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {private static final Logger log = LoggerFactory.getLogger(NettyServer.class);/*** 一旦连接,第一个被执行*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("有新的客户端链接:[{}]", ctx.channel().id().asLongText());// 添加到channelGroup 通道组NettyConfig.getChannelGroup().add(ctx.channel());}/*** 读取数据*/@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {log.info("服务器收到消息:{}", msg.text());// 获取用户ID,关联channelJSONObject jsonObject = JSONUtil.parseObj(msg.text());String uid = jsonObject.getStr("uid");NettyConfig.getChannelMap().put(uid, ctx.channel());// 将用户ID作为自定义属性加入到channel中,方便随时channel中获取用户IDAttributeKey<String> key = AttributeKey.valueOf("userId");ctx.channel().attr(key).setIfAbsent(uid);// 回复消息ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器收到消息啦"));}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("用户下线了:{}", ctx.channel().id().asLongText());// 删除通道NettyConfig.getChannelGroup().remove(ctx.channel());removeUserId(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("异常:{}", cause.getMessage());// 删除通道NettyConfig.getChannelGroup().remove(ctx.channel());removeUserId(ctx);ctx.close();}/*** 删除用户与channel的对应关系*/private void removeUserId(ChannelHandlerContext ctx) {AttributeKey<String> key = AttributeKey.valueOf("userId");String userId = ctx.channel().attr(key).get();NettyConfig.getChannelMap().remove(userId);}
}

推送消息接口及实现类

public interface PushMsgService {/*** 推送给指定用户*/void pushMsgToOne(String userId, String msg);/*** 推送给所有用户*/void pushMsgToAll(String msg);}
@Service
public class PushMsgServiceImpl implements PushMsgService {@Overridepublic void pushMsgToOne(String userId, String msg) {Channel channel = NettyConfig.getChannel(userId);if (Objects.isNull(channel)) {throw new RuntimeException("未连接socket服务器");}channel.writeAndFlush(new TextWebSocketFrame(msg));}@Overridepublic void pushMsgToAll(String msg) {NettyConfig.getChannelGroup().writeAndFlush(new TextWebSocketFrame(msg));}
}

测试

链接服务器

发送消息

调用接口,往前端推送消息!

OK!

一个简单的 netty 小栗子就完成了。

源码地址:https://gitee.com/dugt/springboot-netty-demo


1. 回复“m”可以查看历史记录;

2. 回复“h”或者“帮助”,查看帮助;

开发者已开通多个技术群交流学习,请加若飞微信:1321113940  (暗号k)进开发群学习交流

说明:我们都是开发者。视频或文章来源于网络,如涉及版权或有误,请您与若飞(1321113940)联系,将在第一时间删除或者修改,谢谢!

开发者:KaiFaX

面向全栈工程师的开发者
专注于前端、Java/Python/Go/PHP的技术社区

Netty实战,Springboot + netty +websocket 实现推送消息相关推荐

  1. 实现微信支付(Native支付),使用WebSocket进行推送 ——4.配置SpringBoot支持WebSocket,推送结果

    实现微信支付(Native支付),使用WebSocket进行推送 --4.配置SpringBoot支持WebSocket,推送结果 依赖 <dependency><groupId&g ...

  2. springboot 实现服务端推送消息

    文章目录 前言 一.关于SSE 1. 概念介绍 2. 特点分析 3. 应用场景 二.SpringBoot实现 三.前端vue调用 四.一些问题 前言 服务端推送消息我们采用SSE方式进行推送. 一.关 ...

  3. HTTP Websocket 服务器推送消息

    文章目录 HTTP HTTP请求过程 1. 无状态 2. 基于TCP协议 心跳包 3. 长.短连接 4. 单向请求 传统服务器推送技术 短轮询 polling 同源限制 跨域资源共享 长轮询 long ...

  4. 实现微信支付(Native支付),使用WebSocket进行推送——3.创建支付订单,接收付款结果

    实现微信支付(Native支付),使用WebSocket进行推送--3.创建支付订单,接收付款结果 注:本实验使用springboot框架 一.创建订单 1.流程 2.创建支付订单所需参数 2. AP ...

  5. 深入解析消息推送平台的设计原理,百万门店同时推送消息是如何实现的?

    简介 现有项目中存在需要针对百万门店同时推送消息的需求,需要设计一个消息推送中心的系统进行专门的消息推送 需求 对百万门店进行消息推送 支持坐席侧websocket实时推送消息通知客服需要注意的事项 ...

  6. Netty游戏服务器开发实战(14):游戏推送的设计

    导读- 本篇主要介绍如何实现游戏服务器推送消息到客户端或者服务器和服务器之间进行消息推送,结合Netty组件,设计一个具有推送功能的高性能游戏服务器框架. 什么是推送?为何需要推送? 首先,我们要明白 ...

  7. Springboot整合Websocket(推送消息通知)

    在手机上相信都有来自服务器的推送消息,比如一些及时的新闻信息,这篇文章主要就是实现这个功能,只演示一个基本的案例.使用的是websocket技术. 一.什么是websocket WebSocket协议 ...

  8. SpringBoot 集成 webSocket,实现后台向客户端推送消息

    图文等内容参考链接 SpringBoot2.0集成WebSocket,实现后台向前端推送信息_Moshow郑锴的博客-CSDN博客_springboot websocket WebSocket 简介 ...

  9. SpringBoot使用Socket向前端推送消息

    个人资源与分享网站:http://xiaocaoshare.com/ 1.对webSocket理解 WebSocket协议是基于TCP的一种新的网络协议.它实现了浏览器与服务器全双工(full-dup ...

最新文章

  1. 黑客帝国真的可以!这100万个「活体人脑细胞」5分钟学会打游戏
  2. android gridview 加载图片大小,Gridview有两列和自动调整大小的图像
  3. 【C++ STL】vector库使用方法
  4. CSS控制鼠标的箭头
  5. python实际应用方面的材料_python应用于哪些方面
  6. FastDFS分布式文件系统
  7. 9_less中的层级结构
  8. sql字符处理函数concat()、concat_ws()
  9. 基于Spring的包含特定注解bean的package扫描工具
  10. linux 跑java程序_Linux下独立执行Java程序
  11. html制作dnf,dnf怎么制作img文件 时装拼合教程
  12. GIS数据处理-OSGB转换3dTiles
  13. python可以做什么灰产-广州市标书资料销毁详细流程
  14. 冥想第二百五十六天。
  15. 静态IP、动态IP、ADSL拨号和DNS这几者你分得清吗?
  16. Java 使用 iText5 API 根据需求导出 PDF
  17. uniapp消息推送(个推-PHP服务端推送)
  18. oracle 11.2 RAC 安装新主机 识别老存储
  19. 阿里云python自测答案_阿里云技能测试python初级中级高级
  20. java面试(二十五)--(1)redis为什么读写速率快性能好(2)说说web.xml文件中可以配置哪些内容(3)和的区别(4)扑克牌顺子

热门文章

  1. java中判断字符串是否为数字(正整数)
  2. video-editing
  3. 什么是微服务?看这里
  4. 【学习笔记】mysql数据库优化小手段/原则
  5. c语言中线程的调度,线程、进程及其调度简介
  6. 什么是焊锡机器人?如何使用?
  7. golang 云效私有模块依赖拉取配置
  8. *.3ds文件格式与*.max文件格式的区别
  9. ETF场内基金:AI量化投资最佳切入点(数据篇)
  10. Centos 6.3 安装 yozo office (永中office)