基于EasyNetQ的RabbitMQ封装类
最近在捣鼓RabbitMQ,为了方便使用,自己基于EasyNetQ封装了一个类,现在贴出来还望各路大佬神明指点,共同学习。
![](/assets/blank.gif)
![](/assets/blank.gif)
1 /// <summary> 2 /// RabbitMQ客户端封装类,基于EasyNetQ,使用时需要从nuget安装EasyNetQ。 3 /// <para> 4 /// <example> 5 /// 使用方法: 6 /// <code> 7 /// using(var mq = new RabbitMqClient('rabbitmq连接字符串')) 8 /// { ... 9 /// } 10 /// </code> 11 /// </example> 12 /// </para> 13 /// </summary> 14 public class RabbitMqClient : IDisposable 15 { 16 private readonly IBus bus; 17 18 /// <summary> 19 /// 构造函数 20 /// </summary> 21 /// <param name="connectionString">rabbitmq连接字符串</param> 22 public RabbitMqClient(string connectionString) 23 { 24 if (string.IsNullOrEmpty(connectionString)) 25 throw new ArgumentNullException(nameof(connectionString)); 26 bus = RabbitHutch.CreateBus(connectionString); 27 } 28 /// <summary> 29 /// 发布一条消息(广播) 30 /// </summary> 31 /// <param name="message"></param> 32 public void Publish<TMessage>(TMessage message) where TMessage:class 33 { 34 bus.PublishAsync(message); 35 } 36 37 /// <summary> 38 /// 指定Topic,发布一条消息 39 /// </summary> 40 /// <param name="message"></param> 41 /// <param name="topic"></param> 42 public void PublishWithTopic<TMessage>(TMessage message, string topic) where TMessage : class 43 { 44 if(string.IsNullOrEmpty(topic)) 45 Publish(message); 46 else 47 bus.PublishAsync(message, x=>x.WithTopic(topic)); 48 } 49 50 /// <summary> 51 /// 发布消息。一次性发布多条 52 /// </summary> 53 /// <param name="messages"></param> 54 public void PublishMany<TMessage>(List<TMessage> messages) where TMessage : class 55 { 56 foreach (var message in messages) 57 { 58 Publish(message); 59 Thread.Sleep(50);//必须加上,以防消息阻塞 60 } 61 } 62 63 /// <summary> 64 /// 发布消息。一次性发布多条 65 /// </summary> 66 /// <param name="messages"></param> 67 /// <param name="topic"></param> 68 public void PublishManyWithTopic<TMessage>(List<TMessage> messages, string topic) where TMessage : class 69 { 70 foreach (var message in messages) 71 { 72 PublishWithTopic(message, topic); 73 Thread.Sleep(50);//必须加上,以防消息阻塞 74 } 75 } 76 77 /// <summary> 78 /// 给指定队列发送一条信息 79 /// </summary> 80 /// <param name="queue">队列名称</param> 81 /// <param name="message">消息</param> 82 public void Send<TMessage>(string queue, TMessage message) where TMessage : class 83 { 84 bus.Send(queue, message); 85 } 86 87 /// <summary> 88 /// 给指定队列批量发送信息 89 /// </summary> 90 /// <param name="queue">队列名称</param> 91 /// <param name="messages">消息</param> 92 public void SendMany<TMessage>(string queue, IList<TMessage> messages) where TMessage : class 93 { 94 foreach (var message in messages) 95 { 96 SendAsync(queue, message); 97 Thread.Sleep(50);//必须加上,以防消息阻塞 98 } 99 } 100 101 /// <summary> 102 /// 给指定队列发送一条信息(异步) 103 /// </summary> 104 /// <param name="queue">队列名称</param> 105 /// <param name="message">消息</param> 106 /// <returns></returns> 107 public async void SendAsync<TMessage>(string queue, TMessage message) where TMessage:class 108 { 109 await bus.SendAsync(queue, message); 110 } 111 112 /// <summary> 113 /// 从指定队列接收一天信息,并做相关处理。 114 /// </summary> 115 /// <param name="queue">队列名称</param> 116 /// <param name="process"> 117 /// 消息处理委托方法 118 /// <para> 119 /// <example> 120 /// 例如: 121 /// <code> 122 /// message=>Task.Factory.StartNew(()=>{ 123 /// Console.WriteLine(message); 124 /// }) 125 /// </code> 126 /// </example> 127 /// </para> 128 /// </param> 129 public void Receive<TMessage>(string queue, Func<TMessage, Task> process) where TMessage:class 130 { 131 bus.Receive(queue, process); 132 } 133 134 /// <summary> 135 /// 消息订阅 136 /// </summary> 137 /// <param name="subscriptionId">消息订阅标识</param> 138 /// <param name="process"> 139 /// 消息处理委托方法 140 /// <para> 141 /// <example> 142 /// 例如: 143 /// <code> 144 /// message=>Task.Factory.StartNew(()=>{ 145 /// Console.WriteLine(message); 146 /// }) 147 /// </code> 148 /// </example> 149 /// </para> 150 /// </param> 151 public void Subscribe<TMessage>(string subscriptionId, Func<TMessage, Task> process) where TMessage:class 152 { 153 bus.Subscribe<TMessage>(subscriptionId, message => process(message)); 154 } 155 156 /// <summary> 157 /// 消息订阅 158 /// </summary> 159 /// <param name="subscriptionId">消息订阅标识</param> 160 /// <param name="process"> 161 /// 消息处理委托方法 162 /// <para> 163 /// <example> 164 /// 例如: 165 /// <code> 166 /// message=>Task.Factory.StartNew(()=>{ 167 /// Console.WriteLine(message); 168 /// }) 169 /// </code> 170 /// </example> 171 /// </para> 172 /// </param> 173 /// <param name="topic">topic</param> 174 public void SubscribeWithTopic<TMessage>(string subscriptionId, Func<TMessage, Task> process, string topic) where TMessage:class 175 { 176 bus.Subscribe<TMessage>(subscriptionId, message => process(message), x=>x.WithTopic(topic)); 177 } 178 179 /// <summary> 180 /// 自动订阅 181 /// </summary> 182 /// <param name="assemblyName"></param> 183 /// <param name="subscriptionIdPrefix"></param> 184 /// <param name="topic"></param> 185 public void AutoSubscribe(string assemblyName, string subscriptionIdPrefix, string topic) 186 { 187 var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix); 188 if (!string.IsNullOrEmpty(topic)) 189 subscriber.ConfigureSubscriptionConfiguration = x => x.WithTopic(topic); 190 subscriber.Subscribe(Assembly.Load(assemblyName)); 191 } 192 193 /// <summary> 194 /// 资源释放 195 /// </summary> 196 public void Dispose() 197 { 198 if (bus != null) bus.Dispose(); 199 } 200 }
View Code
转载于:https://www.cnblogs.com/tongyinaocan/p/6950772.html
基于EasyNetQ的RabbitMQ封装类相关推荐
- .NET Core微服务之基于EasyNetQ使用RabbitMQ消息队列
Tip: 此篇已加入.NET Core微服务基础系列文章索引 一.消息队列与RabbitMQ 1.1 消息队列 "消息"是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含 ...
- .NET CoreWebApi基于EasyNetQ使用RabbitMQ消息队列
一.消息队列与RabbitMQ 1.1 消息队列 "消息"是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中 ...
- NetCore基于EasyNetQ的高级API使用RabbitMq
一.消息队列 消息队列作为分布式系统中的重要组件,常用的有MSMQ,RabbitMq,Kafa,ActiveMQ,RocketMQ.至于各种消息队列的优缺点比较,在这里就不做扩展了,网上资源很多. 更 ...
- EasyNetQ操作RabbitMQ
EasyNetQ 是一个容易使用,专门针对RabbitMQ的 .NET API.EasyNetQ是为了提供一个尽可能简洁的适用与RabbitMQ的.NET类库. 下面看下怎么集成. 1.nuget 安 ...
- 基于Docker搭建RabbitMQ(多图)
1.一点废话(可直接跳转至标题2) 通常在拉取镜像之前,除了通过命令执行 docker search xxx 之外,我们还可以通过 Docker 镜像仓库查询指定的镜像. 如下是 rabbitmq 镜 ...
- 基于NetCore的RabbitMQ使用
由于最近公司做的项目,需要发短信/邮件/第三方接口异步回调信息等的及时处理,自己就简单的研究了以下RabbitMQ在NetCore中的实现. RabbitMQ是什么具体就不再这里详细介绍了,自己去百度 ...
- 基于SpringCloud开发rabbitmq五种工作模式实现
工作模式 1. RabbitMQ消息模型 2. SpringAMQP 2.1. Basic Queue 简单队列模型 2.1.1.消息发送 2.1.2.消息接收 2.1.3.测试 2.2. WorkQ ...
- 基于PHP使用rabbitmq实现消息队列
1.从github上面获取AMQP基于php的实现扩展 2.创建生产者 send.php 1 require(__DIR__ . '/../protected/vendor/autoload.php' ...
- 基于SpringBoot、RabbitMQ的Android消息推送平台搭建
消息推送,类似于微信来新消息时出现在通知栏那种情景.很多APP都有这个功能.现在有很多第三方平台可以实现这个需要,但是有的公司对所要推送的消息保密要求比较高,不希望被第三方看到,可以使用此种方式进行消 ...
最新文章
- 超级队长VR线下体验店落地上海,让娱乐突破想象
- php-fpm打开错误日志的配置
- jsonobject中getkey_FastJson中JSONObject用法及常用方法总结
- ibatis查询CHAR类型的字段
- [Java] 蓝桥杯ADV-203 算法提高 8皇后·改
- Python中用format函数格式化字符串的用法(2.7版本讲解哦!)
- Hibernate【缓存】知识要点
- Galerkin method 热传导公式推导过程
- 会议及作用篇--项目管理(十三)
- VC.【转】采用_beginthread/_beginthreadex函数创建多线程
- android gps测速代码,【GPS测速仪】GPS测速仪 GPS speedometer 1.6.0下载_安卓(android)软件下载-魅族溜...
- 网课学习:PDF阅读器,Blumind思维导图
- jsp网页视频播放器
- 台式计算机硬件组成主机,台式电脑主机的硬件组成部分简介
- MAC OS下免费下载YouTube
- 【树莓派智能门锁】电机锁控制电源测试【2】
- C语言(二)— 整型
- SWF是什么文件,SWF文件用什么软件可以打开 1
- [vuex] unknown action type: jia1
- 弹性法计算方法的mck法_SAM4E单片机之旅——9、UART与MCK之MAINCK
热门文章
- iOS关于rar解压第三方库Unrar4iOS使用总结
- JavaScript中一个对象如何继承另外一个对象
- [Ruby on Rails]Rails 3使用ActionMailer通过163发送邮件
- BCALV* 查看所有ALV DEMO
- Centos7安装 Redis 实践
- Flutter视频播放、Flutter VideoPlayer 视频播放组件精要
- Flutter ValueNotifier 异步通信、ValueListenableBuilder异步更新数据
- Flutter抖动动画、颤抖动画、Flutter文字抖动效果
- flutter 弹框 dialog,flutter提示框
- 从0开始架构一个IOS程序——03 — -分包用添加pch全局引用文件