千万级WebSocket消息推送服务技术分析
拉模式和推模式区别
拉模式(定时轮询访问接口获取数据)
- 数据更新频率低,则大多数的数据请求时无效的
- 在线用户数量多,则服务端的查询负载很高
- 定时轮询拉取,无法满足时效性要求
推模式(向客户端进行数据的推送)
- 仅在数据更新时,才有推送
- 需要维护大量的在线长连接
- 数据更新后,可以立即推送
基于WebSocket协议做推送
- 浏览器支持的socket编程,轻松维持服务端的长连接
- 基于TCP协议之上的高层协议,无需开发者关心通讯细节
- 提供了高度抽象的编程接口,业务开发成本较低
WebSocket协议的交互流程
客户端首先发起一个Http请求到服务端,请求的特殊之处,在于在请求里面带了一个upgrade的字段,告诉服务端,我想生成一个websocket的协议,服务端收到请求后,会给客户端一个握手的确认,返回一个switching, 意思允许客户端向websocket协议转换,完成这个协商之后,客户端与服务端之间的底层TCP协议是没有中断的,接下来,客户端可以向服务端发起一个基于websocket协议的消息,服务端也可以主动向客户端发起websocket协议的消息,websocket协议里面通讯的单位就叫message。
传输协议原理
- 协议升级后,继续复用Http协议的底层socket完成后续通讯
- message底层会被切分成多个frame帧进行传输,从协议层面不能传输一个大包,只能切成一个个小包传输
- 编程时,只需操作message,无需关心frame(属于协议和类库自身去操作的)
- 框架底层完成TCP网络I/O,WebSocket协议的解析,开发者无需关心
服务端技术选型与考虑
NodeJs
- 单线程模型(尽管可以多进程),推送性能有限
C/C++
- TCP通讯、WebSocket协议实现成本高
Go
- 多线程,基于协程模型并发
- Go语言属于编译型语言,运行速度并不慢
- 成熟的WebSocket标准库,无需造轮子
基于Go实现WebSocket服务端
用Go语言对WebSocket做一个简单的服务端实现,以及HTML页面进行调试,并对WebSocket封装,这里就直接给出代码了。
WebSocket服务端
package mainimport ("net/http""github.com/gorilla/websocket""github.com/myproject/gowebsocket/impl""time")
var(upgrader = websocket.Upgrader{// 允许跨域CheckOrigin:func(r *http.Request) bool{return true},}
)func wsHandler(w http.ResponseWriter , r *http.Request){// w.Write([]byte("hello"))var(wsConn *websocket.Connerr errorconn *impl.Connectiondata []byte)// 完成ws协议的握手操作// Upgrade:websocketif wsConn , err = upgrader.Upgrade(w,r,nil); err != nil{return }if conn , err = impl.InitConnection(wsConn); err != nil{goto ERR}// 启动线程,不断发消息go func(){var (err error)for{if err = conn.WriteMessage([]byte("heartbeat"));err != nil{return }time.Sleep(1*time.Second)}}()for {if data , err = conn.ReadMessage();err != nil{goto ERR}if err = conn.WriteMessage(data);err !=nil{goto ERR}}ERR:conn.Close()}func main(){http.HandleFunc("/ws",wsHandler)http.ListenAndServe("0.0.0.0:7777",nil)
}
前端页面
<!DOCTYPE html>
<html>
<head><title>go websocket</title><meta charset="utf-8" />
</head>
<body><script type="text/javascript">var wsUri ="ws://127.0.0.1:7777/ws"; var output; function init() { output = document.getElementById("output"); testWebSocket(); } function testWebSocket() { websocket = new WebSocket(wsUri); websocket.onopen = function(evt) { onOpen(evt) }; websocket.onclose = function(evt) { onClose(evt) }; websocket.onmessage = function(evt) { onMessage(evt) }; websocket.onerror = function(evt) { onError(evt) }; } function onOpen(evt) { writeToScreen("CONNECTED"); // doSend("WebSocket rocks"); } function onClose(evt) { writeToScreen("DISCONNECTED"); } function onMessage(evt) { writeToScreen('<span style="color: blue;">RESPONSE: '+ evt.data+'</span>'); // websocket.close(); } function onError(evt) { writeToScreen('<span style="color: red;">ERROR:</span> '+ evt.data); } function doSend(message) { writeToScreen("SENT: " + message); websocket.send(message); } function writeToScreen(message) { var pre = document.createElement("p"); pre.style.wordWrap = "break-word"; pre.innerHTML = message; output.appendChild(pre); } window.addEventListener("load", init, false); function sendBtnClick(){var msg = document.getElementById("input").value;doSend(msg);document.getElementById("input").value = '';}function closeBtnClick(){websocket.close(); }</script><h2>WebSocket Test</h2> <input type="text" id="input"></input><button onclick="sendBtnClick()" >send</button><button onclick="closeBtnClick()" >close</button><div id="output"></div> </body>
</html>
封装WebSocket
package implimport ("github.com/gorilla/websocket""sync""errors")type Connection struct{wsConnect *websocket.ConninChan chan []byteoutChan chan []bytecloseChan chan bytemutex sync.Mutex // 对closeChan关闭上锁isClosed bool // 防止closeChan被关闭多次
}func InitConnection(wsConn *websocket.Conn)(conn *Connection ,err error){conn = &Connection{wsConnect:wsConn,inChan: make(chan []byte,1000),outChan: make(chan []byte,1000),closeChan: make(chan byte,1),}// 启动读协程go conn.readLoop();// 启动写协程go conn.writeLoop();return
}func (conn *Connection)ReadMessage()(data []byte , err error){select{case data = <- conn.inChan:case <- conn.closeChan:err = errors.New("connection is closeed")}return
}func (conn *Connection)WriteMessage(data []byte)(err error){select{case conn.outChan <- data:case <- conn.closeChan:err = errors.New("connection is closeed")}return
}func (conn *Connection)Close(){// 线程安全,可多次调用conn.wsConnect.Close()// 利用标记,让closeChan只关闭一次conn.mutex.Lock()if !conn.isClosed {close(conn.closeChan)conn.isClosed = true }conn.mutex.Unlock()
}// 内部实现
func (conn *Connection)readLoop(){var(data []byteerr error)for{if _, data , err = conn.wsConnect.ReadMessage(); err != nil{goto ERR}
//阻塞在这里,等待inChan有空闲位置select{case conn.inChan <- data:case <- conn.closeChan: // closeChan 感知 conn断开goto ERR}}ERR:conn.Close()
}func (conn *Connection)writeLoop(){var(data []byteerr error)for{select{case data= <- conn.outChan:case <- conn.closeChan:goto ERR}if err = conn.wsConnect.WriteMessage(websocket.TextMessage , data); err != nil{goto ERR}}ERR:conn.Close()}
千万级弹幕系统的架构设计
技术难点
内核瓶颈
- 推送量大:100W在线 * 10条/每秒 = 1000W条/秒
- 内核瓶颈:linux内核发送TCP的极限包频 ≈ 100W/秒
锁瓶颈
- 需要维护在线用户集合(100W用户在线),通常是一个字典结构
- 推送消息即遍历整个集合,顺序发送消息,耗时极长
- 推送期间,客户端仍旧正常的上下线,集合面临不停的修改,修改需要遍历,所以集合需要上锁
CPU瓶颈
- 浏览器与服务端之间一般采用的是JSon格式去通讯
- Json编码非常耗费CPU资源
- 向100W在线推送一次,则需100W次Json Encode
优化方案
内核瓶颈
- 减少网络小包的发送,我们将网络上几百字节定义成网络的小包了,小包的问题是对内核和网络的中间设备造成处理的压力。方案是将一秒内N条消息合并成1条消息,合并后,每秒推送数等于在线连接数。
锁瓶颈
- 大锁拆小锁,将长连接打散到多个集合中去,每个集合都有自己的锁,多线程并发推送集合,线程之间推送的集合不同,所以没有锁的竞争关系,避免锁竞争。
- 读写锁取代互斥锁,多个推送任务可以并发遍历相同集合
CPU瓶颈
- 减少重复计算,Json编码前置,1次消息编码+100W次推送,消息合并前置,N条消息合并后,只需要编码一次。
单机架构
最外层是在线的长连接,连接到服务端后,打散到多个集合里面存储,我们要发送的消息呢,通过打包后,经过json编码,被多个线程或协程分发到多个集合中去,最终推给了所有的在线连接。
单机瓶颈
- 维护海量长连接,会花费不少内存
- 消息推送的瞬时,消耗大量的CPU
- 消息推送的瞬时带宽高达400-600Mb(4-6Gbits),需要用到万兆网卡,是主要瓶颈
集群
部署多个节点,通过负载均衡,把连接打散到多个 服务器上,但推送消息的时候,不知道哪个直播间在哪个节点上,最常用的方式是将消息广播给所有的网关节点,此时就需要做一个逻辑集群。
逻辑集群
- 基于Http2协议向gateway集群分发消息(Http2支持连接复用,用作RPC性能更佳,即在单个连接上可以做高吞吐的请求应答处理)
- 基于Http1协议对外提供推送API(Http1更加普及,对业务方更加友好)
整体分布式架构图如下:
任何业务方通过Http接口调用到逻辑集群,逻辑集群把消息广播给所有网关,各个网关各自将消息推送给在线的连接即可。
本文讲解了开发消息推送服务的难点与解决方案的大体思路,按照整个理论流程下来,基本能实现一套弹幕消息推送的服务。
千万级WebSocket消息推送服务技术分析相关推荐
- Worktile 中百万级实时消息推送服务的实现
Worktile 中百万级实时消息推送服务的实现 转自:http://www.360doc.com/content/15/0907/19/1073512_497529854.shtml 这是一个创建于 ...
- Worktile中百万级实时消息推送服务的实现
Worktile中百万级实时消息推送服务的实现 出自:http://blog.jobbole.com/81125/ 转载于:https://www.cnblogs.com/ribavnu/p/4531 ...
- SSM项目使用GoEasy 实现web消息推送服务
一.背景 之前项目需要做一个推送功能,最开始我用websocket实现我的功能.使用websocket的好处是免费自主开发,但是有几个问题:1)浏览器的兼容问题,尤其是低版本的ie:2)因为是推送 ...
- 消息推送技术干货:美团实时消息推送服务的技术演进之路
本文由美团技术团队分享,作者"健午.佳猛.陆凯.冯江",原题"美团终端消息投递服务Pike的演进之路",有修订. 1.引言 传统意义上来说,实时消息推送通常都是 ...
- 百亿级实时消息推送的实战之道,与王者荣耀一班车就是这么稳!
要说现在市面上最火爆的手游,莫非拥有两亿注册用户的王者荣耀了.据悉,王者荣耀的渗透率高达22.3%,这意味着每7个中国人中就有一位是王者荣耀注册用户.众所周知,手游App对推送实时性和精准性要求非常高 ...
- Worktile中的实时消息推送服务实现
在团队协同工具worktile的使用过程中,你会发现无论是右上角的消息通知,还是在任务面板中拖动任务,还有用户的在线状态,都是实时刷新.Worktile中的推送服务是采用的是基于xmpp协议.erla ...
- WebSocket消息推送和聊天功能实现
WebSocket消息推送 SpringBoot集成WebSocket实现消息推送和聊天Demo gradle引入依赖 测试用的Controller 两个测试页面 WebSocket的Endpoint ...
- EasyGBS国标平台新增WebSocket消息推送,可快速定位视频播放故障
WebSocket是建立在TCP之上的一种双向通信协议,它能实现浏览器与服务器全双工通信,在性能上具有较强的优势.尤其是在海量并发及客户端与服务器交互负载流量大的情况下,WebSocket可以极大节省 ...
- java连接imserver_java后端IM消息推送服务开发——协议
最近在一家saas企业使用Mqtt开发IM消息推送服务,把开发中的一些问题记录下来,项目仍在商用中,完整的消息服务包括4个模块---协议protocol,信令Signal,规则Rule,状态Statu ...
最新文章
- RxSwift技术路线与参考资料
- ASP.NET缓存全解析4:应用程序数据缓存(转)
- x265-bitstream.cpp
- 程序员面试强行用代码画画被骂,20分钟后面试官闭嘴了
- 机器视觉:CMOS图像传感器
- 连接mysql_spring boot连接mysql提示The server time zone value xxx错误
- 解决git冲突造成的Please move or remove them before you can merge
- 技术解读:Dragonfly 基于 P2P 的智能镜像加速系统 | 龙蜥技术
- kafka 中如何保证数据消息不丢失
- Python 与 SQL 这样超强结合,处理数据才是爆赞
- 微信开发之data:image/png;base64,
- python学习笔记十-文件操作
- git push和 git pull的使用
- 气象历史数据和空气质量历史数据资源汇总免费
- 关于“无穷”的概念---数学笔记“无穷”
- 快速调出multisim里单刀双置开关
- 经典问题:数据有误,一定要重传吗?
- php写彩票中奖代码,php简单中奖算法(实例)
- 中国移动打造“移动信息专家”
- ADB 命令结合 monkey 的简单使用,超详细