Only one connection receive subscriber allowed解决方案

前言

路由SpringCloud Gateway做为代理层请求数据 接口的时可能会遇到很多问题,如何有效的避免掉这些问题?需要一个踩坑人,然后把遇到的问题详细的汇聚成一篇篇的文档展现给开发者,大家好,我就是那个“踩坑人”,也立志于汇聚有效解决方案,帮助大家。

问题描述

spring cloud gateway接收前端请求,然后请求反向代理到服务器。

当请求 method 为 GET 时,可以顺利通过。但是请求 method 为 POST 时,路由则会报如下错误:

{timestamp: "2018-12-27T03:18:58.852+0000", path: "/service12/getUsers", status: 500,…}
error: "Internal Server Error"
message: "Only one connection receive subscriber allowed."
path: "/service12/getUsers"
status: 500
timestamp: "2018-12-27T03:18:58.852+0000"

截图

排查问题

当遇到接口调用不通过,仔细一想今天上午就是将路由稍微改了下,而且通过postman测试服务端接口是正确的,所以快速定位了问题所在:路由发送Post请求会遇到这个问题 。

spring-cloud-gateway反向代理的原理是,首先读取原请求的数据,然后构造一个新的请求,将原请求的数据封装到新的请求中,然后再转发出去。然而我们在他封装之前读取了一次request body,而request body只能读取一次

如下是报错地方:

解决方案:

读取request body的时候,我们再封装一次request,转发出去

@Component
public class AuthSignatureFilter implements GlobalFilter ,Ordered {@Autowiredprivate RoutePlugService  routePlugService;private Logger logger= LoggerFactory.getLogger(AuthSignatureFilter.class);@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {logger.info("/********route-plug获取详细信息***************/");ServerHttpRequest request = exchange.getRequest();RoutePlug routePlug=new RoutePlug();String url = exchange.getRequest().getPath().pathWithinApplication().value();URI requestUri = request.getURI();String method=exchange.getRequest().getMethodValue();//地址logger.info("请求URL:{}",url);logger.info("requestUri:{}",requestUri);logger.info("method:{}",method);routePlug.setUrl(url);routePlug.setUri(requestUri.toString());routePlug.setMethod(method);//开始 时间exchange.getAttributes().put("startTime", System.currentTimeMillis());//参数logger.info("QueryParams:{}",exchange.getRequest().getQueryParams());logger.info("QueryParamsJSON:{}",JSON.toJSON(exchange.getRequest().getQueryParams()));routePlug.setQueryParams(JSON.toJSON(exchange.getRequest().getQueryParams()).toString());HttpHeaders headers=exchange.getRequest().getHeaders();String contentType = headers.getFirst("Content-Type");logger.info("Host:{}",headers.getHost());logger.info("contentType", contentType);logger.info("headersJson:{}", JSON.toJSON(headers));routePlug.setHost(headers.getHost().toString());routePlug.setQueryHeard(JSON.toJSON(headers).toString());URI ex = UriComponentsBuilder.fromUri(requestUri).build(true).toUri();ServerHttpRequest newRequest = request.mutate().uri(ex).build();//记录发送的参数:获取requstBody体中信息if ("POST".equals(method) && !contentType.startsWith("multipart/form-data")){String bodyStr = resolveBodyFromRequest(request);//下面将请求体再次封装写回到 request 里,传到下一级.DataBuffer bodyDataBuffer = stringBuffer(bodyStr);Flux<DataBuffer> bodyFlux = Flux.just(bodyDataBuffer);newRequest = new ServerHttpRequestDecorator(newRequest) {@Overridepublic Flux<DataBuffer> getBody() {return bodyFlux;}};routePlug.setBody(formatStr(bodyStr));logger.info("requesBody:{}",bodyStr);logger.info("requesBody:{}",formatStr(bodyStr));}....省略掉一些代码//记录response的 返回数据ServerHttpResponse originalResponse = exchange.getResponse();DataBufferFactory bufferFactory = originalResponse.bufferFactory();ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(originalResponse) {@Overridepublic Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {if (body instanceof Flux) {Flux<? extends DataBuffer> fluxBody = (Flux<? extends DataBuffer>) body;return super.writeWith(fluxBody.map(dataBuffer -> {byte[] content = new byte[dataBuffer.readableByteCount()];dataBuffer.read(content);//释放掉内存DataBufferUtils.release(dataBuffer);/* String s = new String(content, Charset.forName("UTF-8"));//TODO,s就是response的值,想修改、查看就随意而为了byte[] uppedContent = new String(content, Charset.forName("UTF-8")).getBytes();*/String responseData = null;try {//赋值给实体类responseData = IOUtils.toString(content);routePlug.setSize(responseData.getBytes().length);routePlug.setResultdata(responseData);//请求用时Long startTime = exchange.getAttribute("startTime");if (startTime != null) {long executeTime = (System.currentTimeMillis() - startTime);routePlug.setUsetime(Integer.parseInt(executeTime+""));}routePlugService.save(routePlug);} catch (IOException e) {e.printStackTrace();}logger.debug("/*************返回content*******/");return bufferFactory.wrap(content);}));}// if body is not a flux. never got there.return super.writeWith(body);}};//        return chain.filter(exchange.mutate().request(request).build());return   chain.filter(exchange.mutate().request(newRequest).response(decoratedResponse).build());}
//补充方法private DataBuffer stringBuffer(String value){byte[] bytes = value.getBytes(StandardCharsets.UTF_8);NettyDataBufferFactory nettyDataBufferFactory = new NettyDataBufferFactory(ByteBufAllocator.DEFAULT);DataBuffer buffer = nettyDataBufferFactory.allocateBuffer(bytes.length);buffer.write(bytes);return buffer;}..........省略掉n行代码}private String resolveBodyFromRequest(ServerHttpRequest serverHttpRequest){//获取请求体Flux<DataBuffer> body = serverHttpRequest.getBody();StringBuilder sb = new StringBuilder();body.subscribe(buffer -> {byte[] bytes = new byte[buffer.readableByteCount()];buffer.read(bytes);DataBufferUtils.release(buffer);String bodyString = new String(bytes, StandardCharsets.UTF_8);sb.append(bodyString);});return sb.toString();}

如上代码,要说到是获取body内容之后,我们再如何处理,关键步骤在返回设置,开始只用了第一个return(如下)

总体解决方案:变成第二个return之后就ok了,需要封装后再转发到下一个filter

//      return chain.filter(exchange.mutate().request(request).build());return   chain.filter(exchange.mutate().request(newRequest).response(decoratedResponse).build());

END

动态路由设置

获取请求体参数

记录访问时间、流量大小、IP限制、流量限制参考订阅SpringCloud专栏项目。

Sping Cloud专栏:路由Gateway有效避免 Only one connection receive subscriber allowed问题相关推荐

  1. spring cloud gateway Unhandled failure: Only one connection receive subscriber allowed.

    https://gist.github.com/WeirdBob/b25569d461f0f54444d2c0eab51f3c48 ToUppercaseGlobalFilter.java https ...

  2. 【Spring Cloud Alibaba】Gateway 服务网关

    [Spring Cloud Alibaba]Gateway 服务网关 1 架构图 2 Predicate 断言 3 路由 3.1 静态路由 3.2 动态路由 3.3 Nacos 配置 4 过滤器 4. ...

  3. sping cloud 搭建 微服务

    sping cloud 搭建 微服务 微服务定义 微服务就是将一个单独的应用拆分为一个一个的服务,每一个服务都是提供特定的功能,一个服务只做一件事,类似进程,每个服务都能够单独部署,甚至可以拥有自己的 ...

  4. SpringCloud 从菜鸟到大牛之二 服务注册与发现 Sping Cloud Eureka

    继承上一篇文章 ,本文 就专门来介绍一下 服务与注册组件 服务注册与发现 Sping Cloud Eureka ,作为各个微服务的注册中心,维持心跳连接 注册中心 : Eureka Server ,E ...

  5. Spring Cloud Alibaba 集成 Gateway 实现动态路由功能

    文章目录 1 摘要 2 核心 Maven 依赖 3 名词释义 4 Gateway 动态路由原理 5 数据库表 6 核心代码 6.1 配置信息 6.2 路由实体类 6.3 本地路由数据库持久层(DAO/ ...

  6. 整合spring cloud云架构 - Gateway的基本入门

    1.gateway和zuul Spring Cloud Finchley版本的gateway比zuul 1.x系列的性能和功能整体要好,且使用 Gateway 做跨域相比应用本身或是 Nginx 的好 ...

  7. Spring Cloud Alibaba - 23 Gateway初体验

    文章目录 概述 网关的作用 官网 来个栗子 step1 搞依赖 step2 搞注解 (gateway没有注解) step3 搞配置 其他工程 & 验证 参数解读 spring.cloud.ga ...

  8. Spring Cloud 7:Gateway

    Zuul 网关 Zuul 是 Netfilx 开源的一个 API Gateway 服务器,本质是一个 Web Servlet 应用.其在微服务架构体系中提供动态路由.监控.弹性.安全等边缘服务. 使用 ...

  9. 【Spring Cloud】网关-gateway(2.x)

    cloud全家桶中有个很重要的组件就是网关,在1.x版本中都是采用的Zuul网关:但在2.x版本中,zuul的升级一直跳票,springcloud最后自己研发了一个网关代替Zuul,那就是Spring ...

  10. spring cloud (三) 路由 zuul

    1 pom.xml <?xml version="1.0" encoding="UTF-8"?> <project xmlns="h ...

最新文章

  1. 团队开发冲刺第二阶段_4
  2. mybatis中ResultSetHandler的设计与实现
  3. Another app is currently holding the yum lock; waitingn
  4. Redis:事务、管道、Lua脚本
  5. 原生js 基于canvas写一个简单的前端 截图工具
  6. (26)基于cookie的登陆认证(写入cookie、删除cookie、登陆后所有域下的网页都可访问、登陆成功跳转至用户开始访问的页面、使用装饰器完成所有页面的登陆认证)...
  7. 【Oracle】数据清洗案例
  8. 华为太极magisk安装教程_教程:如何升级太极内部的应用
  9. 【智能制造】简单明了让你了解什么是柔性制造
  10. docker run 的 -i -t -d参数
  11. C++ 赛码打字编程题
  12. 计算机一级考试可以搜索吗,手动找回Windows7搜索功能
  13. 运动会分数统计的实验报告(数组实现)
  14. Android 7.0 新特性
  15. linux麒麟v10专有机关闭防火墙或开放端口的解决办法
  16. golang map的遍历
  17. 中国的人工智能是否能在2030年引领世界?
  18. php word目录,word怎么做目录和页码
  19. 它号称 Python 中性能最高的异步 Web 框架:超详细 Sanic 入门指南!
  20. android 打卡 虚拟定位 sqlite

热门文章

  1. 生成自定义文字的二维码
  2. Android 相机教程,安卓应用开发调用系统相机教程
  3. 软件测试员的日常逗逼瞬间
  4. Gartner点将分布式文件存储,浪潮存储缘何一鸣惊人?
  5. 除了美团点评合并,国庆长假O2O还发生了什么?
  6. pip install deepforest 失败
  7. SAP MM 采购信息记录中价格单位转换因子的修改
  8. 开源项目 CDN 加速服务站合集:除了BootCDN,你还知道其他免费的前端开源项目 CDN 加速服务吗
  9. 【大话设计模式】——设计模式概论
  10. PAT——1118 Birds in Forest 甲级