提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

文章目录

  • 前言
  • 一、配置
  • 二、Netty服务
    • 1. WsServer构建
    • 2. ChannelInitializer构建
    • 3. 自定义消息处理器
    • 4. 全局用户Session管理
    • 5. 客户端与服务端消息实体
    • 6. 消息构建工具类
    • 7. 鉴权处理器
  • 三, 整合spring

前言

技术栈: SpringBoot+netty+websocket+mybatis-plus


一、配置

Maven

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.6.5</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.22</version></dependency><dependency><groupId>com.sun.mail</groupId><artifactId>javax.mail</artifactId><version>1.6.2</version></dependency><dependency><groupId>com.graphql-java-kickstart</groupId><artifactId>graphql-spring-boot-starter</artifactId><version>12.0.0</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-elasticsearch</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.75.Final</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.80</version></dependency><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.7.22</version></dependency>

spring配置

server:port: 10086
netty:host: 127.0.0.1port: 10010
spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://127.0.0.1:3306/test_one?useUnicode=true&useSSL=falseusername: xzqpassword: root
mybatis-plus:mapper-locations: classpath*:/mapper/**/*.xmltype-aliases-package: com.xzq.netty.websocket.entityconfiguration:log-impl: org.apache.ibatis.logging.stdout.StdOutImpl

二、Netty服务

1. WsServer构建

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;@Component
@Slf4j
public class WsServer {@Value("${netty.host}")public String host;@Value("${netty.port}")public Integer port;@Autowiredprivate WsChannelInitializer channelInitializer;private NioEventLoopGroup bossGroup = new NioEventLoopGroup();private NioEventLoopGroup workerGroup = new NioEventLoopGroup();public void listener() {try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)//指定服务端连接队列长度,也就是服务端处理线程全部繁忙,并且队列长度已达到1024个,后续请求将会拒绝.option(ChannelOption.SO_BACKLOG, 1024).childHandler(channelInitializer);log.info("Netty start successful " + host + ":" + port);ChannelFuture f = serverBootstrap.bind(host, port).sync();f.channel().closeFuture().sync();} catch (Exception e) {log.info("Ws服务启动失败:" + e);} finally {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}public void destroy() {log.info("JVM close,WsServer close");workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();}}

2. ChannelInitializer构建

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class WsChannelInitializer extends ChannelInitializer<SocketChannel> {@Autowiredprivate AuthHandler authHandler;@Overrideprotected void initChannel(SocketChannel ch) throws Exception {//http 协议编解码器ch.pipeline().addLast("http-codec", new HttpServerCodec());//http请求聚合处理,多个HTTP请求或响应聚合为一个FullHtppRequest或FullHttpResponsech.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));//鉴权处理器ch.pipeline().addLast("auth", authHandler);//大数据的分区传输ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());//websocket协议处理器ch.pipeline().addLast("websocket", new WebSocketServerProtocolHandler("/im"));//自定义消息处理器ch.pipeline().addLast("my-handler", new WsChannelHandler());}
}

3. 自定义消息处理器

import com.alibaba.fastjson.JSONException;
import com.alibaba.fastjson.JSONObject;
import com.xzq.netty.websocket.message.ClientMessage;
import com.xzq.netty.websocket.util.MsgUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class WsChannelHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();log.info("有客户端建立连接");log.info("客户端address: " + channel.remoteAddress().toString());log.info("客户端channel Id:" + channel.id().toString());}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {String body = msg.text();ClientMessage clientMessage = JSONObject.parseObject(body, ClientMessage.class);if (clientMessage.getType() == 1) {WsChannelGroup.userChannelGroup.get(clientMessage.getTo()).writeAndFlush(MsgUtil.buildSingleMsg(ctx.channel().id().toString(), clientMessage.getMsgInfo()));return;}WsChannelGroup.channelGroup.writeAndFlush(MsgUtil.buildGroupMsg(ctx.channel().id().toString(),clientMessage.getMsgInfo()));}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {Channel channel = ctx.channel();log.info("客户端断开连接... 客户端 address: " + channel.remoteAddress());WsChannelGroup.channelGroup.remove(channel);WsChannelGroup.userChannelGroup.remove(channel.id().toString(), channel);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {if (cause instanceof JSONException) {ctx.channel().writeAndFlush(new TextWebSocketFrame("服务端仅支持JSON格式体消息"));return;}Channel channel = ctx.channel();log.info(channel.remoteAddress()+" 连接异常,断开连接...");cause.printStackTrace();ctx.channel().writeAndFlush(new TextWebSocketFrame("服务端500 关闭连接"));ctx.channel().closeFuture();WsChannelGroup.channelGroup.remove(channel);WsChannelGroup.userChannelGroup.remove(channel.id().toString(), channel);}
}

4. 全局用户Session管理

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;public class WsChannelGroup  {public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);public static Map<String, Channel> userChannelGroup = new ConcurrentHashMap<>();
}

5. 客户端与服务端消息实体

@Data
public class ClientMessage implements Serializable {/***  消息类型:  1 单聊  2,群聊*/private int type;/*** 消息内容*/private String msgInfo;/***  消息发送方 (目前只在单聊中体现,【群聊暂时没有分组处理】)*/private String to;}@Data
public class ServerMessage implements Serializable {/*** 消息发送方*/private String from;/*** 消息内容*/private String msgInfo;/*** 时间*/private String date;public ServerMessage(String from, String msgInfo) {this.from = from;this.msgInfo = msgInfo;this.date = LocalDateTime.now().toString();}
}

6. 消息构建工具类

import com.alibaba.fastjson.JSONObject;
import com.xzq.netty.websocket.message.ServerMessage;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;public class MsgUtil {public static TextWebSocketFrame buildSingleMsg(String from, String msgInfo) {return new TextWebSocketFrame(JSONObject.toJSONString(new ServerMessage(from, msgInfo)));}public static TextWebSocketFrame buildGroupMsg(String from,String msgInfo) {return new TextWebSocketFrame(JSONObject.toJSONString(new ServerMessage(from, msgInfo)));}
}

7. 鉴权处理器

import cn.hutool.core.util.StrUtil;
import com.xzq.netty.websocket.entity.TestUser;
import com.xzq.netty.websocket.mapper.TestUserMapper;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
@ChannelHandler.Sharable
public class AuthHandler extends ChannelInboundHandlerAdapter {@Autowiredprivate TestUserMapper userMapper;@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {if (msg instanceof FullHttpRequest) {FullHttpRequest msg1 = (FullHttpRequest) msg;//根据请求头的 auth-token 进行鉴权操作String authToken = msg1.headers().get("auth-token");System.out.println(">>>>>>>>>>>>鉴权操作");if (StrUtil.isEmpty(authToken)) {refuseChannel(ctx);return;}//查询数据库是否存在该用户TestUser testUser = userMapper.selectById(authToken);if (testUser == null) {refuseChannel(ctx);}//鉴权成功,添加channel用户组WsChannelGroup.channelGroup.add(ctx.channel());WsChannelGroup.userChannelGroup.put(testUser.getName(), ctx.channel());}ctx.fireChannelRead(msg);}private void refuseChannel(ChannelHandlerContext ctx) {ctx.channel().writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.UNAUTHORIZED));ctx.channel().close();}
}

三, 整合spring

mport com.xzq.netty.websocket.server.WsServer;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
@MapperScan("com.xzq.netty.websocket.mapper")
public class WsApplication implements CommandLineRunner {@Autowiredprivate WsServer wsServer;public static void main(String[] args) {SpringApplication.run(WsApplication.class, args);}@Overridepublic void run(String... args) throws Exception {wsServer.listener();//钩子函数,虚拟机正常关闭调用Runtime.getRuntime().addShutdownHook(new Thread(()->{wsServer.destroy();}));}
}
```# 四,测试
开启三个websocket客户端并都建立连接
![在这里插入图片描述](https://img-blog.csdnimg.cn/045f579ec75b4cae9b358eb073cef88d.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAQ29jb3h6cTAwMA==,size_20,color_FFFFFF,t_70,g_se,x_16)
发送消息
客户端2收到消息
![在这里插入图片描述](https://img-blog.csdnimg.cn/ebc7637bc17c4f54aae9d142b2e76f5f.png?x-oss-process=image/watermark,type_d3F5LXplbmhlaQ,shadow_50,text_Q1NETiBAQ29jb3h6cTAwMA==,size_20,color_FFFFFF,t_70,g_se,x_16)

Springboot+netty+websocket 实现单聊群聊及用户鉴权相关推荐

  1. labview的用户身份认证系统设计_elasticsearch 集群身份认证与用户鉴权

    elasticsearch在默认安装后,没有提供任何安全保护. 在elasticsearch.yml配置了server.host=0.0.0.0导致公网可以访问es集群. 数据安全的基本需求: 1.身 ...

  2. Springboot系列之Shiro、JWT、Redis 进行认证鉴权

    Springboot系列之Shiro.JWT.Redis 进行认证鉴权 Shiro架构 Apache Shiro是一个轻量级的安全框架 Shiro可以非常容易的开发出足够好的应用,其不仅可以用在Jav ...

  3. SpringBoot项目的用户鉴权分析

    一.鉴权类:UserAuthFilter springBoot被称为开箱即用,是因为许多代码都帮开发者实现了,就连用户鉴权部分都帮我们写好了,如需自定义过滤器,则继承UserAuthFilter类即可 ...

  4. netty自定义handler分别支持群聊和单聊

    消息实体类 public class WsMessage {/** 消息id */private String id;/** 消息发送类型 */private Integer code;/** 发送人 ...

  5. Netty聊天系统(4)群聊功能实现

    7 群聊功能的实现 7.1 群聊的创建 创建一个创建群聊请求的实体类,依然是继承Packet,因为要通过我们的协议进行编解码 服务器端创建一个处理创建群聊请求的Handler,并实现其中的逻辑 创建一 ...

  6. (六)Netty网络编程应用实例-群聊系统

    实例要求: 编写一个NIO群聊系统,实现服务器端和客户端之间的数据简单通讯(非阻塞) 实现多人群聊 服务器端:可以监测用户上线,离线,并实现消息转发功能 客户端:通过channel可以无阻塞发送消息给 ...

  7. django+vue3实现websocket聊天室(群聊)

    1.如何实现聊天功能 2.什么是websocket 2.1解释什么叫全双工,半双工,单工 3.websocker的概念 4.websocket的优点 5.django ,vue如何实现websocke ...

  8. 钉钉单聊/群聊机器人实现思路

    钉钉官网文档:https://open.dingtalk.com/document/group/robot-overview 一.钉钉开发平台 1.申请机器人 应用开发-企业内部开发-基础信息-机器人 ...

  9. WebSocket刨根问底(三)之群聊

    前两篇文章[WebSocket刨根问底(一) ][WebSocket刨根问底(二) ]我们介绍了WebSocket的一些基本理论,以及一个简单的案例,那么今天继续,我们来看一个简单的群聊的案例,来进一 ...

最新文章

  1. 互联网人的求生战役!分享身边的 5 个故事
  2. 刷算法的时候有没有必要自写测试用例?
  3. python split()
  4. SpringCloud与dubbo的区别
  5. OpenSilver: 通过WebAssembly 复活Silverlight
  6. 部署php项目到linux
  7. springCloud五大组件--Eureka
  8. C# SuperSocket服务端入门(一)
  9. WordCount——MapReduce 实例入门
  10. 《破茧成蝶——用户体验设计师的成长之路》一1.2 邂逅用户体验设计
  11. html级联选择器,HTML5 学习--级联样式与CSS选择器
  12. 第一二三章 PMP第六版读书笔记
  13. 中小企业网络推广方案
  14. 条码软件如何修改条码标签的字体格式
  15. 计算机图形表示的原理
  16. JNCIE考试准备指南(ITAA 2014版)
  17. Linux学习笔记之MySql的安装(CentOS)
  18. 今日头条面试经验分享
  19. STM32 四轴无人机设计——遥控器PPM信号
  20. 【自然语言处理(NLP)】基于ERNIE语言模型的文本语义匹配

热门文章

  1. 电子商务让我的公司站稳脚跟
  2. python-循环递归斐波那契数列
  3. Android_异步加载1
  4. T2噬菌体MVC案例教程
  5. 如果在驾驶证丢失到补办期间
  6. 怎样制作泡泡html页面,html5+css3气泡组件的实现
  7. Shell 一键启动脚本
  8. Centos7桥接设置网络并使用xrdp+tigervnc实现桌面远程访问
  9. Jupyter Notebook切换python运行环境
  10. linux查看磁盘对应槽位,一种linux系统硬盘槽位识别方法与流程