分布式 WebSocket 集群解决方案
作者 | weixin_34194702
来源 | blog.csdn.net/weixin_34194702/article/details/88701309
问题起因
最近做项目时遇到了需要多用户之间通信的问题,涉及到了WebSocket握手请求,以及集群中WebSocket Session共享的问题。
期间我经过了几天的研究,总结出了几个实现分布式WebSocket集群的办法,从zuul到spring cloud gateway的不同尝试,总结出了这篇文章,希望能帮助到某些人,并且能一起分享这方面的想法与研究。
以下是我的场景描述
资源:4台服务器。其中只有一台服务器具备ssl认证域名,一台redis+mysql服务器,两台应用服务器(集群)
应用发布限制条件:由于场景需要,应用场所需要ssl认证的域名才能发布。因此ssl认证的域名服务器用来当api网关,负责https请求与wss(安全认证的ws)连接。俗称https卸载,用户请求https域名服务器(eg:https://oiscircle.com/xxx),但真实访问到的是http+ip地址的形式。只要网关配置高,能handle多个应用
需求:用户登录应用,需要与服务器建立wss连接,不同角色之间可以单发消息,也可以群发消息
集群中的应用服务类型:每个集群实例都负责http无状态请求服务与ws长连接服务
系统架构图
在我的实现里,每个应用服务器都负责http and ws请求,其实也可以将ws请求建立的聊天模型单独成立为一个模块。从分布式的角度来看,这两种实现类型差不多,但从实现方便性来说,一个应用服务http+ws请求的方式更为方便。下文会有解释
如果您正在学习Spring Boot,推荐一个连载多年还在继续更新的免费教程:http://blog.didispace.com/spring-boot-learning-2x/
本文涉及的技术栈
Eureka 服务发现与注册
Redis Session共享
Redis 消息订阅
Spring Boot
Zuul 网关
Spring Cloud Gateway 网关
Spring WebSocket 处理长连接
Ribbon 负载均衡
Netty 多协议NIO网络通信框架
Consistent Hash 一致性哈希算法
相信能走到这一步的人都了解过我上面列举的技术栈了,如果还没有,可以先去网上找找入门教程了解一下。下面的内容都与上述技术相关,题主默认大家都了解过了...
技术可行性分析
下面我将描述session特性,以及根据这些特性列举出n个解决分布式架构中处理ws请求的集群方案
WebSocketSession与HttpSession
在Spring所集成的WebSocket里面,每个ws连接都有一个对应的session:WebSocketSession,在Spring WebSocket中,我们建立ws连接之后可以通过类似这样的方式进行与客户端的通信:
protected void handleTextMessage(WebSocketSession session, TextMessage message) {System.out.println("服务器接收到的消息: "+ message );//send message to clientsession.sendMessage(new TextMessage("message"));
}
那么问题来了:ws的session无法序列化到redis,因此在集群中,我们无法将所有WebSocketSession都缓存到redis进行session共享。每台服务器都有各自的session。于此相反的是HttpSession,redis可以支持httpsession共享,但是目前没有websocket session共享的方案,因此走redis websocket session共享这条路是行不通的。
有的人可能会想:我可不可以将sessin关键信息缓存到redis,集群中的服务器从redis拿取session关键信息然后重新构建websocket session...我只想说这种方法如果有人能试出来,请告诉我一声...
以上便是websocket session与http session共享的区别,总的来说就是http session共享已经有解决方案了,而且很简单,只要引入相关依赖:spring-session-data-redis
和spring-boot-starter-redis
,大家可以从网上找个demo玩一下就知道怎么做了。而websocket session共享的方案由于websocket底层实现的方式,我们无法做到真正的websocket session共享。
解决方案的演变
Netty与Spring WebSocket
刚开始的时候,我尝试着用netty实现了websocket服务端的搭建。在netty里面,并没有websocket session这样的概念,与其类似的是channel,每一个客户端连接都代表一个channel。前端的ws请求通过netty监听的端口,走websocket协议进行ws握手连接之后,通过一些列的handler(责链模式)进行消息处理。与websocket session类似地,服务端在连接建立后有一个channel,我们可以通过channel进行与客户端的通信
如果您正在学习Spring Boot,推荐一个连载多年还在继续更新的免费教程:http://blog.didispace.com/spring-boot-learning-2x/
/*** TODO 根据服务器传进来的id,分配到不同的group*/private static final ChannelGroup GROUP = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);@Overrideprotected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {//retain增加引用计数,防止接下来的调用引用失效System.out.println("服务器接收到来自 " + ctx.channel().id() + " 的消息: " + msg.text());//将消息发送给group里面的所有channel,也就是发送消息给客户端GROUP.writeAndFlush(msg.retain());}
那么,服务端用netty还是用spring websocket?以下我将从几个方面列举这两种实现方式的优缺点
使用netty实现websocket
玩过netty的人都知道netty是的线程模型是nio模型,并发量非常高,spring5之前的网络线程模型是servlet实现的,而servlet不是nio模型,所以在spring5之后,spring的底层网络实现采用了netty。如果我们单独使用netty来开发websocket服务端,速度快是绝对的,但是可能会遇到下列问题:
与系统的其他应用集成不方便,在rpc调用的时候,无法享受springcloud里feign服务调用的便利性
业务逻辑可能要重复实现
使用netty可能需要重复造轮子
怎么连接上服务注册中心,也是一件麻烦的事情
restful服务与ws服务需要分开实现,如果在netty上实现restful服务,有多麻烦可想而知,用spring一站式restful开发相信很多人都习惯了。
使用spring websocket实现ws服务
spring websocket已经被springboot很好地集成了,所以在springboot上开发ws服务非常方便,做法非常简单
第一步:添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
第二步:添加配置类
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {registry.addHandler(myHandler(), "/").setAllowedOrigins("*");
}@Beanpublic WebSocketHandler myHandler() {return new MessageHandler();}
}
第三步:实现消息监听类
@Component
@SuppressWarnings("unchecked")
public class MessageHandler extends TextWebSocketHandler {private List<WebSocketSession> clients = new ArrayList<>();@Overridepublic void afterConnectionEstablished(WebSocketSession session) {clients.add(session);System.out.println("uri :" + session.getUri());System.out.println("连接建立: " + session.getId());System.out.println("current seesion: " + clients.size());}@Overridepublic void afterConnectionClosed(WebSocketSession session, CloseStatus status) {clients.remove(session);System.out.println("断开连接: " + session.getId());}@Overrideprotected void handleTextMessage(WebSocketSession session, TextMessage message) {String payload = message.getPayload();Map<String, String> map = JSONObject.parseObject(payload, HashMap.class);System.out.println("接受到的数据" + map);clients.forEach(s -> {try {System.out.println("发送消息给: " + session.getId());s.sendMessage(new TextMessage("服务器返回收到的信息," + payload));} catch (Exception e) {e.printStackTrace();}});}
}
从这个demo中,使用spring websocket实现ws服务的便利性大家可想而知了。为了能更好地向spring cloud大家族看齐,我最终采用了spring websocket实现ws服务。
因此我的应用服务架构是这样子的:一个应用既负责restful服务,也负责ws服务。没有将ws服务模块拆分是因为拆分出去要使用feign来进行服务调用。第一本人比较懒惰,第二拆分与不拆分相差在多了一层服务间的io调用,所以就没有这么做了。
如果您正在学习Spring Boot,推荐一个连载多年还在继续更新的免费教程:http://blog.didispace.com/spring-boot-learning-2x/
从zuul技术转型到spring cloud gateway
要实现websocket集群,我们必不可免地得从zuul转型到spring cloud gateway。原因如下:
zuul1.0版本不支持websocket转发,zuul 2.0开始支持websocket,zuul2.0几个月前开源了,但是2.0版本没有被spring boot集成,而且文档不健全。因此转型是必须的,同时转型也很容易实现。
在gateway中,为了实现ssl认证和动态路由负载均衡,yml文件中以下的某些配置是必须的,在这里提前避免大家采坑
server:port: 443ssl:enabled: truekey-store: classpath:xxx.jkskey-store-password: xxxxkey-store-type: JKSkey-alias: alias
spring:application:name: api-gatewaycloud:gateway:httpclient:ssl:handshake-timeout-millis: 10000close-notify-flush-timeout-millis: 3000close-notify-read-timeout-millis: 0useInsecureTrustManager: truediscovery:locator:enabled: truelower-case-service-id: trueroutes:- id: dcuri: lb://dcpredicates:- Path=/dc/**- id: wecheckuri: lb://wecheckpredicates:- Path=/wecheck/**
如果要愉快地玩https卸载,我们还需要配置一个filter,否则请求网关时会出现错误not an SSL/TLS record
@Component
public class HttpsToHttpFilter implements GlobalFilter, Ordered {private static final int HTTPS_TO_HTTP_FILTER_ORDER = 10099;@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI originalUri = exchange.getRequest().getURI();ServerHttpRequest request = exchange.getRequest();ServerHttpRequest.Builder mutate = request.mutate();String forwardedUri = request.getURI().toString();if (forwardedUri != null && forwardedUri.startsWith("https")) {try {URI mutatedUri = new URI("http",originalUri.getUserInfo(),originalUri.getHost(),originalUri.getPort(),originalUri.getPath(),originalUri.getQuery(),originalUri.getFragment());mutate.uri(mutatedUri);} catch (Exception e) {throw new IllegalStateException(e.getMessage(), e);}}ServerHttpRequest build = mutate.build();ServerWebExchange webExchange = exchange.mutate().request(build).build();return chain.filter(webExchange);}@Overridepublic int getOrder() {return HTTPS_TO_HTTP_FILTER_ORDER;}
}
这样子我们就可以使用gateway来卸载https请求了,到目前为止,我们的基本框架已经搭建完毕,网关既可以转发https请求,也可以转发wss请求。接下来就是用户多对多之间session互通的通讯解决方案了。接下来,我将根据方案的优雅性,从最不优雅的方案开始讲起。
session广播
这是最简单的websocket集群通讯解决方案。场景如下:
教师A想要群发消息给他的学生们
教师的消息请求发给网关,内容包含{我是教师A,我想把xxx消息发送我的学生们}
网关接收到消息,获取集群所有ip地址,逐个调用教师的请求
集群中的每台服务器获取请求,根据教师A的信息查找本地有没有与学生关联的session,有则调用sendMessage方法,没有则忽略请求
session广播实现很简单,但是有一个致命缺陷:计算力浪费现象,当服务器没有消息接收者session的时候,相当于浪费了一次循环遍历的计算力,该方案在并发需求不高的情况下可以优先考虑,实现很容易。
另外,如果您正在学习Spring Cloud,推荐一个连载多年还在继续更新的免费教程:https://blog.didispace.com/spring-cloud-learning/
spring cloud中获取服务集群中每台服务器信息的方法如下
@Resource
private EurekaClient eurekaClient;Application app = eurekaClient.getApplication("service-name");
//instanceInfo包括了一台服务器ip,port等消息
InstanceInfo instanceInfo = app.getInstances().get(0);
System.out.println("ip address: " + instanceInfo.getIPAddr());
服务器需要维护关系映射表,将用户的id与session做映射,session建立时在映射表中添加映射关系,session断开后要删除映射表内关联关系
一致性哈希算法实现(本文的要点)
这种方法是本人认为最优雅的实现方案,理解这种方案需要一定的时间,如果你耐心看下去,相信你一定会有所收获。再强调一次,不了解一致性哈希算法的同学请先看这里,现先假设哈希环是顺时针查找的。
首先,想要将一致性哈希算法的思想应用到我们的websocket集群,我们需要解决以下新问题:
集群节点DOWN,会影响到哈希环映射到状态是DOWN的节点。
集群节点UP,会影响到旧key映射不到对应的节点。
哈希环读写共享。
在集群中,总会出现服务UP/DOWN的问题。
针对节点DOWN的问题分析如下:
一个服务器DOWN的时候,其拥有的websocket session会自动关闭连接,并且前端会收到通知。此时会影响到哈希环的映射错误。我们只需要当监听到服务器DOWN的时候,删除哈希环上面对应的实际结点和虚结点,避免让网关转发到状态是DOWN的服务器上。
实现方法:在eureka治理中心监听集群服务DOWN事件,并及时更新哈希环。
另外,如果您正在学习Spring Cloud,推荐一个连载多年还在继续更新的免费教程:https://blog.didispace.com/spring-cloud-learning/
针对节点UP的问题分析如下:
现假设集群中有服务 CacheB上线了,该服务器的ip地址刚好被映射到key1和 cacheA之间。那么key1对应的用户每次要发消息时都跑去 CacheB发送消息,结果明显是发送不了消息,因为 CacheB没有key1对应的session。
此时我们有两种解决方案。
方案A简单,动作大:
eureka监听到节点UP事件之后,根据现有集群信息,更新哈希环。并且断开所有session连接,让客户端重新连接,此时客户端会连接到更新后的哈希环节点,以此避免消息无法送达的情况。
方案B复杂,动作小:
我们先看看没有虚拟节点的情况,假设 CacheC和 CacheA之间上线了服务器 CacheB。所有映射在 CacheC到 CacheB的用户发消息时都会去 CacheB里面找session发消息。也就是说 CacheB一但上线,便会影响到 CacheC到 CacheB之间的用户发送消息。所以我们只需要将 CacheA断开 CacheC到 CacheB的用户所对应的session,让客户端重连。
接下来是有虚拟节点的情况,假设浅色的节点是虚拟节点。我们用长括号来代表某段区域映射的结果属于某个 Cache。首先是C节点未上线的情况。图大家应该都懂吧,所有B的虚拟节点都会指向真实的B节点,所以所有B节点逆时针那一部分都会映射到B(因为我们规定哈希环顺时针查找)。
接下来是C节点上线的情况,可以看到某些区域被C占领了。
由以上情况我们可以知道:节点上线,会有许多对应虚拟节点也同时上线,因此我们需要将多段范围key对应的session断开连接(上图红色的部分)。具体算法有点复杂,实现的方式因人而异,大家可以尝试一下自己实现算法。
哈希环应该放在哪里?
gateway本地创建并维护哈希环。当ws请求进来的时候,本地获取哈希环并获取映射服务器信息,转发ws请求。这种方法看上去不错,但实际上是不太可取的,回想一下上面服务器DOWN的时候只能通过eureka监听,那么eureka监听到DOWN事件之后,需要通过io来通知gateway删除对应节点吗?显然太麻烦了,将eureka的职责分散到gateway,不建议这么做。
eureka创建,并放到redis共享读写。这个方案可行,当eureka监听到服务DOWN的时候,修改哈希环并推送到redis上。为了请求响应时间尽量地短,我们不可以让gateway每次转发ws请求的时候都去redis取一次哈希环。哈希环修改的概率的确很低,gateway只需要应用redis的消息订阅模式,订阅哈希环修改事件便可以解决此问题。
至此我们的spring websocket集群已经搭建的差不多了,最重要的地方还是一致性哈希算法。现在有最后一个技术瓶颈,网关如何根据ws请求转发到指定的集群服务器上?
答案在负载均衡。spring cloud gateway或zuul都默认集成了ribbon作为负载均衡,我们只需要根据建立ws请求时客户端发来的user id,重写ribbon负载均衡算法,根据user id进行hash,并在哈希环上寻找ip,并将ws请求转发到该ip便完事了。流程如下图所示:
接下来用户沟通的时候,只需要根据id进行hash,在哈希环上获取对应ip,便可以知道与该用户建立ws连接时的session存在哪台服务器上了!
另外,如果您正在学习Spring Cloud,推荐一个连载多年还在继续更新的免费教程:https://blog.didispace.com/spring-cloud-learning/
spring cloud Finchley.RELEASE 版本中ribbon未完善的地方
题主在实际操作的时候发现了ribbon两个不完善的地方......
根据网上找的方法,继承AbstractLoadBalancerRule重写负载均衡策略之后,多个不同应用的请求变得混乱。假如eureka上有两个service A和B,重写负载均衡策略之后,请求A或B的服务,最终只会映射到其中一个服务上。非常奇怪!可能spring cloud gateway官网需要给出一个正确的重写负载均衡策略的demo。
一致性哈希算法需要一个key,类似user id,根据key进行hash之后在哈希环上搜索并返回ip。但是ribbon没有完善choose函数的key参数,直接写死了default!
难道这样子我们就没有办法了吗?其实还有一个可行并且暂时可替代的办法!
如下图所示,客户端发送一个普通的http请求(包含id参数)给网关,网关根据id进行hash,在哈希环中寻找ip地址,将ip地址返回给客户端,客户端再根据该ip地址进行ws请求。
由于ribbon未完善key的处理,我们暂时无法在ribbon上实现一致性哈希算法。只能间接地通过客户端发起两次请求(一次http,一次ws)的方式来实现一致性哈希。希望不久之后ribbon能更新这个缺陷!让我们的websocket集群实现得更优雅一点。
后记
以上便是我这几天探索的结果。期间遇到了许多问题,并逐一解决难题,列出两个websocket集群解决方案。第一个是session广播,第二个是一致性哈希。
这两种方案针对不同场景各有优缺点,本文并未用到ActiveMQ,Karfa等消息队列实现消息推送,只是想通过自己的想法,不依靠消息队列来简单地实现多用户之间的长连接通讯。希望能为大家提供一条不同于寻常的思路。
往期推荐
我去!每天都用的这个操作居然算“黑客行为”?
为什么IDEA不推荐你使用@Autowired ?
GitHub高赞,一款足以取代迅雷的开源下载工具
炸裂!跑P站上教微积分,年入170w...
网传字节跳动、腾讯将执行1075和965工作制?加班要审批才行!
技术交流群
最近有很多人问,有没有读者交流群,想知道怎么加入。加入方式很简单,有兴趣的同学,只需要点击下方卡片,回复“加群“,即可免费加入我们的高质量技术交流群!
点击阅读原文,送你免费Spring Boot教程!
分布式 WebSocket 集群解决方案相关推荐
- 深入浅出Websocket(二)分布式Websocket集群
前言 最近在构建两个系统的实时通信部分,总结一下所学. 这是一个系列文章,暂时主要构思四个部分 深入浅出Websocket(一)Websocket协议 深入浅出Websocket(二)分布式Webso ...
- WebSocket 集群解决方案
欢迎关注方志朋的博客,回复"666"获面试宝典 问题起因 最近做项目时遇到了需要多用户之间通信的问题,涉及到了WebSocket握手请求,以及集群中WebSocket Sessio ...
- WebSocket集群解决方案
一.问题起因 最近做项目时遇到了需要多用户之间通信的问题,涉及到了WebSocket握手请求,以及集群中WebSocket Session共享的问题. 期间我经过了几天的研究,总结出了几个实现分布式W ...
- tcp out of order解决_分布式集群解决方案 学习笔记
回到目录: OrangeZh:拉勾教育:JAVA高薪训练营 学习技术篇zhuanlan.zhihu.com 介绍 文章内容输出来源:拉勾教育 Java高薪训练营 分布式集群解决方案相关 什么是分布式 ...
- 分布式IM及Netty服务集群解决方案
一.概述 使用netty开发分布式Im,提供分布netty集群解决方案.服务端通过负载均衡策略与服务集群建立连接,消息发送通过服务间集群的通信进行消息转发. 二.集群架构 三.项目地址 https:/ ...
- 分布式ActiveMQ集群--转载
原文地址:http://shensy.iteye.com/blog/1752529 回顾总结前一段时间学习的ActiveMQ分布式集群相关的知识,分享出来希望对看到的人有所帮助. 一.分布式Activ ...
- 保障IDC安全:分布式HIDS集群架构设计
背景 近年来,互联网上安全事件频发,企业信息安全越来越受到重视,而IDC服务器安全又是纵深防御体系中的重要一环.保障IDC安全,常用的是基于主机型入侵检测系统Host-based Intrusion ...
- 分布式精华问答 | 分布式与集群的区别是什么?
什么是分布式计算?所谓分布式计算是一门计算机科学,它研究如何把一个需要非常巨大的计算能力才能解决的问题分成许多小的部分,然后把这些部分分配给许多计算机进行处理,最后把这些计算结果综合起来得到最终的结果 ...
- 私有云办公平台大规模集群/企业级集群/小型工作室集群解决方案:NextCloud集群部署方案--NextCloud集群架构设计
原作者:NextCloud文档库 转载来源:https://docs.nextcloud.com/server/11/admin_manual/installation/deployment_reco ...
最新文章
- C#调用C++dll
- 雅客EXCEL (3)-合并取消单元格、平均值、添加序号
- 【B站视频教程笔记】基于VSCode和CMake实现C/C++开发 | Linux篇(gcc/g++)(安装、配置、使用详细教程)(VSCode教程)(CMake教程)(精!)
- windows中 修改某种文件图标 的方法 (备忘)
- NOIP模拟测试「简单的区间·简单的玄学·简单的填数·简单的序列」
- 找第一个只出现一次的字符_leetcode哈希表之第一个只出现一次的字符
- java实现增量更新_Android 增量更新的完整实现步骤
- Java千百问_05面向对象(013)_泛型如何使用
- 【项目总结】基于STM32的物流搬运小车
- idea创建svn分支
- 【读书笔记】《有效需求分析》
- 医院排队系统排队叫号系统
- 如何做一个简单的学生喜欢家长买单的scratch小游戏
- 【K8S】Submariner实现跨集群通信
- Thinkphp6 baiy/think-async redis 异步代码执行/异步延迟执行/异步事件订阅
- CV学习笔记【1】:transforms
- 亿级工具类APP头条数据聚合优化实践
- android卡片风格,[Android] Android 卡片式控件CardView的优雅使用
- win10桌面计算机在哪里打开,Win10计算器在哪里?三种可以打开Win10计算器的方法图文介绍...
- 在线文库源码php,在线文库网站 文档分享平台网站 在线文档 在线预览网站 源码...
热门文章
- Cracking the coding interview--Q1.7
- vector机器人 WHAT DO I USE THE VECTOR APP FOR? 我使用 VECTOR 应用程序做什么?
- linux 处理 BOM头 ^M 方法
- java -cp 和 java -jar 的区别
- python3 调用字符串对应的函数
- /usr/bin/ld: cannot find -lc错误原因及解决方法
- python3 运算符
- python3 selenium webdriver 启动三大浏览器Firefox,Chrome,IE
- openvas进程间通讯api与报文交互
- Android 中的 Service 全面总结