前言

在《在线代码级性能剖析,补全分布式追踪的最后一块“短板”》中有提到再复杂的业务逻辑,都是基于线程去进行执行,那skywalking怎样利用方法栈快照进行代码级性能剖析的,出于好奇心,一起来debug看看其中的奥妙

demo演示

  1. 打开skywalking UI,点击新建Trace Profiling任务
  2. 配置Trace Profiling任务
  3. 查看堆栈信息

源码分析

UI创建任务

  1. 接收页面请求,通过ProfileTaskMutationService#createTask将任务存入ES中,索引名为:profile_task-*(profile_task-20220807)

     public ProfileTaskCreationResult createTask(final String serviceId,final String endpointName,final long monitorStartTime,final int monitorDuration,final int minDurationThreshold,final int dumpPeriod,final int maxSamplingCount) throws IOException {// check datafinal String errorMessage = checkDataSuccess(serviceId, endpointName, taskStartTime, taskEndTime, monitorDuration, minDurationThreshold, dumpPeriod,maxSamplingCount);if (errorMessage != null) {return ProfileTaskCreationResult.builder().errorReason(errorMessage).build();}// create taskfinal long createTime = System.currentTimeMillis();final ProfileTaskRecord task = new ProfileTaskRecord();task.setServiceId(serviceId);task.setEndpointName(endpointName.trim());task.setStartTime(taskStartTime);task.setDuration(monitorDuration);task.setMinDurationThreshold(minDurationThreshold);task.setDumpPeriod(dumpPeriod);task.setCreateTime(createTime);task.setMaxSamplingCount(maxSamplingCount);task.setTimeBucket(TimeBucket.getMinuteTimeBucket(taskStartTime));NoneStreamProcessor.getInstance().in(task);return ProfileTaskCreationResult.builder().id(task.id()).build();}
    
  2. CacheUpdateTimer#updateProfileTask更新profileTask缓存:ProfileTaskCache$profileTaskDownstreamCache

agent发起ProfileTaskCommandQuery请求

  1. agent通过ProfileTaskChannelService发起ProfileTaskCommandQuery请求

    public void run() {if (status == GRPCChannelStatus.CONNECTED) {try {ProfileTaskCommandQuery.Builder builder = ProfileTaskCommandQuery.newBuilder();// sniffer infobuilder.setService(Config.Agent.SERVICE_NAME).setServiceInstance(Config.Agent.INSTANCE_NAME);// last command create timebuilder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class).getLastCommandCreateTime());// 发起ProfileTaskCommandQuery请求Commands commands = profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS).getProfileTaskCommands(builder.build());// 处理响应ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);} catch (Throwable t) {}}
    }
    
  2. 服务端通过ProfileTaskServiceHandler#getProfileTaskCommands接收ProfileTaskCommandQuery请求
     public void getProfileTaskCommands(ProfileTaskCommandQuery request, StreamObserver<Commands> responseObserver) {// query profile task list by service idfinal String serviceId = IDManager.ServiceID.buildId(request.getService(), true);final String serviceInstanceId = IDManager.ServiceInstanceID.buildId(serviceId, request.getServiceInstance());// 从缓存中取出对应服务的任务final List<ProfileTask> profileTaskList = profileTaskCache.getProfileTaskList(serviceId);if (CollectionUtils.isEmpty(profileTaskList)) {responseObserver.onNext(Commands.newBuilder().build());responseObserver.onCompleted();return;}// build command listfinal Commands.Builder commandsBuilder = Commands.newBuilder();final long lastCommandTime = request.getLastCommandTime();for (ProfileTask profileTask : profileTaskList) {// if command create time less than last command time, means sniffer already have taskif (profileTask.getCreateTime() <= lastCommandTime) {continue;}// record profile task log -->索引名为:sw_profile_task_log-20220808recordProfileTaskLog(profileTask, serviceInstanceId, ProfileTaskLogOperationType.NOTIFIED);// add command -->将ProfileTask转换为ProfileTaskCommand返回commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build());}responseObserver.onNext(commandsBuilder.build());responseObserver.onCompleted();
    }
    
  3. agent通过CommandService#receiveCommand处理ProfileTaskCommand返回,放入阻塞队列commands中
    public void receiveCommand(Commands commands) {for (Command command : commands.getCommandsList()) {try {BaseCommand baseCommand = CommandDeserializer.deserialize(command);boolean success = this.commands.offer(baseCommand);} catch (UnsupportedCommandException e) {}}
    }
    

agent异步处理ProfileTaskCommand

  1. CommandService线程循环检测commands队列的任务,交给不同command执行器去执行对应的任务

    public void run() {final CommandExecutorService commandExecutorService = ServiceManager.INSTANCE.findService(CommandExecutorService.class);while (isRunning) {try {// 取出commands队列的任务BaseCommand command = commands.take();if (isCommandExecuted(command)) {continue;}commandExecutorService.execute(command);serialNumberCache.add(command.getSerialNumber());} catch (CommandExecutionException e) {LOGGER.error(e, "Failed to execute command[{}].", e.command().getCommand());} catch (Throwable e) {LOGGER.error(e, "There is unexpected exception");}}}
    
  2. ProfileTaskCommandExecutor#execute将ProfileTaskCommand转换为ProfileTask
  3. ProfileTaskExecutionService#addProfileTask处启动定时任务处理ProfileTask
    public void addProfileTask(ProfileTask task) {// update last command create timeif (task.getCreateTime() > lastCommandCreateTime) {lastCommandCreateTime = task.getCreateTime();}// check profile task limitfinal CheckResult dataError = checkProfileTaskSuccess(task);if (!dataError.isSuccess()) {LOGGER.warn("check command error, cannot process this profile task. reason: {}", dataError.getErrorReason());return;}// add task to listprofileTaskList.add(task);// schedule to start tasklong timeToProcessMills = task.getStartTime() - System.currentTimeMillis();PROFILE_TASK_SCHEDULE.schedule(() -> processProfileTask(task), timeToProcessMills, TimeUnit.MILLISECONDS);
    }
    
  4. ProfileTaskExecutionService#processProfileTask新建ProfileThread线程丢入线程池中,得到其返回profilingFuture(方便后面关闭)

ProfileThread开始profiling

  1. ProfileThread线程循环处理ProfileTaskExecutionContext的profilingSegmentSlots(profilingSegmentSlots什么时候插入呢?–>下文有答案)
  2. 通过Thread#getStackTrace获取线程栈,将其转换为线程快照TracingThreadSnapshot
  3. 将线程快照TracingThreadSnapshot放入快照队列snapshotQueue中
    private void profiling(ProfileTaskExecutionContext executionContext) throws InterruptedException {// 监控间隔 ->10msint maxSleepPeriod = executionContext.getTask().getThreadDumpPeriod();// run loop when current thread still runninglong currentLoopStartTime = -1;// 循环while (!Thread.currentThread().isInterrupted()) {currentLoopStartTime = System.currentTimeMillis();// each all slot 采集插槽AtomicReferenceArray<ThreadProfiler> profilers = executionContext.threadProfilerSlots();int profilerCount = profilers.length();for (int slot = 0; slot < profilerCount; slot++) {ThreadProfiler currentProfiler = profilers.get(slot);if (currentProfiler == null) {continue;}switch (currentProfiler.profilingStatus().get()) {case PENDING:// check tracing context running timecurrentProfiler.startProfilingIfNeed();break;case PROFILING:// dump stackTracingThreadSnapshot snapshot = currentProfiler.buildSnapshot();if (snapshot != null) {profileTaskChannelService.addProfilingSnapshot(snapshot);} else {// tell execution context current tracing thread dump failed, stop itexecutionContext.stopTracingProfile(currentProfiler.tracingContext());}break;}}// sleep to next period// if out of period, sleep one periodlong needToSleep = (currentLoopStartTime + maxSleepPeriod) - System.currentTimeMillis();needToSleep = needToSleep > 0 ? needToSleep : maxSleepPeriod;Thread.sleep(needToSleep);}}

profilingSegmentSlots什么时候插入呢?

  1. 在agent拦截入口方法前(譬如tomcat),初始化TracingContext会插入slot到profilingSegmentSlots(通过Thread.currentThread()获取线程栈信息)

     public ProfileStatusReference attemptProfiling(TracingContext tracingContext,String traceSegmentId,String firstSpanOPName) {........final ThreadProfiler threadProfiler = new ThreadProfiler(tracingContext, traceSegmentId, Thread.currentThread(), this);int slotLength = profilingSegmentSlots.length();for (int slot = 0; slot < slotLength; slot++) {if (profilingSegmentSlots.compareAndSet(slot, null, threadProfiler)) {return threadProfiler.profilingStatus();}}
    }
    
  2. 在agent拦截入口方法后(譬如tomcat),将之前插入slot重置为null

agent将线程快照异步发送给Server端

  1. ProfileTaskChannelService在boot时会启动500ms的定时任务,从快照队列snapshotQueue取出快照放入缓存中,批量发送给server端

    public void boot() {.......sendSnapshotFuture = Executors.newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("ProfileSendSnapshotService")).scheduleWithFixedDelay(new RunnableWithExceptionProtection(() -> {List<TracingThreadSnapshot> buffer = new ArrayList<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);//从快照队列snapshotQueue取出快照snapshotQueue.drainTo(buffer);if (!buffer.isEmpty()) {sender.send(buffer);}},t -> LOGGER.error("Profile segment snapshot upload failure.", t)), 0, 500, TimeUnit.MILLISECONDS);........
    }
    
  2. ProfileSnapshotSender#send将TracingThreadSnapshot转换为ThreadSnapshot发送给server

束语

通过本篇文章可以知道UI创建任务 --> agent获取任务 --> agent上报线程快照的整个流程,了解skywalking在其中使用大量的异步变成技巧,后续继续挖掘学习。

Skywalking系列学习之Trace Profiling源码分析相关推荐

  1. Skywalking光会用可不行,必须的源码分析分析 - Skywalking Agent 插件解析

    3 Skywalking源码导入 接上文,已经学习了Skywalking的应用,接下来我们将剖析Skywalking源码,深度学习Skywalking Agent. 3.1 源码环境搭建 当前最新版本 ...

  2. 链路追踪 SkyWalking 源码分析 —— Agent 插件体系

    点击上方"芋道源码",选择"设为星标" 做积极的人,而不是积极废人! 源码精品专栏 中文详细注释的开源项目 消息中间件 RocketMQ 源码解析 数据库中间件 ...

  3. Skywalking源码分析【agent探针篇】

    Skywalking agent源码分析 字节码技术 入口方法 1.核心配置加载方式: 2.插件初始化: 3.插件(中间件or框架)的增强 增强点的寻找: 4.服务启动 5.插件体系 5.1.拦截实例 ...

  4. SkyWalking 源码分析 —— Collector Storage 存储组件

    1. 概述 本文主要分享 SkyWalking Collector Storage 存储组件.顾名思义,负责将调用链路.应用.应用实例等等信息存储到存储器,例如,ES .H2 . 友情提示:建议先阅读 ...

  5. 【Android 性能优化】应用启动优化 ( 阶段总结 | Trace 文件分析及解决方案 | 源码分析梳理 | 设置主题的方案总结 ) ★

    文章目录 一. 常用的耗时方法优化方案 ( 重要 ) 二. 源码分析梳理 1. 应用启动时间计算相关源码分析 2. Launcher 应用中启动 Android 应用流程 三. 启动白屏解决方案 An ...

  6. skywalking源码分析第十六篇一agent端JVMService之度量上报

    文章目录 原理图 原理图一基于MXBean进行Metrics数据收集 源码分析一JVMService 总结 原理图 通过prepare构建Metrics存储缓冲队列 初始化grpc客户端 通过boot ...

  7. 深入理解GO语言:GC原理及源码分析

    Go 中的runtime 类似 Java的虚拟机,它负责管理包括内存分配.垃圾回收.栈处理.goroutine.channel.切片(slice).map 和反射(reflection)等.Go 的可 ...

  8. Spring事务源码分析责任链事务链事务不生效

    文章目录 前言 带着问题分析源码 事务源码分析 寻找Spring事务源码类 TransactionInterceptor调用栈 分析Spring AOP责任链 分析TransactionInterce ...

  9. 【我的架构师之路】- golang源码分析之协程调度器底层实现( G、M、P)

    本人的源码是基于go 1.9.7 版本的哦! 紧接着之前写的 [我的区块链之路]- golang源码分析之select的底层实现 和 [我的区块链之路]- golang源码分析之channel的底层实 ...

最新文章

  1. bash 脚本_Bash技巧:可以左右下移动和旋转俄罗斯方块的Shell脚本
  2. Vue中使用form表单提交刷新问题
  3. Spring Cloud 2021.0.1 发布
  4. matlab绘制频散曲线,Matlab绘制频散曲线程序代码.docx
  5. java进入编程界面_java – 编程到界面是什么意思?
  6. android R编译Super镜像时报错问题分析和定位
  7. php登陆+链接+验证,php+ajax验证登录跳转登录的实现方法
  8. python中下划线开头的命名_Python中 5 种不同的下划线含义你都知道吗?
  9. 腾讯会议共享屏幕,ppt如何使用演讲者模式
  10. matlab负反馈传函,已知负反馈系统开环传函求阶跃传函
  11. 机器学习sklearn(13)层次聚类
  12. 智慧屏如何连接电视盒子
  13. 介绍dbt,ETL和ELT Disrupter
  14. android n换行格式,Android 写文件生成器的时候换行请用\r\n
  15. 德国质量链接中国速度,奥迪一汽新能源汽车有限公司在电动化赛道上全速奔跑
  16. 有什么值得入手的蓝牙耳机品牌?2022年蓝牙耳机品牌排行榜
  17. 批处理批量替换文本内容,用bat代码全篇替换txt文本文件中指定字符信息
  18. 《做有质感的民族》方文山
  19. 新的一年,那些晴耕小筑要填的坑
  20. 《ASP.NET AJAX程序设计 第I卷 服务器端ASP.NET AJAX Extensions与ASP.NET AJAX Control Toolkit》目录(最终定稿)...

热门文章

  1. 汇编系列02-借助操作系统输出Hello World
  2. web端数据实时更新是如何实现的?
  3. 解决navicat出现“rsa public key not find”的问题
  4. 数据中台建设方案-基于大数据平台
  5. 【Layui】表单验证
  6. STM32——汉字显示
  7. echarts 地图tooltip提示框超出浏览器窗口怎么隐藏?
  8. Grunt插件之liveReload--前端刷新神器,解放你的F5
  9. 常用金属材料 钢管材料
  10. reportlab教程2--中文的显示