最近在捣鼓RabbitMQ,为了方便使用,自己基于EasyNetQ封装了一个类,现在贴出来还望各路大佬神明指点,共同学习。

  1     /// <summary>
  2     /// RabbitMQ客户端封装类,基于EasyNetQ,使用时需要从nuget安装EasyNetQ。
  3     /// <para>
  4     /// <example>
  5     /// 使用方法:
  6     /// <code>
  7     /// using(var mq = new RabbitMqClient('rabbitmq连接字符串'))
  8     /// { ...
  9     /// }
 10     /// </code>
 11     /// </example>
 12     /// </para>
 13     /// </summary>
 14     public class RabbitMqClient : IDisposable
 15     {
 16         private readonly IBus bus;
 17
 18         /// <summary>
 19         /// 构造函数
 20         /// </summary>
 21         /// <param name="connectionString">rabbitmq连接字符串</param>
 22         public RabbitMqClient(string connectionString)
 23         {
 24             if (string.IsNullOrEmpty(connectionString))
 25                 throw new ArgumentNullException(nameof(connectionString));
 26             bus = RabbitHutch.CreateBus(connectionString);
 27         }
 28         /// <summary>
 29         /// 发布一条消息(广播)
 30         /// </summary>
 31         /// <param name="message"></param>
 32         public void Publish<TMessage>(TMessage message) where TMessage:class
 33         {
 34             bus.PublishAsync(message);
 35         }
 36
 37         /// <summary>
 38         /// 指定Topic,发布一条消息
 39         /// </summary>
 40         /// <param name="message"></param>
 41         /// <param name="topic"></param>
 42         public void PublishWithTopic<TMessage>(TMessage message, string topic) where TMessage : class
 43         {
 44             if(string.IsNullOrEmpty(topic))
 45                 Publish(message);
 46             else
 47                 bus.PublishAsync(message, x=>x.WithTopic(topic));
 48         }
 49
 50         /// <summary>
 51         /// 发布消息。一次性发布多条
 52         /// </summary>
 53         /// <param name="messages"></param>
 54         public void PublishMany<TMessage>(List<TMessage> messages) where TMessage : class
 55         {
 56             foreach (var message in messages)
 57             {
 58                 Publish(message);
 59                 Thread.Sleep(50);//必须加上,以防消息阻塞
 60             }
 61         }
 62
 63         /// <summary>
 64         /// 发布消息。一次性发布多条
 65         /// </summary>
 66         /// <param name="messages"></param>
 67         /// <param name="topic"></param>
 68         public void PublishManyWithTopic<TMessage>(List<TMessage> messages, string topic) where TMessage : class
 69         {
 70             foreach (var message in messages)
 71             {
 72                 PublishWithTopic(message, topic);
 73                 Thread.Sleep(50);//必须加上,以防消息阻塞
 74             }
 75         }
 76
 77         /// <summary>
 78         /// 给指定队列发送一条信息
 79         /// </summary>
 80         /// <param name="queue">队列名称</param>
 81         /// <param name="message">消息</param>
 82         public void Send<TMessage>(string queue, TMessage message) where TMessage : class
 83         {
 84             bus.Send(queue, message);
 85         }
 86
 87         /// <summary>
 88         /// 给指定队列批量发送信息
 89         /// </summary>
 90         /// <param name="queue">队列名称</param>
 91         /// <param name="messages">消息</param>
 92         public void SendMany<TMessage>(string queue, IList<TMessage> messages) where TMessage : class
 93         {
 94             foreach (var message in messages)
 95             {
 96                 SendAsync(queue, message);
 97                 Thread.Sleep(50);//必须加上,以防消息阻塞
 98             }
 99         }
100
101         /// <summary>
102         /// 给指定队列发送一条信息(异步)
103         /// </summary>
104         /// <param name="queue">队列名称</param>
105         /// <param name="message">消息</param>
106         /// <returns></returns>
107         public async void SendAsync<TMessage>(string queue, TMessage message) where TMessage:class
108         {
109             await bus.SendAsync(queue, message);
110         }
111
112         /// <summary>
113         /// 从指定队列接收一天信息,并做相关处理。
114         /// </summary>
115         /// <param name="queue">队列名称</param>
116         /// <param name="process">
117         /// 消息处理委托方法
118         /// <para>
119         /// <example>
120         /// 例如:
121         /// <code>
122         /// message=>Task.Factory.StartNew(()=>{
123         ///     Console.WriteLine(message);
124         /// })
125         /// </code>
126         /// </example>
127         /// </para>
128         /// </param>
129         public void Receive<TMessage>(string queue, Func<TMessage, Task> process) where TMessage:class
130         {
131             bus.Receive(queue, process);
132         }
133
134         /// <summary>
135         /// 消息订阅
136         /// </summary>
137         /// <param name="subscriptionId">消息订阅标识</param>
138         /// <param name="process">
139         /// 消息处理委托方法
140         /// <para>
141         /// <example>
142         /// 例如:
143         /// <code>
144         /// message=>Task.Factory.StartNew(()=>{
145         ///     Console.WriteLine(message);
146         /// })
147         /// </code>
148         /// </example>
149         /// </para>
150         /// </param>
151         public void Subscribe<TMessage>(string subscriptionId, Func<TMessage, Task> process) where TMessage:class
152         {
153             bus.Subscribe<TMessage>(subscriptionId, message => process(message));
154         }
155
156         /// <summary>
157         /// 消息订阅
158         /// </summary>
159         /// <param name="subscriptionId">消息订阅标识</param>
160         /// <param name="process">
161         /// 消息处理委托方法
162         /// <para>
163         /// <example>
164         /// 例如:
165         /// <code>
166         /// message=>Task.Factory.StartNew(()=>{
167         ///     Console.WriteLine(message);
168         /// })
169         /// </code>
170         /// </example>
171         /// </para>
172         /// </param>
173         /// <param name="topic">topic</param>
174         public void SubscribeWithTopic<TMessage>(string subscriptionId, Func<TMessage, Task> process, string topic) where TMessage:class
175         {
176             bus.Subscribe<TMessage>(subscriptionId, message => process(message), x=>x.WithTopic(topic));
177         }
178
179         /// <summary>
180         /// 自动订阅
181         /// </summary>
182         /// <param name="assemblyName"></param>
183         /// <param name="subscriptionIdPrefix"></param>
184         /// <param name="topic"></param>
185         public void AutoSubscribe(string assemblyName, string subscriptionIdPrefix, string topic)
186         {
187             var subscriber = new AutoSubscriber(bus, subscriptionIdPrefix);
188             if (!string.IsNullOrEmpty(topic))
189                 subscriber.ConfigureSubscriptionConfiguration = x => x.WithTopic(topic);
190             subscriber.Subscribe(Assembly.Load(assemblyName));
191         }
192
193         /// <summary>
194         /// 资源释放
195         /// </summary>
196         public void Dispose()
197         {
198             if (bus != null) bus.Dispose();
199         }
200     }

View Code

转载于:https://www.cnblogs.com/tongyinaocan/p/6950772.html

基于EasyNetQ的RabbitMQ封装类相关推荐

  1. .NET Core微服务之基于EasyNetQ使用RabbitMQ消息队列

    Tip: 此篇已加入.NET Core微服务基础系列文章索引 一.消息队列与RabbitMQ 1.1 消息队列 "消息"是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含 ...

  2. .NET CoreWebApi基于EasyNetQ使用RabbitMQ消息队列

    一.消息队列与RabbitMQ 1.1 消息队列 "消息"是在两台计算机间传送的数据单位.消息可以非常简单,例如只包含文本字符串:也可以更复杂,可能包含嵌入对象.消息被发送到队列中 ...

  3. NetCore基于EasyNetQ的高级API使用RabbitMq

    一.消息队列 消息队列作为分布式系统中的重要组件,常用的有MSMQ,RabbitMq,Kafa,ActiveMQ,RocketMQ.至于各种消息队列的优缺点比较,在这里就不做扩展了,网上资源很多. 更 ...

  4. EasyNetQ操作RabbitMQ

    EasyNetQ 是一个容易使用,专门针对RabbitMQ的 .NET API.EasyNetQ是为了提供一个尽可能简洁的适用与RabbitMQ的.NET类库. 下面看下怎么集成. 1.nuget 安 ...

  5. 基于Docker搭建RabbitMQ(多图)

    1.一点废话(可直接跳转至标题2) 通常在拉取镜像之前,除了通过命令执行 docker search xxx 之外,我们还可以通过 Docker 镜像仓库查询指定的镜像. 如下是 rabbitmq 镜 ...

  6. 基于NetCore的RabbitMQ使用

    由于最近公司做的项目,需要发短信/邮件/第三方接口异步回调信息等的及时处理,自己就简单的研究了以下RabbitMQ在NetCore中的实现. RabbitMQ是什么具体就不再这里详细介绍了,自己去百度 ...

  7. 基于SpringCloud开发rabbitmq五种工作模式实现

    工作模式 1. RabbitMQ消息模型 2. SpringAMQP 2.1. Basic Queue 简单队列模型 2.1.1.消息发送 2.1.2.消息接收 2.1.3.测试 2.2. WorkQ ...

  8. 基于PHP使用rabbitmq实现消息队列

    1.从github上面获取AMQP基于php的实现扩展 2.创建生产者 send.php 1 require(__DIR__ . '/../protected/vendor/autoload.php' ...

  9. 基于SpringBoot、RabbitMQ的Android消息推送平台搭建

    消息推送,类似于微信来新消息时出现在通知栏那种情景.很多APP都有这个功能.现在有很多第三方平台可以实现这个需要,但是有的公司对所要推送的消息保密要求比较高,不希望被第三方看到,可以使用此种方式进行消 ...

最新文章

  1. 超级队长VR线下体验店落地上海,让娱乐突破想象
  2. php-fpm打开错误日志的配置
  3. jsonobject中getkey_FastJson中JSONObject用法及常用方法总结
  4. ibatis查询CHAR类型的字段
  5. [Java] 蓝桥杯ADV-203 算法提高 8皇后·改
  6. Python中用format函数格式化字符串的用法(2.7版本讲解哦!)
  7. Hibernate【缓存】知识要点
  8. Galerkin method 热传导公式推导过程
  9. 会议及作用篇--项目管理(十三)
  10. VC.【转】采用_beginthread/_beginthreadex函数创建多线程
  11. android gps测速代码,【GPS测速仪】GPS测速仪 GPS speedometer 1.6.0下载_安卓(android)软件下载-魅族溜...
  12. 网课学习:PDF阅读器,Blumind思维导图
  13. jsp网页视频播放器
  14. 台式计算机硬件组成主机,台式电脑主机的硬件组成部分简介
  15. MAC OS下免费下载YouTube
  16. 【树莓派智能门锁】电机锁控制电源测试【2】
  17. C语言(二)— 整型
  18. SWF是什么文件,SWF文件用什么软件可以打开 1
  19. [vuex] unknown action type: jia1
  20. 弹性法计算方法的mck法_SAM4E单片机之旅——9、UART与MCK之MAINCK

热门文章

  1. iOS关于rar解压第三方库Unrar4iOS使用总结
  2. JavaScript中一个对象如何继承另外一个对象
  3. [Ruby on Rails]Rails 3使用ActionMailer通过163发送邮件
  4. BCALV* 查看所有ALV DEMO
  5. Centos7安装 Redis 实践
  6. Flutter视频播放、Flutter VideoPlayer 视频播放组件精要
  7. Flutter ValueNotifier 异步通信、ValueListenableBuilder异步更新数据
  8. Flutter抖动动画、颤抖动画、Flutter文字抖动效果
  9. flutter 弹框 dialog,flutter提示框
  10. 从0开始架构一个IOS程序——03 — -分包用添加pch全局引用文件