工业级大数据接入MQ消息发送异常性及最终一致性解决方案-DW商业环境实战
版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。
1 工业级高并发数据接入消息一致性保障
- 消息中间件在高并发数据接入大数据平台时的的主要作用: 异步通讯、解耦、并发缓冲
- 消息发送一致性:是指产生消息的业务动作与消息发送的一致。 (也就是说,如果业务操作成功,那么由这个业务操作所产生的消息一定要成功投递出去,否则就丢消息)
2 工业级MQ数据接入消息一致性理论模型
版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。
2.1 从主动方业务来分析
预发送消息失败,消息未进存储,业务操作未执行(可能的原因:主动方应用、网络、消息中间件、消息存储),数据此时保持一致。
预发送消息后,主动方业务没有收到返回消息存储结果,分为两种可能
(1)消息未进存储,业务操作未执行 数据此时保持一致。(2)消息已进存储(待确认),业务操作未执行 数据此时保持不一致。 复制代码
收到消息存储成功的返回结果,但未执行业务操作就失败
消息已进存储(待确认),业务操作未执行 数据此时保持不一致。 复制代码
2.2 从消息中间件的角度来分析:
消息中间件没有收到主动方应用的业务操作处理结果
(1)消息已进存储(待确认),业务操作未执行(或 业务操作出错回滚了) 数据此时保持不一致(2)消息已进存储(待确认),业务操作成功 数据此时保持不一致 复制代码
消息中间件收到业务操作结果(成功/失败),但处理消息存储中的消息状态失败
(1)消息已进存储(待确认),业务操作未执行(或业务操作出错回滚了) 数据此时保持不一致 (2)消息已进存储(待确认),业务操作成功 数据此时保持不一致 复制代码
2.3 可靠消息的整体流程
- 主动方应用先把消息发给消息中间件,消息状态标记为“待确认”;
- 消息中间件收到消息后,把消息持久化到消息存储中,但并不向被动方应用投递消息;
消息中间件返回消息持久化结果(成功/失败),主动方应用根据返 回结果进行判断如何进行业务操作处理:
a) 失败:放弃业务操作处理,结束(必要时向上层返回失败结果);b) 成功:执行业务操作处理; 复制代码
- 业务操作完成后,把业务操作结果(成功/失败)发送给消息中间件;
消息中间件收到业务操作结果后,根据业务结果进行处理;
a) 失败:删除消息存储中的消息,结束;b) 成功:更新消息存储中的消息状态为“待发送(可发送)”; 复制代码
- 被动方应用监听并接收“待发送”状态的消息,执行业务处理;
- 业务处理完成后,向消息中间件发送ACK,确认消息已经收到(消息 主动方应用主机/主机集群 消息中间件主机/主机集群 被动方应用主机/主机集群 中间件将从队列中删除该消息)。
2.4 消息重复发送异常分析
- 1 被动方应用接收到消息,业务处理完成后应用出问题,消息中间件不知道消息处理结果,会重新投递消息。
- 2 被动方应用接收到消息,业务处理完成后网络出问题,消息中间件收不到消息处理结果,会重新投递消息。
- 3 被动方应用接收到消息,业务处理时间过长,消息中间件因消息超时未确认,会再次投递消息。
- 4 被动方应用接收到消息,业务处理完成,消息中间件问题导致收不到消息处理结果,消息会重新投递。
- 5 被动方应用接收到消息,业务处理完成,消息中间件收到了消息处理结果,但由于消息存储故障导致消息没能成功确认, 消息会再次投递。
3 工业级MQ数据接入消息一致性实际处理方案
版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。
3.1 优点准则
- 消息确认及恢复系统
- 消费者流控确认系统进行消息消费确认,并删除原生消息。
- 从应用层弱化对MQ消息可靠性的依赖,通过应用端消息重试及恢复系统和消费者流控确认系统来加强消息的一致性。
- 通过应用层的监听和回调,保证了消息的时效性。
3.2 弊端准则
- 消息系统的可靠性服务(消息重试及恢复系统和消费者流控确认系统)与具体业务场景进行绑定,耦合性强,不可共用
- 消息数据和业务数据同一数据库,占用了业务系统资源。
4 工业级MQ数据接入消息一致性实际处理优化演进方案
4.1 优点准则
- 消息服务子系统可以独立部署,独立维护及弹性伸缩
- 减轻与应用层的强耦合,并弱化对MQ消息可靠性的依赖。
4.2 弊端准则
- 消息的发送需要两次请求
- 产品线主动方系统需要提供操作状态校验查询接口查询
4.3 核心代码剖析
消息服务子系统接口规范
public interface RpTransactionMessageService {/*** 预存储消息. */public int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;/*** 确认并发送消息.*/public void confirmAndSendMessage(String messageId) throws MessageBizException;/*** 存储并发送消息.*/public int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;/*** 直接发送消息.*/public void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;/*** 重发消息.*/public void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;/*** 根据messageId重发某条消息.*/public void reSendMessageByMessageId(String messageId) throws MessageBizException;/*** 将消息标记为死亡消息.*/public void setMessageToAreadlyDead(String messageId) throws MessageBizException;/*** 根据消息ID获取消息*/public RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;/*** 根据消息ID删除消息*/public void deleteMessageByMessageId(String messageId) throws MessageBizException;/*** 重发某个消息队列中的全部已死亡的消息.*/public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException; 复制代码
}
消息类型及核心字段
public class RpTransactionMessage extends BaseEntity {private static final long serialVersionUID = 1757377457814546156L;private String messageId;private String messageBody;private String messageDataType;private String consumerQueue;private Integer messageSendTimes;private String areadlyDead;private String field1;private String field2;private String field3;} 复制代码
消息服务子系统核心实现
预发送消息public int saveMessageWaitingConfirm(RpTransactionMessage message) {if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");}if (StringUtil.isEmpty(message.getConsumerQueue())) {throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");}message.setEditTime(new Date());message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());message.setAreadlyDead(PublicEnum.NO.name());message.setMessageSendTimes(0);return rpTransactionMessageDao.insert(message);}消息确认并发送 public void confirmAndSendMessage(String messageId) {final RpTransactionMessage message = getMessageByMessageId(messageId);if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");}message.setStatus(MessageStatusEnum.SENDING.name());message.setEditTime(new Date());rpTransactionMessageDao.update(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}保存并发送不进行预发送public int saveAndSendMessage(final RpTransactionMessage message) {if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");}if (StringUtil.isEmpty(message.getConsumerQueue())) {throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");}message.setStatus(MessageStatusEnum.SENDING.name());message.setAreadlyDead(PublicEnum.NO.name());message.setMessageSendTimes(0);message.setEditTime(new Date());int result = rpTransactionMessageDao.insert(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});return result;}直接发送 public void directSendMessage(final RpTransactionMessage message) {if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");}if (StringUtil.isEmpty(message.getConsumerQueue())) {throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");}notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}重复发送public void reSendMessage(final RpTransactionMessage message) {if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");}if (StringUtil.isEmpty(message.getConsumerQueue())) {throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");}message.addSendTimes();message.setEditTime(new Date());rpTransactionMessageDao.update(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});}public void reSendMessageByMessageId(String messageId) {final RpTransactionMessage message = getMessageByMessageId(messageId);if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");}int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));if (message.getMessageSendTimes() >= maxTimes) {message.setAreadlyDead(PublicEnum.YES.name());}message.setEditTime(new Date());message.setMessageSendTimes(message.getMessageSendTimes() + 1);rpTransactionMessageDao.update(message);notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());notifyJmsTemplate.send(new MessageCreator() {public Message createMessage(Session session) throws JMSException {return session.createTextMessage(message.getMessageBody());}});} 标记死亡public void setMessageToAreadlyDead(String messageId) {RpTransactionMessage message = getMessageByMessageId(messageId);if (message == null) {throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");}message.setAreadlyDead(PublicEnum.YES.name());message.setEditTime(new Date());rpTransactionMessageDao.update(message);} 复制代码
5 最大努力通知型业务逻辑实现工业预警
业务活动的主动方,在完成业务处理之后,向业务活动的被动方发送消息,允许消息丢失。
业务活动的被动方根据定时策略,向业务活动主动方查询,恢复丢失的业务消息。
适用范围
• 对业务最终一致性的时间敏感度低• 跨企业的业务活动 复制代码
6 总结
本文结合工业大数据高并发数据接入场景,通过弱化MQ的消息一致性,加强业务系统的一致性保证,实现了工业级大数据的数据仓库建设。
秦凯新 于深圳
工业级大数据接入MQ消息发送异常性及最终一致性解决方案-DW商业环境实战相关推荐
- rocket mq 监听端口_MQ消息最终一致性解决方案
随着分布式服务架构的流行与普及,原来在单体应用中执行的多个逻辑操作,现在被拆分成了多个服务之间的远程调用.虽然服务化为我们的系统带来了水平伸缩的能力,然而随之而来挑战就是分布式事务问题,多个服务之间使 ...
- Cris 玩转大数据系列之消息队列神器 Kafka
Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 文章目录 Cris 玩转大数据系列之消息队列神器 Kafka Author:Cris 1. Kafka 概述 1.1 消息队 ...
- 星火计划 | Apache InLong一站式大数据接入平台沙龙火热报名中
导语:随着数据的增长以及业务场景的多元化,消息中间件已经成为IT架构中不可或缺的组件.它可以降低系统间的耦合性与复杂度,提升系统稳定性,为大数据时代的数据采集提供巨大助力. 腾讯大数据在2013年自研 ...
- 《大数据》杂志——大数据容灾备份技术挑战和增量备份解决方案
大数据容灾备份技术挑战和增量备份解决方案 罗圣美1,2,李 明1,叶郁文1 (1.中兴通讯股份有限公司 南京 210012: 2.清华大学计算机科学与技术系 北京 100084) 摘要:大数据已成为当 ...
- 《大数据架构和算法实现之路:电商系统的技术实战》——1.5 相关软件:R和Mahout...
本节书摘来自华章计算机<大数据架构和算法实现之路:电商系统的技术实战>一书中的第1章,第1.5节,作者 黄 申,更多章节内容可以访问云栖社区"华章计算机"公众号查看. ...
- 大数据Spark “蘑菇云”行动第76课: Kafka+Spark Streaming+Redis项目实战
大数据Spark "蘑菇云"行动第76课: Kafka+Spark Streaming+Redis项目实战 jedis插件 redis <dependency> ...
- 大数据学习系列之八----- Hadoop、Spark、HBase、Hive搭建环境遇到的错误以及解决方法
大数据学习系列之八----- Hadoop.Spark.HBase.Hive搭建环境遇到的错误以及解决方法 参考文章: (1)大数据学习系列之八----- Hadoop.Spark.HBase.Hiv ...
- EI会议论文,第二届云计算、大数据与数字经济国际学术会议最终截稿倒计时10天
第二届云计算.大数据与数字经济国际学术会议最终截稿倒计时10天! 诚邀各专家学者投稿参会,七月共聚大连.
- 《大数据架构和算法实现之路:电商系统的技术实战》——2.4 案例实践
本节书摘来自华章计算机<大数据架构和算法实现之路:电商系统的技术实战>一书中的第2章,第2.4节,作者 黄 申,更多章节内容可以访问云栖社区"华章计算机"公众号查看. ...
最新文章
- 公开平等的企业文化是OKR落地第一步
- TechTarget数据库Redis
- Python中一些高效的数据操作
- linux ip别名和辅助ip地址
- Linux 命令之 arch --显示主机的硬件结构类型
- jeecg 与 jeecg-p3有什么区别?
- Docker学习六:综合实践
- 基于FPGA的跨时钟域信号处理——专用握手信号
- String或Integer补0操作
- linux中oppenoffice的安装
- Preferences DataStore------JAVA
- 【Windows】如何关闭svchost,win10系统svchost下载占网速,无缘无故下载软件
- Android studio使用SVN
- System.Web.Security
- MBA-day1数学-应用题-利润问题
- 【电影推荐】20部生存启示录—灾难大片
- 足球比赛常用英语单词集锦
- 使用frp配置内网机器访问
- Ajax跨域请求保证同一个session的问题
- iOS 苹果授权登录(Sign in with Apple)