skywalking的架构图如下:

Skywalking的agent负责采集数据,发送到collector,collector聚合,并且存储这些数据,且提供了一个简洁使用的UI端,可共我们查看监控的指标。

下面我们来开始分析skywalking的源码。

下载源码并构建

因为skywalking为了实现高性能通信,采用的是grpc的方式来实现服务器与客户端的数据传输的,所以导入之后我们需要稍微做一些事情,我们可以参考docs/en/guides/How-to-build.md这篇文档来构建。

打包构建

我们可以在github上面将skywalking源码fork一份,然后下载到自己的本地。

// 直接

git clone --recurse-submodules https://github.com/apache/skywalking.git

// 或者

git clone https://github.com/apache/skywalking.git

cd skywalking/

git submodule init

git submodule update

执行命令:

./mvnw clean package -DskipTests

最终打好的包在dist目录下面

在IDEA里面构建源码

用IDEA打开skywalking项目(作为maven项目导入)

然后在skywalking所在目录命令行运行脚本去编译构建(因为skywalking用到了grpc):

./mvnw compile -Dmaven.test.skip=false

然后查看设置生成的源代码(主要是看potobuf文件编译生成的源代码)

apm-protocol/apm-network/target/generated-sources/protobuf选中这个目录下面的grpc-java和java,然后右键选择Mark Directory As-->Generated Sources Root如下图所示

oap-server/server-core/target/generated-sources/protobuf目录的grpc-java和java文件夹Mark Directory As-->Generated Sources Root`

oap-server/server-receiver-plugin/receiver-proto/target/generated-sources/protobuf目录的grpc-java和java文件夹Mark Directory As-->Generated Sources Root`

oap-server/exporter/target/generated-sources/protobuf目录的grpc-java和java文件夹Mark Directory As-->Generated Sources Root`

oap-server/server-configuration/grpc-configuration-sync/target/generated-sources/protobuf目录的grpc-java和java文件夹Mark Directory As-->Generated Sources Root`

oap-server/oal-grammar/target/generated-sources目录的grpc-java和java文件夹Mark Directory As-->Generated Sources Root`

在Eclipse里面构建源码

1、按照maven项目导入到eclipse中

2、添加一下内容到skywalking/pom.xml中

org.codehaus.mojo

build-helper-maven-plugin

1.8

add-source

generate-sources

add-source

src/java/main

apm-protocol/apm-network/target/generated-sources/protobuf

apm-collector/apm-collector-remote/collector-remote-grpc-provider/target/generated-sources/protobuf

3、添加如下内容,使得eclipse的M2e插件能够支持扩展配置

org.eclipse.m2e

lifecycle-mapping

1.0.0

org.codehaus.mojo

build-helper-maven-plugin

[1.8,)

add-source

4、apm-collector-remote/collector-remote-grpc-provider/pom.xml文件中添加如下依赖

com.google.guava

guava

24.0-jre

5、执行命令

./mvnw compile -Dmaven.test.skip=true

6、执行命令

先执行maven clean,然后maven update

7、执行命令:

./mvnw compile

8、刷新项目

源码分析

skywalking的分布式链路追踪流程大致如下:

Agent采集数据

Agent发送数据到Collector

Collector接收数据

Collector将接收的数据存储到持久层

我们这里主要探探Agent采集Java类系统的数据比如spring等,以spring cloud的一个简易的分布式系统来讲:

这是一个建议的订单系统,有Eureka,Order,Product,Stock,当下订单的时候,order会调用product,product又会调用stock。

我们从apm-sniffer工程开始出发(sniffer即嗅探器、探针的意思)

apm-agent工程

我们看到这个工程只有一个类

org.apache.skywalking.apm.agent.SkyWalkingAgent

这个类有一个方法:

/**

* 主入口,使用byte-buddy来实现来增强插件里面定义的所有的类。

*/

public static void premain(String agentArgs, Instrumentation instrumentation) throws PluginException, IOException {

}

Agent采集数据

我们这里主要介绍JVM的数据和spring相关的数据

JVM的数据

我们看到在apm-agent-core里面有类:org.apache.skywalking.apm.agent.core.jvm.JVMService

这个类实现了BootService和java.lang.Runnable接口,而这个类是怎么执行里面的一些方法的呢?在apm-agent-core这个工程的/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService文件里面有很多类的全限定名信息:

org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient

org.apache.skywalking.apm.agent.core.context.ContextManager

org.apache.skywalking.apm.agent.core.sampling.SamplingService

org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager

org.apache.skywalking.apm.agent.core.jvm.JVMService

org.apache.skywalking.apm.agent.core.remote.ServiceAndEndpointRegisterClient

org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService

org.apache.skywalking.apm.agent.core.commands.CommandService

org.apache.skywalking.apm.agent.core.commands.CommandExecutorService

org.apache.skywalking.apm.agent.core.context.OperationNameFormatService

而这每个类都实现了BootService这个借口,BootService是所有当插件机制开始起作用时需要启动的远程交换需要实现的接口。BootService启动的时候将调用boot方法。

org.apache.skywalking.apm.agent.core.boot.ServiceManager这个类里面会将所有实现BootService的类的实例都执行一遍。

JVMService类实例化后执行的boot方法内容如下

@Override

public void boot() throws Throwable {

// 创建一个持续收集(生产)指标的单一线程的线程池,这个线程池会定期(每秒)执行,而且执行的是JVMService的run方法

collectMetricFuture = Executors

.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-produce"))

.scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {

@Override public void handle(Throwable t) {

logger.error("JVMService produces metrics failure.", t);

}

}), 0, 1, TimeUnit.SECONDS);

// 创建一个持续发送(消费)数据的单一线程的线程池,这个线程池会定期(每秒)执行,而且执行的是JVMService的内部类Sender的run方法

sendMetricFuture = Executors

.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("JVMService-consume"))

.scheduleAtFixedRate(new RunnableWithExceptionProtection(sender, new RunnableWithExceptionProtection.CallbackWhenException() {

@Override public void handle(Throwable t) {

logger.error("JVMService consumes and upload failure.", t);

}

}

), 0, 1, TimeUnit.SECONDS);

}

JVMService类的run方法:

public void run() {

if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()

&& RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()

) {

long currentTimeMillis = System.currentTimeMillis();

try {

JVMMetric.Builder jvmBuilder = JVMMetric.newBuilder();

jvmBuilder.setTime(currentTimeMillis);

jvmBuilder.setCpu(CPUProvider.INSTANCE.getCpuMetric());

jvmBuilder.addAllMemory(MemoryProvider.INSTANCE.getMemoryMetricList());

jvmBuilder.addAllMemoryPool(MemoryPoolProvider.INSTANCE.getMemoryPoolMetricsList());

jvmBuilder.addAllGc(GCProvider.INSTANCE.getGCList());

// JVM指标数据

JVMMetric jvmMetric = jvmBuilder.build();

// 收集数据后,放到消息队列LinkedBlockingQueue queue中

if (!queue.offer(jvmMetric)) {

queue.poll();

queue.offer(jvmMetric);

}

} catch (Exception e) {

logger.error(e, "Collect JVM info fail.");

}

}

}

内部Sender类的run方法:

@Override

public void run() {

if (RemoteDownstreamConfig.Agent.SERVICE_ID != DictionaryUtil.nullValue()

&& RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID != DictionaryUtil.nullValue()

) {

if (status == GRPCChannelStatus.CONNECTED) {

try {

JVMMetricCollection.Builder builder = JVMMetricCollection.newBuilder();

LinkedList buffer = new LinkedList();

queue.drainTo(buffer);

if (buffer.size() > 0) {

builder.addAllMetrics(buffer);

builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);

// 发送数据并接收返回的结果

Commands commands = stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).collect(builder.build());

ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);

}

} catch (Throwable t) {

logger.error(t, "send JVM metrics to Collector fail.");

}

}

}

}

而具体数据是怎么发送的呢?我们来看采集的指标类JVMMetric.java

public  final class JVMMetric extends

com.google.protobuf.GeneratedMessageV3 implements

// @@protoc_insertion_point(message_implements:JVMMetric)

JVMMetricOrBuilder {

...

}

其实这个类是JVMMetric.proto编译后生成的,而JVMMetric.proto内容如下:

syntax = "proto3";

option java_multiple_files = true;

option java_package = "org.apache.skywalking.apm.network.language.agent.v2";

option csharp_namespace = "SkyWalking.NetworkProtocol";

import "common/common.proto";

import "common/JVM.proto";

service JVMMetricReportService {

// grpc定义的方法,参数类型JVMMetricCollection,返回类型为:Commands

rpc collect (JVMMetricCollection) returns (Commands) {

}

}

message JVMMetricCollection {

repeated JVMMetric metrics = 1;

int32 serviceInstanceId = 2;

}

common.proto内容如下:

syntax = "proto3";

option java_multiple_files = true;

option java_package = "org.apache.skywalking.apm.network.common";

option csharp_namespace = "SkyWalking.NetworkProtocol";

message KeyStringValuePair {

string key = 1;

string value = 2;

}

message KeyIntValuePair {

string key = 1;

int32 value = 2;

}

message CPU {

double usagePercent = 2;

}

// In most cases, detect point should be `server` or `client`.

// Even in service mesh, this means `server`/`client` side sidecar

// `proxy` is reserved only.

enum DetectPoint {

client = 0;

server = 1;

proxy = 2;

}

message Commands {

repeated Command commands = 1;

}

message Command {

string command = 1;

repeated KeyStringValuePair args = 2;

}

enum ServiceType {

// An agent works inside the normal business application.

normal = 0;

// An agent works inside the database.

database = 1;

// An agent works inside the MQ.

mq = 2;

// An agent works inside the cache server.

cache = 3;

// An agent works inside the browser.

browser = 4;

}

jvm.proto内容如下:

syntax = "proto3";

option java_multiple_files = true;

option java_package = "org.apache.skywalking.apm.network.language.agent";

option csharp_namespace = "SkyWalking.NetworkProtocol";

import "common/common.proto";

message JVMMetric {

int64 time = 1;

CPU cpu = 2;

repeated Memory memory = 3;

repeated MemoryPool memoryPool = 4;

repeated GC gc = 5;

}

message Memory {

bool isHeap = 1;

int64 init = 2;

int64 max = 3;

int64 used = 4;

int64 committed = 5;

}

message MemoryPool {

PoolType type = 1;

int64 init = 2;

int64 max = 3;

int64 used = 4;

int64 commited = 5;

}

enum PoolType {

CODE_CACHE_USAGE = 0;

NEWGEN_USAGE = 1;

OLDGEN_USAGE = 2;

SURVIVOR_USAGE = 3;

PERMGEN_USAGE = 4;

METASPACE_USAGE = 5;

}

message GC {

GCPhrase phrase = 1;

int64 count = 2;

int64 time = 3;

}

enum GCPhrase {

NEW = 0;

OLD = 1;

}

而服务接收端,即collector是怎么接收的呢?

接收端有一个类JVMMetricsServiceHandler专门用来处理JVM的监控数据,这个类的collect方法如下:

@Override public void collect(JVMMetrics request, StreamObserver responseObserver) {

int serviceInstanceId = request.getApplicationInstanceId();

if (logger.isDebugEnabled()) {

logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId);

}

// 处理数据,jvmSourceDispatcher发送到下一环节处理

request.getMetricsList().forEach(metrics -> {

long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());

jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);

});

responseObserver.onNext(Downstream.newBuilder().build());

responseObserver.onCompleted();

}

然后我们看一下JVMSourceDispatcher的sendMetric方法

void sendMetric(int serviceInstanceId, long minuteTimeBucket, JVMMetric metrics) {

ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCache.get(serviceInstanceId);

int serviceId;

if (Objects.nonNull(serviceInstanceInventory)) {

serviceId = serviceInstanceInventory.getServiceId();

} else {

logger.warn("Can"t find service by service instance id from cache, service instance id is: {}", serviceInstanceId);

return;

}

this.sendToCpuMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getCpu());

this.sendToMemoryMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList());

this.sendToMemoryPoolMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList());

this.sendToGCMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getGcList());

}

然后我们看sendTopCpuMetricProcess方法

private void sendToCpuMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, CPU cpu) {

ServiceInstanceJVMCPU serviceInstanceJVMCPU = new ServiceInstanceJVMCPU();

serviceInstanceJVMCPU.setId(serviceInstanceId);

serviceInstanceJVMCPU.setName(Const.EMPTY_STRING);

serviceInstanceJVMCPU.setServiceId(serviceId);

serviceInstanceJVMCPU.setServiceName(Const.EMPTY_STRING);

serviceInstanceJVMCPU.setUsePercent(cpu.getUsagePercent());

serviceInstanceJVMCPU.setTimeBucket(timeBucket);

sourceReceiver.receive(serviceInstanceJVMCPU);

}

SourceReceiver的receive来接收数据

然而SourceReceiver是一个接口

public interface SourceReceiver extends Service {

void receive(Source source);

}

这个接口只有一个实现类

public class SourceReceiverImpl implements SourceReceiver {

}

receive的实现:

@Override public void receive(Source source) {

dispatcherManager.forward(source);

}

我们看到又调用了DispatcherManager的forward方法

public void forward(Source source) {

if (source == null) {

return;

}

List dispatchers = dispatcherMap.get(source.scope());

/**

* Dispatcher is only generated by oal script analysis result.

* So these will/could be possible, the given source doesn"t have the dispatcher,

* when the receiver is open, and oal script doesn"t ask for analysis.

*/

if (dispatchers != null) {

for (SourceDispatcher dispatcher : dispatchers) {

dispatcher.dispatch(source);

}

}

}

然后会调用SourceDispatcher的dispatch方法

而我们看到有这么多类实现了SourceDispatcher接口,具体是那个方法实现了呢?我们可以打日志也可以简单分析一下,首先可以排除的书EndpointCallRElationDispatcher、HttpAccessLogDispatcher、

JaegerSpanRecordDispatcher、ServiceCallRelationDispatcher、ServiceInstanceCallRelationDispatcher、ZipkinSpanRecordDispatcher这几个类也就是说我们可以重点关注

DatabaseStatementDispatcher和SegmentDispatcher这两个类,而这个DatabaseStatementDispatcher并没有被使用,所以我们可以重点分析SegmentDispatcher这个类

public class SegmentDispatcher implements SourceDispatcher {

@Override public void dispatch(Segment source) {

SegmentRecord segment = new SegmentRecord();

segment.setSegmentId(source.getSegmentId());

segment.setTraceId(source.getTraceId());

segment.setServiceId(source.getServiceId());

segment.setServiceInstanceId(source.getServiceInstanceId());

segment.setEndpointName(source.getEndpointName());

segment.setEndpointId(source.getEndpointId());

segment.setStartTime(source.getStartTime());

segment.setEndTime(source.getEndTime());

segment.setLatency(source.getLatency());

segment.setIsError(source.getIsError());

segment.setDataBinary(source.getDataBinary());

segment.setTimeBucket(source.getTimeBucket());

segment.setVersion(source.getVersion());

// 构造SegmentRecord对象,然后RecordStreamProcessor的in方法去处理(消费)segment信息

RecordStreamProcessor.getInstance().in(segment);

}

}

然后我们看一下RecordStreamProcessor的in方法

public void in(Record record) {

RecordPersistentWorker worker = workers.get(record.getClass());

if (worker != null) {

worker.in(record);

}

}

然后是RecordPersistentWorker的in方法

@Override public void in(Record record) {

try {

InsertRequest insertRequest = recordDAO.prepareBatchInsert(model, record);

batchDAO.asynchronous(insertRequest);

} catch (IOException e) {

logger.error(e.getMessage(), e);

}

}

到此我们能够看到持久化到数据库的操作(调用es或者h2的相关接口实现)

整个过程我们看到JVM的数据是在agent发送到collector后立马就持久化存储了(h2或者es)。

插件源码分析

关于插件开发,我们可以参考Java-Plugin-Development-Guide.md这篇文档,或者我翻译过来后的中文文档,接下来我们看看spring框架的数据是如何采集的,在apm-sniffer/apm-sdk-plugin目录下,有个字项目spring-plugins里面放的都是spring相关的插件用来实现spring框架的数据采集

我们以mvc-annotation-4.x-plugin项目为例来看,skywalking的插件是如何开发的。

我们可以看到resources目录的文件src/main/resources/skywalking-plugin.def这个skywalking-plugin.def就是用来定义插件的。

spring-mvc-annotation-4.x=org.apache.skywalking.apm.plugin.spring.mvc.v4.define.ControllerInstrumentation

spring-mvc-annotation-4.x=org.apache.skywalking.apm.plugin.spring.mvc.v4.define.RestControllerInstrumentation

spring-mvc-annotation-4.x=org.apache.skywalking.apm.plugin.spring.mvc.v4.define.HandlerMethodInstrumentation

spring-mvc-annotation-4.x=org.apache.skywalking.apm.plugin.spring.mvc.v4.define.InvocableHandlerInstrumentation

spring-mvc-annotation-4.x=org.apache.skywalking.apm.plugin.spring.mvc.v4.define.ControllerForLowVersionInstrumentation

spring-mvc-annotation-4.x=org.apache.skywalking.apm.plugin.spring.mvc.v4.define.RestControllerForLowVersionInstrumentation

这个里面定义了一下几个插件

ControllerInstrumentation

RestControllerInstrumentation

HandlerMethodInstrumentation

InvocableHandlerInstrumentation

ControllerForLowVersionInstrumentation

RestControllerForLowVersionInstrumentation

我们根据plugin的开发流程来分析代码,应该是有一个类定义拦截机制,另外一个类是增强机制。我们先看一下类的结构图:

我们看到AbstractClassEnhancePluginDefine、ClassEnhancePluginDefine、ClassInstanceMethodsEnhancePluginDefine都是skywalking提供的基础类,而这个插件里面的类增强都是继承这些父类的。

我们先来看org.apache.skywalking.apm.plugin.spring.mvc.v4.define.ControllerInstrumentation,我们先看抽象类AbstractSpring4Instrumentation的内容:

public abstract class AbstractSpring4Instrumentation extends ClassInstanceMethodsEnhancePluginDefine {

// 这块个人感觉应该是witness_class写错了,

public static final String WITHNESS_CLASSES = "org.springframework.cache.interceptor.SimpleKey";

@Override

protected String[] witnessClasses() {

return new String[] {WITHNESS_CLASSES, "org.springframework.cache.interceptor.DefaultKeyGenerator"};

}

}

然后它的子类:

/**

* ControllerInstrumentation 增强所有有RequestMapping注解和Controller注解的类的构造函数和方法

* ControllerConstructorInterceptor 在执行构造函数之前将controller的base path(路径)放到动态

* 字段里面

* RequestMappingMethodInterceptor先从动态字段里面获取request path,如果没找到

* RequestMappingMethodInterceptor会结合路径和当前方法上面的注解和base path将新的路径放到动态

* 字段里面

* @author zhangxin

*/

public abstract class AbstractControllerInstrumentation extends AbstractSpring4Instrumentation {

// 构造函数拦截点

@Override

public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {

return new ConstructorInterceptPoint[] {

new ConstructorInterceptPoint() {

// 匹配方式,这里是返回了一个any()即总是匹配

@Override

public ElementMatcher getConstructorMatcher() {

return any();

}

// 拦截器类

@Override

public String getConstructorInterceptor() {

return "org.apache.skywalking.apm.plugin.spring.mvc.v4.ControllerConstructorInterceptor";

}

}

};

}

// 实例方法拦截点,返回了一个数组,一个是针对@RequestMapping这种类型的注解,一个是针对

// @GetMapping、@PostMapping、@PutMapping、@DeleteMapping、@PatchMapping这些类型的注解

@Override

public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {

return new InstanceMethodsInterceptPoint[] {

new DeclaredInstanceMethodsInterceptPoint() {

// 所有有RequestMapping这个注解的

@Override

public ElementMatcher getMethodsMatcher() {

return isAnnotatedWith(named("org.springframework.web.bind.annotation.RequestMapping"));

}

// RequestMappingMethodInterceptor

@Override

public String getMethodsInterceptor() {

return Constants.REQUEST_MAPPING_METHOD_INTERCEPTOR;

}

@Override

public boolean isOverrideArgs() {

return false;

}

},

new DeclaredInstanceMethodsInterceptPoint() {

@Override

public ElementMatcher getMethodsMatcher() {

return isAnnotatedWith(named("org.springframework.web.bind.annotation.GetMapping"))

.or(isAnnotatedWith(named("org.springframework.web.bind.annotation.PostMapping")))

.or(isAnnotatedWith(named("org.springframework.web.bind.annotation.PutMapping")))

.or(isAnnotatedWith(named("org.springframework.web.bind.annotation.DeleteMapping")))

.or(isAnnotatedWith(named("org.springframework.web.bind.annotation.PatchMapping")));

}

// RestMappingMethodInterceptor

@Override

public String getMethodsInterceptor() {

return Constants.REST_MAPPING_METHOD_INTERCEPTOR;

}

@Override

public boolean isOverrideArgs() {

return false;

}

}

};

}

// 需要增强的类的匹配方式

@Override

protected ClassMatch enhanceClass() {

// 抽象类不定义具体匹配方式,而是交给子类,让子类去实现getEnhanceAnnotations方法。

return ClassAnnotationMatch.byClassAnnotationMatch(getEnhanceAnnotations());

}

protected abstract String[] getEnhanceAnnotations();

}

AbstractControllerInstrumentation这个类并没有定义确定的类的匹配 然后是ControllerInstrumentation

public class ControllerInstrumentation extends AbstractControllerInstrumentation {

public static final String ENHANCE_ANNOTATION = "org.springframework.stereotype.Controller";

// 匹配所有有@Controller注解的类

@Override protected String[] getEnhanceAnnotations() {

return new String[] {ENHANCE_ANNOTATION};

}

}

接下来我们来看构造函数的拦截器类ControllerConstructorInterceptor

/**

* The ControllerConstructorInterceptor intercepts the Controller"s constructor, in order to acquire the

* mapping annotation, if exist.

*

* But, you can see we only use the first mapping value, Why?

*

* Right now, we intercept the controller by annotation as you known, so we CAN"T know which uri patten is actually

* matched. Even we know, that costs a lot.

*

* If we want to resolve that, we must intercept the Spring MVC core codes, that is not a good choice for now.

*

* Comment by @wu-sheng

*/

public class ControllerConstructorInterceptor implements InstanceConstructorInterceptor {

@Override

public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {

String basePath = "";

// 获取@RequestMapping的信息,其实主要是想要获取到路径信息

RequestMapping basePathRequestMapping = objInst.getClass().getAnnotation(RequestMapping.class);

if (basePathRequestMapping != null) {

if (basePathRequestMapping.value().length > 0) {

basePath = basePathRequestMapping.value()[0];

} else if (basePathRequestMapping.path().length > 0) {

basePath = basePathRequestMapping.path()[0];

}

}

EnhanceRequireObjectCache enhanceRequireObjectCache = new EnhanceRequireObjectCache();

enhanceRequireObjectCache.setPathMappingCache(new PathMappingCache(basePath));

objInst.setSkyWalkingDynamicField(enhanceRequireObjectCache);

}

}

然后我们看到这个插件里面只是定义了需要增强的类的匹配形式,并没有具体的创建EntrySpan,ExitSpan的处理逻辑。其实这块处理逻辑是在AbstractControllerInstrumentation方法拦截定义设置好具体由哪个类来处理的主要是两个类:RequestMappingMethodInterceptor,RestMappingMethodInterceptor。

一个是针对@RequestMapping这种注解的,一个是针对@GetMapping这类注解的。其实@GetMapping也是又@RequestMapping而来的。GetMapping本身就用了@RequestMapping,相当于是指定method的@RequestMapping。

@Target(ElementType.METHOD)

@Retention(RetentionPolicy.RUNTIME)

@Documented

@RequestMapping(method = RequestMethod.GET)

public @interface GetMapping {

}

RequestMappingMethodInterceptor,RestMappingMethodInterceptor继承了同一个父类:

AbstractMethodInterceptor。这两个类本身只重写了父类的两个方法:

public abstract String getRequestURL(Method method);

public abstract String getAcceptedMethodTypes(Method method);

所以我们重点关注父类(AbstractMethodInterceptor)的两个方法:

beforeMethod (方法调用前的逻辑)

afterMethod(方法调用后的逻辑)

@Override

public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,

MethodInterceptResult result) throws Throwable {

// forwardRequestFlag

Boolean forwardRequestFlag = (Boolean)ContextManager.getRuntimeContext().get(FORWARD_REQUEST_FLAG);

// 如果是forwardRequest就直接返回

/**

* Spring MVC plugin do nothing if current request is forward request.

* Ref: https://github.com/apache/skywalking/pull/1325

*/

if (forwardRequestFlag != null && forwardRequestFlag) {

return;

}

String operationName;

if (Config.Plugin.SpringMVC.USE_QUALIFIED_NAME_AS_ENDPOINT_NAME) {

operationName = MethodUtil.generateOperationName(method);

} else {

EnhanceRequireObjectCache pathMappingCache = (EnhanceRequireObjectCache)objInst.getSkyWalkingDynamicField();

String requestURL = pathMappingCache.findPathMapping(method);

if (requestURL == null) {

requestURL = getRequestURL(method);

pathMappingCache.addPathMapping(method, requestURL);

requestURL = getAcceptedMethodTypes(method) + pathMappingCache.findPathMapping(method);

}

operationName = requestURL;

}

// 设置operationName为requestURL

// 获取HttpServletRequest

HttpServletRequest request = (HttpServletRequest)ContextManager.getRuntimeContext().get(REQUEST_KEY_IN_RUNTIME_CONTEXT);

if (request != null) {

// 拿到StackDepth

StackDepth stackDepth = (StackDepth)ContextManager.getRuntimeContext().get(CONTROLLER_METHOD_STACK_DEPTH);

if (stackDepth == null) {

// new一个ContextCarrier

ContextCarrier contextCarrier = new ContextCarrier();

CarrierItem next = contextCarrier.items();

while (next.hasNext()) {

next = next.next();

next.setHeadValue(request.getHeader(next.getHeadKey()));

}

// 创建EntrySpan

AbstractSpan span = ContextManager.createEntrySpan(operationName, contextCarrier);

Tags.URL.set(span, request.getRequestURL().toString());

Tags.HTTP.METHOD.set(span, request.getMethod());

span.setComponent(ComponentsDefine.SPRING_MVC_ANNOTATION);

SpanLayer.asHttp(span);

if (Config.Plugin.SpringMVC.COLLECT_HTTP_PARAMS) {

final Map parameterMap = request.getParameterMap();

if (parameterMap != null && !parameterMap.isEmpty()) {

String tagValue = CollectionUtil.toString(parameterMap);

tagValue = Config.Plugin.Http.HTTP_PARAMS_LENGTH_THRESHOLD > 0

? StringUtil.cut(tagValue, Config.Plugin.Http.HTTP_PARAMS_LENGTH_THRESHOLD)

: tagValue;

Tags.HTTP.PARAMS.set(span, tagValue);

}

}

stackDepth = new StackDepth();

ContextManager.getRuntimeContext().put(CONTROLLER_METHOD_STACK_DEPTH, stackDepth);

} else {

AbstractSpan span =

ContextManager.createLocalSpan(buildOperationName(objInst, method));

span.setComponent(ComponentsDefine.SPRING_MVC_ANNOTATION);

}

stackDepth.increment();

}

}

private String buildOperationName(Object invoker, Method method) {

StringBuilder operationName = new StringBuilder(invoker.getClass().getName())

.append(".").append(method.getName()).append("(");

for (Class> type : method.getParameterTypes()) {

operationName.append(type.getName()).append(",");

}

if (method.getParameterTypes().length > 0) {

operationName = operationName.deleteCharAt(operationName.length() - 1);

}

return operationName.append(")").toString();

}

afterMethod

@Override

public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,

Object ret) throws Throwable {

Boolean forwardRequestFlag = (Boolean)ContextManager.getRuntimeContext().get(FORWARD_REQUEST_FLAG);

/**

* Spring MVC plugin do nothing if current request is forward request.

* Ref: https://github.com/apache/skywalking/pull/1325

*/

if (forwardRequestFlag != null && forwardRequestFlag) {

return ret;

}

HttpServletRequest request = (HttpServletRequest)ContextManager.getRuntimeContext().get(REQUEST_KEY_IN_RUNTIME_CONTEXT);

if (request != null) {

StackDepth stackDepth = (StackDepth)ContextManager.getRuntimeContext().get(CONTROLLER_METHOD_STACK_DEPTH);

if (stackDepth == null) {

throw new IllegalMethodStackDepthException();

} else {

stackDepth.decrement();

}

// 获取当前的span

AbstractSpan span = ContextManager.activeSpan();

if (stackDepth.depth() == 0) {

HttpServletResponse response = (HttpServletResponse)ContextManager.getRuntimeContext().get(RESPONSE_KEY_IN_RUNTIME_CONTEXT);

if (response == null) {

throw new ServletResponseNotFoundException();

}

if (IS_SERVLET_GET_STATUS_METHOD_EXIST && response.getStatus() >= 400) {

span.errorOccurred();

Tags.STATUS_CODE.set(span, Integer.toString(response.getStatus()));

}

// 清楚一些上下文信息

ContextManager.getRuntimeContext().remove(REQUEST_KEY_IN_RUNTIME_CONTEXT);

ContextManager.getRuntimeContext().remove(RESPONSE_KEY_IN_RUNTIME_CONTEXT);

ContextManager.getRuntimeContext().remove(CONTROLLER_METHOD_STACK_DEPTH);

}

// 停止span

ContextManager.stopSpan();

}

return ret;

}

服务器采集数据源码,Skywalking数据采集与收集源码分析相关推荐

  1. 最新全自动更新采集影视带10套模板PHP源码+功能多

    正文: 全自动更新采集影视带10套模板PHP源码,一个全自动采集影视的网站程序,里面包含了十种影视模板. 支持放到二级目录.里面有教程,怎么修改接口啥的,具体自己下载研究吧. 程序: wwhegu.l ...

  2. 医学影像管理系统源码(PACS) PACS源码 PACS系统源码

    主要功能: 一.信息管理 1.支持对患者.检查项目.申请医生.申请单据.设备等信息进行管理: 2.支持检查病人排队管理功能: 3.支持大屏幕队列显示和语音呼叫: 4.提供预约调整.插队管理和掉队处理等 ...

  3. 11没有源码注释_我们为什么要看源码、应该如何看源码?

    看源码的意义 看源码只是一种方法.手段,而不是目的.我也曾经给自己制定过"阅读xxx源码"的目标,现在看起来真的很蠢,一点不smart(specific.measurable.at ...

  4. java毕业生设计疫苗药品批量扫码识别追溯系统计算机源码+系统+mysql+调试部署+lw

    java毕业生设计疫苗药品批量扫码识别追溯系统计算机源码+系统+mysql+调试部署+lw java毕业生设计疫苗药品批量扫码识别追溯系统计算机源码+系统+mysql+调试部署+lw 本源码技术栈: ...

  5. 【2022修复版】社群扫码进群活码引流完整运营源码/带视频搭建教程

    搭建环境 1.环境Nginx MySQL 5.6 php7.2 php7.2安装扩展fileinfo redis Swoole sg11 服务器需要安装linux的系统, 安装上宝塔,在配置这些环境就 ...

  6. 帝国CMS仿玩游戏网源码大型游戏资讯网站源码

    帝国CMS仿玩游戏网源码大型游戏资讯网站源码 仿<玩游戏>wanyx 大型游戏资讯门户模板,模板采用帝国 CMS7.5 内核制作完成,自带手机版. 模板带有部分内容数据,但没有图片附件,有 ...

  7. JAVA要不要看源码_为什么要看源码、如何看源码,高手进阶必看

    作者:xybaby www.cnblogs.com/xybaby/p/10794700.html 由于项目的需求,最近花了较多的时间来看开源项目的代码,在本文中,简单总结一下对为什么要看源码.如何看源 ...

  8. Live555源码阅读笔记(一):源码介绍文档 及 源码目录结构

    目录 一.Live555介绍 1.Live555项目介绍 2.官网及帮助文档介绍 二.源码目录结构 1.UsageEnvironment 2.BasicUsageEnvironment 3.group ...

  9. php淘金农场源码,2018Thinkphp仿淘金农场开源源码统H5农场复利源码带商城仓库商店...

    演示地址:如有演示站请以演示为准,无演示站以截图为准,源码太多服务器有限,无法搭建所有源码演示站,请谅解! 新手购买指导:1.在本站注册账号 丨 2.登录已注册账号充值源码所需金币 丨 3.登录账号下 ...

  10. oa办公系统源码OA企业人事管理系统源码php协同自动化办公签到

    介绍: 功能模块包含了人事管理.公告.培训.通知.论坛.考勤.会议.网络磁盘,短消息等功能,看了下功能上算是比较全的了. 安装说明: 1.将完整程序上传至你的服务器,输入http://你的域名/ins ...

最新文章

  1. iOS处理高并发量的数据请求和数据集合的对应关系
  2. 实现计算机界“大满贯”,芮勇博士再获技术成就大奖
  3. 切换卡TabHost控件的使用
  4. Maven插件tomcat7-maver-plugin
  5. ROS通信架构(上)
  6. [机器学习] --- Getting Started With MachineLearning
  7. c 多线程map_Rust:一个不再有 C/C++ 的,实现安全实时软件的未来
  8. android 定时器 坑,Android 记录一个使用AlarmManager的坑。
  9. linux下进程的创建代码,伪代码说明Linux进程创建过程
  10. 直播盒子的源代码怎么找
  11. 我的软件开发生涯 (10年开发经验总结和爆栈人生)
  12. 小米平板5 pro 解锁 Bootloader
  13. 2017第34届广州国际特许连锁加盟展览会会刊(参展商名录)
  14. 贴片钽电容封装、尺寸
  15. 转载]“无法打开Internet站点 已终止操作”的解决方法
  16. 美股分时交易数据 API 接口
  17. Android存储空间及路径总结
  18. 虚幻引擎与现代C++:转移语义和右值引用
  19. 南京师范大学计算机技术研究生就业,重磅!2017年南京师范大学毕业研究生就业质量报告新鲜出炉...
  20. uboot.lds 链接文件分析

热门文章

  1. 一步真实解决RuntimeError: result type Float can‘t be cast to the desired output type long int
  2. 使用yum下载rpm源码包
  3. 天才作文-不知道有没有人发过 很有才
  4. python np float_Python astype(np.float)函数使用方法解析
  5. 链家网爬取深圳租房信息并存入MySQL
  6. elasticsearch-01
  7. ​单页应用程序是如何打破网页设计的?
  8. 中奖人js滚动效果_H5实现中奖记录逐行滚动切换效果
  9. C语言学习-- 计算机原理及二进制
  10. wireshark使用