很长时间没有分享过学习心得了,看了下发布记录,最后一篇文章的时间都在2020-12-10年了,今天抽时间整理下一个很早就想整理的技术分享。顺便说句题外话,因为我一直没时间整理,再加上开发的小伙伴对Mq的理解不够,我开掉了好几个处理这个事情的开发小伙伴,所以我希望这篇文章能对大家带来一点帮助。

背景说明

Mq(消息队列)做为一个消峰工具而常被使用,我们常用的Mq主要分为以下四种:

  • ActiveMQ
  • RabbitMq
  • Kafka
  • RocketMq

今天主要是聊聊RabbitMq,业务场景上选择RabbitMq的原因有很多,今天就不细说了。今天主要是说下如何动态创建队列,并实现动态监听的的方法。

需求背景

做为一个CRM-SAAS平台,每天平台会进入大量的客户信息,那么我们需要用高效的方式把这些数据及时的发给销售,那么这里需要考虑以下几个问题:

  1. 下发数据的及时性
  2. 数据分组
  3. 接收人员属于不同的分组和不同的级别
  4. 数据不满足下发条件(这里举个例子:接收人员都在忙的情况,可能需要过段时间重发)重发的问题

技术方案

  1. 为保证数据及时性,数据进入系统之后及时推进消费队列
  2. 针对数据分组和接收人员不同的分组和不同的级别,并要人为可控的话,那么就设定不同的队列来进行监听消费,我们还可以让队列名称变得有意义,从队列名称上获取我们所需要某些必要信息,例如数据属于那个分组,数据应该下发的群体等。

基于上述考虑,我们选择RabbitMq来实现这个方案,既然是不同的队列消费不同的数据,那么第一步就是考虑如何动态创建队列,因为这里还要设定一个人为可控,也就是人员可以管理,所以比然后伴随着队列的删除和重建。

队列的创建方式

基于注解的使用

    @Beanpublic Queue syncCdrQueue(){return new Queue(CrmMqConstant.SYNC_CDR_TO_CRM_Q,true,false,false);}

非注解配置

     Channel channelForm = connectionFactory().createConnection().createChannel(false);channelForm.queueDeclare(nameForm, true, false, false, null);

基于RabbitAdmin

rabbitAdmin.declareQueue(queue);
rabbitAdmin.declareExchange(fanoutExchange);
rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange));

从创建队列的灵活度来说,肯定是依次减弱的:

  • 注解方式:提前定义队列名称,一般以常量来定义,当然也支持变量的方式,但是对于加载先后的要求就高了,例如:这里用一个动态IP作为队列名称举例

    private final String QUEUE_NAME="crm.websocket."+ IPUtils.getLocalhostIp();@Beanpublic Queue queue(){return new Queue(QUEUE_NAME,true,false,false);}//监听
    @RabbitListener(queues = "#{queue.name}")
  • 非注解方式:这个其实就是通过ConnectionFactory来获取通道创建队列的,这个比较适合在建立链接的时候使用,所以一般在批量初始化队列时候比较合适
    @Beanpublic List<String> mqMsgQueues() throws IOException {List<String> queueNames = new ArrayList<String>();List<Map<String,Object>> engineList = autoAssignEngineService.getAllAutoAssignEngine(-1,-1);logger.info("engineList:{}", JsonUtils.toJson(engineList));if(engineList != null && engineList.size() > 0) {for(Map<String,Object> engine : engineList) {String groupId = String.valueOf(engine.get("orgId"));String semAdType = String.valueOf(engine.get("semAdType"));logger.info("groupId:{},semAdType:{}", groupId,semAdType);createQueue(queueNames, groupId,semAdType,"1");createQueue(queueNames, groupId,semAdType,"2");}}return queueNames;}private void createQueue(List<String> queueNames, String groupId, String semType, String level) throws IOException {String nameForm = queue +"."+ groupId+"."+semType + "." + level;logger.info("nameForm:{}",nameForm);Channel channelForm = connectionFactory().createConnection().createChannel(false);channelForm.queueDeclare(nameForm, true, false, false, null);channelForm.exchangeDeclare(topicExchange, BuiltinExchangeType.TOPIC,true);channelForm.queueBind(nameForm,topicExchange,routingKey + "."+groupId+"."+semType+"."+level);queueNames.add(nameForm);}
  • 基于RabbitAdmin的方式:那么这个就相对来说比较灵活,支持随时创建队列了。那么简单封装下:

       public void createMqQueue(String queueName,String exName,String rk,String type){Properties properties = rabbitAdmin.getQueueProperties(queueName);if(properties==null) {Queue queue = new Queue(queueName, true, false, false, null);if(BuiltinExchangeType.DIRECT.getType().equals(type)) {DirectExchange directExchange = new DirectExchange(exName);rabbitAdmin.declareQueue(queue);rabbitAdmin.declareExchange(directExchange);rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(rk));}else if(BuiltinExchangeType.FANOUT.getType().equals(type)){FanoutExchange fanoutExchange = new FanoutExchange(exName);rabbitAdmin.declareQueue(queue);rabbitAdmin.declareExchange(fanoutExchange);rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange));}else{TopicExchange topicExchange = new TopicExchange(exName);rabbitAdmin.declareQueue(queue);rabbitAdmin.declareExchange(topicExchange);rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(rk));}}}

    我们知道如何动态创建队列之后,接下来我们想办法解决动态消费监听得事情就行:

动态消费监听

RabbitMq得抽象监听类是:AbstractMessageListenerContainer,他下面有三个实现类,这里就使用SimpleMessageListenerContainer类来进行简单得说明。

方式一(我一个前辈得方式):

初始化队列,存储在静态缓存,用不同得bean来加载监听:

private List<Map<String,String>> groupOrgIds = new ArrayList<Map<String,String>>()
@PostConstructpublic void init() {if (logger.isDebugEnabled()) {logger.debug("initbean...");}List<AutoAssignEngine> engineList = autoAssignEngineService.getAllAutoAssignEngine();if (engineList != null && engineList.size() > 0) {for(AutoAssignEngine engine : engineList) {createQueueList(engine.getOrgId(),engine.getSemAdType(),"1");createQueueList(engine.getOrgId(),engine.getSemAdType(),"2");}}}
private void createQueueList(String orgId,String semType,String userLevel) {Map<String,String> feed = new HashMap<String, String>();feed.put("orgId", orgId);feed.put("type", semType);feed.put("userLevel", userLevel);groupOrgIds.add(feed);}
public SimpleMessageListenerContainer setContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(rabbitConfig.connectionFactory());container.setAcknowledgeMode(AcknowledgeMode.AUTO);container.setMaxConcurrentConsumers(8);container.setConcurrentConsumers(5);container.setPrefetchCount(10);return container;}public SimpleMessageListenerContainer queueMethod(SimpleMessageListenerContainer container) {Map<String,String> orgIdMap = groupOrgIds.get(0);String orgId = orgIdMap.get("orgId");String sourceType = orgIdMap.get("type");String userLevel = orgIdMap.get("userLevel");String queueNames=queueName + "." + orgId+"."+sourceType+"."+userLevel;container.addQueueNames(queueNames);excute(orgId,sourceType, container,queueNames);groupOrgIds.remove(0);return container;}
public SimpleMessageListenerContainer excute(String orgId,String semAdType, SimpleMessageListenerContainer container,String queneName) {container.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {}});return container;}
/*** 创建多个队列监听,利用Bean的初始化顺序,去消费groupOrgIds*/@Beanpublic SimpleMessageListenerContainer container1() {SimpleMessageListenerContainer container = setContainer();queueMethod(container);return container;}
@Beanpublic SimpleMessageListenerContainer container2() {SimpleMessageListenerContainer container = setContainer();queueMethod(container);return container;}
.....

那么这种方式呢确实能动态监听不同得队列和消费,但是因为利用得是Bean得初始化得方式,所以每次变更需要加载得队列内容就得重新加载Bean,也就是需要重启服务。

方式二:真正得动态监听

 @Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer() {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();container.setConnectionFactory(rabbitConfig.connectionFactory());container.setAcknowledgeMode(AcknowledgeMode.AUTO);container.setMaxConcurrentConsumers(8);container.setConcurrentConsumers(5);container.setPrefetchCount(10);// 查询有多少分配引擎,每个分配引擎一个队列List<AutoAssignEngine> engineList = autoAssignEngineService.getAllAutoAssignEngine();if (engineList != null && engineList.size() > 0) {for(AutoAssignEngine engine : engineList) {mqService.addNewListener(engine.getOrgId(),engine.getSemAdType(),"1",container);mqService.addNewListener(engine.getOrgId(),engine.getSemAdType(),"2",container);}}return container;}public Boolean addNewListener(String orgId,String semType,String userLevel,SimpleMessageListenerContainer container ){String queueNames=queueName + "." + orgId+"."+semType+"."+userLevel;container.addQueueNames(queueNames);container.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {}});return true;}

问题1:这里再接收消息(onMessage方法内)得时候不要用方法传参,会出现并发问题。

解决方式1:

String receiveQueueName = message.getMessageProperties().getConsumerQueue();

队列名称解析获取,本人使用。

解决方式2:

使用final变量重新接收传参,不过这个有待测试,不一定又用。

问题2:这不是还是在Bean初始化得时候加载得嘛,如果想要在服务启动之后再增加监听如何处理

完整得动态创建队列和监听(业务过程种实现)

我们知道如何创建队列和监听之后就开始解决问题2。

需求:变更现有队列。

转化需求为:删除现有队列和监听,新建新得队列并增加监听

问题:推送和消费不再统一服务。

解决方式:暴露接口,利用http请求实现同步。

代码实现:

消费端

    public Boolean updateListener(String orgId,String semType,String oldOrg){logger.info("================================消费端开始处理");String newFirstQueueName = queueName+"."+orgId+"."+semType+"."+1;String newFirstRk = routingKey+"."+orgId+"."+semType+"."+1;String newSecondQueueName = queueName+"."+orgId+"."+semType+"."+2;String newSecondRk =  routingKey+"."+orgId+"."+semType+"."+2;createMqQueue(newFirstQueueName,topicExchange,newFirstRk, BuiltinExchangeType.TOPIC.getType());createMqQueue(newSecondQueueName,topicExchange,newSecondRk, BuiltinExchangeType.TOPIC.getType());logger.info("================================创建队列");SimpleMessageListenerContainer container = SpringCtxUtils.getBean(SimpleMessageListenerContainer.class);String oneQueueNames=queueName + "." + orgId+"."+semType+"."+1;String twoQueueNames=queueName + "." + orgId+"."+semType+"."+2;if(!"NO".equals(oldOrg)) {String oneOldQueueNames = queueName + "." + oldOrg + "." + semType + "." + 1;String twoOldQueueNames = queueName + "." + oldOrg + "." + semType + "." + 2;container.removeQueueNames(oneOldQueueNames);container.removeQueueNames(twoOldQueueNames);logger.info("================================删除监听成功");}container.addQueueNames(oneQueueNames);container.addQueueNames(twoQueueNames);logger.info("================================添加监听成功");container.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {}});return true;}public void createMqQueue(String queueName,String exName,String rk,String type){Properties properties = rabbitAdmin.getQueueProperties(queueName);if(properties==null) {Queue queue = new Queue(queueName, true, false, false, null);if(BuiltinExchangeType.DIRECT.getType().equals(type)) {DirectExchange directExchange = new DirectExchange(exName);rabbitAdmin.declareQueue(queue);rabbitAdmin.declareExchange(directExchange);rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(directExchange).with(rk));}else if(BuiltinExchangeType.FANOUT.getType().equals(type)){FanoutExchange fanoutExchange = new FanoutExchange(exName);rabbitAdmin.declareQueue(queue);rabbitAdmin.declareExchange(fanoutExchange);rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(fanoutExchange));}else{TopicExchange topicExchange = new TopicExchange(exName);rabbitAdmin.declareQueue(queue);rabbitAdmin.declareExchange(topicExchange);rabbitAdmin.declareBinding(BindingBuilder.bind(queue).to(topicExchange).with(rk));}}}

暴露接口:

    @GetMapping("/add-listener/{orgId}/{semType}/{oldOrg}")public ComResponse addListener(@PathVariable("orgId") String orgId,@PathVariable("semType") String semType,@PathVariable("oldOrg") String oldOrg){mqService.updateListener(orgId,semType,oldOrg);return ComResponse.successResponse();}

注意:执行顺序,创建新队列,删除监听,添加监听

推送端

//添加新得监听String requestUrl = consumerUrl+"/"+newOrg+"/"+semType+"/"+oldOrgId;String result = restTemplateService.getWithNoParams(requestUrl,String.class);log.info("请求结束:{}",result);if(!"NO".equals(oldOrgId)) {String firstQueueName = queue + "." + oldOrgId + "." + semType + "." + 1;String secondQueueName = queue + "." + oldOrgId + "." + semType + "." + 2;mqService.deleteMqQueue(firstQueueName);mqService.deleteMqQueue(secondQueueName);log.info("删除队列结束");}//新增新的的队列String newFirstQueueName = queue+"."+newOrg+"."+semType+"."+1;String newFirstRk = routingKey+"."+newOrg+"."+semType+"."+1;String newSecondQueueName = queue+"."+newOrg+"."+semType+"."+2;String newSecondRk =  routingKey+"."+newOrg+"."+semType+"."+2;mqService.createMqQueue(newFirstQueueName,topicExchange,newFirstRk, BuiltinExchangeType.TOPIC.getType());mqService.createMqQueue(newSecondQueueName,topicExchange,newSecondRk, BuiltinExchangeType.TOPIC.getType());log.info("添加队列结束");

注意:执行顺序:变更监听,删除队列,添加新得队列

到这里基本上就实现了动态创建队列和动态监听。大家如果有什么不太明白得可以留言,抽时间整理得,所以写得比较草,大家凑合着看把。

聊聊RabbitMq动态监听这点事相关推荐

  1. 本地java【动态监听】zk集群节点变化

    [README]搭建zk cluster, refer 2 https://blog.csdn.net/PacosonSWJTU/article/details/111404364 [1] 动态监听代 ...

  2. vue动态监听窗口高度 - 全背景banner

    vue动态监听窗口高度 - 全背景banner 参考项目文件 src/hr/index.vue [结合下文:第一种方法] 第一种方法:[本文手写代码] data() {return {screenHe ...

  3. zookeeper专题:使用zookeeper客户端实现动态监听节点并获取数据

    文章目录 1. zookeeper原生客户端 2. Curator客户端 1. zookeeper原生客户端 zookeeper原生客户端就是zookeeper官方自带的客户端,作为代码与zk服务器交 ...

  4. oracle未获得监听器,无监听文件listener.ora的动态监听小例试验

    在数据库服务器上,监听文件的位置是:$ORACLE_HOME/network/admin/listener.ora 试验如下: 移动db服务器上的监听文件,如下命令: [oracle@ENMOEDU ...

  5. oracle什么时候使用静态监听,Oracle监听之动态监听与静态监听特点

    动态注册不需要显示的配置listener.ora文件,实例启动的时候,PMON进程根据instance_name,service_name参数将实例和服务动态注册 1.如何查询某服务是静态监听注册还是 ...

  6. oracle 动态监听例子,ORACLE动态监听总结

    1 动态监听 本文档介绍ORACLE动态监听服务的配置及原理: 1.1 监听文件 $ORACLE_HOME/network/admin/listener.ora 1.2 动态监听 ORACLE实例在启 ...

  7. RabbitMQ消息监听(多种模式-fanout/topic)

    1.rabbitmq消息监听,兼容多种模式的消息,fanout/topic等模式 MQ消息配置监听: package com.test.ddyin.conf;import java.util.Hash ...

  8. Oracle 动态监听和静态监听非1521端口配置

    硬核配置方法, 动态监听配置默认的1521不做演示 动态注册非1521端口,需要配置三个地方listener.ora.tnsname.ora.local_listener. listener.ora配 ...

  9. Zookeeper——服务器动态上下线、客户端动态监听

    文章目录: 1.前言 2.实操步骤 2.1 服务端代码 2.2 客户端代码 2.3 测试 1.前言 某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线. ...

最新文章

  1. Xcode 调试的正确打开方式——Debugging
  2. 二叉树路径应用举例(基于非递归后序遍历)
  3. ng机器学习视频笔记(十四) ——推荐系统基础理论
  4. matlab读取文件与写入文件
  5. 引用js实现checkbox批量选中
  6. [LeetCode 题解]: Roman to Interger
  7. [bib]论文参考文献的获取方式(持更)
  8. 去掉讨厌的“windows盗版软件受害者”的提示
  9. IDEA设置默认浏览器为chrome
  10. 163邮箱登录不了Outlook解决方案
  11. 【Cocos Creator实战教程(6)】——镜头跟随
  12. 软考岗位设置与岗位描述
  13. BZOJ3876支线剧情
  14. 微信个性签名服务器维护,微信个性签名显示由于系统维护
  15. 超级电容怎么才能把内阻做小_如何测试超级电容内阻?
  16. glue logic-胶合逻辑
  17. Linux中ssh登录跳过RSA key fingerprint输入yes/no
  18. linux安装iostat,yum安装iostat命令时,提示No package iostat available. 错误:无须任何处理(示例代码)...
  19. Oracle数据库基础(二)
  20. 支付宝 网站 支付(AliPay)开发

热门文章

  1. GAN学习历程之CycleGAN论文笔记
  2. 到底是什么原因?让200多家企业参与区块链改革?
  3. 国标 计算机房 湿度,数据中心机房:温度、湿度标准是什么?
  4. OpenLayers之多源数据加载七:矢量地图
  5. CSV是什么文件格式【转】
  6. 2021-2027年全球与中国彩色隐形眼镜行业市场前瞻与投资战略规划分析报告
  7. linux编程常用指令
  8. Android studio实现多个按钮跳转多个页面
  9. 坐落尘世的繁华,幽眉清黛任花开花谢
  10. 机器人会偷走你的饭碗吗——写作篇