Spring Cloud Gateway-ServerWebExchange核心方法与请求或者响应内容的修改
Spring Cloud Gateway-ServerWebExchange核心方法与请求或者响应内容的修改
前提
- 本文编写的时候使用的
Spring Cloud Gateway
版本为当时最新的版本Greenwich.SR1
。
我们在使用Spring Cloud Gateway
的时候,注意到过滤器(包括GatewayFilter
、GlobalFilter
和过滤器链GatewayFilterChain
),都依赖到ServerWebExchange
:
public interface GlobalFilter {Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}public interface GatewayFilter extends ShortcutConfigurable {Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain);
}public interface GatewayFilterChain {Mono<Void> filter(ServerWebExchange exchange);
}
这里的设计和Servlet
中的Filter
是相似的,当前过滤器可以决定是否执行下一个过滤器的逻辑,由GatewayFilterChain#filter()
是否被调用来决定。而ServerWebExchange
就相当于当前请求和响应的上下文。ServerWebExchange
实例不但存储了Request
和Response
对象,还提供了一些扩展方法,如果想实现改造请求参数或者响应参数,就必须深入了解ServerWebExchange
。
理解ServerWebExchange
先看ServerWebExchange
的注释:
Contract for an HTTP request-response interaction. Provides access to the HTTP request and response and also exposes additional server-side processing related properties and features such as request attributes.
翻译一下大概是:
ServerWebExchange是一个HTTP请求-响应交互的契约。提供对HTTP请求和响应的访问,并公开额外的服务器端处理相关属性和特性,如请求属性。
其实,ServerWebExchange
命名为服务网络交换器,存放着重要的请求-响应属性、请求实例和响应实例等等,有点像Context
的角色。
ServerWebExchange接口
ServerWebExchange
接口的所有方法:
public interface ServerWebExchange {// 日志前缀属性的KEY,值为org.springframework.web.server.ServerWebExchange.LOG_ID// 可以理解为 attributes.set("org.springframework.web.server.ServerWebExchange.LOG_ID","日志前缀的具体值");// 作用是打印日志的时候会拼接这个KEY对饮的前缀值,默认值为""String LOG_ID_ATTRIBUTE = ServerWebExchange.class.getName() + ".LOG_ID";String getLogPrefix();// 获取ServerHttpRequest对象ServerHttpRequest getRequest();// 获取ServerHttpResponse对象ServerHttpResponse getResponse();// 返回当前exchange的请求属性,返回结果是一个可变的MapMap<String, Object> getAttributes();// 根据KEY获取请求属性@Nullabledefault <T> T getAttribute(String name) {return (T) getAttributes().get(name);}// 根据KEY获取请求属性,做了非空判断@SuppressWarnings("unchecked")default <T> T getRequiredAttribute(String name) {T value = getAttribute(name);Assert.notNull(value, () -> "Required attribute '" + name + "' is missing");return value;}// 根据KEY获取请求属性,需要提供默认值@SuppressWarnings("unchecked")default <T> T getAttributeOrDefault(String name, T defaultValue) {return (T) getAttributes().getOrDefault(name, defaultValue);} // 返回当前请求的网络会话Mono<WebSession> getSession();// 返回当前请求的认证用户,如果存在的话<T extends Principal> Mono<T> getPrincipal(); // 返回请求的表单数据或者一个空的Map,只有Content-Type为application/x-www-form-urlencoded的时候这个方法才会返回一个非空的Map -- 这个一般是表单数据提交用到Mono<MultiValueMap<String, String>> getFormData(); // 返回multipart请求的part数据或者一个空的Map,只有Content-Type为multipart/form-data的时候这个方法才会返回一个非空的Map -- 这个一般是文件上传用到Mono<MultiValueMap<String, Part>> getMultipartData();// 返回Spring的上下文@NullableApplicationContext getApplicationContext(); // 这几个方法和lastModified属性相关boolean isNotModified();boolean checkNotModified(Instant lastModified);boolean checkNotModified(String etag);boolean checkNotModified(@Nullable String etag, Instant lastModified);// URL转换String transformUrl(String url); // URL转换映射void addUrlTransformer(Function<String, String> transformer); // 注意这个方法,方法名是:改变,这个是修改ServerWebExchange属性的方法,返回的是一个Builder实例,Builder是ServerWebExchange的内部类default Builder mutate() {return new DefaultServerWebExchangeBuilder(this);}interface Builder { // 覆盖ServerHttpRequestBuilder request(Consumer<ServerHttpRequest.Builder> requestBuilderConsumer);Builder request(ServerHttpRequest request);// 覆盖ServerHttpResponseBuilder response(ServerHttpResponse response);// 覆盖当前请求的认证用户Builder principal(Mono<Principal> principalMono);// 构建新的ServerWebExchange实例ServerWebExchange build();}
}
注意到ServerWebExchange#mutate()
方法,ServerWebExchange
实例可以理解为不可变实例,如果我们想要修改它,需要通过mutate()
方法生成一个新的实例,例如这样:
public class CustomGlobalFilter implements GlobalFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();// 这里可以修改ServerHttpRequest实例ServerHttpRequest newRequest = ...ServerHttpResponse response = exchange.getResponse();// 这里可以修改ServerHttpResponse实例ServerHttpResponse newResponse = ...// 构建新的ServerWebExchange实例ServerWebExchange newExchange = exchange.mutate().request(newRequest).response(newResponse).build();return chain.filter(newExchange);}
}
ServerHttpRequest接口
ServerHttpRequest
实例是用于承载请求相关的属性和请求体,Spring Cloud Gateway
中底层使用Netty
处理网络请求,通过追溯源码,可以从ReactorHttpHandlerAdapter
中得知ServerWebExchange
实例中持有的ServerHttpRequest
实例的具体实现是ReactorServerHttpRequest
。之所以列出这些实例之间的关系,是因为这样比较容易理清一些隐含的问题,例如:
ReactorServerHttpRequest
的父类AbstractServerHttpRequest
中初始化内部属性headers的时候把请求的HTTP头部封装为只读的实例:
public AbstractServerHttpRequest(URI uri, @Nullable String contextPath, HttpHeaders headers) {this.uri = uri;this.path = RequestPath.parse(uri, contextPath);this.headers = HttpHeaders.readOnlyHttpHeaders(headers);
}// HttpHeaders类中的readOnlyHttpHeaders方法,其中ReadOnlyHttpHeaders屏蔽了所有修改请求头的方法,直接抛出UnsupportedOperationException
public static HttpHeaders readOnlyHttpHeaders(HttpHeaders headers) {Assert.notNull(headers, "HttpHeaders must not be null");if (headers instanceof ReadOnlyHttpHeaders) {return headers;}else {return new ReadOnlyHttpHeaders(headers);}
}
所以不能直接从ServerHttpRequest
实例中直接获取请求头HttpHeaders
实例并且进行修改。
ServerHttpRequest
接口如下:
public interface HttpMessage {// 获取请求头,目前的实现中返回的是ReadOnlyHttpHeaders实例,只读HttpHeaders getHeaders();
} public interface ReactiveHttpInputMessage extends HttpMessage {// 返回请求体的Flux封装Flux<DataBuffer> getBody();
}public interface HttpRequest extends HttpMessage {// 返回HTTP请求方法,解析为HttpMethod实例@Nullabledefault HttpMethod getMethod() {return HttpMethod.resolve(getMethodValue());}// 返回HTTP请求方法,字符串String getMethodValue(); // 请求的URIURI getURI();
} public interface ServerHttpRequest extends HttpRequest, ReactiveHttpInputMessage {// 连接的唯一标识或者用于日志处理标识String getId(); // 获取请求路径,封装为RequestPath对象RequestPath getPath();// 返回查询参数,是只读的MultiValueMap实例MultiValueMap<String, String> getQueryParams();// 返回Cookie集合,是只读的MultiValueMap实例MultiValueMap<String, HttpCookie> getCookies(); // 远程服务器地址信息@Nullabledefault InetSocketAddress getRemoteAddress() {return null;}// SSL会话实现的相关信息@Nullabledefault SslInfo getSslInfo() {return null;} // 修改请求的方法,返回一个建造器实例Builder,Builder是内部类default ServerHttpRequest.Builder mutate() {return new DefaultServerHttpRequestBuilder(this);} interface Builder {// 覆盖请求方法Builder method(HttpMethod httpMethod);// 覆盖请求的URI、请求路径或者上下文,这三者相互有制约关系,具体可以参考API注释Builder uri(URI uri);Builder path(String path);Builder contextPath(String contextPath);// 覆盖请求头Builder header(String key, String value);Builder headers(Consumer<HttpHeaders> headersConsumer);// 覆盖SslInfoBuilder sslInfo(SslInfo sslInfo);// 构建一个新的ServerHttpRequest实例ServerHttpRequest build();}
}
如果要修改ServerHttpRequest
实例,那么需要这样做:
ServerHttpRequest request = exchange.getRequest();
ServerHttpRequest newRequest = request.mutate().headers("key","value").path("/myPath").build();
这里最值得注意的是:ServerHttpRequest
或者说HttpMessage
接口提供的获取请求头方法HttpHeaders getHeaders();
返回结果是一个只读的实例,具体是ReadOnlyHttpHeaders
类型,这里提多一次,笔者写这篇博文时候使用的Spring Cloud Gateway
版本为Greenwich.SR1
。
ServerHttpResponse接口
ServerHttpResponse
实例是用于承载响应相关的属性和响应体,Spring Cloud Gateway
中底层使用Netty
处理网络请求,通过追溯源码,可以从ReactorHttpHandlerAdapter
中得知ServerWebExchange
实例中持有的ServerHttpResponse
实例的具体实现是ReactorServerHttpResponse
。之所以列出这些实例之间的关系,是因为这样比较容易理清一些隐含的问题,例如:
// ReactorServerHttpResponse的父类
public AbstractServerHttpResponse(DataBufferFactory dataBufferFactory, HttpHeaders headers) {Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null");Assert.notNull(headers, "HttpHeaders must not be null");this.dataBufferFactory = dataBufferFactory;this.headers = headers;this.cookies = new LinkedMultiValueMap<>();
}public ReactorServerHttpResponse(HttpServerResponse response, DataBufferFactory bufferFactory) {super(bufferFactory, new HttpHeaders(new NettyHeadersAdapter(response.responseHeaders())));Assert.notNull(response, "HttpServerResponse must not be null");this.response = response;
}
可知ReactorServerHttpResponse
构造函数初始化实例的时候,存放响应Header的是HttpHeaders
实例,也就是响应Header是可以直接修改的。
ServerHttpResponse
接口如下:
public interface HttpMessage {// 获取响应Header,目前的实现中返回的是HttpHeaders实例,可以直接修改HttpHeaders getHeaders();
} public interface ReactiveHttpOutputMessage extends HttpMessage {// 获取DataBufferFactory实例,用于包装或者生成数据缓冲区DataBuffer实例(创建响应体)DataBufferFactory bufferFactory();// 注册一个动作,在HttpOutputMessage提交之前此动作会进行回调void beforeCommit(Supplier<? extends Mono<Void>> action);// 判断HttpOutputMessage是否已经提交boolean isCommitted();// 写入消息体到HTTP协议层Mono<Void> writeWith(Publisher<? extends DataBuffer> body);// 写入消息体到HTTP协议层并且刷新缓冲区Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body);// 指明消息处理已经结束,一般在消息处理结束自动调用此方法,多次调用不会产生副作用Mono<Void> setComplete();
}public interface ServerHttpResponse extends ReactiveHttpOutputMessage {// 设置响应状态码boolean setStatusCode(@Nullable HttpStatus status);// 获取响应状态码@NullableHttpStatus getStatusCode();// 获取响应Cookie,封装为MultiValueMap实例,可以修改MultiValueMap<String, ResponseCookie> getCookies(); // 添加响应Cookievoid addCookie(ResponseCookie cookie);
}
这里可以看到除了响应体比较难修改之外,其他的属性都是可变的。
ServerWebExchangeUtils和上下文属性
ServerWebExchangeUtils
里面存放了很多静态公有的字符串KEY值(这些字符串KEY的实际值是org.springframework.cloud.gateway.support.ServerWebExchangeUtils.
+ 下面任意的静态公有KEY),这些字符串KEY值一般是用于ServerWebExchange
的属性(Attribute
,见上文的ServerWebExchange#getAttributes()
方法)的KEY,这些属性值都是有特殊的含义,在使用过滤器的时候如果时机适当可以直接取出来使用,下面逐个分析。
PRESERVE_HOST_HEADER_ATTRIBUTE
:是否保存Host属性,值是布尔值类型,写入位置是PreserveHostHeaderGatewayFilterFactory
,使用的位置是NettyRoutingFilter
,作用是如果设置为true,HTTP请求头中的Host属性会写到底层Reactor-Netty的请求Header属性中。CLIENT_RESPONSE_ATTR
:保存底层Reactor-Netty的响应对象,类型是reactor.netty.http.client.HttpClientResponse
。CLIENT_RESPONSE_CONN_ATTR
:保存底层Reactor-Netty的连接对象,类型是reactor.netty.Connection
。URI_TEMPLATE_VARIABLES_ATTRIBUTE
:PathRoutePredicateFactory
解析路径参数完成之后,把解析完成后的占位符KEY-路径Path映射存放在ServerWebExchange
的属性中,KEY就是URI_TEMPLATE_VARIABLES_ATTRIBUTE
。CLIENT_RESPONSE_HEADER_NAMES
:保存底层Reactor-Netty的响应Header的名称集合。GATEWAY_ROUTE_ATTR
:用于存放RoutePredicateHandlerMapping
中匹配出来的具体的路由(org.springframework.cloud.gateway.route.Route
)实例,通过这个路由实例可以得知当前请求会路由到下游哪个服务。GATEWAY_REQUEST_URL_ATTR
:java.net.URI
类型的实例,这个实例代表直接请求或者负载均衡处理之后需要请求到下游服务的真实URI。GATEWAY_ORIGINAL_REQUEST_URL_ATTR
:java.net.URI
类型的实例,需要重写请求URI的时候,保存原始的请求URI。GATEWAY_HANDLER_MAPPER_ATTR
:保存当前使用的HandlerMapping
具体实例的类型简称(一般是字符串"RoutePredicateHandlerMapping")。GATEWAY_SCHEME_PREFIX_ATTR
:确定目标路由URI中如果存在schemeSpecificPart属性,则保存该URI的scheme在此属性中,路由URI会被重新构造,见RouteToRequestUrlFilter
。GATEWAY_PREDICATE_ROUTE_ATTR
:用于存放RoutePredicateHandlerMapping
中匹配出来的具体的路由(org.springframework.cloud.gateway.route.Route
)实例的ID。WEIGHT_ATTR
:实验性功能(此版本还不建议在正式版本使用)存放分组权重相关属性,见WeightCalculatorWebFilter
。ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR
:存放响应Header中的ContentType的值。HYSTRIX_EXECUTION_EXCEPTION_ATTR
:Throwable
的实例,存放的是Hystrix执行异常时候的异常实例,见HystrixGatewayFilterFactory
。GATEWAY_ALREADY_ROUTED_ATTR
:布尔值,用于判断是否已经进行了路由,见NettyRoutingFilter
。GATEWAY_ALREADY_PREFIXED_ATTR
:布尔值,用于判断请求路径是否被添加了前置部分,见PrefixPathGatewayFilterFactory
。
ServerWebExchangeUtils
提供的上下文属性用于Spring Cloud Gateway
的ServerWebExchange
组件处理请求和响应的时候,内部一些重要实例或者标识属性的安全传输和使用,使用它们可能存在一定的风险,因为没有人可以确定在版本升级之后,原有的属性KEY或者VALUE是否会发生改变,如果评估过风险或者规避了风险之后,可以安心使用。例如我们在做请求和响应日志(类似Nginx的Access Log)的时候,可以依赖到GATEWAY_ROUTE_ATTR
,因为我们要打印路由的目标信息。举个简单例子:
@Slf4j
@Component
public class AccessLogFilter implements GlobalFilter {@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();String path = request.getPath().pathWithinApplication().value();HttpMethod method = request.getMethod();// 获取路由的目标URIURI targetUri = exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);InetSocketAddress remoteAddress = request.getRemoteAddress();return chain.filter(exchange.mutate().build()).then(Mono.fromRunnable(() -> {ServerHttpResponse response = exchange.getResponse();HttpStatus statusCode = response.getStatusCode();log.info("请求路径:{},客户端远程IP地址:{},请求方法:{},目标URI:{},响应码:{}",path, remoteAddress, method, targetUri, statusCode);}));}
}
修改请求体
修改请求体是一个比较常见的需求。例如我们使用Spring Cloud Gateway
实现网关的时候,要实现一个功能:把存放在请求头中的JWT解析后,提取里面的用户ID,然后写入到请求体中。我们简化这个场景,假设我们把userId明文存放在请求头中的accessToken中,请求体是一个JSON结构:
{"serialNumber": "请求流水号","payload" : {// ... 这里是有效载荷,存放具体的数据}
}
我们需要提取accessToken,也就是userId插入到请求体JSON中如下:
{"userId": "用户ID","serialNumber": "请求流水号","payload" : {// ... 这里是有效载荷,存放具体的数据}
}
这里为了简化设计,用全局过滤器GlobalFilter
实现,实际需要结合具体场景考虑:
@Slf4j
@Component
public class ModifyRequestBodyGlobalFilter implements GlobalFilter {private final DataBufferFactory dataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);@Autowiredprivate ObjectMapper objectMapper;@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();String accessToken = request.getHeaders().getFirst("accessToken");if (!StringUtils.hasLength(accessToken)) {throw new IllegalArgumentException("accessToken");}// 新建一个ServerHttpRequest装饰器,覆盖需要装饰的方法ServerHttpRequestDecorator decorator = new ServerHttpRequestDecorator(request) {@Overridepublic Flux<DataBuffer> getBody() {Flux<DataBuffer> body = super.getBody();InputStreamHolder holder = new InputStreamHolder();body.subscribe(buffer -> holder.inputStream = buffer.asInputStream());if (null != holder.inputStream) {try {// 解析JSON的节点JsonNode jsonNode = objectMapper.readTree(holder.inputStream);Assert.isTrue(jsonNode instanceof ObjectNode, "JSON格式异常");ObjectNode objectNode = (ObjectNode) jsonNode;// JSON节点最外层写入新的属性objectNode.put("userId", accessToken);DataBuffer dataBuffer = dataBufferFactory.allocateBuffer();String json = objectNode.toString();log.info("最终的JSON数据为:{}", json);dataBuffer.write(json.getBytes(StandardCharsets.UTF_8));return Flux.just(dataBuffer);} catch (Exception e) {throw new IllegalStateException(e);}} else {return super.getBody();}}};// 使用修改后的ServerHttpRequestDecorator重新生成一个新的ServerWebExchangereturn chain.filter(exchange.mutate().request(decorator).build());}private class InputStreamHolder {InputStream inputStream;}
}
测试一下:
// HTTP
POST /order/json HTTP/1.1
Host: localhost:9090
Content-Type: application/json
accessToken: 10086
Accept: */*
Cache-Control: no-cache
Host: localhost:9090
accept-encoding: gzip, deflate
content-length: 94
Connection: keep-alive
cache-control: no-cache{"serialNumber": "请求流水号","payload": {"name": "doge"}
}// 日志输出
最终的JSON数据为:{"serialNumber":"请求流水号","payload":{"name":"doge"},"userId":"10086"}
最重要的是用到了ServerHttpRequest
装饰器ServerHttpRequestDecorator
,主要覆盖对应获取请求体数据缓冲区的方法即可,至于怎么处理其他逻辑需要自行考虑,这里只是做一个简单的示范。一般的代码逻辑如下:
ServerHttpRequest request = exchange.getRequest();
ServerHttpRequestDecorator requestDecorator = new ServerHttpRequestDecorator(request) {@Overridepublic Flux<DataBuffer> getBody() {// 拿到承载原始请求体的FluxFlux<DataBuffer> body = super.getBody();// 这里通过自定义方式生成新的承载请求体的FluxFlux<DataBuffer> newBody = ...return newBody;}
}
return chain.filter(exchange.mutate().request(requestDecorator).build());
修改响应体
修改响应体的需求也是比较常见的,具体的做法和修改请求体差不多。例如我们想要实现下面的功能:第三方服务请求经过网关,原始报文是密文,我们需要在网关实现密文解密,然后把解密后的明文路由到下游服务,下游服务处理成功响应明文,需要在网关把明文加密成密文再返回到第三方服务。现在简化整个流程,用AES加密算法,统一密码为字符串"throwable",假设请求报文和响应报文明文如下:
// 请求密文
{"serialNumber": "请求流水号","payload" : "加密后的请求消息载荷"
}// 请求明文(仅仅作为提示)
{"serialNumber": "请求流水号","payload" : "{\"name:\":\"doge\"}"
}// 响应密文
{"code": 200,"message":"ok","payload" : "加密后的响应消息载荷"
}// 响应明文(仅仅作为提示)
{"code": 200,"message":"ok","payload" : "{\"name:\":\"doge\",\"age\":26}"
}
为了方便一些加解密或者编码解码的实现,需要引入Apache
的commons-codec
类库:
<dependency><groupId>commons-codec</groupId><artifactId>commons-codec</artifactId><version>1.12</version>
</dependency>
这里定义一个全局过滤器专门处理加解密,实际上最好结合真实的场景决定是否适合全局过滤器,这里只是一个示例:
// AES加解密工具类
public enum AesUtils {// 单例X;private static final String PASSWORD = "throwable";private static final String KEY_ALGORITHM = "AES";private static final String SECURE_RANDOM_ALGORITHM = "SHA1PRNG";private static final String DEFAULT_CIPHER_ALGORITHM = "AES/ECB/PKCS5Padding";public String encrypt(String content) {try {Cipher cipher = Cipher.getInstance(DEFAULT_CIPHER_ALGORITHM);cipher.init(Cipher.ENCRYPT_MODE, provideSecretKey());return Hex.encodeHexString(cipher.doFinal(content.getBytes(StandardCharsets.UTF_8)));} catch (Exception e) {throw new IllegalArgumentException(e);}}public byte[] decrypt(String content) {try {Cipher cipher = Cipher.getInstance(DEFAULT_CIPHER_ALGORITHM);cipher.init(Cipher.DECRYPT_MODE, provideSecretKey());return cipher.doFinal(Hex.decodeHex(content));} catch (Exception e) {throw new IllegalArgumentException(e);}}private SecretKey provideSecretKey() {try {KeyGenerator keyGen = KeyGenerator.getInstance(KEY_ALGORITHM);SecureRandom secureRandom = SecureRandom.getInstance(SECURE_RANDOM_ALGORITHM);secureRandom.setSeed(PASSWORD.getBytes(StandardCharsets.UTF_8));keyGen.init(128, secureRandom);return new SecretKeySpec(keyGen.generateKey().getEncoded(), KEY_ALGORITHM);} catch (Exception e) {throw new IllegalArgumentException(e);}}
}// EncryptionGlobalFilter
@Slf4j
@Component
public class EncryptionGlobalFilter implements GlobalFilter, Ordered {@Autowiredprivate ObjectMapper objectMapper;@Overridepublic int getOrder() {return -2;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpRequest request = exchange.getRequest();ServerHttpResponse response = exchange.getResponse();DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();ServerHttpRequestDecorator requestDecorator = processRequest(request, bufferFactory);ServerHttpResponseDecorator responseDecorator = processResponse(response, bufferFactory);return chain.filter(exchange.mutate().request(requestDecorator).response(responseDecorator).build());}private ServerHttpRequestDecorator processRequest(ServerHttpRequest request, DataBufferFactory bufferFactory) {Flux<DataBuffer> body = request.getBody();DataBufferHolder holder = new DataBufferHolder();body.subscribe(dataBuffer -> {int len = dataBuffer.readableByteCount();holder.length = len;byte[] bytes = new byte[len];dataBuffer.read(bytes);DataBufferUtils.release(dataBuffer);String text = new String(bytes, StandardCharsets.UTF_8);JsonNode jsonNode = readNode(text);JsonNode payload = jsonNode.get("payload");String payloadText = payload.asText();byte[] content = AesUtils.X.decrypt(payloadText);String requestBody = new String(content, StandardCharsets.UTF_8);log.info("修改请求体payload,修改前:{},修改后:{}", payloadText, requestBody);rewritePayloadNode(requestBody, jsonNode);DataBuffer data = bufferFactory.allocateBuffer();data.write(jsonNode.toString().getBytes(StandardCharsets.UTF_8));holder.dataBuffer = data;});HttpHeaders headers = new HttpHeaders();headers.putAll(request.getHeaders());headers.remove(HttpHeaders.CONTENT_LENGTH);return new ServerHttpRequestDecorator(request) {@Overridepublic HttpHeaders getHeaders() {int contentLength = holder.length;if (contentLength > 0) {headers.setContentLength(contentLength);} else {headers.set(HttpHeaders.TRANSFER_ENCODING, "chunked");}return headers;}@Overridepublic Flux<DataBuffer> getBody() {return Flux.just(holder.dataBuffer);}};}private ServerHttpResponseDecorator processResponse(ServerHttpResponse response, DataBufferFactory bufferFactory) {return new ServerHttpResponseDecorator(response) {@SuppressWarnings("unchecked")@Overridepublic Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {if (body instanceof Flux) {Flux<? extends DataBuffer> flux = (Flux<? extends DataBuffer>) body;return super.writeWith(flux.map(buffer -> {CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());DataBufferUtils.release(buffer);JsonNode jsonNode = readNode(charBuffer.toString());JsonNode payload = jsonNode.get("payload");String text = payload.toString();String content = AesUtils.X.encrypt(text);log.info("修改响应体payload,修改前:{},修改后:{}", text, content);setPayloadTextNode(content, jsonNode);return bufferFactory.wrap(jsonNode.toString().getBytes(StandardCharsets.UTF_8));}));}return super.writeWith(body);}};}private void rewritePayloadNode(String text, JsonNode root) {try {JsonNode node = objectMapper.readTree(text);ObjectNode objectNode = (ObjectNode) root;objectNode.set("payload", node);} catch (Exception e) {throw new IllegalStateException(e);}}private void setPayloadTextNode(String text, JsonNode root) {try {ObjectNode objectNode = (ObjectNode) root;objectNode.set("payload", new TextNode(text));} catch (Exception e) {throw new IllegalStateException(e);}}private JsonNode readNode(String in) {try {return objectMapper.readTree(in);} catch (Exception e) {throw new IllegalStateException(e);}}private class DataBufferHolder {DataBuffer dataBuffer;int length;}
}
先准备一份密文:
Map<String, Object> json = new HashMap<>(8);
json.put("serialNumber", "请求流水号");
String content = "{\"name\": \"doge\"}";
json.put("payload", AesUtils.X.encrypt(content));
System.out.println(new ObjectMapper().writeValueAsString(json));// 输出
{"serialNumber":"请求流水号","payload":"144e3dc734743f5709f1adf857bca473da683246fd612f86ac70edeb5f2d2729"}
模拟请求:
POST /order/json HTTP/1.1
Host: localhost:9090
accessToken: 10086
Content-Type: application/json
User-Agent: PostmanRuntime/7.13.0
Accept: */*
Cache-Control: no-cache
Postman-Token: bda07fc3-ea1a-478c-b4d7-754fe6f37200,634734d9-feed-4fc9-ba20-7618bd986e1c
Host: localhost:9090
cookie: customCookieName=customCookieValue
accept-encoding: gzip, deflate
content-length: 104
Connection: keep-alive
cache-control: no-cache{"serialNumber": "请求流水号","payload": "FE49xzR0P1cJ8a34V7ykc9poMkb9YS+GrHDt618tJyk="
}// 响应结果
{"serialNumber": "请求流水号","payload": "oo/K1igg2t/S8EExkBVGWOfI1gAh5pBpZ0wyjNPW6e8=" # <--- 解密后:{"name":"doge","age":26}
}
遇到的问题:
- 必须实现
Ordered
接口,返回一个小于-1的order值,这是因为NettyWriteResponseFilter
的order值为-1,我们需要覆盖返回响应体的逻辑,自定义的GlobalFilter
必须比NettyWriteResponseFilter
优先执行。 - 网关每次重启之后,第一个请求总是无法从原始的
ServerHttpRequest
读取到有效的Body,准确来说出现的现象是NettyRoutingFilter
调用ServerHttpRequest#getBody()
的时候获取到一个空的对象,导致空指针;奇怪的是从第二个请求开始就能正常调用。笔者把Spring Cloud Gateway
的版本降低到Finchley.SR3
,Spring Boot
的版本降低到2.0.8.RELEASE
,问题不再出现,初步确定是Spring Cloud Gateway
版本升级导致的兼容性问题或者是BUG。
最重要的是用到了ServerHttpResponse
装饰器ServerHttpResponseDecorator
,主要覆盖写入响应体数据缓冲区的部分,至于怎么处理其他逻辑需要自行考虑,这里只是做一个简单的示范。一般的代码逻辑如下:
ServerHttpResponse response = exchange.getResponse();
ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(response) {@Overridepublic Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {if (body instanceof Flux) {Flux<? extends DataBuffer> flux = (Flux<? extends DataBuffer>) body;return super.writeWith(flux.map(buffer -> {// buffer就是原始的响应数据的缓冲区// 下面处理完毕之后返回新的响应数据的缓冲区即可return bufferFactory.wrap(...);}));}return super.writeWith(body);}};
return chain.filter(exchange.mutate().response(responseDecorator).build());
请求体或者响应体报文过大的问题
有热心的同学告诉笔者,如果请求报文过大或者响应报文过大的时候,前面两节的修改请求和响应报文的方法会出现问题,这里尝试重现一下遇到的具体问题。先把请求报文尝试加长:
Map<String, Object> json = new HashMap<>(8);
json.put("serialNumber", "请求流水号");
StringBuilder builder = new StringBuilder();
for (int i = 0; i < 1000; i++) {builder.append("doge");
}
String content = String.format("{\"name\": \"%s\"}", builder.toString());
json.put("payload", AesUtils.X.encrypt(content));
System.out.println(new ObjectMapper().writeValueAsString(json));// 请求的JSON报文如下:
{"serialNumber": "请求流水号","payload": "......"
}
复制代码
用上面的请求报文发起请求,确实存在问题:
主要问题是:
- 请求体包数据装成的
Flux<DataBuffer>
实例被订阅之后,读取到的字节数组的长度被截断了,提供的原始请求报文里面字符串长度要大于1000,转换成byte数组绝对要大于1000,但是上面的示例中只读取到长度为673的byte数组。 - 读取到的字节数组被截断后,则使用Jackson进行反序列化的时候提示没有读取到字符串的EOF标识,导致反序列化失败。
既然遇到了问题,就想办法解决。首先第一步定位一下是什么原因,直觉告诉笔者:要开启一下DEBUG日志进行观察,如果还没有头绪可能要跟踪一下源码。
开启DEBUG日志级别之后做一次请求,发现了一些可疑的日志信息:
2019-05-19 11:16:15.660 [reactor-http-nio-2] DEBUG reactor.ipc.netty.http.server.HttpServer - [id: 0xa9b527e5, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:58012] READ COMPLETE
2019-05-19 11:16:15.660 [reactor-http-nio-2] DEBUG reactor.ipc.netty.http.server.HttpServer - [id: 0xa9b527e5, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:58012] INACTIVE
2019-05-19 11:16:15.660 [reactor-http-nio-3] DEBUG reactor.ipc.netty.http.server.HttpServer - [id: 0x5554e091, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:58013] READ: 1024B+-------------------------------------------------+| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 50 4f 53 54 20 2f 6f 72 64 65 72 2f 6a 73 6f 6e |POST /order/json|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 61 63 63 65 73 | HTTP/1.1..acces|
|00000020| 73 54 6f 6b 65 6e 3a 20 31 30 30 38 36 0d 0a 43 |sToken: 10086..C|
... ...
|000003f0| 49 41 72 6b 64 37 58 57 35 4c 6c 32 2f 71 61 42 |IArkd7XW5Ll2/qaB|
+--------+-------------------------------------------------+----------------+
2019-05-19 11:16:15.662 [reactor-http-nio-2] DEBUG reactor.ipc.netty.http.server.HttpServer - [id: 0xa9b527e5, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:58012] UNREGISTERED
2019-05-19 11:16:15.665 [reactor-http-nio-3] DEBUG reactor.ipc.netty.http.server.HttpServerOperations - [id: 0x5554e091, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:58013] Increasing pending responses, now 1
2019-05-19 11:16:15.671 [reactor-http-nio-3] DEBUG reactor.ipc.netty.http.server.HttpServer - [id: 0x5554e091, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:58013] READ COMPLETE
注意一下关键字READ: 1024B
,这里应该是底层的Reactor-Netty
读取的最大数据报的长度限制,打印出来的数据报刚好也是1024B的大小,这个应该就是导致请求体被截断的根本原因;这个问题不单单会出现在请求体的获取,也会出现在响应体的写入。既然这个是共性的问题,那么项目Github上肯定有对应的Issue,找到一个互动比较长的gateway request size limit 1024B because netty default limit 1024,how to solve it? #581,从回答来看,官方建议使用ModifyRequestBodyGatewayFilterFactory
和ModifyResponseBodyGatewayFilterFactory
完成对应的功能。这里可以尝试借鉴一下ModifyRequestBodyGatewayFilterFactory
的实现方式修改之前的代码,因为代码的逻辑比较长和复杂,解密请求体的过滤器拆分到新的类RequestEncryptionGlobalFilter
,加密响应体的过滤器拆分到ResponseDecryptionGlobalFilter
:
RequestEncryptionGlobalFilter
的代码如下:
@Slf4j
@Component
public class RequestEncryptionGlobalFilter implements GlobalFilter, Ordered {@Autowiredprivate ObjectMapper objectMapper;private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();@Overridepublic int getOrder() {return -2;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return processRequest(exchange, chain);}private Mono<Void> processRequest(ServerWebExchange exchange, GatewayFilterChain chain) {ServerRequest serverRequest = new DefaultServerRequest(exchange, messageReaders);DataBufferFactory bufferFactory = exchange.getResponse().bufferFactory();Mono<String> rawBody = serverRequest.bodyToMono(String.class).map(s -> s);BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromPublisher(rawBody, String.class);HttpHeaders tempHeaders = new HttpHeaders();tempHeaders.putAll(exchange.getRequest().getHeaders());tempHeaders.remove(HttpHeaders.CONTENT_LENGTH);CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, tempHeaders);return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {Flux<DataBuffer> body = outputMessage.getBody();DataBufferHolder holder = new DataBufferHolder();body.subscribe(dataBuffer -> {int len = dataBuffer.readableByteCount();holder.length = len;byte[] bytes = new byte[len];dataBuffer.read(bytes);DataBufferUtils.release(dataBuffer);String text = new String(bytes, StandardCharsets.UTF_8);JsonNode jsonNode = readNode(text);JsonNode payload = jsonNode.get("payload");String payloadText = payload.asText();byte[] content = AesUtils.X.decrypt(payloadText);String requestBody = new String(content, StandardCharsets.UTF_8);log.info("修改请求体payload,修改前:{},修改后:{}", payloadText, requestBody);rewritePayloadNode(requestBody, jsonNode);DataBuffer data = bufferFactory.allocateBuffer();data.write(jsonNode.toString().getBytes(StandardCharsets.UTF_8));holder.dataBuffer = data;});ServerHttpRequestDecorator requestDecorator = new ServerHttpRequestDecorator(exchange.getRequest()) {@Overridepublic HttpHeaders getHeaders() {long contentLength = tempHeaders.getContentLength();HttpHeaders httpHeaders = new HttpHeaders();httpHeaders.putAll(super.getHeaders());if (contentLength > 0) {httpHeaders.setContentLength(contentLength);} else {httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");}return httpHeaders;}@Overridepublic Flux<DataBuffer> getBody() {return Flux.just(holder.dataBuffer);}};return chain.filter(exchange.mutate().request(requestDecorator).build());}));}private void rewritePayloadNode(String text, JsonNode root) {try {JsonNode node = objectMapper.readTree(text);ObjectNode objectNode = (ObjectNode) root;objectNode.set("payload", node);} catch (Exception e) {throw new IllegalStateException(e);}}private void setPayloadTextNode(String text, JsonNode root) {try {ObjectNode objectNode = (ObjectNode) root;objectNode.set("payload", new TextNode(text));} catch (Exception e) {throw new IllegalStateException(e);}}private JsonNode readNode(String in) {try {return objectMapper.readTree(in);} catch (Exception e) {throw new IllegalStateException(e);}}private class DataBufferHolder {DataBuffer dataBuffer;int length;}
}
ResponseDecryptionGlobalFilter
的代码如下:
@Slf4j
@Component
public class ResponseDecryptionGlobalFilter implements GlobalFilter, Ordered {@Autowiredprivate ObjectMapper objectMapper;@Overridepublic int getOrder() {return NettyWriteResponseFilter.WRITE_RESPONSE_FILTER_ORDER - 1;}@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {return processResponse(exchange, chain);}private Mono<Void> processResponse(ServerWebExchange exchange, GatewayFilterChain chain) {ServerHttpResponseDecorator responseDecorator = new ServerHttpResponseDecorator(exchange.getResponse()) {@Overridepublic Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {String originalResponseContentType = exchange.getAttribute(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);HttpHeaders httpHeaders = new HttpHeaders();httpHeaders.add(HttpHeaders.CONTENT_TYPE, originalResponseContentType);ResponseAdapter responseAdapter = new ResponseAdapter(body, httpHeaders);DefaultClientResponse clientResponse = new DefaultClientResponse(responseAdapter, ExchangeStrategies.withDefaults());Mono<String> rawBody = clientResponse.bodyToMono(String.class).map(s -> s);BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromPublisher(rawBody, String.class);CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, exchange.getResponse().getHeaders());return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {Flux<DataBuffer> messageBody = outputMessage.getBody();Flux<DataBuffer> flux = messageBody.map(buffer -> {CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer.asByteBuffer());DataBufferUtils.release(buffer);JsonNode jsonNode = readNode(charBuffer.toString());JsonNode payload = jsonNode.get("payload");String text = payload.toString();String content = AesUtils.X.encrypt(text);log.info("修改响应体payload,修改前:{},修改后:{}", text, content);setPayloadTextNode(content, jsonNode);return getDelegate().bufferFactory().wrap(jsonNode.toString().getBytes(StandardCharsets.UTF_8));});HttpHeaders headers = getDelegate().getHeaders();if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)) {flux = flux.doOnNext(data -> headers.setContentLength(data.readableByteCount()));}return getDelegate().writeWith(flux);}));}};return chain.filter(exchange.mutate().response(responseDecorator).build());}private void setPayloadTextNode(String text, JsonNode root) {try {ObjectNode objectNode = (ObjectNode) root;objectNode.set("payload", new TextNode(text));} catch (Exception e) {throw new IllegalStateException(e);}}private JsonNode readNode(String in) {try {return objectMapper.readTree(in);} catch (Exception e) {throw new IllegalStateException(e);}}private class ResponseAdapter implements ClientHttpResponse {private final Flux<DataBuffer> flux;private final HttpHeaders headers;@SuppressWarnings("unchecked")private ResponseAdapter(Publisher<? extends DataBuffer> body, HttpHeaders headers) {this.headers = headers;if (body instanceof Flux) {flux = (Flux) body;} else {flux = ((Mono) body).flux();}}@Overridepublic Flux<DataBuffer> getBody() {return flux;}@Overridepublic HttpHeaders getHeaders() {return headers;}@Overridepublic HttpStatus getStatusCode() {return null;}@Overridepublic int getRawStatusCode() {return 0;}@Overridepublic MultiValueMap<String, ResponseCookie> getCookies() {return null;}}
}
模拟请求:
POST /order/json HTTP/1.1
Host: localhost:9090
accessToken: 10086
Content-Type: application/json
User-Agent: PostmanRuntime/7.13.0
Accept: */*
Cache-Control: no-cache
Postman-Token: 3a830202-f3d1-450e-839f-ae8f3b88bced,b229feb1-7c8b-4d25-a039-09345f3fe8f0
Host: localhost:9090
cookie: customCookieName=customCookieValue
accept-encoding: gzip, deflate
content-length: 5416
Connection: keep-alive
cache-control: no-cache{"serialNumber": "请求流水号","payload": "... ...."
}// 响应
{"serialNumber":"请求流水号","userId":null,"payload":"... ..."}
复制代码
彻底解决了之前的请求或者响应报文截断的问题,笔者发现了很多博文都在(照搬)更改读取DataBuffer
实例时候的代码逻辑,其实那段逻辑是不相关的,可以尝试用BufferedReader
基于行读取然后用StringBuilder
承载,或者像本文那样直接读取为byte数组等等,因为根本的原因是底层的Reactor-Netty
的数据块读取大小限制导致获取到的DataBuffer
实例里面的数据是不完整的,解决方案就是参照Spring Cloud Gateway
本身提供的基础类库进行改造(暂时没发现有入口可以调整Reactor-Netty
的配置),难度也不大。
小结
刚好遇到一个需求需要做网关的加解密包括请求体和响应体的修改,这里顺便把Spring Cloud Gateway
一些涉及到这方面的一些内容梳理了一遍,顺便把坑踩了并且填完。下一步尝试按照目前官方提供的可用组件修改一下实现自定义的逻辑,包括Hystrix
、基于Eureka
和Ribbon
的负载均衡、限流等等。
原文链接
- Github Page:www.throwable.club/2019/05/18/…
- Coding Page:throwable.coding.me/2019/05/18/…
Spring Cloud Gateway-ServerWebExchange核心方法与请求或者响应内容的修改相关推荐
- Spring Cloud Gateway一次请求调用源码解析
简介: 最近通过深入学习Spring Cloud Gateway发现这个框架的架构设计非常简单.有效,很多组件的设计都非常值得学习,本文就Spring Cloud Gateway做一个简单的介绍,以及 ...
- Spring Cloud Gateway 雪崩了,该怎么办不要慌
问题现象与背景 昨晚我们的网关雪崩了一段时间,现象是: 1.不断有各种微服务报异常:在写 HTTP 响应的时候,连接已经关闭: reactor.netty.http.client.PrematureC ...
- 网关Spring Cloud Gateway科普
点击上方"朱小厮的博客",选择"设为星标" 后台回复"加群"获取公众号专属群聊入口 欢迎跳转到本文的原文链接:https://honeypp ...
- Spring Cloud(10)——新一代网关Spring Cloud Gateway
文章目录 Spring Cloud(10)--新一代网关Spring Cloud Gateway 1.背景知识--API网关 2.Spring Cloud Gateway 详细概述 3.Spring ...
- Spring Cloud —— Gateway 服务网关
导航 一.什么是服务网关 二.业界常见网关组件 三.Spring Cloud Gateway 四.Gateway 快速入门 4.1 创建 gateway 服务 4.2 添加 gateway 依赖和 n ...
- Spring Cloud Gateway 概述 《重新定义Spring Cloud实战》读书笔记
什么是Spring Cloud Gateway Spring Cloud Gateway 是 Spring 官方基于 Spring 5.0.Spring Boot 2.0 和 Project Reac ...
- Spring Cloud GateWay 原理
Spring Cloud GateWay 原理 一.概述 在微服务架构中,每个服务都是一个可以独立开发和运行的组件,而一个完整的微服务架构由一系列独立运行的微服务组成.其中每个服务都只会完成特定领域的 ...
- 三分钟了解Spring Cloud Gateway路由转发之自动路由
文章目录 一.前言 二.路由配置 1. 静态路由 2. 动态路由 3. 自动路由 三.Spring Cloud Gateway 是如何实现动态路由 工作原理 源码解析 路由转发原理 路由转发源码解析 ...
- Spring cloud Gateway(二) 一个Http请求的流程解析
Spring cloud Gateway(二) 一个Http请求的流程解析 简介 通过一个简单示例,debug出Spring Cloud Gateway的一个HTTP请求的处理流程 思路整理 ...
最新文章
- 2018.1.9 区块链论文翻译
- 以太坊知识教程------交易
- .Net Core和Jexus配置HTTPS服务
- A and B and Lecture Rooms
- 第30课 棋盘上的学问 《小学生C++趣味编程》
- 2684 亿背后的虚拟化技术:双 11 All on 神龙 | 问底中国 IT 技术演进
- 看了这么多代码,谈一谈代码风格!
- 超详细软件著作权申请——资料篇
- 在VFP里玩SQL查询
- flashfxp修改服务器密码,flashfxp服务器端设置
- 软件工程 之 软件维护
- VP9编码(4)-- 约定
- 4个终于被破译的世界级密码
- mac android使用WiFi安装应用调试程序
- 飞控陀螺仪,磁力计,加速计,四元数姿态结算
- 2023最新SSM计算机毕业设计选题大全(附源码+LW)之java社区闲置物品交易平台z10mc
- 618年中大促 ,全场6折起!限量周边8件套!
- freeSurfer颅骨剥离
- TM1668兼用VK1668 SOP24/SSOP24 应用于VCR.DVD 等产品的显示驱动
- redis 指定端口 启动