我们项目统计模块导出的时候为了达到异步的效果使用了mq来解决,可是producer和consumer是同个应用也就是都是处在一个应用中,之前因为数据少就没有注意异步的效果,改造之后我们模拟了67w的数据量来做压力测试,发现点击导出之后界面一直处于等待状态不是直接返回前端的,同步了?说之前也遇到过这种状态,重新启动下mq服务就行了,感觉不太像,所以特意修改了下消费端的代码来测试。代码如下

发现确实是同步,一直卡在消费端,等到程序完成前端才有返回,发现确实是这个问题

加载
首先我们在springboot启动的过程中类加载的过程中打断点,看看对于mq相关的加载过程中做了哪些事情,首先我们就定位导出的这个消费者ExportDataRequestConsumer

一直往下跟最终发现IntegrationObjectSupport实现了InitializingBean的init,Spring Integration 是对 Spring Messaging 的扩展。它提出了不少新的概念,包括消息的路由 MessageRoute、消息的分发 MessageDispatcher、消息的过滤 Filter、消息的转换 Transformer、消息的聚合 Aggregator、消息的分割 Splitter 等等。同时还提供了包括 MessageChannel 的实现 DirectChannel、ExecutorChannel、PublishSubscribeChannel 等; MessageHandler 的实现 MessageFilter、ServiceActivatingHandler、MethodInvokingSplitter 等内容。
发现最终是将topic(q.ares.export.data.request)做为key,DirectChannel做为value存在bean工厂(ConcurrentHashMap->singletonObjects)中,DirectChannel中的dispatcher→handlers→invocableHandlerMethod的值为消费端对应的处理方法

发送数据send

进入doDispatch

这里需要注意,就是这个tryOptimizedDispatch,我们来看下实现

当theOneHandler不为null的时候直接执行了handlerMessage方法,在来看看handerMessage方法

看到上图标注的地方了吗?handleMeassageInternal方法名“内部处理”,很像我们想象的结果嘛,再跟看看

到这就可以确定了,不用在把下面的步骤贴上了了,大家都应该明白了,最终来到了消费端的方法

最终等待消费端执行完毕又回到了这里,

producer和consumer不同应用场景模拟

我们再来模拟这种正常的场景看看,将消费端注释掉。运行了一遍发现能够正常返回,异步处理。大概的说下步骤,就不贴图详细说明了,步骤如下,

  1. 首先本地beanFactory.geBean==null,获取不到;
  2. 然后getDynamicDestinations获取动态目的地;
  3. 校验动态允许标识;
  4. 如果是true则创建输出createOutput(MessageChannel→DirectChannel);
  5. 向BeanFactory注册改bean,key=q.ares.export.data.request,value=DirectChannel,注意dispatch→theOneHandler为AmqpOutboundEndpoint,跟上面的不同

6.send数据,调用的是AmqpOutboundEndpoint.handleRequestMessage方法

7.最总调用的是RabbitTemplate.send方法

总结

SpringCloud Stream 中的BinderAwareChannelResolver.resolveDestination(String channelName)获取channel的时候会根据topicname作为key先去本地BeanFactory去获取bean,而程序初始化的时候会先将消费类方法作为值创建DirectChannel放入BeanFactory,而发送数据的时候根据dispatch为StreamListenerMessageHandler直接invoke消费者方法,没有走到mq,所以形成了同步。如果应用中需要producer和consumer在同一应用的情况还是不要使用BinderAwareChannelResolver.resolveDestination方法,使用Binding来进行绑定,下面针对这个导出进行修改

  1. ExportDataRequestSink

    @EnableBinding
    public interface ExportDataRequestSink {
     
        String OUTPUT = "areas-export-output";
        String INPUT = "areas-export-input";
     
       /* @Input(MQConstant.Q_ARES_EXPORT_DATA_REQUEST)
        SubscribableChannel produceExportData();*/
     
        @Input(INPUT)
        SubscribableChannel produceExportData();
     
        @Output(OUTPUT)
        MessageChannel output();
     
    }

  2. ExportDataRequestConsumer只需要把@StreamListener值改下就好,这里就不贴代码了,直接上图
  3. ExportDataServiceImpl只需要该这两处就可以了
  4. 最主要的是要在配置文件中还要记得配置一下输入输出通道对应的物理目标(topic名)

    spring.cloud.stream.bindings.areas-export-output.destination=q.ares.export.data.request
    spring.cloud.stream.bindings.areas-export-input.destination=q.ares.export.data.request

  5. 需要注意这两处要对应起来,而且不能有"."

SpringCloud Stream MQ生产和消费同应用造成的同步问题相关推荐

  1. springcloud 相同服务名_SpringCloud系列之SpringCloud Stream

    SpringCloud Stream 技术兴起的原因:为了解决系统中不同中间件的适配问题,出现了cloud stream,采用适配绑定的方式,自动给不同的MQ之间进行切换. 屏蔽底层消息中间件的差异, ...

  2. SpringCloud Stream+RabbitMQ自定义通道

    SpringCloud Stream默认的消息生产通道和消费通道分别是output和input,我们也可以自定义消息生产通道和消费通道:下面对这一过程进行记录. 1 父maven工程 1.1 工程结构 ...

  3. java如何保证mq一定被消费_消费端如何保证消息队列MQ的有序消费

    消息无序产生的原因 消息队列,既然是队列就能保证消息在进入队列,以及出队列的时候保证消息的有序性,显然这是在消息的生产端(Producer),但是往往在生产环境中有多个消息的消费端(Consumer) ...

  4. 记一次springcloud stream延迟消息失效

    起因 实现一个封号功能,可以封1,3,7,30,永久的不同天数,其中1,3,7,30是通过springcloud stream整合rabbitmq的延迟队列来实现,到期自动解封. 问题 当时间设置30 ...

  5. kafka java_Kafka 使用Java实现数据的生产和消费demo

    前言 在上一篇中讲述如何搭建kafka集群,本篇则讲述如何简单的使用 kafka .不过在使用kafka的时候,还是应该简单的了解下kafka. Kafka的介绍 Kafka是一种高吞吐量的分布式发布 ...

  6. 【专题介绍】视频内容生产与消费创新(Part2)

    " "音视频+无限可能"是一扇 LiveVideoStackCon面向新兴领域开启的大门,在移动互联网红利消失.内卷的局面下,智能车.制造.金融.医疗.出海等新兴领域还在 ...

  7. 【专题介绍】视频内容生产与消费创新(Part1)

    " "音视频+无限可能"是一扇 LiveVideoStackCon面向新兴领域开启的大门,在移动互联网红利消失.内卷的局面下,智能车.制造.金融.医疗.出海等新兴领域还在 ...

  8. 多线程-单生产单消费模型

    2019独角兽企业重金招聘Python工程师标准>>> 创建资源对象,提供保存和取出方法(使用synchronized代码块实现) /*** Created by shaoqingh ...

  9. KAFKA 最新版 Shell API单机生产与消费

    文章目录 一.KAFKA 启动与监控 二.KAFKA 主题创建于查看生产与消费 2.1. 查看主题列表 2.2. 创建主题 2.3. 查看主题信息 2.4. 主题信息分析 三.KAFKA 主题创建于查 ...

最新文章

  1. 算法竞赛知识合集 目录(博客中转站)
  2. static_cast, dynamic_cast, const_cast
  3. python安装哪个版本-python到底安装哪个版本
  4. 串口IDLE空闲中断+DMA实现接收不定长数据基于stm32cubemx
  5. Linux 磁盘分区、格式化、目录挂载
  6. typedef void(*Fun)(void);
  7. Scala是完全面向函数式的编程语言体现点
  8. python怎么赋值int_int对象不支持项赋值
  9. 一文读懂机器学习的常用模型评价指标
  10. SpringBoot (15)---事务处理
  11. Linux动态库soname的使用(转载)
  12. 一个VSCode插件实现软妹音程序员鼓励师24小时在线,还能吐槽PM
  13. 格式化输出latex数字罗马字体
  14. 【LaTeX在线编译器】
  15. java 6面骰子_《剑指offer》 面试题43 n个骰子的点数 (java)
  16. Arranging The Sheep(移动思维)
  17. 如何学photoshop
  18. 从TOP IT公司5年收入和利润变化看货币战争
  19. 微信原生小程序电商实战项目----附源码和分析
  20. 小程序公共方法封装(app.js 源码分享)

热门文章

  1. goahead用jst进行简单互动
  2. 100万并发连接服务器
  3. Android 之 向模拟器的sdcard中添加文件
  4. SQL Server 数据库
  5. 方向向量转欧拉角_【姿态表示】旋转向量、旋转矩阵、欧拉角、四元数
  6. android纪念日源码,分享超炫的表白页面和爱的纪念日源码
  7. 电子计算机工作最主要特征,电子计算机最重要的工作特征是( )
  8. 数据平台权限控制-基于猛犸
  9. 梦江湖获取服务器信息,《一梦江湖》6月5日更新公告
  10. 用c语言写出变色的心形图案