我的主题系列的另一个博客。 这次是关于线程池,尤其是可靠的线程池设置。 在Java中,线程池由Java 5中引入的ThreadPoolExecutor类实现。该类的Javadoc组织得很好。 因此,我不遗余力地在此处进行概述。 基本上, ThreadPoolExecutor的作用是创建和管理线程,这些线程处理由任意客户端提交到工作队列的可运行任务。 这是一种异步执行工作的机制,这在多核计算机和云计算时代是一项重要功能。

为了在广泛的上下文中有用, ThreadPoolExecutor提供了一些可调整的参数。 很好,但是这也让我们(开发人员)决定为我们的具体案例选择正确的设置。 这是ThreadPoolExecutor的最大构造函数。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) { ... }

线程池类型

就资源消耗和所导致的系统稳定性而言,上面构造器中显示的某些参数非常明智。 根据构造函数的不同参数设置,可以区分线程池的一些基本类别。 这是Executors类提供的一些默认线程池设置。

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

在“缓存的线程池”中,线程数不受限制。 这是由于Integer.MAX_VALUE的maximumPoolSize与SynchronousQueue一起引起的。 如果将任务以突发方式提交到该线程池,则可能会为每个任务创建一个线程。 在这种情况下,创建的线程在空闲60秒后会终止。 第二个示例显示“固定线程池”,其中maximumPoolSize设置为特定的固定值。 池线程数永远不会超过该值。 如果任务突发,并且所有线程都忙,那么它们将在工作队列(此处为LinkedBlockingQueue )中排队。 此固定线程池中的线程永不消亡。 无限制池的缺点很明显:两种设置都可能导致JVM内存故障(如果幸运的话,会出现OutOfMemoryErrors )。

让我们看一下一些有限的线程池设置:

ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 50, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());ThreadPoolExecutor pool = new ThreadPoolExecutor(50, 50, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(100000));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

第一个代码段创建了一个受缓冲的线程池,其线程数限制为50。如果任务突发,并且所有线程都处于繁忙状态,则现在通过发出a来拒绝对ThreadPoolExecutor.execute()方法的调用。 RejectedExecutionException 。 通常这不是我通常想要的,因此我通过将rejected-execution-handler设置为CallerRunsPolicy来更改饱和策略。 此策略将工作推回给调用者。 也就是说,发出任务以异步执行的客户端线程现在将同步运行任务。 您可以通过实现自己的RejectedExecutionHandler来开发自己的饱和度策略。 第二个片段创建一个具有50个线程的固定线程池和一个工作队列,该工作队列的值限制为100000个任务。 如果工作队列已满,则饱和策略会将工作推回客户端。 高速缓存的池按需创建线程,如果线程空闲60秒,则终止线程。 固定池使线程保持活动状态。

线程池边界

如上所示,有两种定义线程池的基本方法:有界和无界线程池。 无限制的线程池(如Executors类的默认线程池)可以正常工作,只要您不突发地提交太多任务即可。 如果发生这种情况,无边界线程池可能会损害您的系统稳定性。 高速缓存的线程池创建了太多线程,或者固定线程池中有太多任务排队。 这封信较难实现,但仍有可能。 对于生产用途,最好将边界设置为一些有意义的值,例如最后两个线程池设置中的值。 因为定义那些“有意义的界限”可能很棘手,所以我开发了一个小程序对我有用。

/*** A class that calculates the optimal thread pool boundaries. It takes the desired target utilization and the desired* work queue memory consumption as input and retuns thread count and work queue capacity.* * @author Niklas Schlimm* */
public abstract class PoolSizeCalculator {/*** The sample queue size to calculate the size of a single {@link Runnable} element.*/private final int SAMPLE_QUEUE_SIZE = 1000;/*** Accuracy of test run. It must finish within 20ms of the testTime otherwise we retry the test. This could be* configurable.*/private final int EPSYLON = 20;/*** Control variable for the CPU time investigation.*/private volatile boolean expired;/*** Time (millis) of the test run in the CPU time calculation.*/private final long testtime = 3000;/*** Calculates the boundaries of a thread pool for a given {@link Runnable}.* * @param targetUtilization*            the desired utilization of the CPUs (0 <= targetUtilization <= 1)* @param targetQueueSizeBytes*            the desired maximum work queue size of the thread pool (bytes)*/protected void calculateBoundaries(BigDecimal targetUtilization, BigDecimal targetQueueSizeBytes) {calculateOptimalCapacity(targetQueueSizeBytes);Runnable task = creatTask();start(task);start(task); // warm up phaselong cputime = getCurrentThreadCPUTime();start(task); // test intervallcputime = getCurrentThreadCPUTime() - cputime;long waittime = (testtime * 1000000) - cputime;calculateOptimalThreadCount(cputime, waittime, targetUtilization);}private void calculateOptimalCapacity(BigDecimal targetQueueSizeBytes) {long mem = calculateMemoryUsage();BigDecimal queueCapacity = targetQueueSizeBytes.divide(new BigDecimal(mem), RoundingMode.HALF_UP);System.out.println("Target queue memory usage (bytes): " + targetQueueSizeBytes);System.out.println("createTask() produced " + creatTask().getClass().getName() + " which took " + mem+ " bytes in a queue");System.out.println("Formula: " + targetQueueSizeBytes + " / " + mem);System.out.println("* Recommended queue capacity (bytes): " + queueCapacity);}/*** Brian Goetz' optimal thread count formula, see 'Java Concurrency in Practice' (chapter 8.2)* * @param cpu*            cpu time consumed by considered task* @param wait*            wait time of considered task* @param targetUtilization*            target utilization of the system*/private void calculateOptimalThreadCount(long cpu, long wait, BigDecimal targetUtilization) {BigDecimal waitTime = new BigDecimal(wait);BigDecimal computeTime = new BigDecimal(cpu);BigDecimal numberOfCPU = new BigDecimal(Runtime.getRuntime().availableProcessors());BigDecimal optimalthreadcount = numberOfCPU.multiply(targetUtilization).multiply(new BigDecimal(1).add(waitTime.divide(computeTime, RoundingMode.HALF_UP)));System.out.println("Number of CPU: " + numberOfCPU);System.out.println("Target utilization: " + targetUtilization);System.out.println("Elapsed time (nanos): " + (testtime * 1000000));System.out.println("Compute time (nanos): " + cpu);System.out.println("Wait time (nanos): " + wait);System.out.println("Formula: " + numberOfCPU + " * " + targetUtilization + " * (1 + " + waitTime + " / "+ computeTime + ")");System.out.println("* Optimal thread count: " + optimalthreadcount);}/*** Runs the {@link Runnable} over a period defined in {@link #testtime}. Based on Heinz Kabbutz' ideas* (http://www.javaspecialists.eu/archive/Issue124.html).* * @param task*            the runnable under investigation*/public void start(Runnable task) {long start = 0;int runs = 0;do {if (++runs > 5) {throw new IllegalStateException("Test not accurate");}expired = false;start = System.currentTimeMillis();Timer timer = new Timer();timer.schedule(new TimerTask() {public void run() {expired = true;}}, testtime);while (!expired) {task.run();}start = System.currentTimeMillis() - start;timer.cancel();} while (Math.abs(start - testtime) > EPSYLON);collectGarbage(3);}private void collectGarbage(int times) {for (int i = 0; i < times; i++) {System.gc();try {Thread.sleep(10);} catch (InterruptedException e) {Thread.currentThread().interrupt();break;}}}/*** Calculates the memory usage of a single element in a work queue. Based on Heinz Kabbutz' ideas* (http://www.javaspecialists.eu/archive/Issue029.html).* * @return memory usage of a single {@link Runnable} element in the thread pools work queue*/public long calculateMemoryUsage() {BlockingQueue<Runnable> queue = createWorkQueue();for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {queue.add(creatTask());}long mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();long mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();queue = null;collectGarbage(15);mem0 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();queue = createWorkQueue();for (int i = 0; i < SAMPLE_QUEUE_SIZE; i++) {queue.add(creatTask());}collectGarbage(15);mem1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();return (mem1 - mem0) / SAMPLE_QUEUE_SIZE;}/*** Create your runnable task here.* * @return an instance of your runnable task under investigation*/protected abstract Runnable creatTask();/*** Return an instance of the queue used in the thread pool.* * @return queue instance*/protected abstract BlockingQueue<Runnable> createWorkQueue();/*** Calculate current cpu time. Various frameworks may be used here, depending on the operating system in use. (e.g.* http://www.hyperic.com/products/sigar). The more accurate the CPU time measurement, the more accurate the results* for thread count boundaries.* * @return current cpu time of current thread*/protected abstract long getCurrentThreadCPUTime();}

该程序将为您的工作队列的最大容量和所需的线程数找到理想的线程池边界。 该算法基于Brian Goetz和Heinz Kabutz博士的工作,您可以在Javadoc中找到引用。 计算固定线程池中的工作队列所需的容量相对简单。 您所需要的只是工作队列的期望目标大小(以字节为单位)除以提交的任务的平均大小(以字节为单位)。 不幸的是,计算最大线程数并不是一门精确的科学。 但是,如果在程序中使用公式,则可以避免工作队列太大和线程太多的有害极端情况。 计算理想的池大小取决于等待时间,以计算任务的时间比率。 等待时间越长,达到给定利用率所需的线程就越多。 PoolSizeCalculator需要所需的目标利用率和所需的最大工作队列内存消耗作为输入。 基于对对象大小和CPU时间的调查,它返回理想的设置,以获得最大线程数和线程池中的工作队列容量。

让我们来看一个例子。 以下代码片段显示了如何在1.0(= 100%)所需利用率和100000字节最大工作队列大小的场景下使用PoolSizeCalculator 。

public class MyPoolSizeCalculator extends PoolSizeCalculator {public static void main(String[] args) throws InterruptedException, InstantiationException, IllegalAccessException,ClassNotFoundException {MyThreadSizeCalculator calculator = new MyThreadSizeCalculator();calculator.calculateBoundaries(new BigDecimal(1.0), new BigDecimal(100000));}protected long getCurrentThreadCPUTime() {return ManagementFactory.getThreadMXBean().getCurrentThreadCpuTime();}protected Runnable creatTask() {return new AsynchronousTask(0, "IO", 1000000);}protected BlockingQueue createWorkQueue() {return new LinkedBlockingQueue<>();}}

MyPoolSizeCalculator扩展了抽象PoolSizeCalculator 。 您需要实现三种模板方法: getCurrentThreadCPUTime , creatTask和createWorkQueue 。 该代码段将标准Java管理扩展应用于CPU时间测量(第13行)。 如果JMX不够准确,则可以考虑其他框架(例如SIGAR API )。 当任务是同构且独立时,线程池最有效。 因此,createTask方法将创建一种类型的Runnable任务的实例(第17行)。 将研究此任务以计算等待时间与CPU时间的比率。 最后,我需要创建一个工作队列实例来计算已提交任务的内存使用情况(第21行)。 该程序的输出显示了工作队列容量和最大池大小(线程数)的理想设置。 这些是我在双核计算机上执行I / O密集型AsynchronousTask的结果。

Target queue memory usage (bytes): 100000
createTask() produced com.schlimm.java7.nio.threadpools.AsynchronousTask which took 40 bytes in a queue
Formula: 100000 / 40
* Recommended queue capacity (bytes): 2500
Number of CPU: 2
Target utilization: 1.0
Elapsed time (nanos): 3000000000
Compute time (nanos): 906250000
Wait time (nanos): 2093750000
Formula: 2 * 1.0 * (1 + 2093750000 / 906250000)
* Optimal thread count: 6.0

“推荐的队列容量”和“最佳线程数”是重要的值。 我的AsynchronousTask的理想设置如下:

ThreadPoolExecutor pool = new ThreadPoolExecutor(6, 6, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(2500));
pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

使用这些设置,您的工作队列不能增长到大于所需的100000字节。 而且,由于所需的利用率为1.0(100%),因此使池大于6个线程没有意义(等待时间与计算时间的比率为3 –对于每个计算时间间隔l,紧随其后的是三个等待时间间隔)。 程序的结果很大程度上取决于您处理的任务的类型。 如果任务是同质的并且计算量很大,则程序可能会建议将池大小设置为可用CPU的数量。 但是,如果任务具有等待时间,例如在I / O密集型任务中,程序将建议增加线程数以达到100%的利用率。 还要注意,某些任务在处理了一段时间后会更改其等待时间以计算时间比率,例如,如果I / O操作的文件大小增加了。 这个事实建议开发一个自调整线程池(我的后续博客之一)。 无论如何,您都应该使线程池的大小可配置,以便可以在运行时进行调整。

好吧,目前就强大的线程池而言。 希望您喜欢它。 如果最大池大小的公式不是100%准确,也不要怪我。 正如我所说,这不是一门精确的科学,它是关于理想池大小的想法。

JCG合作伙伴的 参考资料: “线程故事:关于健壮的线程池”   尼克拉斯。

翻译自: https://www.javacodegeeks.com/2012/03/threading-stories-about-robust-thread.html

线程故事:关于健壮的线程池相关推荐

  1. threadlocal线程_线程故事:Web应用程序中的ThreadLocal

    threadlocal线程 本周,我花了一些合理的时间来消除Web应用程序中的所有ThreadLocal变量. 原因是他们造成了类加载器泄漏,我们不能再适当地取消部署我们的应用程序. 取消部署应用程序 ...

  2. 线程故事:Web应用程序中的ThreadLocal

    本周,我花了一些合理的时间来消除Web应用程序中的所有ThreadLocal变量. 原因是他们造成了类加载器泄漏,我们不能再适当地取消部署我们的应用程序. 取消部署应用程序后,当GC根目录继续引用应用 ...

  3. 线程及同步的性能 – 线程池/ ThreadPoolExecutors/ ForkJoinPool

    线程池和ThreadPoolExecutors 虽然在程序中可以直接使用Thread类型来进行线程操作,但是更多的情况是使用线程池,尤其是在Java EE应用服务器中,一般会使用若干个线程池来处理来自 ...

  4. 一个线程池中的线程异常了,那么线程池会怎么处理这个线程?

    一个线程池中的线程异常了,那么线程池会怎么处理这个线程? 参考文章: (1)一个线程池中的线程异常了,那么线程池会怎么处理这个线程? (2)https://www.cnblogs.com/fangua ...

  5. C#线程系列讲座(3):线程池和文件下载服务器

    如果设计一个服务器程序,每当处理用户请求时,都开始一个线程,将会在一定程序上消耗服务器的资源.为此,一个最好的解决方法就是在服务器启动之前,事先创建一些线程对象,然后,当处理客户端请求时,就从这些建好 ...

  6. java——自己实现基础的线程池及带有任务数过多拒绝策略、线程池销毁、自动扩充线程数量及闲时自动回收线程等操作的改进版线程池

    1. 实现第一版基础的线程池 1.1 首先我们定义一个线程池类ThreadPool,然后线程池有一个容器存放我们创建的线程,另一个容器则是存放当前线程池需要处理的任务队列,线程容器用ArrayList ...

  7. java线程提交_从Java线程到线程池

    线程模型 线程模型分为两类,用户级线程(ULT)和内核级线程(KLT) 用户级线程(ULT):user level threads,系统内核对ULT无感知,线程的创建和调度都由用户级APP进程管理:即 ...

  8. future.cancel不能关闭线程_彻底弄懂线程池-newFixedThreadPool实现线程池

    public class ExecutorServiceTest { public static void main(String[] args) throws IOException, Interr ...

  9. python3 线程池监控线程是否停止工作_Python线程池——个人总结,如果你不喜欢就不要喷,勿...

    有bug,请在评论区留下你得真言,谢谢 concurrent.futures 包含线程池和进程池,目前只记录线程池 ThreadPoolExecutor的使用 小二,上代码~ from concurr ...

最新文章

  1. 中国电子学会图形化四级编程题:绳子算法
  2. Win7+Ubuntu双系统,如何卸载Ubuntu系统?
  3. python3 拼接字符串的7种方法
  4. San CLI 4.0 升级指南
  5. iOS很重要的 block回调
  6. 从fastjson的TypeReference用法,推导如何实现泛型反射
  7. 有关有效企业测试的视频课程
  8. python 获取文件夹所有文件列表_python获取文件夹下所有文件及os模块方法
  9. php for 循环 try_PHP基础案例四:展示学生列表
  10. jQuery链式操作[转]
  11. 为什么同样是200M宽带,移动可以不要钱,联通却要1000多?
  12. dubbo注册中心介绍
  13. JavaEE学习05--cookiesession
  14. lib文件夹 springboot_我把 Spring Boot 项目从 18.18M 瘦身到 0.18M,部署起来真省事!...
  15. 使用python模拟实现PID控制算法
  16. 专业音频术语中英文对照
  17. 一张图理清SpringMVC工作原理
  18. react ssr php,从零开始搭建React同构应用(三):配置SSR
  19. python3调用谷歌翻译_使用python3调用谷歌翻译成中文
  20. 如何删除数据库中的冗余数据

热门文章

  1. spring(2)装配Bean
  2. 事务的状态(状态模式)
  3. spring react_使用Spring WebFlux构建React性REST API –第3部分
  4. 网页益智游戏怎么制作_休息一下,或者:如何使用Java 12制作出色的益智游戏...
  5. java中无法推断类型参数_Java 10中的本地类型推断,或者如果它像鸭子一样嘎嘎叫...
  6. java替换数组中的元素_如何使用Java 8流快速替换列表中的元素
  7. maven将第三方依赖_如何将商业第三方文物整合到您的Maven版本中
  8. jvm7 jvm8_JVM PermGen –您在哪里?
  9. 复合主键 复合外键_复合双重错误
  10. log4j性能 slf4j_Log4j 2:性能接近疯狂