什么是消息队列:

消息(Message)是指在应用间传输的数据,消息可以包括简答的文本字符串,也可以有嵌入对象等,消息队列(Message Queue)是一种应用间的通信方式,用来监视消息是否发送成功,确保消息传出,并基于数据通信来进行分布式系统的集成。

消息队列主流中间件:

当前使用较多的消息队列有 RabbitMQRocketMQActiveMQKafkaZeroMQMetaMQ 等,而部分数据库 如 RedisMySQL 以及 phxsql ,如果硬搞的话,其实也可实现消息队列的功能。

Redis消息队列适用范围:

适用于简单的业务场景,Redis是特别轻量级的消息队列。不需要用到RabbitMQ和Kafka的

消息队列基础:

三个角色:

生产者、消费者、消息处理中心

消息队列的异步处理模式:

消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。也就是实现松耦合。

List数据类型的消息队列

原理:

List列表是一种可以按照顺序进出,并插入排序的数据结构,可以当做异步队列处理,将需要处理的消息序列化成字符串塞进List,另一个线程从这个列表中轮询数据进行处理。

List基本操作:

1、LPUSH key value1 [value2]

与  RPUSH key value1 [value2]

将一个或多个值插入到列表头部

127.0.0.1:6379> LPUSH list1 "foo"           #返回值为列表长度
(integer) 1
127.0.0.1:6379> LPUSH list1 "bar"
(integer) 2
127.0.0.1:6379> LRANGE list1 0 -1          #查询列表值
1) "bar"
2) "foo"
redis 127.0.0.1:6379> RPUSH mylist "hello"         #执行 RPUSH 操作后,列表的长度。
(integer) 1
redis 127.0.0.1:6379> RPUSH mylist "foo"
(integer) 2
redis 127.0.0.1:6379> LRANGE mylist 0 -1
1) "hello"
2) "foo"

2、LPOP key

移出并获取列表的第一个元素

      RPOP key

移除列表的最后一个元素,返回值为移除的元素。

redis 127.0.0.1:6379> RPUSH list1 "foo"
(integer) 1
redis 127.0.0.1:6379> RPUSH list1 "bar"
(integer) 2
redis 127.0.0.1:6379> LPOP list1           #移除尾元素
"foo"
redis> RPUSH mylist "one"                 #RPUSH插入元素
(integer) 1
redis> RPUSH mylist "two"
(integer) 2
redis> RPUSH mylist "three"
(integer) 3
redis> RPOP mylist                               #RPOP移除尾元素
"three"
redis> LRANGE mylist 0 -1
1) "one"
2) "two"

3、LRANGE key start stop

获取列表指定范围内的元素

4、LINDEX key index

通过索引获取列表中的元素

redis 127.0.0.1:6379> LINDEX mylist -1       #下标为指定索引值的元素。
"World"
redis 127.0.0.1:6379> LINDEX mylist 3        # index不在 mylist 的区间范围内
(nil)

5、 LLEN key

获取列表长度

redis 127.0.0.1:6379> LLEN list1
(integer) 2                  #返回列表长度

6、LREM key count value

移除指定元素

redis> LREM mylist -2 "hello"          #删除指定元素
(integer) 2

#count > 0 : 从表头开始向表尾搜索,移除与 VALUE 相同的元素,数量为 COUNT 。
#count < 0 : 从表尾开始向表头搜索,移除与 VALUE 相同的元素,数量为 COUNT 的绝对值。
#count = 0 : 移除表中所有与 VALUE 相等的值。

list实现消息队列:

使用几个push和pop实现:

127.0.0.1:6379> lpush mylist a a b c d e
(integer) 6
127.0.0.1:6379> rpop mylist
"a"
127.0.0.1:6379> rpop mylist
"a"
127.0.0.1:6379> rpop mylist
"b"
127.0.0.1:6379> 

List缺陷

即时消费问题:

使用List作为消息队列可能会造成,如果消费者想要及时的处理消息数据,就要在程序中不断进行遍历,不断地进行pop命令,这会给程序造成一定的性能损失。

解决办法:

所以,Redis 还提供了 BLPOPBRPOP 这种阻塞式读取的命令(带 B-Bloking的都是阻塞式),客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。这种方式就节省了不必要的 CPU 开销。

127.0.0.1:6379> lpush yourlist a b c d
(integer) 4
127.0.0.1:6379> blpop yourlist 10
1) "yourlist"
2) "d"
127.0.0.1:6379> blpop yourlist 10
1) "yourlist"
2) "c"
127.0.0.1:6379> blpop yourlist 10
1) "yourlist"
2) "b"
127.0.0.1:6379> blpop yourlist 10
1) "yourlist"
2) "a"
127.0.0.1:6379> blpop yourlist 10
(nil)
(10.02s)
BLPOP BLPOP key [key ...] timeout 移出并获取列表的第一个元素, 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止

消息处理机制(ACK机制)

问题描述:

List队列中的消息一旦发出去,就会从原队列中删除,但如果消费者因为否中网络原因,或者数据崩溃了,导致消费者没有接收到这条消息,此时就会丢失消息,这种问题就是缺少了消息确认机制

解决办法:

1、阻塞的从List中发送一条消息的同时,将这条消息复制到另一个队列中,当做备份消息。

2、在业务流程安全结束后,再删除该备份的队列元素,向队列发送受到消息状态,完成消息确认机制。

127.0.0.1:6379> rpush myqueue one
(integer) 1
127.0.0.1:6379> rpush myqueue two
(integer) 2
127.0.0.1:6379> rpush myqueue three
(integer) 3
127.0.0.1:6379> rpoplpush myqueue queuebak
"three"
127.0.0.1:6379> lrange myqueue 0 -1
1) "one"
2) "two"
127.0.0.1:6379> lrange queuebak 0 -1
1) "three"
BRPOPLPUSH BRPOPLPUSH source destination timeout 从列表中弹出一个值,将弹出的元素插入到另外一个列表中并返回它; 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
RPOPLPUSH RPOPLPUSH source destinationb 命令 RPOPLPUSH 在一个原子时间内,执行以下两个动作:将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端。将 source 弹出的元素插入到列表 destination ,作为 destination 列表的的头元素 RPOPLPUSH list01 list02

发布订阅模式实现消息队列

Redis 发布订阅 (pub/sub) 是一种消息通信模式:发送者 (pub) 发送消息,订阅者 (sub) 接收消息。

消息发布者,即publish客户端,无需独占链接,你可以在publish消息的同时,使用同一个redis-client链接进行其他操作(例如:INCR等)

 消息订阅者,即subscribe客户端,需要独占链接,即进行subscribe期间,redis-client无法穿插其他操作,此时client以阻塞的方式等待“publish端”的消息;这一点很好理解,因此subscribe端需要使用单独的链接,甚至需要在额外的线程中使用。

发布订阅原理

Redis 客户端可以订阅任意数量的频道。

下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

实例演示:

发布-订阅需要打开两个redis-cli客户端,首先先创建订阅频道

# SUBSCRIBEchannel1…订阅给定的一个或多个频道的信息。

redis 127.0.0.1:6379> SUBSCRIBE mychannel

Reading messages... (press Ctrl-C to quit)

1) "subscribe"                  #显示模式,subscribe是订阅

2) "mychannel"                #显示订阅的频道

3) (integer) 1                   #订阅成功返回1

1) "message"

2) "mychannel"

3) "a"

第二个redis-cli客户端,在同一个频道 mychannel发布消息

PUBLISH channel message

将信息发送到指定的频道。

redis 127.0.0.1:6379> PUBLISH mychannel "send the message to mtchannel"
(integer) 1             #发送消息成功返回1

此时第一个客户端,也就是订阅者客户端会显示发送的消息

# SUBSCRIBEchannel1…订阅给定的一个或多个频道的信息。

redis 127.0.0.1:6379> SUBSCRIBE mychannel

Reading messages... (press Ctrl-C to quit)

1) "subscribe"

2) "mychannel"

3) (integer) 1

1) "message"#显示模式,此处是消息

2) "mychannel"#显示订阅者频道

3) "send the message to mtchannel "#显示消息内容

订阅发布使用场景

该模式主要用于用户编辑某个模块需要清除缓存,需要在编辑完某模块后发布到频道,在订阅该频道实现该模块清除缓存。

订阅发布其他基本操作

1、PUBSUB subcommand [argument [argument ...]]

查看订阅与发布系统状态

redis 127.0.0.1:6379> PUBSUB CHANNELS
(empty list or set)#若此时有值,则返回活跃频道的列表

2、 UNSUBSCRIBE [channel [channel ...]]

指退订给定的频道。

redis 127.0.0.1:6379> UNSUBSCRIBE mychannel
1) "unsubscribe"
2) " send the message to mtchannel "
3) (integer) 0

模式匹配订阅

Redis 的Pub/Sub实现支持模式匹配。

如果需要订阅test.name   test.addr    test.age等test频道,可以直接订阅全风格的模式如:

PSUBCSCRIBE test.*

插入消息:

127.0.0.1:6379> PUBLISH test.name haha

(integer) 1

取消该订阅:

PUNSUBSCRIBE test.*

发布订阅MQ缺点

Redis 发布订阅 (pub/sub) 有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。

Stream数据类型

原理:

Redis 5.0 版本新增了一个更强大的数据结构——Stream。它提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失,像一个仅追加内容的消息链表,将所有加入的消息串起来,且消息持久化。

增删改查消息操作例子:

127.0.0.1:6379> xadd mystream * f1 v1 f2 v2 f3 v3
"1609404470049-0"  ## 生成的消息 ID,有两部分组成,毫秒时间戳-该毫秒内产生的第1条消息# 消息ID 必须要比上个 ID 大
127.0.0.1:6379> xadd mystream 123 f4 v4
(error) ERR The ID specified in XADD is equal or smaller than the target stream top item# 自定义ID
127.0.0.1:6379> xadd mystream 1609404470049-1 f4 v4
"1609404470049-1"# -表示最小值 , + 表示最大值,也可以指定最大消息ID,或最小消息ID,配合 -、+ 使用
127.0.0.1:6379> xrange mystream - +
1) 1) "1609404470049-0"2) 1) "f1"2) "v1"3) "f2"4) "v2"5) "f3"6) "v3"
2) 1) "1609404470049-1"2) 1) "f4"2) "v4"127.0.0.1:6379> xdel mystream 1609404470049-1
(integer) 1
127.0.0.1:6379> xlen mystream
(integer) 1
# 删除整个 stream
127.0.0.1:6379> del mystream
(integer) 1
XADD 添加消息到末尾,保证有序,可以自动生成唯一ID

XADD key ID field value [field value ...]

XDEL 删除消息 XDEL key ID [ID ...]
XLEN 获取流包含的元素数量,即消息长度 XLEN key
XRANGE 获取消息列表,会自动过滤已经删除的消息 XRANGE key start end [COUNT count]

阻塞或非阻塞形式获取消息列表:

# 从ID是0-0的开始读前2条
127.0.0.1:6379> xread count 2 streams mystream 0
1) 1) "mystream"2) 1) 1) "1609405178536-0"2) 1) "f5"2) "v5"2) 1) "1609405198676-0"2) 1) "f1"2) "v1"3) "f2"4) "v2"# 阻塞的从尾部读取流,开启新的客户端xadd后发现这里就读到了,block 0 表示永久阻塞
127.0.0.1:6379> xread block 0 streams mystream $
1) 1) "mystream"2) 1) 1) "1609408791503-0"2) 1) "f6"2) "v6"
(42.37s)#$这个特殊的 ID 意思是 XREAD 应该使用流 mystream 已经存储的最大 ID 作为最后一个 ID。

创建消费者组:

在某些问题中,我们希望让不同的消费者从同一流中向许多不同客户端提供不同消费集,因此就需要创建消费者组。

消费者组基本命令:

Stream 不像 Kafak 那样有分区的概念,如果想实现类似分区的功能,就要在客户端使用一定的策略将消息写到不同的 Stream。

  • xgroup create:创建消费者组
  • xgreadgroup:读取消费组中的消息
  • xack:ack 掉指定消息
# 创建消费者组的时候必须指定 ID, ID 为 0 表示从头开始消费,为 $ 表示只消费新的消息,也可以自己指定
127.0.0.1:6379> xgroup create mystream mygroup $
OK# 查看流和消费者组的相关信息,可以查看流、也可以单独查看流下的某个组的信息
127.0.0.1:6379> xinfo stream mystream1) "length"2) (integer) 4  # 共 4 个消息3) "radix-tree-keys"4) (integer) 15) "radix-tree-nodes"6) (integer) 27) "last-generated-id"8) "1609408943089-0"9) "groups"
10) (integer) 1  # 一个消费组
11) "first-entry" # 第一个消息
12) 1) "1609405178536-0"2) 1) "f5"2) "v5"
13) "last-entry"  # 最后一个消息
14) 1) "1609408943089-0"2) 1) "f6"2) "v6"

按照消费组进行消费

方法:

1、使用xreadgroup指令进行消费组组内消费,它也可以阻塞等待新消息

2、当读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构中

3、客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。

#  消费组 mygroup1 中的 消费者 c1 从 mystream 中 消费组数据
# > 号表示从当前消费组的 last_delivered_id 后面开始读
# 每当消费者读取一条消息,last_delivered_id 变量就会前进
#last_delivered_id :每个消费组会有个游标 last_delivered_id 在数组之上往前移动,表示当前消费#组已经消费到哪条消息了
127.0.0.1:6379> xreadgroup group mygroup1 c1 count 1 streams mystream >
1) 1) "mystream"2) 1) 1) "1609727806627-0"2) 1) "f1"2) "v1"3) "f2"4) "v2"5) "f3"6) "v3"
127.0.0.1:6379> xreadgroup group mygroup1 c1 count 1 streams mystream >
1) 1) "mystream"2) 1) 1) "1609727818650-0"2) 1) "f4"2) "v4"
# 已经没有消息可读了
127.0.0.1:6379> xreadgroup group mygroup1 c1 count 2 streams mystream >
(nil)# 还可以阻塞式的消费
127.0.0.1:6379> xreadgroup group mygroup1 c2 block 0 streams mystream >
µ1) 1) "mystream"2) 1) 1) "1609728270632-0"2) 1) "f5"2) "v5"
(89.36s)# 观察消费组信息
127.0.0.1:6379> xinfo groups mystream
1) 1) "name"2) "mygroup1"3) "consumers"4) (integer) 2  # 2个消费者5) "pending"6) (integer) 3   # 共 3 条正在处理的信息还没有 ack7) "last-delivered-id"8) "1609728270632-0"127.0.0.1:6379> xack mystream mygroup1 1609727806627-0  # ack掉指定消息
(integer) 1
XREAD 以阻塞或非阻塞方式获取消息列表 XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
XGROUP CREATE 创建消费者组 XGROUP [CREATE key groupname id-or-] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
XREADGROUP GROUP 读取消费者组中的消息 XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
XACK 将消息标记为"已处理" XACK key group ID [ID ...]

Redis消息队列三种方案相关推荐

  1. Redis 消息队列的三种方案(List、Streams、Pub/Sub)

    现如今的互联网应用大都是采用 分布式系统架构 设计的,所以 消息队列 已经逐渐成为企业应用系统 内部通信 的核心手段,它具有 低耦合.可靠投递.广播.流量控制.最终一致性 等一系列功能. 当前使用较多 ...

  2. Redis 消息队列的三种方案选型

    文章目录 Redis 消息队列的三种方案选型 消息队列(Message Queue,简称 MQ) 消息队列使用场景 Redis 消息队列应用背景,选型思考 Redis消息队列发展历程 在Redis中提 ...

  3. activemq消息丢失_基于Redis实现消息队列的典型方案

    基于Redis实现消息队列典型方案 1 概述 2 基于List的 LPUSH+BRPOP 的实现 3 PUB/SUB,订阅/发布模式 4 基于SortedSet有序集合的实现 5 基于 Stream ...

  4. redis消息队列,你还不敢用?

    文章目录 前言 一.关于消息队列 1.应用场景 2.如何设计消息队列 二.Redis 消息队列解决方案 1.基于 List 的消息队列解决方案 2.基于 zset 的消息队列解决方案 3.基于 Str ...

  5. Redis(十二) - Redis消息队列

    文章目录 一.Redis消息队列 1. 消息队列 2. 基于List结构模拟消息队列 3. 基于PubSub的消息队列 4. 基于Stream的消息队列 - 单消费模式 4. 基于Stream的消息队 ...

  6. Python 全栈系列122 redis消息队列搭建

    说明 不太喜欢rabbitmq之类的消息中间件,出问题不太好调试.打算使用redis替代. 内容 1 安装 1.1 版本 以docker方式安装,可以作为每台主机的一个标配.(其他的标配数据库还有mo ...

  7. Day741.Redis消息队列 -Redis 核心技术与实战

    Redis消息队列 Hi,我是阿昌,今天学习的相关内容是Redis消息队列内容. 现在的互联网应用基本上都是采用分布式系统架构进行设计的,而很多分布式系统必备的一个基础软件就是消息队列. 消息队列要能 ...

  8. Redis消息队列发展历程

    简介:Redis是目前最受欢迎的kv类数据库,当然它的功能越来越多,早已不限定在kv场景,消息队列就是Redis中一个重要的功能.Redis从2010年发布1.0版本就具备一个消息队列的雏形,随着10 ...

  9. 【Redis消息队列实现异步秒杀】--Redis学习笔记08

    前言 秒杀业务的优化思路: 先利用Redis完成库存余量.一人一单判断,完成抢单业务 再将下单业务放入队列中(阻塞队列,消息队列),利用独立线程异步下单 基于阻塞队列的异步秒杀存在哪些问题? 内存限制 ...

最新文章

  1. LeetCode Rotate List
  2. Android Service完全解析,关于服务你所需知道的一切(下)
  3. PanDownload复活了!速度60MB/s!
  4. linux配置串口不支持serial,linux 串口serial1和serial2不能用?
  5. P3435-[POI2006]OKR-Periods of Words【KMP】
  6. android 仿快递步骤_Android开发-类似物流快递进度效果
  7. ASP.NET状态管理详解,让你明明白白
  8. 如何分析android的OOM,与java静态代码分析工具
  9. Caffe中merge卷积和bn层的原理
  10. Atitit 游戏的通常流程 attilax 总结 基于cocos2d api
  11. yaahp层次分析法步骤_什么是层次分析法?(文末附yaahp软件)
  12. Pentaho相关组件下载,sourceforget加速
  13. android代码 发警报音,Android 8中的警报重复
  14. Dest0g3 520迎新赛部分WP
  15. 如何通过Pyqt 或者PySide 在电脑上直接显示手机屏幕。
  16. 晶振波形不是正弦波_晶振的3种输出波形,你了解吗?
  17. 科研写作——常见句式(二)
  18. bolb layer
  19. Cant bind to ngModel since it isnt a known property of input.ngtsc(-998002) app.component.ts(8, 7):
  20. 数博会上,马云马化腾李彦宏都说了什么

热门文章

  1. 解决Android 8.0 WebView回退失效(判断canGoBack()和调用goBack())
  2. 横切关注点的两种实现方法
  3. js 判断视频大小、上传视频
  4. 动态组件切换的2种实现方式
  5. ARCGIS10.6安装,ArcMap卡在loading document闪退问题
  6. 如何遍历 HashMap,遍历HashMap 的 5 种最佳方式
  7. 小迪渗透测试学习笔记(四)基础入门-WEB源码拓展
  8. 【C++】命名空间(namespace)详解
  9. Base64j加密解密、动态代理、正则表达式、单例多例设计模式、枚举、工厂设计模式
  10. QC协议+华为FCP+三星AFC快充取电5V9V芯片FS2601应用