在团队协同工具worktile的使用过程中,你会发现无论是右上角的消息通知,还是在任务面板中拖动任务,还有用户的在线状态,都是实时刷新。Worktile中的推送服务是采用的是基于xmpp协议、erlang语言实现的ejabberd,并在其源码基础上,结合我们的业务,对源码作了修改以适配我们自身的需求。另外,基于amqp协议也可以作为实时消息推送的一种选择,踢踢网就是采用rabbitmq+stomp协议实现的消息推送服务。本文将结合我在worktile和踢踢网的项目实践,介绍下消息推送服务的具体实现。

实时推送的几种实现方式

相较于手机端的消息推送(一般都是以socket方式实现),web端是基于http协议,很难像tcp一样保持长连接。但随着技术的发展,出现了websocket, comet等新的技术可以达到类似长连接的效果,这些技术大体可分为以下几类:

  1. 短轮询。页面端通过js定时异步刷新,这种方式实时效果较差。

  2. 长轮询。页面端通过js异步请求服务端,服务端在接收到请求后,如果该次请求没有数据,则挂起这次请求,直到有数据到达或时间片(服务端设定)到,则返回本次请求,客户端接着下一次请求。示例如下:

  1. Websocket。浏览器通过websocket协议连接服务端,实现了浏览器和服务器端的全双工通信。需要服务端和浏览器都支持websocket协议。
    以上几种方式中,方式1实现较简单,但效率和实时效果较差。方式2对服务端实现的要求比较高,尤其是并发量大的情况下,对服务端的压力很大。方式3效率较高,但对较低版本的浏览器不支持,另外服务端也需要有支持websocket的实现。Worktile的web端实时消息推送,采用的是xmpp扩展协议xep-0124 BOSH(http://xmpp.org/extensions/xep-0124.html),本质是采用方式2长轮询的方式。踢踢网则采用了websocket连接rabbitmq的方式实现,下面我会具体介绍如何用这两种方式实现Server Push。

运行时环境准备

服务端的实现中,无论采用ejabberd还是rabbitmq,都是基于erlang语言开发的,所以必须安装erlang运行时环境。Erlang是一种函数式语言,具有容错、高并发的特点,借助OTP的函数库,很容易构建一个健壮的分布式系统。目前,基于erlang开发的产品有,数据库方面:Riak(Dynamo实现)、CouchDB, Webserver方面:Cowboy、Mochiweb, 消息中间件有rabbitmq等。对于服务端程序员来说,erlang提供的高并发、容错、热部署等特性是其他语言无法达到的。无论在实时通信还是在游戏程序中,用erlang可以很容易为每一个上线用户创建一个对应的process,对一台4核8个G的服务器来说,承载上百万个这样的process是非常轻松的事。下图是erlang程序发起process的一般性示意图:

如图所示,Session manager(or gateway)负责为每个用户(uid)创建相对应的process, 并把这个对应关系(map)存放到数据表中。每个process则对应用户数据,并且他们之间可以相互发送消息。Erlang的优势就是在内存足够的情况下创建上百万个这样的process,而且它的创建和销毁比java的thread要轻量的多,两者不是一个数量级的。

好了,我们现在开始着手erlang环境的搭建(实验的系统为ubuntu12.04, 4核8个G内存):

1、依赖库安装

    sudo apt-get install build-essentialsudo apt-get install libncurses5-devsudo apt-get install libssl-dev libyaml-devsudo apt-get install m4sudo apt-get install unixodbc unixodbc-devsudo apt-get install freeglut3-dev libwxgtk2.8-devsudo apt-get install xsltprocsudo apt-get install fop tk8.5 libxml2-utils

2、官网下载otp源码包(http://www.erlang.org/download.html), 解压并安装:

   \>\> tar zxvf otpsrcR16B01.tar.gz\>\> cd otpsrcR16B01\>\> configure\>\> make & make install

至此,erlang运行环境就完成了。下面将分别介绍rabbitmq和ejabberd构建实时消息服务。

基于RabbitMQ的实时消息服务

RabbitMQ是在业界广泛应用的消息中间件,也是对AMQP协议实现最好的一种中间件。AMQP协议中定义了Producer、 Consumer、MessageQueue、Exchange、Binding、Virtual Host等实体,他们的关系如下图所示:

消息发布者(Producer)连接交换器(Exchange), 交换器和消息队列(Message Queue)通过key进行Binding,Binding是根据Exchange的类型(分为fanout、direct、topic、header)分别对消息作不同形式的派发。Message Queue又分为durable、temporary、auto-delete三种类型,durable queue是持久化队列,不会因为服务shutdown而消失,temporary queue则服务重启后会消失,auto-delete则是在没有consumer连接时自动删除。另外RabbitMQ有很多第三方插件,可以基于AMQP协议基础之上做出很多扩展的应用。下面我们将介绍web stomp插件构建基于AMQP之上的stomp文本协议,通过浏览器websocket达到实时的消息传输。系统的结构如图:

如图所示,web端我们使用stomp.js和sockjs.js与rabbitmq的web stomp plugin通信,手机端可以用stompj, gozirra(Android)或者objc-stomp(IOS)通过stomp协议与rabbitmq收发消息。因为我们是实时消息系统通常都是要与已有的用户系统结合,rabbitmq可以通过第三方插件rabbitmq-auth-backend-http来适配已有的用户系统,这个插件可以通过http接口完成用户连接时的认证过程。当然,认证方式还有ldap等其他方式。下面介绍具体步骤:

  • 从官网(http://rabbitmq.com/download.html)下载最新版本的源码包,解压并安装:
    \>\> tar zxf rabbitmq-server-x.x.x.tar.gz\>\> cd rabbitmq-server-x.x.x\>\> make & make install
  • 为rabbitmq安装web-stomp插件
    \>\> cd /path/to/your/rabbitmq\>\> ./sbin/rabbitmq-plugins enable rabbitmq_web_stomp\>\> ./sbin/rabbitmq-plugins enable rabbitmq_web_stomp_examples\>\> ./sbin/rabbitmqctl stop\>\> ./sbin/rabbitmqctl start\>\> ./sbin/rabbitmqctl status

将会显示下图所示的运行的插件列表

  • 安装用户授权插件
   \>\> cd /path/to/your/rabbitmq/plugins\>\>wget http://www.rabbitmq.com/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez\>\> cd ..\>\> ./sbin/rabbitmq-plugins enable rabbitmq_auth_backend_http

编辑rabbitmq.config文件(默认存放于/etc/rabbitmq/下),添加:

    [...{rabbit, [{auth_backends, [rabbit_auth_backend_http]}]},...{rabbitmq_auth_backend_http,[{user_path, “http://your-server/auth/user”},{vhost_path, “http://your-server/auth/vhost”},{resource_path, “http://your-server/auth/resource”}]}...].

其中,user_path是根据用户名密码进行校验,vhost_path是校验是否有权限访问vhost, resource_path是校验用户对传入的exchange、queue是否有权限。我下面的代码是用nodejs实现的这三个接口的示例:

    var express = require('express');var app = express();app.get('/auth/user', function(req, res){var name = req.query.username;var pass = req.query.password;console.log("name : " + name + ", pass : " + pass);
if(name === 'guest' && pass === "guest"){console.log("allow");res.send("allow");
}else{res.send('deny');
}
});
app.get('/auth/vhost', function(req, res){console.log("/auth/vhost");res.send("allow");
});
app.get('/auth/resource', function(req, res){console.log("/auth/resource");res.send("allow");
});
app.listen(3000);
  • 浏览器端js实现,示例代码如下:
    ......
var ws = new SockJS('http://' + window.location.hostname + ':15674/stomp');
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
client.debug = pipe('#second');
var print_first = pipe('#first', function(data) {
client.send('/exchange/feed/user_x', {"content-type":"text/plain"}, data);
});
var on_connect = function(x) {
id = client.subscribe("/exchange/feed/user_x", function(d) {
print_first(d.body);
});
};
var on_error = function() {
console.log('error');
};
client.connect('guest1', 'guest1', on_connect, on_error, '/');
......

需要说明的时,在这里我们首先要在rabbitmq实例中创建feed这个exchange,我们用stomp.js连接成功后,根据当前登陆用户的id(user_x)绑定到这个exchange,即 subscribe("/exchange/feed/user_x", ...) 这个操作的行为,这样在向rabbitmq中feed exchange发送消息并指定用户id(user_x)为key,页面端就会通过websocket实时接收到这条消息。

到目前为止,基于rabbitmq+stomp实现web端消息推送就已经完成,其中很多的细节需要小伙伴们亲自去实践了,这里就不多说了。实践过程中可以参照官方文档:

  • http://rabbitmq.com/stomp.html
  • http://rabbitmq.com/web-stomp.html
  • https://github.com/simonmacmullen/rabbitmq-auth-backend-http

以上的实现是我本人在踢踢网时采用的方式,下面接着介绍一下现在在Worktile中如何通过ejabberd实现消息推送。

基于ejabberd的实时消息推送

与rabbitmq不同,ejabberd是xmpp协议的一种实现,与amqp相比,xmpp广泛应用于即时通信领域。Xmpp协议的实现有很多种,比如java的openfire,但相较其他实现,ejabberd的并发性能无疑使最优秀的。Xmpp协议的前身是jabber协议,早期的jabber协议主要包括在线状态(presence)、好友花名册(roster)、IQ(Info/Query)几个部分。现在jabber已经成为rfc的官方标准,如rfc2799, rfc4622, rfc6121,以及xmpp的扩展协议(xep)。Worktile Web端的消息提醒功能就是基于XEP-0124、XEP-0206定义的BOSH扩展协议。

由于自身业务的需要,我们对ejabberd的用户认证和好友列表模块的源码进行修改,通过redis保存用户的在线状态,而不是mnesia和mysql。另外好友这块我们是从已有的数据库中(mongodb)中获取项目或团队的成员。Web端通过strophe.js来连接(http-bind),strophe.js可以以长轮询和websocket两种方式来连接,由于ejabberd还没有好的websocket的实现,就采用了BOSH的方式模拟长连接。整个系统的结构如下:

  • Web端用strophe.js通过http-bind进行连接nginx代理,nginx反向代理ejabberd cluster。
  • IOS用xmpp-framwork连接, Android可以用smack直接连ejabberd服务器集群。这些都是现有的库,无需对client进行开发。
  • 在线状态根据用户uid作为key定义了在线、离线、忙等状态存放于redis中。好友列表从mongodb的project表中获取。

用户认证直接修改了ejabberd_auth_internal.erl文件,通过mongodb驱动连接用户库,在线状态等功能是新加了模块,其部分代码如下:

<pre name="code" class="html">-module(wt_mod_proj).-behaviour(gen_mod).-behaviour(gen_server).-include("ejabberd.hrl").-include("logger.hrl").-include("jlib.hrl").-define(SUPERVISOR, ejabberd_sup)....-define(ONLINE, 1).-define(OFFLINE, 0).-define(BUSY, 2).-define(LEAVE, 3)....%% API-export([start_link/2, get_proj_online_users/2]).%% gen_mod callbacks-export([start/2, stop/1]).%% gen_server callbacks-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).%% Hook callbacks-export([user_available/1, unset_presence/3, set_presence/4]).-export([get_redis/1, remove_online_user/3, append_online_user/3])....-record(state,{host = <<"">>, server_host, rconn, mconn}).start_link(Host, Opts) ->Proc = gen_mod:get_module_proc(Host, ?MODULE),gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []).user_available(New) ->LUser = New#jid.luser, LServer = New#jid.lserver,Proc = gen_mod:get_module_proc(LServer, ?MODULE),gen_server:cast(Proc, {user_available, LUser, LServer}).append_online_user(Uid, Proj, Host) ->Proc = gen_mod:get_module_proc(Host, ?MODULE),gen_server:call(Proc, {append_online_user, Uid, Proj}).remove_online_user(Uid, Proj, Host) ->Proc = gen_mod:get_module_proc(Host, ?MODULE),gen_server:call(Proc, {remove_online_user, Uid, Proj})....set_presence(User, Server, Resource, Packet) ->Proc = gen_mod:get_module_proc(Server, ?MODULE),gen_server:cast(Proc, {set_presence, User, Server, Resource, Packet})....start(Host, Opts) ->Proc = gen_mod:get_module_proc(Host, ?MODULE),ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]},transient, 2000, worker, [?MODULE]},supervisor:start_child(?SUPERVISOR, ChildSpec).stop(Host) ->Proc = gen_mod:get_module_proc(Host, ?MODULE),gen_server:call(Proc, stop),supervisor:delete_child(?SUPERVISOR, Proc).init([Host, Opts]) ->MyHost = gen_mod:get_opt_host(Host, Opts, <<"wtmuc.@HOST@">>),RedisHost = gen_mod:get_opt(redis_host, Opts, fun(B) -> B end,?REDIS_HOST),RedisPort = gen_mod:get_opt(redis_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?REDIS_PORT),ejabberd_hooks:add(set_presence_hook, Host, ?MODULE, set_presence, 100),ejabberd_hooks:add(user_available_hook, Host, ?MODULE, user_available, 50),ejabberd_hooks:add(sm_remove_connection_hook, Host, ?MODULE, unset_presence, 50),MongoHost = gen_mod:get_opt(mongo_host, Opts, fun(B) -> binary_to_list(B) end, ?MONGO_HOST),MongoPort = gen_mod:get_opt(mongo_port, Opts, fun(I) when is_integer(I), I>0 -> I end, ?MONGO_PORT),{ok, Mongo} = mongo_connection:start_link({MongoHost, MongoPort}),C = c(RedisHost, RedisPort),ejabberd_router:register_route(MyHost), {ok, #state{host = Host, server_host = MyHost, rconn = C, mconn = Mongo}}.terminate(_Reason, #state{host = Host, rconn = C, mconn = Mongo}) ->ejabberd_hooks:delete(set_presence_hook, Host, ?MODULE, set_presence, 100),ejabberd_hooks:delete(user_available_hook, Host, ?MODULE, user_available, 50),ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE, unset_presence, 50),eredis:stop(C),ok....handle_call({append_online_user, Uid, ProjId}, _From, State) ->C = State#state.rconn,Key = <<?PRE_RPOJ_ONLINE_USERS/binary, ProjId/binary>>,Resp = eredis:q(C, ["SADD", Key, Uid]),{reply, Resp, State};handle_call({remove_online_user, Uid, ProjId}, _From, State) ->...handle_call({get_proj_online_users, ProjId}, _From, State) ->...handle_cast({set_presence, User, Server, Resource, Packet}, #state{mconn = Mongo} = State) ->C = State#state.rconn,Key = <<?USER_PRESENCE/binary, User/binary>>,Pids = get_user_projs(User, Mongo),Cmd = get_proj_key(Pids, ["SUNION"]),case xml:get_subtag_cdata(Packet, <<"show">>) of<<"away">> ->eredis:q(C, ["SET", Key, ?LEAVE]);<<"offline">> ->...handle_cast(_Msg, State) -> {noreply, State}.handle_info({route, From, To, Packet}, #state{host = Host, server_host = MyHost, rconn = RedisConn, mconn = Mongo} = State) ->case catch do_route(Host, MyHost, From, To, Packet, RedisConn, Mongo) of{'EXIT', Reason} ->?ERROR_MSG("~p", [Reason]);_ ->okend,{noreply, State};handle_info(_Info, State) -> {noreply, State}.code_change(_OldVsn, State, _Extra) -> {ok, State}....

其中,user\_available\_hook和sm\_remove\_connection\_hook 就是用户上线和用户断开连接触发的事件,ejabberd 中正是由于这些hook,才能很容易扩展功能。

在用tsung对ejabberd进行压力测试,测试机器为4核心8G内存的普通PC,以3台客户机模拟用户登录、设置在线状态、发送一条文本消息、关闭连接操作,在同时在线达到30w时,CPU占用不到3%,内存大概到3个G左右,随着用户数增多,主要内存的损耗较大。由于压力测试比较耗时,再等到有时间的时候,会在做一些更深入的测试。

对于ejabberd的安装与集群的搭建,大家可以参照官方文档,这里不再赘述。如果在使用过程中有什么问题,可以加入worktile官方群(110257147),进行讨论。




原文:Worktile中的实时消息推送服务实现

</pre>

Worktile中的实时消息推送服务实现相关推荐

  1. Worktile中百万级实时消息推送服务的实现

    Worktile中百万级实时消息推送服务的实现 出自:http://blog.jobbole.com/81125/ 转载于:https://www.cnblogs.com/ribavnu/p/4531 ...

  2. Worktile 中百万级实时消息推送服务的实现

    Worktile 中百万级实时消息推送服务的实现 转自:http://www.360doc.com/content/15/0907/19/1073512_497529854.shtml 这是一个创建于 ...

  3. 消息推送技术干货:美团实时消息推送服务的技术演进之路

    本文由美团技术团队分享,作者"健午.佳猛.陆凯.冯江",原题"美团终端消息投递服务Pike的演进之路",有修订. 1.引言 传统意义上来说,实时消息推送通常都是 ...

  4. python websocket实现消息推送_python Django websocket 实时消息推送

    [实例简介] Django websocket 实时消息推送 服务端主动推送 调用 send(username, title, data, url) username:用户名 title:消息标题 d ...

  5. 浅析即时通讯开发中移动端实时消息推送技术

    实时消息推送在移动端互联网时代很平常,也很重要,它的存在让智能终端真正成为全时信息传播的工具.本文将从移动端无线网络的特点来谈谈实时消息推送的技术原理及相关问题,希望能给你带来些许启发. 移动端实时消 ...

  6. 未读消息(小红点),前端 与 RabbitMQ 实时消息推送实践,贼简单~

    前几天粉丝群里有个小伙伴问过:web 页面的未读消息(小红点)怎么实现比较简单,刚好本周手头有类似的开发任务,索性就整理出来供小伙伴们参考,没准哪天就能用得上呢. 之前在 <springboot ...

  7. SSM项目使用GoEasy 实现web消息推送服务

      一.背景 之前项目需要做一个推送功能,最开始我用websocket实现我的功能.使用websocket的好处是免费自主开发,但是有几个问题:1)浏览器的兼容问题,尤其是低版本的ie:2)因为是推送 ...

  8. .net 实时通信_基于 RabbitMQ 的实时消息推送

    实现服务器端推送的几种方式 Web 应用都是基于 HTTP 协议的请求/响应模式,无法像 TCP 协议那样保持长连接,因此 Web 应用就很难像手机那样实现实时的消息推送.就目前来看,Web 应用的消 ...

  9. python websocket实时消息推送

    python websocket实时消息推送 十分想念顺店杂可... 本人写的渣,大神勿喷. 转载请附带本文链接,谢谢. 服务端代码 # -*- coding: utf-8 -*- # @Time : ...

最新文章

  1. Hello World
  2. debian虚拟机装上后开机不行_华为MT9进水不开机, 一步一个“坑”把掌柜修的也是无语,想发火...
  3. Hexo博客NexT主题美化之文末统一添加“本文结束”标记
  4. c#中调用Excel
  5. Trace SAP OData execution in CRM backend system
  6. 对象设计——责任、角色和协作思维导图笔记
  7. 8: springMVC ModelAndView 作用与功能解析
  8. Kaggle实战之一回归问题
  9. 自学-Linux-老男孩Linux77期-day6
  10. 国家信息安全水平考试NISP一级官方视频知识点整理
  11. 电商自营藏猫腻 苏宁国美京东的套路谁最深?
  12. 解读LED灯具中国能效认证
  13. 老毛桃U盘快速安装ghost win7系统图文教程
  14. ffmpeg 硬件加速 wmv 视频转码
  15. 【RF】【元素定位】 Other element would receive the click
  16. socket是什么?有什么作用?
  17. 大数据分析工程师入门15-数据收集
  18. 只会 Python 不行,不会 Python 万万不行
  19. 红队笔记之杀软原理介绍与免杀技术总结
  20. kettle 提交数据量_基于kettle工具提高表输出写入速度(每秒万条记录)

热门文章

  1. 用例图(use case diagram)
  2. ios阴阳是不显示服务器,阴阳师IOS登录异常怎么办 苹果不能正常登录解决办法...
  3. 秀一波酷炫可视化大屏!
  4. 过程之美——过程让结果更精彩
  5. Vue 获取当前日期
  6. java排序接口ComparableT 的实现与使用
  7. linux下载测序数据,高速下载测序数据(SRA,Fastq等)
  8. 名编辑电子杂志大师教程 | 怎样给电子画册设置目录?
  9. 天创速盈带您速读:拼多多有必要开直通车吗?有哪些禁忌?
  10. GIF录制编辑工具(GifCam)