通过Channel创建队列、交换机,发送消息以及消费消息

package com.yzm.rabbitmq_09.config;import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.springframework.stereotype.Component;@Component
public class RabbitConfig {public static final String QUEUE = "queue_a";public static final String FANOUT_EXCHANGE = "fanout.exchange";public static final String FANOUT_QUEUE_A = "fanout_queue_a";public static final String FANOUT_QUEUE_B = "fanout_queue_b";public static final String DIRECT_EXCHANGE = "direct.exchange";public static final String DIRECT_QUEUE_A = "direct_queue_a";public static final String DIRECT_QUEUE_B = "direct_queue_b";public static final String TOPIC_EXCHANGE = "topic.exchange";public static final String TOPIC_QUEUE_A = "topic_queue_a";public static final String TOPIC_QUEUE_B = "topic_queue_b";public static final String TOPIC_QUEUE_C = "topic_queue_c";public static final String HEADER_EXCHANGE = "header.exchange";public static final String HEADER_QUEUE_A = "header_queue_a";public static final String HEADER_QUEUE_B = "header_queue_b";// 获取RabbitMQ服务器连接public static Connection getConnection() {Connection connection = null;try {ConnectionFactory factory = new ConnectionFactory();factory.setHost("127.0.0.1");factory.setPort(5672);factory.setUsername("guest");factory.setPassword("guest");connection = factory.newConnection();} catch (Exception e) {e.printStackTrace();}return connection;}
}
package com.yzm.rabbitmq_09.sender;import com.rabbitmq.client.*;
import com.yzm.rabbitmq_09.config.RabbitConfig;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;@RestController
@RequestMapping("/sender")
public class Sender {@GetMapping("/simple")public void simple() throws IOException, TimeoutException {//1、获取连接Connection connection = RabbitConfig.getConnection();//2、创建通道,使用通道才能完成消息相关的操作Channel channel = connection.createChannel();/** 3、声明队列* String queue 队列名称* boolean durable 是否持久化,如果持久化,mq重启后队列还在* boolean exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建* boolean autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)* Map<String, Object> arguments 参数,可以设置一个队列的扩展参数,比如:可设置存活时间*/channel.queueDeclare(RabbitConfig.QUEUE, true, false, false, null);/** 4、发送消息* exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")* routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称* props,向消费者传递的 消息属性(比如文本持久化)* body,消息内容*/for (int i = 1; i <= 10; i++) {String message = "Hello World!...... " + i;System.out.println(" [ Sent ] 消息内容 " + message);channel.basicPublish("", RabbitConfig.QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}// 5、释放资源channel.close();connection.close();}@GetMapping("/fanout")public void fanout() throws IOException, TimeoutException {Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();/** 声明 fanout 交换机*  exchange 交换机名称*  type 交换机类型*  durable 是否可持久化*  autoDelete 是否自动删除*  internal 是否设置为内置交换机*/channel.exchangeDeclare(RabbitConfig.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true, false, false, null);// 声明队列channel.queueDeclare(RabbitConfig.FANOUT_QUEUE_A, true, false, false, null);channel.queueDeclare(RabbitConfig.FANOUT_QUEUE_B, true, false, false, null);// 队列绑定交换机,不需要路由键,用空字符串表示channel.queueBind(RabbitConfig.FANOUT_QUEUE_A, RabbitConfig.FANOUT_EXCHANGE, "");channel.queueBind(RabbitConfig.FANOUT_QUEUE_B, RabbitConfig.FANOUT_EXCHANGE, "");for (int i = 1; i <= 10; i++) {String message = "Hello World!...... " + i;System.out.println(" [ Sent ] 消息内容 " + message);channel.basicPublish(RabbitConfig.FANOUT_EXCHANGE, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());}channel.close();connection.close();}@GetMapping("/direct")public void direct() throws IOException, TimeoutException {Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();/** 声明 direct 交换机*/channel.exchangeDeclare(RabbitConfig.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true, false, false, null);channel.queueDeclare(RabbitConfig.DIRECT_QUEUE_A, true, false, false, null);channel.queueDeclare(RabbitConfig.DIRECT_QUEUE_B, true, false, false, null);channel.queueBind(RabbitConfig.DIRECT_QUEUE_A, RabbitConfig.DIRECT_EXCHANGE, "direct.admin");channel.queueBind(RabbitConfig.DIRECT_QUEUE_B, RabbitConfig.DIRECT_EXCHANGE, "direct.yzm");for (int i = 1; i <= 10; i++) {String message = "Hello World! " + i;if (i % 2 == 0) {channel.basicPublish(RabbitConfig.DIRECT_EXCHANGE, "direct.admin", null, message.getBytes());} else {channel.basicPublish(RabbitConfig.DIRECT_EXCHANGE, "direct.yzm", null, message.getBytes());}System.out.println(" [ Sent ] 消息内容 " + message);}channel.close();connection.close();}@GetMapping("/topic")public void topic() throws IOException, TimeoutException {Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();/** 声明 topic 交换机*/channel.exchangeDeclare(RabbitConfig.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true, false, false, null);channel.queueDeclare(RabbitConfig.TOPIC_QUEUE_A, true, false, false, null);channel.queueDeclare(RabbitConfig.TOPIC_QUEUE_B, true, false, false, null);channel.queueDeclare(RabbitConfig.TOPIC_QUEUE_C, true, false, false, null);channel.queueBind(RabbitConfig.TOPIC_QUEUE_A, RabbitConfig.TOPIC_EXCHANGE, "topic.admin.*");channel.queueBind(RabbitConfig.TOPIC_QUEUE_B, RabbitConfig.TOPIC_EXCHANGE, "topic.#");channel.queueBind(RabbitConfig.TOPIC_QUEUE_C, RabbitConfig.TOPIC_EXCHANGE, "topic.*.yzm");for (int i = 1; i <= 10; i++) {String message = "Hello World! " + i;if (i % 3 == 0) {channel.basicPublish(RabbitConfig.TOPIC_EXCHANGE, "topic.admin.xxx", null, message.getBytes());} else if (i % 3 == 1) {channel.basicPublish(RabbitConfig.TOPIC_EXCHANGE, "topic.xxx.xxx", null, message.getBytes());} else {channel.basicPublish(RabbitConfig.TOPIC_EXCHANGE, "topic.xxx.yzm", null, message.getBytes());}System.out.println(" [ Sent ] 消息内容 " + message);}channel.close();connection.close();}@GetMapping("/headers")public void headers() throws IOException, TimeoutException {Connection connection = RabbitConfig.getConnection();Channel channel = connection.createChannel();/** 声明 header 交换机*/channel.exchangeDeclare(RabbitConfig.HEADER_EXCHANGE, BuiltinExchangeType.HEADERS, true, false, false, null);channel.queueDeclare(RabbitConfig.HEADER_QUEUE_A, true, false, false, null);channel.queueDeclare(RabbitConfig.HEADER_QUEUE_B, true, false, false, null);Map<String, Object> mapA = new HashMap<>();mapA.put("x-match", "all"); // 完全匹配mapA.put("key1", "value1");mapA.put("name", "yzm");channel.queueBind(RabbitConfig.HEADER_QUEUE_A, RabbitConfig.HEADER_EXCHANGE, "", mapA);Map<String, Object> mapB = new HashMap<>();mapB.put("x-match", "any"); // 匹配任意一个mapB.put("name", "yzm");channel.queueBind(RabbitConfig.HEADER_QUEUE_B, RabbitConfig.HEADER_EXCHANGE, "", mapB);Map<String, Object> headersA = new HashMap<>();headersA.put("key1", "value1");headersA.put("name", "yzm");AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder().headers(headersA).build();System.out.println("生产者:该消息会被1和2打印");channel.basicPublish(RabbitConfig.HEADER_EXCHANGE, "", basicProperties, "该消息会被1和2打印".getBytes());/*        Map<String, Object> headersB = new HashMap<>();headersB.put("name", "yzm");AMQP.BasicProperties basicPropertiesB = new AMQP.BasicProperties.Builder().headers(headersB).build();System.out.println("生产者:该消息只会被2打印");channel.basicPublish(RabbitConfig.HEADER_EXCHANGE, "", basicPropertiesB, "该消息只会被2打印".getBytes());*/channel.close();connection.close();}
}
package com.yzm.rabbitmq_09.receiver;import com.rabbitmq.client.*;
import com.yzm.rabbitmq_09.config.RabbitConfig;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.io.IOException;
import java.nio.charset.StandardCharsets;@RestController
@RequestMapping("/receiver")
public class Receiver {@GetMapping("/simple")public void simple() throws IOException {// 1、获取连接Connection connection = RabbitConfig.getConnection();// 2、创建通道Channel channel = connection.createChannel();/* 3、消费消息* String queue 队列名称* boolean autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复* Consumer consumer,消费方法,当消费者接收到消息要执行的方法*/channel.basicConsume(RabbitConfig.QUEUE, true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(" [ received@simple_1 ] 消息内容 : " + new String(body, StandardCharsets.UTF_8) + "!");}});//取消自动ack
/*        channel.basicConsume(RabbitConfig.QUEUE, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(" [ received@simple_2 ] 消息内容 : " + new String(body, StandardCharsets.UTF_8) + "!");//消息确认channel.basicAck(envelope.getDeliveryTag(), false);}});*/}@GetMapping("/fanout")public void fanout() {Thread t1 = new Thread(() -> {Connection connection = RabbitConfig.getConnection();Channel channel = null;try {channel = connection.createChannel();channel.basicConsume(RabbitConfig.FANOUT_QUEUE_A, true, getConsumer(channel, " [ received@fanout_1 ] : "));} catch (IOException e) {e.printStackTrace();}}, "t1");Thread t2 = new Thread(() -> {Connection connection = RabbitConfig.getConnection();Channel channel = null;try {channel = connection.createChannel();channel.basicConsume(RabbitConfig.FANOUT_QUEUE_B, true, getConsumer(channel, " [ received@fanout_2 ] : "));} catch (IOException e) {e.printStackTrace();}}, "t2");t1.start();t2.start();}@GetMapping("/direct")public void direct() {Thread t3 = new Thread(() -> {Connection connection = RabbitConfig.getConnection();Channel channel = null;try {channel = connection.createChannel();channel.basicConsume(RabbitConfig.DIRECT_QUEUE_A, true, getConsumer(channel, " [ received@direct_1 ] : "));} catch (IOException e) {e.printStackTrace();}}, "t3");Thread t4 = new Thread(() -> {Connection connection = RabbitConfig.getConnection();Channel channel = null;try {channel = connection.createChannel();channel.basicConsume(RabbitConfig.DIRECT_QUEUE_B, true, getConsumer(channel, " [ received@direct_2 ] : "));} catch (IOException e) {e.printStackTrace();}}, "t4");t3.start();t4.start();}@GetMapping("/topic")public void topic() {Thread t5 = new Thread(() -> {Connection connection = RabbitConfig.getConnection();Channel channel = null;try {channel = connection.createChannel();channel.basicConsume(RabbitConfig.TOPIC_QUEUE_A, true, getConsumer(channel, " [ received@topic_1 ] : "));} catch (IOException e) {e.printStackTrace();}}, "t5");Thread t6 = new Thread(() -> {Connection connection = RabbitConfig.getConnection();Channel channel = null;try {channel = connection.createChannel();channel.basicConsume(RabbitConfig.TOPIC_QUEUE_B, true, getConsumer(channel, " [ received@topic_2 ] : "));} catch (IOException e) {e.printStackTrace();}}, "t6");Thread t7 = new Thread(() -> {Connection connection = RabbitConfig.getConnection();Channel channel = null;try {channel = connection.createChannel();channel.basicConsume(RabbitConfig.TOPIC_QUEUE_C, true, getConsumer(channel, " [ received@topic_3 ] : "));} catch (IOException e) {e.printStackTrace();}}, "t7");t5.start();t6.start();t7.start();}@GetMapping("/headers")public void headers() {Thread t8 = new Thread(() -> {Connection connection = RabbitConfig.getConnection();Channel channel = null;try {channel = connection.createChannel();channel.basicConsume(RabbitConfig.HEADER_QUEUE_A, true, getConsumer(channel, " [ received@headers_1 ] : "));} catch (IOException e) {e.printStackTrace();}}, "t8");Thread t9 = new Thread(() -> {Connection connection = RabbitConfig.getConnection();Channel channel = null;try {channel = connection.createChannel();channel.basicConsume(RabbitConfig.HEADER_QUEUE_B, true, getConsumer(channel, " [ received@headers_2 ] : "));} catch (IOException e) {e.printStackTrace();}}, "t9");t8.start();t9.start();}private DefaultConsumer getConsumer(Channel channel, String s) {return new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(s + new String(body, StandardCharsets.UTF_8) + "!");}};}}

http://localhost:8080/sender/simple
http://localhost:8080/receiver/simple
运行结果:

http://localhost:8080/sender/fanout
http://localhost:8080/receiver/fanout
运行结果:

http://localhost:8080/sender/direct
http://localhost:8080/receiver/direct
运行结果:

http://localhost:8080/sender/topic
http://localhost:8080/receiver/topic
运行结果:

http://localhost:8080/sender/headers
http://localhost:8080/receiver/headers
运行结果:

相关链接

首页
上一篇:消息回调
下一篇:事务以及Confirm确认

RabbitMQ之Channel相关推荐

  1. 自定义创建rabbitMQ的channel连接池

    参考地址:https://blog.csdn.net/qq447995687/article/details/80233621 利用commons-pool2自定义对象池 commons-pool2是 ...

  2. golang rabbitMQ 生产者复用channel以及生产者组分发策略

    引用的是rabbitMQ官方示例的库:github.com/rabbitmq/amqp091-go 在网络编程中我们知道tcp连接的创建.交互.销毁等相关操作的"代价"都是很高的, ...

  3. RabbitMQ 入门系列(6)— 如何保证 RabbitMQ 消息不丢失

    1. 消息丢失源头 RabbitMQ 消息丢失的源头主要有以下三个: 生产者丢失消息 RabbitMQ 丢失消息 消费者丢失消息 下面主要从 3 个方面进行说明并提供应对措施 2. 生产者丢失消息 R ...

  4. 7.RabbitMQ RFC同步调用

    RabbitMQ RFC同步调用是使用了两个异步调用完成的,生产者调用消费者的同时,自己也作为消费者等待某一队列的返回消息,消费者接受到生产者的消息同时,也作为消息发送者发送一消息给生产者.参考下图: ...

  5. 面试官:说说RabbitMQ 消费端限流、TTL、死信队列

    欢迎关注方志朋的博客,回复"666"获面试宝典 1. 为什么要对消费端限流 假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户 ...

  6. RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?

    以下文章来源方志朋的博客,回复"666"获面试宝典 场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等 ...

  7. RabbitMQ 中 7 种消息队列

    点击关注公众号,Java干货及时送达 七种模式介绍与应用场景 简单模式(Hello World) 做最简单的事情,一个生产者对应一个消费者,RabbitMQ相当于一个消息代理,负责将A的消息转发给B ...

  8. RabbitMQ 七种队列模式应用场景案例分析(通俗易懂)

    点击关注公众号,Java干货及时送达 作者:我思知我在 blog.csdn.net/qq_32828253/article/details/110450249 七种模式介绍与应用场景 简单模式(Hel ...

  9. RabbitMQ 延迟队列,太实用了!

    点击关注公众号,Java干货及时送达 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付 ...

  10. 消息队列之 RabbitMQ

    消息队列之 RabbitMQ 关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了. 市面上的消息队列产品有很 ...

最新文章

  1. 抓包概念大比较:数据报、数据包、分组
  2. C# 的EF框架怎么连接Oracle数据库
  3. Gartner:大数据投资增长,但计划投资的组织机构却在减少
  4. REM中的几种发料方式
  5. 特征预处理--长尾分布的处理方案
  6. github 提交报403 forbidden的错误解决
  7. Spring测试上下文缓存+ AspectJ @Transactional + Ehcache的痛苦
  8. ajax实现向上正在加载,向上滚动或者向下滚动分页异步加载数据(Ajax + lazyload)
  9. c语言高低位拷贝_C语言指针详解
  10. Delphi:龟兔赛跑游戏(Timer、Button、Editor控件的综合应用)
  11. Python中布尔值是False的所有值
  12. 物流车辆数据在金融科技的应用
  13. 我当测试总监的那几年
  14. 为什么plsql不显示tns配置_电脑为什么不定时的会卡?电脑卡和哪些硬件配置有关系呢?...
  15. Altium Designer使用-----快速覆铜脚本的使用
  16. Junglescout 正版账号共享 亚马逊卖家选品必备软件 junglescout插件同步升级
  17. java rxtx下载_1、下载64位rxtx for java 链接:http://fizzed.com/oss/rxtx-for-java2、下载下来的包解压后按照说明放到JAV...
  18. (已解决)ubuntu下网易云音乐无法打开
  19. 帮表弟的女友买了个5900的dell 1420
  20. python 制作二维码

热门文章

  1. matlab中counter怎么用,matlab中fspecial函数的用法
  2. 新浪短连接(t.cn)在线生成工具
  3. 插上耳机一说话别人听有很大的电流声怎么办?
  4. 《预训练周刊》第10期:基于Swin变换器的自监督学习、基于锐度感知最小化的泛化性提升...
  5. AR VR MR 到底有啥区别?
  6. 通过脚手架安装Ant+react+umi+dva项目(一)
  7. Linux内存管理 (1)物理内存初始化
  8. 边境的悍匪—机器学习实战:第十六章使用RNN和注意力机制进行自然语言处理
  9. watch蜂窝开通服务器中断,原因找到了!Apple Watch Series 3为何无法连接蜂窝网络...
  10. linux系统下网络吞吐量/CPU占用率/流量控制的测试