开启消息轨迹

broker端

traceTopicEnable属性设置为true,默认值为false。设置为true,broker启动的时候会初始化存储轨迹数据的默认topic:RMQ_SYS_TRACE_TOPIC;
traceOn属性设置为true,默认值也是true。该属性如果设置false,客户端不会发送轨迹数据到broker端

producer

构造producer对象的时候,设置enableMsgTrace=true,customizedTraceTopic可以为空,使用默认的topic,其它重载接口类似

    /*** Constructor specifying producer group, enabled msgTrace flag and customized trace topic name.** @param producerGroup Producer group, see the name-sake field.* @param enableMsgTrace Switch flag instance for message trace.* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default* trace topic name.*/public DefaultMQProducer(final String producerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {this(null, producerGroup, null, enableMsgTrace, customizedTraceTopic);}

consumer

构造consumer对象的时候,设置enableMsgTrace=true,customizedTraceTopic可以为空,使用默认的topic,其它重载接口类似

    /*** Constructor specifying consumer group, enabled msg trace flag and customized trace topic name.** @param consumerGroup Consumer group.* @param enableMsgTrace Switch flag instance for message trace.* @param customizedTraceTopic The name value of message trace topic.If you don't config,you can use the default trace topic name.*/public DefaultMQPushConsumer(final String consumerGroup, boolean enableMsgTrace, final String customizedTraceTopic) {this(null, consumerGroup, null, new AllocateMessageQueueAveragely(), enableMsgTrace, customizedTraceTopic);}

消息轨迹数据的存储介质

消息轨迹数据还是存储在RocketMQ的broker,每条轨迹数据就像普通消息一样,发送到指定的topic上。
原始消息的ID和KEYS会做为轨迹消息的KEYS,这样可以用来检索指定消息的轨迹数据。
不使用外部存储介质的一个好处是避免依赖第三方组件。

Producer如何采集轨迹数据

在初始化producer的时候注册一个SendMessageHook,在消息发送前、后采集消息发送的上下文信息,并在消息发送完成后异步投递轨迹数据到broker

    @Overridepublic void sendMessageBefore(SendMessageContext context) {//if it is message trace data,then it doesn't recordedif (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())) {return;}//build the context content of TuxeTraceContextTraceContext tuxeContext = new TraceContext();tuxeContext.setTraceBeans(new ArrayList<TraceBean>(1));context.setMqTraceContext(tuxeContext);tuxeContext.setTraceType(TraceType.Pub);tuxeContext.setGroupName(NamespaceUtil.withoutNamespace(context.getProducerGroup()));//build the data bean object of message traceTraceBean traceBean = new TraceBean();// 发送前采集的轨迹数据如下traceBean.setTopic(NamespaceUtil.withoutNamespace(context.getMessage().getTopic()));traceBean.setTags(context.getMessage().getTags());traceBean.setKeys(context.getMessage().getKeys());traceBean.setStoreHost(context.getBrokerAddr());traceBean.setBodyLength(context.getMessage().getBody().length);traceBean.setMsgType(context.getMsgType());traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostProducer().getmQClientFactory().getClientId());tuxeContext.getTraceBeans().add(traceBean);// 发送前采集部分数据到上下文}@Overridepublic void sendMessageAfter(SendMessageContext context) {//if it is message trace data,then it doesn't recordedif (context == null || context.getMessage().getTopic().startsWith(((AsyncTraceDispatcher) localDispatcher).getTraceTopicName())|| context.getMqTraceContext() == null) {return;}if (context.getSendResult() == null) {return;}if (context.getSendResult().getRegionId() == null|| !context.getSendResult().isTraceOn()) {// if switch is false,skip itreturn;}TraceContext tuxeContext = (TraceContext) context.getMqTraceContext();// traceBean里保存了发送前采集的相关信息TraceBean traceBean = tuxeContext.getTraceBeans().get(0);// 发送耗时,traceBeans实际只会有一条数据int costTime = (int) ((System.currentTimeMillis() - tuxeContext.getTimeStamp()) / tuxeContext.getTraceBeans().size());tuxeContext.setCostTime(costTime);if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {tuxeContext.setSuccess(true);} else {tuxeContext.setSuccess(false);}tuxeContext.setRegionId(context.getSendResult().getRegionId());traceBean.setMsgId(context.getSendResult().getMsgId());traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());// 计算存储时间方式:就是认为总耗时的一半,所以这不一个准确值traceBean.setStoreTime(tuxeContext.getTimeStamp() + costTime / 2);// 准备异步发送轨迹数据,并不是立即发送localDispatcher.append(tuxeContext);}

Consumer如何采集轨迹数据

consusmer和producer类似,注册一个ConsumeMessageHook,但与producer最大的区别是producer是消息发送完成后发条该条消息的轨迹数据,但是consumer是在消费前采集部分数据发送一次,消费后再采集部分数据发送一次,消费是共2条轨迹数据。如果消费失败进行重试,每重试一次自己就要再记录两条

    @Overridepublic void consumeMessageBefore(ConsumeMessageContext context) {if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {return;}TraceContext traceContext = new TraceContext();context.setMqTraceContext(traceContext);traceContext.setTraceType(TraceType.SubBefore);//traceContext.setGroupName(NamespaceUtil.withoutNamespace(context.getConsumerGroup()));//List<TraceBean> beans = new ArrayList<TraceBean>();for (MessageExt msg : context.getMsgList()) {if (msg == null) {continue;}String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);if (traceOn != null && traceOn.equals("false")) {// If trace switch is false ,skip itcontinue;}TraceBean traceBean = new TraceBean();traceBean.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic()));//traceBean.setMsgId(msg.getMsgId());//traceBean.setTags(msg.getTags());//traceBean.setKeys(msg.getKeys());//traceBean.setStoreTime(msg.getStoreTimestamp());//traceBean.setBodyLength(msg.getStoreSize());//traceBean.setRetryTimes(msg.getReconsumeTimes());//traceBean.setClientHost(((AsyncTraceDispatcher)localDispatcher).getHostConsumer().getmQClientFactory().getClientId());traceContext.setRegionId(regionId);//beans.add(traceBean);}if (beans.size() > 0) {traceContext.setTraceBeans(beans);traceContext.setTimeStamp(System.currentTimeMillis());localDispatcher.append(traceContext);//消费前发送一次}}@Overridepublic void consumeMessageAfter(ConsumeMessageContext context) {if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {return;}TraceContext subBeforeContext = (TraceContext) context.getMqTraceContext();if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {// If subbefore bean is null ,skip itreturn;}TraceContext subAfterContext = new TraceContext();subAfterContext.setTraceType(TraceType.SubAfter);//subAfterContext.setRegionId(subBeforeContext.getRegionId());//subAfterContext.setGroupName(NamespaceUtil.withoutNamespace(subBeforeContext.getGroupName()));//subAfterContext.setRequestId(subBeforeContext.getRequestId());//subAfterContext.setSuccess(context.isSuccess());//// Caculate the cost time for processing messagesint costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());subAfterContext.setCostTime(costTime);//subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);if (contextType != null) {subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());}localDispatcher.append(subAfterContext);//消费后发送一次}

如何发送轨迹数据

客户端在发送或消费消息时,将轨迹消息放入一个阻塞队列便结束了,会有一个异步线程从这个队列里取出轨迹消息封装为一个发送任务提交到线程池,然后发送到broker。

  1. 存放待处理轨迹消息的队列默认大小1024,如果满了,当前轨迹消息打个日志就丢弃了
  2. 有一个异步线程不断轮询从存入轨迹消息的队列取出数据,每次最多100条(或者等待5ms还不够100),封装为一个发送请求任务,提交到发送轨迹消息的线程池
  3. 发送任务将这批次消息按topic分类,每个topic一批消息,按批次处理发送给轨迹topic,把原始消息的消息ID和消息keys做为这个轨迹消息的keys,这批原始消息的元数据(多条的话,每条消息的元数据最终是拼接到一条的,每条消息的元数据尾部有个字段分隔符,查询的时候可以用来拆分)作为消息体

如何查询轨迹数据

因为消息轨迹数据是发到指定的轨迹topic上,原始消息的ID和消息KEYS作为轨迹消息的KEYS,所以可以用目标消息的消息ID作为轨迹消息的key从轨迹topic上查出来相关消息,对查出来的消息体解析,如果解析出来的消息体数据的消息ID字段与目标消息ID匹配,就是我们想要的消息轨迹数据。一般来说,正常情况下发送和消费应该能查出来是3条,一条发送轨迹,两条消费轨迹(消费前和消费后)。

RocketMQ消息轨迹相关推荐

  1. RocketMQ消息轨迹-设计篇

    RocketMQ 消息轨迹主要包含两篇文章:设计篇与源码分析篇,本节将详细介绍RocketMQ消息轨迹-设计相关. RocketMQ消息轨迹,主要跟踪消息发送.消息消费的轨迹,即详细记录消息各个处理环 ...

  2. rocketmq 消息 自定义_RocketMQ消息轨迹-设计篇

    RocketMQ 消息轨迹主要包含两篇文章:设计篇与源码分析篇,本节将详细介绍RocketMQ消息轨迹-设计相关. RocketMQ消息轨迹,主要跟踪消息发送.消息消费的轨迹,即详细记录消息各个处理环 ...

  3. Apache RocketMQ 发布 v4.4.0,新添权限控制和消息轨迹特性

    近日,Apache RocketMQ 发布了 v4.4.0,该版本主要增加了权限控制(ACL)和消息轨迹(Message Trace)两大特性,并做了8项优化,和修复了4处bug. 权限控制(ACL) ...

  4. rocketmq python 某个队列不消费_消息队列 RocketMQ 版消息轨迹没有显示消费信息,为什么?...

    关于 消息队列 RocketMQ 版消息轨迹没有显示消费信息,为什么?的搜索结果 回答 2021一月拼团已有400余人拼团成功最低一折 点击进入:一月新人专场 服务器配置时间价格1核2G1年84元1核 ...

  5. RocketMQ如何实现消息轨迹:消息何时发送的?耗时多久?谁消费的?存在哪个broker了?

    文章目录 一.前言 二.消息轨迹 1.消息轨迹的引入目的 2.如何使用消息轨迹 1)使用案例 2)消息轨迹内容 3) RocketMQ-Console中查看消息轨迹 3.消息轨迹实现原理 1)消息轨迹 ...

  6. 喜马拉雅 Apache RocketMQ 消息治理实践

    简介:本文通过喜马拉雅的RocketMQ治理实践分享,让大家了解使用消息中间件过程中可能遇到的问题,避免实战中踩坑. 作者:曹融,来自喜马拉雅,从事微服务和消息相关中间件开发. 本文通过喜马拉雅的Ro ...

  7. RocketMQ 消息丢失场景分析及如何解决!

    本文来源:https://blog.csdn.net/LO_YUN/article/details/103949317 既然在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一些涉及到了金钱 ...

  8. RocketMQ消息丢失场景及解决办法

    作者:霁云HYY 来源:https://blog.csdn.net/LO_YUN/article/details/103949317 既然在项目中使用了MQ,那么就不可避免的需要考虑消息丢失问题.在一 ...

  9. RocketMQ 消息丢失场景及解决办法

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | blog.csdn.net/LO_YUN/ar ...

最新文章

  1. Matlab编程与数据类型 -- 多分支条件选择语句if/elseif/…/else/end
  2. 阐明性问题生成 (Clarification Question Generation) 概览
  3. linux vmware硬盘,给vmware的Linux虚拟机增添硬盘
  4. Linux入门学习(十)
  5. php生成网页桌面快捷方式
  6. 安装Eclipse ADT插件时遇到的一些问题,错误
  7. 在windows使用vs2008编译live555
  8. java金蝶云单据查询_如果在单据上查或下查的“单据关联”界面添加功能
  9. Firefox扩展IE Tab Plus内置功能导致浏览所有网页加载superfish.com脚本
  10. arcpy投影(一)——prj、gtf文件定义、路径及解析(arcmap,arcpro)
  11. OpenGL 简化点光源与平行光的对比实验
  12. (1366, Incorrect string value: '\\xE6\\xB7\\xB1\\xE5\\x85\\xA5...' for column '
  13. MPG4 MP42 MP43: Microsoft MPEG-4 versions 1, 2, and 3
  14. c语言切蛋糕问题程序设计报告,c语言切蛋糕问题,问题如图。望大神指教。
  15. 四字母net域名值钱吗?四字母域名取名有什么技巧?
  16. 室内设计和平面设计哪个更适合女生学习?
  17. [1101]flink常用参数说明
  18. java 第三方库common系统详解
  19. 微课在中职计算机基础中的应用,微课在中职计算机基础教学中的应用探析
  20. 全终端办公电子邮件集成方案

热门文章

  1. SVL-Simulation自动驾驶仿真器
  2. RPA 项目经验分享
  3. RuntimeError: NCCL error in: /pytorch/torch/lib/c10d/ProcessGroupNCCL.cpp:784, unhandled system erro
  4. Boostrap nav和navbar的详细使用
  5. Eclipes和Myeclipse插件安装方法
  6. [渝粤教育] 中国地质大学 Java语言程序设计 复习题
  7. RK3288 系统升级流程
  8. mc是用java写的吗_都说MC的代码特别差劲,你觉得它在所有游戏中,能排第几?...
  9. matlab 工具箱下载地址
  10. llqrcode.js识别二维码,解析二维码信息