第一种模式=直连

P:生产者,也就是要发送消息的程序

C:消费者:消息的接受者,会一直等待消息到来。

queue:消息队列,图中红色部分。类似一个邮箱,可以缓存消息;生产者向其中投递消息,消费者从其中取出消息。

producer:

package com.quan.rabbitmq.producer;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.util.concurrent.TimeoutException;/**

* 直连模型:

* 生产者发送消息

* 消费者,等待消息到来消费

* 消息队列:可以缓存消息,生产者向其中投递消息,消费者从其中取出消息。

**/public class RMQProducer {

public staticvoidmain(String[] args) throws IOException, TimeoutException {

ConnectionFactory connectionFactory= newConnectionFactory();

connectionFactory.setHost("localhost");

connectionFactory.setPort(5672);

connectionFactory.setUsername("quan");

connectionFactory.setPassword("admin");

connectionFactory.setVirtualHost("quan");

Connection connection=connectionFactory.newConnection();

Channel channel=connection.createChannel();

channel.queueDeclare("hello",true,false,false,null);

channel.basicPublish("","hello",null,"hello rabbit".getBytes());

channel.close();

connection.close();

}

}

consumer:

package com.quan.rabbitmq.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class RMQConsumer {

public staticvoidmain(String[] args) throws IOException, TimeoutException {

ConnectionFactory connectionFactory= newConnectionFactory();

connectionFactory.setHost("localhost");

connectionFactory.setPort(5672);

connectionFactory.setUsername("quan");

connectionFactory.setPassword("admin");

connectionFactory.setVirtualHost("quan");//创建连接

Connection connection =connectionFactory.newConnection();//通过连接创建通道

Channel channel =connection.createChannel();/**

* 参数1:声明通道对应的队列

* 参数2:指定是否持久化

* 参数3:指定是否独占对象

* 参数4:指定是否自动删除队列

* 参数5:对队列的额外设置*/channel.queueDeclare("hello",true,false,false,null);

channel.basicConsume("hello",true,newDefaultConsumer(channel){

@Override

publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println(newString(body));

}

});

}

}

第二种模式=任务模型(work quene)

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

P:生产者:任务的发布者

C1:消费者-1,领取任务并且完成任务,假设完成速度较慢

C2:消费者-2:领取任务并完成任务,假设完成速度快

P:

/**

* 直连模型:

* 生产者发送消息

* 消费者,等待消息到来消费

* 消息队列:可以缓存消息,生产者向其中投递消息,消费者从其中取出消息。

**/public class RMQProducer2 {

public staticvoidmain(String[] args) throws IOException, TimeoutException {

ConnectionFactory connectionFactory= newConnectionFactory();

connectionFactory.setHost("localhost");

connectionFactory.setPort(5672);

connectionFactory.setUsername("quan");

connectionFactory.setPassword("admin");

connectionFactory.setVirtualHost("quan");

Connection connection=connectionFactory.newConnection();

Channel channel=connection.createChannel();

channel.queueDeclare("hello",true,false,false,null);for(int i =0 ;i<20;i++){

channel.basicPublish("","hello",null,(i+"=====>hello rabbit").getBytes());

}

channel.close();

connection.close();

}

}

C1

public class RMQConsumer21 {

public staticvoidmain(String[] args) throws IOException, TimeoutException {

ConnectionFactory connectionFactory= newConnectionFactory();

connectionFactory.setHost("localhost");

connectionFactory.setPort(5672);

connectionFactory.setUsername("quan");

connectionFactory.setPassword("admin");

connectionFactory.setVirtualHost("quan");//创建连接

Connection connection =connectionFactory.newConnection();//通过连接创建通道

Channel channel =connection.createChannel();/**

* 参数1:声明通道对应的队列

* 参数2:指定是否持久化

* 参数3:指定是否独占对象

* 参数4:指定是否自动删除队列

* 参数5:对队列的额外设置*/channel.queueDeclare("hello",true,false,false,null);

channel.basicConsume("hello",true,newDefaultConsumer(channel){

@Override

publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("我是consumer1"+newString(body));

}

});

}

}

C2

public class RMQConsumer22 {

public staticvoidmain(String[] args) throws IOException, TimeoutException {

ConnectionFactory connectionFactory= newConnectionFactory();

connectionFactory.setHost("localhost");

connectionFactory.setPort(5672);

connectionFactory.setUsername("quan");

connectionFactory.setPassword("admin");

connectionFactory.setVirtualHost("quan");//创建连接

Connection connection =connectionFactory.newConnection();//通过连接创建通道

Channel channel =connection.createChannel();/**

* 参数1:声明通道对应的队列

* 参数2:指定是否持久化

* 参数3:指定是否独占对象

* 参数4:指定是否自动删除队列

* 参数5:对队列的额外设置*/channel.queueDeclare("hello",true,false,false,null);

channel.basicConsume("hello",true,newDefaultConsumer(channel){

@Override

publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {try{

Thread.sleep(1000);

}catch(InterruptedException e) {

e.printStackTrace();

}

System.out.println("我是consumer2"+newString(body));

}

});

}

}

re

当两个消费者都在监听通道中的消息的时候:

我们一旦发消息:

我是consumer2 0=====>hello rabbit

我是consumer22=====>hello rabbit

我是consumer24=====>hello rabbit

我是consumer26=====>hello rabbit

我是consumer28=====>hello rabbit

我是consumer210=====>hello rabbit

我是consumer212=====>hello rabbit

我是consumer214=====>hello rabbit

我是consumer216=====>hello rabbit

我是consumer218=====>hello rabbit

@@@@@@@@@@@@@@我是consumer11=====>hello rabbit

我是consumer13=====>hello rabbit

我是consumer15=====>hello rabbit

我是consumer17=====>hello rabbit

我是consumer19=====>hello rabbit

我是consumer111=====>hello rabbit

我是consumer113=====>hello rabbit

我是consumer115=====>hello rabbit

我是consumer117=====>hello rabbit

我是consumer119=====>hello rabbit

默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。

消息确认机制:

public class RMQConsumer21 {

public staticvoidmain(String[] args) throws IOException, TimeoutException {

ConnectionFactory connectionFactory= newConnectionFactory();

connectionFactory.setHost("localhost");

connectionFactory.setPort(5672);

connectionFactory.setUsername("quan");

connectionFactory.setPassword("admin");

connectionFactory.setVirtualHost("quan");//创建连接

Connection connection =connectionFactory.newConnection();//通过连接创建通道

final Channel channel =connection.createChannel();

channel.queueDeclare("hello",true,false,false,null);

channel.basicQos(1);//一次只接受一条为确认的消息

//第二个参数:关闭自动确认消息

channel.basicConsume("hello",false,newDefaultConsumer(channel){

@Override

publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("我是consumer1 "+newString(body));

channel.basicAck(envelope.getDeliveryTag(),false);//手动确认消息

}

});

}

}

re:

我是consumer1 0=====>hello rabbit

@@@@@@@@@@@

我是consumer21=====>hello rabbit

我是consumer22=====>hello rabbit

我是consumer23=====>hello rabbit

我是consumer24=====>hello rabbit

我是consumer25=====>hello rabbit

我是consumer26=====>hello rabbit

我是consumer27=====>hello rabbit

我是consumer28=====>hello rabbit

我是consumer29=====>hello rabbit

我是consumer210=====>hello rabbit

我是consumer211=====>hello rabbit

我是consumer212=====>hello rabbit

我是consumer213=====>hello rabbit

我是consumer214=====>hello rabbit

我是consumer215=====>hello rabbit

我是consumer216=====>hello rabbit

我是consumer217=====>hello rabbit

我是consumer218=====>hello rabbit

我是consumer219=====>hello rabbit

第三种模式-广播

fanout 扇出===广播

消息发送流程:

-可以有多个消费者- 每个消费者有自己的queue(队列)- 每个队列都要绑定到Exchange(交换机)- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。-交换机把消息发送给绑定过的所有队列- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

p;更新部分:

Connection connection =connectionFactory.newConnection();

Channel channel=connection.createChannel();

channel.exchangeDeclare("logs","fanout");for(int i =0 ;i<20;i++){//第一个参数:交换机名字

//第二个参数:队列名字

//第三个参数:

//第四个参数:消息,是byte类型

channel.basicPublish("logs","",null,(i+"=====>hello rabbit").getBytes());

}

c1 c2 c3:这三个都是差不多的配置:

//绑定交换机

channel.exchangeDeclare("logs","fanout");//创建临时队列

String queue =channel.queueDeclare().getQueue();//将临时队列绑定交换机exchange

channel.queueBind(queue,"logs","");//处理消息

channel.basicConsume(queue,true,newDefaultConsumer(channel){

@Override

publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("我是consumer1 "+newString(body));

}

});

re:

所有的消息,每个消费者都可以消费得到,

第四种模式-Routing-订阅模式中的-直连(direct)

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。

这时就要用到Direct类型的Exchange。

流程:

-队列与交换机的绑定,不能是任意绑定了,而是要指定一个`RoutingKey`(路由key)-消息的发送方在 向 Exchange发送消息时,也必须指定消息的 `RoutingKey`。- Exchange不再把消息交给每一个绑定的队列,而是根据消息的`Routing Key`进行判断,只有队列的`Routingkey`与消息的 `Routing key`完全一致,才会接收到消息

-P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。-X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列-C1:消费者,其所在队列指定了需要routing key 为 error 的消息- C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

情景:

c1 :error,info,debug

c2:error

c3:info

p:会每种key发一条消息:

p:改变rkey的值:

//声明交换机:参数1:交换机名称

//参数2:交换机类型:

channel.exchangeDeclare("logs1","direct");

String rkey= "debug";//第一个参数:交换机名字

//第二个参数:队列名字/路由key

//第三个参数:

//第四个参数:消息,是byte类型

channel.basicPublish("logs1",rkey,null,(rkey+"消息=====>hello rabbit").getBytes());

channel.close();

connection.close();

}

c1:

//绑定交换机/参数2 交换机类型

channel.exchangeDeclare("logs1","direct");//创建临时队列

String queue =channel.queueDeclare().getQueue();//将临时队列绑定交换机exchange

//第一个参数:队列,第2个参数:交换机名字,第3个参数:路由key

channel.queueBind(queue,"logs1","error");

channel.queueBind(queue,"logs1","info");

channel.queueBind(queue,"logs1","debug");//处理消息

channel.basicConsume(queue,true,newDefaultConsumer(channel){

@Override

publicvoid handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("我是consumer1 "+newString(body));

}

});

}

c2:

//绑定交换机

channel.exchangeDeclare("logs1","direct");//创建临时队列

String queue =channel.queueDeclare().getQueue();//将临时队列绑定交换机exchange

//第一个参数:队列,第2个参数:交换机名字,第3个参数:路由key

channel.queueBind(queue,"logs1","error");//处理消息

C3:

//创建连接

Connection connection =connectionFactory.newConnection();//通过连接创建通道

Channel channel =connection.createChannel();//绑定交换机

channel.exchangeDeclare("logs1","direct");//创建临时队列

String queue =channel.queueDeclare().getQueue();//将临时队列绑定交换机exchange

//第一个参数:队列,第2个参数:交换机名字,第3个参数:路由key

channel.queueBind(queue,"logs1","info");//处理消息

p每个rkey发送一次消息后的结果:

我是consumer1 error消息=====>hello rabbit

我是consumer1 info消息=====>hello rabbit

我是consumer1 debug消息=====>hello rabbit

我是consumer2 error消息=====>hello rabbit

我是consumer3 info消息=====>hello rabbit

可以知道,这种类型的交换机是可以按需发送的。

Routing的订阅模式--topic:

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列

在绑定Routing key的时候使用通配符!

这种模型Routingkey一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

p:

//声明交换机:参数1:交换机名称

//参数2:交换机类型:

channel.exchangeDeclare("topic1","topic");

String rkey= "user.debug.all";//第一个参数:交换机名字

//第二个参数:队列名字/路由key

//第三个参数:

//第四个参数:消息,是byte类型

channel.basicPublish("topic1",rkey,null,(rkey+"消息=====>hello rabbit").getBytes());

c2:

//绑定交换机

channel.exchangeDeclare("topic1","topic");//创建临时队列

String queue =channel.queueDeclare().getQueue();//将临时队列绑定交换机exchange

//第一个参数:队列,第2个参数:交换机名字,第3个参数:路由key

channel.queueBind(queue,"topic1","user.#");

user.#可以匹配多个后面的单词:

c1:

//绑定交换机/参数2 交换机类型

channel.exchangeDeclare("topic1","topic");//创建临时队列

String queue =channel.queueDeclare().getQueue();//将临时队列绑定交换机exchange

//第一个参数:队列,第2个参数:交换机名字,第3个参数:路由key

channel.queueBind(queue,"topic1","user.*");

user.*只能接一个单词:

p发送了一次key为user.debug 和一次user.debug.all:

我是consumer1 user.debug消息=====>hello rabbit

我是consumer2 user.debug消息=====>hello rabbit

我是consumer2 user.debug.all消息=====>hello rabbit

rabbitlearning下载_RabbitMQ-learning相关推荐

  1. rabbitlearning下载_RabbitMQ Learning

    消息队列(Message Queue) 消息队列(MQ)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递.消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只 ...

  2. Deep Learning — LeCun, Yann, Yoshua Bengio and Geoffrey Hinton

    原文链接Deep Learning 由于作者太菜,本文70%为机翻.见谅见谅 第一篇是三巨头LeCun, Yann, Yoshua Bengio和Geoffrey Hinton做的有关Deep Lea ...

  3. 图像超分辨率之SRCNN(Learning a Deep Convolutional Network for Image Super-Resolution)

    论文下载:Learning a Deep Convolutional Network for Image Super-Resolution 代码下载:https://github.com/tegg89 ...

  4. 【论文简述】Learning Inverse Depth Regression for Pixelwise Visibility-AwareMulti-View Stereo (IJCV 2022)

    一.论文简述 1. 第一作者:Qingshan Xu 2. 发表年份:2022 3. 发表期刊:IJCV 4. 关键词:MVS.3D重建.可见性信息.抗噪声训练.逆深度回归.平均组相关 5. 探索动机 ...

  5. 【论文解读】Learning to Deceive with Attention-Based Explanations  注意力机制作为模型的可解释性存疑

    Learning to Deceive with Attention-Based Explanations 这是一篇发表于ACL2020的关于注意力机制用作模型可解释性的论文. 原文:[link](h ...

  6. [雪峰磁针石博客]kotlin书籍汇总

    2019独角兽企业重金招聘Python工程师标准>>> 下载地址 Learning Kotlin by Building Android Applications - 2018 初级 ...

  7. 吴恩达教你如何使用“锤子”?机器学习新书免费领

    作者 | 阿司匹林 继今年 2 月 Deep Learning Specialization 最后一课上线之后,吴恩达又捡起了之前荒废已久的项目-- Machine Learning Yearning ...

  8. 机器学习的敲门砖:手把手教你TensorFlow初级入门

    摘要: 在开始使用机器学习算法之前,我们应该首先熟悉如何使用它们. 而本文就是通过对TensorFlow的一些基本特点的介绍,让你了解它是机器学习类库中的一个不错的选择. 本文由北邮@爱可可-爱生活  ...

  9. python项目如何打开_python如何打开_linux如何打开python_python程序打开 - 云+社区 - 腾讯云...

    广告关闭 腾讯云11.11云上盛惠 ,精选热门产品助力上云,云服务器首年88元起,买的越多返的越多,最高返5000元! 当你安装好python时,你可以在python环境下写语句. python环境有 ...

最新文章

  1. VS2013中, 无法嵌入互操作类型“……”,请改用适用的接口的解决方法
  2. ios html5 不支持 flv_iExplorer v4.2.6 一款优秀强大的 iOS 资源管理工具
  3. 《Oracle高性能自动化运维》一一第1章 Linux下的Oracle
  4. Android 应用开发----ViewPager---2.四大函数
  5. React Hooks 梳理
  6. eclipse 闪退原因
  7. 全网首发:OPPO推送:服务器端的参考代码,JAVA版
  8. Yii集成PHPWord
  9. 《架构探险——从零开始写javaweb框架》.pdf
  10. tomcat乱码的几种解决
  11. 2D Game Creation - 2D游戏开发基本流程
  12. 解决客户一例:使用域超级管理员打开Exchange 2010发现没有权限
  13. 关于自动拼接地图算法
  14. 12款响应式 Lightbox(灯箱)效果插件
  15. 现代英语杂志现代英语杂志社现代英语编辑部2022年第6期目录
  16. 山东理工ACM【1239】水仙花数
  17. 5800p计算机公式,卡西欧fx-5800p计算器内置公式及公式自定义在隧道测量中的应用...
  18. 【人工智能】德国人工智能技术发展现状和未来趋势
  19. 破解Excel的宏密码
  20. 创建 Swift 自定义集合类

热门文章

  1. 佰家当:盘活万亿民间资产,国内首家资产流通平台横空出世
  2. javaweb在线投票系统
  3. 超级代码助手(管理记录代码的工具)
  4. opengl 花哨小例子
  5. 什么是跨域脚本攻击?
  6. 什么是脚本,脚本与代码的区别
  7. 安卓使用Glide加载图片使宽度充满手机屏幕,高度随宽度等比缩放
  8. Matlab之代数方程求解:方程组求根
  9. 【数据结构与算法】递归树
  10. 第3章 内核编程语言与环境(1)