聆听自己的声音

如果自己学不动了,或者感觉没有动力的时候,看看书,听听音乐,跑跑步,休息两天,重新出发,偷懒虽好,可不要贪杯。

话说上回书我们说到了,Redis的使用修改《【BCVP更新】StackExchange.Redis的异步开发方式》,通过异步的时候,基本上会解决StackExRedis组件使用过程中,可能在并发的时候遇到的问题,而且该组件也是微软官方推荐的(参考微软微服务框架eShopOnContainers),如果一定要抬杠说不好用,其实是没必要的。那今天我们继续往下说,简单说下如何基于Redis实现消息队列。

目前在市面上比较主流的消息队列中间件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ等这几种。当然常见的还是基于RabbitMQ来实现的,Redis份额稍微小了一点,但是因为Redis的仓储、缓存等多个方面的好处,使得Redis也是很火。

1

什么是消息队列

这个其实我今天不打算重点讲,因为我详细每个人能看这篇文章,肯定都知道消息队列的相关内容,但是为了不那么突兀,我就从网上粘贴几块基本概念,了解一二:

基本概念:

消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。

消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列交互。消息会保存在队列中,直到接收者取回它。

最终可以实现解耦的目的。

下面通过一个简单的架构模型来解释:

  • Producer:消息生产者,负责产生和发送消息到Broker。

  • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个Queue。

  • Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理。

有哪些优缺点:

从上边的定义中,我们可以看出来,优点主要是三块:

异步、流量削峰与流控、解耦

这三个优点在高并发等三高场景还是很有必要的,甚至说是十分必要的。

典型的广播模式,一个消息可以发布到多个消费者;

消息即时发送,消息不用等待消费者读取,消费者会自动接收到信道发布的消息;

比如我们某宝下订单,或某6抢车票,那都是放到队列里缓冲的,要是都用服务端等待,可能早就崩了,当然实际上比这个复杂的多。

而且,通过订阅发布的模式,异步执行,这样就会大大缓解时间压力。

但是,随之而来的弊端也是有的:
比如为了异步,就是接收者必须轮询消息队列,才能收到最近的消息。然后还有就是不能达到实时性,说白了就是用空间换时间,从而降低瓶颈。

消息一旦发布,不能接收。换句话就是发布时若客户端不在线,则消息丢失,不能寻回。不能保证每个消费者接收的时间是一致的。若消费者客户端出现消息积压,到一定程度,会被强制断开,导致消息意外丢失。

五种常见模式

简单模式Hello World
功能:一个生产者P发送消息到队列Q,一个消费者C接收

工作队列模式Work Queue
功能:一个生产者,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列

发布/订阅模式Publish/Subscribe
功能:一个生产者发送的消息会被多个消费者获取。一个生产者、一个交换机、多个队列、多个消费者

路由模式Routing
说明:生产者发送消息到交换机并且要指定路由key,消费者将队列绑定到交换机时需要指定路由key

通配符(主题)模式Topic
说明:生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;

更多具体的内容呢,自己感兴趣多去搜索下吧,肯定还是有很多其他问题的,我这里就不铺开了讲了,下边咱们就说说,如何在Blog.Core里添加队列吧。

2

订阅发布相关配置案例

案例有很多,自己可以根据情况自定义。

那既然要讲东西,肯定不能随便放一个算法,肯定是需要一个小demo,一个应用场景,这样更有助于初学者去理解,之前考虑了很多,一直没有想好在BlogCore里边使用什么案例场景来说一说消息队列,最后实在是没办法,只能说日志了,万事不决就说日志,好像软件开发都是这么举例的。

这里说一下,假设我们自定义了一个日志记录的方法,就是在txt里写数据,其实我现在也是这么用的,平时肯定会一边查一边写,如果并发高一下,肯定就会出现死锁或者异常的出现,那我们就可以把写日志放到消息队列里,缓冲一下,然后在写一个订阅者,专门来“盯着”队列,一有消息传过来,就写到日志文件里,这样就能很好的实现相应的目的。如果不缓冲下,有时候日志可能高达几万条,瞬间爆炸。

那说了这个小场景,接下来就简单的模拟一下吧。

1、定义消息队列操作类与接口

既然要发布和订阅消息,肯定就需要有相应的操作方法,在上一篇文章中,我新建了一个RedisBasketRepository.cs的操作类,那我们还继续在这个类文件里写吧,注意,这个实现类和接口,已经注册到服务容器了,如果你第一次操作,可以参考文章开头上篇文章内容:

 /// <summary>/// 根据key获取RedisValue/// </summary>/// <typeparam name="T"></typeparam>/// <param name="redisKey"></param>/// <returns></returns>public async Task<RedisValue[]> ListRangeAsync(string redisKey){return await _database.ListRangeAsync(redisKey);}/// <summary>/// 在列表头部插入值。如果键不存在,先创建再插入值/// </summary>/// <param name="redisKey"></param>/// <param name="redisValue"></param>/// <returns></returns>public async Task<long> ListLeftPushAsync(string redisKey, string redisValue, int db = -1){return await _database.ListLeftPushAsync(redisKey, redisValue);}/// <summary>/// 在列表尾部插入值。如果键不存在,先创建再插入值/// </summary>/// <param name="redisKey"></param>/// <param name="redisValue"></param>/// <returns></returns>public async Task<long> ListRightPushAsync(string redisKey, string redisValue, int db = -1){return await _database.ListRightPushAsync(redisKey, redisValue);}/// <summary>/// 移除并返回存储在该键列表的第一个元素  反序列化/// </summary>/// <param name="redisKey"></param>/// <returns></returns>public async Task<T> ListLeftPopAsync<T>(string redisKey, int db = -1) where T : class{return JsonConvert.DeserializeObject<T>(await _database.ListLeftPopAsync(redisKey));}

我这里只是简单的Copy出来几个做例子,总的一共有12个,当然你也可以自定义增加或删除某些不必要的,核心的可以看出来,都是根据redisKey来操作的

Task<RedisValue[]> ListRangeAsync(string redisKey);Task<long> ListLeftPushAsync(string redisKey, string redisValue, int db = -1);Task<long> ListRightPushAsync(string redisKey, string redisValue, int db = -1);Task<long> ListRightPushAsync(string redisKey, IEnumerable<string> redisValue, int db = -1);Task<T> ListLeftPopAsync<T>(string redisKey, int db = -1) where T : class;Task<T> ListRightPopAsync<T>(string redisKey, int db = -1) where T : class;Task<string> ListLeftPopAsync(string redisKey, int db = -1);Task<string> ListRightPopAsync(string redisKey, int db = -1);Task<long> ListLengthAsync(string redisKey, int db = -1);Task<IEnumerable<string>> ListRangeAsync(string redisKey, int db = -1);Task<IEnumerable<string>> ListRangeAsync(string redisKey, int start, int stop, int db = -1);Task<long> ListDelRangeAsync(string redisKey, string redisValue, long type = 0, int db = -1);Task ListClearAsync(string redisKey, int db = -1);

2、如何发布消息与接收消息

上边定义好了相应的操作方法以后,就很简单了,我们来发布一条消息来试试:

 [HttpGet][AllowAnonymous]public async Task RedisMq(){var msg = "这里是一条日志";await _redisBasketRepository.ListLeftPushAsync(RedisMqKey.Loging, msg);}

就是这么简单,构造函数注入以后,直接调用相应的方法,就把消息msg推送到了队列里了,这里的redisKey,我用了常量定义,具体可操作Blog.Core源代码。

现在是发布消息特别简单,只需要一行接口,那如何去获取呢,在上边的获取方法中,我们定义的是:

Task<RedisValue[]> ListRangeAsync(string redisKey);

这个方法也是可以的,只不过我们需要对其进行转换,毕竟存的msg是字符串string类型的,但是这里的返回类型的RedisValue[],所以需要劈里啪啦转化一下。

但是这里有一个问题,就是如何去定时获取呢,也就是如何设计一个订阅者进行消费消息呢,这需要思考下,当然比较简单的就是while(true){},可能平时就是这么使用的,不过还是不是那么爽快,可以写一个组件来处理,简单快捷,正好,有一个大佬已经封装好了,我们可以直接拿来用,如果你有什么问题,可以给他提issue。

3、InitQ组件来订阅消息

在nuget中,可以直接安装组个组件:

<PackageReference Include="InitQ" Version="1.0.0.4" />

他的开源地址是:

https://github.com/wmowm/Initq

使用方法很简单,可以参考他的README里的介绍:

1、先添加服务

 /// <summary>
/// Redis 消息队列 启动服务
/// </summary>
public static class RedisInitMqSetup
{public static void AddRedisInitMqSetup(this IServiceCollection services){if (services == null) throw new ArgumentNullException(nameof(services));services.AddInitQ(m =>{//时间间隔m.SuspendTime = 5000;//redis服务器地址m.ConnectionString = "127.0.0.1:6379";//对应的订阅者类,需要new一个实例对象,当然你也可以传参,比如日志对象m.ListSubscribe = new List<IRedisSubscribe>() { new RedisSubscribe()};//显示日志m.ShowLog = false;});}
}

2、定义订阅者

 public class RedisSubscribe : IRedisSubscribe{[Subscribe(RedisMqKey.Loging)]private async Task SubRedisLoging(string msg){Console.WriteLine($"队列{RedisMqKey.Loging} 消费到/接受到 消息:{msg}");await Task.CompletedTask;}}

整体很简单,继承接口,然后添加上特性,这个特性里的参数,就是我们消息发布的时候的那个key,然后方法的参数,就是对应的消息msg,是不是很简单。

当然这里你可以传递一个日志的对象实例,这样就把日志信息分流到了队列里,然后队列走到这个订阅者里,由这里进行缓冲,然后把日志填充到日志文件,从而达到减峰的目的。

最终的效果可以看看:

好啦,今天的redis消息队列已经说完了,还是很简单的,其中重点还是那五种模式要自己好好了解下,然后整体过程自己把握把握,至于RabbitMQ,这个以后再说吧。

END

扫码关注

老张的哲学

更多精彩等着你

【BCVP】实现基于 Redis 的消息队列相关推荐

  1. activemq消息丢失_基于Redis实现消息队列的典型方案

    基于Redis实现消息队列典型方案 1 概述 2 基于List的 LPUSH+BRPOP 的实现 3 PUB/SUB,订阅/发布模式 4 基于SortedSet有序集合的实现 5 基于 Stream ...

  2. 秒杀抢购异步下单:基于Redis的消息队列秒杀抢购异步下单功能

    学习Redis时,练习的实战项目代码--基于Redis的Stream类型的秒杀抢购异步下单. 说明: Redis的stream类型的消息队列实现异步下单功能.Redis版本至少要5.0及以上版本才可以 ...

  3. 基于Redis的消息队列php-resque

    转载:http://netstu.5iunix.net/archives/201305-835/ 最近的做一个短信群发的项目,需要用到消息队列.因此开始了我对消息队列选型的漫长路. 为什么选型会纠结呢 ...

  4. Redis做消息队列,香吗?

    来自:架构师修行之路 菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的 ...

  5. 程序员过关斩将--redis做消息队列,香吗?

    菜菜哥,我刚做完了一个订单系统,感觉很简单呀 说说看,大量的订单状态怎么处理的? 我设计的时候可是考虑了这一点,所以用了异步处理,采用了MQ 那用的什么MQ呢,透露一下呗 我用的redis做的MQ,很 ...

  6. java 结合redis队列_在 Java 中使用 redis 的消息队列服务

    前言 关于 redis 我们前面已经讨论过了缓存.分布式锁.分布式唯一标识.LBS服务的用法,这里我们来谈谈利用 redis 来实现一个消息服务. 典型的消息服务是一个生产者和消费者模式的服务.一般是 ...

  7. ​redis实现消息队列

    redis是一个开源的key-value存储系统.与Memcached类似,Redis将大部分数据存储在内存中,支持的数据类型包括:字符串.哈希表.链表.集合.有序集合以及基于这些数据类型的相关操作. ...

  8. c#进阶(4)—— Redis 用于消息队列的存储

    1.参考的博文 a : http://www.cnblogs.com/lori/archive/2012/04/12/2443708.html -- 主要的实现思路 b:  http://www.cn ...

  9. 队列消息在html中怎么排列,Redis实现消息队列

    redis只是提供一个高性能的.原子操作的分布式队列实现.具体的业务还是得需要你自己定制. 你的需CSS布局HTML小编今天和大家分享实际上是一个变形的生产者-消费者实现. 对于此类需CSS布局HTM ...

最新文章

  1. b树删除节点每次只能删一个吗_面试官,请不要问我B+树了!!
  2. 高分辨率扫描出来的图片有摩尔纹_ue4 摩尔纹 远处模型闪烁问题
  3. 青龙羊毛——东方头条(搬砖,非原创)
  4. [reference]-armv8汇编学习-书籍推荐
  5. ORB-SLAM2源代码中ROS部分ros-mono源代码中subscribe /camera/image_raw topic谁发布publish的
  6. 一个在Windows下的ping脚本(使用WMI 的Win32_PingStatus 实现)
  7. 处理文件、摄像头和图形用户界面
  8. 如何使用Python脚本
  9. javase 的一些基础常用类
  10. LeetCode(#26)————删除排序数组中的重复项
  11. 【HTML】建站成功默认页面
  12. java 垃圾回收知识点
  13. zipkin使用_我的Spring Cloud(十):Zipkin 服务跟踪
  14. java集合快速构建成树形json
  15. JMETER目录结构详解
  16. 【AHOI2009】【BZOJ1798】Seq 维护序列seq(线段树模板,易错提醒)
  17. 手把手带你SQLite3快速入门
  18. 在淘宝,我如何做好一个项目的启动?
  19. 职称英语 计算机哪报名,职称计算机考试报名
  20. ES系列-- ILM索引生命周期管理

热门文章

  1. QWaiteCondition思考3
  2. StackExchange.Redis 使用 (一)
  3. .net Mvc Controller 接收 Json/post方式 数组 字典 类型 复杂对象
  4. 硬盘安装 solaris
  5. CRM学习笔记(一)
  6. Asp.net 2.0 发送电子邮件
  7. cdh中使用hue使用教程_我可以在户外使用Philips Hue灯泡吗?
  8. 禁用内置键盘_如何禁用Windows 10的所有内置广告
  9. linux用户的根目录_为什么Linux允许用户删除根目录?
  10. POJ 3080 Blue Jeans (后缀数组)