topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列

生产者工程

package com.example.demo.rabbitMq.exchange.topic;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TopicRabbitConfig {public static final String TOPIC_MESSAGE = "topic.message";public static final String TOPIC_MESSAGE_S = "topic.messages";public static final String USER_MESSAGE = "user.message";/*** 武器库*/public static final String ARM_QUEUE = "arm.queue";@Beanpublic Queue queueTopicMessage() {return new Queue(TopicRabbitConfig.TOPIC_MESSAGE);}@Beanpublic Queue queueTopicMessages() {return new Queue(TopicRabbitConfig.TOPIC_MESSAGE_S);}@Beanpublic Queue queueUserMessage() {return new Queue(TopicRabbitConfig.USER_MESSAGE);}@Beanpublic Queue queueArm() {return new Queue(TopicRabbitConfig.ARM_QUEUE);}@BeanTopicExchange exchange() {return new TopicExchange("topicExchange");}@BeanBinding bindingExchangeMessage(Queue queueTopicMessage, TopicExchange exchange) {//所有匹配routingKey=topic.message的消息,将放入Queue[name="topic.message"]return BindingBuilder.bind(queueTopicMessage).to(exchange).with("topic.message");}@BeanBinding bindingExchangeMessages(Queue queueTopicMessages, TopicExchange exchange) {//所有匹配routingKey=topic.# 的消息,将放入Queue[name="topic.messages"]return BindingBuilder.bind(queueTopicMessages).to(exchange).with("topic.#");}@BeanBinding bindingExchangeUserMessage(Queue queueUserMessage, TopicExchange exchange) {///所有匹配routingKey=user.# 的消息,将放入Queue[name="user.messages"]return BindingBuilder.bind(queueUserMessage).to(exchange).with("user.#");}@BeanBinding bindingExchangeArm(Queue queueArm, TopicExchange exchange) {return BindingBuilder.bind(queueArm).to(exchange).with("arm.#");}
}

发送消息

package com.example.demo.rabbitMq.exchange.topic;import com.example.demo.dto.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class TopicSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send1() {User user = new User();user.setUserName("Sender1.....");user.setMobile("1111111111");rabbitTemplate.convertAndSend("topicExchange","topic.message",user);}public void send2() {User user = new User();user.setUserName("Sender2.....");user.setMobile("2222222");rabbitTemplate.convertAndSend("topicExchange","topic.messages",user);}public void send3() {User user = new User();user.setUserName("Sender3.....");user.setMobile("33333");rabbitTemplate.convertAndSend("topicExchange","user.message",user);}
}

消费者工程

package com.example.demo.rabbitMq.exchange.topic;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class TopicRabbitConstant {public static final String TOPIC_MESSAGE = "topic.message";public static final String TOPIC_MESSAGE_S = "topic.messages";public static final String USER_MESSAGE = "user.message";
}

package com.example.demo.rabbitMq.exchange.topic;import com.example.demo.dto.User;
import com.example.demo.utils.Base64Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import java.io.IOException;@Component
@RabbitListener(queues = TopicRabbitConstant.TOPIC_MESSAGE)
public class TopicReceiver1 {private Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate AmqpTemplate rabbitTemplate;@RabbitHandlerpublic void process(User user) {System.out.println("Receiver1  : " + user);}public void rev1(){//手动去获取消息logger.info("获取Queue[topic.message]消息>>>");Message mesg = rabbitTemplate.receive("topic.message");System.out.println(mesg);if(null != mesg){byte[] body = mesg.getBody();try {User u = (User) Base64Utils.byteToObj(body);//获取字符串数据
                System.out.println(u);} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();}}}
}

测试:

启动消费者工程,生产者,执行如下方法

    @Testpublic void send1() throws Exception {//会匹配到topic.#和topic.message 两个Receiver都可以收到消息for (int i = 0, size = 10; i < size; i++) {topicSender.send1();}}

也可以不用监听的方式,手动自主获取队列消息,如消费工程:

例如生产者工程TopicRabbitConfig.java添加武器队列:

    /*** 武器库*/public static final String ARM_QUEUE = "arm.queue";@Beanpublic Queue queueArm() {return new Queue(TopicRabbitConfig.ARM_QUEUE);}@BeanBinding bindingExchangeArm(Queue queueArm, TopicExchange exchange) {return BindingBuilder.bind(queueArm).to(exchange).with("arm.#");}

生产武器:

 public void send4() {//生产一批武器List<String> list = new ArrayList<String>();list.add("手枪");list.add("步枪");list.add("机枪");rabbitTemplate.convertAndSend("topicExchange","arm.gun",list);}

    @Testpublic void send4() throws Exception {topicSender.send4();}

消费者:

package com.example.demo.rabbitMq;import com.example.demo.dto.User;
import com.example.demo.rabbitMq.exchange.topic.TopicReceiver1;
import com.example.demo.utils.Base64Utils;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;@SpringBootTest
@RunWith(SpringRunner.class)
public class RabbitMqRevTest {private Logger logger = LoggerFactory.getLogger(this.getClass());@Autowiredprivate AmqpTemplate rabbitTemplate;@Testpublic void topicRev1(){rev1();}public void rev1(){//手动去获取消息logger.info("获取Queue[arm.gun]消息>>>");Message mesg = rabbitTemplate.receive("arm.queue");System.out.println(mesg);if(null != mesg){byte[] body = mesg.getBody();try {List u = (List) Base64Utils.byteToObj(body);//获取字符串数据
                System.out.println(u);} catch (IOException e) {e.printStackTrace();} catch (ClassNotFoundException e) {e.printStackTrace();}}}
}

测试:

样例代码:

https://github.com/xiaozhuanfeng?tab=repositories

转载于:https://www.cnblogs.com/xiaozhuanfeng/p/10716236.html

RabbitMQ(4) TopicExchange相关推荐

  1. 目录:SpringBoot学习目录

    SpringBoot配套源码地址:gitee.com/hengboy/spr- SpringCloud配套源码地址:gitee.com/hengboy/spr- SpringBoot相关系列文章请访问 ...

  2. RabbitMQ第五个实操小案例——主题交换机(TopicExchange)

    文章目录 RabbitMQ第五个实操小案例--主题交换机(TopicExchange) RabbitMQ第五个实操小案例--主题交换机(TopicExchange) TopicExchange 和 D ...

  3. RabbitMQ 服务异步通信 -- 入门案例(消息预存机制)、SpringAMQP、发布订阅模式(FanoutExchange、DirectExchange、TopicExchange)、消息转换器

    文章目录 1. 入门案例 2. 完成官方Demo中的hello world案例 2.1 创建1个工程,2个模块 2.1.1 父工程的依赖,子工程不需要导入额外的依赖 2.1.2 配置子工程的配置文件( ...

  4. RabbitMQ使用及与spring boot整合

    1.MQ 消息队列(Message Queue,简称MQ)--应用程序和应用程序之间的通信方法 应用:不同进程Process/线程Thread之间通信 比较流行的中间件: ActiveMQ Rabbi ...

  5. SpringBoot b2b2c 多用户商城系统(十五)Springboot整合RabbitMQ...

    这篇文章带你了解怎么整合RabbitMQ服务器,并且通过它怎么去发送和接收消息.我将构建一个springboot工程,通过RabbitTemplate去通过MessageListenerAdapter ...

  6. spring amqp rabbitmq fanout配置

    基于spring amqp rabbitmq fanout配置如下: 发布端 <rabbit:connection-factory id="rabbitConnectionFactor ...

  7. RabbitMQ 延迟队列实现定时任务的正确姿势,你学会了么?

    以下文章来源方志朋的博客,回复"666"获面试宝典 场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等 ...

  8. RabbitMQ 延迟队列,太实用了!

    点击关注公众号,Java干货及时送达 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付 ...

  9. SpringBoot第十五篇:Springboot整合RabbitMQ

    这篇文章带你了解怎么整合RabbitMQ服务器,并且通过它怎么去发送和接收消息.我将构建一个springboot工程,通过RabbitTemplate去通过MessageListenerAdapter ...

最新文章

  1. mochawesome如何合并测试报告_Vue项目采用Cypress做e2e自动化测试,手把手一撸到底...
  2. 案例册下载 | 10+ 行业标杆企业实践集锦,为你开启数据驱动之旅
  3. Fusioncharts图表组件在宿舍评分统计中的应用
  4. 【Java多线程】并发时的线程安全:快乐影院示例
  5. HBase(五):HBase基本API操作之CRUD
  6. 程序员最核心的竞争力是什么?
  7. android 仿搜索动画,Android仿京东顶部搜索框滑动伸缩动画效果
  8. k均值的损失函数_机器学习:手撕 cross-entropy 损失函数
  9. 关于InnerHTML存在的问题
  10. Oracle问题小记五:服务启动-索引-子查询-分页存储过程
  11. 三原色是红黄蓝对吗_三原色是哪几种颜色?是红黄蓝,还是红绿蓝
  12. python 大括号嵌套,分析嵌套的大括号/方括号组
  13. C#Directory常用方法
  14. python乒乓球比赛规则介绍_用英语介绍乒乓球的比赛规则
  15. MPEG Audio 简述
  16. 输入关键词获取今日头条免费图片
  17. 解决PPT不能插入页码问题
  18. 【Python 实战基础】Flask 蓝图 Blueprint 怎么用以及怎么集成 Bootstrap
  19. 皓月战地3不显示服务器,【求助】登录皓月服出错。。
  20. linux下Sigal信号值

热门文章

  1. k8s minikube在wsl中通过nodeport来访问
  2. linux fedora卸载vmware16命令
  3. gorm框架:user role用户角色一对一关联Model编写
  4. python django model关联另一个实体类
  5. golang管道channel与协程goroutine配合使用示例
  6. flink HA高可用Standalone集群搭建
  7. k8s控制器:DaemonSet
  8. SpringBoot 2.1.3配置log4j2日志框架完整代码示例
  9. Linux scp -r命令主机间文件复制
  10. dubbo注册中心zookeeper的安装使用