消息队列-简述

个人总结:消息队列是一个主要为了处理高并发业务场景的中间件,核心数据结构是队列,即先进先出,结合经典秒杀场景,将一个个秒杀请求存储在消息队列中,通过路由分发到一个个消费者异步处理消息,从而避免了,服务器在短时间内应对大量高并发请求而导致服务瘫痪的尴尬场景。
推荐一篇介绍得很好的博客介绍:消息队列技术介绍
我使用的消息队列是RabbitMQ,语言是C#

安装Erlang环境(是安装RabbitMQ服务需要的一个环境)

Windows下载地址:Erlang-Windows-Installer
安装好后,需要配置环境变量,配置完后,调出控制台,查看erl版本号:


至此,erlang安装成功

安装RabbitMQ服务(Rabbitmq-server)

rabbitmq-server官网地址:rabbitmq-server官网
在GetStarted下面找到下载链接,最后找到Windows安装包如下图下载地址:

下载安装略. rabbitMq官网上有很多关于rabbitMq的介绍,以及在相关语言环境下使用

RabbitMQ可视化后台管理界面

找到我们刚刚安装好的RabbitMq-server安装路径下:

C:\Program Files\RabbitMQ Server\rabbitmq_server-3.9.14\sbin

在这里打开cmd控制台,或者将此路径添加到环境变量中,输入以下命令(启动管理插件,可视化界面):

rabbitmq-plugins enable rabbitmq_management

转-RabbitMq常用命令:RabbitMq常用命令
启动之后,在浏览器上打开可视化管理界面:
本地可视化管理界面地址:http://127.0.0.1:15672

默认用户名和密码都是:guest

基本消息发送

创建连接消息队列配置信息实体:

public class RabbitMqModel{/// <summary>/// 服务器ip地址/// </summary>private string host;public string Host{get { return host; }set { host = value; }}/// <summary>///服务器端口/// </summary>private int port;public int Port{get { return port; }set { port = value; }}/// <summary>/// 消息队列登录用户名/// </summary>private string userName;public string UserName{get { return userName; }set { userName = value; }}/// <summary>/// 消息队列登录密码/// </summary>private string passWord;public string PassWord{get { return passWord; }set { passWord = value; }}/// <summary>/// 消息队列名称/// </summary>private string queueName;public string QueueName{get { return queueName; }set { queueName = value; }}/// <summary>/// 交换机名称/// </summary>private string exchangeName;public string ExchangeName{get { return exchangeName; }set { exchangeName = value; }}}

初始化RabbitMqModel:

private static RabbitMqModel _rabbitMqModel;
_rabbitMqModel = new RabbitMqModel(){Host = "localhost",Port = 5672,UserName = "guest",PassWord = "guest",QueueName = "testQue1",ExchangeName = "testExchange"};
RabbitMQUtil.InitFactory(_rabbitMqModel);public static void InitFactory(RabbitMqModel mqModel){if (_connectionFactory != null) return;_connectionFactory = new ConnectionFactory(){HostName = mqModel.Host,Port = mqModel.Port,UserName = mqModel.UserName,Password = mqModel.PassWord};}

生产者(消息发送):

public static bool SendMsgToServer(object content,string queueName,string exchangeName,ILog log){if (_connectionFactory == null){// 发送消息必须先实例化工厂return false;}bool flag;_connectionFactory.RequestedHeartbeat= TimeSpan.FromSeconds(10);try{using (var connection = _connectionFactory.CreateConnection()){//创建一个通道,这个就是Rabbit自己定义的规则了,如果自己写消息队列,这个就可以开脑洞设计了using (var channel = connection.CreateModel()){//声明交换机exchangchannel.ExchangeDeclare(exchange: "myexchange",type: ExchangeType.Direct,durable: false,autoDelete: false,arguments: null);channel.QueueDeclare(queueName, false, false, false,arguments: new Dictionary<string, object>() {{ "x-max-priority",10 }  //设置队列优先级,如果不设置消息优先级,则不生效,前提是必须先创建好一个带有"x-max-priority"属性的队列,否则会报错;});channel.QueueBind(queue: queueName, exchange: "myexchange", ""); // 绑定队列名称和exchange路由channel.ConfirmSelect();  //确认消息是否发送成功// 序列化对象 发送消息string message = JsonConvert.SerializeObject(content);var properties = channel.CreateBasicProperties();properties.Persistent = true;  //消息持久化,队列没有设置持久化,同样不会生效,队列持久化durable:trueproperties.Priority = 1; // 设置消息优先级var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish("myexchange", "",properties, body); //发送消息var isOk = channel.WaitForConfirms();if (!isOk){//throw new Exception("The message is not reached to the server!");flag = false;}flag = true;}}}catch (Exception ex){log.Error(ex.Message, ex);flag = false;}return flag;}

消费者(接收消息):

public class Consumer{public static IConnection _connection;public static IModel _channel;public static void ConsumerMessage(string queueName){try{if (_connection == null) return;_channel.BasicQos(0, 1, false); // 设置当没有收到上条消息处理完时,下一条消息将不被处理var consumer = new EventingBasicConsumer(_channel);consumer.Received += (model, ea) =>{string message = Encoding.UTF8.GetString(ea.Body.ToArray());//接收消息TaskInfoDTO taskInfo = JsonConvert.DeserializeObject<TaskInfoDTO>(message);// 模拟处理任务,延时5秒Thread.Sleep(5000);try{// 判断任务是否存在if (RabbitMQUtil._keys.Count > 0 && RabbitMQUtil._keys.Contains(taskInfo.Task_Id)){//  模拟数据库修改任务状态TaskInfoDTO task=null;foreach (var item in MainWindow.data.TaskInfos){if (item.Task_Id == taskInfo.Task_Id){task = item;break;}}if (task != null){Application.Current.Dispatcher.Invoke(() =>{MainWindow.data.TaskInfos.Remove(task);MainWindow.RefreshDataGrid();});_channel.BasicAck(ea.DeliveryTag, false); // 标志消息已被处理RabbitMQUtil._keys.Remove(taskInfo.Task_Id); // 删除缓存}else{//这里可以将消息重回队列,也可将消息删除,(可以设置一个删除队列,将无用的消息放到删除队列)}}}catch (Exception ex){LogUtils.log.Error(ex.Message, ex);}};_channel.BasicConsume(queueName, false, consumer);  // 设置手动交付消息}catch (Exception ex){LogUtils.log.Error(ex.Message, ex);}}}

设置消息优先级发送

设置带优先级的发送消息,生产者做部分修改,消费者不做改变:

public static bool SendMsgToServerWithLevel(object content, string queueName, string exchangeName,ILog log){if (_connectionFactory == null){// 发送消息必须先实例化工厂return false;}bool flag;_connectionFactory.RequestedHeartbeat = TimeSpan.FromSeconds(10);try{using (var connection = _connectionFactory.CreateConnection()){using (var channel = connection.CreateModel()){//声明交换机exchangchannel.ExchangeDeclare(exchange: "myexchange",type: ExchangeType.Direct,durable: false,autoDelete: false,arguments: null);channel.QueueDeclare(queueName, false, false, false, arguments:new Dictionary<string, object>() {{ "x-max-priority",10 }});channel.QueueBind(queue: queueName, exchange: "myexchange","");channel.ConfirmSelect();// 序列化对象 发送消息string message = JsonConvert.SerializeObject(content);var properties = channel.CreateBasicProperties();JObject obj = JsonConvert.DeserializeObject<JObject>(message);if (!string.IsNullOrEmpty(obj["Remark"].ToString()) && obj["Remark"].ToString().Equals("vip")){properties.Priority = 9;// 设置优先级}else{properties.Priority = 1;}properties.Persistent = true; // 设置消息持久化var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish("myexchange", "", properties,body);var isOk = channel.WaitForConfirms();if (!isOk){//throw new Exception("The message is not reached to the server!");flag = false;}flag = true;}}}catch (Exception ex){log.Error(ex.Message, ex);flag = false;}return flag;}

消息发送确认机制

示例文章:发送消息确认

消息持久化、消息确认、优先级

示例文章:消息持久化、消息确认、优先级

不被重复消费

示例文章:消息不被重复消费

RabbitMQ各个参数含义

示例文章:rabbitMq各个参数含义
示例文章:重要方法简述

基础队列生产消费录屏

RabbitMq基本消费队列

优先级队列生产消费录屏

RabbitMq优先级消费者队列

代码上传

源代码:rabbitMq学习

以上示例文章均转载其他博主优秀文章,如有侵权请告知删除,谢谢。

RabbitMQ处理MES调度任务相关推荐

  1. RabbitMQ系列教程之二:工作队列(Work Queues)

    今天开始RabbitMQ教程的第二讲,废话不多说,直接进入话题.   (使用.NET 客户端 进行事例演示) 在第一个教程中,我们编写了一个从命名队列中发送和接收消息的程序.在本教程中,我们将创建一个 ...

  2. 基于MES的生产管理系统应用

    分享一下我老师大神的人工智能教程!零基础,通俗易懂!http://blog.csdn.net/jiangjunshow 也欢迎大家转载本篇文章.分享知识,造福人民,实现我们中华民族伟大复兴! 通过对某 ...

  3. celery mysql 异步_celery配合rabbitmq任务队列实现任务的异步调度执行[celery redis]

    前言: 51cto的文章已经不再补充更新了,另外celery rabbitmq详细的使用方法请到这里浏览. http://xiaorui.cc/2014/11/16/celery-rabbitmq%E ...

  4. MES如何对车间设备进行调度和控制实现智能化生产(一)

    目前,有很多制造型企业都开始逐渐的使用MES系统,MES系统是ERP系统的下层,数据源由ERP提供,它的主要作用是实现车间内的精细化管理,并可以精确的对某一工单从投产到入库整个这一过程进行有效的实时监 ...

  5. 汽车制造MES介绍之3 - AVI车辆识别与调度

    http://note.youdao.com/share/?id=ff70b66ac1a39bccefdd82ca07c8cb92&type=note#/ 转载于:https://www.cn ...

  6. RabbitMQ 入门系列(6)— 如何保证 RabbitMQ 消息不丢失

    1. 消息丢失源头 RabbitMQ 消息丢失的源头主要有以下三个: 生产者丢失消息 RabbitMQ 丢失消息 消费者丢失消息 下面主要从 3 个方面进行说明并提供应对措施 2. 生产者丢失消息 R ...

  7. RabbitMQ 七战 Kafka,差异立现!

    点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! 译者丨王欢,Golang后端工程师,DockOne社区译者 ...

  8. 选型必看:RabbitMQ 七战 Kafka,差异立现

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作为一个有丰富经验的微服务系统架构师,经常有人问我,"应 ...

  9. 半吊子架构师,一来就想干掉RabbitMQ ...

    点击上方"方志朋",选择"设为星标" 回复"666"获取新整理的面试文章 作为一个有丰富经验的微服务系统架构师,经常有人问我,"应 ...

最新文章

  1. 线上服务 CPU 又 100% 啦?一键定位 so easy!
  2. 多媒体制作技术心得体会_多媒体课件制作学习心得体会
  3. 数据库分页存储过程(4)
  4. 信道检测手机软件 ios_【手机软件】云听:稀有神器,移动音频的国家队,某拉雅资源它都有!...
  5. alter database open resetlogs
  6. 70+漂亮且极具亲和力的导航菜单设计推荐
  7. 理解Javascript_12_执行模型浅析
  8. java extern的作用_学习笔记之20-static和extern关键字2-对变量的作用
  9. python自动化测试-python自动化之(自动化测试报告)
  10. 多个表结果的并列显示
  11. 企业微信对接金蝶云星空单据模板-日常费用报销
  12. Qt环境下调用捷宇高拍仪OCX
  13. Tomcat出现中文乱码
  14. Word自定义目录的设置
  15. win10升级2004失败?
  16. 文案自动修改软件-文案自动改写的免费软件下载
  17. 【图形设计】什么是组织架构图?如何画组织架构图
  18. 大梦谁先觉 --伍立杨
  19. 关于SpringCloud,Spring容器重复初始化的问题
  20. c++基础题:判断某整数是否既是5又是7的整数倍

热门文章

  1. arduinohanshu_Arduino基础-函数 (范例)
  2. 彻底理解Java并发:Java内存模型
  3. 通讯录管理系统程序开发
  4. html原生js进度条圆形,原生 JavaScript 实现进度条
  5. 关于JSONObject调用fromObject方法的出错问题
  6. linux环境下载google云盘文件
  7. turtle绘制无角正方形
  8. 使用dd路由中继Chinanet,欺骗登录,达到免费上网。-无线路由器-中国无线论坛 -...
  9. 如何将IIS6服务器的网站批量迁移到IIS7中
  10. 点分治学习:树的重心(质心)