本文主要研究一下jdk httpclient的executor

HttpClientImpl

java.net.http/jdk/internal/net/http/HttpClientImpl.java

    private HttpClientImpl(HttpClientBuilderImpl builder,SingleFacadeFactory facadeFactory) {id = CLIENT_IDS.incrementAndGet();dbgTag = "HttpClientImpl(" + id +")";if (builder.sslContext == null) {try {sslContext = SSLContext.getDefault();} catch (NoSuchAlgorithmException ex) {throw new InternalError(ex);}} else {sslContext = builder.sslContext;}Executor ex = builder.executor;if (ex == null) {ex = Executors.newCachedThreadPool(new DefaultThreadFactory(id));isDefaultExecutor = true;} else {isDefaultExecutor = false;}delegatingExecutor = new DelegatingExecutor(this::isSelectorThread, ex);facadeRef = new WeakReference<>(facadeFactory.createFacade(this));client2 = new Http2ClientImpl(this);cookieHandler = builder.cookieHandler;connectTimeout = builder.connectTimeout;followRedirects = builder.followRedirects == null ?Redirect.NEVER : builder.followRedirects;this.userProxySelector = Optional.ofNullable(builder.proxy);this.proxySelector = userProxySelector.orElseGet(HttpClientImpl::getDefaultProxySelector);if (debug.on())debug.log("proxySelector is %s (user-supplied=%s)",this.proxySelector, userProxySelector.isPresent());authenticator = builder.authenticator;if (builder.version == null) {version = HttpClient.Version.HTTP_2;} else {version = builder.version;}if (builder.sslParams == null) {sslParams = getDefaultParams(sslContext);} else {sslParams = builder.sslParams;}connections = new ConnectionPool(id);connections.start();timeouts = new TreeSet<>();try {selmgr = new SelectorManager(this);} catch (IOException e) {// unlikelythrow new InternalError(e);}selmgr.setDaemon(true);filters = new FilterFactory();initFilters();assert facadeRef.get() != null;}
  • 这里如果HttpClientBuilderImpl的executor为null,则会创建Executors.newCachedThreadPool(new DefaultThreadFactory(id))

HttpClientImpl.sendAsync

java.net.http/jdk/internal/net/http/HttpClientImpl.java

    @Overridepublic <T> CompletableFuture<HttpResponse<T>>sendAsync(HttpRequest userRequest, BodyHandler<T> responseHandler){return sendAsync(userRequest, responseHandler, null);}@Overridepublic <T> CompletableFuture<HttpResponse<T>>sendAsync(HttpRequest userRequest,BodyHandler<T> responseHandler,PushPromiseHandler<T> pushPromiseHandler) {return sendAsync(userRequest, responseHandler, pushPromiseHandler, delegatingExecutor.delegate);}private <T> CompletableFuture<HttpResponse<T>>sendAsync(HttpRequest userRequest,BodyHandler<T> responseHandler,PushPromiseHandler<T> pushPromiseHandler,Executor exchangeExecutor)    {Objects.requireNonNull(userRequest);Objects.requireNonNull(responseHandler);AccessControlContext acc = null;if (System.getSecurityManager() != null)acc = AccessController.getContext();// Clone the, possibly untrusted, HttpRequestHttpRequestImpl requestImpl = new HttpRequestImpl(userRequest, proxySelector);if (requestImpl.method().equals("CONNECT"))throw new IllegalArgumentException("Unsupported method CONNECT");long start = DEBUGELAPSED ? System.nanoTime() : 0;reference();try {if (debugelapsed.on())debugelapsed.log("ClientImpl (async) send %s", userRequest);// When using sendAsync(...) we explicitly pass the// executor's delegate as exchange executor to force// asynchronous scheduling of the exchange.// When using send(...) we don't specify any executor// and default to using the client's delegating executor// which only spawns asynchronous tasks if it detects// that the current thread is the selector manager// thread. This will cause everything to execute inline// until we need to schedule some event with the selector.Executor executor = exchangeExecutor == null? this.delegatingExecutor : exchangeExecutor;MultiExchange<T> mex = new MultiExchange<>(userRequest,requestImpl,this,responseHandler,pushPromiseHandler,acc);CompletableFuture<HttpResponse<T>> res =mex.responseAsync(executor).whenComplete((b,t) -> unreference());if (DEBUGELAPSED) {res = res.whenComplete((b,t) -> debugCompleted("ClientImpl (async)", start, userRequest));}// makes sure that any dependent actions happen in the CF default// executor. This is only needed for sendAsync(...), when// exchangeExecutor is non-null.if (exchangeExecutor != null) {res = res.whenCompleteAsync((r, t) -> { /* do nothing */}, ASYNC_POOL);}return res;} catch(Throwable t) {unreference();debugCompleted("ClientImpl (async)", start, userRequest);throw t;}}
  • 这里如果是sendAsync的话,executor参数传递的是delegatingExecutor.delegate;如果是同步的send方法,则executor传的值是null
  • 这里创建了一个MultiExchange,然后调用mex.responseAsync(executor).whenComplete((b,t) -> unreference()),这里使用了executor

MultiExchange.responseAsync

java.net.http/jdk/internal/net/http/MultiExchange.java

    public CompletableFuture<HttpResponse<T>> responseAsync(Executor executor) {CompletableFuture<Void> start = new MinimalFuture<>();CompletableFuture<HttpResponse<T>> cf = responseAsync0(start);start.completeAsync( () -> null, executor); // trigger executionreturn cf;}private CompletableFuture<HttpResponse<T>>responseAsync0(CompletableFuture<Void> start) {return start.thenCompose( v -> responseAsyncImpl()).thenCompose((Response r) -> {Exchange<T> exch = getExchange();return exch.readBodyAsync(responseHandler).thenApply((T body) -> {this.response =new HttpResponseImpl<>(r.request(), r, this.response, body, exch);return this.response;});});}
  • 可以看到这里使用的是CompletableFuture的completeAsync方法(注意这个方法是java9才有的),executor也是在这里使用的
  • 由于默认是使用Executors.newCachedThreadPool创建的executor,要注意控制并发数及任务执行时间,防止线程数无限制增长过度消耗系统资源
    /*** Creates a thread pool that creates new threads as needed, but* will reuse previously constructed threads when they are* available, and uses the provided* ThreadFactory to create new threads when needed.** @param threadFactory the factory to use when creating new threads* @return the newly created thread pool* @throws NullPointerException if threadFactory is null*/public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>(),threadFactory);}

RejectedExecutionException

  • 实例代码
    @Testpublic void testAsyncPool(){ThreadPoolExecutor executor = ThreadPoolBuilder.fixedPool().setPoolSize(2).setQueueSize(5).setThreadNamePrefix("test-").build();List<CompletableFuture<String>> futureList = IntStream.rangeClosed(1,100).mapToObj(i -> new CompletableFuture<String>()).collect(Collectors.toList());futureList.stream().forEach(future -> {future.completeAsync(() -> {try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e1) {e1.printStackTrace();}return "message";},executor);});CompletableFuture.allOf(futureList.toArray(new CompletableFuture<?>[futureList.size()])).join();}

这里创建的是fixedPool,指定queueSize为5

  • 日志输出
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.CompletableFuture$AsyncSupply@76b10754 rejected from java.util.concurrent.ThreadPoolExecutor@2bea5ab4[Running, pool size = 2, active threads = 2, queued tasks = 5, completed tasks = 0]at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)at java.base/java.util.concurrent.CompletableFuture.completeAsync(CompletableFuture.java:2591)

可以看到线程池队列大小起到了限制作用

小结

jdk httpclient的executor在进行异步操作的时候使用,默认创建的是使用Executors.newCachedThreadPool创建的executor,其线程池大小是Integer.MAX_VALUE,因此在使用的时候要注意,最好是改为有界队列,然后再加上线程池的监控。

doc

  • java.net.http javadoc

[case39]聊聊jdk httpclient的executor相关推荐

  1. java+connect+time+out_聊聊jdk httpclient的connect timeout异常

    序 本文主要研究一下httpclient的connect timeout异常 实例代码 @Test public void testConnectTimeout() throws IOExceptio ...

  2. 聊聊jdk http的HeaderFilter

    序 本文主要研究一下jdk http的HeaderFilter. FilterFactory java.net.http/jdk/internal/net/http/FilterFactory.jav ...

  3. 聊聊Jdk中你没听过的关键词-synthetic

    文章目录 前言 什么是synthetic? 作用和原理 产生的问题 什么是NBAC? 前言 为什么要讲讲synthetic和NBAC呢?其实在这之前,对Jdk中这两种机制并不了解,甚至没有听过,主要原 ...

  4. 聊聊storm TridentBoltExecutor的finishBatch方法

    序 本文主要研究一下storm TridentBoltExecutor的finishBatch方法 MasterBatchCoordinator.nextTuple storm-core-1.2.2- ...

  5. 【拿来吧你】JDK动态代理

    java proxy 因为最近一段时间准备将这几年做的一些业务和技术做个沉淀,也自己造的一些轮子,发现时不时就会需要用到动态代理和反射,所以今天打算先对jdk的动态代理这部分内容做个简单的整理 介绍 ...

  6. Java基础学习笔记(二)_Java核心技术(进阶)

    本篇文章的学习资源来自Java学习视频教程:Java核心技术(进阶)_华东师范大学_中国大学MOOC(慕课) 本篇文章的学习笔记即是对Java核心技术课程的总结,也是对自己学习的总结 文章目录 Jav ...

  7. JAVA面试100道必考题

    1.如下代码的执行结果: 4,1,11 @Testvoid demo01(){int i=1;i=i++;int j=i++;// i=2 j=1int k=i+ ++i * i++;//2+3*3= ...

  8. 2021秋招高频面经汇总(Java开发岗)

    1.G1为什么高吞吐量 G1多线程并行并发 2.B+树解决什么问题 解决查询遍历太深的问题 3.硬连接和软连接 硬连接:新建的文件是已经存在的文件的一个别名,当原文件删除时,新建的文件仍然可以使用. ...

  9. WebClient 原理及实践—官方原版

    WebClient是一种非阻塞.响应式客户端,用于执行HTTP请求.它在5.0中引入,并提供了RestTemplate的替代方案,支持同步.异步和流式场景. WebClient支持以下功能: 非阻塞I ...

最新文章

  1. 怎样实现企业管理系统的操作日志功能
  2. FuncT,TResult的使用方法(转载)
  3. Spring mvc,uploadifive 文件上传实践(转自:https://segmentfault.com/a/1190000004503262)
  4. easyui增删改查前段代码
  5. Android中获取当前位置的使用步骤
  6. 《学习之道》第九章不要突击工作
  7. unity两个项目合并 同名_表格合并,你还在复制粘贴?教你一键合并,超简单!...
  8. Unity3D之UGUI基础7:Scrollbar卷动条
  9. tensorflow和keras的关系
  10. Java 并发编程实战 -- 常见概念
  11. 魔兽世界API魔兽世界全局函数
  12. 机器学习与算法(6)--学习矢量化
  13. 【DA】单侧T检验p值与双侧T检验p值的关系
  14. 手机用蓝牙键盘好使吗_手机也可以连接键盘,你知道怎么使用吗?
  15. php pdo函数说明,PHP PDO函数库详解,pdo函数库详解
  16. 1,vue播放视频之—引入.m3u8后缀的hsl视频流
  17. mt4量化交易接口:分享日常量化选股方法
  18. 为什么说串行比并行快
  19. 服务器raid卡安装在什么位置,安装raid卡
  20. Java基于SpringBoot+Vue+nodejs的在线小说阅读平台 element

热门文章

  1. 从配置文件到分布式配置管理QConf
  2. block用法(转)
  3. 监听Settings的值的变化
  4. 企业环境下如何把ubuntu的dhcp改为静态IP
  5. 程序怎么才能把自己的删除掉?
  6. 解读:为何在今年的淘宝造物节上!AR直播火到如此程度?
  7. SSH访问控制,多次失败登录即封掉IP,防止暴力破解
  8. Oracle --存储过程,输入不定个数参数
  9. Vmware VsPhere下的VM如何安装Hyper-v服务
  10. 【百度地图API】小学生找哥哥——小学生没钱打车,所以此为公交查询功能