1.rabbitmq消息监听,兼容多种模式的消息,fanout/topic等模式

MQ消息配置监听:

package com.test.ddyin.conf;import java.util.HashMap;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;import org.apache.poi.ss.formula.functions.T;
import org.springframework.amqp.core.AbstractExchange;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Binding.DestinationType;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import com.qf.openchannel.mq.MQMessageAware;
import com.qf.openchannel.mq.MQReceiver;@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq", name = "enable", matchIfMissing = false)
public class MQInitConfig {private final String queueNameSufix = ".test.channel";@Autowiredprivate List<MQMessageAware> messageListeners;@BeanList<Queue> queue() {return messageListeners.stream().map(listener -> {return new Queue(listener.getExchange() + queueNameSufix, false);}).collect(Collectors.toList());}@BeanList<Exchange> exchange() {return messageListeners.stream().map(listener -> {return new AbstractExchange(listener.getExchange()) {@Overridepublic String getType() {return listener.getMQType();}};}).collect(Collectors.toList());}@BeanList<Binding> binding() {return messageListeners.stream().map(listener -> {return new Binding(listener.getExchange() + queueNameSufix, DestinationType.QUEUE, listener.getExchange(),listener.getRoutingKey(), new HashMap<String, Object>());}).collect(Collectors.toList());}@BeanSimpleMessageListenerContainer container(ConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setMessageListener(listenerAdapter);messageListeners.forEach(listener -> {container.addQueueNames(listener.getExchange() + queueNameSufix);});return container;}@BeanMessageListenerAdapter listenerAdapter(MQReceiver receiver) {return new MessageListenerAdapter(receiver);}
}

注意:在绑定的时候,要加入exchange和routing key(fanout模式的routing key 为空字符串),其中,在queue,exchange和binding加注解,相当于是在容器中添加了exchange,队列和两者之间的绑定关系,可以直接从applicationContext中获取,其中,也是自动创建了exchange,queue以及两者之间的绑定关系,不需要在rabbitmq界面重新添加exchange,queue以及两者的绑定关系。

MQ消息接收:(MQReceiver)

package com.qf.openchannel.mq;import java.util.HashMap;
import java.util.Map;import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Service;import com.qf.openchannel.util.Constant;
import com.qf.openchannel.util.LoggerUtil;@Service
public class MQReceiver implements MessageListener, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {private final String queueNameSufix = ".test.channel";private ApplicationContext applicationContext;private Map<String, MQMessageAware> messageListener = new HashMap<>();@Overridepublic void onMessage(Message message) {String payload = new String(message.getBody());LoggerUtil.info("Received <" + payload + ">");try {          String exchange = message.getMessageProperties().getConsumerQueue();String routingKey = message.getMessageProperties().getReceivedRoutingKey();LoggerUtil.info("MQReceiverService onMessage routingKey {} exchange {}", routingKey,exchange);if (messageListener.containsKey(exchange)) {if(messageListener.containsKey(routingKey)) {messageListener.get(routingKey).onMessage(payload);}else {messageListener.get(exchange).onMessage(payload);}} else {LoggerUtil.info("MQReceiverService receiveMessage unrecognized message from exchange : ", exchange);}} catch (Exception e) {LoggerUtil.error("MQReceiverService receiveMessage exception: ", e);}}@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {applicationContext.getBeansOfType(MQMessageAware.class).forEach((key, listener) -> {if(listener.getMQType().equals(Constant.MQTYPE_TOPIC)) {LoggerUtil.info("MQReceiverService receiveMessage messageType {} routingKey {} exchange {}", listener.getMQType(), listener.getRoutingKey(),listener.getExchange());messageListener.put(listener.getExchange() + queueNameSufix, listener);messageListener.put(listener.getRoutingKey(), listener);}else if(listener.getMQType().equals(Constant.MQTYPE_FANOUT)){LoggerUtil.info("MQReceiverService receiveMessage messageType {} routingKey {} exchange {}", listener.getMQType(), listener.getRoutingKey(),listener.getExchange());messageListener.put(listener.getExchange() + queueNameSufix, listener);}else {}});}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}
}

注意:区分不同消息类型来绑定不同的监听,对于topic模式,routing key也要绑定对应的listener(监听器),然后通过message可获取监听的exchange和routing key

对于监听器,由于有多个监听,抽象出一个共同接口:

MQMessageAware

package com.test.ddyin.mq;public interface MQMessageAware {String getExchange();void onMessage(String message);String getMQType();String getRoutingKey();
}

然后对于不同的监听可手动实现:

例如:退团消息的监听:

package com.test.ddyin.mq;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import com.fasterxml.jackson.databind.ObjectMapper;
import com.qf.openchannel.model.QuitPlan;
import com.qf.openchannel.service.QuitPlanService;
import com.qf.openchannel.util.Constant;
import com.qf.openchannel.util.DateUtil;
import com.qf.openchannel.util.LoggerUtil;/*** @author ddyin* @date 2017年9月8日 下午14:08:34*/
@Service
public class QuitPlanListener implements MQMessageAware{@AutowiredQuitPlanService quitPlanService;@Overridepublic String getExchange() {return "trade.topic.notification";}@Overridepublic void onMessage(String message) {try {LoggerUtil.info("QuitPlanListener dealMessage start: {}", DateUtil.get14Date());ObjectMapper mapper = new ObjectMapper();QuitPlan quitPlan = mapper.readValue(message, QuitPlan.class);quitPlanService.insertQuitPlan(quitPlan);LoggerUtil.info("QuitPlanListener dealMessage end: {}", DateUtil.get14Date());} catch (Exception e) {LoggerUtil.error("QuitPlanListener.onMessage Exception:{}", e);}}@Overridepublic String getMQType() {return Constant.MQTYPE_TOPIC;}@Overridepublic String getRoutingKey() {return "trade.plan.status.settled";}}

到此,rabbitmq监听可实现不同消息类型的监听。

注意项目中rabbitmq的配置:

rabbitmq:host: 6.6.6.6port: 5674username: testpassword: testvirtual-host: /testenable: false

综述,end

补充:

如果想扩展到多个virtualHost,可以添加ConnectionFactory

其中配置的virtualHost配置在配置文件中,目的是区分对接不同的业务,通过virtualHost来进行隔离。

事例如下:(放置在MqInitConfig.java文件中)

    /*** virtual-host: /host1 ConnectionFactory** @return*/@BeanConnectionFactory connectionFactory1() {com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();connectionFactory.setHost(mqConfig.getHost());connectionFactory.setPort(mqConfig.getPort());connectionFactory.setUsername(mqConfig.getUsername());connectionFactory.setPassword(mqConfig.getPassword());connectionFactory.setVirtualHost(mqConfig.getVirtualHost1());CachingConnectionFactory factory = new CachingConnectionFactory(connectionFactory);return factory;}/*** virtual-host: /host2 ConnectionFactory** @return*/@BeanConnectionFactory connectionFactory2() throws IOException, TimeoutException {com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();connectionFactory.setHost(mqConfig.getHost());connectionFactory.setPort(mqConfig.getPort());connectionFactory.setUsername(mqConfig.getUsername());connectionFactory.setPassword(mqConfig.getPassword());connectionFactory.setVirtualHost(mqConfig.getVirtualHost2());CachingConnectionFactory factory = new CachingConnectionFactory(connectionFactory);return factory;}

添加完之后将多个virtualHost加入到SimpleRoutingConnectionFactory

@BeanConnectionFactory connectionFactory() {SimpleRoutingConnectionFactory factory = new SimpleRoutingConnectionFactory();Map<Object, ConnectionFactory> targetConnectionFactories = new HashMap<>();targetConnectionFactories.put("connectionFactory1", connectionFactory1());try {targetConnectionFactories.put("connectionFactory2", connectionFactory2());} catch (IOException e) {LoggerUtil.error("connectionFactory targetConnectionFactories IOException: {}", e);} catch (TimeoutException e) {LoggerUtil.error("connectionFactory targetConnectionFactories TimeoutException: {}", e);}factory.setTargetConnectionFactories(targetConnectionFactories);return factory;}

可以将对应的connectionFactory添加到container中,通过virtualHost来进行区分。

    @BeanSimpleMessageListenerContainer container(ConnectionFactory connectionFactory,MessageListenerAdapter listenerAdapter) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(connectionFactory);container.setMessageListener(listenerAdapter);messageListeners.forEach(listener -> {if (MQ_RABBIT_VIRTUAL_HOST2.equals(listener.getVirtualHost())) {container.addQueueNames(listener.getExchange() + queueNameSufix);}else{//container.addQueueNames(listener.getExchange() + queueNameSufix);//或者其他业务逻辑}});return container;}

当然也要在MQMessageAware接口中添加方法:

public interface MQMessageAware {String getExchange();void onMessage(String message);String getMQType();String getRoutingKey();String getVirtualHost();
}

可实现多种virtualHost多种配置。。。

补充:

1.当消费者消费信息出现异常时,比如消费者宕机,消息该如何处理,当生产者宕机时,消息该如何处理?

A:对于消费者宕机,rabbitmq提供ack机制,当ack机制设置成true的时候,说明是生产者已经接收到消费者已经完全消费了信息,就会删除掉已经消费掉的信息。

对于生产者宕机,rabbitmq提供了持久化机制,这里的持久化包含了exchange,queue,message的持久化,MessageDeliveryMode的deliveryMode可设置是否持久化,新建exchange和queue的时候也可设置,持久化属性是durable。

2.如何确认消息是否已发送到broker代理服务器上(broker其实就是一个消息队列的服务器实体,包含exchange,queue和binding的信息)

A:方式一:消息队列的channel的confirm模式是针对消息还未到达broker服务器做的一个弥补机制,channel设置成confirm模式后,就可以在到达broker服务器时发送一个反馈(每个消息在发送到broker服务器时都有一个唯一ID)

方式二:消息队列是基于AMQP协议的,通过AMQP协议的事务机制来实现,是基于AMQP协议层面的解决方案。其实就是Channel中的txSelect(),txCommit()和txRollBack()

转载于:https://my.oschina.net/u/3110937/blog/1535865

RabbitMQ消息监听(多种模式-fanout/topic)相关推荐

  1. 多线程消息监听容器配置[ 消费者spring-kafka配置文件]

    1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://ww ...

  2. IOS第五天(2:用户登录,回车的监听(代理模式UITextFieldDelegate)) 和关闭键盘

    *********用户登录,回车的监听(代理模式UITextFieldDelegate) #import "HMViewController.h" @interface HMVie ...

  3. XMPP——Smack[2]会话、消息监听、字体表情和聊天窗口控制

    连接之后,拿到了connection,通过它可以搞定会话 建立一个会话 [java] view plaincopyprint? MessageListener msgListener = new Me ...

  4. Android判断是否飞行模式已经监听飞行模式

    1.判断是否飞行模式: private boolean isAirPlaneModeOn(){int mode = 0;try {mode = Settings.Global.getInt(getCo ...

  5. 聊聊RabbitMq动态监听这点事

    很长时间没有分享过学习心得了,看了下发布记录,最后一篇文章的时间都在2020-12-10年了,今天抽时间整理下一个很早就想整理的技术分享.顺便说句题外话,因为我一直没时间整理,再加上开发的小伙伴对Mq ...

  6. activemq 开启监听_ActiveMQ 消息监听 MessageListener 的使用

    刚学 ActiveMQ, 最开始搭建环境的时候引入的jar 包,几个核心的jar jms.jar, httpcore.jar , httpclient.jar, activemq-all.jar 准备 ...

  7. 使用jedis实现Redis消息队列(MQ)的发布(publish)和消息监听(subscribe)

    前言: 本文基于jedis 2.9.0.jar.commons-pool2-2.4.2.jar以及json-20160810.jar 其中jedis连接池需要依赖commons-pool2包,json ...

  8. itchat库实现简单的微信消息监听

    itchat itchat是一个开源的微信个人号接口,作为python的一个轻量的第三方库,使用较少的代码就能实现微信消息的收发以及监测. 安装 由于2017年后,新注册的微信基本登录不了网页版,所以 ...

  9. php消费rabbitmq消息QoS,RabbitMQ消息队列-一对多模式

    一对多模式,用图表示如下 一个生产者向消息队列中发送消息,多个消费者同时从消息队列中读取消息,在这个模式下,我们优先考虑的,是解决各个消费者如何读取消息的机制. 下面我们以ThinkPHP的代码来展示 ...

最新文章

  1. C#版 - Leetcode49 - 字母异位词分组 - 题解
  2. 支付宝支付 第十二集:狂神、飞哥支付宝支付配置代码(免费资源,拿走不谢)
  3. jvisualvm安装Visual GC插件
  4. E打开https网站时,提示此网站的安全证书有问题(证书无效)
  5. 计算机配置对电子竞技的影响,配置高并不是唯一优点 看看电竞硬件还要啥?...
  6. word文档打印 自动编码_办公室文件打印有哪些技巧 办公室文件打印技巧介绍【图文】...
  7. int 取值范围_一定范围内的随机数
  8. java原子整数_多线程(四、原子类-AtomicInteger)
  9. Redis与数据库缓存一致性问题
  10. 深圳大学二本计算机软件,深圳大学是几本(深圳大学是一本还是二本)
  11. ubuntu linux 搭建 webssh 网页ssh远程登录其他服务器
  12. JSP+JavaBean+Servlet工作原理实例讲解
  13. 红外反射传感器实验-传感器原理及应用实验
  14. 趣节点带您3分钟搞懂信息流广告
  15. 重磅直播丨迈向移动数字金融 —— 神州信息并购云核网络线上发布会
  16. ArcGIS教程:欧氏距离 (空间分析)
  17. 【线段树】L - GTY‘s gay friends
  18. WmiPrvSE.exe是什么进程?WMI Provider Host占用很高CPU怎么办?
  19. 【我的生活】旅游计划--2019
  20. Docker中安装Mysql报错--[Warning] TIMES TAMP with implicit DEFAULT value is deprecated. Please use - - exp

热门文章

  1. BH1621FVC-TR光环境传感器
  2. HTML5新特性浅谈
  3. Thinking in React(翻译)
  4. 浅析安全启动(Secure Boot) —写得很好
  5. NetLogon事件ID:5722
  6. Excel与Google Sheets中实现线性规划求解
  7. 3050显卡驱动安装+配置pytorch的cuda环境
  8. 0x0EA772D7 (msvcr80.dll) 处有未经处理的异常: 0xC000041D: 用户回调期间遇到未经处理的异常。。
  9. html5混合app原理,HTML5混合App开发
  10. Android:禁止APP录屏和截屏