Sping Cloud专栏:路由Gateway有效避免 Only one connection receive subscriber allowed问题
Only one connection receive subscriber allowed解决方案
前言
路由SpringCloud Gateway做为代理层请求数据 接口的时可能会遇到很多问题,如何有效的避免掉这些问题?需要一个踩坑人,然后把遇到的问题详细的汇聚成一篇篇的文档展现给开发者,大家好,我就是那个“踩坑人”,也立志于汇聚有效解决方案,帮助大家。
问题描述
spring cloud gateway接收前端请求,然后请求反向代理到服务器。
当请求 method 为 GET 时,可以顺利通过。但是请求 method 为 POST 时,路由则会报如下错误:
{timestamp: "2018-12-27T03:18:58.852+0000", path: "/service12/getUsers", status: 500,…}
error: "Internal Server Error"
message: "Only one connection receive subscriber allowed."
path: "/service12/getUsers"
status: 500
timestamp: "2018-12-27T03:18:58.852+0000"
截图
排查问题
当遇到接口调用不通过,仔细一想今天上午就是将路由稍微改了下,而且通过postman测试服务端接口是正确的,所以快速定位了问题所在:路由发送Post请求会遇到这个问题 。
spring-cloud-gateway反向代理的原理是,首先读取原请求的数据,然后构造一个新的请求,将原请求的数据封装到新的请求中,然后再转发出去。然而我们在他封装之前读取了一次request body,而request body只能读取一次
如下是报错地方:
解决方案:
读取request body的时候,我们再封装一次request,转发出去
@Component
public class AuthSignatureFilter implements GlobalFilter ,Ordered {@Autowiredprivate RoutePlugService routePlugService;private Logger logger= LoggerFactory.getLogger(AuthSignatureFilter.class);@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {logger.info("/********route-plug获取详细信息***************/");ServerHttpRequest request = exchange.getRequest();RoutePlug routePlug=new RoutePlug();String url = exchange.getRequest().getPath().pathWithinApplication().value();URI requestUri = request.getURI();String method=exchange.getRequest().getMethodValue();//地址logger.info("请求URL:{}",url);logger.info("requestUri:{}",requestUri);logger.info("method:{}",method);routePlug.setUrl(url);routePlug.setUri(requestUri.toString());routePlug.setMethod(method);//开始 时间exchange.getAttributes().put("startTime", System.currentTimeMillis());//参数logger.info("QueryParams:{}",exchange.getRequest().getQueryParams());logger.info("QueryParamsJSON:{}",JSON.toJSON(exchange.getRequest().getQueryParams()));routePlug.setQueryParams(JSON.toJSON(exchange.getRequest().getQueryParams()).toString());HttpHeaders headers=exchange.getRequest().getHeaders();String contentType = headers.getFirst("Content-Type");logger.info("Host:{}",headers.getHost());logger.info("contentType", contentType);logger.info("headersJson:{}", JSON.toJSON(headers));routePlug.setHost(headers.getHost().toString());routePlug.setQueryHeard(JSON.toJSON(headers).toString());URI ex = UriComponentsBuilder.fromUri(requestUri).build(true).toUri();ServerHttpRequest newRequest = request.mutate().uri(ex).build();//记录发送的参数:获取requstBody体中信息if ("POST".equals(method) && !contentType.startsWith("multipart/form-data")){String bodyStr = resolveBodyFromRequest(request);//下面将请求体再次封装写回到 request 里,传到下一级.DataBuffer bodyDataBuffer = stringBuffer(bodyStr);Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);newRequest = new ServerHttpRequestDecorator(newRequest) {@Overridepublic Flux<DataBuffer> getBody() {return bodyFlux;}};routePlug.setBody(formatStr(bodyStr));logger.info("requesBody:{}",bodyStr);logger.info("requesBody:{}",formatStr(bodyStr));}....省略掉一些代码//记录response的 返回数据ServerHttpResponse originalResponse = exchange.getResponse();DataBufferFactory bufferFactory = originalResponse.bufferFactory();ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {@Overridepublic Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {if (body instanceof Flux) {Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;return super.writeWith(fluxBody.map(dataBuffer -> {byte[] content = new byte[dataBuffer.readableByteCount()];dataBuffer.read(content);//释放掉内存DataBufferUtils.release(dataBuffer);/* String s = new String(content, Charset.forName("UTF-8"));//TODO,s就是response的值,想修改、查看就随意而为了byte[] uppedContent = new String(content, Charset.forName("UTF-8")).getBytes();*/String responseData = null;try {//赋值给实体类responseData = IOUtils.toString(content);routePlug.setSize(responseData.getBytes().length);routePlug.setResultdata(responseData);//请求用时Long startTime = exchange.getAttribute("startTime");if (startTime != null) {long executeTime = (System.currentTimeMillis() - startTime);routePlug.setUsetime(Integer.parseInt(executeTime+""));}routePlugService.save(routePlug);} catch (IOException e) {e.printStackTrace();}logger.debug("/*************返回content*******/");return bufferFactory.wrap(content);}));}// if body is not a flux. never got there.return super.writeWith(body);}};// return chain.filter(exchange.mutate().request(request).build());return chain.filter(exchange.mutate().request(newRequest).response(decoratedResponse).build());}
//补充方法private DataBuffer stringBuffer(String value){byte[] bytes = value.getBytes(StandardCharsets.UTF_8);NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);buffer.write(bytes);return buffer;}..........省略掉n行代码}private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest){//获取请求体Flux<DataBuffer> body = serverHttpRequest.getBody();StringBuilder sb = new StringBuilder();body.subscribe(buffer -> {byte[] bytes = new byte[buffer.readableByteCount()];buffer.read(bytes);DataBufferUtils.release(buffer);String bodyString = new String(bytes, StandardCharsets.UTF_8);sb.append(bodyString);});return sb.toString();}
如上代码,要说到是获取body内容之后,我们再如何处理,关键步骤在返回设置,开始只用了第一个return(如下)
总体解决方案:变成第二个return之后就ok了,需要封装后再转发到下一个filter
// return chain.filter(exchange.mutate().request(request).build());return chain.filter(exchange.mutate().request(newRequest).response(decoratedResponse).build());
END
动态路由设置
获取请求体参数
记录访问时间、流量大小、IP限制、流量限制参考订阅SpringCloud专栏项目。
Sping Cloud专栏:路由Gateway有效避免 Only one connection receive subscriber allowed问题相关推荐
- spring cloud gateway Unhandled failure: Only one connection receive subscriber allowed.
https://gist.github.com/WeirdBob/b25569d461f0f54444d2c0eab51f3c48 ToUppercaseGlobalFilter.java https ...
- 【Spring Cloud Alibaba】Gateway 服务网关
[Spring Cloud Alibaba]Gateway 服务网关 1 架构图 2 Predicate 断言 3 路由 3.1 静态路由 3.2 动态路由 3.3 Nacos 配置 4 过滤器 4. ...
- sping cloud 搭建 微服务
sping cloud 搭建 微服务 微服务定义 微服务就是将一个单独的应用拆分为一个一个的服务,每一个服务都是提供特定的功能,一个服务只做一件事,类似进程,每个服务都能够单独部署,甚至可以拥有自己的 ...
- SpringCloud 从菜鸟到大牛之二 服务注册与发现 Sping Cloud Eureka
继承上一篇文章 ,本文 就专门来介绍一下 服务与注册组件 服务注册与发现 Sping Cloud Eureka ,作为各个微服务的注册中心,维持心跳连接 注册中心 : Eureka Server ,E ...
- Spring Cloud Alibaba 集成 Gateway 实现动态路由功能
文章目录 1 摘要 2 核心 Maven 依赖 3 名词释义 4 Gateway 动态路由原理 5 数据库表 6 核心代码 6.1 配置信息 6.2 路由实体类 6.3 本地路由数据库持久层(DAO/ ...
- 整合spring cloud云架构 - Gateway的基本入门
1.gateway和zuul Spring Cloud Finchley版本的gateway比zuul 1.x系列的性能和功能整体要好,且使用 Gateway 做跨域相比应用本身或是 Nginx 的好 ...
- Spring Cloud Alibaba - 23 Gateway初体验
文章目录 概述 网关的作用 官网 来个栗子 step1 搞依赖 step2 搞注解 (gateway没有注解) step3 搞配置 其他工程 & 验证 参数解读 spring.cloud.ga ...
- Spring Cloud 7:Gateway
Zuul 网关 Zuul 是 Netfilx 开源的一个 API Gateway 服务器,本质是一个 Web Servlet 应用.其在微服务架构体系中提供动态路由.监控.弹性.安全等边缘服务. 使用 ...
- 【Spring Cloud】网关-gateway(2.x)
cloud全家桶中有个很重要的组件就是网关,在1.x版本中都是采用的Zuul网关:但在2.x版本中,zuul的升级一直跳票,springcloud最后自己研发了一个网关代替Zuul,那就是Spring ...
- spring cloud (三) 路由 zuul
1 pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="h ...
最新文章
- 团队开发冲刺第二阶段_4
- mybatis中ResultSetHandler的设计与实现
- Another app is currently holding the yum lock; waitingn
- Redis:事务、管道、Lua脚本
- 原生js 基于canvas写一个简单的前端 截图工具
- (26)基于cookie的登陆认证(写入cookie、删除cookie、登陆后所有域下的网页都可访问、登陆成功跳转至用户开始访问的页面、使用装饰器完成所有页面的登陆认证)...
- 【Oracle】数据清洗案例
- 华为太极magisk安装教程_教程:如何升级太极内部的应用
- 【智能制造】简单明了让你了解什么是柔性制造
- docker run 的 -i -t -d参数
- C++ 赛码打字编程题
- 计算机一级考试可以搜索吗,手动找回Windows7搜索功能
- 运动会分数统计的实验报告(数组实现)
- Android 7.0 新特性
- linux麒麟v10专有机关闭防火墙或开放端口的解决办法
- golang map的遍历
- 中国的人工智能是否能在2030年引领世界?
- php word目录,word怎么做目录和页码
- 它号称 Python 中性能最高的异步 Web 框架:超详细 Sanic 入门指南!
- android 打卡 虚拟定位 sqlite