第三章 打造高性能的视频弹幕系统

  • 场景分析:客户端针对某一视频创建了弹幕,发送给后端进行处理,后端需要对所有正在观看该视频的用户推送该弹幕
  • 两种实现方式:使用短连接进行通信或使用长连接进行通信

短连接实现方案:

  • 所有观看视频的客户端不断轮询后端,若有新的弹幕则拉取后进行显示
  • 缺点:轮询的效率低,非常浪费资源(因为HTTP协议只能由客户端向服务端发起,故必须不停连接后端)

长连接实现方案:

  • 采用 WebSocket 进行前后端通信
  • 为什么要用 WebSocket:HTTP 协议的通信只能由客户端发起,做不到服务器主动向客户端推送信息。

WebSocket 协议

  • WebSocket简介:WebSocket 协议是基于TCP的一种新的网络协议。它实现了浏览器与服务器全双工(Full-Duplex)通信。
  • 全双工(Full-Duplex)通信:客户端可以主动发送信息给服务端,服务端也可以主动发送信息给客户端。
  • WebSocket协议优点:报文体积小、支持长连接。

弹幕系统架构设计

优化方向一(后端接收前端发来的弹幕,并将弹幕推送给前端展示)

  • 假设前端传过来2万条请求(弹幕),后端需要推送这2万条请求到前端,那么就相当于后端总共需要处理4万条请求,后端将这4万条请求分成10批次,每一批就是4000个请求,
  • 但是这10批次里面的第一批我们首先进行处理,第2~10批我们先不进行处理,把它们先放到 MQ 里面进行排队,这个就是削峰;
  • 将第一批的4000条请求,好好利用服务器的并行处理能力,给它进行并发处理,同一时间段内进行并发处理4000条请求的耗时可能也就几百毫秒,
  • 这样在 2~4 秒的时间段内,服务器就能完成这4万条请求的处理;
  • 在前端的用户感知来看,实际就是用户发送了一条弹幕,2~4秒后就可以在页面上看到自己所发送的弹幕了,体验感较好。

优化方向二(后端接收弹幕后,将弹幕持久化到数据库)

  • 后端接收前端传过来的弹幕后,将弹幕通过 MQ 进行异步持久化到数据库,并且采用 MQ 的目的是为了限流削峰,减轻数据库的压力;
  • 并且由于是异步操作,主线程是另外开了一条线程在进行持久化数据库的操作,这样子不会影响主线程的其他操作(例如同步保存弹幕到 Redis 里面)
  • 假设有 2 万条弹幕同时过来数据库,先将弹幕数据保存到 MQ 里面,这样子 MQ 可以每秒处理 2000 个请求,这样的速度保存到数据库中,不至于会使数据库崩溃,能够有效降低数据库的压力。

优化方向三(将弹幕数据写到redis,再次查询可以快速读取)

  • 在将弹幕数据保存到数据库中时,也要将弹幕数据同步保存到 redis(缓存)中,
  • 为了我们在下一次加载到视频详情页的时候,能够把我们当前或者当天的弹幕数据给快速查询出来;
  • 如果某个视频在今天保存了很大的弹幕数据量,如果每次都从数据库中进行查询的话,一是速度慢,二是可能会对数据库造成读取压力(如果有多个视频进行查询,其他视频可能需要排队查询);
  • 如果将今天生成的弹幕数据都保存到 redis 中,在下次进行页面刷新的时候,会调用一个弹幕数据查询的操作,就可以直接从 redis 里面进行读取,这样的速度是非常快的,因为它是从内存里面查询数据。
  • redis 单机最大处理量可以达到 10 ~ 50 万左右。

SpringBoot 整合 WebSocket

导入依赖

<!-- WebSocket依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

工具类

@Configuration
public class WebSocketConfig {/*** 用来发现WebSocket服务的** @return*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}

业务类

@Component
@ServerEndpoint("/imserver")
public class WebSocketService {private final Logger logger = LoggerFactory.getLogger(WebSocketService.class);/*** 当前长连接的数量(在线人数的统计)* 也就是当前有多少客户端通过WebSocket连接到服务端*/private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);/*** 一个客户端 关联 一个WebSocketService* 是一个多例模式的,这里需要注意下*/private static final ConcurrentHashMap<String, WebSocketService> WEBSOCKET_MAP = new ConcurrentHashMap<>();/*** 服务端 和 客户端 进行通信的一个会话* 当我们有一个客户端进来了,然后保持连接成功了,那么我们就会保存一个跟这个客户端关联的session*/private Session session;/*** 唯一标识*/private String sessionId;/*** 打开连接** @param session* @OnOpen 连接成功后会自动调用该方法*/@OnOpenpublic void openConnection(Session session) {// 保存session相关信息到本地this.sessionId = session.getId();this.session = session;// 判断WEBSOCKET_MAP是否含有sessionId,有的话先删除再重新添加if (WEBSOCKET_MAP.containsKey(sessionId)) {WEBSOCKET_MAP.remove(sessionId);WEBSOCKET_MAP.put(sessionId, this);} else { // 没有的话就直接新增WEBSOCKET_MAP.put(sessionId, this);// 在线人数加一ONLINE_COUNT.getAndIncrement();}logger.info("用户连接成功:" + sessionId + ",当前在线人数为:" + ONLINE_COUNT.get());// 连接成功之后需要通知客户端,方便客户端进行后续操作try {this.sendMessage("0");} catch (Exception e) {logger.error("连接异常!");}}/*** 客户端刷新页面,或者关闭页面,服务端断开连接等等操作,都需要关闭连接*/@OnClosepublic void closeConnection() {if (WEBSOCKET_MAP.containsKey(sessionId)) {WEBSOCKET_MAP.remove(sessionId);// 在线人数减一ONLINE_COUNT.getAndDecrement();logger.info("用户退出:" + sessionId + ",当前在线人数为:" + ONLINE_COUNT.get());}}/*** 客户端发送消息给后端** @param message*/@OnMessagepublic void onMessage(String message) {}/*** 发生错误之后的处理** @param error*/@OnErrorpublic void onError(Throwable error) {}/*** 后端发送消息给客户端** @param message* @throws IOException*/private void sendMessage(String message) throws IOException {this.session.getBasicRemote().sendText(message);}}

多例模式下引发的Bean注入为null的问题

在启动类中将ApplicationContext传给WebSocketService中的*APPLICATION_CONTEXT*

@SpringBootApplication
@EnableTransactionManagement
public class ImoocBilibiliApplication {public static void main(String[] args) {ApplicationContext app = SpringApplication.run(ImoocBilibiliApplication.class, args);WebSocketService.setApplicationContext(app);}}

WebSocketService

  • @Autowired 在多例模式下是不会自动进行加载的,所以这里我们不能使用@Autowired进行注入;
  • 而我们的启动类生成的ApplicationContext,是可以通过getBean( )方法获取到Spring容器中所有Bean的;
     /*** 全局的上下文变量*/private static ApplicationContext APPLICATION_CONTEXT;/*** 通用的上下文环境变量的方法,每个WebSocketService都会共用同一个ApplicationContext** @param applicationContext*/public static void setApplicationContext(ApplicationContext applicationContext) {WebSocketService.APPLICATION_CONTEXT = applicationContext;}

弹幕系统实现

数据库表设计及相关实体类设计

弹幕记录表

业务层

WebSocketService.java

@Component
@ServerEndpoint("/imserver/{token}")
public class WebSocketService {private final Logger logger = LoggerFactory.getLogger(WebSocketService.class);/*** 当前长连接的数量(在线人数的统计)* 也就是当前有多少客户端通过WebSocket连接到服务端*/private static final AtomicInteger ONLINE_COUNT = new AtomicInteger(0);/*** 一个客户端 关联 一个WebSocketService* 是一个多例模式的,这里需要注意下*/private static final ConcurrentHashMap<String, WebSocketService> WEBSOCKET_MAP = new ConcurrentHashMap<>();/*** 服务端 和 客户端 进行通信的一个会话* 当我们有一个客户端进来了,然后保持连接成功了,那么我们就会保存一个跟这个客户端关联的session*/private Session session;/*** 唯一标识*/private String sessionId;private Long userId;/*** 全局的上下文变量*/private static ApplicationContext APPLICATION_CONTEXT;/*** 打开连接** @param session* @param token* @OnOpen 连接成功后会自动调用该方法* @PathParam("token") 获取 @ServerEndpoint("/imserver/{token}") 后面的参数*/@OnOpenpublic void openConnection(Session session, @PathParam("token") String token) {// 如果是游客观看视频,虽然有弹幕,但是没有用户信息,所以需要用trytry {this.userId = TokenUtil.verifyToken(token);} catch (Exception ignored) {}// 保存session相关信息到本地this.sessionId = session.getId();this.session = session;// 判断WEBSOCKET_MAP是否含有sessionId,有的话先删除再重新添加if (WEBSOCKET_MAP.containsKey(sessionId)) {WEBSOCKET_MAP.remove(sessionId);WEBSOCKET_MAP.put(sessionId, this);} else { // 没有的话就直接新增WEBSOCKET_MAP.put(sessionId, this);// 在线人数加一ONLINE_COUNT.getAndIncrement();}logger.info("用户连接成功:" + sessionId + ",当前在线人数为:" + ONLINE_COUNT.get());// 连接成功之后需要通知客户端,方便客户端进行后续操作try {this.sendMessage("0");} catch (Exception e) {logger.error("连接异常!");}}/*** 客户端发送消息给服务端** @param message*/@OnMessagepublic void onMessage(String message) {logger.info("用户信息:" + sessionId + ",报文:" + message);if (!StringUtils.isNullOrEmpty(message)) {try {// 群发消息(服务端拿到某一个客户端发来的消息,然后群发到所有与它连接的客户端)for (Map.Entry<String, WebSocketService> entry : WEBSOCKET_MAP.entrySet()) {// 获取每一个和服务端连接的客户端WebSocketService webSocketService = entry.getValue();// 判断会话是否还处于打开状态if (webSocketService.session.isOpen()) {webSocketService.sendMessage(message);}}if (this.userId != null) {// --------- 保存弹幕到数据库 ----------// 将message转换成Danmu实体类的数据Danmu danmu = JSONObject.parseObject(message, Danmu.class);danmu.setUserId(userId);danmu.setCreateTime(new Date());DanmuService danmuService = (DanmuService) APPLICATION_CONTEXT.getBean("danmuService");danmuService.addDanmu(danmu);// ----------- 保存弹幕到redis -----------danmuService.addDanmusToRedis(danmu);}} catch (Exception e) {logger.error("弹幕接收出现问题!");e.printStackTrace();}}}}

DanmuService.java

@Service
public class DanmuService {private static final String DANMU_KEY = "dm-video-";@Autowiredprivate DanmuDao danmuDao;@Autowiredprivate RedisTemplate<String, String> redisTemplate;/*** 添加弹幕** @param danmu*/public void addDanmu(Danmu danmu) {danmuDao.addDanmu(danmu);}/*** 查询弹幕** @param danmu*/@Asyncpublic void asyncAddDanmu(Danmu danmu) {danmuDao.addDanmu(danmu);}/*** 添加弹幕到redis* 下次加载页面时,可以快速从缓存中获取弹幕** @param danmu*/public void addDanmusToRedis(Danmu danmu) {String key = DANMU_KEY + danmu.getVideoId();String value = redisTemplate.opsForValue().get(key);List<Danmu> list = new ArrayList<>();if (!StringUtil.isNullOrEmpty(value)) {// 将从redis中查询到的数据转换成list集合list = JSONArray.parseArray(value, Danmu.class);}// 将新的弹幕添加到list中list.add(danmu);redisTemplate.opsForValue().set(key, JSONObject.toJSONString(list));}}

推送弹幕性能优化

WebSocketService.java

     /*** 客户端发送消息给服务端** @param message*/@OnMessagepublic void onMessage(String message) {logger.info("用户信息:" + sessionId + ",报文:" + message);if (!StringUtils.isNullOrEmpty(message)) {try {// 群发消息(服务端拿到某一个客户端发来的消息,然后群发到所有与它连接的客户端)for (Map.Entry<String, WebSocketService> entry : WEBSOCKET_MAP.entrySet()) {// 获取每一个和服务端连接的客户端WebSocketService webSocketService = entry.getValue();// 获取到弹幕生产者DefaultMQProducer danmusProducer = (DefaultMQProducer) APPLICATION_CONTEXT.getBean("danmusProducer");JSONObject jsonObject = new JSONObject();jsonObject.put("message", message);jsonObject.put("sessionId", webSocketService.getSessionId());Message msg = new Message(UserMomentsConstant.TOPIC_DANMUS, jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8));// 异步发送消息RocketMQUtil.asyncSendMsg(danmusProducer, msg);}

RocketMQConfig.java

@Configuration
public class RocketMQConfig {@Value("${rocketmq.name.server.address}")private String nameServerAddr;@Autowiredprivate RedisTemplate<String, String> redisTemplate;@Autowiredprivate UserFollowingService userFollowingService;/*** 弹幕生产者** @return* @throws Exception*/@Bean("danmusProducer")public DefaultMQProducer danmusProducer() throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_MOMENTS);// 设置NameServer的地址producer.setNamesrvAddr(nameServerAddr);// 启动Producer实例producer.start();return producer;}/*** 弹幕消费者** @return* @throws Exception*/@Bean("danmusConsumer")public DefaultMQPushConsumer danmusConsumer() throws Exception {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_DANMUS);// 设置NameServer的地址consumer.setNamesrvAddr(nameServerAddr);// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe(UserMomentsConstant.TOPIC_DANMUS, "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt msg = msgs.get(0);if (msg == null) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}String bodyStr = new String(msg.getBody());JSONObject jsonObject = JSONObject.parseObject(bodyStr);String sessionId = jsonObject.getString("sessionId");String message = jsonObject.getString("message");// 根据sessionId获取对应的webSocketServiceWebSocketService webSocketService = WebSocketService.WEBSOCKET_MAP.get(sessionId);// 判断会话是否还处于打开状态if (webSocketService.getSession().isOpen()) {try {// 服务器发送消息给客户端webSocketService.sendMessage(message);} catch (Exception e) {e.printStackTrace();}}// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();return consumer;}
}

弹幕消息异步存储优化

优化一

使用SpringBoot的 @Async 注解进行异步保存弹幕

  • DanmuService.java
     /*** 异步保存弹幕** @param danmu* @Async 标识该方法调用的时候是使用异步的方式*/@Asyncpublic void asyncAddDanmu(Danmu danmu) {danmuDao.addDanmu(danmu);}
  • WebSocketService.java

优化二

使用 MQ 进行削峰操作

  • RocketMQConfig.java
     /*** 异步保存弹幕生产者** @return* @throws Exception*/@Bean("asyncadddanmusProducer")public DefaultMQProducer asyncAddDanmusProducer() throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer(UserMomentsConstant.GROUP_ASYNCADDDANMUS);// 设置NameServer的地址producer.setNamesrvAddr(nameServerAddr);// 启动Producer实例producer.start();return producer;}/*** 异步保存弹幕消费者** @return* @throws Exception*/@Bean("asyncadddanmusConsumer")public DefaultMQPushConsumer asyncAddDanmusConsumer() throws Exception {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(UserMomentsConstant.GROUP_ASYNCADDDANMUS);// 设置NameServer的地址consumer.setNamesrvAddr(nameServerAddr);// 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息consumer.subscribe(UserMomentsConstant.TOPIC_ASYNCADDDANMUS, "*");// 注册回调实现类来处理从broker拉取回来的消息consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {MessageExt msg = msgs.get(0);if (msg == null) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}String bodyStr = new String(msg.getBody());// 将接收到消息转换成DanMu实体类Danmu danmu = JSONObject.toJavaObject(JSONObject.parseObject(bodyStr), Danmu.class);// 异步保存弹幕danmuService.asyncAddDanmu(danmu);// 标记该消息已经被成功消费return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者实例consumer.start();return consumer;}
  • WebSocketService.java

在线人数统计

设置一个定时任务,每5秒群发一个消息告诉客户端,关于该视频的当前在线人数

     /*** 定时任务,每5秒群发一次消息到与服务器相连的所有客户端** @throws IOException* @Scheduled(fixedRate = 5000) 标识该方法是一个定时任务,并且每隔5秒执行该方法*/@Scheduled(fixedRate = 5000)private void noticeOnlineCount() throws IOException {for (Map.Entry<String, WebSocketService> entry : WebSocketService.WEBSOCKET_MAP.entrySet()) {WebSocketService webSocketService = entry.getValue();if (webSocketService.session.isOpen()) {JSONObject jsonObject = new JSONObject();jsonObject.put("onlineCount", ONLINE_COUNT.get());jsonObject.put("msg", "当前在线人数为" + ONLINE_COUNT.get());// 服务端发送消息给客户端webSocketService.sendMessage(jsonObject.toJSONString());}}}

弹幕查询功能实现

DanmuApi.java

@RestController
public class DanmuApi {@Autowiredprivate DanmuService danmuService;@Autowiredprivate UserSupport userSupport;/*** 查询弹幕* 在游客模式下,是没有办法进行弹幕时间段筛选的* 用户进行登录之后,就可以指定时间段进行弹幕查询** @param videoId   视频id* @param startTime 开始时间* @param endTime   结束时间* @return* @throws Exception*/@GetMapping("/danmus")public JsonResponse<List<Danmu>> getDanmus(@RequestParam Long videoId, String startTime, String endTime) throws Exception {List<Danmu> list;try {// 判断当前是游客模式还是用户登录模式userSupport.getCurrentUserId();// 若是用户登录模式,则允许用户进行时间段筛选list = danmuService.getDanmus(videoId, startTime, endTime);} catch (Exception ignored) {// 若为游客模式,则不允许用户进行时间段筛选list = danmuService.getDanmus(videoId, null, null);}return new JsonResponse<>(list);}}

第三章 打造高性能的视频弹幕系统相关推荐

  1. 开源倾情奉献:基于.NET打造IP智能网络视频监控系统(五)客户端介绍

    开源倾情奉献系列链接 开源倾情奉献:基于.NET打造IP智能网络视频监控系统(一)开放源代码 开源倾情奉献:基于.NET打造IP智能网络视频监控系统(二)基础类库介绍 开源倾情奉献:基于.NET打造I ...

  2. 开源倾情奉献:基于.NET打造IP智能网络视频监控系统(四)服务端介绍

    本文为 Dennis Gao 原创技术文章,发表于博客园博客,未经作者本人允许禁止任何形式的转载. 开源倾情奉献系列链接 开源倾情奉献:基于.NET打造IP智能网络视频监控系统(一)开放源代码 开源倾 ...

  3. 开源倾情奉献:基于.NET打造IP智能网络视频监控系统(一)开放源代码

    本文为 Dennis Gao 原创技术文章,发表于博客园博客,未经作者本人允许禁止任何形式的转载. 开源倾情奉献系列链接 开源倾情奉献:基于.NET打造IP智能网络视频监控系统(一)开放源代码 开源倾 ...

  4. 开源倾情奉献:基于.NET打造IP智能网络视频监控系统

    转载自 http://www.cnblogs.com/gaochundong/p/opensource_ip_video_surveillance_system_part_1_introduction ...

  5. 监控录像服务器性能要求,如何打造高性能的视频监控存储系统?(《中国安防》)...

    编者按:数字化视频监控涉及到海量录像采集.存储和是使用分析的过程.随着视频监控的不断发展,视频数字化.传输网络化.系统集成化.管理职能化,传统的视频安防系统逐步向IT化靠近,在这必然的市场趋势中,作为 ...

  6. 一篇文章让你了解视频监控系统搭建过程中如何选择存储方式

    作为安防视频流媒体服务器软件的提供商,我们日常项目中遇到的需求主要是搭建一整套完整的视频监控系统,包括前端摄像头.线缆.传输系统.存储系统.解码拼控和大屏设备等组成.存储系统是整个监控系统中最为重要的 ...

  7. 【解决方案】SkeyeVSS视频云打造智慧景区视频监控系统促进智慧旅游产业发展

    与传统景区的管理模式不同,智慧景区高度依赖智慧化手段,借用视频监控系统实现传统旅游管理方式向现代管理方式转变,提高景区的综合管理和运营能力,提升旅游服务品质,从而保障游客的人身安全和财产安全,提升景区 ...

  8. 【解决方案】SkeyeVSS综合安防视频云服务打造智慧景区视频监控系统促进智慧旅游产业发展

    与传统景区的管理模式不同,智慧景区高度依赖智慧化手段,借用视频监控系统实现传统旅游管理方式向现代管理方式转变,提高景区的综合管理和运营能力,提升旅游服务品质,从而保障游客的人身安全和财产安全,提升景区 ...

  9. [云炬创业管理笔记]第三章打造优秀创业团队讨论4

最新文章

  1. Blender 2.42
  2. django html文本编辑器,django xadmin 集成DjangoUeditor富文本编辑器
  3. 降级安装_如何升级iOS13测试版,还有降级
  4. [css] sass是怎么定义变量的?
  5. 大数据_Flink_数据处理_运行时架构3_yarn上作业提交流程---Flink工作笔记0018
  6. python中函数的参数传递
  7. 深度学习教程 | 吴恩达专项课程 · 全套笔记解读
  8. 【隐形的翅膀】基于钉钉工作流的人事评价信息采集案例(2):钉钉智能表单、OA审批、自动任务功能对比
  9. 用python如何制作表格_Python中如何用xlwt制作表格
  10. My SQL 排序和分组
  11. 分享蔡澜老师的自问自答
  12. 聊聊我的故事 | 我丰富的十二年···
  13. Oracle 使用序列创建自增字段
  14. Microsoft Office 全家桶下载地址
  15. “永不放弃”成就了再结晶宝石
  16. 拿到阿里,网易游戏,腾讯,smartx的offer的过程 (转)
  17. IllegalStateException: Failure saving state: active Fragment has cleared
  18. 【Rhapsody学习笔记(二)】Linux环境下的Rhapsody集成引擎安装部署
  19. 通达信股票接口盘后数据下载流程是怎么样的?
  20. 如何预防服务器数据丢失及丢失处理

热门文章

  1. 软件性能测试参数化数据准备,性能测试之如何准备测试数据
  2. mac下用户用户组命令行操作
  3. DCS是分布式控制系统的英文缩写(Distributed Control System)
  4. iFrame的sandbox配置
  5. SMP与AMP体系结构
  6. JAVA五子棋人人对战的实现
  7. HDU 5454 Excited Database【线段树】
  8. 招行股东会通过收购永隆银行议案
  9. 【CodeForces】Educational Codeforces Round 118 (Rated for Div. 2)【A-C】
  10. 凤凰涅槃,浴火重生(2013年总结)