一、示例

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>com.simplemall.micro.serv.base</groupId><artifactId>base-serv</artifactId><version>0.0.1-SNAPSHOT</version></parent><groupId>com.simplemall.micro.serv.base.zuul</groupId><artifactId>zuul-server</artifactId><name>zuulServer</name><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>io.projectreactor.netty</groupId><artifactId>reactor-netty</artifactId><version>0.9.14.RELEASE</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.7</version></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId><version>2.2.6.RELEASE</version></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId><version>2.2.6.RELEASE</version></dependency><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-gateway</artifactId><exclusions><exclusion><artifactId>reactor-netty</artifactId><groupId>io.projectreactor.netty</groupId></exclusion></exclusions><dependency><groupId>cn.dev33</groupId><artifactId>sa-token-reactor-spring-boot-starter</artifactId><version>1.28.0</version></dependency><!-- Sa-Token 整合 Redis (使用jackson序列化方式) --><dependency><groupId>cn.dev33</groupId><artifactId>sa-token-dao-redis-jackson</artifactId><version>1.28.0</version></dependency><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build>
</project>

application.yml

server:port: 9005#eureka:
#  client:
#    service-url:
#      defaultZone: http://127.0.0.1:9003/eureka/sa-token:token-name: authorizationis-concurrent: trueis-share: truetimeout: 3600is-log: true#spring.redis.database=0
#spring.redis.host=127.0.0.1
#spring.redis.port=6379
#spring.redis.timeout=10sspring:application:name: gateway-server
#  redis:
#    database: 0
#    host: 127.0.0.1
#    port: 6379
#    timeout: 10s
#    lettuce:
#      pool:
#        max-active: 10
#        max-wait: 200
#        min-idle: 0
#        max-idle: 8#  sleuth:
#    sampler:
#      probability: 1.0zipkin:base-url: http://127.0.0.1:9411cloud:nacos:config:server-addr: 127.0.0.1:8848discovery:server-addr: 127.0.0.1:8848gateway:enabled: truehttpclient:connect-timeout: 1000response-timeout: 3sdiscovery:locator:enabled: trueroutes:- id: front-apiuri: lb://front-app1predicates:- Path=/app/**filters:- StripPrefix=1- AddResponseHeader=X-Response-Default-Foo, Default-Bar- id: nacosdayuri: lb://nacosdaypredicates:- Path=/nacosday/**filters:- StripPrefix=1- AddResponseHeader=X-Response-Default-Foo, Default-Bar

saTokenFilter

package com.simplemall.micro.serv.zuul.filter;import cn.dev33.satoken.exception.BackResultException;
import cn.dev33.satoken.exception.SaTokenException;
import cn.dev33.satoken.exception.StopMatchException;
import cn.dev33.satoken.filter.SaFilterAuthStrategy;
import cn.dev33.satoken.filter.SaFilterErrorStrategy;
import cn.dev33.satoken.reactor.context.SaReactorHolder;
import cn.dev33.satoken.reactor.context.SaReactorSyncHolder;
import cn.dev33.satoken.router.SaRouter;
import cn.dev33.satoken.util.SaTokenConsts;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;//import javax.servlet.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;@Order(SaTokenConsts.ASSEMBLY_ORDER)
public class SaReactorFilter implements GlobalFilter {// ------------------------ 设置此过滤器 拦截 & 放行 的路由 /*** 拦截路由 */private List<String> includeList = new ArrayList<>();/*** 放行路由 */private List<String> excludeList = new ArrayList<>();/*** 添加 [拦截路由] * @param paths 路由* @return 对象自身*/public SaReactorFilter addInclude(String... paths) {includeList.addAll(Arrays.asList(paths));return this;}/*** 添加 [放行路由]* @param paths 路由* @return 对象自身*/public SaReactorFilter addExclude(String... paths) {excludeList.addAll(Arrays.asList(paths));return this;}/*** 写入 [拦截路由] 集合* @param pathList 路由集合 * @return 对象自身*/public SaReactorFilter setIncludeList(List<String> pathList) {includeList = pathList;return this;}/*** 写入 [放行路由] 集合* @param pathList 路由集合 * @return 对象自身*/public SaReactorFilter setExcludeList(List<String> pathList) {excludeList = pathList;return this;}/*** 获取 [拦截路由] 集合* @return see note */public List<String> getIncludeList() {return includeList;}/*** 获取 [放行路由] 集合* @return see note */public List<String> getExcludeList() {return excludeList;}// ------------------------ 钩子函数/*** 认证函数:每次请求执行 */public SaFilterAuthStrategy auth = r -> {};/*** 异常处理函数:每次[认证函数]发生异常时执行此函数*/public SaFilterErrorStrategy error = e -> {throw new SaTokenException(e);};/*** 前置函数:在每次[认证函数]之前执行 */public SaFilterAuthStrategy beforeAuth = r -> {};/*** 写入[认证函数]: 每次请求执行 * @param auth see note * @return 对象自身*/public SaReactorFilter setAuth(SaFilterAuthStrategy auth) {this.auth = auth;return this;}/*** 写入[异常处理函数]:每次[认证函数]发生异常时执行此函数 * @param error see note * @return 对象自身*/public SaReactorFilter setError(SaFilterErrorStrategy error) {this.error = error;return this;}/*** 写入[前置函数]:在每次[认证函数]之前执行* @param beforeAuth see note * @return 对象自身*/public SaReactorFilter setBeforeAuth(SaFilterAuthStrategy beforeAuth) {this.beforeAuth = beforeAuth;return this;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//        try {
//            // 执行全局过滤器
//            SaRouter.match(includeList).notMatch(excludeList).check(r -> {
//                beforeAuth.run(null);
//                auth.run(null);
//            });
//
//        } catch (StopMatchException e) {
//
//        } catch (Throwable e) {
//            e.printStackTrace();
//            // 1. 获取异常处理策略结果
//            String result = (e instanceof BackResultException) ? e.getMessage() : String.valueOf(error.run(e));
//            return ErrorConvertor.errorResp(exchange.getResponse(), 500,result);
//        }
//
//        return chain.filter(exchange);// 写入WebFilterChain对象exchange.getAttributes().put(SaReactorHolder.CHAIN_KEY, chain);// ---------- 全局认证处理try {// 写入全局上下文 (同步)SaReactorSyncHolder.setContext(exchange);// 执行全局过滤器SaRouter.match(includeList).notMatch(excludeList).check(r -> {beforeAuth.run(null);auth.run(null);});} catch (StopMatchException e) {} catch (Throwable e) {e.printStackTrace();// 1. 获取异常处理策略结果String result = (e instanceof BackResultException) ? e.getMessage() : String.valueOf(error.run(e));// 2. 写入输出流if(exchange.getResponse().getHeaders().getFirst("Content-Type") == null) {exchange.getResponse().getHeaders().set("Content-Type", "text/plain; charset=utf-8");}return exchange.getResponse().writeWith(Mono.just(exchange.getResponse().bufferFactory().wrap(result.getBytes())));} finally {// 清除上下文SaReactorSyncHolder.clearContext();}// ---------- 执行// 写入全局上下文 (同步)SaReactorSyncHolder.setContext(exchange);// 执行return chain.filter(exchange).subscriberContext(ctx -> {// 写入全局上下文 (异步)ctx = ctx.put(SaReactorHolder.CONTEXT_KEY, exchange);return ctx;}).doFinally(r -> {// 清除上下文SaReactorSyncHolder.clearContext();});}}
SaTokenFilterConfig
package com.simplemall.micro.serv.zuul.config;import cn.dev33.satoken.context.SaHolder;
import cn.dev33.satoken.router.SaRouter;
import cn.dev33.satoken.stp.StpUtil;
import com.simplemall.micro.serv.zuul.filter.SaReactorFilter;
import com.simplemall.micro.serv.zuul.utils.AjaxJson;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class SaTokenFilterConfig {@Beanpublic SaReactorFilter createServletFilter(){return new SaReactorFilter().addInclude("/**").addExclude("/favicon.ico").setAuth(r -> {System.out.println("---------- 进入Sa-Token全局认证 -----------");SaRouter.match("/**","/tb-acc/get", r2 -> StpUtil.checkLogin());SaRouter.match("/tb-acc/getLoginInfo",r2 -> StpUtil.checkPermission("user-add"));}).setError(e -> {System.out.println("---------- 进入Sa-Token异常处理 -----------");return AjaxJson.getError(e.getMessage());}).setBeforeAuth(r -> {SaHolder.getResponse()// 服务器名称.setServer("sa-server")// 是否可以在iframe显示视图: DENY=不可以 | SAMEORIGIN=同域下可以 | ALLOW-FROM uri=指定域名下可以.setHeader("X-Frame-Options", "SAMEORIGIN")// 是否启用浏览器默认XSS防护: 0=禁用 | 1=启用 | 1; mode=block 启用, 并在检查到XSS攻击时,停止渲染页面.setHeader("X-XSS-Protection", "1; mode=block")// 禁用浏览器内容嗅探.setHeader("X-Content-Type-Options", "nosniff");});}
}

二、测试效果

1.在网关下启动nacosday这个微服务。

2.通过网关访问nacosday微服务。

http://localhost:9005/nacosday/tb-acc/get?id=4c5a2c38-2fc0-4977-a577-1625066ad8d3&device=pcm 网关日志

nacosday日志

3. 看到已经通过网关访问到服务层。

三、访问流程分析

1.总体流程

二、我们调试接口,跟踪代码。

1.看到总体流程为  netty.transport->reactor.netty->spring.web

HttpServerHandle
public void onStateChange(Connection connection, State newState) {if (newState == HttpServerState.REQUEST_RECEIVED) {try {if (log.isDebugEnabled()) {log.debug(format(connection.channel(), "Handler is being applied: {}"), handler);}HttpServerOperations ops = (HttpServerOperations) connection;Mono.fromDirect(handler.apply(ops, ops)).subscribe(ops.disposeSubscriber());}catch (Throwable t) {log.error(format(connection.channel(), ""), t);//"FutureReturnValueIgnored" this is deliberateconnection.channel().close();}}}

2.这里面重点为Mono.fromDirect(handler.apply(ops, ops)).subscribe(ops.disposeSubscriber());

这行代码包含了整个的请示路由,处理,是采用的webflux的响应式编程,由handler.apply(ops, ops)创建MONO的流程编排对象,然后subscribe来订阅此流程,才会真正触发流程的执行。

3.这里面进行响应式httpHandler适配器的应用,生成一个Mono流程对象。

ReactorHttpHandlerAdapter@Overridepublic Mono<Void> apply(HttpServerRequest reactorRequest, HttpServerResponse reactorResponse) {NettyDataBufferFactory bufferFactory = new NettyDataBufferFactory(reactorResponse.alloc());try {ReactorServerHttpRequest request = new ReactorServerHttpRequest(reactorRequest, bufferFactory);ServerHttpResponse response = new ReactorServerHttpResponse(reactorResponse, bufferFactory);if (request.getMethod() == HttpMethod.HEAD) {response = new HttpHeadResponseDecorator(response);}return this.httpHandler.handle(request, response).doOnError(ex -> logger.trace(request.getLogPrefix() + "Failed to complete: " + ex.getMessage())).doOnSuccess(aVoid -> logger.trace(request.getLogPrefix() + "Handling completed"));}catch (URISyntaxException ex) {if (logger.isDebugEnabled()) {logger.debug("Failed to get request URI: " + ex.getMessage());}reactorResponse.status(HttpResponseStatus.BAD_REQUEST);return Mono.empty();}}

4. 这里的httpHandler为WebServerManager.DelayedInitializationHttpHandler

5.接着这个代理delegate对象指向HttpWebHandlerAdapter

6.可以看到就是构造一个exchange对象,然后再继承又是代理处理。

这里面成功附加打印一个日志,失败时,进行出错显示统一处理,最后调用response设置完成。

 HttpWebHandlerAdapter
@Overridepublic Mono<Void> handle(ServerHttpRequest request, ServerHttpResponse response) {if (this.forwardedHeaderTransformer != null) {request = this.forwardedHeaderTransformer.apply(request);}ServerWebExchange exchange = createExchange(request, response);return getDelegate().handle(exchange).doOnSuccess(aVoid -> logResponse(exchange)).onErrorResume(ex -> handleUnresolvedError(exchange, ex)).then(Mono.defer(response::setComplete));}

7. 接下来的代理对象为ExceptionHandlingWebHandler,这个就是调用基类的处理函数,然后为这个Mono流程对象加上各种出错的异常处理器。

ExceptionHandlingWebHandler
@Overridepublic Mono<Void> handle(ServerWebExchange exchange) {Mono<Void> completion;try {completion = super.handle(exchange);}catch (Throwable ex) {completion = Mono.error(ex);}for (WebExceptionHandler handler : this.exceptionHandlers) {completion = completion.onErrorResume(ex -> handler.handle(exchange, ex));}return completion;}

8. WebHandlerDecorator的代理对象为FilteringWebHandler

9.filterWebHandler就是调用chain的责任链模式调用。

FilteringWebHandler@Overridepublic Mono<Void> handle(ServerWebExchange exchange) {return this.chain.filter(exchange);}

10.DefaultWebFilterChain.filter为生成一个懒加载的MONO对象。

DefaultWebFilterChain
@Overridepublic Mono<Void> filter(ServerWebExchange exchange) {return Mono.defer(() ->this.currentFilter != null && this.chain != null ?invokeFilter(this.currentFilter, this.chain, exchange) :this.handler.handle(exchange));}

11.接着又回到第7步,我们看到前面的第10步并没有执行,必须要等待流程订阅时才执行。

12.然后又回到第一步,真正开始订阅流程,执行流程。

13.这里的ops就是HttpServerOperations,这个类实现了mono的自定义订阅类。

Mono的订阅方法为public abstract void subscribe(CoreSubscriber<? super T> actual);

从上面可以看到ChannelOperations实现了CoreSubscriber接口,这里在 onSubscribe请求拉模式,拉取了 全量消息数据。

ChannelOperations@Overridepublic final void onNext(Void aVoid) {}@Overridepublic final void onSubscribe(Subscription s) {if (Operators.setOnce(OUTBOUND_CLOSE, this, s)) {s.request(Long.MAX_VALUE);}}

三、发送请求流程

1.现在我们开始真正触发流程。首先执行第二部分第10步,DefaultWebFilterChain.filter为生成一个懒加载的MONO对象。

2.首先经过WeightCalculatorWebFilter

WeightCalculatorWebFilter@Overridepublic Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {Map<String, String> weights = getWeights(exchange);for (String group : groupWeights.keySet()) {GroupWeightConfig config = groupWeights.get(group);if (config == null) {if (log.isDebugEnabled()) {log.debug("No GroupWeightConfig found for group: " + group);}continue; // nothing we can do, but this is odd}double r = this.random.nextDouble();List<Double> ranges = config.ranges;if (log.isTraceEnabled()) {log.trace("Weight for group: " + group + ", ranges: " + ranges + ", r: "+ r);}for (int i = 0; i < ranges.size() - 1; i++) {if (r >= ranges.get(i) && r < ranges.get(i + 1)) {String routeId = config.rangeIndexes.get(i);weights.put(group, routeId);break;}}}return chain.filter(exchange);}

3.执行完上面的权重filter之后,又回DefaultWebFilterChain.filter的循环调用。

因为filter执行完了,执行handler.handle方法。

4.上面的handler为DispatcherHandler,这个类的handle方法,

4.1  这里面首先要对系统中所有内置的mappingHandle进行映射匹配,找到所有合适的handler

4.2 .next方法为取第一个handler

4.3  如果为空,则返回一个Mono.error

4.4 循环调用handler,返回结果

4.5 循环处理结果。

DispatcherHandler@Overridepublic Mono<Void> handle(ServerWebExchange exchange) {if (this.handlerMappings == null) {return createNotFoundError();}return Flux.fromIterable(this.handlerMappings).concatMap(mapping -> mapping.getHandler(exchange)).next().switchIfEmpty(createNotFoundError()).flatMap(handler -> invokeHandler(exchange, handler)).flatMap(result -> handleResult(exchange, result));}

HandlerMapping接口主要为根据exchange找到一个可用的Mono的handler对象。
Mono<Object> getHandler(ServerWebExchange exchange);

5.RoutePredicateHandlerMapping负责路由查找,并更具路由断言判断路由是否可用

这里首先查找路由表,找到匹配的路由,然后将此路由设置到exchange的属性中,接着返回

FilteringWebHandler的MONO包装。
RoutePredicateHandlerMapping
@Overrideprotected Mono<?> getHandlerInternal(ServerWebExchange exchange) {// don't handle requests on management port if set and different than server portif (this.managementPortType == DIFFERENT && this.managementPort != null&& exchange.getRequest().getURI().getPort() == this.managementPort) {return Mono.empty();}exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());return lookupRoute(exchange)// .log("route-predicate-handler-mapping", Level.FINER) //name this.flatMap((Function<Route, Mono<?>>) r -> {exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);if (logger.isDebugEnabled()) {logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);}exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);return Mono.just(webHandler);}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);if (logger.isTraceEnabled()) {logger.trace("No RouteDefinition found for ["+ getExchangeDesc(exchange) + "]");}})));}

6.首先查找路由,我们来看下路由表信息

RoutePredicateHandlerMappingprotected Mono<Route> lookupRoute(ServerWebExchange exchange) {return this.routeLocator.getRoutes()// individually filter routes so that filterWhen error delaying is not a// problem.concatMap(route -> Mono.just(route).filterWhen(r -> {// add the current route we are testingexchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());return r.getPredicate().apply(exchange);})// instead of immediately stopping main flux due to error, log and// swallow it.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(),e)).onErrorResume(e -> Mono.empty()))// .defaultIfEmpty() put a static Route not found// or .switchIfEmpty()// .switchIfEmpty(Mono.<Route>empty().log("noroute")).next()// TODO: error handling.map(route -> {if (logger.isDebugEnabled()) {logger.debug("Route matched: " + route.getId());}validateRoute(route, exchange);return route;});/** TODO: trace logging if (logger.isTraceEnabled()) {* logger.trace("RouteDefinition did not match: " + routeDefinition.getId()); }*/}

然后通过filterWhen逐个路由进行匹配。匹配成功后返回了此路由

7.最终的handler返回的是一个 FilteringWebHandler

8.接着又回到第4.4步,开始执行handler。

看到SimpleHandlerAdapter能够支持FilteringWebHandler,接着调用其 webHandler

public class SimpleHandlerAdapter implements HandlerAdapter {@Overridepublic Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {WebHandler webHandler = (WebHandler) handler;Mono<Void> mono = webHandler.handle(exchange);return mono.then(Mono.empty());}

9.接着调用FilteringWebHandler.handler方法。 这里会构造13个filter,进行网关FILTER的责任链调用。

FilteringWebHandler
@Overridepublic Mono<Void> handle(ServerWebExchange exchange) {Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);List<GatewayFilter> gatewayFilters = route.getFilters();List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);combined.addAll(gatewayFilters);// TODO: needed or cached?AnnotationAwareOrderComparator.sort(combined);if (logger.isDebugEnabled()) {logger.debug("Sorted gatewayFilterFactories: " + combined);}return new DefaultGatewayFilterChain(combined).filter(exchange);}

10. 默认网关的FILTER责任链调用,就是调用当前索引的filter.filter->在filter里面又再次调用chain.filter(这时filter索引会递增)->接着会调用到下一个filter,当所有filter调用完时,则完成。

private static class DefaultGatewayFilterChain implements GatewayFilterChain {private final int index;private final List<GatewayFilter> filters;@Overridepublic Mono<Void> filter(ServerWebExchange exchange) {return Mono.defer(() -> {if (this.index < filters.size()) {GatewayFilter filter = filters.get(this.index);DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,this.index + 1);return filter.filter(exchange, chain);}else {return Mono.empty(); // complete}});}

11.这里我们重点关注几个拼接URL和发起HTTP请求的FILTER

RewritePathGatewayFilterFactory:去掉/nacosday这种服务前缀。
 @Overridepublic GatewayFilter apply(Config config) {String replacement = config.replacement.replace("$\\", "$");return new GatewayFilter() {@Overridepublic Mono<Void> filter(ServerWebExchange exchange,GatewayFilterChain chain) {ServerHttpRequest req = exchange.getRequest();addOriginalRequestUrl(exchange, req.getURI());String path = req.getURI().getRawPath();String newPath = path.replaceAll(config.regexp, replacement);ServerHttpRequest request = req.mutate().path(newPath).build();exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, request.getURI());return chain.filter(exchange.mutate().request(request).build());}@Overridepublic String toString() {return filterToStringCreator(RewritePathGatewayFilterFactory.this).append(config.getRegexp(), replacement).toString();}};}

通过正则去除之后,我们看下效果。

看到/nacosday的服务前缀已经从URL去掉。

 12.RouteToRequestUrlFilter将URL协议进行改成,转换生成gatewayRequestUrl

,改lb://nacosday协议。

RouteToRequestUrlFilter
@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);if (route == null) {return chain.filter(exchange);}log.trace("RouteToRequestUrlFilter start");URI uri = exchange.getRequest().getURI();boolean encoded = containsEncodedParts(uri);URI routeUri = route.getUri();URI mergedUrl = UriComponentsBuilder.fromUri(uri)// .uri(routeUri).scheme(routeUri.getScheme()).host(routeUri.getHost()).port(routeUri.getPort()).build(encoded).toUri();exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);return chain.filter(exchange);}

13.LoadBalancerClientFilter为根据lb://nacosday的协议,通过负载均衡策略和服务注册中心,找到合适的后端服务实例,进行URL替换。

public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);if (url == null|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {return chain.filter(exchange);}// preserve the original urladdOriginalRequestUrl(exchange, url);if (log.isTraceEnabled()) {log.trace("LoadBalancerClientFilter url before: " + url);}final ServiceInstance instance = choose(exchange);if (instance == null) {throw NotFoundException.create(properties.isUse404(),"Unable to find instance for " + url.getHost());}URI uri = exchange.getRequest().getURI();// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,// if the loadbalancer doesn't provide one.String overrideScheme = instance.isSecure() ? "https" : "http";if (schemePrefix != null) {overrideScheme = url.getScheme();}URI requestUrl = loadBalancer.reconstructURI(new DelegatingServiceInstance(instance, overrideScheme), uri);if (log.isTraceEnabled()) {log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);}exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);return chain.filter(exchange);}

这里引用了RibbonLoadBalancerClient.choose来真正通过服务注册中心获取实例。

 public ServiceInstance choose(String serviceId, Object hint) {Server server = getServer(getLoadBalancer(serviceId), hint);if (server == null) {return null;}return new RibbonServer(serviceId, server, isSecure(server, serviceId),serverIntrospector(serviceId).getMetadata(server));}

看最后改写URL过程,可以看到已 经改为真实的URL.

14.NettyRoutingFilter为真正封装httpClient发起HTTP请求的filter.

这里为设置http header.创建httpClient,发起http请求。

@Override@SuppressWarnings("Duplicates")public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme();if (isAlreadyRouted(exchange) || (!"http".equalsIgnoreCase(scheme)&& !"https".equalsIgnoreCase(scheme))) {return chain.filter(exchange);}setAlreadyRouted(exchange);ServerHttpRequest request = exchange.getRequest();final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());final String url = requestUrl.toASCIIString();HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();filtered.forEach(httpHeaders::set);Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {headers.add(httpHeaders);// Will either be set below, or later by Nettyheaders.remove(HttpHeaders.HOST);if (preserveHost) {String host = request.getHeaders().getFirst(HttpHeaders.HOST);headers.add(HttpHeaders.HOST, host);}}).request(method).uri(url).send((req, nettyOutbound) -> {if (log.isTraceEnabled()) {nettyOutbound.withConnection(connection -> log.trace("outbound route: "+ connection.channel().id().asShortText()+ ", inbound: " + exchange.getLogPrefix()));}return nettyOutbound.send(request.getBody().map(this::getByteBuf));}).responseConnection((res, connection) -> {exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);ServerHttpResponse response = exchange.getResponse();// put headers and status so filters can modify the responseHttpHeaders headers = new HttpHeaders();res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);if (StringUtils.hasLength(contentTypeValue)) {exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,contentTypeValue);}setResponseStatus(res, response);// make sure headers filters run after setting status so it is// available in responseHttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange, Type.RESPONSE);exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,filteredResponseHeaders.keySet());response.getHeaders().putAll(filteredResponseHeaders);return Mono.just(res);});Duration responseTimeout = getResponseTimeout(route);if (responseTimeout != null) {responseFlux = responseFlux.timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))).onErrorMap(TimeoutException.class,th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,th.getMessage(), th));}return responseFlux.then(chain.filter(exchange));}

真正的通过netty发起请求

四、接收后端服务请求流程

1.

HttpClientConnect.HttpIOHandlerObserver.onStateChange(Connection connection, State newState)
@Overridepublic void onStateChange(Connection connection, State newState) {if (newState == HttpClientState.RESPONSE_RECEIVED) {sink.success(connection);return;}if (newState == ConnectionObserver.State.CONFIGURED&& HttpClientOperations.class == connection.getClass()) {if (log.isDebugEnabled()) {log.debug(format(connection.channel(), "Handler is being applied: {}"), handler);}Mono.defer(() -> Mono.fromDirect(handler.requestWithBody((HttpClientOperations) connection))).subscribe(connection.disposeSubscriber());}}

2.然后会回调到NettyRoutingFilter.filter中的匿名内部 receiver方法,从 这里可以看出,已经收到http的回应了。

3.将后端服务的结果,回写给客户端。

注意,在前面一步将connection写入CLIENT_RESPONSE_CONN_ATTR

exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);

在这一步读出来

Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
NettyWriteResponseFilter@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {// NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added// until the NettyRoutingFilter is run// @formatter:offreturn chain.filter(exchange).doOnError(throwable -> cleanup(exchange)).then(Mono.defer(() -> {Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);if (connection == null) {return Mono.empty();}if (log.isTraceEnabled()) {log.trace("NettyWriteResponseFilter start inbound: "+ connection.channel().id().asShortText() + ", outbound: "+ exchange.getLogPrefix());}ServerHttpResponse response = exchange.getResponse();// TODO: needed?final Flux<DataBuffer> body = connection.inbound().receive().retain().map(byteBuf -> wrap(byteBuf, response));MediaType contentType = null;try {contentType = response.getHeaders().getContentType();}catch (Exception e) {if (log.isTraceEnabled()) {log.trace("invalid media type", e);}}return (isStreamingMediaType(contentType)? response.writeAndFlushWith(body.map(Flux::just)): response.writeWith(body));})).doOnCancel(() -> cleanup(exchange));// @formatter:on}

最后不断读取与后端的connection数据,然后回写给客户端。

五、注意事项

1.如果配置了spring.cloud.gateway.discovery.locator.enabled为TRUE,则系统会自动扫描NACOS中所有的注册服务,然后自动为每个服务生成路由,这样会导致自定义的同名路由配置不会生效,RoutePredicateHandlerMapping在寻找路由时会先找到NACOS中的服务路由然后直接返回。

我们可以看到ReactiveCompositeDiscoveryClient_nacosday(第一个)的路由匹配策略和我们自定义(第3个)的是一样的 ,导致在寻找路由时,找到第一个就返回了,我们配置的相同匹配规则路由策略不会生效。

2.如果配置了spring.cloud.gateway.discovery.locator.enabled为FALSE,则不会将NACOS中注册的服务生成路由,不会冲突。

我们可以看到找到的路由是我们自定义配置的路由了。

3.还有一个注意事项,系统内置的过滤器前缀为左边,右边为实现的FILTER工厂类。

从下面可以看出filter的配置名称就是类名取前缀,去掉GatewayFilterFactory,AddRequestHeaderGatewayFilterFactory->AddRequestHeader

springcloud gateway 请求执行流程分析相关推荐

  1. OkHttp3 HTTP请求执行流程分析

    OkHttp3的基本用法 使用OkHttp3发送Http请求并获得响应的过程大体为: 创建OkHttpClient对象.OkHttpClient为网络请求执行的一个中心,它会管理连接池,缓存,Sock ...

  2. SpringCloud Gateway——请求转发源码分析

    SpringCloud Gateway--请求转发源码分析 1. 分享目的 SpringCloud Gateway功能很多,其中使用了非阻塞的WebFlux框架让人印象深刻,想学习这种WebFlux的 ...

  3. springMVC从发送hello请求到响应的执行流程分析

    启动tomcat服务器后,借助springMVC框架,我们可以很方便高效控制客户端发出的各种请求.分析请求执行流程前,我们需要了解一下服务器启动时,都做了什么. tomcat服务器启动加载项 加载we ...

  4. CVE-2022-22947 SpringCloud GateWay SPEL RCE 漏洞分析

    漏洞概要 Spring Cloud Gateway 是Spring Cloud 生态中的API网关,包含限流.过滤等API治理功能. Spring官方在2022年3月1日发布新版本修复了Spring ...

  5. Java多线程- 线程池的基本使用和执行流程分析 - ThreadPoolExecutor

    线程池的实现原理 池化技术 一说到线程池自然就会想到池化技术. 其实所谓池化技术,就是把一些能够复用的东西放到池中,避免重复创建.销毁的开销,从而极大提高性能. 常见池化技术的例如: 线程池 内存池 ...

  6. Apache DolphinScheduler v2.0.1 Master 和 Worker 执行流程分析系列(三)

    点亮 ⭐️ Star · 照亮开源之路 https://github.com/apache/dolphinscheduler 这是一系列关于 DolphinScheduler v2.0.1的源码分析文 ...

  7. Java-Mybatis(二): Mybatis配置解析、resultMap结果集映射、日志、分页、注解开发、Mybatis执行流程分析

    Java-Mybatis-02 学习视频:B站 狂神说Java – https://www.bilibili.com/video/BV1NE411Q7Nx 学习资料:mybatis 参考文档 – ht ...

  8. DRF基本使用及执行流程分析 | APIView源码分析

    DRF基本使用及执行流程分析 介绍: # 使用的都是CBV的方式 ,继承的类为drf提供的类(提供的类很多) # 这里目前继承使用APIView类 # 因为APIView是所有类的基类,其他类可能拓展 ...

  9. 动态执行流程分析和性能瓶颈分析的利器——gperftools的Cpu Profiler

    在<动态执行流程分析和性能瓶颈分析的利器--valgrind的callgrind>中,我们领略了valgrind对流程和性能瓶颈分析的强大能力.本文将介绍拥有相似能力的gperftools ...

最新文章

  1. Reactive Extensions(Rx) 学习
  2. 这种奇奇怪怪的符号,只能用latex打出来,如果实在不行,>---|-->>也行
  3. keil obj 文件 结构_keil下的STM32程序开发部署(一)
  4. [error]Cannot create __weak reference in file using manual refer XCode7.3
  5. 【图像超分辨率】遥感数据的高斯金字塔尺度上推方法研究
  6. 数据库:MySQL常见的设计规范误区
  7. SignalR的使用
  8. 鼠标右键 移动选定的文件夹到指定位置_怎么把电脑桌面上的文件移动到更加安全的地方...
  9. LeetCode 2019 力扣杯全国秋季编程大赛
  10. Sublime Text 3无法安装Package Control插件的解决
  11. 【ES】ReceiveTimeoutTransportException request_id
  12. docker-compose搭建ghost博客系统
  13. [RMAN]表空间的恢复
  14. swift网络编程入门应用:天气预报
  15. matlab2016b安装
  16. 布置工作五步法,让工作布置跟高效
  17. latex 花体之英文字母
  18. 银联支付服务之公众号支付业务(二)
  19. GeoHash在空间道路密度计算中的应用-以mobike骑行轨迹为例
  20. MyIM服务端聊天记录(网络编程作业)

热门文章

  1. JSP入门教程(1)
  2. #5月23日湖北省赛总结 + 个人计划变更
  3. 分割数据集label转换为目标检测boundingbox
  4. boolean android.graphics.Bitmap.compress(android.graphics.Bitmap$CompressFormat, int, java.io.Output
  5. 2018互联网月饼哪家强?阿里有情怀、腾讯最实在、咪咕最暖萌、联想最简单粗暴......
  6. 科研笔记4:从图片中测量面积、提取数据
  7. Oracle-Oracle数据库安全管理
  8. Linux的基本指令(一):常用基础指令
  9. 顺丰快递代码表java_JAVA接入顺丰快递
  10. 机房环境监控的系统概述