.net平台的rabbitmq使用封装
前言
RabbitMq大家再熟悉不过,这篇文章主要整对rabbitmq学习后封装RabbitMQ.Client的一个分享。文章最后,我会把封装组件和demo奉上。
Rabbitmq的关键术语
1、绑定器(Binding):根据路由规则绑定Queue和Exchange。
2、路由键(Routing Key):Exchange根据关键字进行消息投递。
3、交换机(Exchange):指定消息按照路由规则进入指定队列
4、消息队列(Queue):消息的存储载体
5、生产者(Producer):消息发布者。
6、消费者(Consumer):消息接收者。
Rabbitmq的运作
从下图可以看出,发布者(Publisher)是把消息先发送到交换器(Exchange),再从交换器发送到指定队列(Queue),而先前已经声明交换器与队列绑定关系,最后消费者(Customer)通过订阅或者主动取指定队列消息进行消费。
那么刚刚提到的订阅和主动取可以理解成,推(被动),拉(主动)。
推,只要队列增加一条消息,就会通知空闲的消费者进行消费。(我不找你,就等你找我,观察者模式)
拉,不会通知消费者,而是由消费者主动轮循或者定时去取队列消息。(我需要才去找你)
使用场景我举个例子,假如有两套系统 订单系统和发货系统,从订单系统发起发货消息指令,为了及时发货,发货系统需要订阅队列,只要有指令就处理。
可是程序偶尔会出异常,例如网络或者DB超时了,把消息丢到失败队列,这个时候需要重发机制。但是我又不想while(IsPostSuccess == True),因为只要出异常了,会在某个时间段内都会有异常,这样的重试是没意义的。
这个时候不需要及时的去处理消息,有个JOB定时或者每隔几分钟(失败次数*间隔分钟)去取失败队列消息,进行重发。
Publish(发布)的封装
步骤:初始化链接->声明交换器->声明队列->换机器与队列绑定->发布消息。注意的是,我将Model存到了ConcurrentDictionary里面,因为声明与绑定是非常耗时的,其次,往重复的队列发送消息是不需要重新初始化的。
1 /// <summary> 2 /// 交换器声明 3 /// </summary> 4 /// <param name="iModel"></param> 5 /// <param name="exchange">交换器</param> 6 /// <param name="type">交换器类型: 7 /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全 8 /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的 9 /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog 10 /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都 11 /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout 12 /// 交换机转发消息是最快的。 13 /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多 14 /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 15 /// 只会匹配到“audit.irs”。</param> 16 /// <param name="durable">持久化</param> 17 /// <param name="autoDelete">自动删除</param> 18 /// <param name="arguments">参数</param> 19 private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct, 20 bool durable = true, 21 bool autoDelete = false, IDictionary<string, object> arguments = null) 22 { 23 exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim(); 24 iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments); 25 } 26 27 /// <summary> 28 /// 队列声明 29 /// </summary> 30 /// <param name="channel"></param> 31 /// <param name="queue">队列</param> 32 /// <param name="durable">持久化</param> 33 /// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见, 34 /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可 35 /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连 36 /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者 37 /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param> 38 /// <param name="autoDelete">自动删除</param> 39 /// <param name="arguments">参数</param> 40 private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false, 41 bool autoDelete = false, IDictionary<string, object> arguments = null) 42 { 43 queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim(); 44 channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments); 45 } 46 47 /// <summary> 48 /// 获取Model 49 /// </summary> 50 /// <param name="exchange">交换机名称</param> 51 /// <param name="queue">队列名称</param> 52 /// <param name="routingKey"></param> 53 /// <param name="isProperties">是否持久化</param> 54 /// <returns></returns> 55 private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false) 56 { 57 return ModelDic.GetOrAdd(queue, key => 58 { 59 var model = _conn.CreateModel(); 60 ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties); 61 QueueDeclare(model, queue, isProperties); 62 model.QueueBind(queue, exchange, routingKey); 63 ModelDic[queue] = model; 64 return model; 65 }); 66 } 67 68 /// <summary> 69 /// 发布消息 70 /// </summary> 71 /// <param name="routingKey">路由键</param> 72 /// <param name="body">队列信息</param> 73 /// <param name="exchange">交换机名称</param> 74 /// <param name="queue">队列名</param> 75 /// <param name="isProperties">是否持久化</param> 76 /// <returns></returns> 77 public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false) 78 { 79 var channel = GetModel(exchange, queue, routingKey, isProperties); 80 81 try 82 { 83 channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8()); 84 } 85 catch (Exception ex) 86 { 87 throw ex.GetInnestException(); 88 } 89 }
View Code
下次是本机测试的发布速度截图:
4.2W/S属于稳定速度,把反序列化(ToJson)会稍微快一些。
Subscribe(订阅)的封装
发布的时候是申明了交换器和队列并绑定,然而订阅的时候只需要声明队列就可。从下面代码能看到,捕获到异常的时候,会把消息送到自定义的“死信队列”里,由另外的JOB进行定时重发,因此,finally是应答成功的。
/// <summary>/// 获取Model/// </summary>/// <param name="queue">队列名称</param>/// <param name="isProperties"></param>/// <returns></returns>private static IModel GetModel(string queue, bool isProperties = false){return ModelDic.GetOrAdd(queue, value =>{var model = _conn.CreateModel();QueueDeclare(model, queue, isProperties);//每次消费的消息数model.BasicQos(0, 1, false);ModelDic[queue] = model;return model;});} /// <summary>/// 接收消息/// </summary>/// <typeparam name="T"></typeparam>/// <param name="queue">队列名称</param>/// <param name="isProperties"></param>/// <param name="handler">消费处理</param>/// <param name="isDeadLetter"></param>public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class{//队列声明var channel = GetModel(queue, isProperties);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var msgStr = body.DeserializeUtf8();var msg = msgStr.FromJson<T>();try{handler(msg);}catch (Exception ex){ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");if (!isDeadLetter)PublishToDead<DeadLetterQueue>(queue, msgStr, ex);}finally{channel.BasicAck(ea.DeliveryTag, false);}};channel.BasicConsume(queue, false, consumer);}
View Code
下次是本机测试的发布速度截图:
快的时候有1.9K/S,慢的时候也有1.7K/S
Pull(拉)的封装
直接上代码:
/// <summary>/// 获取消息/// </summary>/// <typeparam name="T"></typeparam>/// <param name="exchange"></param>/// <param name="queue"></param>/// <param name="routingKey"></param>/// <param name="handler">消费处理</param>private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class{var channel = GetModel(exchange, queue, routingKey);var result = channel.BasicGet(queue, false);if (result.IsNull())return;var msg = result.Body.DeserializeUtf8().FromJson<T>();try{handler(msg);}catch (Exception ex){ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");}finally{channel.BasicAck(result.DeliveryTag, false);}}
View Code
快的时候有1.8K/s,稳定是1.5K/S
Rpc(远程调用)的封装
首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:
1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常
2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。
/// <summary>/// RPC客户端/// </summary>/// <param name="exchange"></param>/// <param name="queue"></param>/// <param name="routingKey"></param>/// <param name="body"></param>/// <param name="isProperties"></param>/// <returns></returns>public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false){var channel = GetModel(exchange, queue, routingKey, isProperties);var consumer = new QueueingBasicConsumer(channel);channel.BasicConsume(queue, true, consumer);try{var correlationId = Guid.NewGuid().ToString();var basicProperties = channel.CreateBasicProperties();basicProperties.ReplyTo = queue;basicProperties.CorrelationId = correlationId;channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());var sw = Stopwatch.StartNew();while (true){var ea = consumer.Queue.Dequeue();if (ea.BasicProperties.CorrelationId == correlationId){return ea.Body.DeserializeUtf8();}if (sw.ElapsedMilliseconds > 30000)throw new Exception("等待响应超时");}}catch (Exception ex){throw ex.GetInnestException();}} /// <summary>/// RPC服务端/// </summary>/// <typeparam name="T"></typeparam>/// <param name="exchange"></param>/// <param name="queue"></param>/// <param name="isProperties"></param>/// <param name="handler"></param>/// <param name="isDeadLetter"></param>public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter){//队列声明var channel = GetModel(queue, isProperties);var consumer = new EventingBasicConsumer(channel);consumer.Received += (model, ea) =>{var body = ea.Body;var msgStr = body.DeserializeUtf8();var msg = msgStr.FromJson<T>();var props = ea.BasicProperties;var replyProps = channel.CreateBasicProperties();replyProps.CorrelationId = props.CorrelationId;try{msg = handler(msg);}catch (Exception ex){ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");}finally{channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());channel.BasicAck(ea.DeliveryTag, false);}};channel.BasicConsume(queue, false, consumer);}
View Code
可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。
结尾
本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 https://github.com/SkyChenSky/Sikiro.Mq.Rabbit。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。
如果本篇文章您有用,请点击一下推荐,谢谢大家阅读。
转载于:https://www.cnblogs.com/skychen1218/p/6496891.html
.net平台的rabbitmq使用封装相关推荐
- 【微信小程序控制硬件⑧ 】微信小程序以 websocket 连接阿里云IOT物联网平台mqtt服务器,封装起来使用就是这么简单!(附带Demo)
[微信小程序控制硬件第1篇 ] 全网首发,借助 emq 消息服务器带你如何搭建微信小程序的mqtt服务器,轻松控制智能硬件! [微信小程序控制硬件第2篇 ] 开始微信小程序之旅,导入小程序Mqtt客户 ...
- H5脱单盲盒交友解密授权版/分销提现/存取小纸条盲盒匹配管理平台/免签支付/可封装APP/带教程
源码介绍: 本套源码为独立版,前端是h5的,有三套模板可切换,后台是thinkphp的框架.有文档搭建教程.有代理商模式 可付费购买代理商 有分销功能 成为代理商后,可获取下级分成.本系统免公gz号支 ...
- 内涵社 weixin.php,GitHub - yzbx/weixin: This is an enclosed class for WeChat apis. 微信公共平台api的php封装...
WeChat Api This is an enclosed class for WeChat apis. The offical wiki is here:WeChat api Current ve ...
- openstack云计算平台 4(镜像封装、块存储服务)
目录 一.镜像封装 二.块存储服务 1.简介 2.环境部署 3.安装并配置控制节点 4.安装并配置一个存储节点 5.验证操作 一.镜像封装 目前我们只有一个测试镜像,接下来我们去构建一个镜像 点击虚拟 ...
- 手赚网试玩平台源码 可封装APP 带文章资讯功能 帝国cms7.5内核
简介: 帝国CMS开发的手赚网源码,多平台带文章资讯手机APP试玩网站源码,可自行后台增减平台和链接,和早先几个版本比较的话,这个版本功能更全,版面更为漂亮. 已经带有一定的试玩平台数据,只需要把你的 ...
- 头歌实践实践教学平台:Java面向对象 - 封装、继承和多态的综合练习
第1关:封装.继承和多态进阶(一) 任务描述 本关任务:按要求编写一个Java应用程序,巩固Java面向对象知识. 相关知识 为了完成本关任务,我们回顾一下前面所学知识:1.面向对象思想 :2.封装: ...
- H5脱单盲盒交友解密授权版/分销提现/存取小纸条盲盒匹配管理平台/免签支付/可封装APP/带教程-亲测可用
本套源码为独立版,前端是h5的,有三套模板可切换,后台是thinkphp的框架.有文档搭建教程. 有代理商模式 可付费购买代理商 有分销功能 成为代理商后,可获取下级分成. 本系统免公gz号支持第三方 ...
- 头歌实践教学平台:Java面向对象 - 封装、继承和多态
第1关:什么是封装,如何使用封装 任务描述 本关任务:构造一个类,把对象的属性封装起来,同时提供一些可以被外界访问属性的方法. 相关知识 为了完成本关任务,你需要掌握:1.什么是封装:2.封装的意义: ...
- 2023最新帝国CMS7.5手赚网试玩平台源码/可封装APP+带文章功能系统
正文: 帝国CMS开发的手赚网源码,多平台带文章资讯手机APP试玩网站源码 可自行后台增减平台和链接,和早先几个版本比较的话,这个版本功能更全,版面更为漂亮,有兴趣的自行去安装体验吧,其它就没什么好介 ...
最新文章
- B站学强化学习?港中文周博磊变身up主,中文课程已上线
- android好用的第三方库2018使用总结
- 低压抽屉柜常见故障处理方法_电磁流量计的常见故障及处理方法
- android camera(三):camera V4L2 FIMC
- stm32固件库assert_param()
- js前台编码,asp.net后台解码 防止前台传值到后台为乱码
- Go的sync.WaitGroup(二):WaitGroup让主程序与协程全部都执行 并且全部执行完成
- 信工干货||C语言输入输出语句
- 第 6-4 课:MyBatis 核心和面试题(上)
- 5年外包码农,拿到阿里offer,成功上岸,凭什么?
- SpringBoot(十四)_springboot使用内置定时任务Scheduled的使用(一)
- 利用 50 行 Python 代码构建一个在线文本生成器!
- sql创建和添加时间字段
- Hi Developer,您有一份来自华为云学院的微服务开发攻略请查收
- 液晶显示屏的LED背光辉度公式计算?
- 计算机网络为什么要分层,从形而上到形而下视角的理解
- 什么是 PDF 扁平化?怎样扁平化 PDF? 一起涨知识!
- 瑞芯微RV1126部署yolov5-face_模型转换_输出后处理C++实现
- 冰刃-删除顽固文件的利器!
- 计算机专业大四课程,计算机专业大学四年课表.doc