第31讲:OAL 语言,原来定义创造一门新语言如此轻松(上)

在前文介绍 Metrics 实现以及对应的 DIspatcher 实现的时候,会发现有一部分实现类位于 generated-analysis 模块的 target/generated-sources 目录中,这些类的开头都会有下图所示的注释:

通过这行注释我们知道,这些类是通过 OAL(Observability Analysis Language)生成的。OAL 是 SkyWalking 后端自定义的一种脚本,在 SkyWalking 编译阶段会通过 Antlr4 解析 OAL 脚本,并与 freemarker 配合使用,生成上述 Metrics 实现类以及对应的 Dispatcher 实现类。生成的结果如下图所示:

在生成上述代码的过程主要涉及的模块有:generate-tool、generate-tool-grammar、generated-analysis 三个模块。本课时将详细介绍生成上述代码过程中使用到的基础知识以及这三个模块的核心实现。

Antlr4 基础入门

Antlr4(Another Tool for Language Recognition)是一款强大的语法生成器工具,它可以根据输入的字节流自动生成语法树,作为一款开源语法分析器,可用于读取、处理、执行和翻译结构化的文本或二进制文件。基本上是当前 Java 语言中使用最为广泛的语法生成器工具,下面简单列举了 Antlr4 出现的场景:

  • 在很多大数据系统中都使用 Antlr4 ,例如,Hadoop 生态系统中的 Hive、Spark 数据仓库和分析系统所使用的语言,都用到了 Antlr4;

  • Hibernate 对象-关系映射框架(ORM)使用 Antlr 来处理 HQL 语言;

  • Oracle 公司在 SQL 开发者 IDE 和迁移工具中使用了 Antlr4;

  • NetBeans 使用 Antlr4 来解析 C++。

本课时将通过 Antlr4 实现一个简单的计算器功能,在开始之前,我们需要先了解 Antlr4 的一些基础知识。

词法分析器(Lexer)

我们的编程语言通常由一系列关键字以及一套严格定义的语法结构组成。编译的目的是将程序员日常使用的高级编程语言翻译成物理机或是虚拟机可以执行的二进制指令。词法分析器的工作是读取、解析程序员写出来的代码文件,这些文件基本都是文本文件。词法分析器通过读取代码文件中的字节流,就可以将其翻译成一个一个连续的、编程语言预先定义好的 Token 。一个 Token 可以是关键字、标识符、符号(symbols)和操作符等等,下面的语法分析器将通过这些 Token 构造抽象语法树(Abstract Syntax Tree,AST)。

语法分析器(Parser)

在分析读取到的字符流时,词法分析器(Lexer)并不关心所生成的单个 Token 的语法意义及其与上下文之间的关系。语法分析器(Parser)将收到的所有 Token 组织起来,并转换成为目标语言语法定义所允许的序列。

无论是 Lexer 还是 Parser 都是一种识别器,Lexer 是字符序列识别器,而 Parser 是 Token 序列识别器。它们在本质上是类似的东西,而只是在分工上有所不同而已。例如以一条赋值语句

sp = 100;

为例来看它在词法分析器(Lexer)以及语法分析器(Parser)中的处理流程,如下图所示:

Antlr4 允许我们定义识别字符流的词法规则以及用于解释 Token 流的语法分析规则。Antlr4 将会根据用户提供的语法(grammer)文件自动生成相应的词法/语法分析器。用户可以利用它们将输入的文本进行编译,并转换成其他形式,比如前面提到的抽象的语法树(AST)。

下面开始进入我们的“计算器”示例,为了告诉 Antlr4 计算器的词法分析规则和语法分析规则,我们需要定义语法(grammar)文件(“.g4”后缀文件),这就使用到了 Antlr4 的元语言。下面结合计算器示例的 grammar 文件 —— Calculator.g4,介绍一下 Antlr4 元语言的基本内容:

grammar Calculator;

expr : ‘(’ expr ‘)’
     | expr (‘|’/‘) expr
     | expr (’+‘|’-‘) expr
     | FLOAT
;
line : expr EOF ;
WS : [ \t\n\r]+ -> skip;
FLOAT : DIGIT+ ’.' DIGIT EXPONET?
| ’.' DIGIT+ EXPONET?
| DIGIT+ EXPONET?
;

fragment DIGIT : ‘0’…‘9’ ;
fragment EXPONENT : (‘e’|‘E’) (‘+’|‘-’)? DIGIT+ ;

第 1 行:定义了 grammar 的名字,这个名字必须要与文件名相同。

第 3~8 行:expr 和 line 就是我们定义的“Calculator”语言的语法规则。Antlr4 约定词法规则名字以大写字母开头,例如这里的 FLOAT。语法规则名字以小写字母开头,例如这里的 expr、line。 这里的规则定义都是以分号(;)结束。Antlr4 的语法是基于 C 的, 也有很多像正则表达式的地方。

expr 由 4 个备选分支,不同的备选分支由“|”分割,expr 规则的含义分别是:

  • 第一个备选分支表示 expr 语句可以由另一个 expr 加上左右两个括号构成。

  • 第二个备选分支表示 expr 语句可以是 expr * expr 或是 expr / expr 格式。

  • 第三个备选分支表示 xpr 语句可以是 expr + expr 或是 expr - expr 格式。

  • 第四个备选分支表示 expr 语句可以是 FLOAT。

多个备选分支的前后顺序还决定了歧义的问题,例如这里的 expr 规则在处理 1 + 2 * 3 这个表达式的时候,因为 expr * expr 的分支在前,生产的语法树如下图所示:

这也就满足了四则运算中,括号优先级高于乘除,乘除优先级高于加减的要求。

line 规则比较简单,可以匹配一个 expr 语句与结束符。

第 9~13 行是词法定义。其中,第 9 行的 WS 定义了空白字符,后面的 skip 是一个特殊的标记,表示空白字符会被忽略。

第 10~13 行的 FLOAT 是定义的浮点数,这里使用的“+”“*”等符号是正则表达式中的含义,而不是四则运算中的加号和乘号。

第 15~16 行 fragment 定义了两个词法定义中使用到的公共部分,DIGIT 表示的是整数,EXPONENT 表示的是科学计数法。fragment 类似于一种内联函数(或别名),只是为了简化词法规则的定义以及可读性,并不会被识别为 Token。

了解了 Antlr4 基本的语法以及 .g4 文件的编写方式之后,我们将其添加到 maven 项目的 src/main/antlr4 目录下,如下图所示:

接下来要在 pom.xml 文件中添加 Antlr4 依赖 jar 包以及相应版本的 maven 插件,如下所示,这里与 SkyWalking 6.2 版本使用 antlr4.jar 版本相同:

<dependencies><dependency><groupId>org.antlr</groupId><artifactId>antlr4</artifactId><version>4.7.1</version></dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.antlr</groupId>
<artifactId>antlr4-maven-plugin</artifactId>
<version>4.7.1</version <!– 与jar包版本相同–>
<executions>
<execution>
<id>antlr</id>
<goals>
<goal>antlr4</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>

执行 mvn package 命令即可在项目的 target/generated-sources 目录下看到 Antlr4 为我们生成 Lexer、Parser 类。为了让 IDEA 能够发现这些生成的 Java 类,我们需要将 target/generated-sources/antlr4 目录标记为 Generated Source Root,如下图所示:

接下来要做的就是使用这些生成的 Lexer 类以及 Parser 类实现解析输入的字节流。Antlr4 提供了 Visitor 和 Listener 两种模式,通过这两种模式可以很轻松地把 Parser 的结果做各种处理。Antlr4 默认会生成 Listener 模式的相关代码(如果需要生成 Visitor 模式的相关代码,需要调整 maven 插件的配置),这里与 SkyWalking 保持一致,使用 Listener 模式。

相信你已经对 Listener 模式都比较熟悉了,简单来说就是在代码中预先定义一系列的事件,然后开发人员可以编写这些事件的 Listener。在程序运行的过程中,如果某些事件被触发了,则程序会根据事件类型调用相应的 Listener 进行处理。

Antlr4 中的定义的 ParseTreeListener 接口就是我们要实现的 Listener 接口,在计算机示例中生成的代码中,CalculatorListener 接口继承了 ParseTreeListener 接口并对其进行了扩展,CalculatorBaseListener 是 CalculatorListener 接口的空实现,具体方法如下:

我们这里提供一个简单的 CalculatorListener 实现类 —— PrintListener,它继承了 CalculatorBaseListener 并覆盖它全部的方法,所有方法实现只会输出当前涉及的节点信息,例如:

public class PrintListener extends com.xxx.CalculatorBaseListener {@Overridepublic void enterLine(com.xxx.CalculatorParser.LineContext ctx) {System.out.println("enterLine:" + ctx.getText());}... ... // 省略其他方法的实现
}

接下来,通过 CalculatorBaseListener 的输出内容来帮助我们明确各个方法的回调时机,同时可以了解 Antlr4 API 的基本使用方式,main() 方法的相关实现如下:

public class Main {public static void main(String[] args) throws Exception {// 这里处理的是"1+2"这一行计算器语言,读取得到字节流ANTLRInputStream input = new ANTLRInputStream("1+2");// 创建CalculatorLexer,词法分析器(Lexer)识别字节流得到 Token流CalculatorLexer lexer = new CalculatorLexer(input);CommonTokenStream tokens = new CommonTokenStream(lexer);// 创建CalculatorParser,语法分析器(Parser)识别 Token流得到 ASTCalculatorParser parser = new CalculatorParser(tokens);ParseTree tree = parser.line();// 遍历 AST中各个节点回调 PrintListener中相应的方法ParseTreeWalker walker = new ParseTreeWalker();walker.walk(new PrintListener(), tree);// 将整个 AST转换成字符串输出System.out.println(tree.toStringTree(parser));}
}

最后来看具体的输出内容:

enterEveryRule:1+2<EOF>
enterLine:1+2<EOF>   # 开始遍历 line: 1+2
enterEveryRule:1+2
enterExpr:1+2   # 开始遍历 expr: 1+2
enterEveryRule:1
enterExpr:1  # 开始遍历 expr: 1
visitTerminal:1
exitExpr:1   # 结束遍历 expr: 1
exitEveryRule:1
visitTerminal:+
enterEveryRule:2
enterExpr:2   # 开始遍历 expr: 2
visitTerminal:2
exitExpr:2    # 结束遍历 expr: 2
exitEveryRule:2
exitExpr:1+2  # 结束遍历 expr: 1+2
exitEveryRule:1+2
visitTerminal:<EOF>
exitEveryRule:1+2<EOF>
(line (expr (expr 1) + (expr 2)) <EOF>)

下图是 IDEA 中 Antlr4 插件展示的 AST,如果你感兴趣可以去尝试一下:

在后面分析 SkyWalking OAL 语言的时候,还会再次看到 Antlr4 Listener 模式的实践,这里就不再继续深入了。

FreeMarker 基础入门

SkyWalking OAL 语言在生成 Java 代码时除了使用到 Antlr4,还会使用到 FreeMarker 模板引擎,即一种基于模板要改变的数据用来生成文本的通用工具。

FreeMarker 使用专门的模板语言 —— FreeMarker Template Language 来编写模板文件(后缀为“.ftl”)。在模板文件中,我们只需要专注于如何展现数据,而在模板之外的逻辑则需要专注于要展示哪些数据,如下图所示,这种模式也被称为 MVC 模式。

下面将通过一个简单的宠物店示例帮助你快速入门 FreeMarker 的基础使用,首先我们定义一个 Animal 抽象类以及多个实现类,如下图所示:

接下来,准备 test.ftl 模板文件。FreeMarker Template Language 的语法与 JSP 中的 EL 表达式非常类似,具体实现如下:

<html>
<body>
<h1><!-- 使用 ${...}展示一个变量 -->Welcome, ${user}<!-- 使用 <#if><#else>标签实现条件分支 --><#if user == "freemarker-user"> our leader<#else>out user</#if>!
</h1>
<p>We have these animals:
<table border=1><tr><td>Animal Name</td><td>Price</td><td>Size</td></tr><!-- 使用 <#list ... as>标签实现对 List集合的遍历 --><#list animals as animal><tr><td>${animal.name}</td><td>${animal.price}</td><td>${animal.size}</td></tr></#list>
</table>
</body>
</html>

最后写一个 main() 方法,其中首先会初始化 FreeMarker 、加载模板文件、创建 Animal 集合,最后将数据写入到文件中:

public static void main(String[] args) throws Exception {//1.初始化并配置Configuration对象Configuration configuration =new Configuration(Configuration.getVersion());//2.设置模板文件所在的目录configuration.setClassForTemplateLoading(Main.class, "/template");//3.设置字符集configuration.setDefaultEncoding("utf-8");//4.加载模板文件Template template = configuration.getTemplate("test.ftl");//5.创建数据模型Map<String, Object> result = createData();//6.创建Writer对象FileWriter writer = new FileWriter(new File("/Users/xxx/Documents/log/test.html"));//7.输出数据模型到文件中template.process(result, writer);//8.关闭Writer对象writer.close();
}

最后生产的 test.html 文件如下图所示:

到此为止,SkyWalking 生成代码涉及的基础知识就介绍完了,下一课时将开始介绍 generate-tool、generate-tool-grammar、generated-analysis 三个模块的具体实现。

深入 OAL

在 generate-tool-grammar 模块中,OAL(Observability Analysis Language)语法的定义分为了 OALLexer.g4 和 OALParser.g4 两个文件,其中 OALLexer.g4 定义了 OAL 的词法规则,OALParser.g4 定义了 OAL 的语法规则。

在 generated-analysis 模块中,official_analysis.oal 是 SkyWalking 默认提供的 OAL 文件,其中就是用 generate-tool-grammar 模块中定义的 OAL 语法编写而成的。在 official_analysis.oal 中我们可以看到很多熟悉的 Metrics 指标,例如:

instance_jvm_old_gc_time = from(ServiceInstanceJVMGC.time).filter(phrase == GCPhrase.OLD).longAvg();
service_cpm = from(Service.*).cpm();
service_p99 = from(Service.latency).p99(10);
service_relation_server_cpm = from(ServiceRelation.*).filter(detectPoint == DetectPoint.SERVER).cpm();

这四条语句是比较典型的 OAL 语句,下面将以 instance_jvm_old_gc_time 这条语句为线索简单介绍 OAL 语法的定义, official_analysis.oal 文件中其他 OAL 语句基本雷同,不再赘述。

这里先用 IDEA Antlr4 插件预览一下该语句解析出来的抽象语法树(AST),如下图所示:

我们从 root 规则开始一步步分析,instance_jvm_old_gc_time 这条语句匹配了哪些语法规则,如下图所示:

整个 instance_jvm_old_gc_time 语句匹配了 aggregationStatement 规则,其中 variable 规则定义了 OAL 语言中变量名称的结构,等号左边的 instance_jvm_old_gc_time 变量名称会匹配到该规则,而整个等号右边的内容会匹配到 metricStatement 规则(除了结尾的分号)。

metricStatement 规则如下图所示,开头的 from 关键字、source、sourceAttribute 三部分比较简单,你可以直接在 OALParser.g4 文件中找到相应的定义。后面的 filterStatement 表达式开头是 filter 关键字,紧接着的括号中是一个 ==、>、<、>= 或 <= 表达式。最后的聚合函数则是我们在其他编程语言中常见的函数调用格式(可以包含参数),例如这里的 longAvg() 以及前文看到的 p99(10)。

看完对 Antlr4 示例以及 metricStatement 规则的分析,相信你已经可以独立分析 OAL 语言剩余的语法规则,filterStatement 以及 aggregateFunction 两个语法规则就不再展开分析了。

SkyWalking 源码分析指北第 27 课时——OAL语言(上),到此结束。


第32讲:OAL 语言,原来定义创造一门新语言如此轻松(下)

了解了 OAL 语言的基础语法以及 official_analysis.oal 文件中典型的 OAL 语句之后,我们来看 official_analysis.oal 文件是如何被解析的。

在generate-tool-grammar 模块中会使 antlr4-maven-plugin 这个 Maven 插件处理 OALParser.g4 以及 OALLexer.g4 文件,得到相应的辅助类,如下图所示,这与前文 Antlr4 示例相同:

generate-tool 模块会使用上述辅助类识别 official_analysis.oal 文件并最终转换成 OALScripts 对象,相关的代码片段如下:

// 构造 official_analysis.oal 文件的完整路径
String scriptFilePath = StringUtil.join(File.separatorChar,modulePath, "src", "main", "resources", "official_analysis.oal");
// 创建 ScriptParser实例
ScriptParser scriptParser =  ScriptParser.createFromFile(scriptFilePath);
// 调用 parse()方法识别 official_analysis.oal文件
OALScripts oalScripts = scriptParser.parse();

在 ScriptParser.parse() 方法中可以看到,generate-tool 模块与前文示例一样,也是使用 Listener 模式遍历生成的抽象语法树(AST)。最后生成的 OALScripts 对象底层封装了一个 List<AnalysisResult>,每个 AnalysisResult 对应一条 OAL 语句。

下面以 instance_jvm_old_gc_time 这条 OAL 语句生成的 AST 为例介绍 OALListener 中各个回调方法的执行流程,下图是该语句生成的简化版 AST,其中的红色箭头标记了 ParseTreeWalker 遍历各个节点的路径:

另外,上图还按序标记了在对应节点上触发的 OALListener 方法,下面是这些方法的具体功能:

(1).enterAggregationStatement() 方法:创建该语句对应的 AnalysisResult 对象。

(2).exitVariable() 方法:填充 AnalysisResult 的 varName、metricsName、tableName 三个字段,会对大小写以及下划线进行处理。

(3).enterSource() 方法:填充 AnalysisResult 的 sourceName、sourceScopeId 两个字段。

(4).enterSourceAttribute() 方法:填充 AnalysisResult 的 sourceAttribute 字段。

(5).enterFilterStatement() 方法:创建 ConditionExpression 对象。

(6)~(8) 三个方法分别填充 ConditionExpression 对象中的三个字段。

(9).exitFilterStatement() 方法:将 ConditionExpression 添加到 AnalysisResult 中的 filterExpressionsParserResult 集合。

(10).enterFunctionName() 方法:填充 AnalysisResult 的 aggregationFunctionName 字段。

到此为止,该 AnalysisResult 填充的字段如下图所示:

(11).exitAggregationStatement() 方法:这里使用 DeepAnalysis 分析前 10 步从 OAL 语句获取到的信息,从而完整填充整个 AnalysisResult 对象。

  • 在 DeepAnalysis 中首先会根据 aggregationFunctionName 确定当前指标的类型并填充 metricsClassName 字段。示例中的 longAvg 会查找到 LongAvgMetrics 类,如下图所示:

  • 接下来会查找 LongAvgMetrics 类中 @Entrance 注解标注的入口方法,即 combine() 方法,创建相应的 EntryMethod 对象填充到 entryMethod 字段中。这里生成的 EntryMethod 对象不仅包含入口方法的名称,还会根据入口方法参数上的注解生成相应的参数表达式。

依然以 LongAvgMetrics 为例,combine() 方法的定义如下:

@Entrance
public void combine(@SourceFrom long summation, @ConstOne int count) {this.summation += summation;this.count += count;
}

之前我们只关心了方法内的具体逻辑,没有关注方法以及参数上的注解。@Entrance 注解标识了该方法为入口方法,@SourceFrom 标识了该参数来自 OAL 语句前面指定的 source.sourceAttribute,即 ServiceInstanceJVMGC.time,@ ConstOne 标识该参数固定为 1。

查找 @Entrance 标注的方法的逻辑比较简单,就是遍历 LongAvgMetrics 以及父类所有方法即可。这里来看处理 @SourceFrom 以及 @ConstOne 注解的相关代码如下:

EntryMethod entryMethod = new EntryMethod();
result.setEntryMethod(entryMethod);
// @Entrance注解标注的入口方法名
entryMethod.setMethodName(entranceMethod.getName());// 根据入口方法的参数设置参数代码
for (Parameter parameter : entranceMethod.getParameters()) {Annotation[] parameterAnnotations = parameter.getAnnotations();Annotation annotation = parameterAnnotations[0];if (annotation instanceof SourceFrom) {entryMethod.addArg("source." + ClassMethodUtil.toGetMethod(result.getSourceAttribute()) + "()");} else if (annotation instanceof ConstOne) {entryMethod.addArg("1");} // 还有针对其他注解的处理,例如 @Expression、@ExpressionArg0等,不再展开
}

最终创建的 EntryMethod 对象如下图所示:

  • 扫描 LongAvgMetrics 中的全部字段,将所有 @Column 注解标注的字段封装成 DataColumn 对象记录到 persistentFields 集合中。

  • 根据 sourceName 字段的值从 generator-scope-meta.yml 文件中查找该 source 默认新增的字段,如下图所示,InstanceJvmOldGcTimeMetrics 需要新增 entityId、serviceId 两个字段,这也与我们之前的分析相同。这些新增字段会记录到 fieldsFromSource 集合中。

到此为止,instance_jvm_old_gc_time 这条 OAL 语句对应的 AnalysisResult 对象填充完毕。在第 11 步 exitAggregationStatement() 方法的最后,会将该 AnalysisResult 对象记录到 OALScripts.metricsStmts 集合中,作为后续 FreeMarker 填充模板的数据。

MetricsImplementor 模板

在完成 official_analysis.oal 文件中全部 OAL 语句的处理之后,会将 OALScripts 对象传入到 FileGenerator 中完成 Java 代码生成。在 FileGenerator 的构造方法中会初始化 Configuration 对象,与前面介绍的 FreeMarker 示例相同。

在 FileGenerator.generate() 方法中会遍历全部 AnalysisResult 对象,为每个 AnalysisResult 对象生成相应的 Metrics 类以及 Dispatcher 类。创建 Metrics 类时使用的是 MetricsImplementor.ftl 模板文件,相关代码如下:

void generateMetricsImplementor(AnalysisResult result,Writer output) {configuration.getTemplate("MetricsImplementor.ftl").process(result, output);
}

在 MetricsImplementor.ftl 这个模板文件中,我们重点关注一下字段生成的逻辑以及 id() 方法的逻辑,具体如下所示:

<!-- 直接获取 AnalysisResult中相应的字段值,生成的@Stream注解-->
@Stream(name = "${tableName}", scopeId = ${sourceScopeId}, builder = ${metricsName}Metrics.Builder.class, processor = MetricsStreamProcessor.class)
<!-- 填充类名以及父类名称 -->
public class ${metricsName}Metrics extends ${metricsClassName} implements WithMetadata {
<!-- 遍历 AnalysisResult中的 fieldsFromSource集合,生成相应的字段 -->
<#list fieldsFromSource as sourceField><!-- 设置 @Column注解的名称 -->@Setter @Getter @Column(columnName = "${sourceField.columnName}") <!-- 根据配置是否添加 @IDColumn注解 --><#if sourceField.isID()>@IDColumn</#if>   private ${sourceField.typeName} ${sourceField.fieldName};
</#list>
<span class="hljs-meta">@Override</span> public <span class="hljs-built_in">String</span> id() {<span class="hljs-built_in">String</span> splitJointId = <span class="hljs-built_in">String</span>.valueOf(getTimeBucket());

<!-- 遍历 AnalysisResult中的 fieldsFromSource集合 -->
<#list fieldsFromSource as sourceField>
<#if sourceField.isID()> <!-- 根据ID配置决定是否参与构造Document Id–>
<#if sourceField.getTypeName() == “java.lang.String”>
splitJointId += Const.ID_SPLIT + KaTeX parse error: Expected 'EOF', got '&' at position 34: …Name}; &̲lt;#<span class…{sourceField.fieldName});
</#if>
</#if>
</#list>
return splitJointId;
}
<!-- 省略后续其他方法 -->
}

Metrics 类其他方法的生成方式与 id() 方法类似,只是使用的 AnalysisResult 字段不同。你可以将 MetricsImplementor.ftl 模板与 InstanceJvmOldGcTimeMetrics.java 进行比较,更便于理解。

DispatcherTemplate 模板

在前文介绍 Dispatcher 的时候提到,不同 Dispatcher 实现会对关联的 Source 进行分析并转换成 Metrics 传入到 MetricsStreamProcessor 进行后续的流处理。例如,ServiceInstanceJVMGCDispatcher 会将一个 ServiceInstanceJVMGC 对象转换成下图展示的四个 Metrics 对象:

相应的,FileGenerator 生成 Dispatcher 实现类的代码之前,会将由同一个 Source 衍生出来的 Metrics 封装到一个 DispatcherContext 对象, DispatcherContext 的核心字段如下:

private String source; // Source 名称
private String packageName; // Dispatcher所在包名
// 该 Source所有衍生 Metrics对应的 AnalysisResult对象集合
private List<AnalysisResult> metrics = new ArrayList<>();

生成 Dispatcher 实现类使用的是 DispatcherTemplate.ftl 模板文件,填充的数据来自 DispatcherContext,入口是 FileGenerator.generateDispatcher() 方法:

void generateDispatcher(AnalysisResult result, Writer output) {String scopeName = result.getSourceName(); // 根据 Source名称查找相应的 DispatcherContextDispatcherContext context =  allDispatcherContext.getAllContext().get(scopeName);// 生成 Dispatcher实现类的代码并写入到指定文件中configuration.getTemplate("DispatcherTemplate.ftl").process(context, output);
}

接下来看 DispatcherTemplate.ftl 的实现,它会遍历 DispatcherContext.metrics 集合为每个 Metrics 生成相应的 do*() 方法,核心实现如下:

<#list metrics as metrics> <!-- 遍历 DispatcherContext.metrics 集合 --><!-- 填充 do*()方法签名 --><!-- 示例中对应 doInstanceJvmOldGcTime(ServiceInstanceJVMGC)方法 -->private void do${metrics.metricsName}(${source} source) {<!-- 创建相应的Metrics实例 -->${metrics.metricsName}Metrics metrics = new ${metrics.metricsName}Metrics();<#if metrics.filterExpressions??><!--根据 OAL语句中 filter表达式生成对source过滤的代码(略) --></#if><!-- 下面开始填充 Metrics对象 -->metrics.setTimeBucket(source.getTimeBucket());<#list metrics.fieldsFromSource as field>metrics.${field.fieldSetter}(source.${field.fieldGetter}());</#list><!-- 根据 AnalysisResult.entryMethod 生成调用入口方法的代码 --><!-- doInstanceJvmOldGcTime() 方法中调用的是 combine() 方法 -->metrics.${metrics.entryMethod.methodName}(<!-- 生成入口方法的参数 --><#list metrics.entryMethod.argsExpressions as arg>${arg}<#if arg_has_next>, </#if></#list>);MetricsStreamProcessor.getInstance().in(metrics);}
</#list>

为了更好地理解 FreeMarker 填充数据的逻辑,你可以将 DispatcherTemplate.ftl 模板生成 do*() 方法的逻辑与生成后的 ServiceInstanceJVMGCDispatcher.doInstanceJvmOldGcTime() 方法进行比较。

内置 oal 引擎

从 6.3 版本的开始,SkyWalking 将 OAL 引擎内置到 OAP Server 中,在 OAP Server 启动时会动态生成 Metrics 类实现以及相应 Dispatcher 实现,我们可以在 CoreModuleProvider.prepare() 方法中看到下面这段代码(6.3 版本之后的代码):

oalEngine = OALEngineLoader.get();
oalEngine.setStreamListener(streamAnnotationListener);
oalEngine.setDispatcherListener(receiver.getDispatcherManager());
oalEngine.start(getClass().getClassLoader());

在 oalEngine.start() 方法中会解析 official_analysis.oal 文件得到 OALScripts 对象,然后使用 Javassist 和 FreeMarker 生成的 Metrics 和 Dispatcher 实现类,最后直接通过传入的 ClassLoader 加载到 JVM。
6.3 版本中生成代码的核心实现与 6.2 版本中生成代码的核心实现基本类似,只有下面的微小区别:

  • 6.3 版本之后的 OAL 语法略有改动,但改动很小,并不影响理解。

  • 6.3 版本之后在运行时生成代码,而 6.2 版本是在编译期生成。

  • 6.3 版本之后生成代码时使用了 Javassist 和 FreeMarker,6.2 版本只使用了 FreeMarker。

  • 6.3 版本之后生成的代码默认不会保存到磁盘中,我们可以在环境变量中设置 SW_OAL_ENGINE_DEBUG=Y 参数保存运行时生成的 Java 文件。如果你感兴趣可以对比 6.2 和 6.3 生成的 Java 代码,会发现两者区别不大。


第33讲:优化 Trace 上报性能,让你的 OAP 集群轻松抗住百万流量

背景

通过前面对 SkyWalking Agent 的介绍我们知道,Agent 中的 TraceSegmentServiceClient 上报 TraceSegment 数据的方式是 gRPC(客户端流式发送)。使用客户端流式 gRPC 可以向服务端发送批量的数据,服务端在接收这些数据的时候,可以不必等所有的消息全收齐之后再发送响应,而是在接收到第一条消息的时候就及时响应,这显然比 HTTP 1.1 的交互方式更快地提供了响应。

这种上报方式虽然及时,但是在微服务的架构中,依然会面临一些挑战。例如,某一段时间用户请求量突增,整个后端产生的 Trace 上报请求就会增多,若是 OAP 集群无法处理这个尖峰流量,就可能导致整个 OAP 被拖垮。再例如,某些服务进行了扩容,每个后端的服务实例上报 Trace 都是要创建连接的,可能将整个 OAP 集群的对外连接数耗尽。还有可能在通过 gRPC 上报 Trace 数据的过程中网络连接意外断开或是某台 OAP 服务突然宕机,该条 Trace 数据只接收了部分,只能展示出一条断掉的 Trace 链。

为了避免上述问题,这里对 Trace 数据的上报方式修改为使用 Kafka 方式进行上报,使用 Kafka 上报有如下好处。

  1. 削峰:Trace 数据会先写入到 Kafka 中,然后由 OAP 服务进行消费,如果出现了尖峰流量,也会先缓存到 Kafka 集群中,这样 OAP 服务不会被突增流量打垮。待尖峰流量过去之后,OAP 服务会将 Kafka 缓存的数据全部消费掉。

  2. 扩展性:当 Trace 数据或是其他 JVM 监控数据增大到 OAP 集群的处理上限之后,我们只需要增加新的 OAP 服务即可。

  3. 多副本:Kafka 中的消息会有多个副本,即使 Kafka 集群中的一台机器或是 OAP 集群的一个实例宕机,也不会导致数据丢失。

Kafka 基础入门

首先我们先来了解一下 Kafka 的整体架构以及核心概念,如下图所示。

  • 消息:Kafka 中最基本的数据单元。消息是一串主要由 key 和 value 构成的字符串,key 和 value 也都是 byte 数组。key 的主要作用是根据一定的策略,将此消息路由到指定的 Partition 中,这样就可以保证包含同一 key 的消息全部写入同一分区中。消息的真正有效负载是 value 部分的数据。为了提高网络和存储的利用率,Producer 会批量发送消息到 Kafka,并在发送之前对消息进行压缩。

  • Producer:负责将消息发送到 Kafka 集群,即将消息按照一定的规则推送到 Topic 的Partition 中。这里选择分区的“规则”可以有很多种,例如:根据消息的 key 的 Hash 值选择 Partition ,或按序轮训该 Topic 全部 Partition 的方式。

  • Broker:Kafka 集群中一个单独的 Kafka Server 就是一个 Broker。Broker 的主要工作就是接收 Producer 发过来的消息、为其分配 offset 并将消息保存到磁盘中;同时,接收 Consumer 以及其他 Broker 的请求,并根据请求类型进行相应处理并返回响应。

  • Topic:Topic 是用于存储消息的逻辑概念,可以看作是一个消息集合。发送到 Kafka 集群的每条消息都存储到一个 Topic 中。每个 Topic 可以有多个生产者向其中推送(push)消息,也可以有任意多个消费者消费其中的消息。

  • Partition:每个 Topic 可以划分成一个或多个 Partition,同一 Topic 下的不同分区包含着消息是不同的。每个消息在被添加到 Partition 时,都会被分配一个 offset,它是消息在此分区中的唯一编号,Kafka 通过 offset 保证消息在分区内的顺序,offset 的顺序性不跨分区,即 Kafka 只保证在同一个分区内的消息是有序的;同一 Topic 的多个分区内的消息,Kafka 并不保证其顺序性,如下图所示。

同一 Topic 的不同 Partition 会分配在不同的 Broker 上。 Partition 是 Kafka 水平扩展性的基础,我们可以通过增加服务器并在其上分配 Partition 的方式,增加 Kafka 的并行处理能力。

Partition 在逻辑上对应着一个 Log,当 Producer 将消息写入 Partition 时,实际上是写入到了 Partition 对应的 Log 中。Log 是一个逻辑概念,可以对应到磁盘上的一个文件夹。Log 由多个 Segment 组成,每个 Segment 对应一个日志文件和索引文件。在面对海量数据时,为避免出现超大文件,每个日志文件的大小是有限制的,当超出限制后则会创建新的 Segment,继续对外提供服务。这里要注意,因为 Kafka 采用顺序 IO,所以只向最新的 Segment 追加数据。为了权衡文件大小、索引速度、占用内存大小等多方面因素,索引文件采用稀疏索引的方式,文件大小并不会很大,在运行时会将其内容映射到内存,提高索引速度。

  • 保留策略(Retention Policy)& 日志压缩(Log Compaction)

无论消费者是否已经消费了消息,Kafka 都会一直保存这些消息,但并不会像数据库那样长期保存。为了避免磁盘被占满,Kafka 会配置相应的“保留策略”(Retention Policy),以实现周期性的删除陈旧的消息。

Kafka 中有两种“保留策略”:一种是根据消息保留的时间,当消息在 Kafka 中保存的时间超过了指定时间,就可以被删除;另一种是根据 Topic 存储的数据大小,当 Topic 所占的日志文件大小大于一个阈值,则可以开始删除最旧的消息。Kafka 会启动一个后台线程,定期检查是否存在可以删除的消息。“保留策略”的配置是非常灵活的,可以有全局的配置,也可以针对 Topic 进行配置覆盖全局配置。

除此之外,Kafka 还会进行“日志压缩”(Log Compaction)。在很多场景中,消息的 key 与 value 的值之间的对应关系是不断变化的,就像数据库中的数据会不断被修改一样,消费者只关心 key 对应的最新 value 值。此时,可以开启 Kafka 的日志压缩功能,Kafka 会在后台启动一个线程,定期将相同 key 的消息进行合并,只保留最新的 value 值。日志压缩的工作原理如下图所示,图展示了一次日志压缩过程的简化版本。

  • Replica:一般情况下,Kafka 对消息进行了冗余备份,每个 Partition 可以有多个 Replica(副本),每个 Replica 中包含的消息是一样的。每个 Partition 的 Replica 集合中,都会选举出一个 Replica 作为 Leader Replica,Kafka 在不同的场景下会采用不同的选举策略。所有的读写请求都由选举出的 Leader Replica 处理,其他都作为 Follower Replica,Follower Replica 仅仅是从 Leader Replica 处把数据拉取到本地之后,同步更新到自己的 Log 中。每个 Partition 至少有一个 Replica,当 Partition 中只有一个 Replica 时,就只有 Leader Replica,没有 Follower Replica。下图展示了一个拥有三个 Replica 的Partition。

一般情况下,同一 Partition 的多个 Replica 会被分配到不同的 Broker 上,这样,当 Leader 所在的 Broker 宕机之后,可以重新选举新的 Leader,继续对外提供服务。

  • ISR 集合:ISR(In-Sync Replica)集合表示的是目前“可用”(alive)且消息量与 Leader 相差不多的副本集合,这是整个副本集合的一个子集。“可用”和“相差不多”都是很模糊的描述,其实际含义是ISR集合中的副本必须满足下面两个条件:

  1. 副本所在节点必须维持着与ZooKeeper的连接。

  2. 副本最后一条消息的 offset 与 Leader 副本的最后一条消息的 offset 之间的差值不能超出指定的阈值。

每个分区中的 Leader Replica 都会维护此分区的 ISR 集合。写请求首先是由 Leader Replica 处理,之后 Follower Replica 会从 Leader Replica 上拉取写入的消息,这个过程会有一定的延迟,导致 Follower Replica 中保存的消息略少于 Leader Replica,只要未超出阈值都是可以容忍的。如果一个 Follower Replica 出现异常,比如:宕机、发生长时间 GC 而导致 Kafka 僵死或是网络断开连接导致长时间没有拉取消息进行同步,就会违反上面的两个条件,从而被 Leader Replica 踢出 ISR 集合。当 Follower Replica 从异常中恢复之后,会继续与 Leader Replica 进行同步,当 Follower Replica “追上” Leader Replica 的时候(即最后一条消息的 offset 的差值小于指定阈值),此 Follower Replica 会被 Leader Replica 重新加入 ISR 集合中。

  • HW&LEO:HW(HighWatermark)和 LEO 与上面的 ISR 集合紧密相关。HW 标记了一个特殊的 offset ,当消费者处理消息的时候,只能拉取到 HW 之前的消息,HW 之后的消息对消费者来说是不可见的。与 ISR 集合类似,HW 也是由 Leader  Replica 管理的。当 ISR 集合中全部的 Follower Replica 都拉取 HW 指定消息进行同步后,Leader Replica 会递增 HW 的值。Kafka 官方网站的将 HW 之前的消息的状态称为“commit”,其含义是这些消息在多个 Replica 中同时存在,即使此时 Leader Replica 损坏,也不会出现数据丢失。

LEO(Log End offset)是所有的 Replica 都会有的一个 offset 标记,它指向追加到当前 Replica 的最后一个消息的 offset 。当 Producer 向 Leader Replica 追加消息的时候, Leader Replica 的 LEO 标记会递增;当 Follower Replica 成功从 Leader Replica 拉取消息并更新到本地的时候,Follower Replica 的 LEO 就会增加。

为了让你更好地理解 HW 和 LEO 之间的关系,下面通过一个示例进行分析,下图中展示了针对 offset 为 11 的消息,ISR 集合、HW 与 LEO 是如何协调工作。

① Producer 向此 Partition 推送消息。

② Leader Replica 将消息追加到 Log 中,并递增其 LEO。

③ Follower Replica 从 Leader Replica 拉取消息进行同步。

④ Follower Replica 将拉取到的消息更新到本地 Log 中,并递增其 LEO 。

⑤ 当 ISR 集合中所有 Replica 都完成了对 offset =11 的消息的同步,Leader Replica 会递增 HW。

在 ①~⑤ 步完成之后,offset=11 的消息就对 Consumer 可见了。

了解了 Replica 复制原理之后,请你考虑一下,为什么 Kafka 要这么设计?在分布式存储中,冗余备份是常见的一种设计,常用的方案有同步复制和异步复制:

  • 同步复制要求所有能工作的 Follower Replica 都复制完,这条消息才会被认为提交成功。一旦有一个 Follower Replica 出现故障,就会导致 HW 无法完成递增,消息就无法提交,消费者获取不到消息。这种情况下,故障的 Follower Replica 会拖慢整个系统的性能,甚至导致整个系统不可用。

  • 异步复制中,Leader Replica 收到生产者推送的消息后,就认为此消息提交成功。 Follower Replica 则异步地从 Leader Replica 同步消息。这种设计虽然避免了同步复制的问题,但同样也存在一定的风险,现在假设所有 Follower Replica 的同步速度都比较慢,它们保存的消息量都远远落后于 Leader Replica,如下图所示。

此时 Leader Replica 所在的 Broker 突然宕机,则会重新选举新的 Leader Replica,而新的 Leader Replica 中没有原来 Leader Replica 的消息,这就出现了消息的丢失,而有些 Consumer 则可能消费了这些丢失的消息,后续服务状态变得不可控。

Kafka 权衡了同步复制和异步复制两种策略,通过引入了 ISR 集合,巧妙地解决了上面两种方案存在的缺陷:首先,当 Follower Replica 的延迟过高时,会将 Leader Replica 被踢出 ISR 集合,消息依然可以快速提交,Producer 也可以快速得到响应,避免高延时的 Follower Replica 影响整个 Kafka 集群的性能。当 Leader Replica 所在的 Broker 突然宕机的时候,会优先将 ISR 集合中 Follower Replica 选举为 Leader Replica,新 Leader  Replica 中包含了 HW 之前的全部消息,这就避免了消息的丢失。值得注意是,Follower  Replica 可以批量地从 Leader Replica 复制消息,这就加快了网络 I/O,Follower Replica 在更新消息时是批量写磁盘,加速了磁盘的 I/O,极大减少了 Follower 与 Leader 的差距。

  • Cluster&Controller:多个 Broker 可以做成一个 Cluster(集群)对外提供服务,每个 Cluster 当中会选举出一个 Broker 来担任 Controller,Controller 是 Kafka 集群的指挥中心,而其他 Broker 则听从 Controller 指挥实现相应的功能。Controller 负责管理分区的状态、管理每个分区的 Replica 状态、监听 Zookeeper 中数据的变化等工作。Controller 也是一主多从的实现,所有 Broker 都会监听 Controller Leader 的状态,当 Leader Controller 出现故障时则重新选举新的 Controller Leader。

  • Consumer:从 Topic 中拉取消息,并对消息进行消费。某个消费者消费到 Partition 的哪个位置(offset)的相关信息,是 Consumer 自己维护的。在下图中,三个消费者同时消费同一个 Partition,各自管理自己的消费位置。

这样设计非常巧妙,避免了 Kafka Server 端维护消费者消费位置的开销,尤其是在消费数量较多的情况下。另一方面,如果是由 Kafka Server 端管理每个 Consumer 消费状态,一旦 Kafka Server 端出现延或是消费状态丢失时,将会影响大量的 Consumer。同时,这一设计也提高了 Consumer 的灵活性,Consumer 可以按照自己需要的顺序和模式拉取消息进行消费。例如:Consumer 可以通过修改其消费的位置实现针对某些特殊 key 的消息进行反复消费,或是跳过某些消息的需求。

  • Consumer Group:在 Kafka 中,多个 Consumer 可以组成一个 Consumer Group,一个Consumer 只能属于一个 Consumer Group。Consumer Group 保证其订阅的 Topic 的每个Partition 只被分配给此 Consumer Group 中的一个消费者处理。如果不同 Consumer Group 订阅了同一 Topic,Consumer Group 彼此之间不会干扰。这样,如果要实现一个消息可以被多个 Consumer 同时消费(“广播”)的效果,则将每个 Consumer 放入单独的一个 Consumer Group;如果要实现一个消息只被一个 Consumer 消费(“独占”)的效果,则将所有的 Consumer 放入一个 Consumer Group 中。在 Kafka 官网的介绍中,将 Consumer Group 称为“逻辑上的订阅者”(logical subscriber),从这个角度看,是有一定道理的。

下图展示了一个 Consumer Group 中消费者与 Partition 之间的对应关系,其中,Consumer1 和 Consumer2 分别消费 Partition0 和 Partition1,而 Partition2 和 Partition3 分配给了 Consumer3 进行处。

Consumer Group 除了实现“独占”和“广播”模式的消息处理外,Kafka 还通过 Consumer Group 实现了消费者的水平扩展和故障转移。在上图中,当 Consumer3 的处理能力不足以处理两个 Partition 中的数据时,可以通过向 Consumer Group 中添加消费者的方式,触发Rebalance 操作重新分配 Partition 与 Consumer 的对应关系,从而实现水平扩展。如下图所示,添加 Consumer4 之后,Consumer3 只消费 Partition3 中的消息,Partition4 中的消息则由 Consumer4 来消费。

下面来看 Consumer 出现故障的场景,当 Consumer4 宕机时,Consumer Group 会自动重新分配 Partition,如下图所示,由 Consumer3 接管 Consumer4 对应的 Partition 继续处理。

注意,Consumer Group 中消费者的数量并不是越多越好,当消费者数量超过 Partition 的数量时,会导致有 Consumer 分配不到 Partition,从而造成 Consumer 的浪费。

介绍完 Kafka 的核心概念后,我们通过下图进行总结,并从更高的视角审视 Kafka 集群的完整架构。

在上图中,Producer 会根据业务逻辑产生消息,之后根据路由规则将消息发送到指定的分区的 Leader Replica 所在的 Broker 上。在 Kafka 服务端接收到消息后,会将消息追加到 Leader Replica 的 Log 中保存,之后 Follower Replica 会与 Leader Replica 进行同步,当 ISR 集合中所有 Replica 都完成了此消息的同步之后,则 Leader Replica 的 HW 会增加,并向 Producer 返回响应。

当 Consumer 加入 Consumer Group 时,会触发 Rebalance 操作将 Partition 分配给不同的 Consumer 进行消费。随后,Consumer 会确定其消费的位置,并向 Kafka 集群发送拉取消息的请求, Leader Replica 会验证请求的 offset 以及其他相关信息,然后批量返回消息。

Kafka 环境搭建

ZooKeeper

Kafka 集群有一些元数据和选举操作会依赖 ZooKeeper,这里需要先启动 ZooKeeper 集群,前文搭建 Demo 示例(demo-webapp 和 demo-provider)时,已经搭建好了 ZooKeeper 环境,这里直接启动就好了,不再重复。

Scala 环境

Kafka 是使用 Scala 语言编写的,Scala 是一种现代多范式编程语言,集成了面向对象和函数式编程的特性。Scala 语言需要运行在 Java 虚拟机之上,前面我们已经说明了 JDK8 的安装流程,不再赘述。这里使用 Scala 2.13 版本,首先从官网(https://www.scala-lang.org/download/)下载 Scala 安装包并执行如下命令解压:

tar -zxf scala-2.13.1.tgz

然后编辑 .bash_profile 文件添加 $SCALA_HONME ,如下所示:

export SCALA_HOME=/Users/xxx/scala-2.13.1
export PATH=$PATH:$JAVA_HOME:$SCALA_HOME/bin

编辑完成后,保存并关闭 .bash_profile 文件,执行 source 命令:

source .bash_profile

最后执行 scala -version 命令,看到如下输出即安装成功:

scala -version
Scala code runner version 2.13.1 -- Copyright 2002-2019, LAMP/EPFL and Lightbend, Inc.

安装 Kafka

首先从 kafka 官网(http://kafka.apache.org/downloads.html)下载 Kafka 的二进制安装包,目前最新版本是 2.4.0,我们选择在 Scala 2.13 上打包出的二进制包,如下图所示:

下载完毕之后,执行如下命令解压缩:

tar -zxf kafka_2.13-2.4.0.tgz

进入解压后的目录 /Users/xxx/kafka_2.13-2.4.0,创建一个空目录 logs 作为存储 Log 文件的目录。

然后打开 ./config/server.properties 文件,将其中的 log.dirs 这一项指向上面创建的 logs 目录,如下所示:

vim ./config/server.properties
# A comma separated list of directories under which to store log files
log.dirs=/Users/xxx/kafka_2.13-2.4.0/logs

最后执行如下命令即可启动 Kafka,启动过程中关注一下日志,不报错即可:

./bin/kafka-server-start.sh ./config/server.properties

验证

这里通过 Kafka 自带的命令行 Producer 和 Consumer 验证 Kafka 是否搭建成功。首先需要创建一个名为“test”的 Topic:

./bin/kafka-topics.sh --create --zookeeper localhost:2181 \--replication-factor 1 --partitions 1 --topic test
# 输出下面的一行,即为创建成功
Created topic test.

接下来启动命令行 Producer,并输入一条消息“This is a test Message”,以回车结束,如下所示:

./bin/kafka-console-producer.sh --broker-list localhost:9092 \--topic test
>This is a test Message

最后启动命令行 Consumer,可以接收到前面输入的消息,如下所示,即表示 Kafka 安装并启动成功:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \--topic test --from-beginning>This is a test Message

Agent 改造

SkyWalking Agent 在 TraceSegment 结束的时候,会通过 TraceSegmentServiceClient 将 TraceSegment 序列化并发送给后端 OAP。这里我们对其进行改造,将单一的 gRPC 上报方式修改成可配置的上报方式,可配置的方式有 gRPC 调用或是 Kafka 方式,修改后的结构如下图所示:

SegmentReportStrategy 接口中定义了发送 TraceSegment 数据的 report() 方法,如下所示:

public interface SegmentReportStrategy extends GRPCChannelListener{void report(List<TraceSegment> data);
}

在 AbstractSegmentReportStrategy 抽象类的 report() 方法中会根据当前发送请求打印日志信息(与 TraceSegmentServiceClient.printUplinkStatus() 方法类似),然后将请求委托给抽象方法 doReport() ,该方法由子类 KafkaSegmentReport 和 GrpcSegmentReporter 具体实现。

GrpcSegmentReportor 使用 gRPC 方式上报 TraceSegment 数据,具体逻辑与 TraceSegmentServiceClient 原有的 gRPC 上报方式相同,不再展开介绍。

再来看 KafkaSegmentReporter ,要使用 Kafka 方式上报,我们先要引入 Kafka Client 的依赖,如下所示:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0</version>
</dependency>

KafkaSegmentReporter 的大致逻辑是将序列化后的 UpstreamSegment 数据封装成一条消息,然后通过 Kafka Client 发送到指定的 Topic 中。在其构造函数中会出初始化 KafkaProducer 对象,具体实现如下:

public KafkaSegmentReporter(String topic) {if (!StringUtil.isEmpty(topic)) {this.topic = topic; // 默认 topic为 "sw_segment_topic"}Properties props = new Properties();// Kafka服务端的主机名和端口号,关于 Kafka集群的配置可以写到 agent.config// 配置文件中,然后通过 Config读取,这里为了演示简单,直接硬编码了props.put("bootstrap.servers", "localhost:9092");// UpstreamSegmentSerializer用来将UpstreamSegment对象序列化成字节数组props.put("value.serializer", "org.apache.skywalking.apm.agent.core.remote.UpstreamSegmentSerializer");producer = new KafkaProducer<>(props); // 生产者的核心类
}

KafkaProducer 是 Kafka Producer 的核心对象,它是线程安全的。在 doReport() 方法实现中会将 UpstreamSegment 封装成 ProducerRecord 消息发送出去,发送之前会使用上面指定的 UpstreamSegmentSerializer 将 UpstreamSegment 序列化成字节数组。 doReport() 方法的具体实现如下:

public void doReport(List<TraceSegment> data) {for (TraceSegment segment : data) {// 将 TraceSegment封装成 UpstreamSegment对象UpstreamSegment upstreamSegment = segment.transform();// 只添加了消息 value,并未指定消息的 keyProducerRecord<Object, UpstreamSegment> record = new ProducerRecord<>(topic, upstreamSegment);// 发送消息producer.send(record, (recordMetadata, e) -> {if (e != null) { // 该回调用来监听发送过程中出现的异常segmentUplinkedCounter += data.size();segmentAbandonedCounter += data.size();}});}
}

完成 SegmentReportStrategy 接口及其实现类之后,我们需要修改 TraceSegmentServiceClient,让其在 prepare() 方法中根据配置选择上报方式:

public void prepare() throws Throwable {ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);if (Config.Report.strategy == Strategy.GRPC) {segmentReportStrategy = new GrpcSegmentReporter();} else {segmentReportStrategy = new KafkaSegmentReporter(Config.Report.topic);}
}

在从 DataCarrier 中消费 TraceSegment 的时候,只需委托给当前 SegmentReportStrategy 对象即可,TraceSegmentServiceClient.consume() 方法的修改如下:

public void consume(List<TraceSegment> data) {segmentReportStrategy.report(data);
}

最后,我们在 demo-webapp、demo-provider 使用的 agent.config 配置文件的末尾添加如下配置,将它们切换为 Kafka 方式上报:

report.strategy = ${SW_LOGGING_LEVEL:KAFKA}

相应的在 Config 中需要添加相应的 Report 内部类来读取该配置:

public static class Report{public static Strategy strategy = Strategy.GRPC;
}

trace-receiver-plugin 改造

trace-receiver-plugin 插件本身使用 TraceSegmentReportServiceHandler 处理 gRPC 方式上报的 UpstreamSegment 数据,相关的逻辑无须做任何修改。

为了处理 Kafka 上报方式 ,我们先要引入 Kafka Client 的依赖,如下所示:

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.4.0</version>
</dependency>

之后我们添加一个 TraceSegmentReportServiceConsumer 类,在其构造函数中会初始化 Kafka Consumer 对象,如下所示(Kafka 集群的其他配置信息也可以配置化,这里为了方便直接硬编码了):

public TraceSegmentReportServiceConsumer(SegmentParseV2.Producer segmentProducer, String topic) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Broker的地址props.put("group.id", "sw_trace"); // 所属Consumer Group的Idprops.put("enable.auto.commit", "true"); // 自动提交offset// 自动提交offset的时间间隔props.put("auto.commit.interval.ms", "1000");props.put("session.timeout.ms", "30000");// value使用的反序列化器props.put("value.deserializer","org.apache.skywalking.oap.server    .receiver.trace.provider.handler.kafka.UpstreamSegmentDeserializer");this.consumer = new KafkaConsumer<>(props);this.segmentProducer = segmentProducer;this.topic = topic;// 负责消费的线程this.consumerExecutor = Executors.newSingleThreadScheduledExecutor();
}

在 TraceSegmentReportServiceConsumer.start() 方法中会启动任务,调用 cosume() 方法消费指定的 Kafka Topic(默认为 sw_segment_topic),具体实现如下:

private void consume() {consumer.subscribe(Arrays.asList(topic)); // 订阅Topicwhile (true) {// 从 Kafka集群拉取消息,每次poll()可以拉取多个消息ConsumerRecords<String, UpstreamSegment> records =consumer.poll(100);// 消费消息for (ConsumerRecord<String, UpstreamSegment> record:records){segmentProducer.send(record.value(), SegmentSource.Agent);}}
}

为了在 trace-receiver-plugin 插件启动时将 TraceSegmentReportServiceConsumer 一并启动,需要在 TraceModuleProvider.start() 方法中添加如下代码:

String reportStrategy = moduleConfig.getReportStrategy();
if(!StringUtil.isEmpty(reportStrategy) && "kafka".equals(reportStrategy.toLowerCase())){segmentReportServiceConsumer = new TraceSegmentReportServiceConsumer(segmentProducerV2,moduleConfig.getKafkaTopic());segmentReportServiceConsumer.start();
}

最后,要在 application.yml 配置文件以及 TraceServiceModuleConfig 中添加相应的配置项,如下所示:

public class TraceServiceModuleConfig extends ModuleConfig {... ... // 省略其他已有字段@Setter @Getter private String reportStrategy = "kafka";@Setter @Getter private String kafkaTopic = "sw_segment_topic";
}
receiver-trace:default:# 省略已有的配置信息reportStrategy: ${SW_REPORT_STRATEGY:kafka}kafkaTopic: ${SW_KAFKA_TOPIC:sw_segment_topic}

验证

为了验证上述的改造是否成功,我们将改造后的 Agent 切换成 Kafka 上报模式,打开 trace-receiver-plugin 插件接收 Kafka 上报 Trace 的功能,同时还可以开启一个命令行 Kafka Consumer。

还有就是要从 apm-sdk-plugin 模块中暂时删除 apm-kafka-v1-plugin-6.2.0 模块,该插件会拦截 Kafka Client 来生成 Trace,前文没有对该模块进行修改,会导致死循环生成 TraceSegment 的问题。这个问题属于如何让 SkyWalking 自己监控自己的问题,留给你自己思考一下如何解决。

完成上述操作之后,可以请求 http://localhost:8000/hello/xxx ,此时 demo-provider 和 demo-provider 都会分别生成两条 TraceSegment 并通过 Kafka 方式上报。在 Kafka 的命令行 Consumer 中可以看到如下输出:

在 SkyWalking Rocketbot UI 中可以查找到相应的完整 Trace 信息,如下图所示,即表示上述改造成果:

![在这里插入图片描述](https://img-blog.csdnimg.cn/339ccf962043433288eef4c74c97118f.png#pic_center)


第34讲:实现线程级别监控,轻松搞定 Thread Dump

本课时我们来学习 Thread Dump 功能。

背景

通过前面课时的介绍我们知道,SkyWalking 提供的 Agent 可以收集服务的 Metrics、Trace、Log 等维度的数据,然后发送到后端的 OAP 进行分析并进行持久化存储,我们可以使用 SkyWalking Rocketbot UI(或是直接使用 GraphQL)​ 从不同的维度查询上述数据,评估系统的各项性能和某些具体行为。

例如,我们可以通过 ServiceRespTimeMetrics、ServiceP99Metrics、ServiceCpmMetrics 等 Metrics 了解一个服务的整体吞吐量;可以通过 Trace 信息了解某个具体请求经过的核心组件和服务,以及在这些组件和服务上的耗时情况;可以通过 Trace 上携带的 Log 信息了解相应的异常信息;还可以根据 Trace 信息分析得到 Relation 信息,画出整个服务架构的拓扑图,了解各个服务之间的调用关系以及拓扑图每条调用边上的响应时间、SLA 等信息。这就可以帮助开发和运维人员更好地管理整个服务集群,更快地定位系统的热点和瓶颈,降低运维和问题定位的成本。

SkyWalking 已经满足了我们日常监控和运维的绝大多数需求,但是并没有覆盖到所有运维场景。假设我们发现请求在某个服务中的耗时特别长,远远超过了预期,例如开篇示例中的 demo-webapp ,如下图所示,在 HelloWorldController 在开始调用 Dubbo 服务的前后,会有耗时超过 1s 以上情况:

此时,SkyWalking 的 Trace 信息只能提示我们 HelloWorldController.hello() 方法中有一些耗时的逻辑,但是耗时的具体原因是什么无法准确地说明。实际的业务逻辑比较复杂,请求处理耗时高的原因也可能千奇百怪,例如(可能但不限于):

  • 多个线程并发竞争同一把锁;

  • 读写文件,线程等待 I/O 操作;

  • 代码逻辑本身的性能有问题,时间复杂度太高。

如果通过 Trace 以及 Metrics 不能明确定位高耗时的问题,我们使用 jstack 工具将线程的栈信息 dump 下来,然后分析线程在哪一个调用中耗时较长。在现实场景中,往往一次 dump 的信息是不足以确认问题的,为了反映线程状态的动态变化,需要连续多次做 Thread Dump,每次间隔根据具体的场景决定,建议至少产生三次以上的 Thread Dump 信息,如果每次 Thread Dump 都指向同一个问题,一般就能够确定具体的问题。

在实际的微服务场景中进行 Thread Dump 时,你可能会遇到几个问题:

  • 如果多个服务都有耗时高的情况,就需要我们去多个服务的机器上进行 Thread Dump,比较麻烦,而且也很难确定不同服务的 Thread Dump 信息是否存在关联。

  • 请求一般会经过多个服务端处理,每个服务又是单独的一个集群。如果是某些特殊参数的请求触发了高耗时,我们很难手动捕捉到该请求走到了服务的那个实例上,这台机器上去进行 Thread Dump 就比较困难。

  • 如果要求某些服务的响应时延非常低的情况下,虽然服务的延迟高了,但是相对人来说的时间是非常短的,而我们手动 Thread Dump 的速度和次数都是有限的,可能错过问题所在的逻辑,导致问题定位错误。

Thread Dump 需求

为了解决在上述场景下手动 Thread Dump 带来的问题,本课时将为 SkyWalking 添加 Thread Dump 功能。下面先说明一下 Thread Dump 的需求:一般场景中,用户会通过一个外网的入口请求我们的接入层(例如机房的 Nginx 集群),然后接入层会进行负载均衡,将请求发送到后端的 API 服务集群进行处理(例如 Tomcat 集群),API 服务会根据业务需求调用后端的 RPC 服务(例如 Dubbo、gRPC 等),在 RPC 服务中会调用 Service 层、DAO 层等完成存储的读写或是再次调用其他 RPC 服务。单个请求的路径如下图所示:

为了实现自动 Thread Dump 功能,我们会在入口处为 Http 请求追加一个 Http Header(Key 为 ENABLE_DUMP_FLAG,Value 为“true”),作为是否进行 Thread Dump 的标识。如果请求带有该标识,线程在处理该请求时每隔一段时间(例如 300ms)会被 dump 一次,这些 dump 下来的信息会记录到请求的 Trace 中,一并发送给 SkyWalking OAP 进行持久化存储。在后续通过 query-graphql-plugin 插件查询某条 Trace 信息的时候,可以将这些 dump 信息一起查询出来,在 SkyWalking Rocketbot UI 进行展示时,可以根据 Thread Dump 的时间将其显示在相应的 Span 处,当然,也可以在 OAP 接收到 Trace 数据时对其中的 Thread Dump 信息进出分析并完成与 Span 的关联。

首先要了解,在 Java 代码中使用 ThreadMXBean 即可完成全部线程的 Thread Dump ,下面是一段简单的示例代码:

ThreadMXBean bean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = bean.dumpAllThreads(true, true);
for (ThreadInfo threadInfo : threadInfos) {System.out.println(threadInfo);
}
// 部分输出如所示,我们可以看到每个线程的状态信息以及具体的调用栈:
"Reference Handler" Id=2 WAITING on java.lang.ref.Reference$Lock@1517365bat java.lang.Object.wait(Native Method)-  waiting on java.lang.ref.Reference$Lock@1517365bat java.lang.Object.wait(Object.java:502)at java.lang.ref.Reference.tryHandlePending(Reference.java:191)at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)

“main” Id=1 RUNNABLE
at sun.management.ThreadImpl.dumpThreads0(Native Method)
at sun.management.ThreadImpl.dumpAllThreads(ThreadImpl.java:454)
at com.xxx.sw.Main.main(Main.java:14)

Thread Dump 功能实现

从 demo-webapp 这个 API 服务处理“/hello/xxx”接口请求的 Trace 中可以看到,请求首先到达了 Spring Boot 内嵌的 Tomcat 容器,然后走到 Spring Container 中调用 HelloWorldController.hello() 方法 ,最后调用 demo-provider 这个 Dubbo 服务。下图展示了请求的全过程、涉及插件以及插件做的事情:

请求的第一站是 Tomcat ,我们可以在 tomcat-7.x-8.x-plugin 插件创建 TracingContext 之前将请求 Header 中携带的 ENABLE_DUMP_FLAG 标记提取出来,并记录到 Trace 的 RuntimeContext 上下文中,这样就可以让 ENABLE_DUMP_FLAG 标记随 Trace 在当前线程继续传播了,实现如下:

// ENABLE_DUMP_FLAG 标记在Http Header 和 RuntimeContext中使用相应的Key
// 可以将"ENABLE_DUMP_FLAG"字符串抽到 Constants作为常量,后续可以重复使用
ContextManager.getRuntimeContext().put("ENABLE_DUMP_FLAG",request.getHeader("ENABLE_DUMP_FLAG")
);
// 对 ContextCarrier 的处理后面会介绍

前面在介绍 SkyWalking Agent 的时候提到,在 TracingContext 关闭的时候会回调全部 TracingContextListener 监听器,其中就包括 TraceSegmentServiceClient,它会将该 Trace 发送到后端的 OAP 服务,这是典型的观察者模式的应用。我们可以参考这种实现,定义一个 TracingContextPostConstructListener 接口来处理 TracingContext 创建的事件,如下所示:

public interface TracingContextPostConstructListener {void postConstruct(TracingContext tracingContext);
}

在 TracingContext 中新增 postConstruct() 方法回调全部 TracingContextPostConstructListener 实现,并在 TracingContext 构造方法最后调用该方法,其具体剩下如下:

private void postConstruct() { TracingContext.ListenerManager.notifyPostConstruct(this);
}

在 TracingContext.ListenerManager 中会新增 POST_CONSTRUCT_LISTENERS 字段(List <TracingContextPostConstructListener> 类型)来记录当前全部的 TracingContextPostConstructListener 对象,并提供相应 add()、addFirst() 、remove() 等方法,这里的 notifyPostConstruct() 方法会回调全部的 TracingContextPostConstructListener 对象,通知它们该 TracingContext 对象构造完毕。这与 TracingContext.ListenerManager 处理 TracingContextListener 的方式一模一样,具体实现不再展开。

下面我们需要提供了一个 TracingContextPostConstructListener 接口的实现 —— ThreadDumpManager,它同时实现了 BootService、TracingContextListener、TracingContextPostConstructListener 三个接口,如下图所示,下面将详细分析该实现针对每个接口的实现逻辑:

首先在 onComplete() 方法(对 BootService 接口的实现)中会启动一个单独的线程执行一个定时任务,该定时任务主要做两件事:

  • 定时通过 ThreadMXBean 获取线程的 dump 信息。

  • 查找到处理 ENABLE_DUMP_FLAG 标记请求的线程,并将该线程的 dump 信息与 Trace 关联起来。

具体实现如下:

// 其中 Key处理标记请求的线程 ID,Value是线程的 dump 信息,该线程在处理标记请求
// 的过程中,可能会被 dump 多次,所以 Value 是 List<ThreadDump>集合
private Map<Long, List<ThreadDump>> dumpStore = Maps.newConcurrentMap();
private ScheduledExecutorService scheduledExecutorService;

@Override
public void onComplete() throws Throwable {
// 创建并启动后台线程
    scheduledExecutorService = Executors
.newSingleThreadScheduledExecutor();
    scheduledExecutorService.scheduleAtFixedRate(
            new RunnableWithExceptionProtection(this::doThreadDump,
                    t -> logger.error(“thread dump error.”, t)),
dumpPeriod, dumpPeriod, TimeUnit.MILLISECONDS);
// 将 ThreadDumpManager作为 TracingContextListener接口实现进行注册
    TracingContext.ListenerManager.addFirst(
(TracingContextListener) this);
// 将 ThreadDumpManager作为 TracingContextPostConstructListener
// 接口实现进行注册
    TracingContext.ListenerManager.addFirst(
(TracingContextPostConstructListener) this);
}

ThreadDumpManager.doThreadDump() 方法是后台线程的核心,它会请求 ThreadMXBean 获取线程的 dump 信息并记录到 dumpStore 集合中,具体实现如下:

public void doThreadDump() {long dumpTimestamp = System.currentTimeMillis();ThreadMXBean bean = ManagementFactory.getThreadMXBean();// 获取全部线程的 dump信息ThreadInfo[] threadInfos = bean.dumpAllThreads(true, true);for (ThreadInfo threadInfo : threadInfos) {long threadId = threadInfo.getThreadId();// 根据监控的线程ID,将相应的 dump信息记录到 dumpStore集合中List<ThreadDump> threadDumps = this.dumpStore.get(threadId);if (threadDumps != null) {// 创建 ThreadDump来记录线程 dump信息ThreadDump threadDump = ThreadDump.newBuilder().setDumpTime(dumpTimestamp).setThreadInfo(threadInfo.toString()).build();threadDumps.add(threadDump);}}
}

这里使用到了一个新类型 —— ThreadDump ,然后将其定义添加到 trace.proto 文件中,并在 SegmentObject 中添加一个 threadDumps 字段,如下所示:

message SegmentObject {UniqueId traceSegmentId = 1;repeated SpanObjectV2 spans = 2;... ...// 省略3~5的字段repeated ThreadDump threadDumps = 6;
}

message ThreadDump {
int64 dumpTime = 1;
string threadInfo = 2;
}

ThreadDump 主要用于记录一条 Thread Dump 数据以及 dump 操作的时间戳,后续会在 ThreadDumpManager 中将其添加到关联的 SegmentObject 对象中。

回到 ThreadDumpManager,作为 TracingContextPostConstructListener 接口的实现,在其 postConstruct() 方法中会从 TracingContext 关联的 RuntimeContext 中获取 ENABLE_DUMP_FLAG 标记,如果标记为 true,则将当前线程 ID 添加到 dumpStore 集合中存储,具体实现如下:

@Override
public void postConstruct(TracingContext tracingContext) {RuntimeContext runtimeContext= ContextManager.getRuntimeContext();Object enableDumpFlag = runtimeContext.get(Constants.ENABLE_DUMP_FLAG);if (Constants.ENABLE_DUMP_VAULES.equals(enableDumpFlag.toString().toLowerCase())) {// 将当前线程的ID添加到 dumpStore集合中dumpStore.put(Thread.currentThread().getId(), Lists.newLinkedList());// 在TracingContext中也添加了显影的tracingContext.setEnableDumpFlag(Constants.ENABLE_DUMP_VAULES);}
}

ThreadDumpManager 作为 TracingContextListener 的实现,其 afterFinished() 方法实现会在 TracingContext 关闭之后,立即关联相应的 Thread Dump 信息,具体实现如下:

public void afterFinished(TraceSegment traceSegment) {long threadId = Thread.currentThread().getId();List<ThreadDump> threadDumps = dumpStore.get(threadId);traceSegment.setThreadDumps(threadDumps);removeThread(threadId);
}

注意,ThreadDumpManager 作为 TracingContextListener 需要先于 TraceSegmentServiceClient 这个监听执行,否则是在 Trace 数据发送出去之后再进行关联,后端 OAP 感知不到 ThreadDump 信息。

跨进程/跨线程传播

在上一课时,我们重点处理了入口 Http 服务携带的 ENABLE_DUMP_FLAG 标记。在微服务架构中,如果入口的 Http 请求带了 ENABLE_DUMP_FLAG 标记,后续跨进程的 RPC 调用也是需要传递该标记的。这里将改造 ContextCarrier 以及 TracingContext 的相关方法,实现传播 ENABLE_DUMP_FLAG 标记的功能。

首先需要修改一下 ContextCarrier 序列化之后的字符串结构,SkyWalking 原始的 ContextCarrier 持久化后的字符串包括下面 9 个部分,且相互之间通过字符串“-”连接起来:

  1. 固定字符串“1”

  2. TraceId

  3. TraceSegmentId

  4. SpanId

  5. ParentServiceInstanceId

  6. EntryServiceInstanceId

  7. PeerHost

  8. EntryEndpointName

  9. ParentEndpointName

这里需要添加一个新的部分用于记录当前线程的 ENABLE_DUMP_FLAG 标记。在 serialize() 方法中针对 V2 版本 ContextCarrier 持久化逻辑修改如下:

String serialize(HeaderVersion version) {return StringUtil.join('-', "1",Base64.encode(this.getPrimaryDistributedTraceId().encode()),Base64.encode(this.getTraceSegmentId().encode()),this.getSpanId() + "",this.getParentServiceInstanceId() + "",this.getEntryServiceInstanceId() + "",Base64.encode(this.getPeerHost()),Base64.encode(this.getEntryEndpointName()),Base64.encode(this.getParentEndpointName()),this.enableDumpFlag); // 新增 enableDumpFlag 部分
}

enableDumpFlag 是 ContextCarrier 中新增的一个字段,用于记录当前线程的标记信息。

接下来看 deserialize() 方法,其中兼容了原始 ContextCarrier 字符串以及上述改造后的 ContextCarrier 字符串,如下所示:

ContextCarrier deserialize(String text, HeaderVersion version) {String[] parts = text.split("\\-", 10);if (parts.length == 9 || parts.length == 10) {// parts[0] is sample flag, always trace if header exists.this.primaryDistributedTraceId = new PropagatedTraceId(Base64.decode2UTFString(parts[1]));this.traceSegmentId = new ID(Base64.decode2UTFString(parts[2]));this.spanId = Integer.parseInt(parts[3]);this.parentServiceInstanceId = Integer.parseInt(parts[4]);this.entryServiceInstanceId = Integer.parseInt(parts[5]);this.peerHost = Base64.decode2UTFString(parts[6]);this.entryEndpointName = Base64.decode2UTFString(parts[7]);this.parentEndpointName = Base64.decode2UTFString(parts[8]);if (parts.length == 10) {this.enableDumpFlag = parts[9];}}return this;
}

最后,在 TracingContext.inject() 方法填充 ContextCarrier 对象的时候,需要同时填充 enableDumpFlag 字段,如下所示:

public void inject(ContextCarrier carrier) {... ...// 省略前面的原始代码Object enableDumpFlag = ContextManager.getRuntimeContext().get(Constants.ENABLE_DUMP_FLAG);carrier.setEnableDumpFlag(enableDumpFlag == null ? "" : enableDumpFlag.toString());
}

我们可以在 TracingContext.extract() 方法中检测 ContextCarrier 对象携带的 ENABLE_DUMP_FLAG 标记值,之后通过前文介绍的 ThreadDumpManager 记录线程 ID 并由后台线程进行 dump。这种实现方式会与前文添加的 TracingContextPostConstructListener 接口的目的冲突。

另一种实现方式是在各个 agent-plugin 的入口(创建 TracingContext 之前)处理 ContextCarrier 对象携带的 ENABLE_DUMP_FLAG 标记,并设置到 RuntimeContext 中。例如通过 Dubbo 实现的 RPC 调用,我们可以对 apm-dubbo-plugin 插件中的 DubboInterceptor 进行如下修改:

public void beforeMethod(...) {if (isConsumer) {... ... // 省略Consumer创建 ExitSpan以及ContextCarrier的逻辑} else {... ... // 从 RpcContext中获取 ContextCarrier字符串并反序列化(略)// 将ENABLE_DUMP_FLAG标记记录到 RuntimeContext中ContextManager.getRuntimeContext().put(Constants.ENABLE_DUMP_FLAG,contextCarrier.getEnableDumpFlag());// 创建 TracingContext,其中会触发// TracingContextPostConstructListener,从而记录需要dump的线程span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);}// 省略后续设置 Tag、Component以及SpanLayer的相关代码
}

最后,在跨线程调用的时候,TracingContext 信息会通过其 capture() 方法生成的 ContextSnapshot 对象传递,在接收调用的线程中会通过 TracingContext.continued() 方法从 ContextSnapshot 中还原数据。这里需要在 ContextSnapshot 中添加相应字段并改造 capture() 方法以及 continued() 方法,具体逻辑与跨进程调用类似,这里就不再重复,留给你动手实践。

OAP 改造

完成 apm-agent-core 以及 tomcat-7.x-8.x-plugin、apm-dubbo-plugin 插件的改造之后,带有 Thread Dump 的 Trace 可以重构传递到后端 OAP 服务。

通过前文对 trace-receiver-plugin 插件的介绍我们知道,其中的 SegmentParseV2 会解析收到的 UpstreamSegment 得到相应的 TraceSegment,然后交给所有 RecordStreamProcessor 处理,如果存储选择 ElasticSearch,则 TraceSegment 的全部数据最终会按照序列化的格式存储到 segment-yyyyMMdd 索引中的 data_binary 字段中,当然也包括前面新增的 Thread Dump 信息。因此,整个 trace-receiver-plugin 插件以及 OAP 中存储相关的逻辑是无须进行改动的。

需要改动的是 OAP 查询 Trace 的相关逻辑。首先是 query-graphql-plugin 插件,在 trace.graphqls 中我们新增一个 ThreadDump 类型用于展示线程 dump 信息,具体实现如下所示:

type ThreadDump{dumpTimestamp: Long!threadInfo: String!
}
type Trace {spans: [Span!]!threadDumps: [ThreadDump!]! // 在 Trace 中添加 threadDumps集合
}

相应的,需要修改 GraphQL​ 相应的 Java 对象。首先在 server-core 模块的org.apache.skywalking.oap.server.core.query.entity 包添加一个 ThreadDump 对象,如下所示:

@Getter
@Setter
public class ThreadDump {private long dumpTimestamp;private String threadInfo;
}

相应的 Trace 类中也要添加 threadDumps 字段(List <ThreadDump> 类型),如下所示:

@Getter
public class Trace {private final List<Span> spans;private final List<ThreadDump> threadDumps;
}

接下来就是填充该 ThreadDump 集合,TraceQuery.queryTrace() 方法是查询 Trace 详细信息的入口。在其中完成所有 SegmentRecord 的查询之后,我们可以将每个 Segment 携带的 ThreadDump 取出来填充上述 ThreadDump 集合。具体实现如下所示:

public Trace queryTrace(final String traceId) throws IOException {Trace trace = new Trace();// 根据traceId查询所有关联的 SegmentObjectList<SegmentRecord> segmentRecords =getTraceQueryDAO().queryByTraceId(traceId);for (SegmentRecord segment : segmentRecords) {// 反序列化 SegmentObjectSegmentObject segmentObject = SegmentObject.parseFrom(segment.getDataBinary());// 解析 SegmentObject中的 Span,填充到 Trace中trace.getSpans().addAll(buildSpanV2List(traceId, segment.getSegmentId(), segment.getServiceId(), segmentObject.getSpansList()));// 填充 ThreadDump集合trace.getThreadDumps().addAll(buildThreadDumpList(segmentObject.getThreadDumpsList()));}... ...// 省略整理Trace中Span的顺序等操作,// 这些逻辑在前文f分析query-graphql-plugin插件时已经详细分析过return trace;
}

最后,在 SkyWalking 源码目录下执行如下 maven 命令重新编译打包:

mvn clean
mvn package -Dcheckstyle.skip -DskipTests

执行完成后,首先启动 ElasticSearch 和 ZooKeeper 两个服务,然后依次启动 OAP、demo-provider、demo-webapp 以及 apm-webapp,请求 http://localhost:8000/hello/xxx 地址可以正常相应,且能够在 SkyWalking Rocketbot UI 中看到相应 Trace 信息表名修改未破坏对原始 Trace 的兼容。我们还可以使用 PostMan 在 Http 请求中携带 ENABLE_DUMP_FLAG:true 的 Header,然后通过 GraphQL Playground 查询,可以得到类似如下的结果:

{"data": {"trace": {"spans": [... ... // 省略该 Trace中的Span信息 ],"threadDumps": [ // 该Trace携带的ThreadDump信息{"dumpTimestamp": 1580029989057,"threadInfo": "\"DubboServerHandler-172.17.32.91:20880-thread-36\" Id=106 TIMED_WAITING\n\tat java.lang.Thread.sleep(Native Method)\n\tat com.xxx.service.DefaultHelloService.say$original$MUzxmS45(DefaultHelloService.java:15)\n\t ... ..."}// 省略其他 ThreadDump信息]}}
}

总结

本课时最后将通过一张图来总结 Thread Dump 功能的关键点:

  • Http 请求进入 demo-webapp 之后,tomcat-7.x-8.x-plugin 插件会从其 Header 中查找 ENABLE_DUMP_FLAG 标记并记录到 RuntimeContext 中。之后通过 ContextManager 创建此次请求对应的 TracingContext 对象以及 EntrySpan,在完成 TracingContext 的初始化之后会触发 TracingContextPostConstructListener,即 ThreadDumpManager,记录需要进行 dump 的线程 ID。后续请求执行过程中会调用 create*Span() 方法创建 Span,同时 ThreadDumpManager 中的后台线程也会定时 dump 线程信息,如图中(3)和(4)处所示。

  • 接下来,在 demo-webapp 通过 Dubbo 调用 demo-provider 服务的时候,会将生成的 ContextCarrier 对象(包含 ENABLE_DUMP_FLAG 标记)序列化成字符串添加到 RpcContext 中,随 Dubbo 请求发送到下游的 demo-provider。在 demo-provider 服务的 dubbo-plugin 插件中会处理 ContextCarrier,当然也会处理 ENABLE_DUMP_FLAG 标记。

  • 回到(5)处,demo-webapp 处理完请求后会关闭 TracingContext,同时会触发所有 TraceContextListener 监听器,其中 ThreadDumpManager 会根据记录的线程 ID 关联 ThreadDump 与 TraceSegment,TraceSegmentServiceClient 则负责通过 gRPC 将序列化后的 TraceSegment 数据发送到后端的 OAP 集群。

  • OAP 服务中的 trace-receiver-plugin 负责接收 Agent 发送的 TraceSegment 数据,解析之后会由 RecordStreamProcessor 存储到 ElasticSearch 中。

  • OAP 服务中的 query-graphql-plugin 插件负责处理查询 Trace 的请求,这里会从 SegmentObject 中获取全部 ThreadDump 填充到 Trace 中返回给用户。

好了,本专栏的全部内容就讲完了,最后的彩蛋我将带你回顾 SkyWalking 架构并展望未来。


微服务链路追踪SkyWalking第十一课 OAL详解实战相关推荐

  1. 微服务链路追踪SkyWalking第八课 OAP的receiver模块详解

    第22讲:深入剖析 regiter-receiver-plugin 插件(上) 在上一课时中,重点介绍了 SkyWalking 存储层的框架设计以及核心接口.从本节课开始,我们将深入 SkyWalki ...

  2. 微服务链路追踪-SkyWalking

    微服务链路追踪-SkyWalking SkyWalking官网地址:https://skywalking.apache.org/ SkyWalking官方文档:https://skywalking.a ...

  3. 微服务链路追踪SkyWalking

    微服务链路追踪SkyWalking 链路追踪介绍 skywalking是什么 SkyWalking环境搭建部署 SkyWalking跨多个微服务跟踪 SkyWalking UI介绍 SkyWalkin ...

  4. 微服务链路追踪SkyWalking第一课 SkyWalking简介

    开篇词:从剖析 SkyWalking 源码到吃透 APM 核心知识 你好,我是你的 SkyWalking 老师徐郡明,网名吴小胖,你也可以叫我胖哥.进入互联网行业工作多年,主要从事基础组件开发相关的工 ...

  5. SkyWalking 微服务链路追踪

    目录 8. SkyWalking 微服务链路追踪 8.1 介绍 SkyWalking 8.2 Skywalking---服务搭建 8.3 SkyWalking---接入服务 8.3.1 windows ...

  6. skywalking原理_微服务链路追踪原理

    作者:平也 来源:关爱程序员社区 背景介绍 在微服务横行的时代,服务化思维逐渐成为了程序员的基本思维模式,但是,由于绝大部分项目只是一味地增加服务,并没有对其妥善管理,当接口出现问题时,很难从错综复杂 ...

  7. 全网最全的微服务链路追踪实践-SkyWalking(看这一篇就够了)

    链路追踪介绍 对于一个大型的几十个.几百个微服务构成的微服务架构系统,通常会遇到下面一些问题,比如: 1. 如何串联整个调用链路,快速定位问题? 2. 如何缕清各个微服务之间的依赖关系? 3. 如何进 ...

  8. 阿里P7架构师详解微服务链路追踪原理

    背景介绍 在微服务横行的时代,服务化思维逐渐成为了程序员的基本思维模式,但是,由于绝大部分项目只是一味地增加服务,并没有对其妥善管理,当接口出现问题时,很难从错综复杂的服务调用网络中找到问题根源,从而 ...

  9. 微服务链路追踪之zipkin搭建

    前言 微服务治理方案中,链路追踪是必修课,SpringCloud的组件其实使用很简单,生产环境中真正令人头疼的往往是软件维护,接口在微服务间的调用究竟哪个环节出现了问题,哪个环节耗时较长,这都是项目上 ...

最新文章

  1. JavaWeb:JDBC之事务
  2. bzoj1045: [HAOI2008] 糖果传递
  3. 实战SSM_O2O商铺_42【前端展示】店铺列表页面View层的实现
  4. AJPFX学习笔记JavaAPI之String类
  5. 并发集合和普通集合以及安全集合的区别
  6. airflow sql_alchemy_conn mysql_搭建AirFlow—— 一段波折后的总结
  7. Apache配置同一IP使用多域名对应多个网站
  8. x学校计算机及网络维护方案,校园计算机网络常见故障的处理与维护
  9. Servlet容器中web.xml配置context-param与init-param
  10. 吉首大学2019年程序设计竞赛-F 天花乱坠
  11. Python(7)-程序执行的原理
  12. Oracle 20c 新特性:基础级内存数据库免费功能 In-Memory Base Level
  13. easymock_EasyMock TestNG示例
  14. vue-video-player的使用方法,vue-video-player在移动端点击画面不触发事件不能暂停播放的解决方法
  15. Matlab中绘制折线图(附matlab代码)
  16. matlab符号函数绘图法_MATLAB符号运算实验
  17. 28388D上电时从BOOT跳转到main过程分析
  18. 佛系前端面试题记录--第三周
  19. 需要计算机安装msxml,怎么在电脑上安装msxml6.0?教大家具体安装步骤
  20. jzoj 3457. 【NOIP2013模拟联考3】沙耶的玩偶(doll) (Standard IO)

热门文章

  1. Java 实现 YoloV7 目标检测
  2. 最大公约数、最小公倍数与算术基本定理
  3. 第一天、python之路
  4. python之路-基础篇
  5. 机器学习预测nba_通过机器学习预测2020年NBA季后赛支架
  6. 什么是IOS 和 GHO 文件?
  7. OPT3001光强传感器驱动实现(STM32F407)
  8. 【docker-compose】一键部署WordPress博客
  9. [网络安全学习篇65]:提权
  10. 蓝桥杯 嵌入式 STMG431RBT6 综合测试