RabbitMQ消息中间件
1.MQ
使用MQ完成系统与系统之间得调用
2.MQ的优缺点
优点:
2.异步提速
3.削峰填谷
缺点:
什么情况下要使用MQ
1.生产者无需从消费者获取反馈。引入消息队列前的直接调用,其接口返回值为空,这会让下层的动作还没做,上层却继续向后运行,即异步成为可能。
2.容许短暂的不一致性
3.使用后有效果。解耦、提速和削峰的收益超过它的成本。
3.MQ的种类
rabbitMQkafkaRocketMQActiveMQ
4.rabbitMQ安装
请参考rabbirMQ安装说明文档
注意:在linux中需要放行下面的端口号或者关闭linux的防火墙
5.rabbitMQ的端口号解释
6. rabbit的工作原理
7.java程序连接RabbitMQ服务
rabbitMQ的官网
RabbitMQ Tutorials — RabbitMQ
官网提供了五种模式
创建maven工程,引入依赖
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.14.2</version></dependency>
1. 简单模式--Hello
简单模式生产者代码:
public class Test01 {public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();//设置rabbitMQ服务器的地址 默认localhostfactory.setHost("192.168.226.234");// 设置rabbitMQ的端口号 默认5672factory.setPort(5672);//设置账号和密码 默认guestfactory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名 默认为 /factory.setVirtualHost("/");// 获取连接通道Connection connection=factory.newConnection();// 获取channel信道Channel channel = connection.createChannel();// 创建队列/*** 如果该队列名不存在则自动创建,存在则不创建* String queue,队列名* boolean durable,是否持久化* boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。* boolean autoDelete,是否自动删除* Map<String, Object> arguments: 其他参数*/channel.queueDeclare("simple_queue",true,false,false, null);//发送消息到队列/*** String exchange,把消息发给哪个交换机--简单模式没 有交换机""* String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称* BasicProperties props, 消息的属性* byte[] body: 消息的内容*/String msg="hello world~~~~1";channel.basicPublish("","simple_queue",null,msg.getBytes());connection.close();}
}
运行示例:
注:若出现连接超时,检查是否未放行5672端口号
简单模式消费者代码:
public class Test01 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置rabbitMQ服务器的地址factory.setHost("192.168.226.234");//端口号factory.setPort(5672);//设置账号密码factory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名factory.setVirtualHost("/");//获取连接通道Connection connection = factory.newConnection();//获取channel信道Channel channel = connection.createChannel();//监听队列/*** String queue,监听的队列名称* autoAck:是否自动确认消息* Consumer callback: 监听到消息后触发的回调函数*/DefaultConsumer consumer = new DefaultConsumer(channel){//有消息就会触发该方法// body:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受的消息:"+new String(body));}};channel.basicConsume("simple_queue",true,consumer);}
}
2. 工作者模式--work queues
P:生产者C1:消费者1C2:消费者2Q: 队列消费者1和消费者2属于竞争关系,一个消息只会被一个消费者消费。
工作者模式的生产者代码(和简单模式没什么区别):
public class WorkTest {public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();//设置rabbitMQ服务器的地址 默认localhostfactory.setHost("192.168.226.234");// 设置rabbitMQ的端口号 默认5672factory.setPort(5672);//设置账号和密码 默认guestfactory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名 默认为 /factory.setVirtualHost("/");// 获取连接通道Connection connection=factory.newConnection();// 获取channel信道Channel channel = connection.createChannel();// 创建队列/*** 如果该队列名不存在则自动创建,存在则不创建* String queue,队列名* boolean durable,是否持久化* boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。* boolean autoDelete,是否自动删除* Map<String, Object> arguments: 其他参数*/channel.queueDeclare("work_queue",true,false,false, null);//发送消息到队列/*** String exchange,把消息发给哪个交换机--简单模式没 有交换机""* String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称* BasicProperties props, 消息的属性* byte[] body: 消息的内容*/for(int i=0;i<10;i++) {String msg = "hello work~~~~"+i;channel.basicPublish("", "work_queue", null, msg.getBytes());}connection.close();}
}
工作者模式消费者代码:
消费1:
public class WorkTest01 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置rabbitMQ服务器的地址factory.setHost("192.168.226.234");//端口号factory.setPort(5672);//设置账号密码factory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名factory.setVirtualHost("/");//获取连接通道Connection connection = factory.newConnection();//获取channel信道Channel channel = connection.createChannel();//监听队列/*** String queue,监听的队列名称* autoAck:是否自动确认消息* Consumer callback: 监听到消息后触发的回调函数*/DefaultConsumer consumer = new DefaultConsumer(channel){//有消息就会触发该方法// body:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受的消息:"+new String(body));}};channel.basicConsume("work_queue",true,consumer);}
}
消费者2:
public class WorkTest02 {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置rabbitMQ服务器的地址factory.setHost("192.168.226.234");//端口号factory.setPort(5672);//设置账号密码factory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名factory.setVirtualHost("/");//获取连接通道Connection connection = factory.newConnection();//获取channel信道Channel channel = connection.createChannel();//监听队列/*** String queue,监听的队列名称* autoAck:是否自动确认消息* Consumer callback: 监听到消息后触发的回调函数*/DefaultConsumer consumer = new DefaultConsumer(channel){//有消息就会触发该方法// body:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受的消息:"+new String(body));}};channel.basicConsume("work_queue",true,consumer);}
}
两个消费者采用轮询策略
3. 发布订阅模式
p: producter 生产者x:exchange交换机Q: 队列C1和C2:消费者
生产者:
//发布订阅模式
public class TestSubscribe {public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();//设置rabbitMQ服务器的地址 默认localhostfactory.setHost("192.168.226.234");// 设置rabbitMQ的端口号 默认5672factory.setPort(5672);//设置账号和密码 默认guestfactory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名 默认为 /factory.setVirtualHost("/");// 获取连接通道Connection connection=factory.newConnection();// 获取channel信道Channel channel = connection.createChannel();// 创建队列/*** 如果该队列名不存在则自动创建,存在则不创建* String queue,队列名* boolean durable,是否持久化* boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。* boolean autoDelete,是否自动删除* Map<String, Object> arguments: 其他参数*/channel.queueDeclare("publisher_queue01",true,false,false, null);channel.queueDeclare("publisher_queue01",true,false,false, null);//创建交换机/*** String exchange,交换机的名称* BuiltinExchangeType type,交换机的种类* boolean durable:是否持久化*/channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT,true);//交换机和队列绑定/*** String queue,队列名* String exchange,交换机名* String routingKey 路由key*/channel.queueBind("publisher_queue01","fanout_exchange","");channel.queueBind("publisher_queue02","fanout_exchange","");//发送消息到队列/*** String exchange,把消息发给哪个交换机--简单模式没 有交换机""* String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称* BasicProperties props, 消息的属性* byte[] body: 消息的内容*/for(int i=0;i<10;i++) {String msg = "hello work~~~~"+i;channel.basicPublish("", "fanout_exchange", null, msg.getBytes());}connection.close();}
}
消费者01:
public class TestSubscribe {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置rabbitMQ服务器的地址factory.setHost("192.168.226.234");//端口号factory.setPort(5672);//设置账号密码factory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名factory.setVirtualHost("/");//获取连接通道Connection connection = factory.newConnection();//获取channel信道Channel channel = connection.createChannel();//监听队列/*** String queue,监听的队列名称* autoAck:是否自动确认消息* Consumer callback: 监听到消息后触发的回调函数*/DefaultConsumer consumer = new DefaultConsumer(channel){//有消息就会触发该方法// body:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受的消息:"+new String(body));}};channel.basicConsume("publisher_queue01",true,consumer);}
}
4. 路由模式--router
p:生产者
x: 交换机---Direct (路由模式)
c1和c2表示消费者:
Q:队列
生产者;
public class Test {public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();//设置rabbitMQ服务器的地址 默认localhostfactory.setHost("192.168.226.234");// 设置rabbitMQ的端口号 默认5672factory.setPort(5672);//设置账号和密码 默认guestfactory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名 默认为 /factory.setVirtualHost("/");// 获取连接通道Connection connection=factory.newConnection();// 获取channel信道Channel channel = connection.createChannel();// 创建队列/*** 如果该队列名不存在则自动创建,存在则不创建* String queue,队列名* boolean durable,是否持久化* boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。* boolean autoDelete,是否自动删除* Map<String, Object> arguments: 其他参数*/channel.queueDeclare("router_queue01",true,false,false, null);channel.queueDeclare("router_queue01",true,false,false, null);//创建交换机/*** String exchange,交换机的名称* BuiltinExchangeType type,交换机的种类* boolean durable:是否持久化*/channel.exchangeDeclare("direct_exchange", BuiltinExchangeType.DIRECT,true);//交换机和队列绑定/*** String queue,队列名* String exchange,交换机名* String routingKey 路由key*/channel.queueBind("router_queue01","direct_exchange","error");channel.queueBind("router_queue02","direct_exchange","error");channel.queueBind("router_queue02","direct_exchange","info");channel.queueBind("router_queue02","direct_exchange","warning");//发送消息到队列/*** String exchange,把消息发给哪个交换机--简单模式没 有交换机""* String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称* BasicProperties props, 消息的属性* byte[] body: 消息的内容*/String msg = "hello router~~~~";channel.basicPublish("direct_exchange", "info", null, msg.getBytes());connection.close();}
}
消费者;
public class Test {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置rabbitMQ服务器的地址factory.setHost("192.168.226.234");//端口号factory.setPort(5672);//设置账号密码factory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名factory.setVirtualHost("/");//获取连接通道Connection connection = factory.newConnection();//获取channel信道Channel channel = connection.createChannel();//监听队列/*** String queue,监听的队列名称* autoAck:是否自动确认消息* Consumer callback: 监听到消息后触发的回调函数*/DefaultConsumer consumer = new DefaultConsumer(channel){//有消息就会触发该方法// body:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受的消息:"+new String(body));}};channel.basicConsume("router_queue01",true,consumer);}
}
5. 主题模式--topic
* 通配一个单词
# 通配零个或多个单词
生产者:
public class TestTopic {public static void main(String[] args) throws Exception{ConnectionFactory factory=new ConnectionFactory();//设置rabbitMQ服务器的地址 默认localhostfactory.setHost("192.168.226.234");// 设置rabbitMQ的端口号 默认5672factory.setPort(5672);//设置账号和密码 默认guestfactory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名 默认为 /factory.setVirtualHost("/");// 获取连接通道Connection connection=factory.newConnection();// 获取channel信道Channel channel = connection.createChannel();// 创建队列/*** 如果该队列名不存在则自动创建,存在则不创建* String queue,队列名* boolean durable,是否持久化* boolean exclusive,(独占)声明队列同一时间只能保 证一个连接,且该队列只有被这一个连接使用。* boolean autoDelete,是否自动删除* Map<String, Object> arguments: 其他参数*/channel.queueDeclare("topic_queue01",true,false,false, null);channel.queueDeclare("topic_queue02",true,false,false, null);//创建交换机/*** String exchange,交换机的名称* BuiltinExchangeType type,交换机的种类* boolean durable:是否持久化*/channel.exchangeDeclare("topic_exchange", BuiltinExchangeType.TOPIC,true);//交换机和队列绑定/*** String queue,队列名* String exchange,交换机名* String routingKey 路由key*/channel.queueBind("topic_queue01","topic_exchange",".orange.");channel.queueBind("topic_queue02","topic_exchange","lazy.#");channel.queueBind("topic_queue02","topic_exchange","*.*.rabbit");//发送消息到队列/*** String exchange,把消息发给哪个交换机--简单模式没 有交换机""* String routingKey,消息绑定的路由key 如果为简单模 式 默认写为队列名称* BasicProperties props, 消息的属性* byte[] body: 消息的内容*/String msg="hello routers~~~~1";channel.basicPublish("topic_exchange","lazy.orange.rabbit.qqq",null,msg.getBytes());connection.close();}
}
消费者:
public class Test {public static void main(String[] args) throws Exception{ConnectionFactory factory = new ConnectionFactory();//设置rabbitMQ服务器的地址factory.setHost("192.168.226.234");//端口号factory.setPort(5672);//设置账号密码factory.setUsername("guest");factory.setPassword("guest");//设置虚拟主机名factory.setVirtualHost("/");//获取连接通道Connection connection = factory.newConnection();//获取channel信道Channel channel = connection.createChannel();//监听队列/*** String queue,监听的队列名称* autoAck:是否自动确认消息* Consumer callback: 监听到消息后触发的回调函数*/DefaultConsumer consumer = new DefaultConsumer(channel){//有消息就会触发该方法// body:消息内容@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接受的消息:"+new String(body));}};channel.basicConsume("topic_queue02",true,consumer);}
}
RabbitMQ消息中间件相关推荐
- 慕课网_《RabbitMQ消息中间件极速入门与实战》学习总结
慕课网<RabbitMQ消息中间件极速入门与实战>学习总结 时间:2018年09月05日星期三 说明:本文部分内容均来自慕课网.@慕课网:https://www.imooc.com 教学源 ...
- SpringCloud项目:实现推送消息到RabbitMQ消息中间件
作者:杨桃桃 blog.csdn.net/yt812100/article/details/111785839 一.CRT创建RabbitMQ容器 CRT容器自带RabbitMQ消息中间件,只需要在C ...
- RabbitMq 消息中间件介绍初体验
RabbitMq 消息中间件介绍&为什么要使用消息中间件&什么时候使用消息中 间件 我们用java来举例子, 打个比方 我们客户端发送一个下单请求给订单系统(order)订单系统发 ...
- ①RabbitMQ 消息中间件/消息队列、单节点、集群、镜像集群
文章目录 RabbitMQ 消息中间件/消息队列 1.消息中间件 1.简介 2.作用 消息中间件的两种模式 P2P模式 Rabbitmq Pub/Sub模式(发布/订阅:Topic,可以重复消费) K ...
- RabbitMQ消息中间件技术精讲全集
RabbitMQ消息中间件技术精讲 导航: RabbitMQ消息中间件技术精讲 一. 主流消息中间件介绍 1.1 ActiveMQ 1.2 Kafka 1.3 RocketMQ 1.4 RabbitM ...
- Linux中级实战专题篇:rabbitmq(消息中间件p2p模式和pub模式,消息队列rabbitmq详解,单机安装,集群部署以及配置实战)
一.消息中间件相关概念 1.简介 消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台相关 的数据交流,并基于数据通信来进行分布式系统的集成.通过提供消息传递和消息 队列模型,可以在分布 ...
- Rabbitmq 消息中间件 秒杀问题思考
对于秒杀类似的高并发大流量处理问题,采用消息中间件处理比较合适.网上大部分解决方式似乎都在消费者端,采用basicQos限制消息取出个数,basicAck手动处理执行结果.其实很大部分不需要到消费者来 ...
- RabbitMQ消息中间件-基础篇
文章目录 一.什么是MQ 1.1 mq的作用 1.2 mq的区别 二. RabbitMQ 2.1 四大核心概念 2.2 RabbitMQ核心部分 2.3 RabbitMQ工作原理 三.docker安装 ...
- Rabbitmq消息中间件整合Springboot
首先在docker-compose中部署rabbitmq: docker-compose.yml version: '3' services:rabbitmq:image: rabbitmq:mana ...
最新文章
- 分组背包----HDU1712 ACboy needs your help
- 多视图几何总结——三角形法
- 关于matlab中princomp的使用说明讲解
- JVM总结---各处总结
- linux脚本计算器加减乘除,用shell写一个简易计算器,可以实现加、减、乘、除运算,假如脚本名字为1.sh,执行示例:./1....
- java int 数据类型_Java 基本数据类型
- 《C和指针》——将无符号整数转换为字符
- 牛客网暑期ACM多校训练营(第三场): C. Chiaki Sequence Reloaded(数位DP)
- [转]用C#编写ActiveX控件
- gulp-sass 使用报错Error:gulp-sass no longer has a default Sass compiler; please set one yourself
- 第二人生的源码分析(2)第二人生的基本功能
- 我的世界服务器银行系统,我的世界多功能银行系统制作教程
- NodeJs+mongoose实现搜索功能
- [反汇编练习] 160个CrackMe之024
- 如何创建自己的 Google Chrome 扩展程序
- 第二十三章 类关键字 - Language
- C语言函数递归调用实验报告,C语言函数的递归和调用实例分析
- vue 使用Computed实现数据的动态计算
- winsty: 我的PhD总结
- pyechars切片器