流量回放repeater的原理分析二:repeater源码分析
前言
在上文中我们分析了sandbox-jvm(以下简称sandbox)的核心源码,了解了sandbox实现类增强的原理。并且了解了sandbox的模块化加载能力,repeater作为一个独立的模块,其基于sandbox的能力实现了流量的录制和回放。下面我们从repeater的软件结构、录制过程、回放过程三个方面来分析其源码。
软件结构
代码角度
jvm-sandbox-repeater
hessian-lite: 序列化服务
repeater-aide: 辅助工具
repeater-client: spring ApplicationContext获取工具
repeater-console: 控制台能力
repeater-module: 可以被sandbox加载的模块,用于流量录制与回放
repeater-plugin-api:repeater可以实现和扩展的接口
repeater-plugin-core:repeater插件的核心能力和管理能力
repeater-plugins:repeater默认实现的插件
运行角度
repeater作为sandbox一个子模块加载,进行流量录制发送console,console存储流量并且,调用repeater将流量在目标jvm上回放
流量录制
插件的加载
代码结构中已经介绍到repeater-module负责流量的录制工作,我们从repeater-module模块中,RepeaterModule.class来进行分析。
RepeaterModule实现了ModuleLifecycle接口,在sandbox进行模块加载时,对于实现该接口的实例会一次执行其onLoad方法、onActive方法、onCompleted方法,一起来看下onCompleted方法:
- 在线程池中执行,配置拉取,pullConfig实际上就是调用console提供的接口能力
- 根据配置进行初始化
- 启动心跳检测
public void loadCompleted() {ExecutorInner.execute(new Runnable() {@Overridepublic void run() {configManager = StandaloneSwitch.instance().getConfigManager();broadcaster = StandaloneSwitch.instance().getBroadcaster();invocationListener = new DefaultInvocationListener(broadcaster);RepeaterResult<RepeaterConfig> pr = configManager.pullConfig();if (pr.isSuccess()) {log.info("pull repeater config success,config={}", pr.getData());ClassloaderBridge.init(loadedClassDataSource);initialize(pr.getData());}}});heartbeatHandler = new HeartbeatHandler(configInfo, moduleManager);heartbeatHandler.start();}
来看下初始化过程:
- 根据配置装载插件,注意invokePlugin.watch的实现,这里最终就是调用了BuildingForBehavior.onWatch,如果看过上一篇文章就知道,sandbox就是在这个阶段实现类增强
String pluginsPath;
if (StringUtils.isEmpty(config.getPluginsPath())) {pluginsPath = PathUtils.getPluginPath();
} else {pluginsPath = config.getPluginsPath();
}
lifecycleManager = new JarFileLifeCycleManager(pluginsPath, routingArray);
// 装载插件
invokePlugins = lifecycleManager.loadInvokePlugins();
for (InvokePlugin invokePlugin : invokePlugins) {try {if (invokePlugin.enable(config)) {log.info("enable plugin {} success", invokePlugin.identity());invokePlugin.watch(eventWatcher, invocationListener);invokePlugin.onConfigChange(config);}} catch (PluginLifeCycleException e) {log.info("watch plugin occurred error", e);}
}
- 装载回放器,通过spi的方式加载插件中定义的回放器,并且设置结果发送服务器(可以是本地存储,也可以发送的console)
List<Repeater> repeaters = lifecycleManager.loadRepeaters();
for (Repeater repeater : repeaters) {if (repeater.enable(config)) {repeater.setBroadcast(broadcaster);}
}
- 消息订阅器注册
List<SubscribeSupporter> subscribes = lifecycleManager.loadSubscribes();
for (SubscribeSupporter subscribe : subscribes) {subscribe.register();}
- 线程池增强
TtlConcurrentAdvice.watcher(eventWatcher).watch(config);
插件的实现
所有的插件其都是按照sandbox规范实现类类增强,插件中主要由2类,入口插件和子调用插件。入口插件主要用于记录入口调用信息,包括request+response+入口信息,子调用插件主要用于记录子调用信息,包括子调用参数和响应。
一起来看下2种插件如何实现:
入口调用插件
repeater实现了3种入口调用插件:dubbo/http/java,以dubbo插件来看,它是如何实现dubbo入口调用的信息记录和录制。
先看下InvokePlugin的相关实现类,dubbo有2个实现:
- DubboProviderPlugin
- DubboConsumerPlugin
如何确定谁是入口调用插件实现呢?
看下isEntrance的实现,可以发现DubboProviderPlugin是返回的true。
public boolean isEntrance() {return true;}
入口调用定义在getEnhanceModels方法中
protected List<EnhanceModel> getEnhanceModels() {EnhanceModel onResponse = EnhanceModel.builder().classPattern("org.apache.dubbo.rpc.filter.ContextFilter$ContextListener").methodPatterns(EnhanceModel.MethodPattern.transform("onResponse")).watchTypes(Event.Type.BEFORE, Event.Type.RETURN, Event.Type.THROWS).build();EnhanceModel invoke = EnhanceModel.builder().classPattern("org.apache.dubbo.rpc.filter.ContextFilter").methodPatterns(EnhanceModel.MethodPattern.transform("invoke")).watchTypes(Event.Type.BEFORE, Event.Type.RETURN, Event.Type.THROWS).build();return Lists.newArrayList(invoke, onResponse);}
增强的能力在getEventListener,其能力就是在执行到入口代码前记录调用的参数,执行到入口返回代码时将response序列化并且发送(可以是本地存储也可以发送控制台)
protected EventListener getEventListener(InvocationListener listener) {return new DubboEventListener(getType(), isEntrance(), listener, getInvocationProcessor());}
事件监听器中还用到处理器,在getInvocationProcessor方法中。
到这里我们发现如果自定义一种入口调用插件,需要实现以下几个方法:
- isEntrance: 是否是入口调用插件
- getEnhanceModels:增强的切面
- getEventListener:增强的事件监听器
- getInvocationProcessor:响应事件的处理器
子调用插件
repeater提供了一些列插件:
- DubboConsumerPlugin:dubbo子调用
- HibernatePlugin:hibernate子调用
- IBatisPlugin:ibatis子调用
- MybatisPlugin:mybatis子调用
- JavaSubInvokePlugin:java方法子调用
- RedisPlugin: redis子调用
- SocketIOPlugin:socketio子调用
- SpringDataJpaPlugin:jpa子调用
以DubboConsumerPlugin为例来看实现一个子调用插件需要做哪些事情:
首先标记自身的调用类型
public boolean isEntrance() {return false;}
定义增强的切面
protected List<EnhanceModel> getEnhanceModels() {EnhanceModel onResponse = EnhanceModel.builder().classPattern("org.apache.dubbo.rpc.filter.ConsumerContextFilter$ConsumerContextListener").methodPatterns(EnhanceModel.MethodPattern.transform("onResponse")).watchTypes(Event.Type.BEFORE, Event.Type.RETURN, Event.Type.THROWS).build();EnhanceModel invoke = EnhanceModel.builder().classPattern("org.apache.dubbo.rpc.filter.ConsumerContextFilter").methodPatterns(EnhanceModel.MethodPattern.transform("invoke")).watchTypes(Event.Type.BEFORE, Event.Type.RETURN, Event.Type.THROWS).build();return Lists.newArrayList(onResponse, invoke);}
定义事件监听器
protected EventListener getEventListener(InvocationListener listener) {return new DubboEventListener(getType(), isEntrance(), listener, getInvocationProcessor());}
定义事件处理器
protected EventListener getEventListener(InvocationListener listener) {return new DubboEventListener(getType(), isEntrance(), listener, getInvocationProcessor());}
监听器和处理器主要将子调用的结果进行缓存,入口调用上报时会将缓存子调用一起上报。
到这里我们已经明白了repeater通过加载不同的插件来实现入口流量的记录和子调用的记录,统一上报到本地或者远端控制台。接下来让我们来看看回放是如何实现的。
回放
回放器类型
repeater提供了3种回放器(回放器就是能够用于发起相对应协议调用的)。
- http回放器:以http发起调用
- dubbo回放器:通过泛化调用请求dubbo服务
- java回放器:实际上是基于spring的能力,通过从spring容器中获得对应bean来发起调用
整体过程
先看下RepeaterModule类中定义的相关接口,了解sandbox能力后我们就知道,用Command注解的方法对外提供了http协议调用,RepeaterModule中定义了一些几个接口(也就是对外提供几个接口的http调用)
- repeat:回放调用
- reload:重新加载repeater
- repeatWithJson:回放调用,请求参数json格式
- pushConfig: 配置动态生效
看repeat的实现:
public void repeat(final Map<String, String> req, final PrintWriter writer) {try {String data = req.get(Constants.DATA_TRANSPORT_IDENTIFY);if (StringUtils.isEmpty(data)) {writer.write("invalid request, cause parameter {" + Constants.DATA_TRANSPORT_IDENTIFY + "} is required");return;}RepeatEvent event = new RepeatEvent();Map<String, String> requestParams = new HashMap<String, String>(16);for (Map.Entry<String, String> entry : req.entrySet()) {requestParams.put(entry.getKey(), entry.getValue());}event.setRequestParams(requestParams);EventBusInner.post(event);writer.write("submit success");} catch (Throwable e) {writer.write(e.getMessage());}}
根据请求中的参数,构造event对象,并且通过事件中心进行发布,这里借助了guava的事件中心能力
看下事件订阅者RepeatSubscribeSupporter的处理onSubscribe方法
DefaultFlowDispatcher.instance().dispatch(meta, pr.getData());
在这里将流量进行分发,分发的逻辑在DefaultFlowDispatcher::dispatch中
Repeater repeater = RepeaterBridge.instance().select(recordModel.getEntranceInvocation().getType());if (repeater == null) {throw new RepeatException("no valid repeat found for invoke type:" + recordModel.getEntranceInvocation().getType());}RepeatContext context = new RepeatContext(meta, recordModel, TraceGenerator.generate());// 放置到回放缓存中RepeatCache.putRepeatContext(context);repeater.repeat(context);
根据入口调用类型选择对应的回放器,进行回放,回放结果跟录制类似,可以保存在本地和发送控制台。
总结
到这里我们看了repeater的代码结构、录制原理、回放原理。但是repeater中很多底层的技术其实还是没有讲清楚,后续空闲的时间会进行其中字节码技术、事件机制等等做分析。
流量回放repeater的原理分析二:repeater源码分析相关推荐
- 字节跳动Android三面视频解析:framework+MVP架构+HashMap原理+性能优化+Flutter+源码分析等
前言 对于字节跳动的二面三面而言,Framework+MVP架构+HashMap原理+性能优化+Flutter+源码分析等问题都成高频问点!然而很多的朋友在面试时却答不上或者答不全!今天在这分享下这些 ...
- AFL二三事——源码分析
AFL二三事--源码分析 前言 AFL,全称"American Fuzzy Lop",是由安全研究员Michal Zalewski开发的一款基于覆盖引导(Coverage-guid ...
- Dubbo系列(二)源码分析之SPI机制
Dubbo系列(二)源码分析之SPI机制 在阅读Dubbo源码时,常常看到 ExtensionLoader.getExtensionLoader(*.class).getAdaptiveExtensi ...
- Spring源码分析之AOP源码分析
文章目录 前言 一.AOP回顾 二.源码分析 EnableAspectJAutoProxy注解 AnnotationAwareAspectJAutoProxyCreator 前言 Spring框架的两 ...
- 【Android 性能优化】应用启动优化 ( 阶段总结 | Trace 文件分析及解决方案 | 源码分析梳理 | 设置主题的方案总结 ) ★
文章目录 一. 常用的耗时方法优化方案 ( 重要 ) 二. 源码分析梳理 1. 应用启动时间计算相关源码分析 2. Launcher 应用中启动 Android 应用流程 三. 启动白屏解决方案 An ...
- spring源码分析第一天------源码分析知识储备
spring源码分析第一天------源码分析知识储备 Spring源码分析怎么学? 1.环境准备: 2.思路 看:是什么? 能干啥 想:为什么? 实践:怎么做? ...
- k8s client-go源码分析 informer源码分析(3)-Reflector源码分析
k8s client-go源码分析 informer源码分析(3)-Reflector源码分析 1.Reflector概述 Reflector从kube-apiserver中list&watc ...
- JDK动态代理实现原理详解(源码分析)
无论是静态代理,还是Cglib动态代理,都比较容易理解,本文就通过进入源码的方式来看看JDK动态代理的实现原理进行分析 要了解动态代理的可以参考另一篇文章,有详细介绍,这里仅仅对JDK动态代理做源码分 ...
- SNMP功能开发简介 二 net-snmp源码分析报文处理流程图
最近在开发snmp功能,核心实现是基于net-snmp,将net-snmp的代理基本功能移植到自己的程序中去,因为需要修改一些定制化的内容,所以需要对net-snmp的流程有所了解,网上这方面的资料比 ...
- dnsspoof工作原理、编译、源码分析
dnsspoof 是一个DNS欺骗工具,只要给出将要重定向的域名和域名重定向到的IP,就可以实现DNS欺骗. 下载地址:http://monkey.org/~dugsong/dsniff/ dnssp ...
最新文章
- WebStorm中不小心勾选了不再显示更新项目的提示弹窗,如何重新显示版本控制(VCS)的更新项目Update Project(同步项目)提示弹窗?
- python 编程该看那些书籍_我用python5年后,我发现学python编程必看的三本书!
- linux i2c adapter 增加设备_LINUX设备驱动模型分析之四 设备模块相关(DEVICE)接口分析...
- Reporting Services 的伸缩性和性能表现规划(转载)
- 怎么增加服务器容量,新睿云服务器硬盘容量怎么增加?
- matlab 柱状图_MATLAB作图实例:24:条形图
- python 正则表达式 分组_正则表达式之分组的用法
- 在CentOS 8上安装与配置Apache虚拟主机
- hdu1247 Hat’s Words
- Flask组件之Flask-SQLAlchemy
- 稳压二极管真的可以稳压吗?
- 自写程序的打包成软件
- windows环境下curl安装
- php注册后面有对号错号,jQuery Ajax显示对号和错号用于验证输入验证码是否正确...
- 网易评论盖楼的数据结构
- Firefox检测到潜在的安全威胁,并因blog.csdn.net要求安全连接而没有继续
- 希尔伯特空间巴拿赫空间空间上的翻译
- c语言拍皮球100,童话故事——拍皮球
- 医学人工智能读书会与黄智生教授简历(公号回复“医学AI读书会”下载PDF资料,欢迎转发、赞赏、支持科普)
- 我的markdown