Recently, the Spring Cloud Gateway my team runs in production started to occasionally return 500. Checking the back-end error logs, we found the following exception:

reactor.netty.channel.AbortedException: Connection has been closed BEFORE response

After searching around online, I fell back on my usual approach: to fix a problem, first reproduce it, then fix it; that is the most reassuring path. If it really cannot be reproduced, then fix it first and keep observing.

1. Reproduction

Drawing on the experience shared by several people on GitHub, I tried to simulate the issue locally. However, most of the reproduction setups on GitHub call the reactor-netty library directly and only trigger the problem under very heavy concurrency, which is nothing like our traffic of under 100 QPS.

First, prepare the culprit: the client service that triggers the problem. The issue is mainly caused by WebClient (see my other post), the non-blocking HTTP client introduced in Spring 5. Under heavy concurrent access it produces this error; under the hood it is implemented on top of reactor-netty. The code is as follows:

@RestController
@RequestMapping("/requestPost")
@AllArgsConstructor
public class Controller {

    private SampleApiClient sampleApiClient;

    @GetMapping
    public Mono<String> trigger() {
        int count = 10000;
        List<Integer> list = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            list.add(i);
        }
        // simulate concurrent API calls
        return Flux.fromStream(list.stream())
                .flatMap(id -> sampleApiClient.postSomethingWithExchange())
                .collectList()
                .map(results -> "success");
    }
}

And the class that makes the outbound call:

@Component
@AllArgsConstructor
public class SampleApiClient {

    private static final String PATH = "/";

    private WebClient sampleWebClient;

    public Mono<ClientResponse> postSomethingWithExchange() {
        return sampleWebClient.post()
                .uri(PATH, System.currentTimeMillis())
                .exchange();
    }
}

The configuration class:

package com.thsnoopy.report.configuration;

import io.netty.channel.ChannelOption;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.DefaultUriBuilderFactory;
import reactor.netty.http.client.HttpClient;

import static org.springframework.web.reactive.function.client.ExchangeFilterFunctions.basicAuthentication;

@Configuration
public class WebClientConfiguration {

    @Qualifier("sampleWebClient")
    @Bean
    public WebClient sampleWebClient(WebClient.Builder builder,
                                     @Value("${api.sample.endpoint}") String endpoint,
                                     @Value("${api.sample.connectTimeoutInMilliSecond}") int connectTimeoutInMilliSecond,
                                     @Value("${api.sample.readWriteTimeoutInSecond}") int readWriteTimeoutInSecond) {
        return builder.uriBuilderFactory(uriBuilderFactory(endpoint))
                .clientConnector(clientHttpConnector(connectTimeoutInMilliSecond, readWriteTimeoutInSecond))
                .build();
    }

    private DefaultUriBuilderFactory uriBuilderFactory(String baseUri) {
        DefaultUriBuilderFactory defaultUriBuilderFactory = new DefaultUriBuilderFactory(baseUri);
        return defaultUriBuilderFactory;
    }

    private ClientHttpConnector clientHttpConnector(int connectTimeoutInMilliSecond, int readWriteTimeoutInSecond) {
        HttpClient httpClient = HttpClient.create()
                // .keepAlive(false)
                .tcpConfiguration(client -> client
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeoutInMilliSecond)
                        .doOnConnected(conn -> conn
                                .addHandlerLast(new ReadTimeoutHandler(readWriteTimeoutInSecond))
                                .addHandlerLast(new WriteTimeoutHandler(readWriteTimeoutInSecond))));
        return new ReactorClientHttpConnector(httpClient);
    }
}

The YAML configuration:

server:
  port: 8080

api:
  sample:
    endpoint: http://www.baidu.com
    connectTimeoutInMilliSecond: 20000
    readWriteTimeoutInSecond: 100

logging:
  level:
    reactor:
      netty: debug

Then hammer http://localhost:8080/requestPost a few times, and the relevant log shows up:

reactor.netty.channel.AbortedException: Connection has been closed BEFORE response
	at reactor.netty.http.HttpOperations.then(HttpOperations.java:131)
	at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
	at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
	at org.springframework.http.client.reactive.AbstractClientHttpRequest.doCommit(AbstractClientHttpRequest.java:133)
	at org.springframework.http.client.reactive.ReactorClientHttpRequest.setComplete(ReactorClientHttpRequest.java:114)
	at org.springframework.web.reactive.function.BodyInserters.lambda$static$0(BodyInserters.java:62)
	at org.springframework.web.reactive.function.client.DefaultClientRequestBuilder$BodyInserterRequest.writeTo(DefaultClientRequestBuilder.java:249)
	at org.springframework.web.reactive.function.client.ExchangeFunctions$DefaultExchangeFunction.lambda$exchange$1(ExchangeFunctions.java:103)
	at org.springframework.http.client.reactive.ReactorClientHttpConnector.lambda$connect$2(ReactorClientHttpConnector.java:110)
	at reactor.netty.http.client.HttpClientConnect$HttpClientHandler.requestWithBody(HttpClientConnect.java:587)
	at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.lambda$onStateChange$0(HttpClientConnect.java:440)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
	at reactor.netty.http.client.HttpClientConnect$HttpIOHandlerObserver.onStateChange(HttpClientConnect.java:441)
	at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:470)
	at reactor.netty.resources.PooledConnectionProvider$DisposableAcquire.onStateChange(PooledConnectionProvider.java:512)
	at reactor.netty.resources.PooledConnectionProvider$PooledConnection.onStateChange(PooledConnectionProvider.java:451)
	at reactor.netty.channel.ChannelOperationsHandler.channelActive(ChannelOperationsHandler.java:62)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:225)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:211)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:204)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelActive(CombinedChannelDuplexHandler.java:414)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelActive(ChannelInboundHandlerAdapter.java:69)
	at io.netty.channel.CombinedChannelDuplexHandler.channelActive(CombinedChannelDuplexHandler.java:213)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:225)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:211)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelActive(AbstractChannelHandlerContext.java:204)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelActive(DefaultChannelPipeline.java:1396)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:225)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelActive(AbstractChannelHandlerContext.java:211)
	at io.netty.channel.DefaultChannelPipeline.fireChannelActive(DefaultChannelPipeline.java:906)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:311)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:341)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:670)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:617)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:534)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:906)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.lang.Thread.run(Thread.java:748)

2. Root Cause

A quick Google search shows it is indeed a known issue; see the following from https://github.com/reactor/reactor-netty/issues/796:

we root caused the issue to two things:

the exception is not very self descriptive, but it's a valid one
the root cause of the exception: a connect timeout due to too many concurrent connections open/acquired.

We are going to change the default connection pool for our clients in 0.9, and document how to set those up. What happens right now is there is no limit, so you are opening 80000 connections to a single destination, that's more than ephemeral ports and that's overall a lot of pending events that could be delayed and cause a connect timeout. Usually, connection pools are capped by default, e.g. Golang has 100 (including only "2" by destination), other libraries run 64, 100 or again 2-6 (the RFC recommendation for browsers). We should limit the number of open connections too by default and we benchmarked with ConnectionProvider.fixed("test", 500) without any troubles.

So I'm leaving the issue open until we change that default and document it overall, but you should try as a workaround to set your client this way:

HttpClient.create(ConnectionProvider.fixed("test", 500))

As shown above, the gist is that this will be addressed in reactor-netty 0.9. The root cause is that the default connection pool is unbounded: opening 80,000 connections to a single destination exceeds the number of ephemeral ports and creates a large backlog of pending events that can be delayed and end up causing connect timeouts.
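To see what that suggested workaround would look like when applied to the WebClientConfiguration above, here is a minimal sketch. The pool name "sample" and the cap of 500 are illustrative values taken from the issue's suggestion, and the code assumes the reactor-netty 0.8/0.9 ConnectionProvider API:

import org.springframework.http.client.reactive.ClientHttpConnector;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

public class PooledConnectorSketch {

    // Cap the connection pool instead of relying on the unbounded default.
    // "sample" and 500 are illustrative; the issue suggests ConnectionProvider.fixed("test", 500).
    public static ClientHttpConnector boundedConnector() {
        ConnectionProvider provider = ConnectionProvider.fixed("sample", 500);
        HttpClient httpClient = HttpClient.create(provider);
        return new ReactorClientHttpConnector(httpClient);
    }
}

Passing a connector built this way to WebClient.Builder#clientConnector, in place of the clientHttpConnector method shown earlier, would bound the number of concurrent connections opened to a single destination.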

The latest version also adds code for this limit.

So let's upgrade. Modify the pom as follows:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

With this version, WebFlux pulls in the latest reactor-netty, 0.9.2.RELEASE.

Then run the test again.

The endpoint still throws the exception, and the service log still shows: reactor.netty.channel.AbortedException: Connection has been closed BEFORE response

Why?

3. Spring Cloud Gateway Test

Since the root of our problem lies in Spring Cloud Gateway, let's dissect its call path and follow it upstream. The core class for the entire outbound network call is NettyRoutingFilter. Let's look at the code first:
public class NettyRoutingFilter implements GlobalFilter, Ordered {

    private static final Log log = LogFactory.getLog(NettyRoutingFilter.class);

    private final HttpClient httpClient;
    private final ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider;
    private final HttpClientProperties properties;
    private volatile List<HttpHeadersFilter> headersFilters;

    public NettyRoutingFilter(HttpClient httpClient, ObjectProvider<List<HttpHeadersFilter>> headersFiltersProvider, HttpClientProperties properties) {
        this.httpClient = httpClient;
        this.headersFiltersProvider = headersFiltersProvider;
        this.properties = properties;
    }

    public List<HttpHeadersFilter> getHeadersFilters() {
        if (this.headersFilters == null) {
            this.headersFilters = (List)this.headersFiltersProvider.getIfAvailable();
        }
        return this.headersFilters;
    }

    public int getOrder() {
        return 2147483647;
    }

    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        URI requestUrl = (URI)exchange.getRequiredAttribute(ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR);
        String scheme = requestUrl.getScheme();
        if (!ServerWebExchangeUtils.isAlreadyRouted(exchange) && ("http".equals(scheme) || "https".equals(scheme))) {
            ServerWebExchangeUtils.setAlreadyRouted(exchange);
            ServerHttpRequest request = exchange.getRequest();
            HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
            String url = requestUrl.toString();
            HttpHeaders filtered = HttpHeadersFilter.filterRequest(this.getHeadersFilters(), exchange);
            DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
            filtered.forEach(httpHeaders::set);
            String transferEncoding = request.getHeaders().getFirst("Transfer-Encoding");
            boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);
            boolean preserveHost = (Boolean)exchange.getAttributeOrDefault(ServerWebExchangeUtils.PRESERVE_HOST_HEADER_ATTRIBUTE, false);
            Flux<HttpClientResponse> responseFlux = ((RequestSender)this.httpClient.chunkedTransfer(chunkedTransfer).request(method).uri(url)).send((req, nettyOutbound) -> {
                req.headers(httpHeaders);
                if (preserveHost) {
                    String host = request.getHeaders().getFirst("Host");
                    req.header("Host", host);
                }
                if (log.isTraceEnabled()) {
                    nettyOutbound.withConnection((connection) -> {
                        log.trace("outbound route: " + connection.channel().id().asShortText() + ", inbound: " + exchange.getLogPrefix());
                    });
                }
                return nettyOutbound.options(SendOptions::flushOnEach).send(request.getBody().map((dataBuffer) -> {
                    return ((NettyDataBuffer)dataBuffer).getNativeBuffer();
                }));
            }).responseConnection((res, connection) -> {
                exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_ATTR, res);
                exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_CONN_ATTR, connection);
                ServerHttpResponse response = exchange.getResponse();
                HttpHeaders headers = new HttpHeaders();
                res.responseHeaders().forEach((entry) -> {
                    headers.add((String)entry.getKey(), (String)entry.getValue());
                });
                String contentTypeValue = headers.getFirst("Content-Type");
                if (StringUtils.hasLength(contentTypeValue)) {
                    exchange.getAttributes().put("original_response_content_type", contentTypeValue);
                }
                HttpStatus status = HttpStatus.resolve(res.status().code());
                if (status != null) {
                    response.setStatusCode(status);
                } else {
                    if (!(response instanceof AbstractServerHttpResponse)) {
                        throw new IllegalStateException("Unable to set status code on response: " + res.status().code() + ", " + response.getClass());
                    }
                    ((AbstractServerHttpResponse)response).setStatusCodeValue(res.status().code());
                }
                HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(this.getHeadersFilters(), headers, exchange, Type.RESPONSE);
                if (!filteredResponseHeaders.containsKey("Transfer-Encoding") && filteredResponseHeaders.containsKey("Content-Length")) {
                    response.getHeaders().remove("Transfer-Encoding");
                }
                exchange.getAttributes().put(ServerWebExchangeUtils.CLIENT_RESPONSE_HEADER_NAMES, filteredResponseHeaders.keySet());
                response.getHeaders().putAll(filteredResponseHeaders);
                return Mono.just(res);
            });
            if (this.properties.getResponseTimeout() != null) {
                responseFlux = responseFlux.timeout(this.properties.getResponseTimeout(),
                        Mono.error(new TimeoutException("Response took longer than timeout: " + this.properties.getResponseTimeout())))
                        .onErrorMap(TimeoutException.class, (th) -> {
                            return new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT, th.getMessage(), th);
                        });
            }
            return responseFlux.then(chain.filter(exchange));
        } else {
            return chain.filter(exchange);
        }
    }
}

This code is straightforward: it calls reactor-netty's HttpClient API, so the whole thing is Netty's reactive, non-blocking networking stack. See https://projectreactor.io/docs/netty/release/reference/index.html#http-client

So the core code ultimately comes from reactor-netty.
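For reference, here is a minimal sketch of calling reactor-netty's HttpClient directly, essentially the same API that NettyRoutingFilter builds on; the target URL is only a placeholder and the blocking call is just for the demo:

import reactor.netty.http.client.HttpClient;

public class ReactorNettyClientDemo {

    public static void main(String[] args) {
        // Issue a GET, aggregate the response body into a single String, and block only for this demo.
        String body = HttpClient.create()
                .get()
                .uri("http://www.baidu.com")
                .responseContent()
                .aggregate()
                .asString()
                .block();
        System.out.println(body);
    }
}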

Load-test the Spring Cloud Gateway endpoint with JMeter.

Thread group configuration:

The results:

After more than half an hour of continuous testing, not a single related error occurred.

I fell into deep thought. After mulling it over for a while, I concluded there were only two options:

1. Report the bug to the maintainers

2. Read the reactor-netty source in depth and help the maintainers fix it

For now, all we can do is keep watching.

Follow-up:

Reference: https://github.com/spring-cloud/spring-cloud-gateway/issues/1148

Recently a fix has emerged. The root cause it identifies is largely the same as described above:

Starting with reactor-netty 0.9.0, the connection pool has a maxIdleTime parameter; once a channel has been idle for that long, it is closed.

If it is not set, Netty never closes the connection and keeps it open indefinitely. The Apache server on the other end, however, closes the connection after 2 seconds, which then produces the

Connection has been closed BEFORE response

exception.
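Until that change lands in a gateway release, one possible client-side mitigation is to set maxIdleTime directly on the reactor-netty connection pool, so that idle connections are dropped before the upstream server closes them. This is only a sketch: it assumes a reactor-netty 0.9.x release that exposes the ConnectionProvider.fixed overload accepting a maxIdleTime, and the pool name, pool size, acquire timeout, and 1-second idle time are illustrative values chosen to stay under the 2-second server-side keep-alive mentioned above:

import java.time.Duration;

import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;

public class MaxIdleTimeSketch {

    public static HttpClient idleAwareHttpClient() {
        // At most 500 connections, 45s acquire timeout, and connections idle longer than 1s are closed
        // by the client before the upstream server (keep-alive ~2s in our case) can close them first.
        ConnectionProvider provider =
                ConnectionProvider.fixed("gateway", 500, 45_000, Duration.ofSeconds(1));
        return HttpClient.create(provider);
    }
}

Once the Spring Cloud Gateway change is released, the same idle timeout should presumably be configurable through the gateway's HTTP client properties instead of custom code.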

The author submitted code adding this timeout to Spring Cloud Gateway's HttpClientProperties.java:

Once a stable Spring Cloud Gateway release ships with the connection pool and maxIdleTime settings, we will configure them and keep observing. Hopefully that resolves the problem.

References: https://github.com/reactor/reactor-netty/issues/796

https://github.com/reactor/reactor-netty/issues/632
