基于 MySQL 的批量事务消息队列

消息队列本质上是一个存储介质,通常是链表结构,不同的进程或线程可以向消息队列中写入或读取消息。消息队列的使用场景有很多,比如异步处理任务、应用解耦、流量削锋等等。通常我们使用消息队列,都是直接使用 MNS、RocketMQ、Kafka 等产品。但某些场景下这些产品也难以满足,或者使用起来成本比较高,比如:批量创建大量(比如 1 万条)消息,并且要么都写入,要么不写入。这时候就需要考虑一些别的方案了。总的来说该场景主要需要满足以下几个条件:批量创建消息

批量创建要保证事务

消费不能被重复消费

接下来就以上述的场景为例,分别分析不同消息队列所面临的问题,以及如何使用 MySQL 来实现一个支持批量事务消息队列。

基于 MNS / RocketMQ

MNS 是阿里云提供的一个分布式消息服务,RocketMQ 是阿里云基于 Apache RocketMQ 构建的分布式消息中间件。

MNS 可以使用 SendMessage 接口向队列发送一条消息,也可以使用 BatchSendMessage 批量创建消息,但该接口一次最多发送 16 条消息。而 RocketMQ 不支持批量创建消息。

所以对于一般的消息队列产品,只能通过多次调用发送单条/(有限的)多条创建消息的接口,来实现批量创建大量消息。但多次调用接口,就很难保证这些调用的事务性,很难保证这一批消息要么都成功写入消息队列,要么都不写入。

综上,MNS、RocketMQ 等消息队列,比较难实现事务性地批量创建消息。

基于 Redis Lists

Redis 是一个经常用来做消息队列的数据库。Redis 的 lists 是一个链表,基于 lists 可以很方便实现一个轻量级的消息队列。

lpush/rpop 或 lpush/brpop

消息队列的操作,就是写入和读取,所以首先可以想到的是把 list 作为一个消息队列。对于生产者,使用 lpush 写入数据到队列头部,消费者通过轮询的方式,每次循环使用 rpop 从队列尾部取出一条消息进行处理。通过 rpop 取出数据后,数据就不在 list 中存在了,所以不同消费者就只能取到不同的数据。另外消费这也可以通过 brpop 阻塞式地从 list 中读取消息,brpop 会在 list 中没有任何元素的时候阻塞连接,这种方式效率更高。

从 2.4 开始,lpush 支持传入多个 elements:LPUSH key element [element ...],这样我们可以很方便使用 lpush 批量向队列中写入消息。

但 lpush/rpop 这种方案实现的消息队列是不可靠的。例如,当消费这通过 rpop 取出消息后,出现了网络问题或者消费者端崩溃了, 那么这个消息就丢失了。所以一般不能简单使用 lpush/rpop 来实现消息队列。

rpoplpush 或 brpoplpush

那么如何使消息队列可靠呢?可以通过 rpoplpush (或 brpoplpush)来实现。rpoplpush source destination 可以从 source 列表中取出一个元素,并将该元素写入到 destination 列表,这两个操作是一个原子操作。

比如我们定义队列的 list 为 queue,正在处理的队列为 processing,消费者可以通过 rpoplpush queue processing 取出消息进行处理,处理成功后,再使用 lrem processing 将消息从 processing 中删除。这样当消费过程中出现异常,消息就会留存在 processing 中。然后我们可以通过另一个 worker ,监听 processing 并将超时的消息取出来,再放回到 queue。但问题就在于监听 processing 的 worker 需要客户端实现,为了计算消息的超时时间,可能还需要别的介质来如 hash 表来存储消息的取出时间,复杂度又上升了。

除了使用 processing 表,还有一种方案就是使用循环队列,即 rpoplpush queue queue ,source 和 destination 是同一个队列。每次从队列尾部取出待消费的消息,同时将消息放在队列头部,消费完成后,使用 lrem queue 将消息删除。但也存在一个问题,当队列中消息数量小于 worker 数量时,不同 worker 就很可能读取到同一个消息,造成消息被重复消费。当然,如果 worker 能够容忍消息被重复处理,这种方式是可行的。

总的来说,基于 Redis Lists,可以实现消息的批量创建,并保证创建消息的事务性,但难以实现消息不被重复消费。

基于 MySQL 的批量事务消息队列

基于 Redis Lists 的消息队列之所以难以避免消息被重复消费,主要是异常消息重试导致的。因为数据一旦从 Redis Lists 中取出来,就只能重新 lpush 回去,难以保证取出消息和消费消息的事务(消息一定是成功消费后再删除)。

那么如何基于 MySQL 实现一个不重复消费的队列呢?

批量事务消息

MySQL 的消息队列的数据写入和 Redis 类似,只是 MySQL 是使用表来存储消息。比如新建一张 queue 表:

CREATE TABLE `queue` (

`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',

`created_at` datetime NOT NULL COMMENT '创建时间',

`updated_at` datetime NOT NULL COMMENT '修改时间',

`retry_times` int(10) unsigned NOT NULL COMMENT '重试次数',

`message` varchar(255) NOT NULL COMMENT '消息内容',

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息队列'

对于 MySQL 消息队列,批量写入消息很容易,直接通过 insert 批量写入数据即可: insert into queue(id,...) values(...)(...)...。并且 SQL 语句也可以放在事务中,很容易实现批量消息的事务。

队列消费者的实现,就是实现一个定时任务,定时从 queue 表中查询出消息进行处理,处理完成之后再删掉该行数据:读取一条消息: select * from queue order by id asc, retry_times asc limit 1

处理该消息

如果处理成功,则删除该消息: delete from queue where id

如果处理失败,则更新 retry_times: update queue set retry_times = retry_times + 1 where id

这样就能保证消息一定是成功被消费,如果消费失败,则消息依旧留存在 queue 表中,等待下一次被消费。

MySQL 队列的最大问题是,当有多个进程都在从 queue 中读取消息进行消费时,很容易读取到重复的行,进而导致重复消费。如何解决这个问题呢?

基于乐观锁的消息消费

最容易想到的就是锁,并且是行锁;如果是表锁,则 SQL 的执行效率就非常低了。主要就是通过锁实现:当一行数据被一个 worker 读取后,就不能被其他 worker 读取。

读取数据时的行锁,最容易想到的就是 select * from queue where id = 1 for update,该 SQL 会给 id 为 1 的行加上排他锁,当一个 worker 执行该 SQL 的时候,别的 worker 就无法读取这一行。但如果 select ... for updat 中的 where 条件没有索引,行锁就会升级为表锁。而我们最开始操作数据的时候,是不知道 id 的。所以没办法直接使用这条 SQL 来查询需要处理的数据,需要另辟蹊径。

其实也很简单,假设我们有 10 个 worker 来消费队列,那么每个 worker 可以先批量查询 100 条数据,然后随机选择其中一条,得到对应的 id。接下来再使用 select * from queue where id = ? for update 来给每个 worker 需要处理的一行数据加锁。流程如下:select * from queue order by id asc, retry_times asc limit 100

随机选择一行数据,得到 id

读取该行数据并加锁 select * from queue where id = ? for update

这样不同 worker 取得同一行数据的概率只有 1/10,由于排他锁的存在,不同 worker 无法同时读取到这一行数据。

排他锁虽然实现了一行数据只能被一个 worker 读取,但一旦不同 worker 随机选择了同一个 id,则后执行的 SQL 就会一直阻塞,直到排他锁被释放。如果抢到锁的 worker 处理速度很慢,则其他 worker 的 SQL 就会阻塞很长时间。显然这种方案还不够好。

那么有没有更好的方案?排他锁的主要问题是,该行锁会阻塞其他 SQL,所以使用一种不会阻塞的行锁就可以了。

首先给 queue 表增加一个字段:lock_id:

CREATE TABLE `queue` (

`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',

`created_at` datetime NOT NULL COMMENT '创建时间',

`updated_at` datetime NOT NULL COMMENT '修改时间',

`retry_times` int(10) unsigned NOT NULL COMMENT '重试次数',

+ `lock_id` char(64) NOT NULL COMMENT '为了加行锁的标志,如 hostname-pid',

`message` varchar(255) NOT NULL COMMENT '消息内容',

PRIMARY KEY (`id`)

) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息队列'

然后不同 worker 还是和之前方案一样,批量读取数据,随机选择一行,然后将该行数据的 lock_id 更新为当前进程的 {hostname}-{pid},接下来再根据 lock_id 去查询一行数据,这样就能保证不同 worker 读取的是不同的数据了。如果 A、B 两个 worker 随机选择了同一个 id,那么造成的影响就是,A 进程的 lock_id 可能被 B 进程覆盖,导致 A 进程最后查询不到可以消费的数据,造成 A 进程的浪费。在保证准确性的情况下,这种浪费也是允许的。详细流程如下:select * from queue where lock_id = '' order by id asc, retry_times asc limit 100

随机选择一行数据,得到 id

为该行数据加锁 update queue set lock_id = {hostname}-{pid}, updated_at = now()

读取对应数据进行消费 select queue where lock_id = {hostname}-{pid}

如果消费成功,删除该行数据 delete from queue where id

如果消费失败,释放锁 update queue set lock_id = '', retry_times = retry_times + 1, updated_at = now()

释放锁失败后的重试

通过 lock_id 实现基于乐观锁消费,就能保证一行数据一定只被一个 worker 消费。现在还面临一个问题就是,如果消费失败,需要释放锁,如果释放锁失败怎么处理?

这个时候就可以加一个兜底方案。在消费消息最后,查询出有 `lock_id` 并且超时的数据,这些数据只在两种情况会产生:消费消息超时

释放锁失败

从队列中移除消息失败

前两种情况,可以直接重新释放锁,即将 `lock_id` 设置为空字符串,等待下一次轮询去处理。

“移从列表中移除消息失败”,准确来说是:消息消费成功了,但没有从队列中移除。解决这个问题,可以把 ”删除消息“ 和 ”消费消息“ 做成一个事务,就能保证消息一定是消费成功才被删除。

这样就实现了一个可靠的消息队列。

总结

针对批量创建事务消息的场景,将 RocketMQ、Redis Lists 和 MySQL 消息队列做个简单的对比,如下:

|| RocketMQ | 基于 Redis Lists 的消息队列 | 基于 MySQL 的消息队列 |

| -------- | -------- | -------- |-------- |

| 批量创建 | 否 | 是 | 是 |

| 批量消息的事务 | 否 | 是 | 是 |

| 避免重复消费 | 是 | 否 | 是 |

总的来说,MNS、RocketMQ 等产品,由于不支持批量写入数据到队列,所以难以满足批量创建事务消息的需求,实现起来成本比较高。而使用 Redis Lists 可以很方便实现一个批量写入数据的消息队列,但难以保证消息只被消费一次。使用 MySQL 实现一个消息队列,批量写入可以通过 insert 一次写入多条消息,并通过乐观锁的方式保证一条消息只被消费一次。当然,技术没有好与不好,只有适合与不适合,上述几种消息队列都有各自适合的场景,最终还是要根据实际需求进行选择。

mysql 消息队列_一个简单的 MySQL 批量事务消息队列相关推荐

  1. mysql建立数据浏览器_一个简单的MySQL数据浏览器

    一个简单的MySQL数据浏览器 2021-01-21 16:17:28679 这个程序可以用来浏览MySQL中的数据,您可以稍做修改就可以做出很不错的MySQL浏览器. */ /* ?cmd=db ? ...

  2. mysql浏览器_一个简单的MySQL数据浏览器

    一个简单的MySQL数据浏览器 更新时间:2006年10月09日 00:00:00   作者: 这个程序可以用来浏览MySQL中的数据,您可以稍做修改就可以做出很不错的MySQL浏览器. */ /* ...

  3. mysql建立数据浏览器_一个简单的MySQL数据浏览器_php

    这个程序可以用来浏览mysql中的数据,您可以稍做修改就可以做出很不错的MySQL浏览器. */ /* ?cmd=db ?cmd=table&db={} http://www.gaodaima ...

  4. android实现mysql数据库存储_一个简单的Android端对象代理数据库系统的实现(二、执行+存储)...

    这是我之前在武汉大学彭智勇老师那边做过的一个对象代理数据库系统.文中给出了一整个系统的几乎所有代码,经测试可正常运行.文章比较长,超出了知乎的最长文章范围,因此分为两篇文章.这是第二篇. 执行 执行部 ...

  5. mysql多副本搭建_一个简单的MySQL多实例环境搭建

    安装mysql 初始化两个数据库目录 mysql_install_db --datadir=/usr/local/var/mysql1 --user=mysql mysql_install_db -- ...

  6. linux mysql 死锁进程_一个罕见的MySQL redo死锁问题排查及解决过程

    作者:张青林,腾讯云布道师.MySQL架构师,隶属腾讯TEG-基础架构部-CDB内核开发团队,专注于MySQL内核研发&相关架构工作,有着服务多个10W级QPS客户的数据库优化及稳定性维护经验 ...

  7. mysql.connector写了一个简单的mysql操作类:

    mport pymysql class MyDB(): def __init__(self, host="127.0.0.1", username="root" ...

  8. mysql查询错误_一个奇怪的MySQL查询错误

    t_user表的phone_number字段是varchar(255)类型的,表示手机号, 在查询某个手机号时,sql语句如下: SELECT phone_number  FROM t_user WH ...

  9. java 简单阻塞队列,制作一个简单的任务队列(使用阻塞队列)

    充分利用阻塞队列的特性,队列中没有任务时,让线程阻塞.代码如下: import java.util.concurrent.BlockingQueue; import java.util.concurr ...

最新文章

  1. AndroidStudio开发jni不加载libs的so,只加载jniLibs的so
  2. RTEMS 的 AT91SAM9260 移植(8): 编译
  3. 数据仓库中的SQL性能优化 - Hive篇
  4. eventBus 与fragment
  5. 1.在c 语言中字符型占,C程序设计试题1.doc
  6. 工作汇报ppt案例_【赠书】开工大吉!今年一定要干过写PPT的!
  7. 【HTML5】页面点击按钮添加一行 删除一行 全选 反选 全不选
  8. cron linux_如何在Linux中使用cron
  9. 推荐一首歌,个人感觉不错...
  10. Python基础函数学习笔记(二)
  11. vue小案例一:todolist
  12. 摩尔定律终结与科学大停滞
  13. css样式的灵异事件
  14. Simple Contrastive Representation Adversarial Learning for NLP Tasks
  15. Docker拉取镜像报错error pulling image configuration
  16. Altium Designer快捷键布线无法实现网络线自动编号
  17. [转]:Quartus II LPM使用指南-FIFO篇
  18. Macbook电池优化的七种建议
  19. 大一新生必会的c语言五子棋!PVP,PVE,EVE模式都有,还有智能的AI部分,复盘等内容!一看就会的五子棋教程,确定不来看看吗?
  20. 曝光中国女性的私密数据-广西富婆多,快上车!

热门文章

  1. Python:实现scoring functions评分函数算法(附完整源码)
  2. 知乎python练手的_Python新手应该如何练手?知乎5600赞答案告诉你!
  3. Java SDK和Java JDK的区别
  4. 使学习效率提高5倍的20个起始步骤
  5. 深入分析Linux虚拟化KVM-Qemu之ARMv8虚拟化
  6. JVM虚拟机的理解(上)
  7. 豆瓣电台WP7客户端 开发记录5
  8. 数据中台与数据仓库区别
  9. C语言实战篇-----调试关键参数+printf输出_文件名_函数名_执行数!!!
  10. RatingBar基本使用