1、安装NuGet包RabbitMQ.Client

2    生产者-确认机制

(1). 含义:就是应答模式,生产者发送一条消息之后,Rabbitmq服务器做了个响应,表示收到了。

(2). 特点:异步模式,在响应之前,可以继续发送消息,单条消息、批量消息均可继续发送。

(3). 核心代码:单条消息确认: channel.waitForConfirms()

         批量消息确认: channel.waitForConfirmsOrDie()

大致流程:channel.ConfirmSelect();  开启确认模式→发送消息→提供一个回执方法WaitForConfirms();   返回一个bool 值

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";//RabbitMQ服务器地址
factory.DispatchConsumersAsync = true;//支持异步发送消息
string exchangeName = "exchange1";//交换机的名字
string eventName = "myEvent";// routingKey的值
using var conn = factory.CreateConnection();
while (true)
{string msg = DateTime.Now.TimeOfDay.ToString();//待发送消息using (var channel = conn.CreateModel())//创建信道{try{var properties = channel.CreateBasicProperties();properties.DeliveryMode = 2;//1非持久化、2是持久化//交换机 Name:交换机名称。// Type: 交换机类型——direct、topic、fanout、headers、sharding//Durable:消息代理重启后,交换机是否还存在。//Auto-delete :当所有与之绑定的消息队列都完成了对此交换机的使用后,删掉它。//Arguments:依赖代理本身。channel.ExchangeDeclare(exchange: exchangeName, type: "direct", true, false);//声明交换机byte[] body = Encoding.UTF8.GetBytes(msg);channel.ConfirmSelect();//开启消息确认模式//发布消息    //exchange:交换机名称//mandatory为true时,表示如果消息没有被正确路由,消息将退回消息的生产者 如果设置为false,那么broker端自动删除该消息。//routingKey:路由键//props:消息属性字段,比如消息头部信息等等//body:消息主体部分channel.BasicPublish(exchange: exchangeName, routingKey: eventName,mandatory: true, basicProperties: properties, body: body);Console.WriteLine("发布了消息:" + msg);/*首先开启Confirm模式,通知消息生产者成功推送到RabbitMQ中*/if (channel.WaitForConfirms())  //单条消息确认{//表示消息发送成功(已经存入队列)Console.WriteLine($"【{msg}】成功发送到RabbitMQ!");}else{Console.WriteLine($"【{msg}】发送到RabbitMQ失败!");}//channel.WaitForConfirmsOrDie();//如果所有消息发送成功 就正常执行, 如果有消息发送失败;就抛出异常;}catch (Exception ex){//表示消息发送失败Console.WriteLine($"【{msg}】发送到RabbitMQ失败!");}}Thread.Sleep(1000);
}

运行结果:

2. TX事务模式

(1). 含义:基于AMPQ协议;可以让信道设置成一个带事务的信道,分为三步:开启事务、提交事务、事务回滚

(2). 特点:同步模式,在事务提交之前不能继续发送消息,该模式相比Confirm模式效率差一点。

(3). 核心代码:channel.TxSelect();   开启一个事务

         channel.TxCommit();  提交事务, 这一步成功后,消息才真正的写入队列

       channel.TxRollback();   事务回滚

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";
factory.DispatchConsumersAsync = true;
string exchangeName = "exchange1";
string eventName = "myEvent";
using var conn = factory.CreateConnection();
while (true)
{string msg = DateTime.Now.TimeOfDay.ToString();using (var channel = conn.CreateModel()){try{var properties = channel.CreateBasicProperties();properties.DeliveryMode = 2;channel.ExchangeDeclare(exchange: exchangeName, type: "direct", true, false);byte[] body = Encoding.UTF8.GetBytes(msg);channel.TxSelect(); //开启事务channel.BasicPublish(exchange: exchangeName, routingKey: eventName,mandatory: true, basicProperties: properties, body: body);Console.WriteLine($"【{msg}】成功发送到RabbitMQ!");channel.TxCommit(); //只有事务提交成功以后,才会真正的写入到队列里面去}catch (Exception ex){//表示消息发送失败Console.WriteLine($"【{msg}】发送到RabbitMQ失败!");channel.TxRollback();}}Thread.Sleep(1000);
}

3、消费者(手动确认)

(1) 含义:消费者消费一条,回执给RabbitMq一条消息,Rabbitmq 只删除当前这一条消息;相当于是一条消费了,删除一条消息,性能稍微低一些;

(2) 特点:消费1条应答一次,可以告诉RabbitMq消费成功or失败,消费成功,服务器删除该条消息,消费失败,可以删除也可以重新写入。

(3) 核心代码:autoAck: false,表示不自动确认

 然后:channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);    表示消费成功

 channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);         表示消费失败, 可以配置:requeue: true:重新写入到队列里去; false: 删除消息

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;var factory = new ConnectionFactory();
factory.HostName = "127.0.0.1";//服务器地址
factory.DispatchConsumersAsync = true;//支持异步接受
string exchangeName = "exchange1";//交换机的名称
string eventName = "myEvent";//路由键
using var conn = factory.CreateConnection();
using var channel = conn.CreateModel();//创建信道
string queueName = "queue1";//队列名称//声明了交换机
channel.ExchangeDeclare(exchange: exchangeName, type: "direct",true,false);//声明一个队列
//queuename: 队列的名称
//durable 是否持久化。true消息会持久化到本地,保证重启服务后消息不会丢失
//exclusive :表示独占方式,设置为true 在某些情景下有必要,例如:顺序消费。表示只有一个channel可以去监听,其他channel都不能够监听。目的就是为了保证顺序消费。
//autoDelete:队列如果与Exchange未绑定,则自动删除
//arguments:扩展参数
channel.QueueDeclare(queue: queueName, durable: true,exclusive: false, autoDelete: false, arguments: null);//绑定队列
channel.QueueBind(queue: queueName,exchange: exchangeName, routingKey: eventName);//创建一个消费者
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.Received += Consumer_Received;
//注册消费者订阅
//autoAck 是否自动确认消息, true自动确认,false 不自动要手动调用, 建立设置为false
channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
Console.ReadLine();async Task Consumer_Received(object sender, BasicDeliverEventArgs args)
{try{var bytes = args.Body.ToArray();string msg = Encoding.UTF8.GetString(bytes);Console.WriteLine(DateTime.Now + "收到了消息" + msg);//DeliveryTag: 唯一的编号//multiple:是否批量确认.true:将一次性ack所有小于deliveryTag的消息。channel.BasicAck(args.DeliveryTag, multiple: false);手动确认 await Task.Delay(800);}catch (Exception ex){//异常重试//DeliveryTag: 唯一的编号//requeue:如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。channel.BasicReject(args.DeliveryTag, true);Console.WriteLine("处理收到的消息出错" + ex);}
}

运行结果

参考

【Windows安装RabbitMQ详细教程】_慕之寒的博客-CSDN博客_rabbitmq安装windows

第四节:RabbitMq剖析生产者、消费者的几种消息确认机制(Confirm、事务、自动、手动) - Yaopengfei - 博客园

C# 对RabbitMQ使用相关推荐

  1. RabbitMQ 入门系列(11)— RabbitMQ 常用的工作模式(simple模式、work模式、publish/subscribe模式、routing模式、topic模式)

    1. simple 模式 simple 模式是最简单最常用的模式 2. work 模式 work 模式有多个消费者 消息产生者将消息放入队列.生产者系统不需知道哪一个任务执行系统在空闲,直接将任务扔到 ...

  2. Go 学习笔记(57)— Go 第三方库之 amqp (RabbitMQ 生产者、消费者整个流程)

    1. 安装 rabbitmq 的 golang 包 golang 可使用库 github.com/streadway/amqp 操作 rabbitmq .使用下面命令安装 RabbitMQ . go ...

  3. RabbitMQ 入门系列(4)— RabbitMQ 启动、停止节点和应用程序、用户管理、权限配置

    1. 服务器管理 我们使用 "节点" 来指代 RabbitMQ 实例,当我们谈到 RabbitMQ 节点时指的是 RabbitMQ 应用程序和其所在的 Erlang 节点. 1.1 ...

  4. RabbitMQ 入门系列(3)— 生产者消费者 Python 代码实现

    生产者消费者代码示例 上一章节中对消息通信概念做了详细的说明,本章节我们对 RabbitMQ 生产者和消费者代码分别做一示例说明. 1. 生产者代码 #!/usr/bin/env python # c ...

  5. RabbitMQ 入门系列(2)— 生产者、消费者、信道、代理、队列、交换器、路由键、绑定、交换器

    本系列是「RabbitMQ实战:高效部署分布式消息队列」和 「RabbitMQ实战指南」书籍的读书笔记. RabbitMQ 中重要概念 1. 生产者 生产者(producer)创建消息,然后发送到代理 ...

  6. RabbitMQ 入门系列(1)— Ubuntu 安装 RabbitMQ 及配置

    1. RabbitMQ 简介 消息 (Message) 是指在应用间传送的数据.消息可以非常简单,比如只包含文本字符串.JSON等,也可以很复杂,比如内嵌对象. 消息队列中间件(Message Que ...

  7. RabbitMQ超详细安装教程(Linux)

    目录 1.简介 2.下载安装启动RabbitMQ 2.1.下载RabbitMQ 2.2.下载Erlang 2.3.安装Erlang 2.4.安装RabbitMQ 2.5.启动RabbitMQ服务 3. ...

  8. 第五节 RabbitMQ在C#端的应用-消息收发

    原文:第五节 RabbitMQ在C#端的应用-消息收发 版权声明:未经本人同意,不得转载该文章,谢谢 https://blog.csdn.net/phocus1/article/details/873 ...

  9. RabbitMQ学习笔记一:本地Windows环境安装RabbitMQ Server

    一:安装RabbitMQ需要先安装Erlang语言开发包,百度网盘地址:http://pan.baidu.com/s/1jH8S2u6.直接下载地址:http://erlang.org/downloa ...

  10. RabbitMQ使用及与spring boot整合

    1.MQ 消息队列(Message Queue,简称MQ)--应用程序和应用程序之间的通信方法 应用:不同进程Process/线程Thread之间通信 比较流行的中间件: ActiveMQ Rabbi ...

最新文章

  1. c++语言static作用,详解c++中的 static 关键字及作用
  2. 实用比较,帮你决策到底选择Vue还是Angular4、5
  3. SAP Spartacus 会使用 Session timeout 吗?
  4. Linux并发与竞争实验(一次只允许一个应用程序操作LED灯)
  5. java 内部类 菜鸟编程,java中的匿名内部类
  6. 最老程序员创业札记:全文检索、数据挖掘、推荐引擎应用33
  7. 【Python】IDLE工具
  8. 关键字Restrict
  9. GAT1400---视图库标准
  10. Hadoop运行原理总结
  11. 串口硬盘如何应用于并口硬盘计算机,串口硬盘和并口硬盘如何区别?
  12. itools苹果录屏大师_如何录制ZOOM视频会议?实用的录屏软件
  13. Unity制作3d生存游戏视频教程
  14. wmp流代理服务器设置为空,03服务器安装wmp10的方法
  15. 分享《Essential Linux Device Drivers》中文版高清电子版
  16. 用深度学习做命名实体识别(五)-模型使用
  17. 什么是Session,Session常用API
  18. 【拜小白opencv】45-二维H-S直方图绘制----calcHist()函数、minMaxLoc()函数
  19. localhost: mv: 无法获取“/opt/module/hadoop-3.1.3/logs/hadoop-atguigu-datanode-hadoop102.out.3“ 的文件状态(sta
  20. 梯度,散度,拉普拉斯算子

热门文章

  1. Android蓝牙开发与蓝牙模块进行通讯(基于eclipse)
  2. 利用工具实现关键词快速挖掘
  3. js实现trim()去空格
  4. 遇到一个macOS下malware中毒很深的网友,安装的恶意软件MyCouponsmart、SearchMine.AnySearch、Advanced Mac Cleaner等真多!
  5. 阿里巴巴张勇:大数据是新商业时代的原油
  6. MySQL查询优化之五-嵌套循环连接算法(Nested-Loop Join Algorithms)
  7. 《终身成长》樊登读书会
  8. JS使用定时器实现倒计时
  9. 高德地图服务端对接APIUtil
  10. Web送货单打印管理系统毕业设计