RabbitMQ入门前篇
本篇博文目录:
- 一.RabbitMQ
- 1.消息队列
- 2.RabbitMQ
- 3.安装RabbitMQ
- 4.RabbitMQ常用命令
- 二.使用RabbitMQ进行编程
- 1.AMQP
- 2.第一次MQ通信
- 三.RabbitMQ六中工作模式
- 1.RabbitMQ
- 2.Work queues
- 3.pub/sub订阅发布模式
- 4.Routing模式
- 5.主题Topic模式
- 四.RabbitMQ消息确认机制
- 五.源代码下载
一.RabbitMQ
1.消息队列
Message Queue中文意思消息队列,是一种进程的通信机制,用于上下游传递消息。其中在二个进程之间MQ起到消息中间件的作用,实现在进程之间的解耦操作,原来是进程与进程之间直接通信,这样的话耦合性大,并且如果通信失败无法确定那一面出现问题,而通过在中间多一个信息传递就可以实现解耦,并且当有一端出现问题时,也能够知道是那一端出现了问题,保证了数据传输的可靠性。
2.RabbitMQ
RabbitMQ是众多消息代理服务器中使用的较广,较多的一款,RabbitMQ支持几乎所有的操作系统与编程语言,并且Rabbit提供高并发,高可用的成熟方案,支持多种消息协议,易于部署与使用。
下图列出了RabbitMQ与其他MQ的对比图(注意时效性,视频大概是2018出的)
RabbitMQ的应用场景如下:
3.安装RabbitMQ
- 安装教程
Winddos环境下安装教程: https://www.cnblogs.com/chenwolong/p/rabbitmq.html
Linux环境下安装教程:https://blog.csdn.net/qq_45173404/article/details/116429302
- Widnos的安装包下载(官网下载太慢)
erlang25.0.1版本:otp_win64_25.0.1.exe
https://www.aliyundrive.com/s/NJwrinH1VNy 提取码: m92c
rabbitmq3.11.7版本: rabbitmq-server-3.11.7.exe
https://www.aliyundrive.com/s/wkfkd7ewzNa 提取码: d43w
- 安装完毕,启动RabbitMQ管理模块的插件后,访问http://localhost:15672/ 输入账号guest,密码guest,进行登入,会进入如下界面,说明安装成功:
备注:如果无法访问,你可以看看这篇博文:https://itguye.blog.csdn.net/article/details/128770009
4.RabbitMQ常用命令
可以通过下面的命令来操作RabbitMQ,当然也可以在网站上通过图形化的方式进行操作。
rabbitmq-server 前台启动服务
rabbitmq-server -detached 后台启动服务
rabbitmqctl stop 停止服务
rabbitmqctl start_app 启动应用
rabbitmqctl stop_app 终止应用
rabbitmqctl add_user {username} {password} – 创建新用户
rabbitmqctl delete_user {username} – 删除用户
rabbitmqctl change_password {username} {newpassword}– 重置密码
rabbitmqctl set_user_tags {username} {tag} – 授予用户角色(Tag)
rabbitmqctl set_permissions -p / user_admin'.*' '.*''.*'– 设置用户允许访问的vhost
上面rabbitmqctl set_user_tags {username} {tag}命令中的tag,如下:
二.使用RabbitMQ进行编程
1.AMQP
AMQP是一个协议规范,二个不同应用程序只要遵循该协议就可以实现通信,其中RabbitMQ就是AMQP的一种实现方式。
AMQP中的一些知识概念,如生产者,消费者,消息,队列和虚拟主机等,解释如下:
2.第一次MQ通信
- 添加一个名为/test的虚拟主机
- 创建一个maven项目,并导入依赖
<dependencies><dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.3.0</version></dependency><dependency><groupId>com.google.code.gson</groupId><artifactId>gson</artifactId><version>2.8.5</version></dependency></dependencies>
备注:最新版本为5.16版本: https://mvnrepository.com/artifact/com.rabbitmq/amqp-client/5.16.0
- 创建utiles工具包,并创建RabbitmqUtils和RabbitConstant类,如下:
RabbitmqUtils类,Rabbit工具类,该工具类就一个方法,就是获取RabbitMq的连接对象Connection :
package utils;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.concurrent.TimeoutException;public class RabbitmqUtils {private static ConnectionFactory connectionFactory = new ConnectionFactory();// 静态代码块进行初始化static{connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/test");}// 获取mq连接对象public static Connection getConnection() {try {Connection connection = connectionFactory.newConnection();return connection;} catch (Exception e) {throw new RuntimeException();}}
}
RabbitConstant类,该类存放RabbitMq的配置常量信息:
package utils;public class RabbitConstant {public static final String QUEUE_HELLOWORLD = "helloworld";public static final String QUEUE_SMS = "sms";public static final String EXCHANGE_WEATHER = "weather";public static final String EXCHANGE_WEATHER_ROUTING = "weather_routing";public static final String QUEUE_BAIDU = "baidu";public static final String QUEUE_SINA = "sina";public static final String EXCHANGE_WEATHER_TOPIC = "weather_topic";
}
- 创建一个helloworld的包,在该文件下创建生产者类和消费者类,来实现生产者发送一个helloworld的字符串,然后消费者接受该字符串的操作。
Producer类:首先通过Rabbitmq的工具类获取连接对象,然后通过连接对象创建通道(虚拟连接),接着通过虚拟连接创建一个名为helloworld的队列,并往队列中传递数据hellowrld:
package helloworld;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** 生产者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接对象Connection connection = RabbitmqUtils.getConnection();// 创建通道,虚拟连接Channel channel = connection.createChannel();// 声明一个队列channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);String message = "hello world";channel.basicPublish("", RabbitConstant.QUEUE_HELLOWORLD, null, message.getBytes(StandardCharsets.UTF_8));System.out.println("发送数据成功");// 关闭虚拟通道channel.close();// 关闭连接connection.close();}
}
ConSummer类:通过工具类获取连接,然后创建通道,通过通道使用helloworld队列,并创建一个消费者对象,传入匿名内部类DefaultConsumer,并传入channel对象,获取helloworld消息队列的数据:
package helloworld;import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;import java.io.IOException;/*** 消费者*/
public class ConSummer {public static void main(String[] args) throws IOException {// 获取mq连接Connection connection = RabbitmqUtils.getConnection();// 创建通道号Channel channel = connection.createChannel();// 获取队列channel.queueDeclare(RabbitConstant.QUEUE_HELLOWORLD, false, false, false, null);// 接收channel.basicConsume(RabbitConstant.QUEUE_HELLOWORLD,false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到的数据:"+new String(body));//签收消息,确认消息//envelope.getDeliveryTag() 获取这个消息的TagId//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
- 运行效果
- 消息的状态
三.RabbitMQ六中工作模式
1.RabbitMQ
RabbitMQ六种工作模式中,2~5使用较多,下面会给出实例代码,对于这几种方式存在一定的一致性,都是在方式1的基础上进行添加,功能更加丰富(上文中的案例代码就是Hello World方式)。
2.Work queues
本实例模拟短信通知服务,就是用户购买订单成功后,通过RabbitMQ发送短信给用户进行订单确认,实例代码,首先在helloworld项目中创建一个名为workqueue的包,并创建SMS,Producer,ConSummerOne,ConSummerTwo,ConSummerThree这五个类,详细代码如下:
- SMS类:实体类
package workqueue;public class SMS {private String name;private String mobile;private String content;public SMS(String name, String mobile, String content) {this.name = name;this.mobile = mobile;this.content = content;}public String getName() {return name;}public void setName(String name) {this.name = name;}public String getMobile() {return mobile;}public void setMobile(String mobile) {this.mobile = mobile;}public String getContent() {return content;}public void setContent(String content) {this.content = content;}
}
- Producer:生产者,用来模拟100个用户同时订票
package workqueue;import com.google.gson.Gson;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;/*** 生产者*/
public class Producer {public static void main(String[] args) throws IOException, TimeoutException {// 获取连接对象Connection connection = RabbitmqUtils.getConnection();// 创建通道,虚拟连接Channel channel = connection.createChannel();// 声明一个队列channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);// 给100给订票的用户发送订票成功的短信信息for (int i = 1; i <= 100; i++) {SMS sms = new SMS("乘客" + i, "13900000" + i, "您的车票已预订成功");// 将sms转换为jSON对象String message = new Gson().toJson(sms);channel.basicPublish("", RabbitConstant.QUEUE_SMS, null, message.getBytes(StandardCharsets.UTF_8));}System.out.println("发送成功");// 关闭虚拟通道channel.close();// 关闭连接connection.close();}
}
- ConSummerOne类:发送短信处理1
package workqueue;import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;import java.io.IOException;/*** 消费者*/
public class ConSummerOne {public static void main(String[] args) throws IOException {// 获取mq连接Connection connection = RabbitmqUtils.getConnection();// 创建通道号Channel channel = connection.createChannel();// 获取队列channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个// 接收channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者1,进行发送:"+new String(body));try {Thread.sleep(100);// 延时0.1s} catch (InterruptedException e) {e.printStackTrace();}//签收消息,确认消息//envelope.getDeliveryTag() 获取这个消息的TagId//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
- ConSummerTwo类:发送短信2
package workqueue;import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;import java.io.IOException;/*** 消费者*/
public class ConSummerTwo {public static void main(String[] args) throws IOException {// 获取mq连接Connection connection = RabbitmqUtils.getConnection();// 创建通道号Channel channel = connection.createChannel();// 获取队列channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个// 接收channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者2,进行发送:"+new String(body));try {Thread.sleep(100);// 延时0.1s} catch (InterruptedException e) {e.printStackTrace();}//签收消息,确认消息//envelope.getDeliveryTag() 获取这个消息的TagId//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
- ConsumerThree类:发送短信3
package workqueue;import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;import java.io.IOException;public class ConsumerThree {public static void main(String[] args) throws IOException {// 获取mq连接Connection connection = RabbitmqUtils.getConnection();// 创建通道号Channel channel = connection.createChannel();// 获取队列channel.queueDeclare(RabbitConstant.QUEUE_SMS, false, false, false, null);channel.basicQos(1);// 为1时表示处理完一个取一个,不加就是一次取多个// 接收channel.basicConsume(RabbitConstant.QUEUE_SMS,false, new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费者3,进行发送:"+new String(body));try {Thread.sleep(1000);// 延时1s} catch (InterruptedException e) {e.printStackTrace();}//签收消息,确认消息//envelope.getDeliveryTag() 获取这个消息的TagId//false只确认签收当前的消息,设置为true的时候则代表签收该消费者所有未签收的消息channel.basicAck(envelope.getDeliveryTag(),false);}});}
}
- 运行效果(先运行Consumerxxx,再运行Producer)
- 和helloworld的区别
helloworld是一个发送一个接收(一个生产者,一个消费者),而workqueneu是一个发送多个接收(一个生产者,多个消费者),并且代码上没有多大区别,区别就是多写几个消费者类和消费者类中多了一个 channel.basicQos(1);,这个不是强制性添加,如果加上了,表示处理完后取一个,如果不加就是一次取多个再进行处理,显然前者更好,更合理使用消费者。
3.pub/sub订阅发布模式
该实例代码是模拟中国气象局提供气象数据给所有订阅的消费者,如百度,新浪等消费者,实现就是将中国气象局的数据接入到交换机中,然后百度,新浪等平台绑定交换机就可以获取中国气象局的数据。
- 首先,在http://localhost:15672/ 管理Web网页中,添加一个交换机,如下:
- 在helloworld项目中创建一个名为pubsub的包,并创建WeatherProducer,SinaSummer,BaiduSummer三个类,详细代码如下:
WeatherProducer类:生成者,中国气象局发布天气数据
package pubsub;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import utils.RabbitConstant;
import utils.RabbitmqUtils;import java.io.IOException;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;public class WeatherProducer {public static void main(String[] args) throws IOException, TimeoutException {Connection connection = RabbitmqUtils.getConnection();String input = new Scanner(System.in).next();Channel channel = connection.createChannel();channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER,"" , null , input.getBytes());channel.close();connection.close();}
}
BaiduSummer类:百度消费者
package pubsub;import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;import java.io.IOException;public class BaiduSummer {public static void main(String[] args) throws IOException {Connection connection = RabbitmqUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
SinaSummer类:新浪消费者
package pubsub;import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;import java.io.IOException;public class SinaSummer {public static void main(String[] args) throws IOException {Connection connection = RabbitmqUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名 参数三:路由key(暂时用不到)channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER, "");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
- 运行效果(先运行消费者,在运行生产者)
- 和workqueueu的区别
首先二者都是一个生成者多个消费者,不同的是后者需要在WEB网页上创建交换机,并且代码上订阅与发布多了一个queueBind绑定操作和生产者中的basicPublish()使用的是exchange,通过上面的配置就可以通过消费者根据绑定的交互机找到对应生成者交换机上的数据,也就是实现了生产者发布主题,消费者订阅主题,并收到订阅主题的数据。
4.Routing模式
Routing模式在订阅与发布的基础上进行扩展,增加了条件就是交换机会根据Routing Key的条件进行数据刷选再发给消费者队列。
- 创建交换机weather_routing
- 和上一个案例代码差不多,创建包和类:
- 详细代码
WeatherBureau类:生产者
package com.itlaoqi.rabbitmq.routing;import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.TimeoutException;public class WeatherBureau {public static void main(String[] args) throws IOException, TimeoutException {Map area = new LinkedHashMap<String, String>();area.put("china.hebei.shijiazhuang.20991011", "中国河北石家庄20991011天气数据");area.put("china.shandong.qingdao.20991011", "中国山东青岛20991011天气数据");area.put("china.henan.zhengzhou.20991011", "中国河南郑州20991011天气数据");area.put("us.cal.la.20991011", "美国加州洛杉矶20991011天气数据");area.put("china.hebei.shijiazhuang.20991012", "中国河北石家庄20991012天气数据");area.put("china.shandong.qingdao.20991012", "中国山东青岛20991012天气数据");area.put("china.henan.zhengzhou.20991012", "中国河南郑州20991012天气数据");area.put("us.cal.la.20991012", "美国加州洛杉矶20991012天气数据");Connection connection = RabbitUtils.getConnection();Channel channel = connection.createChannel();Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = itr.next();//Routing key 第二个参数相当于数据筛选的条件channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING,me.getKey() , null , me.getValue().getBytes());}channel.close();connection.close();}
}
Sina类:新浪消费者
package com.itlaoqi.rabbitmq.routing;import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Sina {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_SINA, false, false, false, null);channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20991011");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20991011");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.la.20991012");channel.queueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.henan.zhengzhou.20991012");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_SINA , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("新浪收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
Baidu类:百度消费者
package com.itlaoqi.rabbitmq.routing;import com.itlaoqi.rabbitmq.utils.RabbitConstant;
import com.itlaoqi.rabbitmq.utils.RabbitUtils;
import com.rabbitmq.client.*;import java.io.IOException;public class Baidu {public static void main(String[] args) throws IOException {Connection connection = RabbitUtils.getConnection();final Channel channel = connection.createChannel();channel.queueDeclare(RabbitConstant.QUEUE_BAIDU, false, false, false, null);//queueBind用于将队列与交换机绑定//参数1:队列名 参数2:交互机名 参数三:路由keychannel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.20991011");channel.queueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.shandong.qingdao.20991012");channel.basicQos(1);channel.basicConsume(RabbitConstant.QUEUE_BAIDU , false , new DefaultConsumer(channel){@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("百度收到气象信息:" + new String(body));channel.basicAck(envelope.getDeliveryTag() , false);}});}
}
- 测试(先运行消费者,在运行生产者)
- 和订阅与发布的区别
首先是交换机的类型变为了direct,并且Routing中生成者的basicPublish()多了一个routingKey参数,在消费者中会根据queueBind的routingKey和生产者中的routingKey做比较,如果一致就接收,不一致不接收。
5.主题Topic模式
主题Topic模式是在Routing模式下再一次进行扩展,和Routing不同的是,后者支持模糊查询,通过通配符的方式,*表示任意一个字符,#表示任意多个字符。
- 在WEB管理上创建Topic的交换机
- 项目和Routing差不做,直接说不同点
消费者代码中的queueBind()里的routingKey采用通配符
- 运行效果(先执行消费者,在执行生产者)
四.RabbitMQ消息确认机制
通过RabbitMQ消息确认机制可以知道生产者和代理人(Broker)之间发送数据的情况,有二种情况,情况一Confirm表示消息送到Broker了,如果为ack表示Broker接收,为nack表示没有接收。情况二Return表示消息被Broker正常接收(ack)后,但Broker没有对应的队列进行投递时产生的状态,消息被退回给生产者。
- 实例代码,那上面的例子为例,由于消息确认机制是生产者和Broker之间的事情,所以不用管消费者,所以只需要在生产者中添加代码,详细代码如下。
package confirm;import com.rabbitmq.client.*;
import utils.RabbitConstant;
import utils.RabbitmqUtils;import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;public class WeatherBureau {public static void main(String[] args) throws IOException, TimeoutException {Map area = new LinkedHashMap<String, String>();area.put("china.hebei.shijiazhuang.20991011", "中国河北石家庄20991011天气数据");area.put("china.shandong.qingdao.20991011", "中国山东青岛20991011天气数据");area.put("china.henan.zhengzhou.20991011", "中国河南郑州20991011天气数据");area.put("us.cal.la.20991011", "美国加州洛杉矶20991011天气数据");area.put("china.hebei.shijiazhuang.20991012", "中国河北石家庄20991012天气数据");area.put("china.shandong.qingdao.20991012", "中国山东青岛20991012天气数据");area.put("china.henan.zhengzhou.20991012", "中国河南郑州20991012天气数据");area.put("us.cal.la.20991012", "美国加州洛杉矶20991012天气数据");Connection connection = RabbitmqUtils.getConnection();Channel channel = connection.createChannel();//开启confirm监听模式channel.confirmSelect();// 进行监听channel.addConfirmListener(new ConfirmListener() {public void handleAck(long l, boolean b) throws IOException {//第二个参数代表接收的数据是否为批量接收,一般我们用不到。System.out.println("消息已被Broker接收,Tag:" + l);}public void handleNack(long l, boolean b) throws IOException {System.out.println("消息已被Broker拒收,Tag:" + l);}});channel.addReturnListener(new ReturnCallback() {public void handle(Return r) {System.err.println("===========================");System.err.println("Return编码:" + r.getReplyCode() + "-Return描述:" + r.getReplyText());System.err.println("交换机:" + r.getExchange() + "-路由key:" + r.getRoutingKey() );System.err.println("Return主题:" + new String(r.getBody()));System.err.println("===========================");}});Iterator<Map.Entry<String, String>> itr = area.entrySet().iterator();while (itr.hasNext()) {Map.Entry<String, String> me = itr.next();//Routing key 第二个参数相当于数据筛选的条件//第三个参数为:mandatory true代表如果消息无法正常投递则return回生产者,如果false,则直接将消息放弃。channel.basicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC,me.getKey() ,true, null , me.getValue().getBytes());}/*channel.close();connection.close();*/}
}
- 不同处
- 运行效果
五.源代码下载
在我的微信公众号后台回复 rabbitmq
就可以获取本篇博文相关的源代码了,如果有什么疑问后台给为留言,我看见会第一时间回复你的。
RabbitMQ入门前篇相关推荐
- COBOL 学习笔记 之 入門篇(续集)
书接上一回(COBOL 学习笔记 之 入門篇 ) 从程序可以看到,COBOL程序分为四部分: IDENTIFICATION DIVISION. ENVIRONMENT DIVISION. D ...
- 【外行也能看懂的RabbitMQ系列(一)】—— RabbitMQ快速入门篇(内含丰富实例)
系列文章目录 准备篇 RabbitMQ安装文档 第一章 RabbitMQ快速入门篇 第二章 RabbitMQ的Web管理界面详解 第三章 RabbitMQ进阶篇之死信队列 第四章 RabbitMQ进阶 ...
- 谈谈FrozenUI前端框架(应用心得) - 入坑篇
FrozenUI框架,是一个相对比较纯粹的css类框架,只适合小项目的样式开发:官方并没有太多,甚至可以说没有提供任何js相关的功能性包,所以个人不推荐开发者使用. 官方宣称: FrozenUI 是一 ...
- 【微服务】RabbitMQ部署高级篇
RabbitMQ部署高级篇 1.单机部署 1.1.下载镜像 1.2.安装MQ 2.安装DelayExchange插件 2.1.下载插件 2.2.上传插件 2.3.安装插件 3.集群部署 2.1.集群分 ...
- 入門篇-耦合Coupling AC/DC/GND差別在哪
摘自:https://www.strongpilab.com/?p=156 [示波器操作]入門篇-耦合Coupling AC/DC/GND差別在哪 2016-06-26 儀器 Instrument, ...
- RabbitMQ——实用总结篇
RabbitMQ--实用总结篇 一.初识MQ 1.1.同步和异步通讯 1.1.1.同步通讯 1.1.2.异步通讯 1.2.技术对比: 二.快速入门 2.1.安装RabbitMQ 2.2.RabbitM ...
- 计算机组装论文关于显示器,显示器参数扫盲—小白入坑篇
显示器参数扫盲-小白入坑篇 2019-05-01 14:11:35 24点赞 103收藏 20评论 事情经历 上周六(2019年4月27日)在bilibili刷到一个组装台式机的视频,总的算下来花费小 ...
- 【Electron】酷家乐客户端开发实践分享 — 入坑篇
作者:钟离,酷家乐PC客户端负责人 原文地址:webfe.kujiale.com/electron-ku- 酷家乐客户端:下载地址 www.kujiale.com/activity/13- 文章背景: ...
- RabbitMQ 集群篇
RabbitMQ 集群篇 00.集群架构原理 前面我们有介绍到 RabbitMQ 内部有各种基础构件,包括队列.交换器.绑定.虚拟主机等,他们组成了 AMQP 协议消息通信的基础,而这些构件以元数据的 ...
最新文章
- oracle实时监控触发邮件,利用EasySQLMAIL实现数据库订单监控和邮件发送
- Oracle 函数大全(字符串函数,数学函数,日期函数,逻辑运算函数,其他函数)
- Java高级篇——深入浅出Java类加载机制
- 从MySQL导入导出大量数据的程序实现方法
- svn图形化控制(svnmanager)
- android反调试之父子调试
- 点评复兴号超载无法运行的事件
- 主数据——共享数据的核心,数据资产的灵魂
- VScode 配置 Java 环境
- linux gnu编译器下载,GNU Compiler Collection(gcc编译器)下载_GNU Compiler Collection(gcc编译器)官方下载-太平洋下载中心...
- 华为中国生态伙伴大会2019盛大开幕:智能进化 共赢生态未来
- Longhorn 云原生容器分布式存储 - Air Gap 安装
- win7或者win10碰到需要administrator权限才能删除的解决办法
- iwnpi 5621ds RF测试指令
- 第三代电力电子半导体:SiC MOSFET学习笔记(五)驱动电源调研
- 涛涛打保龄球 【map】篝火晚会(两道题)
- C# MessageBox弹窗
- 微信网站域名被红(被封锁、被屏蔽、被和谐)的解决方法
- 《计算机组成原理》第九章:控制单元【知识点总结】
- 运用RapidMiner进行聚类分析
热门文章
- LNK2038: “_ITERATOR_DEBUG_LEVEL”的不匹配项
- 身份证人脸认证接口是如何识别网络诈骗?
- matlab logpolar,GitHub - luxinjin/polar-code: matlab simulation for polar code
- 一本通1375:骑马修栅栏(fence)
- C#调用cplex出现引发类型为“ILOG.Concert.Exception”的异常
- 世界上第一代电子计算机取名为,计算机应用基础知识计算机应用基础试题及答案...
- 博途TIA Portal V15 下载与安装教程
- 2021年危险化学品经营单位安全管理人员考试内容及危险化学品经营单位安全管理人员考试资料
- SpringCloud(3)CloudAlibaba Nacos Sentinel Seata
- 某Y易盾滑块acToken、data逆向分析