1.MQ

MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
之前服务与服务之间如何通信?
Openfeign 服务与服务之间直接调用

使用MQ完成系统与系统之间得调用

2.MQ的优缺点

优点:

1. 应用解耦

2.异步提速

3.削峰填谷

缺点:

什么情况下要使用MQ

1.生产者无需从消费者获取反馈。引入消息队列前的直接调用,其接口返回值为空,这会让下层的动作还没做,上层却继续向后运行,即异步成为可能。

2.容许短暂的不一致性

3.使用后有效果。解耦、提速和削峰的收益超过它的成本。

3.MQ的种类

rabbitMQ
kafka
RocketMQActiveMQ

4.rabbitMQ安装

请参考rabbirMQ安装说明文档

注意:在linux中需要放行下面的端口号或者关闭linux的防火墙

5.rabbitMQ的端口号解释

6. rabbit的工作原理

7.java程序连接RabbitMQ服务

rabbitMQ的官网

RabbitMQ Tutorials — RabbitMQ

官网提供了五种模式

创建maven工程,引入依赖

       <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version></dependency>
提供了5种模式。

1. 简单模式--Hello

简单模式生产者代码:

public class Test01 {public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();//设置rabbitMQ服务器的地址 默认localhostfactory.setHost("192.168.226.234");// 设置rabbitMQ的端口号 默认5672factory.setPort(5672);//设置账号和密码 默认guestfactory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名 默认为 /factory.setVirtualHost("/");// 获取连接通道Connection connection=factory.newConnection();// 获取channel信道Channel channel = connection.createChannel();// 创建队列/*** 如果该队列名不存在则自动创建,存在则不创建* String queue,队列名* boolean durable,是否持久化* boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。* boolean autoDelete,是否自动删除* Map<String, Object> arguments: 其他参数*/channel.queueDeclare("simple_queue",true,false,false, null);//发送消息到队列/*** String exchange,把消息发给哪个交换机--简单模式没 有交换机""* String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称* BasicProperties props, 消息的属性* byte[] body: 消息的内容*/String msg="hello world~~~~1";channel.basicPublish("","simple_queue",null,msg.getBytes());connection.close();}
}

运行示例:

注:若出现连接超时,检查是否未放行5672端口号

简单模式消费者代码:

public class Test01 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置rabbitMQ服务器的地址factory.setHost("192.168.226.234");//端口号factory.setPort(5672);//设置账号密码factory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名factory.setVirtualHost("/");//获取连接通道Connection connection = factory.newConnection();//获取channel信道Channel channel = connection.createChannel();//监听队列/*** String queue,监听的队列名称* autoAck:是否自动确认消息* Consumer callback: 监听到消息后触发的回调函数*/DefaultConsumer consumer = new DefaultConsumer(channel){//有消息就会触发该方法// body:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受的消息:"+new String(body));}};channel.basicConsume("simple_queue",true,consumer);}
}

2. 工作者模式--work queues

P:生产者
C1:消费者1
C2:消费者2
Q: 队列
消费者1和消费者2属于竞争关系,一个消息只会被一个消费者消费。

工作者模式的生产者代码(和简单模式没什么区别):

public class WorkTest {public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();//设置rabbitMQ服务器的地址 默认localhostfactory.setHost("192.168.226.234");// 设置rabbitMQ的端口号 默认5672factory.setPort(5672);//设置账号和密码 默认guestfactory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名 默认为 /factory.setVirtualHost("/");// 获取连接通道Connection connection=factory.newConnection();// 获取channel信道Channel channel = connection.createChannel();// 创建队列/*** 如果该队列名不存在则自动创建,存在则不创建* String queue,队列名* boolean durable,是否持久化* boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。* boolean autoDelete,是否自动删除* Map<String, Object> arguments: 其他参数*/channel.queueDeclare("work_queue",true,false,false, null);//发送消息到队列/*** String exchange,把消息发给哪个交换机--简单模式没 有交换机""* String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称* BasicProperties props, 消息的属性* byte[] body: 消息的内容*/for(int i=0;i<10;i++) {String msg = "hello work~~~~"+i;channel.basicPublish("", "work_queue", null, msg.getBytes());}connection.close();}
}

工作者模式消费者代码:

消费1:

public class WorkTest01 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置rabbitMQ服务器的地址factory.setHost("192.168.226.234");//端口号factory.setPort(5672);//设置账号密码factory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名factory.setVirtualHost("/");//获取连接通道Connection connection = factory.newConnection();//获取channel信道Channel channel = connection.createChannel();//监听队列/*** String queue,监听的队列名称* autoAck:是否自动确认消息* Consumer callback: 监听到消息后触发的回调函数*/DefaultConsumer consumer = new DefaultConsumer(channel){//有消息就会触发该方法// body:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受的消息:"+new String(body));}};channel.basicConsume("work_queue",true,consumer);}
}

消费者2:

public class WorkTest02 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置rabbitMQ服务器的地址factory.setHost("192.168.226.234");//端口号factory.setPort(5672);//设置账号密码factory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名factory.setVirtualHost("/");//获取连接通道Connection connection = factory.newConnection();//获取channel信道Channel channel = connection.createChannel();//监听队列/*** String queue,监听的队列名称* autoAck:是否自动确认消息* Consumer callback: 监听到消息后触发的回调函数*/DefaultConsumer consumer = new DefaultConsumer(channel){//有消息就会触发该方法// body:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受的消息:"+new String(body));}};channel.basicConsume("work_queue",true,consumer);}
}

两个消费者采用轮询策略

3. 发布订阅模式

p: producter 生产者
x:exchange交换机
Q: 队列
C1和C2:消费者

生产者:

//发布订阅模式
public class TestSubscribe {public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();//设置rabbitMQ服务器的地址 默认localhostfactory.setHost("192.168.226.234");// 设置rabbitMQ的端口号 默认5672factory.setPort(5672);//设置账号和密码 默认guestfactory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名 默认为 /factory.setVirtualHost("/");// 获取连接通道Connection connection=factory.newConnection();// 获取channel信道Channel channel = connection.createChannel();// 创建队列/*** 如果该队列名不存在则自动创建,存在则不创建* String queue,队列名* boolean durable,是否持久化* boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。* boolean autoDelete,是否自动删除* Map<String, Object> arguments: 其他参数*/channel.queueDeclare("publisher_queue01",true,false,false, null);channel.queueDeclare("publisher_queue01",true,false,false, null);//创建交换机/*** String exchange,交换机的名称* BuiltinExchangeType type,交换机的种类* boolean durable:是否持久化*/channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT,true);//交换机和队列绑定/*** String queue,队列名* String exchange,交换机名* String routingKey 路由key*/channel.queueBind("publisher_queue01","fanout_exchange","");channel.queueBind("publisher_queue02","fanout_exchange","");//发送消息到队列/*** String exchange,把消息发给哪个交换机--简单模式没 有交换机""* String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称* BasicProperties props, 消息的属性* byte[] body: 消息的内容*/for(int i=0;i<10;i++) {String msg = "hello work~~~~"+i;channel.basicPublish("", "fanout_exchange", null, msg.getBytes());}connection.close();}
}

消费者01:

public class TestSubscribe {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置rabbitMQ服务器的地址factory.setHost("192.168.226.234");//端口号factory.setPort(5672);//设置账号密码factory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名factory.setVirtualHost("/");//获取连接通道Connection connection = factory.newConnection();//获取channel信道Channel channel = connection.createChannel();//监听队列/*** String queue,监听的队列名称* autoAck:是否自动确认消息* Consumer callback: 监听到消息后触发的回调函数*/DefaultConsumer consumer = new DefaultConsumer(channel){//有消息就会触发该方法// body:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受的消息:"+new String(body));}};channel.basicConsume("publisher_queue01",true,consumer);}
}

4. 路由模式--router

p:生产者

x: 交换机---Direct (路由模式)

c1和c2表示消费者:

Q:队列

生产者;

public class Test {public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();//设置rabbitMQ服务器的地址 默认localhostfactory.setHost("192.168.226.234");// 设置rabbitMQ的端口号 默认5672factory.setPort(5672);//设置账号和密码 默认guestfactory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名 默认为 /factory.setVirtualHost("/");// 获取连接通道Connection connection=factory.newConnection();// 获取channel信道Channel channel = connection.createChannel();// 创建队列/*** 如果该队列名不存在则自动创建,存在则不创建* String queue,队列名* boolean durable,是否持久化* boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。* boolean autoDelete,是否自动删除* Map<String, Object> arguments: 其他参数*/channel.queueDeclare("router_queue01",true,false,false, null);channel.queueDeclare("router_queue01",true,false,false, null);//创建交换机/*** String exchange,交换机的名称* BuiltinExchangeType type,交换机的种类* boolean durable:是否持久化*/channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT,true);//交换机和队列绑定/*** String queue,队列名* String exchange,交换机名* String routingKey 路由key*/channel.queueBind("router_queue01","direct_exchange","error");channel.queueBind("router_queue02","direct_exchange","error");channel.queueBind("router_queue02","direct_exchange","info");channel.queueBind("router_queue02","direct_exchange","warning");//发送消息到队列/*** String exchange,把消息发给哪个交换机--简单模式没 有交换机""* String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称* BasicProperties props, 消息的属性* byte[] body: 消息的内容*/String msg = "hello router~~~~";channel.basicPublish("direct_exchange", "info", null, msg.getBytes());connection.close();}
}

消费者;

public class Test {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置rabbitMQ服务器的地址factory.setHost("192.168.226.234");//端口号factory.setPort(5672);//设置账号密码factory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名factory.setVirtualHost("/");//获取连接通道Connection connection = factory.newConnection();//获取channel信道Channel channel = connection.createChannel();//监听队列/*** String queue,监听的队列名称* autoAck:是否自动确认消息* Consumer callback: 监听到消息后触发的回调函数*/DefaultConsumer consumer = new DefaultConsumer(channel){//有消息就会触发该方法// body:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受的消息:"+new String(body));}};channel.basicConsume("router_queue01",true,consumer);}
}

5. 主题模式--topic

* 通配一个单词

# 通配零个或多个单词

生产者:

public class TestTopic {public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();//设置rabbitMQ服务器的地址 默认localhostfactory.setHost("192.168.226.234");// 设置rabbitMQ的端口号 默认5672factory.setPort(5672);//设置账号和密码 默认guestfactory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名 默认为 /factory.setVirtualHost("/");// 获取连接通道Connection connection=factory.newConnection();// 获取channel信道Channel channel = connection.createChannel();// 创建队列/*** 如果该队列名不存在则自动创建,存在则不创建* String queue,队列名* boolean durable,是否持久化* boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。* boolean autoDelete,是否自动删除* Map<String, Object> arguments: 其他参数*/channel.queueDeclare("topic_queue01",true,false,false, null);channel.queueDeclare("topic_queue02",true,false,false, null);//创建交换机/*** String exchange,交换机的名称* BuiltinExchangeType type,交换机的种类* boolean durable:是否持久化*/channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC,true);//交换机和队列绑定/*** String queue,队列名* String exchange,交换机名* String routingKey 路由key*/channel.queueBind("topic_queue01","topic_exchange",".orange.");channel.queueBind("topic_queue02","topic_exchange","lazy.#");channel.queueBind("topic_queue02","topic_exchange","*.*.rabbit");//发送消息到队列/*** String exchange,把消息发给哪个交换机--简单模式没 有交换机""* String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称* BasicProperties props, 消息的属性* byte[] body: 消息的内容*/String msg="hello routers~~~~1";channel.basicPublish("topic_exchange","lazy.orange.rabbit.qqq",null,msg.getBytes());connection.close();}
}

消费者:

public class Test {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置rabbitMQ服务器的地址factory.setHost("192.168.226.234");//端口号factory.setPort(5672);//设置账号密码factory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名factory.setVirtualHost("/");//获取连接通道Connection connection = factory.newConnection();//获取channel信道Channel channel = connection.createChannel();//监听队列/*** String queue,监听的队列名称* autoAck:是否自动确认消息* Consumer callback: 监听到消息后触发的回调函数*/DefaultConsumer consumer = new DefaultConsumer(channel){//有消息就会触发该方法// body:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受的消息:"+new String(body));}};channel.basicConsume("topic_queue02",true,consumer);}
}

RabbitMQ消息中间件相关推荐

  1. 慕课网_《RabbitMQ消息中间件极速入门与实战》学习总结

    慕课网<RabbitMQ消息中间件极速入门与实战>学习总结 时间:2018年09月05日星期三 说明:本文部分内容均来自慕课网.@慕课网:https://www.imooc.com 教学源 ...

  2. SpringCloud项目:实现推送消息到RabbitMQ消息中间件

    作者:杨桃桃 blog.csdn.net/yt812100/article/details/111785839 一.CRT创建RabbitMQ容器 CRT容器自带RabbitMQ消息中间件,只需要在C ...

  3. RabbitMq 消息中间件介绍初体验

    RabbitMq 消息中间件介绍&为什么要使用消息中间件&什么时候使用消息中 间件   我们用java来举例子, 打个比方 我们客户端发送一个下单请求给订单系统(order)订单系统发 ...

  4. ①RabbitMQ 消息中间件/消息队列、单节点、集群、镜像集群

    文章目录 RabbitMQ 消息中间件/消息队列 1.消息中间件 1.简介 2.作用 消息中间件的两种模式 P2P模式 Rabbitmq Pub/Sub模式(发布/订阅:Topic,可以重复消费) K ...

  5. RabbitMQ消息中间件技术精讲全集

    RabbitMQ消息中间件技术精讲 导航: RabbitMQ消息中间件技术精讲 一. 主流消息中间件介绍 1.1 ActiveMQ 1.2 Kafka 1.3 RocketMQ 1.4 RabbitM ...

  6. Linux中级实战专题篇:rabbitmq(消息中间件p2p模式和pub模式,消息队列rabbitmq详解,单机安装,集群部署以及配置实战)

    一.消息中间件相关概念 1.简介 消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台相关 的数据交流,并基于数据通信来进行分布式系统的集成.通过提供消息传递和消息 队列模型,可以在分布 ...

  7. Rabbitmq 消息中间件 秒杀问题思考

    对于秒杀类似的高并发大流量处理问题,采用消息中间件处理比较合适.网上大部分解决方式似乎都在消费者端,采用basicQos限制消息取出个数,basicAck手动处理执行结果.其实很大部分不需要到消费者来 ...

  8. RabbitMQ消息中间件-基础篇

    文章目录 一.什么是MQ 1.1 mq的作用 1.2 mq的区别 二. RabbitMQ 2.1 四大核心概念 2.2 RabbitMQ核心部分 2.3 RabbitMQ工作原理 三.docker安装 ...

  9. Rabbitmq消息中间件整合Springboot

    首先在docker-compose中部署rabbitmq: docker-compose.yml version: '3' services:rabbitmq:image: rabbitmq:mana ...

最新文章

  1. 分组背包----HDU1712 ACboy needs your help
  2. 多视图几何总结——三角形法
  3. 关于matlab中princomp的使用说明讲解
  4. JVM总结---各处总结
  5. linux脚本计算器加减乘除,用shell写一个简易计算器,可以实现加、减、乘、除运算,假如脚本名字为1.sh,执行示例:./1....
  6. java int 数据类型_Java 基本数据类型
  7. 《C和指针》——将无符号整数转换为字符
  8. 牛客网暑期ACM多校训练营(第三场): C. Chiaki Sequence Reloaded(数位DP)
  9. [转]用C#编写ActiveX控件
  10. gulp-sass 使用报错Error:gulp-sass no longer has a default Sass compiler; please set one yourself
  11. 第二人生的源码分析(2)第二人生的基本功能
  12. 我的世界服务器银行系统,我的世界多功能银行系统制作教程
  13. NodeJs+mongoose实现搜索功能
  14. [反汇编练习] 160个CrackMe之024
  15. 如何创建自己的 Google Chrome 扩展程序
  16. 第二十三章 类关键字 - Language
  17. C语言函数递归调用实验报告,C语言函数的递归和调用实例分析
  18. vue 使用Computed实现数据的动态计算
  19. winsty: 我的PhD总结
  20. pyechars切片器

热门文章

  1. Testbench编写指南(3)模块化工程的仿真方法
  2. 计算机辅助诊断应用,计算机辅助诊断数学方法应用.ppt
  3. 【L2-029 特立独行的幸福】
  4. 单线激光雷达SLAM(一)数据提取
  5. shell脚本计算离生日还有多少天?
  6. 用vscode实现vue.js项目的一个完整过程
  7. wireshark图片还原
  8. K-均值聚类算法的原理与实现
  9. 程代展先生,您错了!
  10. linux下光盘文件怎么拷到电脑上,怎么把光盘里的东西复制到电脑 拷贝光盘内容方法【图文】...