RabbitMQ(4) TopicExchange
topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列
生产者工程
package com.example.demo.rabbitMq.exchange.topic;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class TopicRabbitConfig {public static final String TOPIC_MESSAGE = "topic.message";public static final String TOPIC_MESSAGE_S = "topic.messages";public static final String USER_MESSAGE = "user.message";/*** 武器库*/public static final String ARM_QUEUE = "arm.queue";@Beanpublic Queue queueTopicMessage() {return new Queue(TopicRabbitConfig.TOPIC_MESSAGE);}@Beanpublic Queue queueTopicMessages() {return new Queue(TopicRabbitConfig.TOPIC_MESSAGE_S);}@Beanpublic Queue queueUserMessage() {return new Queue(TopicRabbitConfig.USER_MESSAGE);}@Beanpublic Queue queueArm() {return new Queue(TopicRabbitConfig.ARM_QUEUE);}@BeanTopicExchange exchange() {return new TopicExchange("topicExchange");}@BeanBinding bindingExchangeMessage(Queue queueTopicMessage, TopicExchange exchange) {//所有匹配routingKey=topic.message的消息,将放入Queue[name="topic.message"]return BindingBuilder.bind(queueTopicMessage).to(exchange).with("topic.message");}@BeanBinding bindingExchangeMessages(Queue queueTopicMessages, TopicExchange exchange) {//所有匹配routingKey=topic.# 的消息,将放入Queue[name="topic.messages"]return BindingBuilder.bind(queueTopicMessages).to(exchange).with("topic.#");}@BeanBinding bindingExchangeUserMessage(Queue queueUserMessage, TopicExchange exchange) {///所有匹配routingKey=user.# 的消息,将放入Queue[name="user.messages"]return BindingBuilder.bind(queueUserMessage).to(exchange).with("user.#");}@BeanBinding bindingExchangeArm(Queue queueArm, TopicExchange exchange) {return BindingBuilder.bind(queueArm).to(exchange).with("arm.#");} }
发送消息
package com.example.demo.rabbitMq.exchange.topic;import com.example.demo.dto.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;@Component public class TopicSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send1() {User user = new User();user.setUserName("Sender1.....");user.setMobile("1111111111");rabbitTemplate.convertAndSend("topicExchange","topic.message",user);}public void send2() {User user = new User();user.setUserName("Sender2.....");user.setMobile("2222222");rabbitTemplate.convertAndSend("topicExchange","topic.messages",user);}public void send3() {User user = new User();user.setUserName("Sender3.....");user.setMobile("33333");rabbitTemplate.convertAndSend("topicExchange","user.message",user);} }
消费者工程
package com.example.demo.rabbitMq.exchange.topic;import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;@Configuration public class TopicRabbitConstant {public static final String TOPIC_MESSAGE = "topic.message";public static final String TOPIC_MESSAGE_S = "topic.messages";public static final String USER_MESSAGE = "user.message"; }
package com.example.demo.rabbitMq.exchange.topic;import com.example.demo.dto.User; import com.example.demo.utils.Base64Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.io.IOException;@Component @RabbitListener(queues = TopicRabbitConstant.TOPIC_MESSAGE) public class TopicReceiver1 {private Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate AmqpTemplate rabbitTemplate;@RabbitHandlerpublic void process(User user) {System.out.println("Receiver1 : " + user);}public void rev1(){//手动去获取消息logger.info("获取Queue[topic.message]消息>>>");Message mesg = rabbitTemplate.receive("topic.message");System.out.println(mesg);if(null != mesg){byte[] body = mesg.getBody();try {User u = (User) Base64Utils.byteToObj(body);//获取字符串数据 System.out.println(u);} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();}}} }
测试:
启动消费者工程,生产者,执行如下方法
@Testpublic void send1() throws Exception {//会匹配到topic.#和topic.message 两个Receiver都可以收到消息for (int i = 0, size = 10; i < size; i++) {topicSender.send1();}}
也可以不用监听的方式,手动自主获取队列消息,如消费工程:
例如生产者工程TopicRabbitConfig.java添加武器队列:
/*** 武器库*/public static final String ARM_QUEUE = "arm.queue";@Beanpublic Queue queueArm() {return new Queue(TopicRabbitConfig.ARM_QUEUE);}@BeanBinding bindingExchangeArm(Queue queueArm, TopicExchange exchange) {return BindingBuilder.bind(queueArm).to(exchange).with("arm.#");}
生产武器:
public void send4() {//生产一批武器List<String> list = new ArrayList<String>();list.add("手枪");list.add("步枪");list.add("机枪");rabbitTemplate.convertAndSend("topicExchange","arm.gun",list);}
@Testpublic void send4() throws Exception {topicSender.send4();}
消费者:
package com.example.demo.rabbitMq;import com.example.demo.dto.User; import com.example.demo.rabbitMq.exchange.topic.TopicReceiver1; import com.example.demo.utils.Base64Utils; import org.junit.Test; import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource; import java.io.IOException; import java.util.List;@SpringBootTest @RunWith(SpringRunner.class) public class RabbitMqRevTest {private Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate AmqpTemplate rabbitTemplate;@Testpublic void topicRev1(){rev1();}public void rev1(){//手动去获取消息logger.info("获取Queue[arm.gun]消息>>>");Message mesg = rabbitTemplate.receive("arm.queue");System.out.println(mesg);if(null != mesg){byte[] body = mesg.getBody();try {List u = (List) Base64Utils.byteToObj(body);//获取字符串数据 System.out.println(u);} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();}}} }
测试:
样例代码:
https://github.com/xiaozhuanfeng?tab=repositories
转载于:https://www.cnblogs.com/xiaozhuanfeng/p/10716236.html
RabbitMQ(4) TopicExchange相关推荐
- 目录:SpringBoot学习目录
SpringBoot配套源码地址:gitee.com/hengboy/spr- SpringCloud配套源码地址:gitee.com/hengboy/spr- SpringBoot相关系列文章请访问 ...
- RabbitMQ第五个实操小案例——主题交换机(TopicExchange)
文章目录 RabbitMQ第五个实操小案例--主题交换机(TopicExchange) RabbitMQ第五个实操小案例--主题交换机(TopicExchange) TopicExchange 和 D ...
- RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)、SpringAMQP、发布订阅模式(FanoutExchange、DirectExchange、TopicExchange)、消息转换器
文章目录 1. 入门案例 2. 完成官方Demo中的hello world案例 2.1 创建1个工程,2个模块 2.1.1 父工程的依赖,子工程不需要导入额外的依赖 2.1.2 配置子工程的配置文件( ...
- RabbitMQ使用及与spring boot整合
1.MQ 消息队列(Message Queue,简称MQ)--应用程序和应用程序之间的通信方法 应用:不同进程Process/线程Thread之间通信 比较流行的中间件: ActiveMQ Rabbi ...
- SpringBoot b2b2c 多用户商城系统(十五)Springboot整合RabbitMQ...
这篇文章带你了解怎么整合RabbitMQ服务器,并且通过它怎么去发送和接收消息.我将构建一个springboot工程,通过RabbitTemplate去通过MessageListenerAdapter ...
- spring amqp rabbitmq fanout配置
基于spring amqp rabbitmq fanout配置如下: 发布端 <rabbit:connection-factory id="rabbitConnectionFactor ...
- RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?
以下文章来源方志朋的博客,回复"666"获面试宝典 场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等 ...
- RabbitMQ 延迟队列,太实用了!
点击关注公众号,Java干货及时送达 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付 ...
- SpringBoot第十五篇:Springboot整合RabbitMQ
这篇文章带你了解怎么整合RabbitMQ服务器,并且通过它怎么去发送和接收消息.我将构建一个springboot工程,通过RabbitTemplate去通过MessageListenerAdapter ...
最新文章
- mochawesome如何合并测试报告_Vue项目采用Cypress做e2e自动化测试,手把手一撸到底...
- 案例册下载 | 10+ 行业标杆企业实践集锦,为你开启数据驱动之旅
- Fusioncharts图表组件在宿舍评分统计中的应用
- 【Java多线程】并发时的线程安全:快乐影院示例
- HBase(五):HBase基本API操作之CRUD
- 程序员最核心的竞争力是什么?
- android 仿搜索动画,Android仿京东顶部搜索框滑动伸缩动画效果
- k均值的损失函数_机器学习:手撕 cross-entropy 损失函数
- 关于InnerHTML存在的问题
- Oracle问题小记五:服务启动-索引-子查询-分页存储过程
- 三原色是红黄蓝对吗_三原色是哪几种颜色?是红黄蓝,还是红绿蓝
- python 大括号嵌套,分析嵌套的大括号/方括号组
- C#Directory常用方法
- python乒乓球比赛规则介绍_用英语介绍乒乓球的比赛规则
- MPEG Audio 简述
- 输入关键词获取今日头条免费图片
- 解决PPT不能插入页码问题
- 【Python 实战基础】Flask 蓝图 Blueprint 怎么用以及怎么集成 Bootstrap
- 皓月战地3不显示服务器,【求助】登录皓月服出错。。
- linux下Sigal信号值