Netty下的消息群发
本文使用netty4.1.16 JDK 1.8 实现简单的群发功能
代码来源于GitHub上的项目,本着学习态度对该代码进行了仔细学习。
客户端代码
package simplechat; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import java.io.BufferedReader; import java.io.InputStreamReader; /** * 简单聊天服务器-客户端 * * @author waylau.com * @date 2015-2-26 */ public class SimpleChatClient {public static void main(String[] args) throws Exception{new SimpleChatClient("localhost", 2333).run(); }private final String host; private final int port; public SimpleChatClient(String host, int port){this.host = host; this.port = port; }public void run() throws Exception{EventLoopGroup group = new NioEventLoopGroup(); try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new SimpleChatClientInitializer()); Channel channel = bootstrap.connect(host, port).sync().channel(); //录入信息 BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); while(true){//将录入的信息添加一个尾缀,用与分包和粘包的判断 channel.writeAndFlush(in.readLine() + "\r\n"); }} catch (Exception e) {e.printStackTrace(); } finally {group.shutdownGracefully(); }}}
客户端处理类
package simplechat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * 客户端 channel * * @author waylau.com * @date 2015-2-26 */ public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {@Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {//客户端读取服务端发送回来的消息,只是进行显示 System.out.println(s); } }
package simplechat; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * 客户端 ChannelInitializer * * @author waylau.com * @date 2015-2-26 */ public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {@Override public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline(); //DelimiterBasedFrameDecoder:以分隔符作为码流结束标识 pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleChatClientHandler()); } }
服务端
package simplechat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 简单聊天服务器-服务端 * * @author waylau.com * @date 2015-2-16 */ public class SimpleChatServer {private int port; public SimpleChatServer(int port) {this.port = port; }public void run() throws Exception {EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) EventLoopGroup workerGroup = new NioEventLoopGroup(); try {ServerBootstrap b = new ServerBootstrap(); // (2) b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // (3) .childHandler(new SimpleChatServerInitializer()) //(4) .option(ChannelOption.SO_BACKLOG, 128) // (5) .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) System.out.println("SimpleChatServer 启动了"); // 绑定端口,开始接收进来的连接 ChannelFuture f = b.bind(port).sync(); // (7) // 等待服务器 socket 关闭 。 // 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。 f.channel().closeFuture().sync(); } finally {workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); System.out.println("SimpleChatServer 关闭了"); }}public static void main(String[] args) throws Exception {int port; if (args.length > 0) {port = Integer.parseInt(args[0]); } else {port = 2333; }new SimpleChatServer(port).run(); } }
服务端处理类
package simplechat; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; /** * 服务端 channel * * @author waylau.com * @date 2015-2-16 */ public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> { // (1) /** * A thread-safe Set Using ChannelGroup, you can categorize Channels into a meaningful group. * A closed Channel is automatically removed from the collection, */ public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); //2.覆盖了 handlerAdded() 事件处理方法。每当从服务端收到新的客户端连接时,客户端的 Channel 存入 ChannelGroup列表中,并通知列表中的其他客户端 Channel @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // (2) Channel incoming = ctx.channel(); // Broadcast a message to multiple Channels channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " channel_id :" + incoming.id() + " 加入\n"); //添加到channelGroup 通道组 channels.add(ctx.channel()); }//3.覆盖了 handlerRemoved() 事件处理方法。每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客户端 Channel @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { // (3) Channel incoming = ctx.channel(); // Broadcast a message to multiple Channels channels.writeAndFlush("[SERVER] - " + incoming.remoteAddress() + " 离开\n"); // A closed Channel is automatically removed from ChannelGroup, // so there is no need to do "channels.remove(ctx.channel());" }//4.覆盖了 channelRead0() 事件处理方法。每当从服务端读到客户端写入信息时,将信息转发给其他客户端的 Channel。其中如果你使用的是 Netty 5.x 版本时,需要把 channelRead0() 重命名为messageReceived() @Override protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4) Channel incoming = ctx.channel(); for (Channel channel : channels) {//遍历ChannelGroup中的channel if (channel != incoming){//找到加入到ChannelGroup中的channel后,将录入的信息回写给除去发送信息的客户端 channel.writeAndFlush("[" + incoming.remoteAddress() + "]" + s + "\n"); }else {channel.writeAndFlush("[you]" + s + "\n"); }}}//5.覆盖了 channelActive() 事件处理方法。服务端监听到客户端活动 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:" + incoming.remoteAddress() + "在线"); }//6.覆盖了 channelInactive() 事件处理方法。服务端监听到客户端不活动 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6) Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:" + incoming.remoteAddress() + "掉线"); }@Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {Channel incoming = ctx.channel(); System.out.println("SimpleChatClient:" + incoming.remoteAddress() + "异常"); // 当出现异常就关闭连接 //cause.printStackTrace(); ctx.close(); } }
package simplechat; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * 服务端 ChannelInitializer * * @author waylau.com * @date 2015-2-26 */ public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> {@Override public void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline(); //以"\n"或者"\r\n"作为分隔符 pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter())); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast("handler", new SimpleChatServerHandler()); //当客户端连接事件出现,输出客户端的远程地址 System.out.println("SimpleChatClient:" + ch.remoteAddress() + " channel_id :" + ch.id() + " 连接上"); } }
Netty下的消息群发相关推荐
- netty实现消息群发
netty是什么 我所理解的netty,是一个比较底层的网络编程的框架,它和tomcat的区别是什么呢?就是tomcat是一个已经封装好的容器,你可以直接使用,而netty是可以写出像tomcat这种 ...
- 微信公众号开发 [03] 结合UEditor实现图文消息群发功能
0.写在前面的话 如何实现微信平台后台管理中的,图文消息发送功能? 大概的过程如下: 通过类似表单的形式,将文章各部分内容提交到后台,封装成一个实体类,并持久化到数据库中 需要推送的时候,将不同的文章 ...
- SpringBoot 集成 WebSocket 实现消息群发推送
一. 什么是 WebSocket WebSocket 是一种全新的协议.它将 TCP 的 Socket(套接字)应用在了web page上,从而使通信双方建立起一个保持在活动状态的连接通道,并且属于全 ...
- 公众号客服消息超过48小时_免费模板消息群发的方法,在这里!
不知道大家会不会因为服务号4次推送机会用完,又遇到老板施压,让发布重要消息,而苦恼? 经过我苦心搜索,找到了一个解决方法!原理就是,利用公众号模板消息,给粉丝推送. 作为运营同学,大多是不会技术的,所 ...
- java BIO tcp服务端向客户端消息群发代码教程实战
前言 项目需要和第三方厂商的服务需要用TCP协议通讯,考虑到彼此双方可能都会有断网重连.宕机重启的情况,需要保证 发生上述情况后,服务之间能够自动实现重新通信.研究测试之后整理如下代码实现.因为发现客 ...
- netty系列之:netty对http2消息的封装
文章目录 简介 http2消息的结构 netty对http2的封装 Http2Stream Http2Frame 总结 简介 无论是什么协议,如果要真正被使用的话,需要将该协议转换成为对应的语言才好真 ...
- O2OA V4 Build 11.13 发布! 支持语音办公、微信钉钉消息群发
官方网站 : http://www.o2server.io 软件主页 : https://www.oschina.net/p/o2oa v4 build 11.13更新内容: 新增功能: 1 ...
- python自动回复微信群的消息_程序员用python实现微信消息群发和微信自动回复
程序员用python实现微信消息群发和微信自动回复 每当逢年过节的时候, 你是否会遇到要给亲朋好友发祝福而不得不重复复制.改名.发送的麻烦, 还有收到许多好友祝福又来不及回复的情况.如果有,这篇文章正 ...
- 利用Visual C++ 实现QQ消息群发
一.引言 QQ一直是国内最令人瞩目的及时通讯软件,近6年的经营使其在IM市场有了"世界第一"名号.在国内市场,QQ的市场占有率已经超过了70%,用户群庞大,本人就是腾讯QQ的一个用 ...
最新文章
- Python多进程 AttributeError: Can't get attribute 'worker' on module '__main__' from
- Cell子刊:源自微生物群的醋酸盐能够在健康和疾病期间促进大脑先天免疫系统的代谢适应性...
- 1123: 零起点学算法30——参加程序设计竞赛
- libuv 高性能事件驱动库 简介
- python保存至对应目录_python如何实现复制目录到指定目录
- 《构建之法》8、9、10章
- boost::geometry::strategy::distance用法的测试程序
- 逃离裁员:程序员在云时代的生存之道
- day 45 SQLAlchemy,和增删查改
- Oracle、 Mysql 、 SQLserver 分页查询
- mysq;多表查询 总结
- 微电子专业深度盘点:哪所大学芯片最强?强在哪?(第3弹)
- win10小课堂:必须掌握的十个电脑使用技巧
- Python 绘制属于你的世界地图
- cad怎么卸载干净_安装CAD时提示已安装了怎么办?收藏了这个方法,节省你半天时间!...
- pulsar BookieException$InvalidCookieException 异常
- 中粮、益海品牌集中度提高,中小米企机会在高端细分市场
- video网页能播放.mp4视频,微信不能播放的问题
- Linux 块设备之bio结构体
- Oracle那些年那些事儿