不能从远程创建com+对象_链路追踪SkyWalking源码分析——Collector Remote远程通信服务...
摘要: 原创出处 http://www.iocoder.cn/SkyWalking/collector-remote-module/ 「芋道源码」欢迎转载,保留摘要,谢谢!
本文主要基于 SkyWalking 3.2.6 正式版
- 1. 概述
- 2. collector-remote-define
- 2.1 RemoteModule
- 2.2 RemoteSenderService
- 2.3 RemoteClientService
- 2.4 RemoteClient
- 2.5 CommonRemoteDataRegisterService
- 2.6 RemoteSerializeService
- 2.7 RemoteSerializeService
- 3. collector-remote-grpc-provider
- 3.1 RemoteModuleGRPCProvider
- 3.2 GRPCRemoteSenderService
- 3.3 GRPCRemoteClientService
- 3.4 GRPCRemoteClient
- 3.5 RemoteCommonServiceHandler
- 3.6 GRPCRemoteSerializeService
- 3.7 GRPCRemoteDeserializeService
- 4. collector-remote-grpc-provider
1. 概述
本文主要分享 SkyWalking Collector Remote 远程通信服务。该服务用于 Collector 集群内部通信。
目前集群内部通信的目的,跨节点的流式处理。Remote Module 应用在 SkyWalking 架构图如下位置( 红框 ) :
FROM https://github.com/apache/incubating-skywalking
下面我们来看看整体的项目结构,如下图所示 :
- collector-remote-define :定义远程通信接口。
- collector-remote-kafka-provider :基于 Kafka 的远程通信实现。目前暂未完成。
- collector-remote-grpc-provider :基于 Google gRPC 的远程通信实现。生产环境目前使用
下面,我们从接口到实现的顺序进行分享。
2. collector-remote-define
collector-remote-define :定义远程通信接口。项目结构如下 :
整体流程如下图:
我们按照整个流程的处理顺序,逐个解析涉及到的类与接口。
2.1 RemoteModule
org.skywalking.apm.collector.remote.RemoteModule ,实现 Module 抽象类,远程通信 Module 。
#name() 实现方法,返回模块名为 "remote" 。
#services() 实现方法,返回 Service 类名:RemoteSenderService 、RemoteDataRegisterService 。
2.2 RemoteSenderService
org.skywalking.apm.collector.remote.service.RemoteSenderService ,继承 Service 接口,远程发送服务接口,定义了 #send(graphId, nodeId, data, selector) 接口方法,调用 RemoteClient ,发送数据。
- graphId 方法参数,Graph 编号。通过 graphId ,可以查找到对应的 Graph 对象。
- Graph 在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》「2. apm-collector-core/graph」 有详细解析。
- nodeId 方法参数,Worker 编号。通过 workerId ,可以查找在 Graph 对象中的 Worker 对象,从而 Graph 中的流式处理。
- Worker 在 《SkyWalking 源码分析 —— Collector Streaming Computing 流式处理(一)》「3. apm-collector-stream」 有详细解析。
- data 方法参数,Data 数据对象。例如,流式处理的具体数据对象。
- Data 在 《SkyWalking 源码分析 —— Collector Storage 存储组件》「2. apm-collector-core」 有详细解析。
- selector 方法参数,org.skywalking.apm.collector.remote.service.Selector 选择器对象。根据 Selector 对象,使用对应的负载均衡策略,选择集群内的 Collector 节点,发送数据。
- RemoteSenderService.Mode 返回值,发送模式分成 Remote 和 Local 两种方式。前者,发送数据到远程的 Collector 节点;后者,发送数据到本地,即本地处理,参见 RemoteWorkerRef#in(message) 方法。
2.3 RemoteClientService
org.skywalking.apm.collector.remote.service.RemoteClientService ,继承 Service 接口,远程客户端服务接口,定义了 #create(host, port, channelSize, bufferSize) 接口方法,创建 RemoteClient 对象。
2.4 RemoteClient
org.skywalking.apm.collector.remote.service.RemoteClient ,继承 java.lang.Comparable 接口,远程客户端接口。定义了如下接口方法:
- #push(graphId, nodeId, data, selector) 接口方法,发送数据。
- #getAddress() 接口方法,返回客户端连接的远程 Collector 地址。
- #equals(address) 接口方法,判断 RemoteClient 是否连接了指定的地址。
2.5 CommonRemoteDataRegisterService
在说 CommonRemoteDataRegisterService 之前,首先来说下 CommonRemoteDataRegisterService 的意图。
在上文中,我们可以看到发送给 Collector 是 Data 对象,而 Data 是数据的抽象类,在具体反序列化 Data 对象之前,程序是无法得知它是 Data 的哪个实现对象。这个时候,我们可以给 Data 对象的每个实现类,生成一个对应的数据协议编号。
- 在发送数据之前,序列化 Data 对象时,增加该 Data 对应的协议编号,一起发送。
- 在接收数据之后,反序列化数据时,根据协议编号,创建 Data 对应的实现类对象。
org.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService ,通用远程数据注册服务。
- id 属性,数据协议自增编号。
- dataClassMapping 属性,数据类型( Class extends Data> )与数据协议编号的映射。
- dataInstanceCreatorMapping 属性,数据协议编号与数据对象创建器( RemoteDataInstanceCreator )的映射。
2.5.1 RemoteDataRegisterService
org.skywalking.apm.collector.remote.service.RemoteDataRegisterService ,继承 Service 接口,远程客户端服务接口,定义了 #register(Class extends Data>, RemoteDataInstanceCreator) 接口方法,注册数据类型对应的远程数据创建器( RemoteDataRegisterService.RemoteDataInstanceCreator )对象。
CommonRemoteDataRegisterService 实现了 RemoteDataRegisterService 接口,#register(Class extends Data>, RemoteDataInstanceCreator) 实现方法。
另外,AgentStreamRemoteDataRegister 会调用 RemoteDataRegisterService#register(Class extends Data>, RemoteDataInstanceCreator) 方法,注册每个数据类型的 RemoteDataInstanceCreator 对象。注意,例如 Application::new 是 RemoteDataInstanceCreator 的匿名实现类。
2.5.2 RemoteDataIDGetter
org.skywalking.apm.collector.remote.service.RemoteDataIDGetter ,继承 Service 接口,远程数据协议编号获取器接口,定义了 #getRemoteDataId(Class extends Data>)接口方法,根据数据类型获取数据协议编号。
CommonRemoteDataRegisterService 实现了 RemoteDataIDGetter 接口,#getRemoteDataId(Class extends Data>) 实现方法。
2.5.3 RemoteDataInstanceCreatorGetter
org.skywalking.apm.collector.remote.service.RemoteDataInstanceCreatorGetter ,继承 Service 接口,远程数据创建器的获取器接口,定义了 #getInstanceCreator(remoteDataId 接口方法,根据数据协议编号获得远程数据创建器( RemoteDataInstanceCreator )。
CommonRemoteDataRegisterService 实现了 RemoteDataInstanceCreatorGetter 接口,#getInstanceCreator(remoteDataId) 实现方法。
2.6 RemoteSerializeService
org.skywalking.apm.collector.remote.service.RemoteSerializeService ,远程通信序列化服务接口,定义了 #serialize(Data) 接口方法,序列化数据,生成 Builder 对象。
2.7 RemoteSerializeService
org.skywalking.apm.collector.remote.service.RemoteDeserializeService ,远程通信序反列化服务接口,定义了 #deserialize(RemoteData, Data) 接口方法,反序列化传输数据。
3. collector-remote-grpc-provider
collector-remote-grpc-provider ,基于 Google gRPC 的远程通信实现。
项目结构如下 :
默认配置,在 application-default.yml 已经配置如下:
remote: gRPC: host: localhost port: 11800
3.1 RemoteModuleGRPCProvider
org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCProvider ,实现 ModuleProvider 抽象类,基于 gRPC 的组件服务提供者实现类。
#name() 实现方法,返回组件服务提供者名为 "gRPC" 。
module() 实现方法,返回组件类为 RemoteModule 。
#requiredModules() 实现方法,返回依赖组件为 cluster 、gRPC_manager 。
#prepare(Properties) 实现方法,执行准备阶段逻辑。
- 第 53 至 56 行 :创建 CommonRemoteDataRegisterService 、GRPCRemoteSenderService 对象,并调用 #registerServiceImplementation()父类方法,注册到 services 。
#start() 实现方法,执行启动阶段逻辑。
- Server 相关
- 第 65 行:创建 gRPC Server 对象。
- 第 67 行:注册 RemoteCommonServiceHandler 对象到 gRPC Server 上,用于接收 gRPC 请求后的处理。
- 《SkyWalking 源码分析 —— Collector Server Component 服务器组件》「3. gRPC 实现」
- 《SkyWalking 源码分析 —— Collector gRPC Server Manager》
- 注册发现相关
- 第 70 至 71 行:创建 org.skywalking.apm.collector.remote.grpc.RemoteModuleGRPCRegistration 对象,将自己注册到集群管理。这样,自己可以被 Collector 集群节点发现,从而被调用。
- 第 73 至 74 行:注册 GRPCRemoteSenderService 对象到集群管理。这样,自己可以监听到 Collector 集群节点的加入或离开,从而调用。
- 《SkyWalking 源码分析 —— Collector Cluster 集群管理》
#notifyAfterCompleted() 实现方法,方法为空。
3.2 GRPCRemoteSenderService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService ,继承 ClusterModuleListener 抽象类,实现 RemoteSenderService 接口,基于 gPRC 的远程发送服务实现类。
3.2.1 注册发现
通过继承 ClusterModuleListener 抽象类,实现了监听 Collector 集群节点的加入或离开。
- remoteClients 属性,连接 Collector 集群节点的客户端数组。每个 Collector 集群节点,对应一个客户端。
- #path() 实现方法,返回监听的目录 "/" + RemoteModule.NAME + "/" + RemoteModuleGRPCProvider.NAME 。Collector 集群中,每个节点的 Remote Server 都会注册到该目录下。
- #serverJoinNotify(serverAddress) 实现方法,当新的节点加入,创建新的客户端连接。
- #serverQuitNotify(serverAddress) 实现方法,当老的节点离开,移除对应的客户端连接。
3.2.2 负载均衡
RemoteModuleGRPCProvider 基于不同的选择器 ( Selector ) ,提供不同的客户端选择( org.skywalking.apm.collector.remote.grpc.service.selector.RemoteClientSelector )实现 :
- hashCodeSelector 属性,HashCodeSelector ,基于数据的哈希码。
- foreverFirstSelector 属性,ForeverFirstSelector ,基于客户端数组的顺序,选择第一个。
- rollingSelector 属性,RollingSelector ,基于客户端数组的顺序,顺序向下选择。
- #send(graphId, nodeId, data, selector) 方法,代码如下:
- 第 76 至 77 行:当选择的客户端连接的是本地时,不发送数据,交给本地处理,参见 RemoteWorkerRef#in(message) 方法。
- 第 78 至 81 行:当选择的客户端连接的是远程时,调用 RemoteClient#push(graphId, nodeId, data) 方法,发送数据。
- 第 63 、66 、69 行:根据选择器,调用 RemoteClientSelector#select(clients, data) 方法,选择客户端。
- 第 64 、67 、70 行:调用 #sendToRemoteWhenNotSelf(remoteClient, graphId, nodeId, data) 方法,发送请求数据。
3.3 GRPCRemoteClientService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClientService ,实现 RemoteClientService 接口,基于 gRPC 的远程客户端服务实现类。
#create(host, port, channelSize, bufferSize) 实现方法,创建 GRPCRemoteClient 对象。
3.4 GRPCRemoteClient
友情提示:本小节会涉及较多 gRPC 相关的知识,建议不熟悉的胖友自己 Google ,补充下姿势。
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteClient ,实现 RemoteClient 接口,基于 gRPC 的远程客户端实现类。
- client 属性,GRPCClient 对象。相比来说,GRPCRemoteClient 偏业务的封装,内部调用 GRPCClient 对象。
- carrier 属性,DataCarrier 对象,本地消息队列。GRPCRemoteClient 在被调用发送数据时,先提交到本地队列,异步消费进行发送到远程 Collector 节点。DataCarrier 在 《SkyWalking 源码分析 —— DataCarrier 异步处理库》 详细解析。
- 第 63 行:调用 DataCarrier#consume(IConsumer, num) 方法,设置消费者为 RemoteMessageConsumer 对象。
#push(graphId, nodeId, data) 实现方法,异步发送消息到远程 Collector 。
- 第 73 行:调用 RemoteDataIDGetter#getRemoteDataId(Class extends Data>) 方法,获得数据协议编号。
- 第 76 至 80 行:创建传输数据( RemoteMessage.Builder ) 对象。RemoteMessage 通过 Protobuf 创建定义,如下图所示:
- 第 83 行:调用 DataCarrier#produce(data) 方法,发送数据到本地队列。
RemoteMessageConsumer ,批量消费本地队列的数据,逐条发送数据到远程 Collector 节点。
- #consume(List) 实现方法,代码如下:
- 第 100 行:创建 StreamObserver 对象。StreamObserver 主要是 gPRC 相关的 API 的调用。
- 第 101 至 103 行:调用 io.grpc.stub.StreamObserver#onNext(RemoteMessage)方法,逐条发送数据。
- 第 106 行:调用 io.grpc.stub.StreamObserver#onCompleted() 方法,全部请求数据发送完成。
3.5 RemoteCommonServiceHandler
org.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler ,实现 org.skywalking.apm.collector.server.grpc.GRPCHandler 接口,继承 RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 抽象类,远程通信通用逻辑处理器。
其中,RemoteCommonServiceGrpc.RemoteCommonServiceImplBase 在 RemoteCommonService.proto 文件的定义如下图:
#call(StreamObserver) 实现方法,代码如下:
- #onNext(RemoteMessage) 方法,处理每一条消息,代码如下:
- 第 65 行:调用 RemoteDataInstanceCreatorGetter#getInstanceCreator(remoteDataId) 方法,获得数据协议编号对应的 RemoteDataInstanceCreator 对象。然后,调用 RemoteDataInstanceCreator#createInstance(id) 方法,创建数据协议编号对应的 Data 实现类对应的对象。
- 第 70 行:调用 GraphManager#findGraph(graphId) 方法,获得 graphId 对应的 Graph 对象。然后,调动 GraphNodeFinder#findNext(nodeId) 方法,获得 Next 对象。
- 第 71 行:调用 Next#execute(Data) 方法,继续流式处理。
3.6 GRPCRemoteSerializeService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSerializeService ,实现 RemoteSerializeService 接口,基于 gRPC 的远程通信序列化服务实现类。
3.7 GRPCRemoteDeserializeService
org.skywalking.apm.collector.remote.grpc.service.GRPCRemoteDeserializeService ,实现 GRPCRemoteDeserializeService 接口,基于 gRPC 的远程通信反序列化服务实现类。
4. collector-remote-grpc-provider
collector-remote-kafka-provider :基于 Kafka 的远程通信实现。
目前暂未完成。
TODO 【4005】collector-remote-grpc-provider
:-D 搜索微信号(ID:芋道源码),可以获得各种 Java 源码解析、原理讲解、面试题、学习指南。
:-D 并且,回复【书籍】后,可以领取笔者推荐的各种 Java 从入门到架构的 100 本书籍。
:-D 并且,回复【技术群】后,可以加入专门讨论 Java、后端、架构的技术群。
来吧,骚年~
不能从远程创建com+对象_链路追踪SkyWalking源码分析——Collector Remote远程通信服务...相关推荐
- 链路追踪 SkyWalking 源码分析 —— Agent 插件体系
点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 中文详细注释的开源项目 消息中间件 RocketMQ 源码解析 数据库中间件 ...
- threadpoolexecutor创建线程池_线程池ThreadPoolExecutor源码分析
什么是线程池 创建线程要花费昂贵的资源和时间,如果任务来了才创建那么响应时间会变长,而且一个进程能创建的线程数量有限.为了避免这些问题,在程序启动的时候就创建若干线程来响应出来,它们被称为线程池,里面 ...
- 创建线程的三种方法_Netty源码分析系列之NioEventLoop的创建与启动
前言 前三篇文章分别分析了 Netty 服务端 channel 的初始化.注册以及绑定过程的源码,理论上这篇文章应该开始分析新连接接入过程的源码了,但是在看源码的过程中,发现有一个非常重要的组件:Ni ...
- python list存储对象_《python解释器源码剖析》第4章--python中的list对象
4.0 序 python中的list对象,底层对应的则是PyListObject.如果你熟悉C++,那么会很容易和C++中的list联系起来.但实际上,这个C++中的list大相径庭,反而和STL中的 ...
- seata 如何开启tcc事物_分布式事务Seata-TCC源码分析
为了更好理解分布式事务,首先提出一个问题: 假设数据库中有两个表ta,tb,我们要分别更改ta表中的ra记录和tb表中的rb记录,但要求ra和rb记录都修改成功,才认为此次操作时成功,或者需要失败回滚 ...
- 不能从远程创建com+对象_红蓝对抗攻防实战:寻找COM对象
概述 渗透测试人员.红蓝对抗的蓝军(攻击方).恶意行动者经常会选择COM对象来实现横向移动.此前,一些安全研究人员陆续针对COM对象开展研究,包括Matt Nelson(enigma0x3)在2017 ...
- @transaction 提交事务_分布式事务 TCC-Transaction 源码分析——TCC 实现
1. 概述 本文分享 TCC 实现.主要涉及如下三个 Maven 项目: tcc-transaction-core :tcc-transaction 底层实现. tcc-transaction-api ...
- java selector 源码_基于selector的源码分析和理解、思想和应用实践
来自京东架构师(JAVA)欢迎关注我的微信公众号java2arch,更多技术文章可看. 一个连接请求connetion request过来,产生一个通道channel(包含并封装了connection ...
- cityscapes数据集_全景分割 UPSNet 源码分析 (1) - 数据格式
本系列文章针对的数据集是Cityscapes,后续会在Mapillary vista街道数据集上尝试训练出一版模型. 1. 数据集和标注文件 Cityscapes文件夹下 ├── annotation ...
- Spring AOP 源码分析 - 创建代理对象
1.简介 在上一篇文章中,我分析了 Spring 是如何为目标 bean 筛选合适的通知器的.现在通知器选好了,接下来就要通过代理的方式将通知器(Advisor)所持有的通知(Advice)织入到 b ...
最新文章
- 计算机考研不压分的学校,考研院校里,这些学校不压分、不歧视专科生,值得关注...
- python找出图中所有闭合环_求图中的所有闭合环
- python任务调度平台 界面_任务调度平台Cuckoo-Schedule
- python读取文件某一行-使用python读取.text文件特定行的数据方法
- 【Linux】8_存储管理逻辑卷LVM
- E. Company(Codeforces Round #520 (Div. 2))
- leetcode 721. 账户合并(并查集)
- 面试题36:数组中的逆序对
- Android斗地主源码实现
- Ubuntu 20.04 国内源
- 中文分词:采用二元词图以及viterbi算法(一)
- 【同态加密算法的学习日记】
- Keil5窗口的背景保护色设置
- PMP课程笔记:第10章 项目沟通管理
- TV直播app TV版 超级直播 空壳 可玩性强 带EPG 带回看 带自定义 定制可带自定义协议等
- jointJS系列之一:jointJS的的初步使用
- display与visibility可见性、内补白与外补白
- uniapp 引入阿里矢量图标库的详细步骤及踩坑经历
- 量化交易入门阶段:好事要成双——双均线策略
- Tomcat配置问题(踩坑记录)