Soul 网关源码阅读(六)Sofa请求处理概览
Soul 网关源码阅读(六)Sofa请求处理概览
简介
今天来探索一下Sofa请求处理流程,看看和前面的HTTP、Dubbo有什么异同
Sofa示例运行
PS:如果请求加上参数运行不成功,请更新最新版本,此问题在新版本中已经修复:https://blog.csdn.net/baidu_27627251/article/details/112726694
Add sofa param resolve service
首先运行下官方的Sofa示例,首先启动下mysql和zookeeper,这里使用docker启动:
docker run -dit --name zk -p 2181:2181 zookeepe
docker run --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=123456 -d mysql:latest
然后运行Soul-admin,Soul-Bootst,在管理图界面起用sofa插件
运行官方示例:soul-examples --> soul-examples-sofa
这里有个坑,需要注意,启动后,bootstrap打印日志中没有sofa插件,请求一直失败
o.d.s.w.configuration.SoulConfiguration : load plugin:[global] [org.dromara.soul.plugin.global.GlobalPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[sign] [org.dromara.soul.plugin.sign.SignPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[waf] [org.dromara.soul.plugin.waf.WafPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[rate_limiter] [org.dromara.soul.plugin.ratelimiter.RateLimiterPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[hystrix] [org.dromara.soul.plugin.hystrix.HystrixPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[resilience4j] [org.dromara.soul.plugin.resilience4j.Resilience4JPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[divide] [org.dromara.soul.plugin.divide.DividePlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[webClient] [org.dromara.soul.plugin.httpclient.WebClientPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[divide] [org.dromara.soul.plugin.divide.websocket.WebSocketPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[alibaba-dubbo-body-param] [org.dromara.soul.plugin.alibaba.dubbo.param.BodyParamPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[dubbo] [org.dromara.soul.plugin.alibaba.dubbo.AlibabaDubboPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[monitor] [org.dromara.soul.plugin.monitor.MonitorPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[response] [org.dromara.soul.plugin.httpclient.response.WebClientResponsePlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[response] [org.dromara.soul.plugin.alibaba.dubbo.response.DubboResponsePlugin]
```xml    查看初始的plugins也是没有sofa```javapublic SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);final List<SoulPlugin> soulPlugins = pluginList.stream().sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));return new SoulWebHandler(soulPlugins);}
经过探索和老哥的讨论,发现是每天起用sofa的相关依赖
我们在Bootstrap的pom.xml文件中添加下面的依赖,然后重启
<!-- sofa plugin start--><dependency><groupId>com.alipay.sofa</groupId><artifactId>sofa-rpc-all</artifactId><version>5.7.6</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-client</artifactId><version>4.0.1</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-framework</artifactId><version>4.0.1</version></dependency><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>4.0.1</version></dependency><dependency><groupId>org.dromara</groupId><artifactId>soul-spring-boot-starter-plugin-sofa</artifactId><version>${project.version}</version></dependency><!-- sofa plugin end-->
然后查看日志打印,出现了sofa相关的插件
o.d.s.w.configuration.SoulConfiguration : load plugin:[global] [org.dromara.soul.plugin.global.GlobalPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[sign] [org.dromara.soul.plugin.sign.SignPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[waf] [org.dromara.soul.plugin.waf.WafPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[rate_limiter] [org.dromara.soul.plugin.ratelimiter.RateLimiterPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[hystrix] [org.dromara.soul.plugin.hystrix.HystrixPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[resilience4j] [org.dromara.soul.plugin.resilience4j.Resilience4JPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[divide] [org.dromara.soul.plugin.divide.DividePlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[webClient] [org.dromara.soul.plugin.httpclient.WebClientPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[divide] [org.dromara.soul.plugin.divide.websocket.WebSocketPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[sofa-body-param] [org.dromara.soul.plugin.sofa.param.BodyParamPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[dubbo] [org.dromara.soul.plugin.alibaba.dubbo.AlibabaDubboPlugin]
// 新出现的sofa相关的
o.d.s.w.configuration.SoulConfiguration : load plugin:[sofa] [org.dromara.soul.plugin.sofa.SofaPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[monitor] [org.dromara.soul.plugin.monitor.MonitorPlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[response] [org.dromara.soul.plugin.alibaba.dubbo.response.DubboResponsePlugin]
o.d.s.w.configuration.SoulConfiguration : load plugin:[response] [org.dromara.soul.plugin.httpclient.response.WebClientResponsePlugin]
// 新出现的sofa相关的
o.d.s.w.configuration.SoulConfiguration : load plugin:[response] [org.dromara.soul.plugin.sofa.response.SofaResponsePlugin]
日志中还打印了成功加载sofa相关的metadata
o.d.s.p.s.cache.ApplicationConfigCache : init sofa reference success there meteData is :MetaData
o.d.s.p.s.cache.ApplicationConfigCache : init sofa reference success there meteData is :MetaData
o.d.s.p.s.cache.ApplicationConfigCache : init sofa reference success there meteData is :MetaData
访问链接: http://localhost:9195/sofa/findAll ,成功返回如下请求
{"code": 200,"message": "Access to success!","data": {"name": "hello world Soul Sofa , findAll","id": "998932133"}
}
源码解析
PS:Debug时间过程,会导致超时,这是正常的
首先找到我们非常熟悉的切入点函数: SoulWebHandler ,在下面的方法中打上断点,然后逐步进入每个plugin观察其行为
public Mono<Void> execute(final ServerWebExchange exchange) {return Mono.defer(() -> {if (this.index < plugins.size()) {SoulPlugin plugin = plugins.get(this.index++);Boolean skip = plugin.skip(exchange);if (skip) {return this.execute(exchange);}return plugin.execute(exchange, this);}return Mono.empty();});}
GlobalPlugin
进入其中,执行处理逻辑,通过上篇的分析,我们知道大致作用是将请求类型放入exchange中
SignPlugin/WafPlugin/RateLimiterPlugin/HystrixPlugin/Resilience4JPlugin
进入其中,但plugin没有起用,不执行逻辑
DividePlugin/WebClientPlugin/WebSocketPlugin
通过类型判断,跳过,不执行
BodyParamPlugin
这个plugin在dubbo的时候也是要执行,我们来看看它干了写啥事。从下面逻辑中大概能看出先判断是否符合执行条件,然后将请求地址替换成真实的后端地址
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {final ServerHttpRequest request = exchange.getRequest();final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);// 判断类型是不是sofaif (Objects.nonNull(soulContext) && RpcTypeEnum.SOFA.getName().equals(soulContext.getRpcType())) {MediaType mediaType = request.getHeaders().getContentType();ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);if (MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {return body(exchange, serverRequest, chain);}if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)) {return formData(exchange, serverRequest, chain);}// 进行路径替换,换成后端服务器的return query(exchange, serverRequest, chain);}return chain.execute(exchange);}private Mono<Void> query(final ServerWebExchange exchange, final ServerRequest serverRequest, final SoulPluginChain chain) {exchange.getAttributes().put(Constants.SOFA_PARAMS,HttpParamConverter.ofString(() -> serverRequest.uri().getQuery()));return chain.execute(exchange);}
这里有个非常有趣的现象,我们第四篇分析中,dubbo也走了一模一样的类,在上面函数逻辑中,我们看出它并不能兼容dubbo,那dubbo是如何走这个类的呢?
通过调试我们发现,当同时启动dubbo和sofa的时候,会生成两个BodyParamPlugin,名称是一模一样的,但里面的判断类型换了,很神奇,猜测这个类是动态生成之类的手段,这里先不探索了,可以后面研究研究
AlibabaDubboPlugin
通过类型判断,跳过
SofaPlugin
这个从名字就看出来是核心类,我们看看它具体干了啥。通过下面注释的地方,可以看出和dubbo请求的非常的相像。进行路由匹配,成功后rpc调用,获得结果后放入exchange中
# AbstractSoulPluginpublic Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {String pluginName = named();final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);if (pluginData != null && pluginData.getEnabled()) {final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);if (CollectionUtils.isEmpty(selectors)) {return handleSelectorIsNull(pluginName, exchange, chain);}final SelectorData selectorData = matchSelector(exchange, selectors);if (Objects.isNull(selectorData)) {return handleSelectorIsNull(pluginName, exchange, chain);}selectorLog(selectorData, pluginName);final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());if (CollectionUtils.isEmpty(rules)) {return handleRuleIsNull(pluginName, exchange, chain);}// 判断是否有路由规则能匹配上RuleData rule;if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {//get lastrule = rules.get(rules.size() - 1);} else {rule = matchRule(exchange, rules);}if (Objects.isNull(rule)) {return handleRuleIsNull(pluginName, exchange, chain);}ruleLog(rule, pluginName);// 匹配上后执行处理逻辑return doExecute(exchange, chain, selectorData, rule);}return chain.execute(exchange);}# SofaPluginprotected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {String body = exchange.getAttribute(Constants.SOFA_PARAMS);SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);assert soulContext != null;MetaData metaData = exchange.getAttribute(Constants.META_DATA);if (!checkMetaData(metaData)) {assert metaData != null;log.error(" path is :{}, meta data have error.... {}", soulContext.getPath(), metaData.toString());exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);Object error = SoulResultWrap.error(SoulResultEnum.META_DATA_ERROR.getCode(), SoulResultEnum.META_DATA_ERROR.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}if (StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(body)) {exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);Object error = SoulResultWrap.error(SoulResultEnum.SOFA_HAVE_BODY_PARAM.getCode(), SoulResultEnum.SOFA_HAVE_BODY_PARAM.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}// 这里得到结果,跟下去final Mono<Object> result = sofaProxyService.genericInvoker(body, metaData, exchange);return result.then(chain.execute(exchange));}# SofaProxyServicepublic Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {// 根据请求路径,获得rpc中的consumer ConsumerConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterfaceId())) {ApplicationConfigCache.getInstance().invalidate(metaData.getServiceName());reference = ApplicationConfigCache.getInstance().initRef(metaData);}GenericService genericService = reference.refer();Pair<String[], Object[]> pair;if (null == body || "".equals(body) || "{}".equals(body) || "null".equals(body)) {pair = new ImmutablePair<>(new String[]{}, new Object[]{});} else {pair = sofaParamResolveService.buildParameter(body, metaData.getParameterTypes());}CompletableFuture<Object> future = new CompletableFuture<>();RpcInvokeContext.getContext().setResponseCallback(new SofaResponseCallback<Object>() {@Overridepublic void onAppResponse(final Object o, final String s, final RequestBase requestBase) {future.complete(o);}@Overridepublic void onAppException(final Throwable throwable, final String s, final RequestBase requestBase) {future.completeExceptionally(throwable);}@Overridepublic void onSofaException(final SofaRpcException e, final String s, final RequestBase requestBase) {future.completeExceptionally(e);}});// 通过函数名,能猜到是rpc调用,然后得到结果,并将结果放入exchange中genericService.$invoke(metaData.getMethodName(), pair.getLeft(), pair.getRight());return Mono.fromFuture(future.thenApply(ret -> {if (Objects.isNull(ret)) {ret = Constants.SOFA_RPC_RESULT_EMPTY;}exchange.getAttributes().put(Constants.SOFA_RPC_RESULT, ret);exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());return ret;})).onErrorMap(SoulException::new);}
MonitorPlugin
不跳过,但插件没有开启
DubboResponsePlugin/WebClientResponsePlugin
通过类型判断,跳过执行
SofaResponsePlugin
通过上几篇分析和名字能猜出来是将响应返回给客户端的,通过下面代码的逻辑也可以看出
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {return chain.execute(exchange).then(Mono.defer(() -> {// 从exchange中获取结果final Object result = exchange.getAttribute(Constants.SOFA_RPC_RESULT);if (Objects.isNull(result)) {Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}Object success = SoulResultWrap.success(SoulResultEnum.SUCCESS.getCode(), SoulResultEnum.SUCCESS.getMsg(), JsonUtils.removeClass(result));// 熟悉的返回响应的函数return WebFluxResultUtils.result(exchange, success);}));}
总结
上面的plugin流程大致如下:
- GlobalPlugin : 将请求类型置入
- SignPlugin : 跳过不执行逻辑
- WafPlugin : 跳过不执行逻辑
- RateLimiterPlugin : 跳过不执行逻辑
- HystrixPlugin : 跳过不执行逻辑
- Resilience4JPlugin : 跳过不执行逻辑
- DividePlugin : 跳过不执行逻辑
- WebClientPlugin : 跳过不执行逻辑
- WebSocketPlugin : 跳过不执行逻辑
- BodyParamPlugin : 执行RPC的请求路径替换,替换成真实的服务器后端路径,作用类似于dividePlugin;不同rpc有相关的这个插件名,也就是会有多个BodyParamPlugin
- AlibabaDubboPlugin : 跳过不执行逻辑
- SofaPlugin : 发送请求到后台服务器,拿到结果,写入exchange
- MonitorPlugin : 跳过不执行逻辑
- DubboResponsePlugin : 跳过不执行逻辑
- WebClientResponsePlugin : 跳过不执行逻辑
- SofaResponsePlugin : 从exchange中拿到响应,发送给客户端
经过这几篇的分析,我们进一步优化我们对Soul网关的请求流程,大致如下:
更新了我们对处理流程中一些类的认知:
- 通过上篇分析,得到GlobalPlugin的具体作用,是置入请求类型
- BodyParamPlugin 作用类似于 dividePlugin,能进行路由匹配,匹配后将路径修改真实的后端服务器路径;并且能动态的生成同名的但针对不同rpc实现的plugin
Soul网关源码分析文章列表
Github
- Soul 源码阅读(一) 概览
- Soul 源码阅读(二)代码初步运行
- Soul 源码阅读(三)HTTP请求处理概览
- Soul 网关源码阅读(四)Dubbo请求概览
- Soul网关源码阅读(五)请求类型探索
- Soul 网关源码阅读(六)Sofa请求处理概览
掘金
- Soul 网关源码阅读(一) 概览 #掘金文章# https://juejin.cn/post/6917864624423436296
- Soul 网关源码阅读(二)代码初步运行 #掘金文章# https://juejin.cn/post/6917865804121767944
- Soul 网关源码阅读(三)请求处理概览 #掘金文章# https://juejin.cn/post/6917866538712334343
- Soul 网关源码阅读(四)Dubbo请求概览 #掘金文章# https://juejin.cn/post/6917867369909977102
- Soul网关源码阅读(五)请求类型探索 #掘金文章# https://juejin.cn/post/6918575905962983438
Soul 网关源码阅读(六)Sofa请求处理概览相关推荐
- Soul 网关源码阅读(一) 概览
Soul 源码阅读(一) 概览 简介 阅读soul的官方文档,大致了解soul的功能和相关概念 心得 需要对网关的功能有个大致的了解,把soul官方文档读两遍(第一遍通读,能看懂多少是 ...
- Soul网关源码阅读(十)自定义简单插件编写
Soul网关源码阅读(十)自定义简单插件编写 简介 综合前面所分析的插件处理流程相关知识,此次我们来编写自定义的插件:统计请求在插件链中的经历时长 编写准备 首先我们先探究一下,一个P ...
- Soul网关源码阅读(九)插件配置加载初探
Soul网关源码阅读(九)插件配置加载初探 简介 今日来探索一下插件的初始化,及相关的配置的加载 源码Debug 插件初始化 首先来到我们非常熟悉的插件链调用的类: SoulWebHa ...
- Soul网关源码阅读(八)路由匹配初探
Soul网关源码阅读(八)路由匹配初探 简介 今日看看路由的匹配相关代码,查看HTTP的DividePlugin匹配 示例运行 使用HTTP的示例,运行Soul-Admin,Sou ...
- Soul网关源码阅读(七)限流插件初探
Soul网关源码阅读(七)限流插件初探 简介 前面的文章中对处理流程探索的差不多了,今天来探索下限流插件:resilience4j 示例运行 环境配置 启动下MySQL和redis d ...
- Soul网关源码阅读番外篇(一) HTTP参数请求错误
Soul网关源码阅读番外篇(一) HTTP参数请求错误 共同作者:石立 萧 * 简介 在Soul网关2.2.1版本源码阅读中,遇到了HTTP请求加上参数返回404的错误,此篇文章基于此进行探索 ...
- Soul网关源码阅读(六)请求类型探索
Soul网关源码阅读(六)请求类型探索 简介 在上几篇文章中分析了请求的处理流程,HTTP和RPC请求处理是互斥的,通过请求类型来判断,这篇文章来探索下请求类型的前世今生 源码分析 通 ...
- Soul 网关源码阅读(四)Dubbo请求概览
Soul 网关源码阅读(四)Dubbo请求概览 简介 本次启动一个dubbo服务示例,初步探索Soul网关源码的Dubbo请求处理流程 示例运行 环境配置 在Soul源码clone下来 ...
- Soul 网关源码阅读(二)代码初步运行
Soul 源码阅读(二)代码初步运行 简介 基于上篇:Soul 源码阅读(一) 概览,这部分跑一下Soul网关的示例 过程记录 现在我们可以根据地图,稍微探索一下周边,摸一摸 ...
最新文章
- Python 之 Matplotlib (一)基本用法
- 多进程/多线程同时向一个文件中写入日志如何避免冲突?
- linux centos 编译luabind-0.9.1 动态库 静态库
- java编程思想 学习笔记(2)
- mysql有dataguard吗_DataGuard部署
- .NET Core 给使用.NET的公司所带来的机遇
- 基于CNN的增量学习论文的读后感
- JAVA进阶开发之(数组练习题)
- 电信设置的nat 虚拟服务器192.168.1.3 是什么,VMware WorkStation的三种网络连接方式详解...
- 给窗口添加背景图的案例
- 在现有的Visual Studio 2017中设置和测试Python环境
- (转载)Linux启动过程详解
- JavaScript实现监听移动端上下左右滑动事件
- 执行计划:SET AUTOTRACE TRACEONLY
- a星算法的优缺点_轻松理解机器学习算法:C4.5算法
- 好东西再安利一遍!!
- asp.net门诊收费管理系统案例
- 目标检测之fasterRCNN:关于学习使用fasterRCNN做目标检测
- matlab费曼编码输入,多点格林函数数值积分(费曼参数积分)的程序分析及应用
- 【韩顺平utility工具类】