Spring Cloud Gateway 源码解析(4)-- filter
文章目录
- 绑定Filter
- HandlerMapping
- Filter
- GatewayFilterChain
- FilteringWebHandler
- GlobalFilter实例化
- GatewayFilter
- GlobalFilter
- RemoveCachedBodyFilter
- AdaptCachedBodyGlobalFilter
- NettyWriteResponseFilter
- GatewayMetricsFilter
- ForwardPathFilter
- RouteToRequestUrlFilter
- LoadBalancerClientFilter
- ReactiveLoadBalancerClientFilter
- WebsocketRoutingFilter
- NettyRoutingFilter
- ForwardRoutingFilter
- WebClientHttpRoutingFilter
- WebClientWriteResponseFilter
- 总结
- 通过配置生成Filter
- Configurable
- ShortcutConfigurable
- ConfigurationService
- RouteDefinitionRouteLocator
- GatewayFilterFactory
- AddRequestHeaderGatewayFilterFactory
- RoutePredicateFactory
- PathRoutePredicateFactory
绑定Filter
WebFlux
通过HandlerMapping
来根据请求获取handler。Spring Cloud Gateway 实现了类RoutePredicateHandlerMapping
。
public class RoutePredicateHandlerMapping extends AbstractHandlerMapping {//请求处理private final FilteringWebHandler webHandler;//路由定位器。RouteLocator 是多种 RouteLocator 的合集。private final RouteLocator routeLocator;private final Integer managementPort;private final ManagementPortType managementPortType;//父类:private CorsConfigurationSource corsConfigurationSource;
}
@Overrideprotected Mono<?> getHandlerInternal(ServerWebExchange exchange) {// don't handle requests on management port if set and different than server portif (this.managementPortType == DIFFERENT && this.managementPort != null&& exchange.getRequest().getURI().getPort() == this.managementPort) {return Mono.empty();}//设置mapping。exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());return lookupRoute(exchange)// .log("route-predicate-handler-mapping", Level.FINER) //name this.flatMap((Function<Route, Mono<?>>) r -> {exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);if (logger.isDebugEnabled()) {logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);}//设置路由属性:当前路由exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);return Mono.just(webHandler);}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);if (logger.isTraceEnabled()) {logger.trace("No RouteDefinition found for ["+ getExchangeDesc(exchange) + "]");}})));}//查找路由protected Mono<Route> lookupRoute(ServerWebExchange exchange) {return this.routeLocator.getRoutes() //获取所有定义路由// individually filter routes so that filterWhen error delaying is not a// problem.concatMap(route -> Mono.just(route).filterWhen(r -> {// add the current route we are testingexchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());//返回匹配的路由return r.getPredicate().apply(exchange);})// instead of immediately stopping main flux due to error, log and// swallow it.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(),e)).onErrorResume(e -> Mono.empty()))// .defaultIfEmpty() put a static Route not found// or .switchIfEmpty()// .switchIfEmpty(Mono.<Route>empty().log("noroute")).next()// TODO: error handling.map(route -> {if (logger.isDebugEnabled()) {logger.debug("Route matched: " + route.getId());}validateRoute(route, exchange);return route;});}
HandlerMapping
通过Request查找Handler,由DispatcherHandler
负责处理。
public Mono<Void> handle(ServerWebExchange exchange) {if (this.handlerMappings == null) {return createNotFoundError();}return Flux.fromIterable(this.handlerMappings).concatMap(mapping -> mapping.getHandler(exchange)).next().switchIfEmpty(createNotFoundError()).flatMap(handler -> invokeHandler(exchange, handler)).flatMap(result -> handleResult(exchange, result));}
默认HandlerMapping
:
org.springframework.boot.actuate.endpoint.web.reactive.WebFluxEndpointHandlerMapping//通过controller查找handlerorg.springframework.boot.actuate.endpoint.web.reactive.ControllerEndpointHandlerMappingorg.springframework.web.reactive.function.server.support.RouterFunctionMapping org.springframework.web.reactive.result.method.annotation.RequestMappingHandlerMapping//通过配置的路由查找handlerorg.springframework.cloud.gateway.handler.RoutePredicateHandlerMapping//资源目录,classpath,resources等查找org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
顺序调用,返回找到的第一个Handler
。不同的handler有不同的Filter列表。常用的是RoutePredicateHandlerMapping
得到的Handler。里面包含GatewayFilter等。
Filter
GatewayFilterChain
Filter 链
public interface GatewayFilterChain {Mono<Void> filter(ServerWebExchange exchange);
}
FilteringWebHandler 类定义了GatewayFilterChain
的默认实现DefaultGatewayFilterChain
。
private static class DefaultGatewayFilterChain implements GatewayFilterChain {//当前chain对应的 filter的下标。private final int index;private final List<GatewayFilter> filters;DefaultGatewayFilterChain(List<GatewayFilter> filters) {this.filters = filters;this.index = 0;}private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {this.filters = parent.getFilters();this.index = index;}public List<GatewayFilter> getFilters() {return filters;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange) {return Mono.defer(() -> {if (this.index < filters.size()) {//获取当前filter,GatewayFilter filter = filters.get(this.index);//构造chain。DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,this.index + 1);return filter.filter(exchange, chain);}else {return Mono.empty(); // complete}});}}//把GlobalFilter 包装成 GatewayFilterprivate static class GatewayFilterAdapter implements GatewayFilter {private final GlobalFilter delegate;GatewayFilterAdapter(GlobalFilter delegate) {this.delegate = delegate;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return this.delegate.filter(exchange, chain);}@Overridepublic String toString() {final StringBuilder sb = new StringBuilder("GatewayFilterAdapter{");sb.append("delegate=").append(delegate);sb.append('}');return sb.toString();}}
FilteringWebHandler
FilteringWebHandler
用于组装GatewayFilter。
public class FilteringWebHandler implements WebHandler {protected static final Log logger = LogFactory.getLog(FilteringWebHandler.class);private final List<GatewayFilter> globalFilters;public FilteringWebHandler(List<GlobalFilter> globalFilters) {this.globalFilters = loadFilters(globalFilters);}//把GlobalFilter包装成GatewayFilterprivate static List<GatewayFilter> loadFilters(List<GlobalFilter> filters) {return filters.stream().map(filter -> {//构造GatewayFilterAdapterGatewayFilterAdapter gatewayFilter = new GatewayFilterAdapter(filter);//排序的if (filter instanceof Ordered) {int order = ((Ordered) filter).getOrder();return new OrderedGatewayFilter(gatewayFilter, order);}return gatewayFilter;}).collect(Collectors.toList());}/** TODO: relocate @EventListener(RefreshRoutesEvent.class) void handleRefresh() {* this.combinedFiltersForRoute.clear();*/@Overridepublic Mono<Void> handle(ServerWebExchange exchange) {//获取Route。Route route = exchange.getRequiredAttribute(GATEWAY_ROUTE_ATTR);//获取Route的 Filter。List<GatewayFilter> gatewayFilters = route.getFilters();//合并GlobalFilter 和 GatewayFilter。List<GatewayFilter> combined = new ArrayList<>(this.globalFilters);combined.addAll(gatewayFilters);// TODO: needed or cached?//排序。AnnotationAwareOrderComparator.sort(combined);if (logger.isDebugEnabled()) {logger.debug("Sorted gatewayFilterFactories: " + combined);}//构造 GatewayFilterChain。return new DefaultGatewayFilterChain(combined).filter(exchange);}}
GlobalFilter实例化
在GatewayAutoConfiguration
中定义Bean。
GatewayFilter
GatewayFilter 网关过滤器用于拦截并链式处理web请求,可以实现横切的与应用无关的需求,比如:安全、访问超时的设置等。
public interface GatewayFilter extends ShortcutConfigurable {/*** Name key.*/String NAME_KEY = "name";/*** Value key.*/String VALUE_KEY = "value";//chain:下一个要执行的chain(对应一个filter)Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);}/** Ordered , delegate.
*/
public class OrderedGatewayFilter implements GatewayFilter, Ordered {private final GatewayFilter delegate;private final int order;public GatewayFilter getDelegate() {return delegate;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return this.delegate.filter(exchange, chain);}@Overridepublic int getOrder() {return this.order;}@Overridepublic String toString() {return new StringBuilder("[").append(delegate).append(", order = ").append(order).append("]").toString();}}
GlobalFilter
GlobalGilter 全局过滤器接口与 GatewayFilter 网关过滤器接口具有相同的方法定义。全局过滤器是一系列特殊的过滤器,会根据条件应用到所有路由中。网关过滤器是更细粒度的过滤器,作用于指定的路由中。
public interface GlobalFilter {Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}
RemoveCachedBodyFilter
删除网关上下文中的缓存。RemoveCachedBodyFilter没有 Pre Filter。
//public static final String CACHED_REQUEST_BODY_ATTR = "cachedRequestBody";
public class RemoveCachedBodyFilter implements GlobalFilter, Ordered {private static final Log log = LogFactory.getLog(RemoveCachedBodyFilter.class);@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return chain.filter(exchange).doFinally(s -> {//删除 缓存的requestbody 属性。PooledDataBuffer dataBuffer = (PooledDataBuffer) exchange.getAttributes().remove(CACHED_REQUEST_BODY_ATTR);if (dataBuffer != null && dataBuffer.isAllocated()) {if (log.isTraceEnabled()) {log.trace("releasing cached body in exchange attribute");}dataBuffer.release();}});}@Overridepublic int getOrder() {return HIGHEST_PRECEDENCE;}}
AdaptCachedBodyGlobalFilter
从请求中获取body缓存到网关上线文,属性是CACHED_REQUEST_BODY_ATTR(cachedRequestBody),这样就可以直接从网关上下文中拿到请求参数,而不会出现从request中拿到之后还要回填到请求体的问题;
public class AdaptCachedBodyGlobalFilterimplements GlobalFilter, Ordered, ApplicationListener<EnableBodyCachingEvent> {private ConcurrentMap<String, Boolean> routesToCache = new ConcurrentHashMap<>();/*** Cached request body key.*/@Deprecatedpublic static final String CACHED_REQUEST_BODY_KEY = CACHED_REQUEST_BODY_ATTR;@Overridepublic void onApplicationEvent(EnableBodyCachingEvent event) {this.routesToCache.putIfAbsent(event.getRouteId(), true);}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {//获取属性:CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR = "cachedServerHttpRequestDecorator";ServerHttpRequest cachedRequest = exchange.getAttributeOrDefault(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR, null);if (cachedRequest != null) {//移除属性,exchange.getAttributes().remove(CACHED_SERVER_HTTP_REQUEST_DECORATOR_ATTR);return chain.filter(exchange.mutate().request(cachedRequest).build());}//CACHED_REQUEST_BODY_ATTR = "cachedRequestBody";DataBuffer body = exchange.getAttributeOrDefault(CACHED_REQUEST_BODY_ATTR, null);//GATEWAY_ROUTE_ATTR = qualify("gatewayRoute");Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);if (body != null || !this.routesToCache.containsKey(route.getId())) {return chain.filter(exchange);}return ServerWebExchangeUtils.cacheRequestBody(exchange, (serverHttpRequest) -> {// don't mutate and build if same request objectif (serverHttpRequest == exchange.getRequest()) {return chain.filter(exchange);}return chain.filter(exchange.mutate().request(serverHttpRequest).build());});}@Overridepublic int getOrder() {return Ordered.HIGHEST_PRECEDENCE + 1000;}}
NettyWriteResponseFilter
NettyWriteResponseFilter
没有 Pre Filter 代码,因此是个Post Filter。
public class NettyWriteResponseFilter implements GlobalFilter, Ordered {/*** Order for write response filter.*/public static final int WRITE_RESPONSE_FILTER_ORDER = -1;private static final Log log = LogFactory.getLog(NettyWriteResponseFilter.class);private final List<MediaType> streamingMediaTypes;public NettyWriteResponseFilter(List<MediaType> streamingMediaTypes) {this.streamingMediaTypes = streamingMediaTypes;}@Overridepublic int getOrder() {return WRITE_RESPONSE_FILTER_ORDER;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {// NOTICE: nothing in "pre" filter stage as CLIENT_RESPONSE_CONN_ATTR is not added// until the NettyRoutingFilter is run// @formatter:offreturn chain.filter(exchange)//异常处理:关闭连接。.doOnError(throwable -> cleanup(exchange)).then(Mono.defer(() -> {//reactor.netty.ConnectionConnection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);if (connection == null) {return Mono.empty();}if (log.isTraceEnabled()) {log.trace("NettyWriteResponseFilter start inbound: "+ connection.channel().id().asShortText() + ", outbound: "+ exchange.getLogPrefix());}ServerHttpResponse response = exchange.getResponse();//NETTY 写数据final Flux<DataBuffer> body = connection.inbound().receive().retain().map(byteBuf -> wrap(byteBuf, response));MediaType contentType = null;try {contentType = response.getHeaders().getContentType();}catch (Exception e) {if (log.isTraceEnabled()) {log.trace("invalid media type", e);}}return (isStreamingMediaType(contentType)? response.writeAndFlushWith(body.map(Flux::just)): response.writeWith(body));})).doOnCancel(() -> cleanup(exchange));// @formatter:on}protected DataBuffer wrap(ByteBuf byteBuf, ServerHttpResponse response) {if (response.bufferFactory() instanceof NettyDataBufferFactory) {NettyDataBufferFactory factory = (NettyDataBufferFactory) response.bufferFactory();return factory.wrap(byteBuf);}// MockServerHttpResponse creates theseelse if (response.bufferFactory() instanceof DefaultDataBufferFactory) {DefaultDataBufferFactory factory = (DefaultDataBufferFactory) response.bufferFactory();return factory.wrap(byteBuf.nioBuffer());}throw new IllegalArgumentException("Unkown DataBufferFactory type " + response.bufferFactory().getClass());}}
GatewayMetricsFilter
public class GatewayMetricsFilter implements GlobalFilter, Ordered {private static final Log log = LogFactory.getLog(GatewayMetricsFilter.class);private final MeterRegistry meterRegistry;private GatewayTagsProvider compositeTagsProvider;public GatewayMetricsFilter(MeterRegistry meterRegistry,List<GatewayTagsProvider> tagsProviders) {this.meterRegistry = meterRegistry;this.compositeTagsProvider = tagsProviders.stream().reduce(exchange -> Tags.empty(), GatewayTagsProvider::and);}@Deprecatedpublic GatewayMetricsFilter(MeterRegistry meterRegistry) {this(meterRegistry, Arrays.asList(new GatewayHttpTagsProvider(),new GatewayRouteTagsProvider()));}@Overridepublic int getOrder() {// start the timer as soon as possible and report the metric event before we write// response to clientreturn NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER + 1;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {//记录开始执行时间Sample sample = Timer.start(meterRegistry);return chain.filter(exchange).doOnSuccessOrError((aVoid, ex) -> {endTimerRespectingCommit(exchange, sample);});}private void endTimerRespectingCommit(ServerWebExchange exchange, Sample sample) {ServerHttpResponse response = exchange.getResponse();if (response.isCommitted()) {endTimerInner(exchange, sample);}else {response.beforeCommit(() -> {endTimerInner(exchange, sample);return Mono.empty();});}}private void endTimerInner(ServerWebExchange exchange, Sample sample) {Tags tags = compositeTagsProvider.apply(exchange);if (log.isTraceEnabled()) {log.trace("gateway.requests tags: " + tags);}sample.stop(meterRegistry.timer("gateway.requests", tags));}}
ForwardPathFilter
ForwardPathFilter:路由处理。
public class ForwardPathFilter implements GlobalFilter, Ordered {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {//GATEWAY_ROUTE_ATTR = qualify("gatewayRoute");Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);//获取路由Uri。URI routeUri = route.getUri();String scheme = routeUri.getScheme();//如果已路由或者是forward,则直接转发。if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {return chain.filter(exchange);}//重新构造新的url请求。exchange = exchange.mutate().request(exchange.getRequest().mutate().path(routeUri.getPath()).build()).build();return chain.filter(exchange);}@Overridepublic int getOrder() {return 0;}}
RouteToRequestUrlFilter
public class RouteToRequestUrlFilter implements GlobalFilter, Ordered {/*** Order of Route to URL.*/public static final int ROUTE_TO_URL_FILTER_ORDER = 10000;private static final Log log = LogFactory.getLog(RouteToRequestUrlFilter.class);private static final String SCHEME_REGEX = "[a-zA-Z]([a-zA-Z]|\\d|\\+|\\.|-)*:.*";static final Pattern schemePattern = Pattern.compile(SCHEME_REGEX);/* for testing */static boolean hasAnotherScheme(URI uri) {return schemePattern.matcher(uri.getSchemeSpecificPart()).matches()&& uri.getHost() == null && uri.getRawPath() == null;}@Overridepublic int getOrder() {return ROUTE_TO_URL_FILTER_ORDER;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {// //GATEWAY_ROUTE_ATTR = qualify("gatewayRoute");Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);if (route == null) {return chain.filter(exchange);}log.trace("RouteToRequestUrlFilter start");//获取路由URIURI uri = exchange.getRequest().getURI();boolean encoded = containsEncodedParts(uri);URI routeUri = route.getUri();if (hasAnotherScheme(routeUri)) {// this is a special url, save scheme to special attribute// replace routeUri with schemeSpecificPartexchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR,routeUri.getScheme());routeUri = URI.create(routeUri.getSchemeSpecificPart());}if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {// Load balanced URIs should always have a host. If the host is null it is// most// likely because the host name was invalid (for example included an// underscore)throw new IllegalStateException("Invalid host: " + routeUri.toString());}//合并成完整的Url。URI mergedUrl = UriComponentsBuilder.fromUri(uri)// .uri(routeUri).scheme(routeUri.getScheme()).host(routeUri.getHost()).port(routeUri.getPort()).build(encoded).toUri();exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);return chain.filter(exchange);}}
LoadBalancerClientFilter
废弃的,使用ReactiveLoadBalancerClientFilter
代替。
ReactiveLoadBalancerClientFilter
public class ReactiveLoadBalancerClientFilter implements GlobalFilter, Ordered {private final LoadBalancerClientFactory clientFactory;private LoadBalancerProperties properties;@Override@SuppressWarnings("Duplicates")public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {//GATEWAY_REQUEST_URL_ATTR = qualify("gatewayRequestUrl");URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);//GATEWAY_SCHEME_PREFIX_ATTR = qualify( "gatewaySchemePrefix");String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);if (url == null|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {return chain.filter(exchange);}// 保存原始URI//GATEWAY_ORIGINAL_REQUEST_URL_ATTR = qualify( "gatewayOriginalRequestUrl");addOriginalRequestUrl(exchange, url);if (log.isTraceEnabled()) {log.trace(ReactiveLoadBalancerClientFilter.class.getSimpleName()+ " url before: " + url);}return choose(exchange).doOnNext(response -> {if (!response.hasServer()) {throw NotFoundException.create(properties.isUse404(),"Unable to find instance for " + url.getHost());}URI uri = exchange.getRequest().getURI();// if the `lb:<scheme>` mechanism was used, use `<scheme>` as the default,// if the loadbalancer doesn't provide one.String overrideScheme = null;if (schemePrefix != null) {overrideScheme = url.getScheme();}//取得服务实例DelegatingServiceInstance serviceInstance = new DelegatingServiceInstance(response.getServer(), overrideScheme);//重构requestURI requestUrl = reconstructURI(serviceInstance, uri);if (log.isTraceEnabled()) {log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);}exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);}).then(chain.filter(exchange));}//选择服务实例。private Mono<Response<ServiceInstance>> choose(ServerWebExchange exchange) {URI uri = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);ReactorLoadBalancer<ServiceInstance> loadBalancer = this.clientFactory.getInstance(uri.getHost(), ReactorLoadBalancer.class,ServiceInstance.class);if (loadBalancer == null) {throw new NotFoundException("No loadbalancer available for " + uri.getHost());}return loadBalancer.choose(createRequest());}private Request createRequest() {return ReactiveLoadBalancer.REQUEST;}}
WebsocketRoutingFilter
public class WebsocketRoutingFilter implements GlobalFilter, Ordered {/*** Sec-Websocket protocol.*/public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";private static final Log log = LogFactory.getLog(WebsocketRoutingFilter.class);private final WebSocketClient webSocketClient;private final WebSocketService webSocketService;private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;// do not use this headersFilters directly, use getHeadersFilters() instead.private volatile List<HttpHeadersFilter> headersFilters;/* for testing */static String convertHttpToWs(String scheme) {scheme = scheme.toLowerCase();return "http".equals(scheme) ? "ws" : "https".equals(scheme) ? "wss" : scheme;}@Overridepublic int getOrder() {// Before NettyRoutingFilter since this routes certain http requestsreturn Ordered.LOWEST_PRECEDENCE - 1;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {changeSchemeIfIsWebSocketUpgrade(exchange);URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme();if (isAlreadyRouted(exchange)|| (!"ws".equals(scheme) && !"wss".equals(scheme))) {return chain.filter(exchange);}setAlreadyRouted(exchange);HttpHeaders headers = exchange.getRequest().getHeaders();HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);if (protocols != null) {protocols = headers.get(SEC_WEBSOCKET_PROTOCOL).stream().flatMap(header -> Arrays.stream(commaDelimitedListToStringArray(header))).map(String::trim).collect(Collectors.toList());}return this.webSocketService.handleRequest(exchange, new ProxyWebSocketHandler(requestUrl, this.webSocketClient, filtered, protocols));}private List<HttpHeadersFilter> getHeadersFilters() {if (this.headersFilters == null) {this.headersFilters = this.headersFiltersProvider.getIfAvailable(ArrayList::new);headersFilters.add((headers, exchange) -> {HttpHeaders filtered = new HttpHeaders();headers.entrySet().stream().filter(entry -> !entry.getKey().toLowerCase().startsWith("sec-websocket")).forEach(header -> filtered.addAll(header.getKey(),header.getValue()));return filtered;});}return this.headersFilters;}private void changeSchemeIfIsWebSocketUpgrade(ServerWebExchange exchange) {// Check the UpgradeURI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme().toLowerCase();String upgrade = exchange.getRequest().getHeaders().getUpgrade();// change the scheme if the socket client send a "http" or "https"if ("WebSocket".equalsIgnoreCase(upgrade)&& ("http".equals(scheme) || "https".equals(scheme))) {String wsScheme = convertHttpToWs(scheme);URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build().toUri();exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);if (log.isTraceEnabled()) {log.trace("changeSchemeTo:[" + wsRequestUrl + "]");}}}private static class ProxyWebSocketHandler implements WebSocketHandler {private final WebSocketClient client;private final URI url;private final HttpHeaders headers;private final List<String> subProtocols;@Overridepublic Mono<Void> handle(WebSocketSession session) {// pass headers along so custom headers can be sent throughreturn client.execute(url, this.headers, new WebSocketHandler() {@Overridepublic Mono<Void> handle(WebSocketSession proxySession) {// Use retain() for Reactor NettyMono<Void> proxySessionSend = proxySession.send(session.receive().doOnNext(WebSocketMessage::retain));// .log("proxySessionSend", Level.FINE);Mono<Void> serverSessionSend = session.send(proxySession.receive().doOnNext(WebSocketMessage::retain));// .log("sessionSend", Level.FINE);return Mono.zip(proxySessionSend, serverSessionSend).then();}/*** Copy subProtocols so they are available downstream.* @return*/@Overridepublic List<String> getSubProtocols() {return ProxyWebSocketHandler.this.subProtocols;}});}}}
NettyRoutingFilter
public class NettyRoutingFilter implements GlobalFilter, Ordered {private static final Log log = LogFactory.getLog(NettyRoutingFilter.class);private final HttpClient httpClient;private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;private final HttpClientProperties properties;// do not use this headersFilters directly, use getHeadersFilters() instead.private volatile List<HttpHeadersFilter> headersFilters;public List<HttpHeadersFilter> getHeadersFilters() {if (headersFilters == null) {headersFilters = headersFiltersProvider.getIfAvailable();}return headersFilters;}@Override@SuppressWarnings("Duplicates")public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {//URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme();if (isAlreadyRouted(exchange)|| (!"http".equals(scheme) && !"https".equals(scheme))) {return chain.filter(exchange);}//处理 http,https。setAlreadyRouted(exchange);ServerHttpRequest request = exchange.getRequest();final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());final String url = requestUrl.toASCIIString();HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();filtered.forEach(httpHeaders::set);boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);Flux<HttpClientResponse> responseFlux = getHttpClient(route, exchange).headers(headers -> {headers.add(httpHeaders);// Will either be set below, or later by Nettyheaders.remove(HttpHeaders.HOST);if (preserveHost) {String host = request.getHeaders().getFirst(HttpHeaders.HOST);headers.add(HttpHeaders.HOST, host);}}).request(method).uri(url).send((req, nettyOutbound) -> {//发送请求if (log.isTraceEnabled()) {nettyOutbound.withConnection(connection -> log.trace("outbound route: "+ connection.channel().id().asShortText()+ ", inbound: " + exchange.getLogPrefix()));}return nettyOutbound.send(request.getBody().map(this::getByteBuf));}).responseConnection((res, connection) -> {//设置响应内容// Defer committing the response until all route filters have run// Put client response as ServerWebExchange attribute and write// response later NettyWriteResponseFilterexchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);ServerHttpResponse response = exchange.getResponse();// put headers and status so filters can modify the responseHttpHeaders headers = new HttpHeaders();res.responseHeaders().forEach(entry -> headers.add(entry.getKey(), entry.getValue()));String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);if (StringUtils.hasLength(contentTypeValue)) {exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,contentTypeValue);}setResponseStatus(res, response);// make sure headers filters run after setting status so it is// available in responseHttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(getHeadersFilters(), headers, exchange, Type.RESPONSE);if (!filteredResponseHeaders.containsKey(HttpHeaders.TRANSFER_ENCODING)&& filteredResponseHeaders.containsKey(HttpHeaders.CONTENT_LENGTH)) {// It is not valid to have both the transfer-encoding header and// the content-length header.// Remove the transfer-encoding header in the response if the// content-length header is present.response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);}exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,filteredResponseHeaders.keySet());response.getHeaders().putAll(filteredResponseHeaders);return Mono.just(res);});//设置超时。Duration responseTimeout = getResponseTimeout(route);if (responseTimeout != null) {responseFlux = responseFlux.timeout(responseTimeout, Mono.error(new TimeoutException("Response took longer than timeout: " + responseTimeout))).onErrorMap(TimeoutException.class,th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,th.getMessage(), th));}//转发。return responseFlux.then(chain.filter(exchange));}protected ByteBuf getByteBuf(DataBuffer dataBuffer) {if (dataBuffer instanceof NettyDataBuffer) {NettyDataBuffer buffer = (NettyDataBuffer) dataBuffer;return buffer.getNativeBuffer();}// MockServerHttpResponse creates theseelse if (dataBuffer instanceof DefaultDataBuffer) {DefaultDataBuffer buffer = (DefaultDataBuffer) dataBuffer;return Unpooled.wrappedBuffer(buffer.getNativeBuffer());}throw new IllegalArgumentException("Unable to handle DataBuffer of type " + dataBuffer.getClass());}private void setResponseStatus(HttpClientResponse clientResponse,ServerHttpResponse response) {HttpStatus status = HttpStatus.resolve(clientResponse.status().code());if (status != null) {response.setStatusCode(status);}else {while (response instanceof ServerHttpResponseDecorator) {response = ((ServerHttpResponseDecorator) response).getDelegate();}if (response instanceof AbstractServerHttpResponse) {((AbstractServerHttpResponse) response).setStatusCodeValue(clientResponse.status().code());}else {// TODO: log warning here, not throw error?throw new IllegalStateException("Unable to set status code "+ clientResponse.status().code() + " on response of type "+ response.getClass().getName());}}}/*** Creates a new HttpClient with per route timeout configuration. Sub-classes that* override, should call super.getHttpClient() if they want to honor the per route* timeout configuration.* @param route the current route.* @param exchange the current ServerWebExchange.* @param chain the current GatewayFilterChain.* @return*/protected HttpClient getHttpClient(Route route, ServerWebExchange exchange) {Object connectTimeoutAttr = route.getMetadata().get(CONNECT_TIMEOUT_ATTR);if (connectTimeoutAttr != null) {Integer connectTimeout = getInteger(connectTimeoutAttr);return this.httpClient.tcpConfiguration((tcpClient) -> tcpClient.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout));}return httpClient;}static Integer getInteger(Object connectTimeoutAttr) {Integer connectTimeout;if (connectTimeoutAttr instanceof Integer) {connectTimeout = (Integer) connectTimeoutAttr;}else {connectTimeout = Integer.parseInt(connectTimeoutAttr.toString());}return connectTimeout;}private Duration getResponseTimeout(Route route) {Object responseTimeoutAttr = route.getMetadata().get(RESPONSE_TIMEOUT_ATTR);Long responseTimeout = null;if (responseTimeoutAttr != null) {if (responseTimeoutAttr instanceof Number) {responseTimeout = ((Number) responseTimeoutAttr).longValue();}else {responseTimeout = Long.valueOf(responseTimeoutAttr.toString());}}return responseTimeout != null ? Duration.ofMillis(responseTimeout): properties.getResponseTimeout();}}
ForwardRoutingFilter
public class ForwardRoutingFilter implements GlobalFilter, Ordered {private static final Log log = LogFactory.getLog(ForwardRoutingFilter.class);private final ObjectProvider<DispatcherHandler> dispatcherHandlerProvider;// do not use this dispatcherHandler directly, use getDispatcherHandler() instead.private volatile DispatcherHandler dispatcherHandler;@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);String scheme = requestUrl.getScheme();if (isAlreadyRouted(exchange) || !"forward".equals(scheme)) {return chain.filter(exchange);}// TODO: translate url?if (log.isTraceEnabled()) {log.trace("Forwarding to URI: " + requestUrl);}return this.getDispatcherHandler().handle(exchange);}}
WebClientHttpRoutingFilter
默认未使用
WebClientWriteResponseFilter
默认未使用
总结
GlobalFilter执行顺序
Filter | Type | Order | 功能 |
---|---|---|---|
AdaptCachedBodyGlobalFilter | PRE | HIGHEST_PRECEDENCE + 1000 | 缓存body |
GatewayMetricsFilter | PRE+ POST | 0 | 指标统计 |
ForwardPathFilter | PRE | 0 | 路由转发 |
RouteToRequestUrlFilter | PRE | 10000 | 路由转成成完成的Url。 |
LoadBalancerClientFilter | * | ||
ReactiveLoadBalancerClientFilter | PRE | 10150 |
负载均衡,解析lb schema。
|
WebsocketRoutingFilter | ROUTE | LOWEST_PRECEDENCE - 1 |
处理websocket。ws,wss。 如果是ws,wss则处理完成,开始执行POST |
NettyRoutingFilter | ROUTE | LOWEST_PRECEDENCE | 处理 http,https。不再转发,开始执行POST |
ForwardRoutingFilter | ROUTE | LOWEST_PRECEDENCE | 用于本地forward,也就是将请求在Gateway服务内进行转发,而不是转发到下游服务。 |
GatewayMetricsFilter | PRE+ POST | 0 | 指标统计。 |
NettyWriteResponseFilter | POST | -1 | Netty写响应数据 |
RemoveCachedBodyFilter | POST | HIGHEST_PRECEDENCE | 清除body缓存 |
Order只影响构造Chain 的Filter的顺序,不是请求的执行顺序。如果Filter 在 return 之前没有处理逻辑,则仅是个POST FILTER。
Filter 类型: 1、Pre:前置处理,2:ROUTE,表示处理请求,不再往后转发。3:POST,表示 处理响应。
通过配置生成Filter
生成Filter,RoutePredicate等都是通过Factory生成。然后调用 apply()
方法生成对应实例。
GatewayFilterFactory,RoutePredicateFactory 方法类似。
Configurable
public interface Configurable<C> {Class<C> getConfigClass();C newConfig();}public abstract class AbstractConfigurable<C> implements Configurable<C> {private Class<C> configClass;}
ShortcutConfigurable
ShortcutConfigurable 接口提供的默认方法,主要用于对过滤器和断言参数进行标准化处理,将表达式和生成的键进行转换。
public interface ShortcutConfigurable {static String normalizeKey(String key, int entryIdx, ShortcutConfigurable argHints,Map<String, String> args) {// RoutePredicateFactory has name hints and this has a fake key name// replace with the matching key hint//GENERATED_NAME_PREFIX = "_genkey_";if (key.startsWith(NameUtils.GENERATED_NAME_PREFIX)&& !argHints.shortcutFieldOrder().isEmpty() && entryIdx < args.size()&& entryIdx < argHints.shortcutFieldOrder().size()) {key = argHints.shortcutFieldOrder().get(entryIdx);}return key;}/**获取值,如果是spel表达式,则计算出表达式的值。*/static Object getValue(SpelExpressionParser parser, BeanFactory beanFactory,String entryValue) {Object value;String rawValue = entryValue;if (rawValue != null) {rawValue = rawValue.trim();}if (rawValue != null && rawValue.startsWith("#{") && entryValue.endsWith("}")) {// assume it's spelStandardEvaluationContext context = new StandardEvaluationContext();context.setBeanResolver(new BeanFactoryResolver(beanFactory));Expression expression = parser.parseExpression(entryValue,new TemplateParserContext());value = expression.getValue(context);}else {value = entryValue;}return value;}default ShortcutType shortcutType() {return ShortcutType.DEFAULT;}/*** Returns hints about the number of args and the order for shortcut parsing.* @return the list of hints*/default List<String> shortcutFieldOrder() {return Collections.emptyList();}default String shortcutFieldPrefix() {return "";}enum ShortcutType {DEFAULT {@Overridepublic Map<String, Object> normalize(Map<String, String> args,ShortcutConfigurable shortcutConf, SpelExpressionParser parser,BeanFactory beanFactory) {Map<String, Object> map = new HashMap<>();int entryIdx = 0;//处理每个map。for (Map.Entry<String, String> entry : args.entrySet()) {String key = normalizeKey(entry.getKey(), entryIdx, shortcutConf,args);Object value = getValue(parser, beanFactory, entry.getValue());map.put(key, value);entryIdx++;}return map;}},//一个key,多个值。GATHER_LIST {@Overridepublic Map<String, Object> normalize(Map<String, String> args,ShortcutConfigurable shortcutConf, SpelExpressionParser parser,BeanFactory beanFactory) {Map<String, Object> map = new HashMap<>();// field order should be of size 1List<String> fieldOrder = shortcutConf.shortcutFieldOrder();Assert.isTrue(fieldOrder != null && fieldOrder.size() == 1,"Shortcut Configuration Type GATHER_LIST must have shortcutFieldOrder of size 1");String fieldName = fieldOrder.get(0);map.put(fieldName,args.values().stream().map(value -> getValue(parser, beanFactory, value)).collect(Collectors.toList()));return map;}},// list is all elements except last which is a boolean flagGATHER_LIST_TAIL_FLAG {@Overridepublic Map<String, Object> normalize(Map<String, String> args,ShortcutConfigurable shortcutConf, SpelExpressionParser parser,BeanFactory beanFactory) {Map<String, Object> map = new HashMap<>();// field order should be of size 1List<String> fieldOrder = shortcutConf.shortcutFieldOrder();Assert.isTrue(fieldOrder != null && fieldOrder.size() == 2,"Shortcut Configuration Type GATHER_LIST_HEAD must have shortcutFieldOrder of size 2");List<String> values = new ArrayList<>(args.values());if (!values.isEmpty()) {// strip boolean flag if last entry is true or falseint lastIdx = values.size() - 1;String lastValue = values.get(lastIdx);if (lastValue.equalsIgnoreCase("true")|| lastValue.equalsIgnoreCase("false")) {values = values.subList(0, lastIdx);map.put(fieldOrder.get(1),getValue(parser, beanFactory, lastValue));}}String fieldName = fieldOrder.get(0);map.put(fieldName,values.stream().map(value -> getValue(parser, beanFactory, value)).collect(Collectors.toList()));return map;}};/**抽象方法。args:配置文件中的配置信息。shortcutConf:*/public abstract Map<String, Object> normalize(Map<String, String> args,ShortcutConfigurable shortcutConf, SpelExpressionParser parser,BeanFactory beanFactory);}}
ShortcutType
默认实现了3种情况的解析。
DEFAULT
:按照shortcutFieldOrder顺序依次赋值,实现类:AfterRoutePredicateFactory
。GATHER_LIST
:shortcutFiledOrder只能有一个值,如果参数有多个拼成一个集合。shortcutFieldOrder接口实现返回的list,必须是size=1。实现类:HostRoutePredicateFactory
GATHER_LIST_TAIL_FLAG
:shortcutFiledOrder只能有两个值,其中最后一个值为true或者false,其余的值变成一个集合赋值给第一个值。例如:PathRoutePredicateFactory
。
ConfigurationService
public class ConfigurationService implements ApplicationEventPublisherAware {private ApplicationEventPublisher publisher;private BeanFactory beanFactory;private Supplier<ConversionService> conversionService;private SpelExpressionParser parser = new SpelExpressionParser();private Supplier<Validator> validator;
}
public abstract class AbstractBuilder<T, B extends AbstractBuilder<T, B>> {protected final ConfigurationService service;protected BiFunction<T, Map<String, Object>, ApplicationEvent> eventFunction;protected String name;protected Map<String, Object> normalizedProperties;protected Map<String, String> properties;public AbstractBuilder(ConfigurationService service) {this.service = service;}protected abstract B getThis();protected abstract void validate();//标准化属性protected Map<String, Object> normalizeProperties() {Map<String, Object> normalizedProperties = new HashMap<>();this.properties.forEach(normalizedProperties::put);return normalizedProperties;}protected abstract T doBind();public T bind() {validate();Assert.hasText(this.name, "name may not be empty");Assert.isTrue(this.properties != null || this.normalizedProperties != null,"properties and normalizedProperties both may not be null");if (this.normalizedProperties == null) {this.normalizedProperties = normalizeProperties();}//绑定T bound = doBind();if (this.eventFunction != null && this.service.publisher != null) {ApplicationEvent applicationEvent = this.eventFunction.apply(bound,this.normalizedProperties);this.service.publisher.publishEvent(applicationEvent);}return bound;}}/**构造:Configurable<T> & ShortcutConfigurable
*/public class ConfigurableBuilder<T, C extends Configurable<T> & ShortcutConfigurable>extends AbstractBuilder<T, ConfigurableBuilder<T, C>> {private final C configurable;@Overrideprotected void validate() {Assert.notNull(this.configurable, "configurable may not be null");}@Overrideprotected Map<String, Object> normalizeProperties() {if (this.service.beanFactory != null) {//标准化属性return this.configurable.shortcutType().normalize(this.properties,this.configurable, this.service.parser, this.service.beanFactory);}return super.normalizeProperties();}@Overrideprotected T doBind() {Bindable<T> bindable = Bindable.of(this.configurable.getConfigClass());T bound = bindOrCreate(bindable, this.normalizedProperties,this.configurable.shortcutFieldPrefix(),/* this.name, */this.service.validator.get(),this.service.conversionService.get());return bound;}}public class InstanceBuilder<T> extends AbstractBuilder<T, InstanceBuilder<T>> {private final T instance;@Overrideprotected T doBind() {T toBind = getTargetObject(this.instance);Bindable<T> bindable = Bindable.ofInstance(toBind);return bindOrCreate(bindable, this.normalizedProperties, this.name,this.service.validator.get(), this.service.conversionService.get());}}
构造Builder
public <T, C extends Configurable<T> & ShortcutConfigurable> ConfigurableBuilder<T, C> with(C configurable) {return new ConfigurableBuilder<T, C>(this, configurable);}public <T> InstanceBuilder<T> with(T instance) {return new InstanceBuilder<T>(this, instance);}
RouteDefinitionRouteLocator
RouteDefinitionRouteLocator
是以gateway route yaml配置为基础来构造Route。
public class RouteDefinitionRouteLocatorimplements RouteLocator, BeanFactoryAware, ApplicationEventPublisherAware {//获取routedefine private final RouteDefinitionLocator routeDefinitionLocator;private final ConfigurationService configurationService;private final Map<String, RoutePredicateFactory> predicates = new LinkedHashMap<>();private final Map<String, GatewayFilterFactory> gatewayFilterFactories = new HashMap<>();//yaml gateway配置private final GatewayProperties gatewayProperties;@Deprecatedpublic RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,List<RoutePredicateFactory> predicates,List<GatewayFilterFactory> gatewayFilterFactories,GatewayProperties gatewayProperties, ConversionService conversionService) {this.routeDefinitionLocator = routeDefinitionLocator;this.configurationService = new ConfigurationService();this.configurationService.setConversionService(conversionService);//构造 predicatesinitFactories(predicates);gatewayFilterFactories.forEach(factory -> this.gatewayFilterFactories.put(factory.name(), factory));this.gatewayProperties = gatewayProperties;}public RouteDefinitionRouteLocator(RouteDefinitionLocator routeDefinitionLocator,List<RoutePredicateFactory> predicates,List<GatewayFilterFactory> gatewayFilterFactories,GatewayProperties gatewayProperties,ConfigurationService configurationService) {this.routeDefinitionLocator = routeDefinitionLocator;this.configurationService = configurationService;initFactories(predicates);gatewayFilterFactories.forEach(factory -> this.gatewayFilterFactories.put(factory.name(), factory));this.gatewayProperties = gatewayProperties;}}
private void initFactories(List<RoutePredicateFactory> predicates) {predicates.forEach(factory -> {String key = factory.name();if (this.predicates.containsKey(key)) {this.logger.warn("A RoutePredicateFactory named " + key+ " already exists, class: " + this.predicates.get(key)+ ". It will be overwritten.");}this.predicates.put(key, factory);if (logger.isInfoEnabled()) {logger.info("Loaded RoutePredicateFactory [" + key + "]");}});}@Overridepublic Flux<Route> getRoutes() {//把RouteDefinition 转换为 RouteFlux<Route> routes = this.routeDefinitionLocator.getRouteDefinitions().map(this::convertToRoute);//失败处理策略。if (!gatewayProperties.isFailOnRouteDefinitionError()) {// instead of letting error bubble up, continueroutes = routes.onErrorContinue((error, obj) -> {if (logger.isWarnEnabled()) {logger.warn("RouteDefinition id " + ((RouteDefinition) obj).getId()+ " will be ignored. Definition has invalid configs, "+ error.getMessage());}});}return routes.map(route -> {if (logger.isDebugEnabled()) {logger.debug("RouteDefinition matched: " + route.getId());}return route;});}/**把 RouteDefinition 转换为Route*/private Route convertToRoute(RouteDefinition routeDefinition) {//把List 拼成 and。AsyncPredicate<ServerWebExchange> predicate = combinePredicates(routeDefinition);//获取定义的Filter。List<GatewayFilter> gatewayFilters = getFilters(routeDefinition);return Route.async(routeDefinition).asyncPredicate(predicate).replaceFilters(gatewayFilters).build();}private AsyncPredicate<ServerWebExchange> combinePredicates(RouteDefinition routeDefinition) {//获取定义的PredicatesList<PredicateDefinition> predicates = routeDefinition.getPredicates();//获取第一个AsyncPredicate<ServerWebExchange> predicate = lookup(routeDefinition,predicates.get(0));//拼成 and for (PredicateDefinition andPredicate : predicates.subList(1,predicates.size())) {AsyncPredicate<ServerWebExchange> found = lookup(routeDefinition,andPredicate);predicate = predicate.and(found);}return predicate;}private AsyncPredicate<ServerWebExchange> lookup(RouteDefinition route,PredicateDefinition predicate) {//获取factory。RoutePredicateFactory<Object> factory = this.predicates.get(predicate.getName());if (factory == null) {throw new IllegalArgumentException("Unable to find RoutePredicateFactory with name "+ predicate.getName());}if (logger.isDebugEnabled()) {logger.debug("RouteDefinition " + route.getId() + " applying "+ predicate.getArgs() + " to " + predicate.getName());}// @formatter:offObject config = this.configurationService.with(factory) //构造一个builder。.name(predicate.getName()).properties(predicate.getArgs()).eventFunction((bound, properties) -> new PredicateArgsEvent(RouteDefinitionRouteLocator.this, route.getId(), properties)).bind(); //构造一个 configurable。// @formatter:onreturn factory.applyAsync(config);}
GatewayFilterFactory
public interface GatewayFilterFactory<C> extends ShortcutConfigurable, Configurable<C> {/*** Name key.*/String NAME_KEY = "name";/*** Value key.*/String VALUE_KEY = "value";// useful for javadsldefault GatewayFilter apply(String routeId, Consumer<C> consumer) {C config = newConfig();consumer.accept(config);return apply(routeId, config);}default GatewayFilter apply(Consumer<C> consumer) {C config = newConfig();consumer.accept(config);return apply(config);}default Class<C> getConfigClass() {throw new UnsupportedOperationException("getConfigClass() not implemented");}@Overridedefault C newConfig() {throw new UnsupportedOperationException("newConfig() not implemented");}//应用一个配置。GatewayFilter apply(C config);default GatewayFilter apply(String routeId, C config) {if (config instanceof HasRouteId) {HasRouteId hasRouteId = (HasRouteId) config;hasRouteId.setRouteId(routeId);}return apply(config);}default String name() {// TODO: deal with proxysreturn NameUtils.normalizeFilterFactoryName(getClass());}@Deprecateddefault ServerHttpRequest.Builder mutate(ServerHttpRequest request) {return request.mutate();}}
AddRequestHeaderGatewayFilterFactory
public abstract class AbstractNameValueGatewayFilterFactory extendsAbstractGatewayFilterFactory <AbstractNameValueGatewayFilterFactory.NameValueConfig> {public AbstractNameValueGatewayFilterFactory() {super(NameValueConfig.class);}public List<String> shortcutFieldOrder() {return Arrays.asList(GatewayFilter.NAME_KEY, GatewayFilter.VALUE_KEY);}@Validatedpublic static class NameValueConfig {@NotEmptyprotected String name;@NotEmptyprotected String value;}}public class AddRequestHeaderGatewayFilterFactoryextends AbstractNameValueGatewayFilterFactory {@Overridepublic GatewayFilter apply(NameValueConfig config) {return new GatewayFilter() {@Overridepublic Mono<Void> filter(ServerWebExchange exchange,GatewayFilterChain chain) {String value = ServerWebExchangeUtils.expand(exchange, config.getValue());ServerHttpRequest request = exchange.getRequest().mutate()//添加header。.header(config.getName(), value).build();return chain.filter(exchange.mutate().request(request).build());}@Overridepublic String toString() {return filterToStringCreator(AddRequestHeaderGatewayFilterFactory.this).append(config.getName(), config.getValue()).toString();}};}}
RoutePredicateFactory
public interface RoutePredicateFactory<C> extends ShortcutConfigurable, Configurable<C> {/*** Pattern key.*/String PATTERN_KEY = "pattern";// useful for javadsldefault Predicate<ServerWebExchange> apply(Consumer<C> consumer) {C config = newConfig();consumer.accept(config);beforeApply(config);return apply(config);}... ...default void beforeApply(C config) {}Predicate<ServerWebExchange> apply(C config);}public abstract class AbstractRoutePredicateFactory<C> extends AbstractConfigurable<C>implements RoutePredicateFactory<C> {public AbstractRoutePredicateFactory(Class<C> configClass) {super(configClass);}}
PathRoutePredicateFactory
public class PathRoutePredicateFactoryextends AbstractRoutePredicateFactory<PathRoutePredicateFactory.Config> {private static final Log log = LogFactory.getLog(RoutePredicateFactory.class);private static final String MATCH_OPTIONAL_TRAILING_SEPARATOR_KEY = "matchOptionalTrailingSeparator";private PathPatternParser pathPatternParser = new PathPatternParser();public PathRoutePredicateFactory() {super(Config.class);}private static void traceMatch(String prefix, Object desired, Object actual,boolean match) {if (log.isTraceEnabled()) {String message = String.format("%s \"%s\" %s against value \"%s\"", prefix,desired, match ? "matches" : "does not match", actual);log.trace(message);}}public void setPathPatternParser(PathPatternParser pathPatternParser) {this.pathPatternParser = pathPatternParser;}@Overridepublic List<String> shortcutFieldOrder() {return Arrays.asList("patterns", MATCH_OPTIONAL_TRAILING_SEPARATOR_KEY);}@Overridepublic ShortcutType shortcutType() {return ShortcutType.GATHER_LIST_TAIL_FLAG;}@Overridepublic Predicate<ServerWebExchange> apply(Config config) {final ArrayList<PathPattern> pathPatterns = new ArrayList<>();synchronized (this.pathPatternParser) {pathPatternParser.setMatchOptionalTrailingSeparator(config.isMatchOptionalTrailingSeparator());config.getPatterns().forEach(pattern -> {PathPattern pathPattern = this.pathPatternParser.parse(pattern);pathPatterns.add(pathPattern);});}return new GatewayPredicate() {@Overridepublic boolean test(ServerWebExchange exchange) {//获取原始UrlPathContainer path = parsePath(exchange.getRequest().getURI().getRawPath());Optional<PathPattern> optionalPathPattern = pathPatterns.stream().filter(pattern -> pattern.matches(path)).findFirst();if (optionalPathPattern.isPresent()) {PathPattern pathPattern = optionalPathPattern.get();traceMatch("Pattern", pathPattern.getPatternString(), path, true);PathMatchInfo pathMatchInfo = pathPattern.matchAndExtract(path);putUriTemplateVariables(exchange, pathMatchInfo.getUriVariables());return true;}else {traceMatch("Pattern", config.getPatterns(), path, false);return false;}}@Overridepublic String toString() {return String.format("Paths: %s, match trailing slash: %b",config.getPatterns(), config.isMatchOptionalTrailingSeparator());}};}/**配置*/@Validatedpublic static class Config {private List<String> patterns = new ArrayList<>();//是否匹配结尾分隔符private boolean matchOptionalTrailingSeparator = true;}}
public class AfterRoutePredicateFactoryextends AbstractRoutePredicateFactory<AfterRoutePredicateFactory.Config> {/*** DateTime key.*/public static final String DATETIME_KEY = "datetime";public AfterRoutePredicateFactory() {super(Config.class);}@Overridepublic List<String> shortcutFieldOrder() {return Collections.singletonList(DATETIME_KEY);}@Overridepublic Predicate<ServerWebExchange> apply(Config config) {return new GatewayPredicate() {@Overridepublic boolean test(ServerWebExchange serverWebExchange) {final ZonedDateTime now = ZonedDateTime.now();return now.isAfter(config.getDatetime());}@Overridepublic String toString() {return String.format("After: %s", config.getDatetime());}};}public static class Config {@NotNullprivate ZonedDateTime datetime;
}
Spring Cloud Gateway 源码解析(4)-- filter相关推荐
- api网关揭秘--spring cloud gateway源码解析
要想了解spring cloud gateway的源码,要熟悉spring webflux,我的上篇文章介绍了spring webflux. 1.gateway 和zuul对比 I am the au ...
- Spring Cloud Gateway 源码解析(3) —— Predicate
目录 RoutePredicateFactory GatewayPredicate AfterRoutePredicateFactory RoutePredicateHandlerMapping Fi ...
- Spring Cloud Gateway 源码解析(1) —— 基础
目录 Gateway初始化 启用Gateway GatewayClassPathWarningAutoConfiguration GatewayLoadBalancerClientAutoConfig ...
- Spring Cloud Gateway 源码解析(2) —— 路由
目录 基本组件 路由定位器(RouteDefinitionLocator ) 路由定义(RouteDefinition) PredicateDefinition FilterDefinition Co ...
- Spring cloud Gateway 源码(二) 路由流程
目录 1.DispatcherHandler 1.1 handle方法 1.1.1 getHandler 获取请求处理器 1.1.2 invokeHandler 执行 2.路由选择 2.1 选择目标 ...
- Spring Cloud Gateway源码系列之路由配置加载过程
当前章节主要是讲解配置文件中定义的路由配置被gateway加载,同时转为可以直接操作的路由对象 引入pom坐标 <dependency><groupId>org.springf ...
- spring cloud ribbon源码解析(一)
我们知道spring cloud中restTemplate可以通过服务名调接口,加入@loadBalanced标签就实现了负载均衡的功能,那么spring cloud内部是如何实现的呢? 通过@loa ...
- Spring 之 @Cacheable 源码解析(上)
一.@EnableCaching 源码解析 当要使用 @Cacheable 注解时需要引入 @EnableCaching 注解开启缓存功能.为什么呢?现在就来看看为什么要加入 @EnableCachi ...
- Spring 之 @Cacheable 源码解析(下)
CacheInterceptor 缓存切面处理逻辑 接着上篇 Spring 之 @Cacheable 源码解析(上) 说起,代理对象已经创建成功,接着分析调用流程.那么应该从哪里入手呢?当然是去看 A ...
最新文章
- ng-cordova和cordova区别
- 太赞了!副业月入3W的技术大佬的公众号,学起来!
- Android时间选择器对话框的使用
- DAL层修改sql表数据
- 6.网络层(4)---IP多播,NAT
- 大教堂与集市 The Cathedral The Bazaar -- 这是当代软件技术领域最重要的著作
- DNF服务器搭建服务端架设教程
- 【Unity3D】制作进度条——让Image同时具有Filled和Sliced的功能
- 亿级流量网站架构核心技术 跟开涛学搭建高可用高并发系统
- 1、misa统计SRR结果
- 联想微型计算机供电电源线,拆修一只联想电源适配器,告诉你一个不为人知的秘密...
- 单页面SPA(如react,vue)网站的服务器渲染SSR之SEO大杀器rendertron(超详细配置+避坑)
- 内存(DRAM)芯片国产进程
- 霍夫圆检测(HoughCircles)
- 怎么获取机智股票自动交易软件
- C# aspnetcore 完整修改项目名称
- 小程序 视图不随数据动态改变
- CorelDraw Graphics Suite 2020 22.1.0.517 中文版
- 消费金融业务模式结构图
- Pandas超全总结
热门文章
- io-同步 异步 阻塞 非阻塞
- rfid3-micro2440,linux2.6.32.2,写成misc驱动
- 【Spring学习】spring开发包介绍
- Burrow 服务的安装部署
- 【Python实战】机型自动化标注(搜狗爬虫实现)
- 【Android】手机端的投射
- [九度][何海涛] 栈的压入压出
- Windows下 Apache+PHP5+MYSQL5+phpmyadmin 规范安装
- 日志管理(一):slf4j原理简单介绍
- html基于web2.0标准,晕倒:“用web2.0来制作符合标准的页面”