这一篇紧接着上面继续了。

方案二

基于redis的消息回执。主要流程分为下面几个步骤:

1)将消息暂存储与redis中,设置好消息的过期时间

2)客户端回执消息id来消灭暂存的消息

3)开通单独线程论坛在第1)步中的消息。根据消息的时间重新发送消息。如果消息第一次存放的时间大雨有效期(自定义10秒),解析消息中的to查找用户是否还在线。如果在则T掉(因为它长时间不理会服务的重要命令),如果不在线则将消息放置离线表。

OK,先来看看消息的存储格式吧。

1.MESSAGE消息 用户集合

SADD  SOGU:[username]  [VALUE(messageID)] [VALUE(messageID)] ...

2.已读消息设备集合

SADD  RT:[terminalid]  [VALUE(messageID)] [VALUE(messageID)] ...

3.消息内容

HMSET  OGM:[messageID]  CREATIONDATE [VALUE]  UPDATEDATE [VALUE] STANZA [VALUE]

4.用户、设备关联

SADD URT:[USERNAME]  [VALUE(terminalid)] .......

(先根据消息id查找时间,在java中排序后 查找stanza)

MESSAGE

--离线表

ZADD OFOFFLINE:[username]  [INDEX(时间戳)] [VALUE(messageID)] 、[VALUE]、[VALUE]......              [VALUE]

HMSET OFOFFLINE:[messageID] STANZA[VALUE]

CREATIONDATE [VALUE]  MESSAGESIZ[VALUE]

将消息暂时消息存储:

    public void storeMessage(String username, Packet packet) {Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();String packetID = "";if (packet instanceof Message) packetID = ((Message)packet).getID();else if (packet instanceof IQ) packetID = ((IQ)packet).getID();else return;try {jedis.sadd("SOGU:" + username, packetID);Map<String, String> hash = new HashMap<String, String>();hash.put("STANZA", packet.toXML());hash.put("CREATIONDATE", StringUtils.dateToMillis(new Date()));jedis.hmset("OGM:" + packetID, hash);} finally {XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);}htp.execute(addMessagesToDB(packet));}private Runnable addMessagesToDB(final Packet packet) {return new Runnable() {@Overridepublic void run() {MyDBopt.insertMessage(packet);}

客户端收到消息来回执服务端的操作

    private void handle(IQ packet) {JID recipientJID = packet.getTo();if (IQ.Type.crs != packet.getType()) {// Check if the packet was sent to the server hostnameif (recipientJID != null && recipientJID.getNode() == null &&recipientJID.getResource() == null && serverName.equals(recipientJID.getDomain())) {Element childElement = packet.getChildElement();if (childElement != null && childElement.element("addresses") != null) {// to route this packetmulticastRouter.route(packet);return;}}}if (IQ.Type.crs == packet.getType()) {String username = packet.getFrom().getNode();String terminal = packet.getFrom().getTerminal();String msgId = packet.getID();if (username == null || msgId == null || "".equals(msgId)) {return ;}if (terminal == null) {terminal = username + "_" + System.currentTimeMillis()%1000000; }Jedis jedis = XMPPServer.getInstance().getGroupRedisManager().getJedis();try {jedis.sadd("URT:" + username, terminal);jedis.sadd("RT:" + terminal, packet.getID());} finally {XMPPServer.getInstance().getGroupRedisManager().returnRes(jedis);}threadPool.execute(createTask(msgId, username, terminal));return;}if (packet.getID() != null && (IQ.Type.result == packet.getType() || IQ.Type.error == packet.getType())) {// The server got an answer to an IQ packet that was sent from the serverIQResultListener iqResultListener = resultListeners.remove(packet.getID());if (iqResultListener != null) {resultTimeout.remove(packet.getID());if (iqResultListener != null) {try {iqResultListener.receivedAnswer(packet);}catch (Exception e) {Log.error("Error processing answer of remote entity. Answer: "+ packet.toXML(), e);}return;}}}try {// Check for registered components, services or remote serversif (recipientJID != null &&(routingTable.hasComponentRoute(recipientJID) || routingTable.hasServerRoute(recipientJID))) {// A component/service/remote server was found that can handle the PacketroutingTable.routePacket(recipientJID, packet, false);return;}if (isLocalServer(recipientJID)) {// Let the server handle the PacketElement childElement = packet.getChildElement();String namespace = null;if (childElement != null) {namespace = childElement.getNamespaceURI();}if (namespace == null) {if (packet.getType() != IQ.Type.result && packet.getType() != IQ.Type.error) {// Do nothing. We can't handle queries outside of a valid namespaceLog.warn("Unknown packet " + packet.toXML());}}else {// Check if communication to local users is allowedif (recipientJID != null && userManager.isRegisteredUser(recipientJID.getNode())) {PrivacyList list =PrivacyListManager.getInstance().getDefaultPrivacyList(recipientJID.getNode());if (list != null && list.shouldBlockPacket(packet)) {// Communication is blockedif (IQ.Type.set == packet.getType() || IQ.Type.get == packet.getType()) {// Answer that the service is unavailablesendErrorPacket(packet, PacketError.Condition.service_unavailable);}return;}}IQHandler handler = getHandler(namespace);if (handler == null) {if (recipientJID == null) {// Answer an error since the server can't handle the requested namespacesendErrorPacket(packet, PacketError.Condition.service_unavailable);}else if (recipientJID.getNode() == null ||"".equals(recipientJID.getNode())) {// Answer an error if JID is of the form <domain>sendErrorPacket(packet, PacketError.Condition.feature_not_implemented);}else {// JID is of the form <node@domain>// Answer an error since the server can't handle packets sent to a nodesendErrorPacket(packet, PacketError.Condition.service_unavailable);}}else {handler.process(packet);}}}else {// JID is of the form <node@domain/resource> or belongs to a remote server// or to an uninstalled componentroutingTable.routePacket(recipientJID, packet, false);}}catch (Exception e) {......}}

离线消息

离线消息的优化。

同样可以拓展XMPP。比如

客户端获取离线消息,可以这么通讯。

1)先向服务器询问,我总的离线消息的基本状况(有多大,有多少条)

<iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="test@8ntmorv1ep4wgcy"><query xmlns="http://jabber.org/protocol/offmsg#bif"/>
</iq>

2)服务端返回

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy"><query xmlns="http://jabber.org/protocol/offmsg#bifs"><size>1024b</><count>128</><idset>1001,1002...</></query>
</iq>

3)客户端发送分批获取命令,一次给我发10条发完为止。

<iq id="BfI3V-47" to="8ntmorv1ep4wgcy" type="get" from="test@8ntmorv1ep4wgcy"><query xmlns="http://jabber.org/protocol/offmsg#start"/><pagesize>10</>
</iq>

4)服务端开始发送消息

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">......
</iq>
<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy">......
</iq>
.....

5)告诉客户端我都发完了

<iq type="result" id="BfI3V-47" from="8ntmorv1ep4wgcy" to="test@8ntmorv1ep4wgcy"><query xmlns="http://jabber.org/protocol/offmsg#end"/>
</iq>

6)客户端本地校验,回执已经接收到的消息

<message id="BfI3V-47" to="8ntmorv1ep4wgcy" from="test1@8ntmorv1ep4wgcy/Spark 2.6.3" type="crs"/>

这里本人只是做了一个简单的示意想法。如果需要更加精准的不妨在仔细想想消息处理与格式。

离线消息存储。

将消息存储到redis中:

public void addMessageToRedis(Message message) {if (message == null) {return;}JID recipient = message.getTo();String username = recipient.getNode();// If the username is null (such as when an anonymous user), don't store.if (username == null || !UserManager.getInstance().isRegisteredUser(recipient)) {return;}elseif (!XMPPServer.getInstance().getServerInfo().getXMPPDomain().equals(recipient.getDomain())) {// Do not store messages sent to users of remote serversreturn;}String msgXML = message.getElement().asXML();Jedis jedis = XMPPServer.getInstance().getChatMessageJedisPoolManager().getJedis();try {String newDate = StringUtils.dateToMillis(new java.util.Date());String id = MessageIdTactics.mid(username);jedis.zadd("OFOFFLINE:" + username, Long.valueOf(newDate), id Map<String, String> hash = new HashMap<String, String>();hash.put("STANZA", msgXML);hash.put("MESSAGESIZ", String.valueOf(msgXML.length()));hash.put("CREATIONDATE", newDate);jedis.hmset("OFOFFLINE:" + id, hash);} finally {XMPPServer.getInstance().getChatMessageJedisPoolManager().returnRes(jedis);}if (sizeCache.containsKey(username)) {int size = sizeCache.get(username);size += msgXML.length();sizeCache.put(username, size);}htp.execute(addMessageToDB(message));}

Redis优化这块就到这啦。主要要做的就是:

第一:存储用户或者MUC、Group等这些都需要设置消息存储的生命周期。当用户不处于活跃状态或者长时间不登陆的。要从redis中提出。免得浪费资源。当用户重新加载的时候再将他放置redis中

第二:将需要回执消息和离线消息分开。需要回执的消息需要设置他的生命周期。离线表最好做个定时器。轮询消息。将超时出现范围内的消息(比如周期为一周)的消息同步至关系表中。这里的离线消息需要将用户的设备分开来。

这里要考虑不同的设备终端等很多不同场景,问题会比较绕口。欢迎大家和我邮件交流。

转载于:https://www.cnblogs.com/huwf/p/4273343.html

OpenFire源码学习之二十五:消息回执与离线消息(下)相关推荐

  1. OpenFire源码学习之二十九:openfire集群配置

    集群 Openfire的给集群提供了多种方案.一种是基于Hazelcast插件,还有基于Oracle的coherence插件. Oracle的coherence插件中文开发文档:http://down ...

  2. OpenFire源码学习之二十二:openfie对用户的优化(下)

    用户名片 在预初始化中,贴出来用户名片的程序.这里也一样不在重复. 首先同样先修改系统属性: provider.vcard.className org.jivesoftware.util.redis. ...

  3. OpenGL蓝宝书源码学习(二十)第六章——Dissolve

    侵蚀着色器渲染图元,呈现腐蚀效果的源码示例. // Dissolve.cpp // OpenGL SuperBible // Demonstrates discard fragment command ...

  4. OpenFire源码学习之二十一:openfie对用户的优化(上)

    用户类 优化用户主要是要解决用户的连接量.已经对用户的访问速度和吞吐量. 预初始化 在前面的带面中提出来了用户的预初始化.这里就不在贴出来了.下面将redis用户库连接池处理贴出来UserJedisP ...

  5. 第十四课 k8s源码学习和二次开发原理篇-调度器原理

    第十四课 k8s源码学习和二次开发原理篇-调度器原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第十四课 k8s源码学习和二次开发原理篇-调度器原理 第一节 ...

  6. JavaScript学习(二十五)—实现无缝滚动

    JavaScript学习(二十五)-实现无缝滚动 效果如下: 代码如下: <!DOCTYPE html> <html lang="en"><head& ...

  7. 第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理

    第八课 k8s源码学习和二次开发原理篇-KubeBuilder使用和Controller-runtime原理 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第八课 ...

  8. 第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习

    第三课 k8s源码学习和二次开发-缓存机制Informers和Reflector组件学习 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第三课 k8s源码学习和二 ...

  9. 第四课 k8s源码学习和二次开发-DeltaFIFO和Indexer原理学习

    第四课 k8s源码学习和二次开发-DeltaFIFO和Indexer原理学习 tags: k8s 源码学习 categories: 源码学习 二次开发 文章目录 第四课 k8s源码学习和二次开发-De ...

最新文章

  1. 编写高度可维护javascript代码的几点关键性原则
  2. android开发读书笔记,android开发权威指南读书笔记
  3. mysql中int(15)和varchar(15)
  4. GHOST分区丢失只剩C盘
  5. GIS中的坐标系定义与转换【转】
  6. vue-router 中导航守卫问题
  7. 基于SOA的银行系统架构
  8. Javascript创建对象的几种方式?
  9. 如何判断一个程序是 32bit 还是 64bit ?
  10. 架构的“一小步”,业务的一大步 1
  11. 在CentOS7上安装RocketMQ 4.8.0
  12. 数智德州,创新未来 | 智慧城市赛题上线山东大赛德州分赛场
  13. c语言对c99标准声明,C语言中C89与C99的区别
  14. 【推荐系统】智能推荐算法在直播场景中的应用
  15. mysql base64的编码与解码
  16. 用npm发布一个npm包
  17. 图解设计模式:空对象模式
  18. 飞凌小课堂-OK3399-C linux双千兆网口方案-RTL8153
  19. Cisco ACS 5.8 Radius认证服务器安装教程
  20. 治疗抑郁症,这款聊天机器人是认真的

热门文章

  1. Conan and Agasa play a Card Game codeforce
  2. poj2976Dropping tests (二分搜索+还是涉及昨天遇见的o1分数规划)
  3. [YTU]_2915(Shape系列-1)
  4. MATLAB双纵坐标绘图(重要)
  5. OpenCV学习--saturate_cast防止数据溢出
  6. OpenCV中矩阵的归一化*(Normalize函数)
  7. 打包后放在服务器上二级目录找不到解决办法
  8. Linux问题分析或解决_ssh无法连接
  9. Django-cookie的保存以及删除操作
  10. 基于JAVA的生产者消费者问题