[case39]聊聊jdk httpclient的executor
序
本文主要研究一下jdk httpclient的executor
HttpClientImpl
java.net.http/jdk/internal/net/http/HttpClientImpl.java
private HttpClientImpl(HttpClientBuilderImpl builder,SingleFacadeFactory facadeFactory) {id = CLIENT_IDS.incrementAndGet();dbgTag = "HttpClientImpl(" + id +")";if (builder.sslContext == null) {try {sslContext = SSLContext.getDefault();} catch (NoSuchAlgorithmException ex) {throw new InternalError(ex);}} else {sslContext = builder.sslContext;}Executor ex = builder.executor;if (ex == null) {ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));isDefaultExecutor = true;} else {isDefaultExecutor = false;}delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);facadeRef = new WeakReference<>(facadeFactory.createFacade(this));client2 = new Http2ClientImpl(this);cookieHandler = builder.cookieHandler;connectTimeout = builder.connectTimeout;followRedirects = builder.followRedirects == null ?Redirect.NEVER : builder.followRedirects;this.userProxySelector = Optional.ofNullable(builder.proxy);this.proxySelector = userProxySelector.orElseGet(HttpClientImpl::getDefaultProxySelector);if (debug.on())debug.log("proxySelector is %s (user-supplied=%s)",this.proxySelector, userProxySelector.isPresent());authenticator = builder.authenticator;if (builder.version == null) {version = HttpClient.Version.HTTP_2;} else {version = builder.version;}if (builder.sslParams == null) {sslParams = getDefaultParams(sslContext);} else {sslParams = builder.sslParams;}connections = new ConnectionPool(id);connections.start();timeouts = new TreeSet<>();try {selmgr = new SelectorManager(this);} catch (IOException e) {// unlikelythrow new InternalError(e);}selmgr.setDaemon(true);filters = new FilterFactory();initFilters();assert facadeRef.get() != null;}
- 这里如果HttpClientBuilderImpl的executor为null,则会创建Executors.newCachedThreadPool(new DefaultThreadFactory(id))
HttpClientImpl.sendAsync
java.net.http/jdk/internal/net/http/HttpClientImpl.java
@Overridepublic <T> CompletableFuture<HttpResponse<T>>sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler){return sendAsync(userRequest, responseHandler, null);}@Overridepublic <T> CompletableFuture<HttpResponse<T>>sendAsync(HttpRequest userRequest,BodyHandler<T> responseHandler,PushPromiseHandler<T> pushPromiseHandler) {return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);}private <T> CompletableFuture<HttpResponse<T>>sendAsync(HttpRequest userRequest,BodyHandler<T> responseHandler,PushPromiseHandler<T> pushPromiseHandler,Executor exchangeExecutor) {Objects.requireNonNull(userRequest);Objects.requireNonNull(responseHandler);AccessControlContext acc = null;if (System.getSecurityManager() != null)acc = AccessController.getContext();// Clone the, possibly untrusted, HttpRequestHttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);if (requestImpl.method().equals("CONNECT"))throw new IllegalArgumentException("Unsupported method CONNECT");long start = DEBUGELAPSED ? System.nanoTime() : 0;reference();try {if (debugelapsed.on())debugelapsed.log("ClientImpl (async) send %s", userRequest);// When using sendAsync(...) we explicitly pass the// executor's delegate as exchange executor to force// asynchronous scheduling of the exchange.// When using send(...) we don't specify any executor// and default to using the client's delegating executor// which only spawns asynchronous tasks if it detects// that the current thread is the selector manager// thread. This will cause everything to execute inline// until we need to schedule some event with the selector.Executor executor = exchangeExecutor == null? this.delegatingExecutor : exchangeExecutor;MultiExchange<T> mex = new MultiExchange<>(userRequest,requestImpl,this,responseHandler,pushPromiseHandler,acc);CompletableFuture<HttpResponse<T>> res =mex.responseAsync(executor).whenComplete((b,t) -> unreference());if (DEBUGELAPSED) {res = res.whenComplete((b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));}// makes sure that any dependent actions happen in the CF default// executor. This is only needed for sendAsync(...), when// exchangeExecutor is non-null.if (exchangeExecutor != null) {res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);}return res;} catch(Throwable t) {unreference();debugCompleted("ClientImpl (async)", start, userRequest);throw t;}}
- 这里如果是sendAsync的话,executor参数传递的是delegatingExecutor.delegate;如果是同步的send方法,则executor传的值是null
- 这里创建了一个MultiExchange,然后调用mex.responseAsync(executor).whenComplete((b,t) -> unreference()),这里使用了executor
MultiExchange.responseAsync
java.net.http/jdk/internal/net/http/MultiExchange.java
public CompletableFuture<HttpResponse<T>> responseAsync(Executor executor) {CompletableFuture<Void> start = new MinimalFuture<>();CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);start.completeAsync( () -> null, executor); // trigger executionreturn cf;}private CompletableFuture<HttpResponse<T>>responseAsync0(CompletableFuture<Void> start) {return start.thenCompose( v -> responseAsyncImpl()).thenCompose((Response r) -> {Exchange<T> exch = getExchange();return exch.readBodyAsync(responseHandler).thenApply((T body) -> {this.response =new HttpResponseImpl<>(r.request(), r, this.response, body, exch);return this.response;});});}
- 可以看到这里使用的是CompletableFuture的completeAsync方法(
注意这个方法是java9才有的
),executor也是在这里使用的 - 由于默认是使用Executors.newCachedThreadPool创建的executor,要注意控制并发数及任务执行时间,防止线程数无限制增长过度消耗系统资源
/*** Creates a thread pool that creates new threads as needed, but* will reuse previously constructed threads when they are* available, and uses the provided* ThreadFactory to create new threads when needed.** @param threadFactory the factory to use when creating new threads* @return the newly created thread pool* @throws NullPointerException if threadFactory is null*/public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}
RejectedExecutionException
- 实例代码
@Testpublic void testAsyncPool(){ThreadPoolExecutor executor = ThreadPoolBuilder.fixedPool().setPoolSize(2).setQueueSize(5).setThreadNamePrefix("test-").build();List<CompletableFuture<String>> futureList = IntStream.rangeClosed(1,100).mapToObj(i -> new CompletableFuture<String>()).collect(Collectors.toList());futureList.stream().forEach(future -> {future.completeAsync(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e1) {e1.printStackTrace();}return "message";},executor);});CompletableFuture.allOf(futureList.toArray(new CompletableFuture<?>[futureList.size()])).join();}
这里创建的是fixedPool,指定queueSize为5
- 日志输出
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@76b10754 rejected from java.util.concurrent.ThreadPoolExecutor@2bea5ab4[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)at java.base/java.util.concurrent.CompletableFuture.completeAsync(CompletableFuture.java:2591)
可以看到线程池队列大小起到了限制作用
小结
jdk httpclient的executor在进行异步操作的时候使用,默认创建的是使用Executors.newCachedThreadPool创建的executor,其线程池大小是Integer.MAX_VALUE,因此在使用的时候要注意,最好是改为有界队列,然后再加上线程池的监控。
doc
- java.net.http javadoc
[case39]聊聊jdk httpclient的executor相关推荐
- java+connect+time+out_聊聊jdk httpclient的connect timeout异常
序 本文主要研究一下httpclient的connect timeout异常 实例代码 @Test public void testConnectTimeout() throws IOExceptio ...
- 聊聊jdk http的HeaderFilter
序 本文主要研究一下jdk http的HeaderFilter. FilterFactory java.net.http/jdk/internal/net/http/FilterFactory.jav ...
- 聊聊Jdk中你没听过的关键词-synthetic
文章目录 前言 什么是synthetic? 作用和原理 产生的问题 什么是NBAC? 前言 为什么要讲讲synthetic和NBAC呢?其实在这之前,对Jdk中这两种机制并不了解,甚至没有听过,主要原 ...
- 聊聊storm TridentBoltExecutor的finishBatch方法
序 本文主要研究一下storm TridentBoltExecutor的finishBatch方法 MasterBatchCoordinator.nextTuple storm-core-1.2.2- ...
- 【拿来吧你】JDK动态代理
java proxy 因为最近一段时间准备将这几年做的一些业务和技术做个沉淀,也自己造的一些轮子,发现时不时就会需要用到动态代理和反射,所以今天打算先对jdk的动态代理这部分内容做个简单的整理 介绍 ...
- Java基础学习笔记(二)_Java核心技术(进阶)
本篇文章的学习资源来自Java学习视频教程:Java核心技术(进阶)_华东师范大学_中国大学MOOC(慕课) 本篇文章的学习笔记即是对Java核心技术课程的总结,也是对自己学习的总结 文章目录 Jav ...
- JAVA面试100道必考题
1.如下代码的执行结果: 4,1,11 @Testvoid demo01(){int i=1;i=i++;int j=i++;// i=2 j=1int k=i+ ++i * i++;//2+3*3= ...
- 2021秋招高频面经汇总(Java开发岗)
1.G1为什么高吞吐量 G1多线程并行并发 2.B+树解决什么问题 解决查询遍历太深的问题 3.硬连接和软连接 硬连接:新建的文件是已经存在的文件的一个别名,当原文件删除时,新建的文件仍然可以使用. ...
- WebClient 原理及实践—官方原版
WebClient是一种非阻塞.响应式客户端,用于执行HTTP请求.它在5.0中引入,并提供了RestTemplate的替代方案,支持同步.异步和流式场景. WebClient支持以下功能: 非阻塞I ...
最新文章
- 怎样实现企业管理系统的操作日志功能
- FuncT,TResult的使用方法(转载)
- Spring mvc,uploadifive 文件上传实践(转自:https://segmentfault.com/a/1190000004503262)
- easyui增删改查前段代码
- Android中获取当前位置的使用步骤
- 《学习之道》第九章不要突击工作
- unity两个项目合并 同名_表格合并,你还在复制粘贴?教你一键合并,超简单!...
- Unity3D之UGUI基础7:Scrollbar卷动条
- tensorflow和keras的关系
- Java 并发编程实战 -- 常见概念
- 魔兽世界API魔兽世界全局函数
- 机器学习与算法(6)--学习矢量化
- 【DA】单侧T检验p值与双侧T检验p值的关系
- 手机用蓝牙键盘好使吗_手机也可以连接键盘,你知道怎么使用吗?
- php pdo函数说明,PHP PDO函数库详解,pdo函数库详解
- 1,vue播放视频之—引入.m3u8后缀的hsl视频流
- mt4量化交易接口:分享日常量化选股方法
- 为什么说串行比并行快
- 服务器raid卡安装在什么位置,安装raid卡
- Java基于SpringBoot+Vue+nodejs的在线小说阅读平台 element