01

背 景

B/S架构下很多业务场景下我们需要服务端主动推送消息到客户端,在html5之前一般使用长轮询(除此之外还有iframe流或者Flash Socket)的方式来实现,而长轮询的方式缺点很明显,频繁交互的情况下,大量的连接被建立和释放,并且交互频率受限于两次http的请求间隔,html5开始可以使用websocket全双工的通信协议,在tomcat和jetty都有实现。

虽然在java1.4以后可以用nio包实现非阻塞的websocket通信,但是使用起来太过底层和复杂;netty封装了nio,使用了大量异步和事件驱动,封装了拆包粘包,实现了网络编程和业务分离,但即便如此,使用和优化netty还是需要深厚的网络编程知识,况且我们需要的仅仅是用户与用户之间的文本消息的推送,这里我们介绍一个基于netty实现的websocket服务端框架,socket.io屏蔽了使用netty的细节,针对聊天场景做了高度封装,并且提供了包含WEBSOCKET、POLLING等多种推送方式的支持,同时支持namespace(命名空间)、broadcast(广播)、room(房间/聊天室)、event(业务事件)、应用层ack机制。

02

需 求

构建一个统一的文本消息推送平台,可以在此之上构建业务系统,比如:客服系统、内部聊天工具、站内信、事件系统等。基础平台要做到用户管理、状态管理、文本消息发送/推送(群/点对点)、图片/文件推送、未读消息管理、历史消息、会话管理。

03

业务架构

restful:通用接口http服务层、提供鉴权、发送消息、历史消息、离线消息、会话等相关接口

logic:业务逻辑层、消息/会话持久化、用户状态、连接状态存储、离线消息

router:路由层、消息路由、把消息推送到用户连接的connector

connector:连接层、提供消息推送服务、维护房间、ack

消息的上行是走的restful,在这层可以做uniauth(内部的统一登录平台)权限验证、负载均衡等,也让connector更轻量级,依赖更少;connector是有状态的,保持着用户的连接信息,同时要记录用户和connector的映射关系,以用于router层在推送消息时路由到对应的节点,connector使用单独的物理机来运行,因为虚拟主机并不能稳定保证较高并发的长连接数量。

04

轻量级的消息发送

一个消息推送平台最重要的是高可用性和最终一致性(消息可达),所以消息发送可以做的非常轻量,发送消息时会带上客户端生成的唯一消息id,同时客户端持久化,服务端只需要把消息写到mq即发送成功,然后logic消费mq再做异步批量的消息持久化,从可用性角度来说,发送消息并不依赖websocket,而是通过restful服务,更加高可用和可扩展。消息发送的可靠性,没有依赖于rabbitmq的事务或者publisher confirms,后面介绍专门的机制来保证。

05

点对点/群消息

消息我们定义了三种事件类型,MessageCategoryEnum

MSG:文本消息,包含图片、文件、语音、文字等子类型。

IQ:操作消息,客户端用来做某些特殊动作处理。

STATUS:状态消息,和用户上下线等状态有关。

消息体如下

对于点对点消息来说,发送者和接受者都是一个用户id,对于群消息来说,接受者可能是一个SESSION(会话),会话需要提前创建,会话中包含若干的用户,使用socket.io的room功能,很方便的实现一个会话,room就是SESSION的id。

1. 用户上下线的时候都要主动加入/离开和自己有关的所有room

com.corundumstudio.socketio.transport.NamespaceClient.joinRoom(String room)

2. 点对点消息推送就调用 com.corundumstudio.socketio.transport.NamespaceClient.sendEvent(String 事件类型, AckCallback<?> ack, Object... 消息(SocketIOMessage))

3. 群消息推送就调用

com.corundumstudio.socketio.BroadcastOperations.sendEvent(String 事件类型, Object 消息(SocketIOMessage ), BroadcastAckCallback<T> ack)

06

消息可达保证(QoS)

使用六报文的方式保证消息可达,图中演示的是三报文的消息发送阶段。

1.client-A向im-server发送一个消息请求包,即msg:R;

2.im-server在成功处理后,回复client-A一个消息响应包,即msg:A;

3.如果此时client-B在线,则im-server主动向client-B推送一个消息通知包(不在线做离线存储),即msg:N;

4.client-B向im-server发送一个ack请求包表示msg已经收到,即ack:R;

5.im-server在成功处理后,回复client-B一个ack响应包,即ack:A;

6.im-server主动向client-A发送一个ack通知包,即ack:N;

7.client-A发出消息后,超过设定的某个时间没有收到ack:N,那么我们认为消息没有推送成功(用户离线除外,用户离线,服务端会模拟一个ack:N),需要客户端重发,服务端根据客户端id幂等处理;

在我们系统,msg:N和ack:N是connector推送,其他报文就是走http的消息发送和ack;其实ack:N可以不需要,服务器可以保存未读消息列表,接受ack:R来判断是否要重发,但是这样可靠性稍差一些,极端情况会造成消息丢失。

07

消息顺序性

首先我们要确定以什么样的逻辑排列消息的顺序,两个人聊天,首先要保证的是同一个人消息的先后顺序,其次要保证会话内消息的顺序。服务端不能以持久化时生成自增id作为顺序,因为异步消费不能保证顺序(某些mq,如kafka以用户为key可以保证消息顺序性,但也存在重试问题)。从客户端的角度来说,可以通过同步发送保证消息的顺序性,所以服务端可以在发mq之前通过redis生成会话内自增id,推送的时候可能会存在重发和乱序,就需要客户端根据服务端生成的id做幂等和重排序。

如果不依赖于redis,可以使用服务器时间(精确到秒)+客户端生成的自增id作为排序字段,但这样就要求集群的时间同步在一秒内。

08

未读(离线)消息

关于未读消息的存储有两种方案:写扩散和读扩散,写扩散即未读消息在持久化时就针对每个用户保存一份,读扩散就是利用群消息的偏序特性,只保存用户在会话内ack的最后一条消息id,无论是哪种方式,未读消息都只是保存消息的id,未读消息的推送仅在上线或者连接的时候。

写扩散:

session_user(session_id, user_id);//会话-用户

messages(msgid,session_id,sender_user_id,time,content);//消息

user_messages(user_id, msgid, session_id);//用户-未读消息

1. 消息持久化后,如果接受用户不在线,那么在user_messages为每个用户新增一条记录;

2. 用户ack后,删除t_user_messages记录;

3. 用户查未读消息,先查user_messages,再查messages。

读扩散:

session_user(session_id, user_id, last_ack_msgid);//会话-用户

messages(msgid,session_id,sender_user_id,time,content);//消息

user_messages(user_id, msgid, session_id);//用户-未读消息

1.消息持久化,推送在线用户;

2.用户ack,更新last_ack_msgid;

3.用户查未读消息,先查last_ack_msgid,再查messages。

优化点:

1. 未读消息user_messages表或者last_ack_msgid字段的更新很频繁,可以用redis来实现存储;

2. ack的频率很高,可以让客户端批量或在一个时间段内ack,减少请求,即使消息重发,也有客户端去重;

3. 未读消息可能存在消息量很大的情况,可以设置过期时间,过期后即表示该消息已读;大量未读消息不适合做推送,最好通过http分页拉取,甚至可以把拉取请求和ack请求合并。

09

调 优

1. Jdk nio会判断Linux kernels >= 2.6,是则使用level-triggered epoll,否则使用select/poll,而netty实现了edge-triggered epoll,在高并发场景下性能略优,也是netty推荐的模式,设置socket.io的useLinuxNativeEpoll=true,使用linux epoll, NioEventLoopGroup 替换成EpollEventLoopGroup,NioServerSocketChannel 替换成 EpollServerSocketChannel;

2. 要建立大量连接需要修改最大文件句柄数,修改sysctl.conf的fs.file-max = 1000000,同时修改limits.conf

soft  nofile  1000000

hard  nofile  1000000

3. 设置TCP_NODELAY=true禁用nagle算法;

4. 设置全连接accept队列长度,SO_BACKLOG=65535,同时设置内核参数net.core.somaxconn=65535

5. 根据消息的平均大小,调整tcp socket读写缓冲区的默认值net.ipv4.tcp_wmem=4096 4096 4161536 net.ipv4.tcp_rmem=4096 4096 4161536

6. 调整net.ipv4.tcp_mem=786432 2097152 3145728,当连接量较大时,单个连接占用的堆外内存接近于4096+4096=8k,如果有50w连接,需要至少4g内存,那么tcp_mem最大值要大于4g;

7. 上行数据使用AdaptiveRecvByteBufAllocator动态分配ByteBuf;

8. 业务逻辑不要占用EventLoop线程,启用业务线程池处理,否则可能阻塞I/O;

9. Tcp层的心跳只能检测连接,不能确定应用可用,所以有了应用层心跳, socket.io默认25s,可以调整为180s,频率过高会占用大量带宽和流量,过低可能会导致连接断开。同时关闭tcp心跳保活SO_KEEPALIVE=false;

10. Netty的boss threads设置为1(1个监听端口),worker设置为16(cpu*2)。

10

Socket.io

1.服务端

netty-socketio是java版的服务端实现,目前只支持xhr-polling和websocket 两种transport。

NamespacesHub是持有所有命名空间的一个map,value是Namespace。

Namespace持有当前命名空间下的所有client连接room和client的对应关系。

NamespaceClient代表当前命名空间下的一个client连接。

ClientHead代表一个client连接,持有会话id、TransportState以及子NamespaceClient(一个Channel可以对应多个命名空间)。

TransportState持有一个Packet的无锁队列,以及一个netty的通道Channel,通过此向客户端推送消息。

Packet就是一个socket.io协议下的数据包。

socket.io为我们封装了支持websocket协议的ChannelHandler,其中就包括最重要的InPacketHandler,InPacketHandler提供了协议解码,用来处理非OPEN,UPGRADE的PacketType事件:CONNECT(连接)、PING(应用层心跳)、UPGRADE(升级websocket)、CLOSE(连接关闭)、MESSAGE(业务消息),socket.io使用自定义协议Packet,并使用json序列化的方式传输。

2.客户端

2.1 连接

var socket = io.connect('http://127.0.0.1:9092/namespace');//发起客户端连接,客户端sdk发起connect的时候会发出下面的若干请求

https://crm-connector-dev.dianrong.com/socket.io/?EIO=3&transport=polling&t=MFJ3jBa

Response:{"sid":"bbfd934a-9665-4138-9593-f23fe2836215","upgrades":["websocket"],"pingInterval":180000,"pingTimeout":60000}

会生成一个sid表示本次连接会话,upgrades:websocket表示客户端可以把协议升级为websocket而不再使用轮询,而客户端也会判断当前浏览器是否支持websocket来决定是否升级

https://crm-connector-dev.dianrong.com/socket.io/?EIO=3&transport=polling&t=MFJ3jD0&sid=bbfd934a-9665-4138-9593-f23fe2836215

Response:[{"code":"0","message":"success"}]

客户端发送了一个业务上的注册事件给服务端,并接受了一个成功的返回,在websocket没有连接成功前,会一直使用长轮询方式接受服务端推送

wss://crm-connector-dev.dianrong.com/socket.io/?EIO=3&transport=websocket&sid=bbfd934a-9665-4138-9593-f23fe2836215

建立websocket长连接

2.2 监听

监听一个msg业务事件

socket.on(‘msg’, function(data) {

//do something

});

监听断开连接事件

socket.on('disconnect', function() {

//do something

});

3.Socket.io websocket协议包

每一个Frame数据帧都以数字开头,三位数字分别表示command(命令):messageType(消息类型):req_id(自增id)

第一行 2probe 表示首次ping

第二行 3probe 表示首次pong

第三行 5 表示UPGRADE,升级为websocket

第四行 421[“iq...] 表示iq事件消息

如何构建一个消息推送平台相关推荐

  1. 基于SpringBoot、RabbitMQ的Android消息推送平台搭建

    消息推送,类似于微信来新消息时出现在通知栏那种情景.很多APP都有这个功能.现在有很多第三方平台可以实现这个需要,但是有的公司对所要推送的消息保密要求比较高,不希望被第三方看到,可以使用此种方式进行消 ...

  2. 如何构建一套高可用的 APP 消息推送平台

    转载自  如何构建一套高可用的 APP 消息推送平台 消息推送作为移动 APP 运营中的一项关键技术,已经被越来越广泛的运用.本文追溯了推送技术的发展历史,剖析了其核心原理,并对推送服务的关键技术进行 ...

  3. 如何构建一套高可用的移动消息推送平台?

    消息推送作为移动 APP 运营中的一项关键技术,已经被越来越广泛的运用.本文追溯了推送技术的发展历史,剖析了其核心原理,并对推送服务的关键技术进行深入剖析,围绕消息推送时产生的服务不稳定性,消息丢失. ...

  4. 干货 | Reactive模式在Trip.com消息推送平台上的实践

    作者简介 KevinTen,携程后端开发工程师,关注Reactive和RPC领域,深度参与开源社区,对Reactive技术有浓厚兴趣. Pin,携程技术专家,Apache Dubbo贡献者,关注RPC ...

  5. 消息推送平台高可用实践(下)

    消息推送平台高可用实践(下) 消息推送平台现已为几十个产品提供推送服务,同时在线用户连接数超过300w,日收发消息量达几千万,对消息的实时性和可靠性均提出了较高的要求.上篇 从架构设计和部署方案角度介 ...

  6. 【Java开源项目】消息推送平台 日志引入

    大家好,我是3y.在正文之前,先给各位粉丝汇报下austin消息推送平台项目进度: 总的来说,我感觉这次的反响是不错的,虽然阅读量不高.但留言的人多了很多,也有很多人都担心我会不会鸽掉(更新一半中途就 ...

  7. Android消息推送:第三方消息推送平台详细解析

    前言 消息推送在Android开发中应用的场景是越来越多了,比如说电商产品进行活动宣传.资讯类产品进行新闻推送等等,如下图: 本文将介绍Android中实现消息推送的第三方推送的详细解决方案 阅读本文 ...

  8. 京东大规模消息推送平台搭建实践

    背景 每个app或者业务都有将信息推送到用户客户端的需求.作为中台的推送平台,需要为公司内部许多个不同app同时提供可用,稳定的推送服务,因此我们消息推送平台应运而生. 推送平台架构 名词解释: dt ...

  9. 深入解析消息推送平台的设计原理,百万门店同时推送消息是如何实现的?

    简介 现有项目中存在需要针对百万门店同时推送消息的需求,需要设计一个消息推送中心的系统进行专门的消息推送 需求 对百万门店进行消息推送 支持坐席侧websocket实时推送消息通知客服需要注意的事项 ...

最新文章

  1. 视频编解码之理论概述 和即时通信
  2. time.h 详细介绍
  3. 守卫者的挑战(guard)
  4. 2018集训队日常训练1
  5. jquery.MultiFile 实现自动删掉上传列表
  6. Climbing Stairs - Print Path
  7. JavaScript创建Element元素/标签的工具/方法
  8. 微软将允许Epic Games等App登上微软商店
  9. SQLi LABS Less 27a 联合注入+布尔盲注+时间盲注
  10. 2019百度之星程序设计大赛 1005 Seq
  11. scapy:收发数据包
  12. MSP430开发笔记之五:硬件IO中断与IR红外接收
  13. 2016年全国房价会呈什么趋势?
  14. ffmpeg 中av_rescale_rnd 与av_rescale ()(AVRounding结构体)
  15. 一文读懂长非编码RNA(lncRNA)的分类、功能及测序鉴定方法
  16. 使用函数节流思想避免 SAP UI5 应用里按钮短时间内被高频重复点击试读版
  17. Android_adb_Wifi_无线调试,脱离数据线/
  18. docker存储bind mounts用法
  19. Android Q中通知的设置
  20. 《地球以外的文明世界》——阿西莫夫

热门文章

  1. 为什么说在Android中请求权限从来都不是一件简单的事情?
  2. 阿里巴巴2014研发实习生笔试解析
  3. AI框架大牛贾扬清加盟,任职阿里巴巴技术VP
  4. 为什么AR有望彻底改变航空业的飞维修和驾驶现状?
  5. html5 光影效果,HTML5 WebGL GLSL 泛光着色效果
  6. 2019上交、上科、北航、中科大、自动化所计算机夏令营+浙大计算机预推免简记
  7. hihocoder #1716 : 继承顺位
  8. 系统辨识理论(笔记)
  9. 有点想法系列:借助海尔平台打造智能家居的一点想法
  10. 毕业设计的问卷数据处理