Skywalking系列学习之Trace Profiling源码分析
前言
在《在线代码级性能剖析,补全分布式追踪的最后一块“短板”》中有提到再复杂的业务逻辑,都是基于线程去进行执行,那skywalking怎样利用方法栈快照进行代码级性能剖析的,出于好奇心,一起来debug看看其中的奥妙
demo演示
- 打开skywalking UI,点击新建Trace Profiling任务
- 配置Trace Profiling任务
- 查看堆栈信息
源码分析
UI创建任务
接收页面请求,通过ProfileTaskMutationService#createTask将任务存入ES中,索引名为:profile_task-*(profile_task-20220807)
public ProfileTaskCreationResult createTask(final String serviceId,final String endpointName,final long monitorStartTime,final int monitorDuration,final int minDurationThreshold,final int dumpPeriod,final int maxSamplingCount) throws IOException {// check datafinal String errorMessage = checkDataSuccess(serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration, minDurationThreshold, dumpPeriod,maxSamplingCount);if (errorMessage != null) {return ProfileTaskCreationResult.builder().errorReason(errorMessage).build();}// create taskfinal long createTime = System.currentTimeMillis();final ProfileTaskRecord task = new ProfileTaskRecord();task.setServiceId(serviceId);task.setEndpointName(endpointName.trim());task.setStartTime(taskStartTime);task.setDuration(monitorDuration);task.setMinDurationThreshold(minDurationThreshold);task.setDumpPeriod(dumpPeriod);task.setCreateTime(createTime);task.setMaxSamplingCount(maxSamplingCount);task.setTimeBucket(TimeBucket.getMinuteTimeBucket(taskStartTime));NoneStreamProcessor.getInstance().in(task);return ProfileTaskCreationResult.builder().id(task.id()).build();}
CacheUpdateTimer#updateProfileTask更新profileTask缓存:ProfileTaskCache$profileTaskDownstreamCache
agent发起ProfileTaskCommandQuery请求
- agent通过ProfileTaskChannelService发起ProfileTaskCommandQuery请求
public void run() {if (status == GRPCChannelStatus.CONNECTED) {try {ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder();// sniffer infobuilder.setService(Config.Agent.SERVICE_NAME).setServiceInstance(Config.Agent.INSTANCE_NAME);// last command create timebuilder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class).getLastCommandCreateTime());// 发起ProfileTaskCommandQuery请求Commands commands = profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).getProfileTaskCommands(builder.build());// 处理响应ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);} catch (Throwable t) {}} }
- 服务端通过ProfileTaskServiceHandler#getProfileTaskCommands接收ProfileTaskCommandQuery请求
public void getProfileTaskCommands(ProfileTaskCommandQuery request, StreamObserver<Commands> responseObserver) {// query profile task list by service idfinal String serviceId = IDManager.ServiceID.buildId(request.getService(), true);final String serviceInstanceId = IDManager.ServiceInstanceID.buildId(serviceId, request.getServiceInstance());// 从缓存中取出对应服务的任务final List<ProfileTask> profileTaskList = profileTaskCache.getProfileTaskList(serviceId);if (CollectionUtils.isEmpty(profileTaskList)) {responseObserver.onNext(Commands.newBuilder().build());responseObserver.onCompleted();return;}// build command listfinal Commands.Builder commandsBuilder = Commands.newBuilder();final long lastCommandTime = request.getLastCommandTime();for (ProfileTask profileTask : profileTaskList) {// if command create time less than last command time, means sniffer already have taskif (profileTask.getCreateTime() <= lastCommandTime) {continue;}// record profile task log -->索引名为:sw_profile_task_log-20220808recordProfileTaskLog(profileTask, serviceInstanceId, ProfileTaskLogOperationType.NOTIFIED);// add command -->将ProfileTask转换为ProfileTaskCommand返回commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build());}responseObserver.onNext(commandsBuilder.build());responseObserver.onCompleted(); }
- agent通过CommandService#receiveCommand处理ProfileTaskCommand返回,放入阻塞队列commands中
public void receiveCommand(Commands commands) {for (Command command : commands.getCommandsList()) {try {BaseCommand baseCommand = CommandDeserializer.deserialize(command);boolean success = this.commands.offer(baseCommand);} catch (UnsupportedCommandException e) {}} }
agent异步处理ProfileTaskCommand
- CommandService线程循环检测commands队列的任务,交给不同command执行器去执行对应的任务
public void run() {final CommandExecutorService commandExecutorService = ServiceManager.INSTANCE.findService(CommandExecutorService.class);while (isRunning) {try {// 取出commands队列的任务BaseCommand command = commands.take();if (isCommandExecuted(command)) {continue;}commandExecutorService.execute(command);serialNumberCache.add(command.getSerialNumber());} catch (CommandExecutionException e) {LOGGER.error(e, "Failed to execute command[{}].", e.command().getCommand());} catch (Throwable e) {LOGGER.error(e, "There is unexpected exception");}}}
- ProfileTaskCommandExecutor#execute将ProfileTaskCommand转换为ProfileTask
- ProfileTaskExecutionService#addProfileTask处启动定时任务处理ProfileTask
public void addProfileTask(ProfileTask task) {// update last command create timeif (task.getCreateTime() > lastCommandCreateTime) {lastCommandCreateTime = task.getCreateTime();}// check profile task limitfinal CheckResult dataError = checkProfileTaskSuccess(task);if (!dataError.isSuccess()) {LOGGER.warn("check command error, cannot process this profile task. reason: {}", dataError.getErrorReason());return;}// add task to listprofileTaskList.add(task);// schedule to start tasklong timeToProcessMills = task.getStartTime() - System.currentTimeMillis();PROFILE_TASK_SCHEDULE.schedule(() -> processProfileTask(task), timeToProcessMills, TimeUnit.MILLISECONDS); }
- ProfileTaskExecutionService#processProfileTask新建ProfileThread线程丢入线程池中,得到其返回profilingFuture(方便后面关闭)
ProfileThread开始profiling
- ProfileThread线程循环处理ProfileTaskExecutionContext的profilingSegmentSlots(profilingSegmentSlots什么时候插入呢?–>下文有答案)
- 通过Thread#getStackTrace获取线程栈,将其转换为线程快照TracingThreadSnapshot
- 将线程快照TracingThreadSnapshot放入快照队列snapshotQueue中
private void profiling(ProfileTaskExecutionContext executionContext) throws InterruptedException {// 监控间隔 ->10msint maxSleepPeriod = executionContext.getTask().getThreadDumpPeriod();// run loop when current thread still runninglong currentLoopStartTime = -1;// 循环while (!Thread.currentThread().isInterrupted()) {currentLoopStartTime = System.currentTimeMillis();// each all slot 采集插槽AtomicReferenceArray<ThreadProfiler> profilers = executionContext.threadProfilerSlots();int profilerCount = profilers.length();for (int slot = 0; slot < profilerCount; slot++) {ThreadProfiler currentProfiler = profilers.get(slot);if (currentProfiler == null) {continue;}switch (currentProfiler.profilingStatus().get()) {case PENDING:// check tracing context running timecurrentProfiler.startProfilingIfNeed();break;case PROFILING:// dump stackTracingThreadSnapshot snapshot = currentProfiler.buildSnapshot();if (snapshot != null) {profileTaskChannelService.addProfilingSnapshot(snapshot);} else {// tell execution context current tracing thread dump failed, stop itexecutionContext.stopTracingProfile(currentProfiler.tracingContext());}break;}}// sleep to next period// if out of period, sleep one periodlong needToSleep = (currentLoopStartTime + maxSleepPeriod) - System.currentTimeMillis();needToSleep = needToSleep > 0 ? needToSleep : maxSleepPeriod;Thread.sleep(needToSleep);}}
profilingSegmentSlots什么时候插入呢?
- 在agent拦截入口方法前(譬如tomcat),初始化TracingContext会插入slot到profilingSegmentSlots(通过Thread.currentThread()获取线程栈信息)
public ProfileStatusReference attemptProfiling(TracingContext tracingContext,String traceSegmentId,String firstSpanOPName) {........final ThreadProfiler threadProfiler = new ThreadProfiler(tracingContext, traceSegmentId, Thread.currentThread(), this);int slotLength = profilingSegmentSlots.length();for (int slot = 0; slot < slotLength; slot++) {if (profilingSegmentSlots.compareAndSet(slot, null, threadProfiler)) {return threadProfiler.profilingStatus();}} }
- 在agent拦截入口方法后(譬如tomcat),将之前插入slot重置为null
agent将线程快照异步发送给Server端
- ProfileTaskChannelService在boot时会启动500ms的定时任务,从快照队列snapshotQueue取出快照放入缓存中,批量发送给server端
public void boot() {.......sendSnapshotFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ProfileSendSnapshotService")).scheduleWithFixedDelay(new RunnableWithExceptionProtection(() -> {List<TracingThreadSnapshot> buffer = new ArrayList<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);//从快照队列snapshotQueue取出快照snapshotQueue.drainTo(buffer);if (!buffer.isEmpty()) {sender.send(buffer);}},t -> LOGGER.error("Profile segment snapshot upload failure.", t)), 0, 500, TimeUnit.MILLISECONDS);........ }
- ProfileSnapshotSender#send将TracingThreadSnapshot转换为ThreadSnapshot发送给server
束语
通过本篇文章可以知道UI创建任务 --> agent获取任务 --> agent上报线程快照的整个流程,了解skywalking在其中使用大量的异步变成技巧,后续继续挖掘学习。
Skywalking系列学习之Trace Profiling源码分析相关推荐
- Skywalking光会用可不行,必须的源码分析分析 - Skywalking Agent 插件解析
3 Skywalking源码导入 接上文,已经学习了Skywalking的应用,接下来我们将剖析Skywalking源码,深度学习Skywalking Agent. 3.1 源码环境搭建 当前最新版本 ...
- 链路追踪 SkyWalking 源码分析 —— Agent 插件体系
点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 中文详细注释的开源项目 消息中间件 RocketMQ 源码解析 数据库中间件 ...
- Skywalking源码分析【agent探针篇】
Skywalking agent源码分析 字节码技术 入口方法 1.核心配置加载方式: 2.插件初始化: 3.插件(中间件or框架)的增强 增强点的寻找: 4.服务启动 5.插件体系 5.1.拦截实例 ...
- SkyWalking 源码分析 —— Collector Storage 存储组件
1. 概述 本文主要分享 SkyWalking Collector Storage 存储组件.顾名思义,负责将调用链路.应用.应用实例等等信息存储到存储器,例如,ES .H2 . 友情提示:建议先阅读 ...
- 【Android 性能优化】应用启动优化 ( 阶段总结 | Trace 文件分析及解决方案 | 源码分析梳理 | 设置主题的方案总结 ) ★
文章目录 一. 常用的耗时方法优化方案 ( 重要 ) 二. 源码分析梳理 1. 应用启动时间计算相关源码分析 2. Launcher 应用中启动 Android 应用流程 三. 启动白屏解决方案 An ...
- skywalking源码分析第十六篇一agent端JVMService之度量上报
文章目录 原理图 原理图一基于MXBean进行Metrics数据收集 源码分析一JVMService 总结 原理图 通过prepare构建Metrics存储缓冲队列 初始化grpc客户端 通过boot ...
- 深入理解GO语言:GC原理及源码分析
Go 中的runtime 类似 Java的虚拟机,它负责管理包括内存分配.垃圾回收.栈处理.goroutine.channel.切片(slice).map 和反射(reflection)等.Go 的可 ...
- Spring事务源码分析责任链事务链事务不生效
文章目录 前言 带着问题分析源码 事务源码分析 寻找Spring事务源码类 TransactionInterceptor调用栈 分析Spring AOP责任链 分析TransactionInterce ...
- 【我的架构师之路】- golang源码分析之协程调度器底层实现( G、M、P)
本人的源码是基于go 1.9.7 版本的哦! 紧接着之前写的 [我的区块链之路]- golang源码分析之select的底层实现 和 [我的区块链之路]- golang源码分析之channel的底层实 ...
最新文章
- bash 脚本_Bash技巧:可以左右下移动和旋转俄罗斯方块的Shell脚本
- Vue中使用form表单提交刷新问题
- Spring Cloud 2021.0.1 发布
- matlab绘制频散曲线,Matlab绘制频散曲线程序代码.docx
- java进入编程界面_java – 编程到界面是什么意思?
- android R编译Super镜像时报错问题分析和定位
- php登陆+链接+验证,php+ajax验证登录跳转登录的实现方法
- python中下划线开头的命名_Python中 5 种不同的下划线含义你都知道吗?
- 腾讯会议共享屏幕,ppt如何使用演讲者模式
- matlab负反馈传函,已知负反馈系统开环传函求阶跃传函
- 机器学习sklearn(13)层次聚类
- 智慧屏如何连接电视盒子
- 介绍dbt,ETL和ELT Disrupter
- android n换行格式,Android 写文件生成器的时候换行请用\r\n
- 德国质量链接中国速度,奥迪一汽新能源汽车有限公司在电动化赛道上全速奔跑
- 有什么值得入手的蓝牙耳机品牌?2022年蓝牙耳机品牌排行榜
- 批处理批量替换文本内容,用bat代码全篇替换txt文本文件中指定字符信息
- 《做有质感的民族》方文山
- 新的一年,那些晴耕小筑要填的坑
- 《ASP.NET AJAX程序设计 第I卷 服务器端ASP.NET AJAX Extensions与ASP.NET AJAX Control Toolkit》目录(最终定稿)...