文章目录

  • 绑定Filter
    • HandlerMapping
  • Filter
    • GatewayFilterChain
      • FilteringWebHandler
      • GlobalFilter实例化
    • GatewayFilter
    • GlobalFilter
      • RemoveCachedBodyFilter
      • AdaptCachedBodyGlobalFilter
      • NettyWriteResponseFilter
      • GatewayMetricsFilter
      • ForwardPathFilter
      • RouteToRequestUrlFilter
      • LoadBalancerClientFilter
      • ReactiveLoadBalancerClientFilter
      • WebsocketRoutingFilter
      • NettyRoutingFilter
      • ForwardRoutingFilter
      • WebClientHttpRoutingFilter
      • WebClientWriteResponseFilter
      • 总结
  • 通过配置生成Filter
    • Configurable
    • ShortcutConfigurable
    • ConfigurationService
    • RouteDefinitionRouteLocator
    • GatewayFilterFactory
      • AddRequestHeaderGatewayFilterFactory
    • RoutePredicateFactory
      • PathRoutePredicateFactory

绑定Filter

WebFlux通过HandlerMapping来根据请求获取handler。Spring Cloud Gateway 实现了类RoutePredicateHandlerMapping

public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {//请求处理private final FilteringWebHandler webHandler;//路由定位器。RouteLocator 是多种 RouteLocator 的合集。private final RouteLocator routeLocator;private final Integer managementPort;private final ManagementPortType managementPortType;//父类:private CorsConfigurationSource corsConfigurationSource;
}
@Overrideprotected Mono<?> getHandlerInternal(ServerWebExchange exchange) {// don't handle requests on management port if set and different than server portif (this.managementPortType == DIFFERENT && this.managementPort != null&& exchange.getRequest().getURI().getPort() == this.managementPort) {return Mono.empty();}//设置mapping。exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());return lookupRoute(exchange)// .log("route-predicate-handler-mapping", Level.FINER) //name this.flatMap((Function<Route, Mono<?>>) r -> {exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);if (logger.isDebugEnabled()) {logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);}//设置路由属性:当前路由exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);return Mono.just(webHandler);}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);if (logger.isTraceEnabled()) {logger.trace("No RouteDefinition found for ["+ getExchangeDesc(exchange) + "]");}})));}//查找路由protected Mono<Route> lookupRoute(ServerWebExchange exchange) {return this.routeLocator.getRoutes()    //获取所有定义路由// individually filter routes so that filterWhen error delaying is not a// problem.concatMap(route -> Mono.just(route).filterWhen(r -> {// add the current route we are testingexchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());//返回匹配的路由return r.getPredicate().apply(exchange);})// instead of immediately stopping main flux due to error, log and// swallow it.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(),e)).onErrorResume(e -> Mono.empty()))// .defaultIfEmpty() put a static Route not found// or .switchIfEmpty()// .switchIfEmpty(Mono.<Route>empty().log("noroute")).next()// TODO: error handling.map(route -> {if (logger.isDebugEnabled()) {logger.debug("Route matched: " + route.getId());}validateRoute(route, exchange);return route;});}

HandlerMapping

通过Request查找Handler,由DispatcherHandler 负责处理。

 public Mono<Void> handle(ServerWebExchange exchange) {if (this.handlerMappings == null) {return createNotFoundError();}return Flux.fromIterable(this.handlerMappings).concatMap(mapping -> mapping.getHandler(exchange)).next().switchIfEmpty(createNotFoundError()).flatMap(handler -> invokeHandler(exchange, handler)).flatMap(result -> handleResult(exchange, result));}

默认HandlerMapping

         org.springframework.boot.actuate.endpoint.web.reactive.WebFluxEndpointHandlerMapping//通过controller查找handlerorg.springframework.boot.actuate.endpoint.web.reactive.ControllerEndpointHandlerMappingorg.springframework.web.reactive.function.server.support.RouterFunctionMapping  org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping//通过配置的路由查找handlerorg.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping//资源目录,classpath,resources等查找org.springframework.web.reactive.handler.SimpleUrlHandlerMapping

顺序调用,返回找到的第一个Handler。不同的handler有不同的Filter列表。常用的是RoutePredicateHandlerMapping 得到的Handler。里面包含GatewayFilter等。

Filter

GatewayFilterChain

Filter 链

public interface GatewayFilterChain {Mono<Void> filter(ServerWebExchange exchange);
}

FilteringWebHandler 类定义了GatewayFilterChain的默认实现DefaultGatewayFilterChain

 private static class DefaultGatewayFilterChain implements GatewayFilterChain {//当前chain对应的 filter的下标。private final int index;private final List<GatewayFilter> filters;DefaultGatewayFilterChain(List<GatewayFilter> filters) {this.filters = filters;this.index = 0;}private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {this.filters = parent.getFilters();this.index = index;}public List<GatewayFilter> getFilters() {return filters;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange) {return Mono.defer(() -> {if (this.index < filters.size()) {//获取当前filter,GatewayFilter filter = filters.get(this.index);//构造chain。DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,this.index + 1);return filter.filter(exchange, chain);}else {return Mono.empty(); // complete}});}}//把GlobalFilter 包装成 GatewayFilterprivate static class GatewayFilterAdapter implements GatewayFilter {private final GlobalFilter delegate;GatewayFilterAdapter(GlobalFilter delegate) {this.delegate = delegate;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return this.delegate.filter(exchange, chain);}@Overridepublic String toString() {final StringBuilder sb = new StringBuilder("GatewayFilterAdapter{");sb.append("delegate=").append(delegate);sb.append('}');return sb.toString();}}

FilteringWebHandler

FilteringWebHandler用于组装GatewayFilter。

public class FilteringWebHandler implements WebHandler {protected static final Log logger = LogFactory.getLog(FilteringWebHandler.class);private final List<GatewayFilter> globalFilters;public FilteringWebHandler(List<GlobalFilter> globalFilters) {this.globalFilters = loadFilters(globalFilters);}//把GlobalFilter包装成GatewayFilterprivate static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {return filters.stream().map(filter -> {//构造GatewayFilterAdapterGatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);//排序的if (filter instanceof Ordered) {int order = ((Ordered) filter).getOrder();return new OrderedGatewayFilter(gatewayFilter, order);}return gatewayFilter;}).collect(Collectors.toList());}/** TODO: relocate @EventListener(RefreshRoutesEvent.class) void handleRefresh() {* this.combinedFiltersForRoute.clear();*/@Overridepublic Mono<Void> handle(ServerWebExchange exchange) {//获取Route。Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);//获取Route的  Filter。List<GatewayFilter> gatewayFilters = route.getFilters();//合并GlobalFilter 和 GatewayFilter。List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);combined.addAll(gatewayFilters);// TODO: needed or cached?//排序。AnnotationAwareOrderComparator.sort(combined);if (logger.isDebugEnabled()) {logger.debug("Sorted gatewayFilterFactories: " + combined);}//构造 GatewayFilterChain。return new DefaultGatewayFilterChain(combined).filter(exchange);}}

GlobalFilter实例化

GatewayAutoConfiguration中定义Bean。

GatewayFilter

GatewayFilter 网关过滤器用于拦截并链式处理web请求,可以实现横切的与应用无关的需求,比如:安全、访问超时的设置等。

public interface GatewayFilter extends ShortcutConfigurable {/*** Name key.*/String NAME_KEY = "name";/*** Value key.*/String VALUE_KEY = "value";//chain:下一个要执行的chain(对应一个filter)Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);}/** Ordered , delegate.
*/
public class OrderedGatewayFilter implements GatewayFilter, Ordered {private final GatewayFilter delegate;private final int order;public GatewayFilter getDelegate() {return delegate;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return this.delegate.filter(exchange, chain);}@Overridepublic int getOrder() {return this.order;}@Overridepublic String toString() {return new StringBuilder("[").append(delegate).append(", order = ").append(order).append("]").toString();}}

GlobalFilter

GlobalGilter 全局过滤器接口与 GatewayFilter 网关过滤器接口具有相同的方法定义。全局过滤器是一系列特殊的过滤器,会根据条件应用到所有路由中。网关过滤器是更细粒度的过滤器,作用于指定的路由中。

public interface GlobalFilter {Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}

RemoveCachedBodyFilter

删除网关上下文中的缓存。RemoveCachedBodyFilter没有 Pre Filter。

//public static final String CACHED_REQUEST_BODY_ATTR = "cachedRequestBody";
public class RemoveCachedBodyFilter implements GlobalFilter, Ordered {private static final Log log = LogFactory.getLog(RemoveCachedBodyFilter.class);@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return chain.filter(exchange).doFinally(s -> {//删除 缓存的requestbody 属性。PooledDataBuffer dataBuffer = (PooledDataBuffer) exchange.getAttributes().remove(CACHED_REQUEST_BODY_ATTR);if (dataBuffer != null && dataBuffer.isAllocated()) {if (log.isTraceEnabled()) {log.trace("releasing cached body in exchange attribute");}dataBuffer.release();}});}@Overridepublic int getOrder() {return HIGHEST_PRECEDENCE;}}

AdaptCachedBodyGlobalFilter

从请求中获取body缓存到网关上线文,属性是CACHED_REQUEST_BODY_ATTR(cachedRequestBody),这样就可以直接从网关上下文中拿到请求参数,而不会出现从request中拿到之后还要回填到请求体的问题;


public class AdaptCachedBodyGlobalFilterimplements GlobalFilter, Ordered, ApplicationListener<EnableBodyCachingEvent> {private ConcurrentMap<String, Boolean> routesToCache = new ConcurrentHashMap<>();/*** Cached request body key.*/@Deprecatedpublic static final String CACHED_REQUEST_BODY_KEY = CACHED_REQUEST_BODY_ATTR;@Overridepublic void onApplicationEvent(EnableBodyCachingEvent event) {this.routesToCache.putIfAbsent(event.getRouteId(), true);}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {//获取属性:CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR = "cachedServerHttpRequestDecorator";ServerHttpRequest cachedRequest = exchange.getAttributeOrDefault(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, null);if (cachedRequest != null) {//移除属性,exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);return chain.filter(exchange.mutate().request(cachedRequest).build());}//CACHED_REQUEST_BODY_ATTR = "cachedRequestBody";DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);//GATEWAY_ROUTE_ATTR = qualify("gatewayRoute");Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);if (body != null || !this.routesToCache.containsKey(route.getId())) {return chain.filter(exchange);}return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest) -> {// don't mutate and build if same request objectif (serverHttpRequest == exchange.getRequest()) {return chain.filter(exchange);}return chain.filter(exchange.mutate().request(serverHttpRequest).build());});}@Overridepublic int getOrder() {return Ordered.HIGHEST_PRECEDENCE + 1000;}}

NettyWriteResponseFilter

NettyWriteResponseFilter没有 Pre Filter 代码,因此是个Post Filter。

public class NettyWriteResponseFilter implements GlobalFilter, Ordered {/*** Order for write response filter.*/public static final int WRITE_RESPONSE_FILTER_ORDER = -1;private static final Log log = LogFactory.getLog(NettyWriteResponseFilter.class);private final List<MediaType> streamingMediaTypes;public NettyWriteResponseFilter(List<MediaType> streamingMediaTypes) {this.streamingMediaTypes = streamingMediaTypes;}@Overridepublic int getOrder() {return WRITE_RESPONSE_FILTER_ORDER;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {// NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added// until the NettyRoutingFilter is run// @formatter:offreturn chain.filter(exchange)//异常处理:关闭连接。.doOnError(throwable -> cleanup(exchange)).then(Mono.defer(() -> {//reactor.netty.ConnectionConnection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);if (connection == null) {return Mono.empty();}if (log.isTraceEnabled()) {log.trace("NettyWriteResponseFilter start inbound: "+ connection.channel().id().asShortText() + ", outbound: "+ exchange.getLogPrefix());}ServerHttpResponse response = exchange.getResponse();//NETTY 写数据final Flux<DataBuffer> body = connection.inbound().receive().retain().map(byteBuf -> wrap(byteBuf, response));MediaType contentType = null;try {contentType = response.getHeaders().getContentType();}catch (Exception e) {if (log.isTraceEnabled()) {log.trace("invalid media type", e);}}return (isStreamingMediaType(contentType)? response.writeAndFlushWith(body.map(Flux::just)): response.writeWith(body));})).doOnCancel(() -> cleanup(exchange));// @formatter:on}protected DataBuffer wrap(ByteBuf byteBuf, ServerHttpResponse response) {if (response.bufferFactory() instanceof NettyDataBufferFactory) {NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();return factory.wrap(byteBuf);}// MockServerHttpResponse creates theseelse if (response.bufferFactory() instanceof DefaultDataBufferFactory) {DefaultDataBufferFactory factory = (DefaultDataBufferFactory) response.bufferFactory();return factory.wrap(byteBuf.nioBuffer());}throw new IllegalArgumentException("Unkown DataBufferFactory type " + response.bufferFactory().getClass());}}

GatewayMetricsFilter

public class GatewayMetricsFilter implements GlobalFilter, Ordered {private static final Log log = LogFactory.getLog(GatewayMetricsFilter.class);private final MeterRegistry meterRegistry;private GatewayTagsProvider compositeTagsProvider;public GatewayMetricsFilter(MeterRegistry meterRegistry,List<GatewayTagsProvider> tagsProviders) {this.meterRegistry = meterRegistry;this.compositeTagsProvider = tagsProviders.stream().reduce(exchange -> Tags.empty(), GatewayTagsProvider::and);}@Deprecatedpublic GatewayMetricsFilter(MeterRegistry meterRegistry) {this(meterRegistry, Arrays.asList(new GatewayHttpTagsProvider(),new GatewayRouteTagsProvider()));}@Overridepublic int getOrder() {// start the timer as soon as possible and report the metric event before we write// response to clientreturn NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER + 1;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {//记录开始执行时间Sample sample = Timer.start(meterRegistry);return chain.filter(exchange).doOnSuccessOrError((aVoid, ex) -> {endTimerRespectingCommit(exchange, sample);});}private void endTimerRespectingCommit(ServerWebExchange exchange, Sample sample) {ServerHttpResponse response = exchange.getResponse();if (response.isCommitted()) {endTimerInner(exchange, sample);}else {response.beforeCommit(() -> {endTimerInner(exchange, sample);return Mono.empty();});}}private void endTimerInner(ServerWebExchange exchange, Sample sample) {Tags tags = compositeTagsProvider.apply(exchange);if (log.isTraceEnabled()) {log.trace("gateway.requests tags: " + tags);}sample.stop(meterRegistry.timer("gateway.requests", tags));}}

ForwardPathFilter

ForwardPathFilter:路由处理。

public class ForwardPathFilter implements GlobalFilter, Ordered {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {//GATEWAY_ROUTE_ATTR = qualify("gatewayRoute");Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);//获取路由Uri。URI routeUri = route.getUri();String scheme = routeUri.getScheme();//如果已路由或者是forward,则直接转发。if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {return chain.filter(exchange);}//重新构造新的url请求。exchange = exchange.mutate().request(exchange.getRequest().mutate().path(routeUri.getPath()).build()).build();return chain.filter(exchange);}@Overridepublic int getOrder() {return 0;}}

RouteToRequestUrlFilter

public class RouteToRequestUrlFilter implements GlobalFilter, Ordered {/*** Order of Route to URL.*/public static final int ROUTE_TO_URL_FILTER_ORDER = 10000;private static final Log log = LogFactory.getLog(RouteToRequestUrlFilter.class);private static final String SCHEME_REGEX = "[a-zA-Z]([a-zA-Z]|\\d|\\+|\\.|-)*:.*";static final Pattern schemePattern = Pattern.compile(SCHEME_REGEX);/* for testing */static boolean hasAnotherScheme(URI uri) {return schemePattern.matcher(uri.getSchemeSpecificPart()).matches()&& uri.getHost() == null && uri.getRawPath() == null;}@Overridepublic int getOrder() {return ROUTE_TO_URL_FILTER_ORDER;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {// //GATEWAY_ROUTE_ATTR = qualify("gatewayRoute");Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);if (route == null) {return chain.filter(exchange);}log.trace("RouteToRequestUrlFilter start");//获取路由URIURI uri = exchange.getRequest().getURI();boolean encoded = containsEncodedParts(uri);URI routeUri = route.getUri();if (hasAnotherScheme(routeUri)) {// this is a special url, save scheme to special attribute// replace routeUri with schemeSpecificPartexchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR,routeUri.getScheme());routeUri = URI.create(routeUri.getSchemeSpecificPart());}if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {// Load balanced URIs should always have a host. If the host is null it is// most// likely because the host name was invalid (for example included an// underscore)throw new IllegalStateException("Invalid host: " + routeUri.toString());}//合并成完整的Url。URI mergedUrl = UriComponentsBuilder.fromUri(uri)// .uri(routeUri).scheme(routeUri.getScheme()).host(routeUri.getHost()).port(routeUri.getPort()).build(encoded).toUri();exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);return chain.filter(exchange);}}

LoadBalancerClientFilter

废弃的,使用ReactiveLoadBalancerClientFilter 代替。

ReactiveLoadBalancerClientFilter

public class ReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {private final LoadBalancerClientFactory clientFactory;private LoadBalancerProperties properties;@Override@SuppressWarnings("Duplicates")public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {//GATEWAY_REQUEST_URL_ATTR = qualify("gatewayRequestUrl");URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);//GATEWAY_SCHEME_PREFIX_ATTR = qualify( "gatewaySchemePrefix");String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);if (url == null|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {return chain.filter(exchange);}// 保存原始URI//GATEWAY_ORIGINAL_REQUEST_URL_ATTR = qualify( "gatewayOriginalRequestUrl");addOriginalRequestUrl(exchange, url);if (log.isTraceEnabled()) {log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName()+ " url before: " + url);}return choose(exchange).doOnNext(response -> {if (!response.hasServer()) {throw NotFoundException.create(properties.isUse404(),"Unable to find instance for " + url.getHost());}URI uri = exchange.getRequest().getURI();// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,// if the loadbalancer doesn't provide one.String overrideScheme = null;if (schemePrefix != null) {overrideScheme = url.getScheme();}//取得服务实例DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(response.getServer(), overrideScheme);//重构requestURI requestUrl = reconstructURI(serviceInstance, uri);if (log.isTraceEnabled()) {log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);}exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);}).then(chain.filter(exchange));}//选择服务实例。private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(uri.getHost(), ReactorLoadBalancer.class,ServiceInstance.class);if (loadBalancer == null) {throw new NotFoundException("No loadbalancer available for " + uri.getHost());}return loadBalancer.choose(createRequest());}private Request createRequest() {return ReactiveLoadBalancer.REQUEST;}}

WebsocketRoutingFilter

public class WebsocketRoutingFilter implements GlobalFilter, Ordered {/*** Sec-Websocket protocol.*/public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";private static final Log log = LogFactory.getLog(WebsocketRoutingFilter.class);private final WebSocketClient webSocketClient;private final WebSocketService webSocketService;private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;// do not use this headersFilters directly, use getHeadersFilters() instead.private volatile List<HttpHeadersFilter> headersFilters;/* for testing */static String convertHttpToWs(String scheme) {scheme = scheme.toLowerCase();return "http".equals(scheme) ? "ws" : "https".equals(scheme) ? "wss" : scheme;}@Overridepublic int getOrder() {// Before NettyRoutingFilter since this routes certain http requestsreturn Ordered.LOWEST_PRECEDENCE - 1;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {changeSchemeIfIsWebSocketUpgrade(exchange);URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme();if (isAlreadyRouted(exchange)|| (!"ws".equals(scheme) && !"wss".equals(scheme))) {return chain.filter(exchange);}setAlreadyRouted(exchange);HttpHeaders headers = exchange.getRequest().getHeaders();HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);if (protocols != null) {protocols = headers.get(SEC_WEBSOCKET_PROTOCOL).stream().flatMap(header -> Arrays.stream(commaDelimitedListToStringArray(header))).map(String::trim).collect(Collectors.toList());}return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols));}private List<HttpHeadersFilter> getHeadersFilters() {if (this.headersFilters == null) {this.headersFilters = this.headersFiltersProvider.getIfAvailable(ArrayList::new);headersFilters.add((headers, exchange) -> {HttpHeaders filtered = new HttpHeaders();headers.entrySet().stream().filter(entry -> !entry.getKey().toLowerCase().startsWith("sec-websocket")).forEach(header -> filtered.addAll(header.getKey(),header.getValue()));return filtered;});}return this.headersFilters;}private void changeSchemeIfIsWebSocketUpgrade(ServerWebExchange exchange) {// Check the UpgradeURI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme().toLowerCase();String upgrade = exchange.getRequest().getHeaders().getUpgrade();// change the scheme if the socket client send a "http" or "https"if ("WebSocket".equalsIgnoreCase(upgrade)&& ("http".equals(scheme) || "https".equals(scheme))) {String wsScheme = convertHttpToWs(scheme);URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build().toUri();exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);if (log.isTraceEnabled()) {log.trace("changeSchemeTo:[" + wsRequestUrl + "]");}}}private static class ProxyWebSocketHandler implements WebSocketHandler {private final WebSocketClient client;private final URI url;private final HttpHeaders headers;private final List<String> subProtocols;@Overridepublic Mono<Void> handle(WebSocketSession session) {// pass headers along so custom headers can be sent throughreturn client.execute(url, this.headers, new WebSocketHandler() {@Overridepublic Mono<Void> handle(WebSocketSession proxySession) {// Use retain() for Reactor NettyMono<Void> proxySessionSend = proxySession.send(session.receive().doOnNext(WebSocketMessage::retain));// .log("proxySessionSend", Level.FINE);Mono<Void> serverSessionSend = session.send(proxySession.receive().doOnNext(WebSocketMessage::retain));// .log("sessionSend", Level.FINE);return Mono.zip(proxySessionSend, serverSessionSend).then();}/*** Copy subProtocols so they are available downstream.* @return*/@Overridepublic List<String> getSubProtocols() {return ProxyWebSocketHandler.this.subProtocols;}});}}}

NettyRoutingFilter

public class NettyRoutingFilter implements GlobalFilter, Ordered {private static final Log log = LogFactory.getLog(NettyRoutingFilter.class);private final HttpClient httpClient;private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;private final HttpClientProperties properties;// do not use this headersFilters directly, use getHeadersFilters() instead.private volatile List<HttpHeadersFilter> headersFilters;public List<HttpHeadersFilter> getHeadersFilters() {if (headersFilters == null) {headersFilters = headersFiltersProvider.getIfAvailable();}return headersFilters;}@Override@SuppressWarnings("Duplicates")public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {//URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme();if (isAlreadyRouted(exchange)|| (!"http".equals(scheme) && !"https".equals(scheme))) {return chain.filter(exchange);}//处理 http,https。setAlreadyRouted(exchange);ServerHttpRequest request = exchange.getRequest();final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());final String url = requestUrl.toASCIIString();HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();filtered.forEach(httpHeaders::set);boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {headers.add(httpHeaders);// Will either be set below, or later by Nettyheaders.remove(HttpHeaders.HOST);if (preserveHost) {String host = request.getHeaders().getFirst(HttpHeaders.HOST);headers.add(HttpHeaders.HOST, host);}}).request(method).uri(url).send((req, nettyOutbound) -> {//发送请求if (log.isTraceEnabled()) {nettyOutbound.withConnection(connection -> log.trace("outbound route: "+ connection.channel().id().asShortText()+ ", inbound: " + exchange.getLogPrefix()));}return nettyOutbound.send(request.getBody().map(this::getByteBuf));}).responseConnection((res, connection) -> {//设置响应内容// Defer committing the response until all route filters have run// Put client response as ServerWebExchange attribute and write// response later NettyWriteResponseFilterexchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);ServerHttpResponse response = exchange.getResponse();// put headers and status so filters can modify the responseHttpHeaders headers = new HttpHeaders();res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);if (StringUtils.hasLength(contentTypeValue)) {exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,contentTypeValue);}setResponseStatus(res, response);// make sure headers filters run after setting status so it is// available in responseHttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange, Type.RESPONSE);if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)&& filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {// It is not valid to have both the transfer-encoding header and// the content-length header.// Remove the transfer-encoding header in the response if the// content-length header is present.response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);}exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,filteredResponseHeaders.keySet());response.getHeaders().putAll(filteredResponseHeaders);return Mono.just(res);});//设置超时。Duration responseTimeout = getResponseTimeout(route);if (responseTimeout != null) {responseFlux = responseFlux.timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))).onErrorMap(TimeoutException.class,th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,th.getMessage(), th));}//转发。return responseFlux.then(chain.filter(exchange));}protected ByteBuf getByteBuf(DataBuffer dataBuffer) {if (dataBuffer instanceof NettyDataBuffer) {NettyDataBuffer buffer = (NettyDataBuffer) dataBuffer;return buffer.getNativeBuffer();}// MockServerHttpResponse creates theseelse if (dataBuffer instanceof DefaultDataBuffer) {DefaultDataBuffer buffer = (DefaultDataBuffer) dataBuffer;return Unpooled.wrappedBuffer(buffer.getNativeBuffer());}throw new IllegalArgumentException("Unable to handle DataBuffer of type " + dataBuffer.getClass());}private void setResponseStatus(HttpClientResponse clientResponse,ServerHttpResponse response) {HttpStatus status = HttpStatus.resolve(clientResponse.status().code());if (status != null) {response.setStatusCode(status);}else {while (response instanceof ServerHttpResponseDecorator) {response = ((ServerHttpResponseDecorator) response).getDelegate();}if (response instanceof AbstractServerHttpResponse) {((AbstractServerHttpResponse) response).setStatusCodeValue(clientResponse.status().code());}else {// TODO: log warning here, not throw error?throw new IllegalStateException("Unable to set status code "+ clientResponse.status().code() + " on response of type "+ response.getClass().getName());}}}/*** Creates a new HttpClient with per route timeout configuration. Sub-classes that* override, should call super.getHttpClient() if they want to honor the per route* timeout configuration.* @param route the current route.* @param exchange the current ServerWebExchange.* @param chain the current GatewayFilterChain.* @return*/protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) {Object connectTimeoutAttr = route.getMetadata().get(CONNECT_TIMEOUT_ATTR);if (connectTimeoutAttr != null) {Integer connectTimeout = getInteger(connectTimeoutAttr);return this.httpClient.tcpConfiguration((tcpClient) -> tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout));}return httpClient;}static Integer getInteger(Object connectTimeoutAttr) {Integer connectTimeout;if (connectTimeoutAttr instanceof Integer) {connectTimeout = (Integer) connectTimeoutAttr;}else {connectTimeout = Integer.parseInt(connectTimeoutAttr.toString());}return connectTimeout;}private Duration getResponseTimeout(Route route) {Object responseTimeoutAttr = route.getMetadata().get(RESPONSE_TIMEOUT_ATTR);Long responseTimeout = null;if (responseTimeoutAttr != null) {if (responseTimeoutAttr instanceof Number) {responseTimeout = ((Number) responseTimeoutAttr).longValue();}else {responseTimeout = Long.valueOf(responseTimeoutAttr.toString());}}return responseTimeout != null ? Duration.ofMillis(responseTimeout): properties.getResponseTimeout();}}

ForwardRoutingFilter

public class ForwardRoutingFilter implements GlobalFilter, Ordered {private static final Log log = LogFactory.getLog(ForwardRoutingFilter.class);private final ObjectProvider<DispatcherHandler> dispatcherHandlerProvider;// do not use this dispatcherHandler directly, use getDispatcherHandler() instead.private volatile DispatcherHandler dispatcherHandler;@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme();if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {return chain.filter(exchange);}// TODO: translate url?if (log.isTraceEnabled()) {log.trace("Forwarding to URI: " + requestUrl);}return this.getDispatcherHandler().handle(exchange);}}

WebClientHttpRoutingFilter

默认未使用

WebClientWriteResponseFilter

默认未使用

总结

GlobalFilter执行顺序

Filter Type Order 功能
AdaptCachedBodyGlobalFilter PRE HIGHEST_PRECEDENCE + 1000 缓存body
GatewayMetricsFilter PRE+ POST 0 指标统计
ForwardPathFilter PRE 0 路由转发
RouteToRequestUrlFilter PRE 10000 路由转成成完成的Url。
LoadBalancerClientFilter *
ReactiveLoadBalancerClientFilter PRE 10150 负载均衡,解析lb schema。
WebsocketRoutingFilter ROUTE LOWEST_PRECEDENCE - 1 处理websocket。ws,wss。
如果是ws,wss则处理完成,开始执行POST
NettyRoutingFilter ROUTE LOWEST_PRECEDENCE 处理 http,https。不再转发,开始执行POST
ForwardRoutingFilter ROUTE LOWEST_PRECEDENCE 用于本地forward,也就是将请求在Gateway服务内进行转发,而不是转发到下游服务。
GatewayMetricsFilter PRE+ POST 0 指标统计。
NettyWriteResponseFilter POST -1 Netty写响应数据
RemoveCachedBodyFilter POST HIGHEST_PRECEDENCE 清除body缓存

Order只影响构造Chain 的Filter的顺序,不是请求的执行顺序。如果Filter 在 return 之前没有处理逻辑,则仅是个POST FILTER。

Filter 类型: 1、Pre:前置处理,2:ROUTE,表示处理请求,不再往后转发。3:POST,表示 处理响应。

通过配置生成Filter

生成Filter,RoutePredicate等都是通过Factory生成。然后调用 apply()方法生成对应实例。

GatewayFilterFactory,RoutePredicateFactory 方法类似。

Configurable

public interface Configurable<C> {Class<C> getConfigClass();C newConfig();}public abstract class AbstractConfigurable<C> implements Configurable<C> {private Class<C> configClass;}

ShortcutConfigurable

ShortcutConfigurable 接口提供的默认方法,主要用于对过滤器和断言参数进行标准化处理,将表达式和生成的键进行转换。

public interface ShortcutConfigurable {static String normalizeKey(String key, int entryIdx, ShortcutConfigurable argHints,Map<String, String> args) {// RoutePredicateFactory has name hints and this has a fake key name// replace with the matching key hint//GENERATED_NAME_PREFIX = "_genkey_";if (key.startsWith(NameUtils.GENERATED_NAME_PREFIX)&& !argHints.shortcutFieldOrder().isEmpty() && entryIdx < args.size()&& entryIdx < argHints.shortcutFieldOrder().size()) {key = argHints.shortcutFieldOrder().get(entryIdx);}return key;}/**获取值,如果是spel表达式,则计算出表达式的值。*/static Object getValue(SpelExpressionParser parser, BeanFactory beanFactory,String entryValue) {Object value;String rawValue = entryValue;if (rawValue != null) {rawValue = rawValue.trim();}if (rawValue != null && rawValue.startsWith("#{") && entryValue.endsWith("}")) {// assume it's spelStandardEvaluationContext context = new StandardEvaluationContext();context.setBeanResolver(new BeanFactoryResolver(beanFactory));Expression expression = parser.parseExpression(entryValue,new TemplateParserContext());value = expression.getValue(context);}else {value = entryValue;}return value;}default ShortcutType shortcutType() {return ShortcutType.DEFAULT;}/*** Returns hints about the number of args and the order for shortcut parsing.* @return the list of hints*/default List<String> shortcutFieldOrder() {return Collections.emptyList();}default String shortcutFieldPrefix() {return "";}enum ShortcutType {DEFAULT {@Overridepublic Map<String, Object> normalize(Map<String, String> args,ShortcutConfigurable shortcutConf, SpelExpressionParser parser,BeanFactory beanFactory) {Map<String, Object> map = new HashMap<>();int entryIdx = 0;//处理每个map。for (Map.Entry<String, String> entry : args.entrySet()) {String key = normalizeKey(entry.getKey(), entryIdx, shortcutConf,args);Object value = getValue(parser, beanFactory, entry.getValue());map.put(key, value);entryIdx++;}return map;}},//一个key,多个值。GATHER_LIST {@Overridepublic Map<String, Object> normalize(Map<String, String> args,ShortcutConfigurable shortcutConf, SpelExpressionParser parser,BeanFactory beanFactory) {Map<String, Object> map = new HashMap<>();// field order should be of size 1List<String> fieldOrder = shortcutConf.shortcutFieldOrder();Assert.isTrue(fieldOrder != null && fieldOrder.size() == 1,"Shortcut Configuration Type GATHER_LIST must have shortcutFieldOrder of size 1");String fieldName = fieldOrder.get(0);map.put(fieldName,args.values().stream().map(value -> getValue(parser, beanFactory, value)).collect(Collectors.toList()));return map;}},// list is all elements except last which is a boolean flagGATHER_LIST_TAIL_FLAG {@Overridepublic Map<String, Object> normalize(Map<String, String> args,ShortcutConfigurable shortcutConf, SpelExpressionParser parser,BeanFactory beanFactory) {Map<String, Object> map = new HashMap<>();// field order should be of size 1List<String> fieldOrder = shortcutConf.shortcutFieldOrder();Assert.isTrue(fieldOrder != null && fieldOrder.size() == 2,"Shortcut Configuration Type GATHER_LIST_HEAD must have shortcutFieldOrder of size 2");List<String> values = new ArrayList<>(args.values());if (!values.isEmpty()) {// strip boolean flag if last entry is true or falseint lastIdx = values.size() - 1;String lastValue = values.get(lastIdx);if (lastValue.equalsIgnoreCase("true")|| lastValue.equalsIgnoreCase("false")) {values = values.subList(0, lastIdx);map.put(fieldOrder.get(1),getValue(parser, beanFactory, lastValue));}}String fieldName = fieldOrder.get(0);map.put(fieldName,values.stream().map(value -> getValue(parser, beanFactory, value)).collect(Collectors.toList()));return map;}};/**抽象方法。args:配置文件中的配置信息。shortcutConf:*/public abstract Map<String, Object> normalize(Map<String, String> args,ShortcutConfigurable shortcutConf, SpelExpressionParser parser,BeanFactory beanFactory);}}

ShortcutType默认实现了3种情况的解析。

  • DEFAULT :按照shortcutFieldOrder顺序依次赋值,实现类:AfterRoutePredicateFactory
  • GATHER_LIST:shortcutFiledOrder只能有一个值,如果参数有多个拼成一个集合。shortcutFieldOrder接口实现返回的list,必须是size=1。实现类:HostRoutePredicateFactory
  • GATHER_LIST_TAIL_FLAG:shortcutFiledOrder只能有两个值,其中最后一个值为true或者false,其余的值变成一个集合赋值给第一个值。例如:PathRoutePredicateFactory

ConfigurationService

public class ConfigurationService implements ApplicationEventPublisherAware {private ApplicationEventPublisher publisher;private BeanFactory beanFactory;private Supplier<ConversionService> conversionService;private SpelExpressionParser parser = new SpelExpressionParser();private Supplier<Validator> validator;
}
public abstract class AbstractBuilder<T, B extends AbstractBuilder<T, B>> {protected final ConfigurationService service;protected BiFunction<T, Map<String, Object>, ApplicationEvent> eventFunction;protected String name;protected Map<String, Object> normalizedProperties;protected Map<String, String> properties;public AbstractBuilder(ConfigurationService service) {this.service = service;}protected abstract B getThis();protected abstract void validate();//标准化属性protected Map<String, Object> normalizeProperties() {Map<String, Object> normalizedProperties = new HashMap<>();this.properties.forEach(normalizedProperties::put);return normalizedProperties;}protected abstract T doBind();public T bind() {validate();Assert.hasText(this.name, "name may not be empty");Assert.isTrue(this.properties != null || this.normalizedProperties != null,"properties and normalizedProperties both may not be null");if (this.normalizedProperties == null) {this.normalizedProperties = normalizeProperties();}//绑定T bound = doBind();if (this.eventFunction != null && this.service.publisher != null) {ApplicationEvent applicationEvent = this.eventFunction.apply(bound,this.normalizedProperties);this.service.publisher.publishEvent(applicationEvent);}return bound;}}/**构造:Configurable<T> & ShortcutConfigurable
*/public class ConfigurableBuilder<T, C extends Configurable<T> & ShortcutConfigurable>extends AbstractBuilder<T, ConfigurableBuilder<T, C>> {private final C configurable;@Overrideprotected void validate() {Assert.notNull(this.configurable, "configurable may not be null");}@Overrideprotected Map<String, Object> normalizeProperties() {if (this.service.beanFactory != null) {//标准化属性return this.configurable.shortcutType().normalize(this.properties,this.configurable, this.service.parser, this.service.beanFactory);}return super.normalizeProperties();}@Overrideprotected T doBind() {Bindable<T> bindable = Bindable.of(this.configurable.getConfigClass());T bound = bindOrCreate(bindable, this.normalizedProperties,this.configurable.shortcutFieldPrefix(),/* this.name, */this.service.validator.get(),this.service.conversionService.get());return bound;}}public class InstanceBuilder<T> extends AbstractBuilder<T, InstanceBuilder<T>> {private final T instance;@Overrideprotected T doBind() {T toBind = getTargetObject(this.instance);Bindable<T> bindable = Bindable.ofInstance(toBind);return bindOrCreate(bindable, this.normalizedProperties, this.name,this.service.validator.get(), this.service.conversionService.get());}}

构造Builder

 public <T, C extends Configurable<T> & ShortcutConfigurable> ConfigurableBuilder<T, C> with(C configurable) {return new ConfigurableBuilder<T, C>(this, configurable);}public <T> InstanceBuilder<T> with(T instance) {return new InstanceBuilder<T>(this, instance);}

RouteDefinitionRouteLocator

RouteDefinitionRouteLocator是以gateway route yaml配置为基础来构造Route。

public class RouteDefinitionRouteLocatorimplements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {//获取routedefine private final RouteDefinitionLocator routeDefinitionLocator;private final ConfigurationService configurationService;private final Map<String, RoutePredicateFactory> predicates = new LinkedHashMap<>();private final Map<String, GatewayFilterFactory> gatewayFilterFactories = new HashMap<>();//yaml gateway配置private final GatewayProperties gatewayProperties;@Deprecatedpublic RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,List<RoutePredicateFactory> predicates,List<GatewayFilterFactory> gatewayFilterFactories,GatewayProperties gatewayProperties, ConversionService conversionService) {this.routeDefinitionLocator = routeDefinitionLocator;this.configurationService = new ConfigurationService();this.configurationService.setConversionService(conversionService);//构造 predicatesinitFactories(predicates);gatewayFilterFactories.forEach(factory -> this.gatewayFilterFactories.put(factory.name(), factory));this.gatewayProperties = gatewayProperties;}public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,List<RoutePredicateFactory> predicates,List<GatewayFilterFactory> gatewayFilterFactories,GatewayProperties gatewayProperties,ConfigurationService configurationService) {this.routeDefinitionLocator = routeDefinitionLocator;this.configurationService = configurationService;initFactories(predicates);gatewayFilterFactories.forEach(factory -> this.gatewayFilterFactories.put(factory.name(), factory));this.gatewayProperties = gatewayProperties;}}
 private void initFactories(List<RoutePredicateFactory> predicates) {predicates.forEach(factory -> {String key = factory.name();if (this.predicates.containsKey(key)) {this.logger.warn("A RoutePredicateFactory named " + key+ " already exists, class: " + this.predicates.get(key)+ ". It will be overwritten.");}this.predicates.put(key, factory);if (logger.isInfoEnabled()) {logger.info("Loaded RoutePredicateFactory [" + key + "]");}});}@Overridepublic Flux<Route> getRoutes() {//把RouteDefinition 转换为 RouteFlux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);//失败处理策略。if (!gatewayProperties.isFailOnRouteDefinitionError()) {// instead of letting error bubble up, continueroutes = routes.onErrorContinue((error, obj) -> {if (logger.isWarnEnabled()) {logger.warn("RouteDefinition id " + ((RouteDefinition) obj).getId()+ " will be ignored. Definition has invalid configs, "+ error.getMessage());}});}return routes.map(route -> {if (logger.isDebugEnabled()) {logger.debug("RouteDefinition matched: " + route.getId());}return route;});}/**把 RouteDefinition 转换为Route*/private Route convertToRoute(RouteDefinition routeDefinition) {//把List 拼成 and。AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);//获取定义的Filter。List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);return Route.async(routeDefinition).asyncPredicate(predicate).replaceFilters(gatewayFilters).build();}private AsyncPredicate<ServerWebExchange> combinePredicates(RouteDefinition routeDefinition) {//获取定义的PredicatesList<PredicateDefinition> predicates = routeDefinition.getPredicates();//获取第一个AsyncPredicate<ServerWebExchange> predicate = lookup(routeDefinition,predicates.get(0));//拼成 and for (PredicateDefinition andPredicate : predicates.subList(1,predicates.size())) {AsyncPredicate<ServerWebExchange> found = lookup(routeDefinition,andPredicate);predicate = predicate.and(found);}return predicate;}private AsyncPredicate<ServerWebExchange> lookup(RouteDefinition route,PredicateDefinition predicate) {//获取factory。RoutePredicateFactory<Object> factory = this.predicates.get(predicate.getName());if (factory == null) {throw new IllegalArgumentException("Unable to find RoutePredicateFactory with name "+ predicate.getName());}if (logger.isDebugEnabled()) {logger.debug("RouteDefinition " + route.getId() + " applying "+ predicate.getArgs() + " to " + predicate.getName());}// @formatter:offObject config = this.configurationService.with(factory)  //构造一个builder。.name(predicate.getName()).properties(predicate.getArgs()).eventFunction((bound, properties) -> new PredicateArgsEvent(RouteDefinitionRouteLocator.this, route.getId(), properties)).bind();  //构造一个 configurable。// @formatter:onreturn factory.applyAsync(config);}

GatewayFilterFactory

public interface GatewayFilterFactory<C> extends ShortcutConfigurable, Configurable<C> {/*** Name key.*/String NAME_KEY = "name";/*** Value key.*/String VALUE_KEY = "value";// useful for javadsldefault GatewayFilter apply(String routeId, Consumer<C> consumer) {C config = newConfig();consumer.accept(config);return apply(routeId, config);}default GatewayFilter apply(Consumer<C> consumer) {C config = newConfig();consumer.accept(config);return apply(config);}default Class<C> getConfigClass() {throw new UnsupportedOperationException("getConfigClass() not implemented");}@Overridedefault C newConfig() {throw new UnsupportedOperationException("newConfig() not implemented");}//应用一个配置。GatewayFilter apply(C config);default GatewayFilter apply(String routeId, C config) {if (config instanceof HasRouteId) {HasRouteId hasRouteId = (HasRouteId) config;hasRouteId.setRouteId(routeId);}return apply(config);}default String name() {// TODO: deal with proxysreturn NameUtils.normalizeFilterFactoryName(getClass());}@Deprecateddefault ServerHttpRequest.Builder mutate(ServerHttpRequest request) {return request.mutate();}}

AddRequestHeaderGatewayFilterFactory

public abstract class AbstractNameValueGatewayFilterFactory extendsAbstractGatewayFilterFactory <AbstractNameValueGatewayFilterFactory.NameValueConfig> {public AbstractNameValueGatewayFilterFactory() {super(NameValueConfig.class);}public List<String> shortcutFieldOrder() {return Arrays.asList(GatewayFilter.NAME_KEY, GatewayFilter.VALUE_KEY);}@Validatedpublic static class NameValueConfig {@NotEmptyprotected String name;@NotEmptyprotected String value;}}public class AddRequestHeaderGatewayFilterFactoryextends AbstractNameValueGatewayFilterFactory {@Overridepublic GatewayFilter apply(NameValueConfig config) {return new GatewayFilter() {@Overridepublic Mono<Void> filter(ServerWebExchange exchange,GatewayFilterChain chain) {String value = ServerWebExchangeUtils.expand(exchange, config.getValue());ServerHttpRequest request = exchange.getRequest().mutate()//添加header。.header(config.getName(), value).build();return chain.filter(exchange.mutate().request(request).build());}@Overridepublic String toString() {return filterToStringCreator(AddRequestHeaderGatewayFilterFactory.this).append(config.getName(), config.getValue()).toString();}};}}

RoutePredicateFactory

public interface RoutePredicateFactory<C> extends ShortcutConfigurable, Configurable<C> {/*** Pattern key.*/String PATTERN_KEY = "pattern";// useful for javadsldefault Predicate<ServerWebExchange> apply(Consumer<C> consumer) {C config = newConfig();consumer.accept(config);beforeApply(config);return apply(config);}... ...default void beforeApply(C config) {}Predicate<ServerWebExchange> apply(C config);}public abstract class AbstractRoutePredicateFactory<C> extends AbstractConfigurable<C>implements RoutePredicateFactory<C> {public AbstractRoutePredicateFactory(Class<C> configClass) {super(configClass);}}

PathRoutePredicateFactory

public class PathRoutePredicateFactoryextends AbstractRoutePredicateFactory<PathRoutePredicateFactory.Config> {private static final Log log = LogFactory.getLog(RoutePredicateFactory.class);private static final String MATCH_OPTIONAL_TRAILING_SEPARATOR_KEY = "matchOptionalTrailingSeparator";private PathPatternParser pathPatternParser = new PathPatternParser();public PathRoutePredicateFactory() {super(Config.class);}private static void traceMatch(String prefix, Object desired, Object actual,boolean match) {if (log.isTraceEnabled()) {String message = String.format("%s \"%s\" %s against value \"%s\"", prefix,desired, match ? "matches" : "does not match", actual);log.trace(message);}}public void setPathPatternParser(PathPatternParser pathPatternParser) {this.pathPatternParser = pathPatternParser;}@Overridepublic List<String> shortcutFieldOrder() {return Arrays.asList("patterns", MATCH_OPTIONAL_TRAILING_SEPARATOR_KEY);}@Overridepublic ShortcutType shortcutType() {return ShortcutType.GATHER_LIST_TAIL_FLAG;}@Overridepublic Predicate<ServerWebExchange> apply(Config config) {final ArrayList<PathPattern> pathPatterns = new ArrayList<>();synchronized (this.pathPatternParser) {pathPatternParser.setMatchOptionalTrailingSeparator(config.isMatchOptionalTrailingSeparator());config.getPatterns().forEach(pattern -> {PathPattern pathPattern = this.pathPatternParser.parse(pattern);pathPatterns.add(pathPattern);});}return new GatewayPredicate() {@Overridepublic boolean test(ServerWebExchange exchange) {//获取原始UrlPathContainer path = parsePath(exchange.getRequest().getURI().getRawPath());Optional<PathPattern> optionalPathPattern = pathPatterns.stream().filter(pattern -> pattern.matches(path)).findFirst();if (optionalPathPattern.isPresent()) {PathPattern pathPattern = optionalPathPattern.get();traceMatch("Pattern", pathPattern.getPatternString(), path, true);PathMatchInfo pathMatchInfo = pathPattern.matchAndExtract(path);putUriTemplateVariables(exchange, pathMatchInfo.getUriVariables());return true;}else {traceMatch("Pattern", config.getPatterns(), path, false);return false;}}@Overridepublic String toString() {return String.format("Paths: %s, match trailing slash: %b",config.getPatterns(), config.isMatchOptionalTrailingSeparator());}};}/**配置*/@Validatedpublic static class Config {private List<String> patterns = new ArrayList<>();//是否匹配结尾分隔符private boolean matchOptionalTrailingSeparator = true;}}
public class AfterRoutePredicateFactoryextends AbstractRoutePredicateFactory<AfterRoutePredicateFactory.Config> {/*** DateTime key.*/public static final String DATETIME_KEY = "datetime";public AfterRoutePredicateFactory() {super(Config.class);}@Overridepublic List<String> shortcutFieldOrder() {return Collections.singletonList(DATETIME_KEY);}@Overridepublic Predicate<ServerWebExchange> apply(Config config) {return new GatewayPredicate() {@Overridepublic boolean test(ServerWebExchange serverWebExchange) {final ZonedDateTime now = ZonedDateTime.now();return now.isAfter(config.getDatetime());}@Overridepublic String toString() {return String.format("After: %s", config.getDatetime());}};}public static class Config {@NotNullprivate ZonedDateTime datetime;
}

Spring Cloud Gateway 源码解析(4)-- filter相关推荐

  1. api网关揭秘--spring cloud gateway源码解析

    要想了解spring cloud gateway的源码,要熟悉spring webflux,我的上篇文章介绍了spring webflux. 1.gateway 和zuul对比 I am the au ...

  2. Spring Cloud Gateway 源码解析(3) —— Predicate

    目录 RoutePredicateFactory GatewayPredicate AfterRoutePredicateFactory RoutePredicateHandlerMapping Fi ...

  3. Spring Cloud Gateway 源码解析(1) —— 基础

    目录 Gateway初始化 启用Gateway GatewayClassPathWarningAutoConfiguration GatewayLoadBalancerClientAutoConfig ...

  4. Spring Cloud Gateway 源码解析(2) —— 路由

    目录 基本组件 路由定位器(RouteDefinitionLocator ) 路由定义(RouteDefinition) PredicateDefinition FilterDefinition Co ...

  5. Spring cloud Gateway 源码(二) 路由流程

    目录 1.DispatcherHandler 1.1 handle方法 1.1.1 getHandler 获取请求处理器 1.1.2 invokeHandler 执行 2.路由选择 2.1  选择目标 ...

  6. Spring Cloud Gateway源码系列之路由配置加载过程

    当前章节主要是讲解配置文件中定义的路由配置被gateway加载,同时转为可以直接操作的路由对象 引入pom坐标 <dependency><groupId>org.springf ...

  7. spring cloud ribbon源码解析(一)

    我们知道spring cloud中restTemplate可以通过服务名调接口,加入@loadBalanced标签就实现了负载均衡的功能,那么spring cloud内部是如何实现的呢? 通过@loa ...

  8. Spring 之 @Cacheable 源码解析(上)

    一.@EnableCaching 源码解析 当要使用 @Cacheable 注解时需要引入 @EnableCaching 注解开启缓存功能.为什么呢?现在就来看看为什么要加入 @EnableCachi ...

  9. Spring 之 @Cacheable 源码解析(下)

    CacheInterceptor 缓存切面处理逻辑 接着上篇 Spring 之 @Cacheable 源码解析(上) 说起,代理对象已经创建成功,接着分析调用流程.那么应该从哪里入手呢?当然是去看 A ...

最新文章

  1. ng-cordova和cordova区别
  2. 太赞了!副业月入3W的技术大佬的公众号,学起来!
  3. Android时间选择器对话框的使用
  4. DAL层修改sql表数据
  5. 6.网络层(4)---IP多播,NAT
  6. 大教堂与集市 The Cathedral The Bazaar -- 这是当代软件技术领域最重要的著作
  7. DNF服务器搭建服务端架设教程
  8. 【Unity3D】制作进度条——让Image同时具有Filled和Sliced的功能
  9. 亿级流量网站架构核心技术 跟开涛学搭建高可用高并发系统
  10. 1、misa统计SRR结果
  11. 联想微型计算机供电电源线,拆修一只联想电源适配器,告诉你一个不为人知的秘密...
  12. 单页面SPA(如react,vue)网站的服务器渲染SSR之SEO大杀器rendertron(超详细配置+避坑)
  13. 内存(DRAM)芯片国产进程
  14. 霍夫圆检测(HoughCircles)
  15. 怎么获取机智股票自动交易软件
  16. C# aspnetcore 完整修改项目名称
  17. 小程序 视图不随数据动态改变
  18. CorelDraw Graphics Suite 2020 22.1.0.517 中文版
  19. 消费金融业务模式结构图
  20. Pandas超全总结

热门文章

  1. io-同步 异步 阻塞 非阻塞
  2. rfid3-micro2440,linux2.6.32.2,写成misc驱动
  3. 【Spring学习】spring开发包介绍
  4. Burrow 服务的安装部署
  5. 【Python实战】机型自动化标注(搜狗爬虫实现)
  6. 【Android】手机端的投射
  7. [九度][何海涛] 栈的压入压出
  8. Windows下 Apache+PHP5+MYSQL5+phpmyadmin 规范安装
  9. 日志管理(一):slf4j原理简单介绍
  10. html基于web2.0标准,晕倒:“用web2.0来制作符合标准的页面”