最近支持一下 ONS 内部skywalking 增强支持,RocketMQ 开源版本支持skywalking ,阿里云上的skywalking不支持 简单的实现一下,了解整个实现的逻辑。

一、开发指南

参考文档

官方开发指南
https://skywalking.apache.org/docs/skywalking-java/v8.10.0/en/setup/service-agent/java-agent/java-plugin-development-guide/

https://skywalking.apache.org/zh/2019-01-21-agent-plugin-practice/
非官方文档 很详细
https://www.jianshu.com/p/e5fb4d46c618
https://blog.csdn.net/kaiyuanshe/article/details/109685249

1.1 核心参数传递 ContextCarrier

在客户端,创建一个新的 traceId 所有信息放到HTTP heads、Dubbo attachments 或者Kafka messages。
通过服务调用,traceId 传递。
skywalking 中使用 ContextCarrier 传递 信息
比如: 这里通过 Mq 用户自定义属性传递 skywalking 中的携带信息

Properties userProperties = message.getUserProperties();
CarrierItem next = contextCarrier.items();
while (next.hasNext()) {next = next.next();if (!StringUtil.isEmpty(next.getHeadValue())) {userProperties.setProperty(next.getHeadKey(), next.getHeadValue());}
}

1.1.1 跨线程传递 ContextSnapshot

听名字 就有意思 上下文快照,直接粘贴官方的文档 。
Besides cross-process tracing, cross-thread tracing has to be supported as well. For instance, both async process (in-memory MQ) and batch process are common in Java. Cross-process and cross-thread tracing are very similar in that they both require propagating context, except that cross-thread tracing does not require serialization.
Here are the three steps on cross-thread propagation:

  1. Use ContextManager#capture to get the ContextSnapshot object.
  2. Let the sub-thread access the ContextSnapshot through method arguments or being carried by existing arguments
  3. Use ContextManager#continued in sub-thread.

2. 字节码增强

https://skywalking.apache.org/docs/skywalking-java/v8.10.0/en/setup/service-agent/java-agent/java-plugin-development-guide/

字节码增强使用 bytebuddy

可以先去官方的Java agent 代码下载下来湫湫一下子. 很多的例子

 <dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-agent-core</artifactId><version>${sky-agent-version}</version><scope>provided</scope>
</dependency>

ClassInstanceMethodsEnhancePluginDefine �
比如增强这个类的实例 com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl


package com.aliyun.openservices.ons.api;public interface Consumer extends Admin {void subscribe(String var1, String var2, MessageListener var3);void subscribe(String var1, MessageSelector var2, MessageListener var3);void unsubscribe(String var1);
}

注意这里为什么不能直接增强 MessageListener? 如下写法为lambda 表达式 skywalking不支持
https://blog.csdn.net/weixin_39850981/article/details/118846538 bytebuddy 支持官方没有实现
所以改为覆盖参数进行增强
consumer.subscribe(topic, tag, (msg, context) -> {
})

2.1 定义你对哪个class 增强?

NameMatch.byName(ENHANCE_CLASS);

2.2 定义你对哪个方法增强 &构造函数?

ProducerConstructorInterceptor & InstanceMethodsAroundInterceptor 信息订阅
如下为 阿里云ONS 消费者的定义增强

public class ConsumerImplInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {private static final String ENHANCE_CLASS = "com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl";private static final String CONSUMER_MESSAGE_METHOD = "subscribe";private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.ons.ConsumerImplInterceptor";public static final String CONSTRUCTOR_INTERCEPT_TYPE = "java.util.Properties";public static final String CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.ons.ConsumerConstructorInterceptor";@Overrideprotected ClassMatch enhanceClass() {return NameMatch.byName(ENHANCE_CLASS);}@Overridepublic ConstructorInterceptPoint[] getConstructorsInterceptPoints() {return new ConstructorInterceptPoint[]{new ConstructorInterceptPoint() {@Overridepublic ElementMatcher<MethodDescription> getConstructorMatcher() {return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);}@Overridepublic String getConstructorInterceptor() {return CONSUMER_CONFIG_CONSTRUCTOR_INTERCEPTOR_CLASS;}}};}@Overridepublic InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {return new InstanceMethodsInterceptPoint[]{new InstanceMethodsInterceptPoint() {@Overridepublic ElementMatcher<MethodDescription> getMethodsMatcher() {return named(CONSUMER_MESSAGE_METHOD).and(takesArgumentWithType(1, "java.lang.String")).and(takesArgumentWithType(2, "com.aliyun.openservices.ons.api.MessageListener"));}@Overridepublic String getMethodsInterceptor() {return INTERCEPTOR_CLASS;}@Overridepublic boolean isOverrideArgs() {return true;}}};}
}

2.3 定义增强的处理逻辑

2.3.1 消费者构造函数处理获取参数
/*** {@link  com.aliyun.openservices.ons.api.impl.rocketmq.ProducerImpl}** @author wangji* @date 2022-04-12 09:46*/
public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {private static final ILog LOGGER = LogManager.getLogger(ProducerConstructorInterceptor.class);@Overridepublic void onConstruct(EnhancedInstance objInst, Object[] allArguments) throws Throwable {try {Properties properties = (Properties) allArguments[0];ConfigProducerPropertiesCache cache = new ConfigProducerPropertiesCache();cache.setNameServer(properties.getProperty(PropertyKeyConst.NAMESRV_ADDR));cache.setGroupId(properties.getProperty(PropertyKeyConst.GROUP_ID));cache.setmQType(properties.getProperty(PropertyKeyConst.MQType));objInst.setSkyWalkingDynamicField(cache);} catch (Exception e) {LOGGER.error("ProducerImpl {}",e);}}
}
2.3.2 消费者实现处理进行参数替换

consumer.subscribe(topic, tag, (msg, context) -> {
})
这里为什么进行替换参数是由于内部写法为lambda表达式 so 采用替换参数,debug 看日志的时候发现一直增强不行.

/*** {@link  com.aliyun.openservices.ons.api.impl.rocketmq.ConsumerImpl}** @author wangji* @date 2022-04-09 18:03*/
public class ConsumerImplInterceptor implements InstanceMethodsAroundInterceptor {@Overridepublic void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {ConfigConsumerPropertiesCache configConsumerPropertiesCache = (ConfigConsumerPropertiesCache) objInst.getSkyWalkingDynamicField();Object messageListener = allArguments[2];if (null != messageListener) {if (messageListener instanceof EnhancedInstance) {} else if (messageListener instanceof MessageListener) {MessageListenerCache messageListenerCache = new MessageListenerCache();messageListenerCache.setMessageListener((MessageListener) messageListener);messageListenerCache.setConsumerPropertiesCache(configConsumerPropertiesCache);allArguments[2] = new MessageListenerAdapterInterceptor(messageListenerCache);}}}@Overridepublic Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,Object ret) throws Throwable {return ret;}@Overridepublic void handleMethodException(EnhancedInstance enhancedInstance, Method method, Object[] objects, Class<?>[] classes, Throwable throwable) {}}
2.3.3 消费者Listener 监听逻辑

从userMessage 中获取 上下文信息,设置 ContextCarrier 信息 创建 Span

/*** 消息监听处理..** @author wangji* @date 2022-04-11 18:42*/
public class MessageListenerAdapterInterceptor implements MessageListener, EnhancedInstance {public static final String CONSUMER_OPERATION_NAME_PREFIX = "ALiYunOns/";private MessageListenerCache cache;public MessageListenerAdapterInterceptor(MessageListenerCache cache) {this.cache = cache;}@Overridepublic Action consume(Message message, ConsumeContext context) {ContextCarrier contextCarrier = getContextCarrierFromMessage(message);ConfigConsumerPropertiesCache consumerPropertiesCache = cache.getConsumerPropertiesCache();AbstractSpan span = ContextManager.createEntrySpan(CONSUMER_OPERATION_NAME_PREFIX + message.getTopic() + "/Consumer", contextCarrier);span.setComponent(ComponentsDefine.ROCKET_MQ_CONSUMER);span.setPeer(consumerPropertiesCache.getNameServer());SpanLayer.asMQ(span);ContextManager.extract(getContextCarrierFromMessage(message));StringTag groupIdTag = (StringTag) Tags.ofKey(PropertyKeyConst.GROUP_ID);groupIdTag.set(span,consumerPropertiesCache.getGroupId());StringTag messageModel = (StringTag) Tags.ofKey(PropertyKeyConst.MessageModel);messageModel.set(span,consumerPropertiesCache.getMessageModel());StringTag msgIdTag = (StringTag) Tags.ofKey(Message.SystemPropKey.MSGID);msgIdTag.set(span,message.getMsgID());StringTag tagTag = (StringTag) Tags.ofKey(Message.SystemPropKey.TAG);tagTag.set(span,message.getTag());if(message.getKey() !=null && message.getKey().length()>0){StringTag keyTag = (StringTag) Tags.ofKey(Message.SystemPropKey.KEY);keyTag.set(span,message.getKey());}try {Action consume = cache.getMessageListener().consume(message, context);if (consume != null) {AbstractSpan activeSpan = ContextManager.activeSpan();Tags.MQ_STATUS.set(activeSpan, consume.name());if (consume != Action.CommitMessage) {activeSpan.errorOccurred();}}return consume;} catch (Throwable t) {ContextManager.activeSpan().log(t);if (t instanceof RuntimeException) {throw (RuntimeException) t;} else {throw new RuntimeException(t);}} finally {ContextManager.stopSpan();}}@Overridepublic Object getSkyWalkingDynamicField() {return cache;}@Overridepublic void setSkyWalkingDynamicField(Object o) {}private ContextCarrier getContextCarrierFromMessage(Message message) {ContextCarrier contextCarrier = new ContextCarrier();CarrierItem next = contextCarrier.items();while (next.hasNext()) {next = next.next();next.setHeadValue(message.getUserProperties(next.getHeadKey()));}return contextCarrier;}
}

3、打包

https://blog.csdn.net/kaiyuanshe/article/details/109685249

3.1 定义增强的配置

src/main/resources/skywalking-plugin.def

# Key=value的形式
# key随便写;value是Instrumentation类的包名类名全路径
aliyun-ons-plugin=org.apache.skywalking.apm.plugin.ons.define.ProducerInstrumentation
aliyun-ons-plugin=org.apache.skywalking.apm.plugin.ons.define.ConsumerImplInstrumentation

3.2 maven shade 定义

因为 skywalking里面增对 bytebuddy 进行了shade 所以也需要处理一下,然后扔进plugin 里面就可以了

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.gongdao</groupId><artifactId>apm-aliyun-mq-ons-plugin</artifactId><version>2022-04-15-SNAPSHOT</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><sky-agent-version>8.9.0</sky-agent-version><bytebuddy.version>1.11.18</bytebuddy.version><shade.package>org.apache.skywalking.apm.dependencies</shade.package><shade.net.bytebuddy.source>net.bytebuddy</shade.net.bytebuddy.source><shade.net.bytebuddy.target>${shade.package}.${shade.net.bytebuddy.source}</shade.net.bytebuddy.target></properties><dependencies><dependency><groupId>com.aliyun.openservices</groupId><artifactId>ons-client</artifactId><version>1.8.7.1.Final</version><scope>provided</scope></dependency><dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-agent-core</artifactId><version>${sky-agent-version}</version><scope>provided</scope></dependency><dependency><groupId>net.bytebuddy</groupId><artifactId>byte-buddy</artifactId><version>${bytebuddy.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.skywalking</groupId><artifactId>java-agent-util</artifactId><version>${sky-agent-version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-test-tools</artifactId><version>${sky-agent-version}</version><scope>test</scope></dependency></dependencies><build><plugins><plugin><artifactId>maven-shade-plugin</artifactId><version>3.1.0</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><shadedArtifactAttached>false</shadedArtifactAttached><createDependencyReducedPom>true</createDependencyReducedPom><createSourcesJar>true</createSourcesJar><shadeSourcesContent>true</shadeSourcesContent><relocations><relocation><pattern>${shade.net.bytebuddy.source}</pattern><shadedPattern>${shade.net.bytebuddy.target}</shadedPattern></relocation></relocations></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.1</version><configuration><source>1.8</source><target>1.8</target></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-source-plugin</artifactId><version>3.0.1</version><executions><execution><id>attach-sources</id><goals><goal>jar</goal></goals></execution></executions></plugin></plugins></build>
</project>

4、测试debug

启动应用程序 agent 调试都类似

-DSW_AGENT_COLLECTOR_BACKEND_SERVICES=127.0.0.1:11800 可以不用配置 agent config 里面可以处理
刚开始开发 logging.level=${SW_LOGGING_LEVEL:INFO} 可以修改日志的等级为debug 查看 agent的日志

  • idea 通过run的方式启动remote debug ,非debug 模式.然后在插件的工程通过remote debug 链接上。
-javaagent:/Users/wangji/agent/skywalking-agent.jar
-Dskywalking.agent.service_name=test-provider
-agentlib:jdwp=transport=dt_socket,address=5005,server=y,suspend=n
  • idea 通过debug的方式启动,可以将 plugin 包通过add lib 的方式 获取到代码然后进行debug,这样既可以debug 工程代码 也可以debug agent代码。
-javaagent:/Users/wangji/agent/skywalking-agent.jar
-Dskywalking.agent.service_name=test-provider

二、skywalking 跨线程能力

apm-jdk-threading-plugin 插件

提供了 针对 java.lang.Runnable & java.util.concurrent.Callable 的子类 ,注意 lambda 表达式不支持,提供前缀配置类的前缀,如果有一个统一的 https://skywalking.apache.org/docs/skywalking-java/v8.10.0/en/setup/service-agent/java-agent/configurations/

plugin.jdkthreading.threading_class_prefixes Threading classes (java.lang.Runnable and java.util.concurrent.Callable) and their subclasses, including anonymous inner classes whose name match any one of the THREADING_CLASS_PREFIXES (splitted by ,) will be instrumented, make sure to only specify as narrow prefixes as what you’re expecting to instrument, (java. and javax. will be ignored due to safety issues) SW_PLUGIN_JDKTHREADING_THREADING_CLASS_PREFIXES

使用注解 @TraceCrossThread or apm-toolkit-trace 里面的包装类

https://skywalking.apache.org/docs/skywalking-java/v8.10.0/en/setup/service-agent/java-agent/application-toolkit-trace-cross-thread/
注意这里必须插入线程之前构造当前类,这个实现原理通过监听构造函数

  ExecutorService executorService = Executors.newFixedThreadPool(1);executorService.submit(CallableWrapper.of(new Callable<String>() {@Override public String call() throws Exception {return null;}}));

实现原理

构造先抓取
public class CallableOrRunnableConstructInterceptor implements InstanceConstructorInterceptor {@Overridepublic void onConstruct(EnhancedInstance objInst, Object[] allArguments) {if (ContextManager.isActive()) {// 用于传递参数objInst.setSkyWalkingDynamicField(ContextManager.capture());}
}}
执行方法前注入快照
public class CallableOrRunnableInvokeInterceptor implements InstanceMethodsAroundInterceptor {@Overridepublic void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,MethodInterceptResult result) throws Throwable {ContextManager.createLocalSpan("Thread/" + objInst.getClass().getName() + "/" + method.getName());// 获取快照ContextSnapshot cachedObjects = (ContextSnapshot) objInst.getSkyWalkingDynamicField();if (cachedObjects != null) {ContextManager.continued(cachedObjects);}}@Overridepublic Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class<?>[] argumentsTypes,Object ret) throws Throwable {ContextManager.stopSpan();// clear ContextSnapshotobjInst.setSkyWalkingDynamicField(null);return ret;}@Overridepublic void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,Class<?>[] argumentsTypes, Throwable t) {ContextManager.activeSpan().log(t);}
}

三、插件常用的启动参数

3.1 skywalking 支持arthas 缓存起来 默认false

缓存起来代码可以查看到增强后的逻辑哦~

# If true, SkyWalking agent will cache all instrumented classes files to memory or disk files (decided by class cache mode),
# allow other javaagent to enhance those classes that enhanced by SkyWalking agent.
agent.is_cache_enhanced_class=${SW_AGENT_CACHE_CLASS:true}# The instrumented classes cache mode: MEMORY or FILE
# MEMORY: cache class bytes to memory, if instrumented classes is too many or too large, it may take up more memory
# FILE: cache class bytes in `/class-cache` folder, automatically clean up cached class files when the application exits
agent.class_cache_mode=${SW_AGENT_CLASS_CACHE_MODE:FILE}

3.2 jdbc 增强是否打印参数 默认false


# If set to true, the parameters of the sql (typically `java.sql.PreparedStatement`) would be collected.
plugin.jdbc.trace_sql_parameters=${SW_JDBC_TRACE_SQL_PARAMETERS:true}

3.3 是否打印dubbo的参数 测试环境可以打开

#  Apache Dubbo consumer collect `arguments` in RPC call, use `Object#toString` to collect `arguments`.
plugin.dubbo.collect_consumer_arguments=${SW_PLUGIN_DUBBO_COLLECT_CONSUMER_ARGUMENTS:true}#  Apache Dubbo provider collect `arguments` in RPC call, use `Object#toString` to collect `arguments`.
plugin.dubbo.collect_provider_arguments=${SW_PLUGIN_DUBBO_COLLECT_PROVIDER_ARGUMENTS:true}

skywalking plugin 开发初探 ONS plugin 实践相关推荐

  1. CRM中Plugin开发如何将功能放入多个模块

    近期做CRM的Plugin开发,发现Plugin中的功能必须全部放在一个DLL里,感觉不爽,如果我要用的功能在别人提供的DLL里,或有些功能需要在多个地方使用岂不是很难过? 用了VS2012的Dyna ...

  2. Flutter Plugin开发流程

    这篇文章主要介绍了Flutter Plugin开发流程,包括如何利用Android Studio开发以及发布等. 本文主要给大家介绍如何开发Flutter Plugin中Android的部分.有关Fl ...

  3. 通用流量录制回放工具 jvm-sandbox-repeater 尝鲜 (三)—— repeater plugin 开发

    本文作者陈恒捷是TesterHome社区主编,第十届MTSC大会上海站-开源专场出品人.先后在PP助手.PPmoney.荔枝等公司从事测试效能提升相关工作,在测试技术及效率提升方面有丰富的经验积累. ...

  4. 提升Android开发效率的最佳实践

    本文属于Android入门与最佳实践系列,有兴趣的可以围观笔者的前一篇关于Android实践建议的文章:2016里一些Android最佳实践列表--Opinionated 原文地址 软件工程师的工作效 ...

  5. Django 开发中的最佳实践之一

    为什么80%的码农都做不了架构师?>>>    Django 开发中的最佳实践之一 本文关注 Django 开发中调试的部分. 出自 Flask 项目旗下的 Werkzeug deb ...

  6. JAVA应用开发MQ实战最佳实践——Series2:消息队列RocketMQ性能测试案例

    简介:JAVA应用开发MQ实战最佳实践--Series2:消息队列RocketMQ性能测试案例 往期内容 JAVA应用开发MQ实战最佳实践--Series1:RocketMQ综述及代码设计 1. 消息 ...

  7. Django开发准则与最佳实践

    最近在网易云课堂学习一门django高级实战教程,本文是学习课时14.15的一些笔记 Django开发准则与最佳实践 一.优先使用自定义用户模型 继承BaseUserManager和AbstractB ...

  8. 第六十二期:腾讯云发布“小程序·云开发十大优秀实践”:猫眼、唯品会等入选

    作者:周小白 [TechWeb]10 月 19 日消息,今日,腾讯云首次对外公布了"小程序·云开发十大优秀实践",包括白鹭引擎.千墨科技.腾讯新闻.即速应用.微盟.唯品会.猫眼.香 ...

  9. 敏捷开发“松结对编程”实践之六:大型团队篇|后记(大型研发团队,学习型团队,139团队,师徒制度,人员招聘,职业生涯规划)...

    本文是"松结对编程"系列的第六篇.(之一,之二,之三,之四,之五,之六,之七,之八) 松结对编程是小型团队的实践,大约运行在1个师傅+1-3个徒弟的尺度上,当面临更大尺度的时候,就 ...

最新文章

  1. 李飞飞高徒新项目,一眼看穿你下届总统会选谁!
  2. 【Python学习系列二十三】Scikit_Learn库降维方法(矩阵分解)-PCAFA
  3. JAVA显式锁,各种锁分类
  4. ML 自学者周刊:第 1 期
  5. 设置背景图片,解决手机上背景图片高度适应问题
  6. LeetCode贪心 最长回文串
  7. 面试:MySQL InnoDB 事务隔离
  8. oracle空间整理,ORACLE数据库空间整理心得
  9. CCF202112-2 序列查询新解
  10. 九九乘法表 利用for语句
  11. 常见基本题型:进制的转换
  12. android ndk下载安装教程,NDK安装教程20180605
  13. m115b linux 驱动下载,富士施乐m115b打印机驱动
  14. Android编程怎么打开,安卓手机如何开启开发者模式?简单三步骤教你解决!
  15. 一梦三四年——国产MOBA网游的巅峰
  16. 数据科学工程师面试宝典系列---旅游评论数据中的自然语言处理
  17. 科学设置百度网盟到访定向,提升竞价转化率!
  18. mail在linux的端口,linux 上mailx通过465端口发送邮件
  19. 单位根检验、协整检验和格兰杰因果关系检验三者之间的关系
  20. 2019 ICPC Asia Nanchang Regional

热门文章

  1. FX1S控制步进电机的实例(图与程序)
  2. 目前最新《Swift4打造今日头条视频实战教程》
  3. html5 | 通过js实现对网页文本内容语音朗读 | 教程
  4. 数控铣削图案及编程_数控铣床编程30例带图
  5. 《SRE生存指南》金句分享
  6. 服务器启动文件夹,win10系统开机启动文件夹的路径是什么_网站服务器运行维护...
  7. python 百度搜索结果 浏览器 和终端不一致_python自动爬取百度搜索结果
  8. oracle修复工具下载,Oracle数据库恢复工具DataNumen Oracle Recovery
  9. 最近这只狗狗,真的很忙
  10. 苏黎世联邦理工学院计算机科学系,苏黎世联邦理工大学专业设置