项目背景

和各位读者大致介绍下具体场景,线上的小程序中开放一些语音麦克风的房间,让用户进入房间之后可以互相通过语音聊天的方式进行互动。

这里分享一下相关的技术设计方案。这款系统的核心点设计在于如何能让一个用户发出的语音通知到其他用户上边。语音数据在客户端同事的处理下最终变成了io数据流请求到了后端,后端只需要将这些数据流传达给各个不同的终端即可达到广播通知的效果。

单机版架构

最初期上线的时候,为了赶速度,快速试错,所以简单地采用了单机版架构去设计。结合技术栈为 SpringBoot,WebSocket,MySQL技术。

线上一间语音房间的同时在线人数并不会特别多,大概在15-50人的区间段内,系统核心代码是通过SpringBoot内部的WebSocket技术去进行数据的主动推送。

设计思路

整体的设计图比较简单,基本就是一台服务器存储WebSocket连接,如下图所示:

用户进行WebSocket初始化连接的时候需要一个连接分配和存储的过程:早期的存储是存放在了服务器本地的一个Map集合中。当WebSocket进行连接的时候就会往内存中写入一条数据信息,当链接断开的时候,就将内存中的数据移除。然后进行语音广播的时候需要结合WebSocket内部的广播发送功能进行通知看似设计比较简单,但是在后期业务变得庞大的时候出现了瓶颈。因为随着参加语音活动用户的增加,越来越多的WebSocketSession对象需要被存储到内存当中,这种有状态性的存储对于单机扩容不灵活。

设计缺陷

1.假设原先的服务器扩容到了A,B两台机器,A用户在A机器上边建立了WebSocketSession,B用户在B机器上边建立的WebSocketSession连接。此时如果A想要和B进行对话发送,需要先查找到具体WebSocketSession存放在哪台机器上边。

2.当用户出现了网络异常,临时断开连接进行重连的时候,也可能会出现1所说的问题。

集群架构

设计思路

一旦出现需要发送语音通知的时候,发送一条广播的mq消息,每个机器都接收到消息之后,触发自己的广播操作即可。

RocketMq的接入系统设计里面mq采用的是广播模式,这和我们通常使用的集群模式有一定的区别。

消息队列RocketMQ版是基于发布或订阅模型的消息系统。消费者,即消息的订阅方订阅关注的Topic,以获取并消费消息。由于消费者应用一般是分布式系统,以集群方式部署,因此消息队列RocketMQ版约定以下概念:

  • 集群:使用相同Group ID的消费者属于同一个集群。同一个集群下的消费者消费逻辑必须完全一致(包括Tag的使用)。

  • 集群消费:当使用集群消费模式时,消息队列RocketMQ版认为任意一条消息只需要被集群内的任意一个消费者处理即可。

  • 广播消费:当使用广播消费模式时,消息队列RocketMQ版会将每条消息推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。

集群消费模式适用场景 适用于消费端集群化部署,每条消息只需要被处理一次的场景。此外,由于消费进度在服务端维护,可靠性更高。具体消费示例如下图所示。

注意事项

  • 集群消费模式下,每一条消息都只会被分发到一台机器上处理。如果需要被集群下的每一台机器都处理,请使用广播模式。

  • 集群消费模式下,不保证每一次失败重投的消息路由到同一台机器上。

广播消费模式适用场景 适用于消费端集群化部署,每条消息需要被集群下的每个消费者处理的场景。具体消费示例如下图所示。注意事项

  • 广播消费模式下不支持顺序消息。

  • 广播消费模式下不支持重置消费位点。

  • 每条消息都需要被相同订阅逻辑的多台机器处理。

  • 消费进度在客户端维护,出现重复消费的概率稍大于集群模式。

  • 广播模式下,消息队列RocketMQ版保证每条消息至少被每台客户端消费一次,但是并不会重投消费失败的消息,因此业务方需要关注消费失败的情况。

  • 广播模式下,客户端每一次重启都会从最新消息消费。客户端在被停止期间发送至服务端的消息将会被自动跳过,请谨慎选择。

  • 广播模式下,每条消息都会被大量的客户端重复处理,因此推荐尽可能使用集群模式。

  • 广播模式下服务端不维护消费进度,所以消息队列RocketMQ版控制台不支持消息堆积查询、消息堆积报警和订阅关系查询功能。

这里面的应用场景需要对集群内部对每个消费者都对服务器内存中的socket连接进行session是否存在对判断,因此需要采用mq的广播模式。

关于mq部分的接入代码

Consumer模块的配置:

package org.idea.web.socket.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
/*** @Author linhao* @Date created in 10:30 上午 2021/5/10*/
@ConfigurationProperties(prefix = "rocketmq.consumer")
public class MqConsumerConfig {private boolean isOn;private String groupName;private String nameSrvAddr;private String topics;private Integer consumeThreadMin;private Integer consumeThreadMax;private Integer consumeMessageBatchMaxSize;/**getter 和 setter部分省略**/
}

Producer模块的配置展示:


package org.idea.web.socket.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
/*** @Author linhao* @Date created in 10:26 上午 2021/5/10*/
@ConfigurationProperties(prefix = "rocketmq.producer")
public class MqProducerConfig {private boolean isOn;private String groupName;private String nameSrvAddr;private Integer maxMessageSize;private Integer sendMsgTimeout;private Integer retryTimesWhenSendFailed;/**getter 和 setter部分省略**/
}

RocketMq内部的消费端Bean配置


package org.idea.web.socket.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.idea.web.socket.config.MqConsumerConfig;
import org.idea.web.socket.config.MqProducerConfig;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/*** @Author linhao* @Date created in 10:34 上午 2021/5/10*/
@Configuration
@Slf4j
@EnableConfigurationProperties({MqConsumerConfig.class})
public class MqConsumerAutoConfig {@Resourceprivate MqConsumerConfig mqConsumerConfig;@Resource//这个接口需要手动实现顺序消费的逻辑 每次获取到消息队列的第一条数据private MessageListenerHandler messageListenerConcurrently;@Bean@ConditionalOnMissingBeanpublic DefaultMQPushConsumer defaultMQPushConsumer() {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();consumer.setNamesrvAddr(mqConsumerConfig.getNameSrvAddr());consumer.setConsumerGroup(mqConsumerConfig.getGroupName());consumer.setConsumeThreadMin(mqConsumerConfig.getConsumeThreadMin());consumer.setConsumeThreadMax(mqConsumerConfig.getConsumeThreadMax());consumer.registerMessageListener(messageListenerConcurrently);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//消费模型是什么?consumer.setMessageModel(MessageModel.BROADCASTING);//默认一次拉取一条消费consumer.setConsumeMessageBatchMaxSize(mqConsumerConfig.getConsumeMessageBatchMaxSize());//*表示订阅所有的tagtry {consumer.subscribe(mqConsumerConfig.getTopics(), "*");consumer.start();log.info("【 MqConsumerAutoConfig 】mq consumer is started!");} catch (Exception e) {log.error("mq start fail,e is ", e);}return consumer;}
}

RocketMq的服务生产者Bean配置

package org.idea.web.socket.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.idea.web.socket.config.MqProducerConfig;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.Resource;
/*** @Author linhao* @Date created in 11:05 上午 2021/5/10*/
@Configuration
@Slf4j
@EnableConfigurationProperties({MqProducerConfig.class})
public class MqProducerAutoConfig {@Resourceprivate MqProducerConfig mqProducerConfig;@Bean@ConditionalOnMissingBean//意味着DefaultMQProducer的配置可以被覆盖public DefaultMQProducer defaultMQProducer() {DefaultMQProducer producer = new DefaultMQProducer(mqProducerConfig.getGroupName());producer.setNamesrvAddr(mqProducerConfig.getNameSrvAddr());//没有则自动创建topic的key
//        producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");producer.setMaxMessageSize(mqProducerConfig.getMaxMessageSize());producer.setSendMsgTimeout(mqProducerConfig.getSendMsgTimeout());producer.setRetryTimesWhenSendFailed(mqProducerConfig.getRetryTimesWhenSendFailed());try {producer.start();log.info("【 MqProducerAutoConfig 】mq producer is started!");} catch (Exception e) {log.error("[MqProducerAutoConfig] start fail, e is ", e);}return producer;}
}

然后是对RocketMq内部发送消息事件的一层函数封装

package org.idea.web.socket.mq;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.idea.web.socket.config.MqProducerConfig;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
/*** 消息广播发送端** @Author linhao* @Date created in 10:43 下午 2021/5/9*/
@Component
@Slf4j
public class BroadcastMqProducer {@Resourceprivate DefaultMQProducer defaultMQProducer;@Resourceprivate MqProducerConfig mqProducerConfig;private static String TOPIC = "ws-topic";private static String TAGS = "ws-tag";public static Integer ALL_USER_RECEIVE_TYPE = 1;public static Integer ONE_USER_RECEIVE_TYPE = 2;/*** 点对点之间的消息发送** @param destSessionKey* @param msg* @return*/public SendResult sendWebSocketToUser(String destSessionKey,String msg) {if (StringUtils.isEmpty(msg)) {log.error("[sendWebSocketToUser] msg can not be null!");return null;}Message message = null;SendResult sendResult = null;try {BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();broadcastMqDTO.setEventType(ONE_USER_RECEIVE_TYPE);broadcastMqDTO.setMessage(msg);broadcastMqDTO.setSessionKey(destSessionKey);message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));sendResult = defaultMQProducer.send(message);} catch (Exception e) {log.error("[sendWebSocketBroadcastMsg] e is ", e);}return sendResult;}/*** 广播消息发送** @param msg* @return*/public SendResult sendWebSocketBroadcastMsg(String msg) {if (StringUtils.isEmpty(msg)) {log.error("[sendWebSocketBroadcastMsg] msg can not be null!");return null;}Message message = null;SendResult sendResult = null;try {BroadcastMqDTO broadcastMqDTO = new BroadcastMqDTO();broadcastMqDTO.setEventType(ALL_USER_RECEIVE_TYPE);broadcastMqDTO.setMessage(msg);message = new Message(TOPIC, TAGS, (JSON.toJSONString(broadcastMqDTO)).getBytes(RemotingHelper.DEFAULT_CHARSET));sendResult = defaultMQProducer.send(message);} catch (Exception e) {log.error("[sendWebSocketBroadcastMsg] e is ", e);}return sendResult;}
}

对消息的订阅模块实现代码如下:

package org.idea.web.socket.mq;
import com.alibaba.fastjson.JSON;
import com.oracle.tools.packager.Log;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.idea.web.socket.dto.BroadcastMqDTO;
import org.idea.web.socket.manager.SocketManager;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.web.socket.WebSocketSession;
import javax.annotation.Resource;
import java.util.List;
import static org.idea.web.socket.mq.BroadcastMqProducer.ALL_USER_RECEIVE_TYPE;
import static org.idea.web.socket.mq.BroadcastMqProducer.ONE_USER_RECEIVE_TYPE;
/*** @Author linhao* @Date created in 10:59 上午 2021/5/10*/
@Component
@Slf4j
public class MessageListenerHandler implements MessageListenerConcurrently {@Resourceprivate SocketManager socketManager;@Resourceprivate SimpMessagingTemplate template;@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {if (CollectionUtils.isEmpty(list)) {Log.info("receive empty msg");return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}MessageExt messageExt = list.get(0);byte[] bytes = messageExt.getBody();String json = new String(bytes);BroadcastMqDTO broadcastMqDTO = JSON.parseObject(json, BroadcastMqDTO.class);log.info("[MessageListenerHandler] broadcastMqDTO is " + broadcastMqDTO);if (ALL_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {log.info("[consumeMessage] 广播发送消息:触发----》消息内容为:" + broadcastMqDTO);template.convertAndSend("/topic/sendTopic", broadcastMqDTO);} else if (ONE_USER_RECEIVE_TYPE.equals(broadcastMqDTO.getEventType())) {String sessionKey = broadcastMqDTO.getSessionKey();WebSocketSession webSocketSession = socketManager.get(sessionKey);if (webSocketSession != null) {template.convertAndSendToUser(sessionKey, "/queue/sendUser", broadcastMqDTO.getMessage());log.info("[consumeMessage] 点对点发送消息;触发----》消息内容为:" + broadcastMqDTO);}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
}

整体设计结构如下图:

于是按照这个结构进行了一版本的紧急开发迭代,原先的单台服务器扩展为了服务集群。

业务拓展后续产品经理提出一个需求,要求支持在同一间房内的两个用户之间发送悄悄话功能。这就需要我们进行一个点对点之间传输通讯的功能了。因此需要在mq通知到每台机器的时候加一个本地Session遍历的逻辑,如果当前机器存有用户token对应的session变量,那么就单独针对那个Session进行WebSocket的发送通知。设计弊端一旦某台机器出现了异常崩溃,那么就意味着这台机器上的所有语音连接可能会出现中断情况。目前这一块的问题也在考虑解决,计划是将WebSocketSession存入到分布式缓存的redis中保证数据可靠存储,但是在后续尝试的时候发现WebSocketSession对象没有实现序列化接口,在存储到Redis的时候会出现异常。目前这个问题还在寻找解决思路中,不知道各位读者朋友们有什么好的思路。

遇到的问题点用户请求直接访问到了我们的内部服务器,如果在请求的中间加入一台nginx做负载均衡则需要在nginx中配置一些额外信息。

项目的源代码比较多,这里我把核心部分的代码整理了一份,感兴趣的朋友可以到我的gitee上边去下载:

https://gitee.com/IdeaHome_admin/socket-framework

推荐好文

>>【练手项目】基于SpringBoot的ERP系统,自带进销存+财务+生产功能>>分享一套基于SpringBoot和Vue的企业级中后台开源项目,代码很规范!
>>能挣钱的,开源 SpringBoot 商城系统,功能超全,超漂亮!

教你用纯Java实现一个即时通讯系统(附源码)相关推荐

  1. 用Java实现一个区块链系统 附源码!

    微信公众号:Java随笔录 关注可了解更多Java相关的技术分享.问题或建议,欢迎公众号留言! 如果你觉得JiangNanMax对你有帮助,欢迎赞赏! 文章目录 前言 1. 介绍 2. 实现细节 2. ...

  2. vue项目中集成腾讯TIM即时通讯(附源码)

    上图 前言 项目需要做个客服功能,用户端小程序,客服人员web端,于是用到了腾讯的tim 准备工作 在腾讯云官网上创建应用,获取到相应的SDKAppID和相应的秘钥信息 安装SDK (1) web项目 ...

  3. 手动搭建一个车牌识别系统 | 附源码

    点击上方"小白学视觉",选择加"星标"或"置顶" 重磅干货,第一时间送达 车牌识别是一种图像处理技术,用于识别不同车辆.这项技术被广泛用于各 ...

  4. JAVA毕业设计分时共享办公系统计算机源码+lw文档+系统+调试部署+数据库

    JAVA毕业设计分时共享办公系统计算机源码+lw文档+系统+调试部署+数据库 JAVA毕业设计分时共享办公系统计算机源码+lw文档+系统+调试部署+数据库 本源码技术栈: 项目架构:B/S架构 开发语 ...

  5. java毕业生设计仓储ERP系统计算机源码+系统+mysql+调试部署+lw

    java毕业生设计仓储ERP系统计算机源码+系统+mysql+调试部署+lw java毕业生设计仓储ERP系统计算机源码+系统+mysql+调试部署+lw 本源码技术栈: 项目架构:B/S架构 开发语 ...

  6. JAVA毕业设计-智慧农业水果销售系统计算机源码+lw文档+系统+调试部署+数据库

    JAVA毕业设计-智慧农业水果销售系统计算机源码+lw文档+系统+调试部署+数据库 JAVA毕业设计-智慧农业水果销售系统计算机源码+lw文档+系统+调试部署+数据库 本源码技术栈: 项目架构:B/S ...

  7. iOS即时通讯之CocoaAsyncSocket源码解析一

    申明:本文内容属于转载整理,原文连接 前言: CocoaAsyncSocket是谷歌的开发者,基于BSD-Socket写的一个IM框架,它给Mac和iOS提供了易于使用的.强大的异步套接字库,向上封装 ...

  8. JAVA毕业设计口腔医院患者服务系统计算机源码+lw文档+系统+调试部署+数据库

    JAVA毕业设计口腔医院患者服务系统计算机源码+lw文档+系统+调试部署+数据库 JAVA毕业设计口腔医院患者服务系统计算机源码+lw文档+系统+调试部署+数据库 本源码技术栈: 项目架构:B/S架构 ...

  9. java毕业设计校园内推系统mybatis+源码+调试部署+系统+数据库+lw

    java毕业设计校园内推系统mybatis+源码+调试部署+系统+数据库+lw java毕业设计校园内推系统mybatis+源码+调试部署+系统+数据库+lw 本源码技术栈: 项目架构:B/S架构 开 ...

最新文章

  1. 汇总同一时间段的数据_数据集干货:一文读懂Mapsidejoin
  2. 链式比较、奇怪的字母、有趣的import...Python冷知识(六)
  3. 【正一专栏】巴萨艰难收获一场平局
  4. hdu 3966( 树链剖分+点权更新)
  5. 字节增强java_提高byte的效率
  6. 计算机网络(二十)-广域网-PPP协议和HDLC协议
  7. zencart设置产品始终免运费sql
  8. 搭建tidb集群linux_无服务器计算,学习Go,Linux系统恢复,TiDB,Udev等
  9. 在 Ubuntu 和 Linux Mint 上释放空间的9种简单方法
  10. 腾讯背后的神秘金主,1000亿美元资本大收割
  11. 不同VPC路由器通过静态路由、动态路由(OSPF)实现网络互通实战
  12. python离线语音识别_python语音识别模块
  13. ceph 删除 osd
  14. Java 对上传文件后缀格式的校验
  15. JAVA将图片背景色设置为透明
  16. win7双屏幕,双任务栏
  17. 十大编程语言黑客向,学会一个不怕没工作,全部学会随便秀操作
  18. javamail 读取邮箱邮件并下载附件
  19. 银行卡号每输四位加空格,及银行卡的识别(此银行卡号是那个银行)
  20. vue项目,h5图片放大后,支持手指缩放功能

热门文章

  1. 罗永浩的电子烟公司融资3000万元?8月或将发布新品 定价600元左右
  2. 拼多多:永远不会对孵化品牌“二选一” 扶持千家工厂触达4.4亿消费者
  3. 亚马逊无人商店因拒收现金被美国多地禁止:被认定歧视消费者
  4. 厉害了!这家国产厂商2018年在印度高端手机市场销量第一
  5. 拳王虚拟项目公社:虚拟资源如何挣钱,小白月入过万的操作思路
  6. 【点阵液晶编程连载三】点阵LCD 的驱动与显控
  7. 聊聊Socket、TCP/IP、HTTP、FTP及网络编程
  8. qt 里面使用webengine
  9. tensorflow线性回归基础函数
  10. Golang实践录:命令行cobra库实例优化