消息无序产生的原因

消息队列,既然是队列就能保证消息在进入队列,以及出队列的时候保证消息的有序性,显然这是在消息的生产端(Producer),但是往往在生产环境中有多个消息的消费端(Consumer),尽管消费端在拉取消息时是有序的,但各个消息由于网络等方面原因无法保证在各个消费端中处理时有序。

场景分析

先后两次修改了商品信息,消息A和消息B先后同步写入MySQL,接着异步写入消息队列中发送消息,此时消息队列生产端(Producer)按时序先后发出了A和B两条消息(消息A先发出,消息B后发出)。按业务逻辑,商品信息的最终状态需要以消息A和消息B综合为准。

看似一个比较常见的同步写数据库,异步发送消息的场景,但实际上需要保证消息的有序消费。

假设1:消息A只包含修改的商品名称,消息B只包含修改的商品重量,此时消息队列的消费端实际上不需要关注消息时序,消息队列消费端(Consumer)只管消费即可。

假设2:消息A包含修改的商品名称、重量,消息B包含修改的商品名称,此时消费端首先接收到消息B,后接收到消息A,那么消息B的修改就会被覆盖。此时消息队列的消费端实际上又需要关注消息时序。

可见,你无法保证消息中包含什么信息,此时必须保证消息的有序消费。

业务角度如何保证消息有序消费

生产端在发送消息时,始终保证消息是全量信息。

消费端在接收消息时,通过缓存时间戳的方式,消费消息时判断消息产生的时间是否最新,如果不是则丢弃,如果是则执行下一步。

下面通过伪代码的方式描述:

生产端伪代码

insertWare(ware); #插入数据到数据库,通常在插入数据库时我们只会update修改的字段,而不会全量插入

ware = selectWareById(ware.getId); #获取商品的全量信息(此时是最新的),用于将它放入到消息队列中

syncMq(ware); #异步发送mq消息A

消费端伪代码

ware = fetchWare(); #获取消息

if (isLasted(ware)) #通过商品的修改时间戳判断是否是最新的修改

​ TODO #执行下一步业务逻辑

else

​ return #丢弃该消息

重点在于消费端如何判断该消息是否是最新的修改也就是isLasted方法。

isLasted方法

Long modified = getCacheById(ware.getId); #获取缓存中该条商品的最新修改时间

If (ware.getModified > modified) { #如果消息中商品修改时间大于缓存中的时间,说明是最新操作

​ setCacheById(ware); #将该条消息的商品修改时间戳写入到缓存中

​ return true;

} else #如果消息中的商品修改时间小于缓存中的时间,说明该条消息属于“历史操作”,不对其更新

​ return false;

以上就是通过伪代码的方式,描述如何通过业务手段保证消息有序消费,重点在于全量发送信息和缓存时间戳。在其中还有一些技术实现细节。

例如:消费端消费消息B,执行到获取时间戳缓存之后,并在重新设置新的缓存之前,此时另一个消费端恰好也正在消费B它也正执行到获取时间戳缓存,由于消息A此时并没有更新缓存,消息A拿到的缓存仍然是旧的缓存,这时就会存在两个消费端都认为自己所消费的消息时最新的,造成该丢弃的消息没丢。

显然,这是分布式线程安全问题,分布式锁通常使用Redis或者ZooKeeper,加锁后的执行时序如下图所示。

这是从业务角度保证消息在消费端有序消费。通过在消息发送端全量发送消息以及在消息消费端缓存时间戳就可以保证消息的有序消费。

在上述场景中是先同步写入MySQL,再获取商品全量数据,接着再异步发送消息。这一系列的步骤可以通过接MySQL的binlog实现,在同步写入MySQL后,MySQL发送binlog变更,通过阿里巴巴Canal中间件接收MySQL的binlog变更再发送消息到消息队列。

java如何保证mq一定被消费_消费端如何保证消息队列MQ的有序消费相关推荐

  1. websphere mq 查看队列中是否有数据_全网最全的 “消息队列”

    消息队列的使用场景 以下介绍消息队列在实际应用常用的使用场景.异步处理.应用解耦.流量削锋和消息通讯四个场景. 1]异步处理:场景说明:用户注册后,需要发注册邮件和注册短信. 引入消息队列后架构如下: ...

  2. 消息队列的使用场景_消息队列MQ的特点、选型及应用场景

    一.什么是消息队列 消息队列(Message Queue,简称MQ),指保存消息的一个容器,本质是个队列. 消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可 ...

  3. java队列_RPC远程调用和消息队列MQ的区别

    RPC和MQ同样都是用于分布式系统的两个很重要的技术,都有服务提供者.消费者的概念,可在一定程度上对系统进行解耦.但两者之间还是有区别的,本篇简单介绍~ 一.RPC RPC(Remote Proced ...

  4. 后端技术:消息队列MQ/JMS/Kafka相关知识介绍

    ?今天给大家分享消息队列MQ/JMS/Kafka相关知识介绍 1.消息队列介绍 首先举个收快递的栗子,传统的收快递,快递小哥把我们的快递送到我们的手里.他需要什么条件嗯? 快递小哥有时间送, 我们有时 ...

  5. 消息中间件系列(四):消息队列MQ的特点、选型、及应用场景详解

    前面集中谈了分布式缓存Redis系列: 高并发架构系列:分布式锁的由来.特点.及Redis分布式锁的实现详解 高并发架构系列:Redis并发竞争key的解决方案详解 高并发架构系列:Redis缓存和M ...

  6. 消息队列MQ夺命连环11问:kafka、rabbitmq、rocketmq、activemq

    <消息队列MQ如何保证消息的幂等性> <RabbitMQ架构> <ZeroMQ简介:一种高性能的异步消息传递库> <Rocketmq原理&最佳实践&g ...

  7. 消息队列 MQ 入门理解

    转载于阿里官方文档,把一些基础部分提炼整理了一下 功能特性: 应用场景: 消息队列 MQ 可应用于如下几个场景: 分布式事务 在传统的事务处理中,多个系统之间的交互耦合到一个事务中,响应时间长,影响系 ...

  8. 【消息队列MQ使用场景及测试点总结】

    消息队列MQ是一种应用程序对应用程序的通信方法.本质是一种先进先出的数据结构. MQ将消息持久化后,发送Ack消息给Client,此处有可能因为网络问题导致Ack消息无法发送到Client,那么Cli ...

  9. 【部署类】专题:消息队列MQ、进程守护Supervisor

    目录 1 背景需求 2 技术方案 2.1 消息队列 2.2 进程守护 3 源码介绍 3.1 supervisor部分 3.1.1 supervisord.conf 内容 3.1.2 MM3D.conf ...

  10. 阿里云消息队列MQ学习—阿里云大学视频课

    在刷ACE题的过程中,感觉对于消息队列部分的理解不是很深刻,这里来学习一下. 例行还是先走一遍阿里云大学的一些视频课程扫扫盲,选择如下课程: 阿里消息队列MQ简介:阿里巴巴中间件技术部自主研发的专业消 ...

最新文章

  1. 解读:为何在今年的淘宝造物节上!AR直播火到如此程度?
  2. axios的简单封装和http请求实践
  3. python装饰器原理-简单了解python装饰器原理及使用方法
  4. 基于 Quartz 开发企业级任务调度应用
  5. 织梦手机版list.php,解决织梦一级目录作域名list.php无法跳转到手机站的问题
  6. 使用google map实现周边搜索的功能_「转」“搜索”的原理,架构,实现,实践,面试不用再怕了...
  7. 2020运动相机推荐_2020年最好的全景运动相机推荐
  8. java 多线程 临界区_【Java并发性和多线程】竞态条件与临界区
  9. iOS研发助手DoraemonKit技术实现之Crash查看
  10. 基于Python的《庆余年》评论分析
  11. jmeter连接mysql并定义变量提供给后续接口使用
  12. SCI文章写作攻略—起飞站
  13. Java模拟醉汉行走问题_醉汉随机行走问题的统计学模型.pdf
  14. 如何将多个excel表格合并成一个_如何把两个excel表合并成一个
  15. 辐射避难所ol服务器维护,辐射避难所Online8月3日更新内容 育普雷斯顿up卡池
  16. Qt - QLabel设置字体颜色
  17. 超全园林 景观cc0高清摄影图片素材网站整理
  18. 前端(五)DOM 文档对象模型
  19. mv150us无线网卡驱动linux,水星MW150US2.0驱动-水星MW150US无线网卡驱动下载v2.0 官方最新版-西西软件下载...
  20. 如何用车辆历史违章查询API接口进行快速开发

热门文章

  1. C#递归的应用实例详解
  2. ASP.NET中过滤HTML字符串的两个方法
  3. python getchar_system(“pause”)和getchar()
  4. 开源之夏 | 阿里开源近百任务上线
  5. 全面 Severless 化只需要 7天!
  6. 5分钟完成业务实时监控系统搭建,是一种什么样的体验?
  7. 网易自动化UI测试解决方案Airtest Project亮相GDC
  8. Linux对运行服务操作命令
  9. OGG-00446 ERROR: Could not find archived log
  10. 日志挖掘(logminer)