rabbitlearning下载_RabbitMQ-learning
第一种模式=直连
P:生产者,也就是要发送消息的程序
C:消费者:消息的接受者,会一直等待消息到来。
queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。
producer:
package com.quan.rabbitmq.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;/**
* 直连模型:
* 生产者发送消息
* 消费者,等待消息到来消费
* 消息队列:可以缓存消息,生产者向其中投递消息,消费者从其中取出消息。
**/public class RMQProducer {
public staticvoidmain(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory= newConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("quan");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("quan");
Connection connection=connectionFactory.newConnection();
Channel channel=connection.createChannel();
channel.queueDeclare("hello",true,false,false,null);
channel.basicPublish("","hello",null,"hello rabbit".getBytes());
channel.close();
connection.close();
}
}
consumer:
package com.quan.rabbitmq.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class RMQConsumer {
public staticvoidmain(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory= newConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("quan");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("quan");//创建连接
Connection connection =connectionFactory.newConnection();//通过连接创建通道
Channel channel =connection.createChannel();/**
* 参数1:声明通道对应的队列
* 参数2:指定是否持久化
* 参数3:指定是否独占对象
* 参数4:指定是否自动删除队列
* 参数5:对队列的额外设置*/channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",true,newDefaultConsumer(channel){
@Override
publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(newString(body));
}
});
}
}
第二种模式=任务模型(work quene)
当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。
此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
P:生产者:任务的发布者
C1:消费者-1,领取任务并且完成任务,假设完成速度较慢
C2:消费者-2:领取任务并完成任务,假设完成速度快
P:
/**
* 直连模型:
* 生产者发送消息
* 消费者,等待消息到来消费
* 消息队列:可以缓存消息,生产者向其中投递消息,消费者从其中取出消息。
**/public class RMQProducer2 {
public staticvoidmain(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory= newConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("quan");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("quan");
Connection connection=connectionFactory.newConnection();
Channel channel=connection.createChannel();
channel.queueDeclare("hello",true,false,false,null);for(int i =0 ;i<20;i++){
channel.basicPublish("","hello",null,(i+"=====>hello rabbit").getBytes());
}
channel.close();
connection.close();
}
}
C1
public class RMQConsumer21 {
public staticvoidmain(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory= newConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("quan");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("quan");//创建连接
Connection connection =connectionFactory.newConnection();//通过连接创建通道
Channel channel =connection.createChannel();/**
* 参数1:声明通道对应的队列
* 参数2:指定是否持久化
* 参数3:指定是否独占对象
* 参数4:指定是否自动删除队列
* 参数5:对队列的额外设置*/channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",true,newDefaultConsumer(channel){
@Override
publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是consumer1"+newString(body));
}
});
}
}
C2
public class RMQConsumer22 {
public staticvoidmain(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory= newConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("quan");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("quan");//创建连接
Connection connection =connectionFactory.newConnection();//通过连接创建通道
Channel channel =connection.createChannel();/**
* 参数1:声明通道对应的队列
* 参数2:指定是否持久化
* 参数3:指定是否独占对象
* 参数4:指定是否自动删除队列
* 参数5:对队列的额外设置*/channel.queueDeclare("hello",true,false,false,null);
channel.basicConsume("hello",true,newDefaultConsumer(channel){
@Override
publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try{
Thread.sleep(1000);
}catch(InterruptedException e) {
e.printStackTrace();
}
System.out.println("我是consumer2"+newString(body));
}
});
}
}
re
当两个消费者都在监听通道中的消息的时候:
我们一旦发消息:
我是consumer2 0=====>hello rabbit
我是consumer22=====>hello rabbit
我是consumer24=====>hello rabbit
我是consumer26=====>hello rabbit
我是consumer28=====>hello rabbit
我是consumer210=====>hello rabbit
我是consumer212=====>hello rabbit
我是consumer214=====>hello rabbit
我是consumer216=====>hello rabbit
我是consumer218=====>hello rabbit
@@@@@@@@@@@@@@我是consumer11=====>hello rabbit
我是consumer13=====>hello rabbit
我是consumer15=====>hello rabbit
我是consumer17=====>hello rabbit
我是consumer19=====>hello rabbit
我是consumer111=====>hello rabbit
我是consumer113=====>hello rabbit
我是consumer115=====>hello rabbit
我是consumer117=====>hello rabbit
我是consumer119=====>hello rabbit
默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。
消息确认机制:
public class RMQConsumer21 {
public staticvoidmain(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory= newConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setUsername("quan");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("quan");//创建连接
Connection connection =connectionFactory.newConnection();//通过连接创建通道
final Channel channel =connection.createChannel();
channel.queueDeclare("hello",true,false,false,null);
channel.basicQos(1);//一次只接受一条为确认的消息
//第二个参数:关闭自动确认消息
channel.basicConsume("hello",false,newDefaultConsumer(channel){
@Override
publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是consumer1 "+newString(body));
channel.basicAck(envelope.getDeliveryTag(),false);//手动确认消息
}
});
}
}
re:
我是consumer1 0=====>hello rabbit
@@@@@@@@@@@
我是consumer21=====>hello rabbit
我是consumer22=====>hello rabbit
我是consumer23=====>hello rabbit
我是consumer24=====>hello rabbit
我是consumer25=====>hello rabbit
我是consumer26=====>hello rabbit
我是consumer27=====>hello rabbit
我是consumer28=====>hello rabbit
我是consumer29=====>hello rabbit
我是consumer210=====>hello rabbit
我是consumer211=====>hello rabbit
我是consumer212=====>hello rabbit
我是consumer213=====>hello rabbit
我是consumer214=====>hello rabbit
我是consumer215=====>hello rabbit
我是consumer216=====>hello rabbit
我是consumer217=====>hello rabbit
我是consumer218=====>hello rabbit
我是consumer219=====>hello rabbit
第三种模式-广播
fanout 扇出===广播
消息发送流程:
-可以有多个消费者- 每个消费者有自己的queue(队列)- 每个队列都要绑定到Exchange(交换机)- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。-交换机把消息发送给绑定过的所有队列- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费
p;更新部分:
Connection connection =connectionFactory.newConnection();
Channel channel=connection.createChannel();
channel.exchangeDeclare("logs","fanout");for(int i =0 ;i<20;i++){//第一个参数:交换机名字
//第二个参数:队列名字
//第三个参数:
//第四个参数:消息,是byte类型
channel.basicPublish("logs","",null,(i+"=====>hello rabbit").getBytes());
}
c1 c2 c3:这三个都是差不多的配置:
//绑定交换机
channel.exchangeDeclare("logs","fanout");//创建临时队列
String queue =channel.queueDeclare().getQueue();//将临时队列绑定交换机exchange
channel.queueBind(queue,"logs","");//处理消息
channel.basicConsume(queue,true,newDefaultConsumer(channel){
@Override
publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是consumer1 "+newString(body));
}
});
re:
所有的消息,每个消费者都可以消费得到,
第四种模式-Routing-订阅模式中的-直连(direct)
在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。
这时就要用到Direct类型的Exchange。
流程:
-队列与交换机的绑定,不能是任意绑定了,而是要指定一个`RoutingKey`(路由key)-消息的发送方在 向 Exchange发送消息时,也必须指定消息的 `RoutingKey`。- Exchange不再把消息交给每一个绑定的队列,而是根据消息的`Routing Key`进行判断,只有队列的`Routingkey`与消息的 `Routing key`完全一致,才会接收到消息
-P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。-X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列-C1:消费者,其所在队列指定了需要routing key 为 error 的消息- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息
情景:
c1 :error,info,debug
c2:error
c3:info
p:会每种key发一条消息:
p:改变rkey的值:
//声明交换机:参数1:交换机名称
//参数2:交换机类型:
channel.exchangeDeclare("logs1","direct");
String rkey= "debug";//第一个参数:交换机名字
//第二个参数:队列名字/路由key
//第三个参数:
//第四个参数:消息,是byte类型
channel.basicPublish("logs1",rkey,null,(rkey+"消息=====>hello rabbit").getBytes());
channel.close();
connection.close();
}
c1:
//绑定交换机/参数2 交换机类型
channel.exchangeDeclare("logs1","direct");//创建临时队列
String queue =channel.queueDeclare().getQueue();//将临时队列绑定交换机exchange
//第一个参数:队列,第2个参数:交换机名字,第3个参数:路由key
channel.queueBind(queue,"logs1","error");
channel.queueBind(queue,"logs1","info");
channel.queueBind(queue,"logs1","debug");//处理消息
channel.basicConsume(queue,true,newDefaultConsumer(channel){
@Override
publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("我是consumer1 "+newString(body));
}
});
}
c2:
//绑定交换机
channel.exchangeDeclare("logs1","direct");//创建临时队列
String queue =channel.queueDeclare().getQueue();//将临时队列绑定交换机exchange
//第一个参数:队列,第2个参数:交换机名字,第3个参数:路由key
channel.queueBind(queue,"logs1","error");//处理消息
C3:
//创建连接
Connection connection =connectionFactory.newConnection();//通过连接创建通道
Channel channel =connection.createChannel();//绑定交换机
channel.exchangeDeclare("logs1","direct");//创建临时队列
String queue =channel.queueDeclare().getQueue();//将临时队列绑定交换机exchange
//第一个参数:队列,第2个参数:交换机名字,第3个参数:路由key
channel.queueBind(queue,"logs1","info");//处理消息
p每个rkey发送一次消息后的结果:
我是consumer1 error消息=====>hello rabbit
我是consumer1 info消息=====>hello rabbit
我是consumer1 debug消息=====>hello rabbit
我是consumer2 error消息=====>hello rabbit
我是consumer3 info消息=====>hello rabbit
可以知道,这种类型的交换机是可以按需发送的。
Routing的订阅模式--topic:
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列
在绑定Routing key的时候使用通配符!
这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
p:
//声明交换机:参数1:交换机名称
//参数2:交换机类型:
channel.exchangeDeclare("topic1","topic");
String rkey= "user.debug.all";//第一个参数:交换机名字
//第二个参数:队列名字/路由key
//第三个参数:
//第四个参数:消息,是byte类型
channel.basicPublish("topic1",rkey,null,(rkey+"消息=====>hello rabbit").getBytes());
c2:
//绑定交换机
channel.exchangeDeclare("topic1","topic");//创建临时队列
String queue =channel.queueDeclare().getQueue();//将临时队列绑定交换机exchange
//第一个参数:队列,第2个参数:交换机名字,第3个参数:路由key
channel.queueBind(queue,"topic1","user.#");
user.#可以匹配多个后面的单词:
c1:
//绑定交换机/参数2 交换机类型
channel.exchangeDeclare("topic1","topic");//创建临时队列
String queue =channel.queueDeclare().getQueue();//将临时队列绑定交换机exchange
//第一个参数:队列,第2个参数:交换机名字,第3个参数:路由key
channel.queueBind(queue,"topic1","user.*");
user.*只能接一个单词:
p发送了一次key为user.debug 和一次user.debug.all:
我是consumer1 user.debug消息=====>hello rabbit
我是consumer2 user.debug消息=====>hello rabbit
我是consumer2 user.debug.all消息=====>hello rabbit
rabbitlearning下载_RabbitMQ-learning相关推荐
- rabbitlearning下载_RabbitMQ Learning
消息队列(Message Queue) 消息队列(MQ)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递.消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只 ...
- Deep Learning — LeCun, Yann, Yoshua Bengio and Geoffrey Hinton
原文链接Deep Learning 由于作者太菜,本文70%为机翻.见谅见谅 第一篇是三巨头LeCun, Yann, Yoshua Bengio和Geoffrey Hinton做的有关Deep Lea ...
- 图像超分辨率之SRCNN(Learning a Deep Convolutional Network for Image Super-Resolution)
论文下载:Learning a Deep Convolutional Network for Image Super-Resolution 代码下载:https://github.com/tegg89 ...
- 【论文简述】Learning Inverse Depth Regression for Pixelwise Visibility-AwareMulti-View Stereo (IJCV 2022)
一.论文简述 1. 第一作者:Qingshan Xu 2. 发表年份:2022 3. 发表期刊:IJCV 4. 关键词:MVS.3D重建.可见性信息.抗噪声训练.逆深度回归.平均组相关 5. 探索动机 ...
- 【论文解读】Learning to Deceive with Attention-Based Explanations 注意力机制作为模型的可解释性存疑
Learning to Deceive with Attention-Based Explanations 这是一篇发表于ACL2020的关于注意力机制用作模型可解释性的论文. 原文:[link](h ...
- [雪峰磁针石博客]kotlin书籍汇总
2019独角兽企业重金招聘Python工程师标准>>> 下载地址 Learning Kotlin by Building Android Applications - 2018 初级 ...
- 吴恩达教你如何使用“锤子”?机器学习新书免费领
作者 | 阿司匹林 继今年 2 月 Deep Learning Specialization 最后一课上线之后,吴恩达又捡起了之前荒废已久的项目-- Machine Learning Yearning ...
- 机器学习的敲门砖:手把手教你TensorFlow初级入门
摘要: 在开始使用机器学习算法之前,我们应该首先熟悉如何使用它们. 而本文就是通过对TensorFlow的一些基本特点的介绍,让你了解它是机器学习类库中的一个不错的选择. 本文由北邮@爱可可-爱生活 ...
- python项目如何打开_python如何打开_linux如何打开python_python程序打开 - 云+社区 - 腾讯云...
广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! 当你安装好python时,你可以在python环境下写语句. python环境有 ...
最新文章
- VS2013中, 无法嵌入互操作类型“……”,请改用适用的接口的解决方法
- ios html5 不支持 flv_iExplorer v4.2.6 一款优秀强大的 iOS 资源管理工具
- 《Oracle高性能自动化运维》一一第1章 Linux下的Oracle
- Android 应用开发----ViewPager---2.四大函数
- React Hooks 梳理
- eclipse 闪退原因
- 全网首发:OPPO推送:服务器端的参考代码,JAVA版
- Yii集成PHPWord
- 《架构探险——从零开始写javaweb框架》.pdf
- tomcat乱码的几种解决
- 2D Game Creation - 2D游戏开发基本流程
- 解决客户一例:使用域超级管理员打开Exchange 2010发现没有权限
- 关于自动拼接地图算法
- 12款响应式 Lightbox(灯箱)效果插件
- 现代英语杂志现代英语杂志社现代英语编辑部2022年第6期目录
- 山东理工ACM【1239】水仙花数
- 5800p计算机公式,卡西欧fx-5800p计算器内置公式及公式自定义在隧道测量中的应用...
- 【人工智能】德国人工智能技术发展现状和未来趋势
- 破解Excel的宏密码
- 创建 Swift 自定义集合类