基于netty+websocket实现门户游客实时统计功能

基本需求

商城门户页面需要实时展示游客访问的数量,商城后台页面需要实时游客访问量、登录用户数量,以及下订单用户数量。

技术选型

1.首先实时推送信息到前端,我们第一反应就是使用webscoket。那么什么是websocket呢?

WebScoket简述

WebSocket是一种在单个TCP连接上进行全双工通信的协议。

WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

websocket协议本身是构建在http协议之上的升级协议,客户端首先向服务器端去建立连接,这个连接本身就是http协议只是在头信息中包含了一些websocket协议的相关信息,一旦http连接建立之后,服务器端读到这些websocket协议的相关信息就将此协议升级成websocket协议。websocket协议也可以应用在非浏览器应用,只需要引入相关的websocket库就可以了.

Websocket使用ws或wss的统一资源标志符,类似于HTTPS,其中wss表示在TLS之上的Websocket.

2.springboot支持原生的websocket开发,关于是否使用原生websocket进行开发,需要根据自身的需求进行合理选择,这里我选择使用了netty作为websocket容器进行集成开发。主要原因是:netty相对于springboot默认配置的tomcat并发高、传输快、封装好。当然这里并不是说netty一定是优于tomcat的,笔者这里是为了避免提高并发等而修改tomcat的配置,而选择了开箱即用的netty框架。关于netty和tomcat的比较,网上的的说法很多,总结如下:

Netty和Tomcat的区别

Netty和Tomcat最大的区别就在于通信协议,Tomcat是基于Http协议的,他的实质是一个基于http协议的web容器,但是Netty不一样,他能通过编程自定义各种协议,因为netty能够通过codec自己来编码/解码字节流,完成类似redis访问的功能,这就是netty和tomcat最大的不同。

3.在选择了netty+websocket的技术栈后,我们来看看如何实现。目前主流的实现方式有2种:

  • 1 SpringBoot2+Netty+WebSocket 自主控制实现NettyServer来进行webscoket信息传输,这种模式比较适合特殊的协议及传递参数的场景,可以定制化传输模式。这里比较推荐一篇博客: https://blog.csdn.net/moshowgame/article/details/91552993

  • 2 使用开源框架netty-websocket-spring-boot-starter,这种模式适合比较常规的传输形式,使用配置及标签的形式快速的搭建后台往前端传输信息的环境。笔者这次的需求比较简单,故选取这种方式进行开发。框架的官方文档:https://github.com/YeautyYE/netty-websocket-spring-boot-starter/blob/master/README.md

快速开始

1.引入maven依赖

    <dependency><groupId>org.yeauty</groupId><artifactId>netty-websocket-spring-boot-starter</artifactId><version>0.9.5</version></dependency>

2.新建一个门户项目的websocket

在开启ws链接时,记录同一IP的链接为一次访问次数,并记录在缓存中(正式环境中可存入Redis缓存,后作为访问日志持久化到数据库)。链接断开时,将缓存中的Session记录移出。

@Slf4j
@ServerEndpoint(path = "/portalWs",port = "8189")
public class PortalWebSocketServer {//缓存游客的map,正式环境建议替换为Redispublic static ConcurrentHashMap<String, String> touristsMap = new ConcurrentHashMap<>();public static ConcurrentHashMap<String, Session> clientToChannelMap = new ConcurrentHashMap<>();@Autowiredprivate DataHandler dataHandler;/*** 连接建立时触发*/@OnOpenpublic void onOpen(Session session, HttpHeaders headers, @RequestParam String req, @RequestParam MultiValueMap reqMap, @PathVariable String arg, @PathVariable Map pathMap){SocketAddress socketAddress = session.remoteAddress();String clientIP = socketAddress != null ? socketAddress.toString().replace("/","").split(":")[0] : "";//记录游客访问记录touristsMap.put(clientIP, clientIP);//记录游客ws的session记录clientToChannelMap.put(clientIP,session);//同时推送门户和后台的消息sendMessage();log.info("Tourists join, the tourists IP is: " + clientIP);}/*** 连接关闭时触发*/@OnClosepublic void onClose(Session session) throws IOException {SocketAddress socketAddress = session.remoteAddress();String clientIP = socketAddress != null ? socketAddress.toString().replace("/","").split(":")[0] : "";if (StringUtils.isNoneBlank(clientIP) && touristsMap.containsKey(clientIP)){//移除需要推送到门户的游客ws连接touristsMap.remove(clientIP);//更新并推送门户和后台的消息sendMessage();}log.info("one connection closed");}/*** 前端发送信息的回复*/@OnMessagepublic void onMessage(String message) {if (!CollectionUtils.isEmpty(clientToChannelMap)){Iterator<Map.Entry<String, Session>> entries = clientToChannelMap.entrySet().iterator();while (entries.hasNext()) {Map.Entry<String, Session> entry = entries.next();try {sendMessage(entry.getValue(), message);} catch(Exception e){log.error("send message failed! Exception: {}", e.getMessage());}}}}@OnErrorpublic void onError(Session session, Throwable throwable) {log.error("connection err: {}", throwable.getMessage());}/*** 发送消息方法* @param session 客户端与socket建立的会话* @param message 消息* @throws IOException*/public void sendMessage(Session session, String message) throws IOException{if(session != null){session.sendText(message);}}private void sendMessage(){//通知游客数量减少if (!CollectionUtils.isEmpty(clientToChannelMap)){Iterator<Map.Entry<String, Session>> entries = clientToChannelMap.entrySet().iterator();while (entries.hasNext()) {Map.Entry<String, Session> entry = entries.next();try {sendMessage(entry.getValue(), JSON.toJSONString(dataHandler.handlerTouristMessage()));} catch(Exception e){log.error("send tourist message failed! Exception: {}", e.getMessage());}}}//通知后台管理人员数量减少if (!CollectionUtils.isEmpty(WebSocketServer.userToChannelMap)){Iterator<Map.Entry<String, Session>> entries = WebSocketServer.userToChannelMap.entrySet().iterator();while (entries.hasNext()) {Map.Entry<String, Session> entry = entries.next();try {sendMessage(entry.getValue(), JSON.toJSONString(dataHandler.handlerUserMessage()));} catch(Exception e){log.error("send user message failed! Exception: {}", e.getMessage());}}}}}

2.新建一个后台项目的websocket

类比门户websocket服务端,建立后台websocket的服务端,记录和推送后台展示的信息。

@Slf4j
@ServerEndpoint(path = "/ws",port = "8188")
public class WebSocketServer {//根据IP或者用户名缓存ws连接public static ConcurrentHashMap<String, Session> userToChannelMap = new ConcurrentHashMap<>();@Autowiredprivate DataHandler dataHandler;@OnOpenpublic void onOpen(Session session, HttpHeaders headers, @RequestParam String req, @RequestParam MultiValueMap reqMap, @PathVariable String arg, @PathVariable Map pathMap){SocketAddress socketAddress = session.remoteAddress();String clientIP = socketAddress != null ? socketAddress.toString().replace("/","").split(":")[0] : "";userToChannelMap.put(clientIP, session);session.sendText(JSON.toJSONString(dataHandler.handlerUserMessage()));}@OnMessagepublic void onMessage(String message) {if (!CollectionUtils.isEmpty(userToChannelMap)){Iterator<Map.Entry<String, Session>> entries = userToChannelMap.entrySet().iterator();while (entries.hasNext()) {Map.Entry<String, Session> entry = entries.next();try {sendMessage(entry.getValue(), message);} catch(Exception e){log.error("send message failed! Exception: {}", e.getMessage());}}}}@OnClosepublic void onClose(Session session) throws IOException {SocketAddress socketAddress = session.remoteAddress();String clientIP = socketAddress != null ? socketAddress.toString().replace("/","").split(":")[0] : "";if (StringUtils.isNoneBlank(clientIP)){userToChannelMap.remove(clientIP);}log.info("one connection closed");}@OnErrorpublic void onError(Session session, Throwable throwable) {log.error("connection err: {}", throwable.getMessage());}/*** 发送消息方法* @param session 客户端与socket建立的会话* @param message 消息* @throws IOException*/public void sendMessage(Session session, String message) throws IOException{if(session != null){session.sendText(message);}}public void sendInfo(String userName, String message) {if(userToChannelMap.get(userName) != null){userToChannelMap.get(userName).sendText(message);}}}

3.建立一个定时任务实时推送

除开连接的时刻,还需要将实时信息同步推送到前端页面展示,这里使用了定时任务框架Quartz,后续根据需求可以更换为分布式调度系统如:xxl-job等。

@Configuration
@EnableScheduling
public class WebSocketTask {@Resourceprivate WebSocketServer webSocketServer;@Autowiredprivate DataHandler dataHandler;@Scheduled(cron = "0/1 * * * * ?")private void configureTasks() {webSocketServer.onMessage(JSON.toJSONString(dataHandler.handlerUserMessage()));}
}

4.自定义一个推送标签

如果使用定时任务进行信息推送,在没有游客访问或用户登录时,会推送大量无用信息,造成不必要的网络开销,故现在修改为用户登录或用户下订单时织入一个AOP,推送实时信息到前端展示。

首先自定义一个标签:

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface SendWebSocket {String value() default "SendWebSocket";
}

在标签中处理发送信息的业务:

/*** 基于需要触发websocket自动推送的AOP标签,可用于登录或者订单业务时统计数量*/@Component
@Aspect
public class SendWebSocketAspect {@Resourceprivate WebSocketServer webSocketServer;@Autowiredprivate DataHandler dataHandler;@Pointcut("@annotation(com.zhangyang.websocket.aop.SendWebSocket)")public void cut() { }@AfterReturning(value = "cut()", returning="returnValue")public void record(JoinPoint joinPoint, Object returnValue) {webSocketServer.onMessage(JSON.toJSONString(dataHandler.handlerUserMessage()));}
}

在需要触发消息推送的业务代码上打上标签即可。

5.新建一个websocke的客户端用于测试

<!DOCTYPE html>
<html>
<head><title>My WebSocket</title>
</head><body>
Welcome<br/>
<input id="text" type="text" /><button onclick="send()">Send</button>    <button onclick="closeWebSocket()">Close</button>
<div id="message">
</div>
</body><script type="text/javascript">var websocket = null;//判断当前浏览器是否支持WebSocketif('WebSocket' in window){//门户页面和后台页面只需要修改ws连接的信息onlinenum = new WebSocket("ws://localhost:8188/ws");}else{alert('Not support websocket')}//连接发生错误的回调方法onlinenum.onerror = function(){setMessageInnerHTML("error");};//连接成功建立的回调方法onlinenum.onopen = function(event){setMessageInnerHTML("open");}//接收到消息的回调方法onlinenum.onmessage = function(event){setMessageInnerHTML(event.data);}//连接关闭的回调方法onlinenum.onclose = function(){setMessageInnerHTML("close");}//监听窗口关闭事件,当窗口关闭时,主动去关闭websocket连接,防止连接还没断开就关闭窗口,server端会抛异常。window.onbeforeunload = function(){onlinenum.close();}//将消息显示在网页上function setMessageInnerHTML(innerHTML){document.getElementById('message').innerHTML += innerHTML + '<br/>';}//关闭连接function closeWebSocket(){onlinenum.close();}//发送消息function send(){var message = document.getElementById('text').value;onlinenum.send(message);}
</script>
</html>

6.消息体及Controller触发

主题代码如上所示,如需对消息体进行处理,以及Controller触发触发,可以参考Demo代码,连接:
https://download.csdn.net/download/alanzy123/13092362。因为公司检测,代码后续会上传到github上。

效果展示

1.首先打开后台展示页面:

2.再打开门户展示页面模仿游客登录:

3.切换到后台展示页面,验证是否刷新信息

可以看到后端自动刷新了门户的游客访问记录,至此,我们的需求全部实现。

后记

针对于其他的单端点demo,多端点服务需要注意:

  • 在快速启动的基础上,在多个需要成为端点的类上使用@ServerEndpoint、@Component注解即可
  • 可通过ServerEndpointExporter.getInetSocketAddressSet()获取所有端点的地址
  • 当地址不同时(即host不同或port不同),使用不同的ServerBootstrap实例
  • 当地址相同,路径(path)不同时,使用同一个ServerBootstrap实例
  • 当多个端点服务的port为0时,将使用同一个随机的端口号
  • 当多个端点的port和path相同时,host不能设为"0.0.0.0",因为"0.0.0.0"意味着绑定所有的host

增加Nginx对ws进行反向代理的配置

1.增加nginx配置

#http模块下加入
map $http_upgrade $connection_upgrade {default upgrade;''   close;}
=================================================================server {listen 8081;location /websocket {proxy_pass http://xxx.xxx.xxx.xxx:8188/ws;proxy_http_version 1.1;#该指令设置与upstream server的连接超时时间,有必要记住,这个超时不能超过75秒proxy_connect_timeout 60s;#该指令设置与代理服务器的读超时时间。它决定了nginx会等待多长时间来获得请求的响应。这个时间不是获得整个response的时间,而是两次reading操作的时间proxy_read_timeout 3600s;#这个指定设置了发送请求给upstream服务器的超时时间。超时设置不是为了整个发送期间,而是在两次write操作期间。如果超时后,upstream没有收到新的数据,nginx会关闭连接proxy_send_timeout 60s;proxy_set_header  X-Real-IP  $remote_addr;add_header Access-Control-Allow-Origin*;proxy_set_header Upgrade websocket;proxy_set_header Connection Upgrade;proxy_set_header Host $host;proxy_set_header X-Real-IP $remote_addr;#通过nginx转发后游客IP会被统一代理,故需要在header中加入客户真实的IP来统计proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;}}

2.修改根据游客IP来统计游客数的统计代码


private static final String NGINX_URL = "X-Forwarded-For";@OnOpenpublic void onOpen(Session session, HttpHeaders headers, @RequestParam String req, @RequestParam MultiValueMap reqMap, @PathVariable String arg, @PathVariable Map pathMap){SocketAddress socketAddress = session.remoteAddress();String clientIP = socketAddress != null ? socketAddress.toString().replace("/","").split(":")[0] : "";//增加nginx反向代理后获取客户端真实IP的逻辑List<Map.Entry<String, String>> entries = headers.entries();for (Map.Entry<String, String> entry : entries) {if(entry.getKey().equals(NGINX_URL)){clientIP = StringUtils.isNotBlank(entry.getValue()) ? entry.getValue().split(":")[0] : entry.getValue() ;}}//记录游客访问记录touristsMap.put(clientIP, clientIP);//记录游客ws的session记录clientToChannelMap.put(clientIP,session);//同时推送门户和后台的消息sendMessage();log.info("Tourists join, the tourists IP is: " + clientIP);}

3.踩坑记录

  • 对于要经过nginx代理的ws,注意防火墙通过端口放行
  • 针对于不同的网络环境,X-Forwarded-For获得的IP可能带端口号,注意截取
  • 以上是通过超时时间来延长ws连接的超时时间,建议使用前端心跳包的形式来维持连接

基于netty+websocket实现门户游客实时统计功能相关推荐

  1. SuperDog——一个基于netty的web服务器开发项目

      项目GitHub地址:https://github.com/HelloWorld-Ian/SuperDog   这是我在实习期间开发的一个项目demo,简单来说是一个基于netty框架的web服务 ...

  2. 京东到家基于netty与websocket的实践

    作者:李天翼,软件开发工程师,任职于达达京东到家后台研发团队,负责订单流程的开发工作. 背景 在京东到家商家中心系统中,商家提出在 Web 端实现自动打印的需求,不需要人工盯守点击打印,直接打印小票, ...

  3. 基于netty的websocket协议实现

    基于netty的websocket协议实现 背景 1.启动服务端 2.测试服务端和客户端效果 背景 项目中使用到了websocket,所以查阅相关资料,完成了一个基于netty的websocket的实 ...

  4. 简易 IM 双向通信电脑端 GUI 应用——基于 Netty、WebSocket、JavaFX 、多线程技术等

    简易 IM 双向通信电脑端 GUI 应用--基于 Netty.WebSocket.JavaFX .多线程技术等 说明 运行效果 核心代码 完整代码 参考知识 说明   这是一款使用 Netty 来实现 ...

  5. 指标统计:基于流计算 Oceanus(Flink) 实现实时 UVPV 统计

    作者:吴云涛,腾讯 CSIG 高级工程师 导语 | 最近梳理了一下如何用 Flink 来实现实时的 UV.PV 指标的统计,并和公司内微视部门的同事交流.然后针对该场景做了简化,并发现使用 Flink ...

  6. netty 游戏服务器框图_基于Netty和WebSocket协议实现Web端自动打印订单服务方法与流程...

    本发明涉及电子商务技术领域,尤其涉及一种基于netty和websocket协议实现web端自动打印订单服务方法. 背景技术: 电子商务是以信息网络技术为手段,以商品交换为中心的商务活动:也可理解为在互 ...

  7. 基于Websocket草案10协议的升级及基于Netty的握手实现

    分享一下我老师大神的人工智能教程!零基础,通俗易懂!http://blog.csdn.net/jiangjunshow 也欢迎大家转载本篇文章.分享知识,造福人民,实现我们中华民族伟大复兴! 最近发现 ...

  8. 一款基于Netty开发的WebSocket服务器

    代码地址如下: http://www.demodashi.com/demo/13577.html 一款基于Netty开发的WebSocket服务器 这是一款基于Netty框架开发的服务端,通信协议为W ...

  9. 基于netty搭建websocket,实现消息的主动推送

    基于netty搭建websocket,实现消息的主动推送 rpf_siwash https://www.jianshu.com/p/56216d1052d7 netty是由jboss提供的一款开源框架 ...

最新文章

  1. 一篇文章搞定大规模容器平台生产落地十大实践
  2. [Gamma阶段]第十次Scrum Meeting
  3. 前端规范之媒体文件规范
  4. hdu 5230(整数划分,dp)
  5. 使用Subversion版本标识符
  6. 几种字符串加密解密的方法
  7. zoj 1078 palindrom numbers
  8. 一步一步教你Pycharm的配置Python环境
  9. 微信小应用资源汇总整理
  10. 实用防火与防爆技术培训—总目录
  11. pip 安装指定版本的库
  12. android蓝牙 助手源码,蓝牙串口助手(Android Studio源码)
  13. Axure统计图表设计
  14. matlab解高阶非齐次方程并作图,2x2齐次线性方程组作图
  15. java protected用法_深入理解Java的protected修饰符
  16. StackExchange.Redis Timeout performing 超时问题
  17. 应广单片机PFS123按键中断控制数码管显示例程
  18. 设计组合中的10个严重错误可能会导致您丧命
  19. 数据中心交钥匙项目部署仍面临挑战
  20. 程序员必备免费电子书下载网站

热门文章

  1. 小程序源码:喝酒娱乐小游戏助力神器-多玩法安装简单
  2. linux中的lsof命令简介
  3. VLOOKUP函数常用套路大全
  4. 融合多头注意力机制的网络恶意流量检测
  5. linux系统下questasim 10.7安装教程
  6. 小程序正则验证 身份证号、统一社会信用代码
  7. Isaac Sim 使用指南(一)
  8. 非监督分类ecognition_资管新规学习03资管产品的范围和分类
  9. 计算机考证一般多少钱
  10. 看steam教育之风带来创新与变革