.net 使用阿里云RocketMQ
1.首先我们来讲解一下消息队列的作用
比如说我们的订单系统,再客户订单生成了以后,可能会有
快递系统,通知系统,和打印系统需要用到当前订单的详细内容
所以这个时候常规的操作是在A里面通过代码调用B,C ,D系统的接口来通知他们有新订单了
如果此时有个E系统呢
那我们的做法可能只能在A系统中增加代码来通知E系统,但是如果后期我们的E系统又不要了呢,岂不是我们又要在A系统中去除掉这一部分代码所以说这样的代码冗余就很高,对后期的性能也很有影响,因为系统A中通知B,C,D系统还要判断 B,C,D返回值中是否成功,如果没有成功还要子再次请求这样系统性能就非常的低下
所以说我们使用了MQ来解决这个问题
我们A系统秩序通知MQ系统 后面的B,C,D要信息的话直接找MQ系统调用就行
接下来我这里讲解下如何把MQ集成到.net 项目中
生产者端:
using Aliyun.MQ;
using Aliyun.MQ.Model;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;namespace CSDNSign.Controllers
{[Route("api/[controller]")][ApiController]public class MQController : ControllerBase{readonly IFreeSql _sql;private readonly ILogger<MQController> _logger;private const string _endpoint = "*************";// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。private const string _accessKeyId = "*************";// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。private const string _secretAccessKey = "*************";// 所属的Topic。private const string _topicName = "xsw";// Topic所属实例ID,默认实例为空。private const string _instanceId = "*************";private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);static MQProducer producer = _client.GetProducer(_instanceId, _topicName);/// <summary>/// /// </summary>/// <param name="sql"></param>/// <param name="logger"></param>public MQController(IFreeSql sql, ILogger<MQController> logger){_sql = sql;_logger = logger;}#region /// <summary>/// /// </summary>/// <param name="json"></param>/// <param name="key"></param>/// <param name="tag"></param>/// <returns></returns>[HttpPost][Route("TestMQ")]public string TestMQ(string json, string key, string tag){try{TopicMessage sendMsg = new TopicMessage(json);// 设置属性。sendMsg.PutProperty("a", "a");// 设置Key。sendMsg.MessageKey = key;TopicMessage result = producer.PublishMessage(sendMsg);return JsonConvert.SerializeObject(result) ;}catch (Exception ex){return ex.Message.ToString();}}#endregion}
}
其中的endpoint可以在阿里云中接入点中查看
由于我们用的http的方式接受所以我们复制下面的地址
消费者端:
using Aliyun.MQ;
using Aliyun.MQ.Model;
using Aliyun.MQ.Model.Exp;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Security.Permissions;
using System.Threading;
using System.Threading.Tasks;
using System.Windows;
using System.Windows.Controls;
using System.Windows.Data;
using System.Windows.Documents;
using System.Windows.Input;
using System.Windows.Media;
using System.Windows.Media.Imaging;
using System.Windows.Navigation;
using System.Windows.Shapes;namespace WPFTest
{/// <summary>/// Interaction logic for MainWindow.xaml/// </summary>public partial class MainWindow : Window{private const string _endpoint = "********";// AccessKey ID阿里云身份验证,在阿里云服务器管理控制台创建。private const string _accessKeyId = "*****";// AccessKey Secret阿里云身份验证,在阿里云服务器管理控制台创建。private const string _secretAccessKey = "*****";// 所属的Topic。private const string _topicName = "*****";// Topic所属实例ID,默认实例为空。private const string _instanceId = "*******";private const string _groupId = "*********";private static MQClient _client = new Aliyun.MQ.MQClient(_accessKeyId, _secretAccessKey, _endpoint);static MQConsumer consumer = _client.GetConsumer(_instanceId, _topicName, _groupId, null);private Thread thread;public MainWindow(){InitializeComponent();this.button_Stop.IsEnabled = false;}private void button_Click(object sender, RoutedEventArgs e){thread = new Thread(new ThreadStart(TaskStart));thread.IsBackground = true;thread.Start();this.button_get.IsEnabled = false;this.button_Stop.IsEnabled = true;}/// <summary>/// 开始任务/// </summary>private void TaskStart(){Dispatcher.Invoke(() => this.textBox.Text = "====== 开始获取MQ任务 ====== \n");// 在当前线程循环消费消息,建议多开个几个线程并发消费消息。while (true){try{// 长轮询消费消息。// 长轮询表示如果Topic没有消息,则请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回客户端。List<Message> messages = null;try{messages = consumer.ConsumeMessage(3, // 一次最多消费3条(最多可设置为16条)。3 // 长轮询时间3秒(最多可设置为30秒)。);}catch (Exception exp1){if (exp1 is MessageNotExistException){Dispatcher.Invoke(() => textBox.Text += string.Format(Thread.CurrentThread.Name + " No new message, " + ((MessageNotExistException)exp1).RequestId + "\n"));continue;}Console.WriteLine(exp1);Thread.Sleep(2000);}if (messages == null){continue;}List<string> handlers = new List<string>();List<string> mqmessage = new List<string>();Console.WriteLine(Thread.CurrentThread.Name + " Receive Messages:");// 处理业务逻辑。foreach (Message message in messages){Console.WriteLine(message);Console.WriteLine("Property a is:" + message.GetProperty("a"));handlers.Add(message.ReceiptHandle);mqmessage.Add(message.Body);}// Message.nextConsumeTime前若不确认消息消费成功,则消息会被重复消费。// 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。try{consumer.AckMessage(handlers);Console.WriteLine("Ack message success:");foreach (string handle in mqmessage){Dispatcher.Invoke(() => textBox.Text += handle + "\n");}Console.WriteLine();return;}catch (Exception exp2){// 某些消息的句柄可能超时,会导致消息消费状态确认不成功。if (exp2 is AckMessageException){AckMessageException ackExp = (AckMessageException)exp2;Console.WriteLine("Ack message fail, RequestId:" + ackExp.RequestId);foreach (AckMessageErrorItem errorItem in ackExp.ErrorItems){Dispatcher.Invoke(() => textBox.Text += string.Format("\tErrorHandle:" + errorItem.ReceiptHandle + ",ErrorCode:" + errorItem.ErrorCode + ",ErrorMsg:" + errorItem.ErrorMessage));}}}}catch (Exception ex){Console.WriteLine(ex);Thread.Sleep(2000);}}}private void button_Clean_Click(object sender, RoutedEventArgs e){this.textBox.Text = "";}private void button_Stop_Click(object sender, RoutedEventArgs e){this.button_get.IsEnabled = true;this.button_Stop.IsEnabled = false;thread.Interrupt();// Wait for newThread to end.//thread.Join();}}
}
最后项目效果如下所示:
.net 使用阿里云RocketMQ相关推荐
- Springboot2.0集成阿里云RocketMQ
介绍 RocketMQ是出自阿里巴巴的一款开源消息中间件,在设计上借鉴了Kafka,2017年成为Apache顶级项目,虽然目前社区无法和Kafka比肩,但其历经多次天猫双十一的考验,其性能和稳定是毋 ...
- 厚积薄发--一文带您了解阿里云 RocketMQ 轻量版消息队列(MNS)
作者: 周新宇&陈涛&李凯 阿里云 RocketMQ 轻量版(MNS)消息队列是一个轻量.可靠.可扩展且完全托管的分布式消息队列服务.MNS 能够帮助应用开发者在他们应用的分布式组件上 ...
- 使用阿里云rocketmq引用ons-client包从1.2.7.Final-1.8.0.Final JSONPResponseBodyAdvice
如题: 使用阿里云rocketmq引用ons-client包从1.2.7.Final-1.8.0.Final 后一直提示: nested exception is org.springframewor ...
- springboot集成阿里云rocketMQ代码示例
集成目标:完成生产者发送消息,消费者接收消息的整个流程 集成步骤: 1.引入jar包依赖 <!--rocketMq消息队列--><dependency><groupId& ...
- 阿里云RocketMQ:No route info of this topic, com.aliyun.openservices.ons.api.exception
使用阿里云的RocketMQ,但是发现本地调试会报错和自己预期的不一样,主要报错是"No route info of this topic,com.aliyun.openservices.o ...
- 阿里云RocketMQ
参考阿里云官网地址: https://help.aliyun.com/document_detail/34411.html?spm=a2c4g.11186623.6.544.7fb547b1YCTgh ...
- No route info of this topic 阿里云rocketmq报错
阿里mq对接的时候报错 , No route info of this topic . 不要被表象给迷惑了 问题原因: 用户名账号密码错误, 使用的并不是账号的AccessKey 与AccessSec ...
- 阿里云消息队列 RocketMQ 5.0 全新升级:消息、事件、流融合处理平台
从"消息"到"消息.事件.流"的大融合 消息队列作为当代应用的通信基础设施,微服务架构应用的核心依赖,通过异步解耦能力让用户更高效地构建分布式.高性能.弹性健壮 ...
- 云栖发布|阿里云消息队列 RocketMQ 5.0:消息、事件、流融合处理平台
简介:RocketMQ5.0 的发布标志着阿里云消息正式从消息领域正式迈向了"消息.事件.流"场景大融合的新局面. 引言:从"消息"到"消息.事件.流 ...
最新文章
- IT人士有哪些保健建议
- java写spark碰到输出为[Ljava.lang.String;@889a8a8的情况
- ARP:地址解析协议
- HTTP 请求之性能优化:DNS预解析 dns-prefetch
- django03_表单(forms.ModelForm)(login前后台)
- java多线程测试性能,总线程使用总时间。
- 清华自动化大一 C++作业引爆全网,特奖得主、阿里P6:我们也做不到
- 雷电2接口_Steinberg 发布旗舰级 32 bit / 384 kHz 雷电 2 音频接口 AXR4
- 李庄 220kV变电站电气部分初步设计
- jmeter使用手册
- paypal支付(Java)
- 听完周杰伦的《Mojito》,我不禁想用分子料理做几颗
- 决策树first task之框架搭建和提出问题
- 维基百科--文件系统大全
- ethtool 开启网卡_技术|如何使用 ethtool 命令管理以太网卡
- AI视频智能平台EasyCVR设备录像出现无法播放现象的问题修复
- pb3-protobuf 格式-上传 网络信息内容
- 逛画展(二分+队列)
- 转行程序员日记---2020-10-12【不是孤独一人】
- 谷粒商城项目笔记总结(2/2)
热门文章
- 前端学习(3116):react-hello-react的事件绑定
- 前端学习(3066):vue+element今日头条管理-频道筛选
- 前端学习(2975):路由传参的两种方式
- [css] css怎么更改表单的单选框或下拉框的默认样式?
- 前端学习(2015)vue之电商管理系统电商系统之实现图片的预览效果
- 前端学习(1559):ng-classt隔行变色
- 前端学习(584):在dom中调试节点
- Python time localtime()方法
- CSS之background-size属性
- docker php composer 使用_如何使用Docker部署PHP开发环境