有你的日子你就是一切,没你的日子一切都是你

又是一天新的开始,让我来带领你打开任督二脉。。。

前言:

在前面有了对NIO、BIO知识的学习,以及对netty结构组的基本了解,接下来将学习一下如何使用netty去实现一个群聊功能,读者可自行去对比基于NIO、BIO、Netty实现群聊功能的不同方式,以更深刻的理解IO网络编程。

学习内容:

整体思路
1) 编写一个Netty群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞)
2) 实现多人聊天
3) 服务器端:可以监测用户上线,离线,并实现消息转发
4) 客户端: 通过channel可以无阻塞发送消息给其他所有用户,同时可接收到其他用户发送的消息)
5)目的:进一步理解Netty非阻塞网络编程机制


具体实现

服务端代码

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;
import java.util.Scanner;/**
* @PackageName: com.netty.demo.chart
* @author: youjp
* @create: 2021-01-22 10:05
* @description: 群聊服务端
* @Version: 1.0
*/
public class NettyChartServer {/*** 服务端口号*/private int port;public NettyChartServer(int port) {this.port = port;}/*** 服务执行:用于处理客户端请求*/public void run() {//服务线程组EventLoopGroup bossgroup = new NioEventLoopGroup(1);//工作组线程池,默认为CPU核数*2EventLoopGroup workgroup = new NioEventLoopGroup();ServerBootstrap bootstrap = new ServerBootstrap();try {bootstrap.group(bossgroup, workgroup).channel(NioServerSocketChannel.class) //可以监听新进来的TCP连接的通道.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态.handler(new LoggingHandler(LogLevel.INFO))//心跳检测日志.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//获取到piepline管道ChannelPipeline pipeline = socketChannel.pipeline();//向pipeline加入解码器pipeline.addLast("decoder", new StringDecoder());//向pipeline加入编码器pipeline.addLast("encoder", new StringEncoder());//心跳检测handlepipeline.addLast("idcheck",new IdleStateHandler(3,5,8, TimeUnit.SECONDS));//加入自己的业务处理handlerpipeline.addLast("handle",new MyNettyServerHandler());}});ChannelFuture ch = bootstrap.bind(port).sync();System.out.println("服务器 is ready-------");//异步监听端口ch.addListeners(e->{if(e.isSuccess()){System.out.println("监听端口 "+port+" 成功");}else {System.out.println("监听端口 "+port+" 失败");}});//异步关闭通道ch.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {//优雅退出线程池bossgroup.shutdownGracefully();workgroup.shutdownGracefully();}}public static void main(String[] args) {NettyChartServer server=new NettyChartServer(8888);server.run();}
}

服务端主要作用是对8888端口进行监听,等待客户端的请求。使用了主从Reactor模式,去实现接收处理多个客户端请求。利用自定义业务处理handler 完成对信息的获取,以及客户端直接信息转发。

服务端业务处理器

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.text.SimpleDateFormat;
import java.util.HashMap;/**
* @PackageName: com.netty.demo.chart
* @author: youjp
* @create: 2021-01-22 10:43
* @description: 业务自定义的逻辑处理类:它是入站 ChannelInboundHandler 类型的处理器,负责接收解码后的 HTTP 请求数据,并将请求处理结果写回客户端。
* @Version: 1.0
*/
public class MyNettyServerHandler  extends SimpleChannelInboundHandler<String> {//定义一个channerl集合,类似于List<Channel> ,用于存储不同的channel通道转发信息private static ChannelGroup channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);//設計一個集合可存儲通道通道对应的用户,未开发改功能private static HashMap<String,Channel> channelHashMap=new HashMap<>();private SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");/*** 该channel建立连接后: 生命周期 handlerAdded》 channelRegistered-》channelActive,* 表示连接建立,一旦连接,第一个执行* @param ctx* @throws Exception*/@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {System.out.println("------------handlerAdded------");Channel channel = ctx.channel();channelGroup.add(channel);channelGroup.writeAndFlush("【客戶端】"+channel.remoteAddress()+"加入聊天");}/*** 连接中断* @param ctx* @throws Exception*/@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {System.out.println("------------handlerRemoved------");Channel channel=ctx.channel();//服务下线,移除该channelchannelGroup.remove(channel);//并通知,已离线channelGroup.writeAndFlush("【客戶端】"+channel.remoteAddress()+"离开了~");}/*** 表示该channel处于活动状态* @param ctx* @throws Exception*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelActive------");System.out.println(ctx.channel().remoteAddress() + " 上线了~");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelInactive------");System.out.println(ctx.channel().remoteAddress() + " 下线了~");}/*** 读取消息* @param channelHandlerContext* @param s* @throws Exception*/@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {System.out.println("------------channelRead0------");Channel channel = channelHandlerContext.channel();//将消息转发给其他客户端,并且排除自己channelGroup.forEach(e->{if (channel!=e){//e.writeAndFlush("【客戶端】"+channel.remoteAddress()+"说:"+msg);}else {e.writeAndFlush("【自己】"+"说:"+msg);}});}/*** 消息读取后* @param ctx* @throws Exception*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelReadComplete------");}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelRegistered------");}/*** 离线生命:* @param ctx exceptionCaught-》channelInactive》channelUnregistered》handlerRemoved* @throws Exception*/@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {System.out.println("------------channelUnregistered------");}/*** 处理心跳检测* @param ctx* @param evt* @throws Exception*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {//    System.out.println("------------userEventTriggered------");if (evt instanceof IdleStateEvent){//向下转型IdleStateEvent event=(IdleStateEvent)evt;String eventType=null;switch (event.state()){case  READER_IDLE:eventType="读空闲";break;case WRITER_IDLE:eventType="写空闲";break;case  ALL_IDLE:eventType="读写空闲";break;default:break;}System.out.println(ctx.channel().remoteAddress()+"---超时时间---"+eventType);System.out.println("服务器做相应操作");}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("------------exceptionCaught------");}
}

业务自定义的逻辑处理类:它是入站 ChannelInboundHandler 类型的处理器,负责接收解码后的 HTTP 请求数据,并将请求处理结果写回客户端。当channle通道建立以后,便开启了生命周期。其中 channelRegistered 是用于channnel通道注册,channelActive用于查看通道是否活跃 、channelRead0用于读取客户端信息、exceptionCaught用于异常处理

客户端代码

客户端启动类

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.CharsetUtil;import java.util.Scanner;/**
* @PackageName: com.netty.demo.chart
* @author: youjp
* @create: 2021-01-22 10:06
* @description: 客户端
* @Version: 1.0
*/
public class NettyChartClient {private String ip;private int port;public NettyChartClient(String ip, int port) {this.ip = ip;this.port = port;}/*** 客户端启动*/public void run() {//创建工作线程池 CPU核数*2EventLoopGroup group = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();try {bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE,true).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {//得到PipelineChannelPipeline pipeline = socketChannel.pipeline();//加入相关handlerpipeline.addLast("decoder", new StringDecoder());pipeline.addLast("encoder", new StringEncoder());pipeline.addLast("handler", (ChannelHandler) new MyNettyClientHandler());}});System.out.println("---客户端启动了----");//连接服务端ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();Scanner can=new Scanner(System.in);while (can.hasNext()){String str=can.nextLine();channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer(""+str, CharsetUtil.UTF_8));}//异步关闭通道channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {group.shutdownGracefully();}}public static void main(String[] args) {NettyChartClient chartClient=new NettyChartClient("127.0.0.1",8888);chartClient.run();}}

客户端处理器

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;/**
* @PackageName: com.netty.demo.chart
* @author: youjp
* @create: 2021-01-22 11:16
* @description: 客户端处理器
* @Version: 1.0
*/
public class MyNettyClientHandler  extends SimpleChannelInboundHandler<String> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception {System.out.println(""+msg);}
}

客户端只需要获取服务端传来的消息,并且可以自己编写消息发送即可。

运行测试

接下来,先将服务端启动类启动,然后再运行多个客户端。查看控制台,并发送消息,查看channel的生命周期

分别在各自的客户端控制台输入信息,测试。基于Netty实现的群聊功能就这样实现了

有兴趣的老爷,可以关注我的公众号【一起收破烂】,回复【006】获取2021最新java面试资料以及简历模型120套哦~

基于Netty实现群聊功能相关推荐

  1. Netty聊天系统(4)群聊功能实现

    7 群聊功能的实现 7.1 群聊的创建 创建一个创建群聊请求的实体类,依然是继承Packet,因为要通过我们的协议进行编解码 服务器端创建一个处理创建群聊请求的Handler,并实现其中的逻辑 创建一 ...

  2. 基于WebSocket实现一个简易的群聊功能

    本文主要来讲解如何使用WebSocket来实现一个简易的群聊功能 引入maven依赖 <dependency><groupId>org.springframework.boot ...

  3. 基于Vue+springboot+websocket实现的简短仿微信web聊天室(私聊和群聊功能)(可在线预览)

    写目录 一.界面展示 二.介绍 一.界面展示 之前闲着有空就给自己的个人博客搭了一些附加功能,聊天室也是其中之一,简单的实现了私聊.群聊功能,可以发送emoji表情和图片等,项目已经部署在www.tc ...

  4. UDP实现简单的群聊功能代码示例

    以下是UDP实现群聊功能代码示例 在java中,通过两个特定类来实现UDP协议顶层数据报,分别是DatagramPacket和DatagramSocket,其中类DatagramPacket是一个数据 ...

  5. 用C++/MFC实现P2P和群聊功能的聊天小软件

    final edit 2015-01-03 · 实现平台: Window 8.1,Visual Studio 2013 Window 7, Visual Studio 2010 · 所用框架: 是基于 ...

  6. Netty教程06:netty实现群聊私聊

    netty实现群聊,点击查看 需求:在群聊基础上,增加私聊功能 Server package com.lian.groupprivatechat;import io.netty.bootstrap.S ...

  7. 互联网早报 | 3月16日 星期二 | 微信AI直播助理开放内测;汽车之家港交所挂牌上市;美团App内测“群聊”功能...

    今日看点 ✦ 汽车之家港交所成功挂牌,成年内首家回港二次上市中概股 ✦ 恒大汽车与腾讯旗下梧桐车联成立合资公司 ,共同开发车载智能操作系统 ✦ 微信AI直播助理开放内测,助力电商带货 ✦ 美团App内 ...

  8. 360搜索、UC浏览器等被3·15点名应用已下架;马斯克宣布通过NFT卖歌;美团App再发力社交,内测 “群聊”功能 |极客头条...

    「极客头条」-- 技术人员的新闻圈! CSDN 的读者朋友们早上好哇,「极客头条」来啦,快来看今天都有哪些值得我们技术人关注的重要新闻吧. 整理 | 丁恩华 出品 | CSDN(ID:CSDNnews ...

  9. C++搭建集群聊天室(十四):群聊功能

    文章目录 群聊功能思路 放码过来 groupuser.hpp group.hpp groupmodel.hpp groupmodel.cpp 群聊功能思路 1.创建群聊,提交群信息,返回群号 2.拉取 ...

  10. Line推出新语音群聊功能 最多支持200人

    3月11日,据科技博客VentureBeat报道,日本移动通讯巨头Line公司近日宣布,为支持企业电话会议,该公司将在最新版Line应用中增加语音群聊功能,最多支持200人同时群聊. Line的这项新 ...

最新文章

  1. php directoryiterator,PHP DirectoryIterator getBasename()用法及代码示例
  2. 当人工智能掌管城市,会带来怎样的巨变?
  3. 【转】C++中的SFINAE
  4. python日期函数_python 时间相关函数
  5. php编程 第一节,PHP第一节php简介_PHP
  6. 浅谈“三层结构”原理与用意(转帖)
  7. pakeage php国内镜像,Packagist/Composer中国全量镜像 | 严佳冬
  8. JavaWeb_EL表达式存储数据及获得项目路径
  9. rocketmq安装教程以及遇到的坑排查
  10. Java非对称加密开发(三)-代码及说明
  11. 用ajax请求豆瓣api,结合豆瓣Api v2.0实现Jsonp跨域
  12. arm64-v8a、armeabi-v7a、armeabi、x86 abiFilters 详解
  13. C++ 小游戏程序 (共七款)
  14. H3C华三路由器nat避免生成null 0路由并解决nat需求
  15. linux 是什么?
  16. 树莓派系列二:openCV之头像添加国旗
  17. 第三阶段应用层——1.11 数码相册—setting_page设置页面的显存管理、页面规划、输入控制
  18. Firebug 网络监视器使用教程英文
  19. 快速入门——深度学习理论解析与实战应用
  20. 关于域名备案后的注意事项,血淋淋的教训

热门文章

  1. 电子报纸的分析即制作
  2. 云存储云计算选择开源还是商业版
  3. jmeter面试题及答案(jmeter接口自动化测试面试题)
  4. winhex 19.8 注册码生成工具(keygen)
  5. USGS下载遥感影像——以Landsat影像下载为例
  6. 计算机操作系统|汤小丹|第四版|习题答案(一)
  7. matlab得到小波参数,matlab小波分析去噪详解
  8. Ubuntu安装网易云音乐
  9. 我的世界高亮显示服务器,waila(我的世界高亮显示没了)
  10. 南京工程学院《DSP技术及应用》期末试卷