Worktile 中百万级实时消息推送服务的实现

转自:http://www.360doc.com/content/15/0907/19/1073512_497529854.shtml
这是一个创建于 381 天前的主题,其中的信息可能已经有所发展或是发生改变。

摘要:相较于手机端的消息推送(一般都是以Socket方式实现),WEB端是基于HTTP协议,很难像TCP一样保持长连接。但随着技术的发展,出现了WebSocket,Comet等新的技术可以达到类似长连接的效果。

在团队协同工具 Worktile的使用过程中,你会发现无论是右上角的消息通知,还是在任务面板中拖动任务,还有用户的在线状态,都是实时刷新。Worktile中的推送服务是采用的是基于XMPP协议、Erlang语言实现的Ejabberd,并在其源码基础上,结合我们的业务,对源码作了修改以适配我们自身的需求。另外,基于AMQP协议也可以作为实时消息推送的一种选择,踢踢网就是采用 RabbitMQ+STOMP协议实现的消息推送服务。本文将结合我在Worktile和踢踢网的项目实践,介绍下消息推送服务的具体实现。

实时推送的几种实现方式

相较于手机端的消息推送(一般都是以Socket方式实现),WEB端是基于HTTP协议,很难像TCP一样保持长连接。但随着技术的发展,出现了WebSocket,Comet等新的技术可以达到类似长连接的效果,这些技术大体可分为以下几类:

1)短轮询。页面端通过JS定时异步刷新,这种方式实时效果较差。

2)长轮询。页面端通过JS异步请求服务端,服务端在接收到请求后,如果该次请求没有数据,则挂起这次请求,直到有数据到达或时间片(服务端设定)到,则返回本次请求,客户端接着下一次请求。示例如下:

3)WebSocket。浏览器通过WebSocket协议连接服务端,实现了浏览器和服务器端的全双工通信。需要服务端和浏览器都支持WebSocket协议。

以上几种方式中,方式1实现较简单,但效率和实时效果较差。方式2对服务端实现的要求比较高,尤其是并发量大的情况下,对服务端的压力很大。方式3效率较高,但对较低版本的浏览器不支持,另外服务端也需要有支持WebSocket的实现。Worktile的WEB端实时消息推送,采用的是XMPP扩展协议XEP-0124 BOSH( http://xmpp.org/extensions/xep-0124.html),本质是采用方式2长轮询的方式。踢踢网则采用了WebSocket连接RabbitMQ的方式实现,下面我会具体介绍如何用这两种方式实现Server Push。

运行时环境准备

服务端的实现中,无论采用Ejabberd还是RabbitMQ,都是基于Erlang语言开发的,所以必须安装Erlang运行时环境。Erlang是一种函数式语言,具有容错、高并发的特点,借助OTP的函数库,很容易构建一个健壮的分布式系统。目前,基于Erlang开发的产品有,数据库方面:Riak(Dynamo实现)、CouchDB, Webserver方面:Cowboy、Mochiweb, 消息中间件有RabbitMQ等。对于服务端程序员来说,Erlang提供的高并发、容错、热部署等特性是其他语言无法达到的。无论在实时通信还是在游戏程序中,用Erlang可以很容易为每一个上线用户创建一个对应的Process,对一台4核8个G的服务器来说,承载上百万个这样的Process是非常轻松的事。下图是Erlang程序发起Process的一般性示意图:

如图所示,Session Manager(or Gateway)负责为每个用户(UID)创建相对应的Process, 并把这个对应关系(MAP)存放到数据表中。每个Process则对应用户数据,并且他们之间可以相互发送消息。Erlang的优势就是在内存足够的情况下创建上百万个这样的Process,而且它的创建和销毁比JAVA的Thread要轻量的多,两者不是一个数量级的。

好了,我们现在开始着手Erlang环境的搭建(实验的系统为Ubuntu 12.04, 4核8个G内存):

1、依赖库安装

[js] view plaincopy在CODE上查看代码片派生到我的代码片
sudo apt-get install build-essential

sudo apt-get install libncurses5-dev

sudo apt-get install libssl-dev libyaml-dev

sudo apt-get install m4

sudo apt-get install unixodbc unixodbc-dev

sudo apt-get install freeglut3-dev libwxgtk2.8-dev

sudo apt-get install xsltproc

sudo apt-get install fop tk8.5 libxml2-utils

2、官网下载OTP源码包( http://www.erlang.org/download.html), 解压并安装

[js] view plaincopy在CODE上查看代码片派生到我的代码片
tar zxvf otpsrcR16B01.tar.gz

cd otpsrcR16B01

configure

make & make install

至此,erlang运行环境就完成了。下面将分别介绍rabbitmq和ejabberd构建实时消息服务。

基于RabbitMQ的实时消息服务

RabbitMQ是在业界广泛应用的消息中间件,也是对AMQP协议实现最好的一种中间件。AMQP协议中定义了Producer、 Consumer、MessageQueue、Exchange、Binding、Virtual Host等实体,他们的关系如下图所示:

消息发布者(Producer)连接交换器(Exchange), 交换器和消息队列(Message Queue)通过KEY进行Binding,Binding是根据Exchange的类型(分为Fanout、Direct、Topic、Header)分别对消息作不同形式的派发。Message Queue又分为Durable、Temporary、Auto-Delete三种类型,Durable Queue是持久化队列,不会因为服务ShutDown而消失,Temporary Queue则服务重启后会消失,Auto-Delete则是在没有Consumer连接时自动删除。另外RabbitMQ有很多第三方插件,可以基于AMQP协议基础之上做出很多扩展的应用。下面我们将介绍WEB STOMP插件构建基于AMQP之上的STOMP文本协议,通过浏览器WebSocket达到实时的消息传输。系统的结构如图:

如图所示,WEB端我们使用STOMP.JS和SockJS.JS与RabbitMQ的WEB STOMP Plugin通信,手机端可以用STOMPj, Gozirra(Android)或者Objc-STOMP(IOS)通过STOMP协议与RabbitMQ收发消息。因为我们是实时消息系统通常都是要与已有的用户系统结合,RabbitMQ可以通过第三方插件RabbitMQ-AYTH-Backend-HTTP来适配已有的用户系统,这个插件可以通过HTTP接口完成用户连接时的认证过程。当然,认证方式还有LDAP等其他方式。下面介绍具体步骤:

从官网( http://rabbitmq.com/download.html)下载最新版本的源码包,解压并安装

[js] view plaincopy在CODE上查看代码片派生到我的代码片
tar zxf rabbitmq-server-x.x.x.tar.gz

cd rabbitmq-server-x.x.x

make & make install

为RabbitMQ安装WEB-STOMP插件

[js] view plaincopy在CODE上查看代码片派生到我的代码片
cd /path/to/your/rabbitmq

./sbin/rabbitmq-plugins enable rabbitmq_web_stomp

./sbin/rabbitmq-plugins enable rabbitmq_web_stomp_examples

./sbin/rabbitmqctl stop

./sbin/rabbitmqctl start

./sbin/rabbitmqctl status

将会显示下图所示的运行的插件列表

安装用户授权插件

[js] view plaincopy在CODE上查看代码片派生到我的代码片
cd /path/to/your/rabbitmq/plugins

wget <a href="http://www.rabbitmq.com/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez">http://www.rabbitmq.com/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez</a>

cd ..

./sbin/rabbitmq-plugins enable rabbitmq_auth_backend_http

编辑RabbitMQ.Config文件(默认存放于/ECT/RabbitMQ/下),添加
[js] view plaincopy在CODE上查看代码片派生到我的代码片
[

...

{rabbit, [{auth_backends, [rabbit_auth_backend_http]}]},

...

{rabbitmq_auth_backend_http,

[{user_path, “http://your-server/auth/user”},

{vhost_path, “http://your-server/auth/vhost”},

{resource_path, “http://your-server/auth/resource”}

]}

...

].

其中,User_Path是根据用户名密码进行校验,VHOST_Path是校验是否有权限访问VHOST, Resource_Path是校验用户对传入的Exchange、Queue是否有权限。我下面的代码是用Node.js实现的这三个接口的示例:

[js] view plaincopy在CODE上查看代码片派生到我的代码片
var express = require('express');

var app = express();

app.get('/auth/user', function(req, res){

var name = req.query.username;

var pass = req.query.password;

console.log("name : " + name + ", pass : " + pass);

if(name === 'guest' && pass === "guest"){

console.log("allow");

res.send("allow");

}else{

res.send('deny');

}

});

app.get('/auth/vhost', function(req, res){

console.log("/auth/vhost");

res.send("allow");

});

app.get('/auth/resource', function(req, res){

console.log("/auth/resource");

res.send("allow");

});

app.listen(3000);

浏览器端JS实现,示例代码如下:
[js] view plaincopy在CODE上查看代码片派生到我的代码片
......

var ws = new SockJS('http://' + window.location.hostname + ':15674/stomp');

var client = Stomp.over(ws);

// SockJS does not support heart-beat: disable heart-beats

client.heartbeat.outgoing = 0;

client.heartbeat.incoming = 0;

client.debug = pipe('#second');

var print_first = pipe('#first', function(data) {

client.send('/exchange/feed/user_x', {"content-type":"text/plain"}, data);

});

var on_connect = function(x) {

id = client.subscribe("/exchange/feed/user_x", function(d) {

print_first(d.body);

});

};

var on_error = function() {

console.log('error');

};

client.connect('guest1', 'guest1', on_connect, on_error, '/');

......

需要说明的时,在这里我们首先要在RabbitMQ实例中创建Feed这个Exchange,我们用STOMP.JS连接成功后,根据当前登陆用户的ID(user_x)绑定到这个Exchange,即Subscribe(“/exchange/feed/user_x”, …) 这个操作的行为,这样在向RabbitMQ中Feed Exchange发送消息并指定用户ID(user_x)为KEY,页面端就会通过WEB Socket实时接收到这条消息。

到目前为止,基于RabbitMQ+STOMP实现WEB端消息推送就已经完成,其中很多的细节需要小伙伴们亲自去实践了,这里就不多说了。实践过程中可以参照官方文档:

http://rabbitmq.com/stomp.html
http://rabbitmq.com/web-stomp.html
https://github.com/simonmacmullen/rabbitmq-auth-backend-http

以上的实现是我本人在踢踢网时采用的方式,下面接着介绍一下现在在Worktile中如何通过Ejabberd实现消息推送。

基于Ejabberd的实时消息推送

与RabbitMQ不同,Ejabberd是XMPP协议的一种实现,与AMQP相比,XMPP广泛应用于即时通信领域。XMPP协议的实现有很多种,比如JAVA的OpenFire,但相较其他实现,Ejabberd的并发性能无疑使最优秀的。XMPP协议的前身是Jabber协议,早期的Jabber协议主要包括在线状态(Presence)、好友花名册(Roster)、IQ(Info/Query)几个部分。现在Jabber已经成为RFC的官方标准,如RFC2799,RFC4622,RFC6121,以及XMPP的扩展协议(XEP)。Worktile Web端的消息提醒功能就是基于XEP-0124、XEP-0206定义的BOSH扩展协议。

由于自身业务的需要,我们对Ejabberd的用户认证和好友列表模块的源码进行修改,通过Redis保存用户的在线状态,而不是Mnesia和MySQL。另外好友这块我们是从已有的数据库中(MongoDB)中获取项目或团队的成员。Web端通过Strophe.JS来连接(HTTP-BIND),Strophe.JS可以以长轮询和WebSocket两种方式来连接,由于Ejabberd还没有好的WebSocket的实现,就采用了BOSH的方式模拟长连接。整个系统的结构如下:

Web端用Strophe.JS通过HTTP-BIND进行连接Nginx代理,Nginx反向代理EjabberdCluster。iOS用XMPP-FramWork连接, Android可以用Smack直接连Ejabberd服务器集群。这些都是现有的库,无需对Client进行开发。在线状态根据用户UID作为KEY定义了在线、离线、忙等状态存放于Redis中。好友列表从MongoDB的Project表中获取。用户认证直接修改了Ejabberd_Auth_Internal.erl文件,通过MongoDB驱动连接用户库,在线状态等功能是新加了模块,其部分代码如下:

[js] view plaincopy在CODE上查看代码片派生到我的代码片
-module(wt_mod_proj).

-behaviour(gen_mod).

-behaviour(gen_server).

-include("ejabberd.hrl").

-include("logger.hrl").

-include("jlib.hrl").

-define(SUPERVISOR, ejabberd_sup).

...

-define(ONLINE, 1).

-define(OFFLINE, 0).

-define(BUSY, 2).

-define(LEAVE, 3).

...

%% API

-export([start_link/2, get_proj_online_users/2]).

%% gen_mod callbacks

-export([start/2, stop/1]).

%% gen_server callbacks

-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).

%% Hook callbacks

-export([user_available/1, unset_presence/3, set_presence/4]).

-export([get_redis/1, remove_online_user/3, append_online_user/3]).

...

-record(state,{host = <<"">>, server_host, rconn, mconn}).

start_link(Host, Opts) ->

Proc = gen_mod:get_module_proc(Host, ?MODULE),

gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []).

user_available(New) ->

LUser = New#jid.luser, LServer = New#jid.lserver,

Proc = gen_mod:get_module_proc(LServer, ?MODULE),

gen_server:cast(Proc, {user_available, LUser, LServer}).

append_online_user(Uid, Proj, Host) ->

Proc = gen_mod:get_module_proc(Host, ?MODULE),

gen_server:call(Proc, {append_online_user, Uid, Proj}).

remove_online_user(Uid, Proj, Host) ->

Proc = gen_mod:get_module_proc(Host, ?MODULE),

gen_server:call(Proc, {remove_online_user, Uid, Proj}).

...

set_presence(User, Server, Resource, Packet) ->

Proc = gen_mod:get_module_proc(Server, ?MODULE),

gen_server:cast(Proc, {set_presence, User, Server, Resource, Packet}).

...

start(Host, Opts) ->

Proc = gen_mod:get_module_proc(Host, ?MODULE),

ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]},

transient, 2000, worker, [?MODULE]},

supervisor:start_child(?SUPERVISOR, ChildSpec).

stop(Host) ->

Proc = gen_mod:get_module_proc(Host, ?MODULE),

gen_server:call(Proc, stop),

supervisor:delete_child(?SUPERVISOR, Proc).

init([Host, Opts]) ->

MyHost = gen_mod:get_opt_host(Host, Opts, <<"wtmuc.@HOST@">>),

RedisHost = gen_mod:get_opt(redis_host, Opts, fun(B) -> B end,?REDIS_HOST),

RedisPort = gen_mod:get_opt(redis_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?REDIS_PORT),

ejabberd_hooks:add(set_presence_hook, Host, ?MODULE, set_presence, 100),

ejabberd_hooks:add(user_available_hook, Host, ?MODULE, user_available, 50),

ejabberd_hooks:add(sm_remove_connection_hook, Host, ?MODULE, unset_presence, 50),

MongoHost = gen_mod:get_opt(mongo_host, Opts, fun(B) -> binary_to_list(B) end, ?MONGO_HOST),

MongoPort = gen_mod:get_opt(mongo_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?MONGO_PORT),

{ok, Mongo} = mongo_connection:start_link({MongoHost, MongoPort}),

C = c(RedisHost, RedisPort),

ejabberd_router:register_route(MyHost), {ok, #state{host = Host, server_host = MyHost, rconn = C, mconn = Mongo}}.

terminate(_Reason, #state{host = Host, rconn = C, mconn = Mongo}) ->

ejabberd_hooks:delete(set_presence_hook, Host, ?MODULE, set_presence, 100),

ejabberd_hooks:delete(user_available_hook, Host, ?MODULE, user_available, 50),

ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE, unset_presence, 50),

eredis:stop(C),

ok.

...

handle_call({append_online_user, Uid, ProjId}, _From, State) ->

C = State#state.rconn,

Key = <<!--?PRE_RPOJ_ONLINE_USERS /binary, ProjId/binary-->>,

Resp = eredis:q(C, ["SADD", Key, Uid]),

{reply, Resp, State};

handle_call({remove_online_user, Uid, ProjId}, _From, State) ->

...

handle_call({get_proj_online_users, ProjId}, _From, State) ->

...

handle_cast({set_presence, User, Server, Resource, Packet}, #state{mconn = Mongo} = State) ->

C = State#state.rconn,

Key = <<!--?USER_PRESENCE /binary, User/binary-->>,

Pids = get_user_projs(User, Mongo),

Cmd = get_proj_key(Pids, ["SUNION"]),

case xml:get_subtag_cdata(Packet, <<"show">>) of

<<"away">> ->

eredis:q(C, ["SET", Key, ?LEAVE]);

<<"offline">> ->

...

handle_cast(_Msg, State) -> {noreply, State}.

handle_info({route, From, To, Packet}, #state{host = Host, server_host = MyHost, rconn = RedisConn, mconn = Mongo} = State) ->

case catch do_route(Host, MyHost, From, To, Packet, RedisConn, Mongo) of

{'EXIT', Reason} ->

?ERROR_MSG("~p", [Reason]);

_ ->

ok

end,

{noreply, State};

handle_info(_Info, State) -> {noreply, State}.

code_change(_OldVsn, State, _Extra) -> {ok, State}.

...

其中,User_Available_HOOK和SM_Remove_Connection_HOOK 就是用户上线和用户断开连接触发的事件,Ejabberd 中正是由于这些HOOK,才能很容易扩展功能。

在用Tsung对Ejabberd进行压力测试,测试机器为4核心8G内存的普通PC,以3台客户机模拟用户登录、设置在线状态、发送一条文本消息、关闭连接操作,在同时在线达到30w时,CPU占用不到3%,内存大概到3个G左右,随着用户数增多,主要内存的损耗较大。由于压力测试比较耗时,再等到有时间的时候,会在做一些更深入的测试。

Worktile 中百万级实时消息推送服务的实现相关推荐

  1. Worktile中百万级实时消息推送服务的实现

    Worktile中百万级实时消息推送服务的实现 出自:http://blog.jobbole.com/81125/ 转载于:https://www.cnblogs.com/ribavnu/p/4531 ...

  2. Worktile中的实时消息推送服务实现

    在团队协同工具worktile的使用过程中,你会发现无论是右上角的消息通知,还是在任务面板中拖动任务,还有用户的在线状态,都是实时刷新.Worktile中的推送服务是采用的是基于xmpp协议.erla ...

  3. 百亿级实时消息推送的实战之道,与王者荣耀一班车就是这么稳!

    要说现在市面上最火爆的手游,莫非拥有两亿注册用户的王者荣耀了.据悉,王者荣耀的渗透率高达22.3%,这意味着每7个中国人中就有一位是王者荣耀注册用户.众所周知,手游App对推送实时性和精准性要求非常高 ...

  4. 消息推送技术干货:美团实时消息推送服务的技术演进之路

    本文由美团技术团队分享,作者"健午.佳猛.陆凯.冯江",原题"美团终端消息投递服务Pike的演进之路",有修订. 1.引言 传统意义上来说,实时消息推送通常都是 ...

  5. 浅析即时通讯开发中移动端实时消息推送技术

    实时消息推送在移动端互联网时代很平常,也很重要,它的存在让智能终端真正成为全时信息传播的工具.本文将从移动端无线网络的特点来谈谈实时消息推送的技术原理及相关问题,希望能给你带来些许启发. 移动端实时消 ...

  6. 千万级WebSocket消息推送服务技术分析

    拉模式和推模式区别 拉模式(定时轮询访问接口获取数据) 数据更新频率低,则大多数的数据请求时无效的 在线用户数量多,则服务端的查询负载很高 定时轮询拉取,无法满足时效性要求 推模式(向客户端进行数据的 ...

  7. 设计一个百万级的消息推送系统

    2019独角兽企业重金招聘Python工程师标准>>> 前言 首先迟到的祝大家中秋快乐. 最近一周多没有更新了.其实我一直想憋一个大招,分享一些大家感兴趣的干货. 鉴于最近我个人的工 ...

  8. python websocket实现消息推送_python Django websocket 实时消息推送

    [实例简介] Django websocket 实时消息推送 服务端主动推送 调用 send(username, title, data, url) username:用户名 title:消息标题 d ...

  9. 未读消息(小红点),前端 与 RabbitMQ 实时消息推送实践,贼简单~

    前几天粉丝群里有个小伙伴问过:web 页面的未读消息(小红点)怎么实现比较简单,刚好本周手头有类似的开发任务,索性就整理出来供小伙伴们参考,没准哪天就能用得上呢. 之前在 <springboot ...

最新文章

  1. 【Python自学】万文字,学习框架+思维整理,入门就是这么简单
  2. 结构体数组实现的简易学生信息管理系统
  3. android webview js 失效,Android WebView注入JQuery、JS脚本及执行无效的问题解决
  4. oracle存储过程使用ftp,ASM存储FTP上传文件
  5. 【Elasticsearch】父子聚合 can‘t specify parent if no parent field has been configured
  6. [Linux_Ubuntu13] 声音很小前台无法调节的处理方法
  7. 录音转文字 android,录音转文字pro
  8. 1995-2019,中国互联网产品的发展与变革
  9. 正态分布的峰度和偏度分别为_关于偏度与峰度的一些探索
  10. 【资料】印度数学家拉马努金
  11. 十年工作经验的中层员工如何在大厂生存?
  12. 650 storm 铃木v_V双暴风雨 2018款铃木V-Strom 650/650XT
  13. 更快地编写更好的代码:5 分钟阅读
  14. 人工智能产业2021年的五大趋势
  15. java--设计一个Javabean记载网页的访问数量
  16. 【Linux】进程间通信(学习复习兼顾)
  17. Cracking the Interview 读书笔记 -- Java
  18. Windows上本地安装MySQL数据库
  19. TCGA数据库的基因表达情况分析
  20. matlab反函数求不出来,求反函数,得到的结果不太明白

热门文章

  1. 蓝桥杯 平方和(JAVA)
  2. 使用java进行pdf转word实战
  3. DWH中增量数据的抽取
  4. html省市多级联动下拉框,基于javascript实现全国省市二级联动下拉选择菜单
  5. 掌控堆栈确保系统稳定 IAR技术手册翻译
  6. 深度学习降噪方案-RNNoise简介和环境配置
  7. 高薪程序员面试题精讲系列28之你熟悉哪些设计模式?
  8. 新零售时代,异业联盟怎么做?
  9. 才发现!华为手机竟隐藏着5个高级功能
  10. 3轴/6轴/9轴传感器是什么, 加速计/陀螺仪/磁力计又是什么?