池技术是作为一个架构师必须深刻理解的东西,比如线程池,连接池,对象池,内存池等。

首先需要问一个问题:在c/c++ 编程中,你是如何操作一个任务的或者给一个线程添加任务的?如果你很清楚,那么你知道Java中runnable和thread的区别吗?

Java中没有指针,那么没有办法给一个线程直接传方法指针(传递方法,作为任务),那么直接可以传对象,Runnable对象。线程拿到runnable对象的时候,就会调用run()方法。

Java中的线程池其实没有什么高深的思想的,无非就是线程管理的算法。为什么需要线程池?????

说白了,就是存活一定数量的线程,大量并发任务交给他们处理,线程数量可控,线程不是动态的创建销毁,而是一直存活在池中。

在Java中,如果每当一个请求到达就创建一个新线程,开销是相当大的。在实际使用中,每个请求创建新线程的服务器在创建和销毁线程上花费的时间和消耗的系统资源,甚至可能要比花在处理实际的用户请求的时间和资源要多得多。除了创建和销毁线程的开销之外,活动的线程也需要消耗系统资源。如果在一个JVM里创建太多的线程,可能会导致系统由于过度消耗内存或“切换过度”而导致系统资源不足。为了防止资源不足,服务器应用程序需要一些办法来限制任何给定时刻处理的请求数目,尽可能减少创建和销毁线程的次数,特别是一些资源耗费比较大的线程的创建和销毁,尽量利用已有对象来进行服务,这就是“池化资源”技术产生的原因。 

这篇文章写了好久,回头看看,当时对于线程池的理解虽然都是基于别人的话语中,但是主要的点还是说到了;比如线程池避免线程的过度创建(对于系统的稳定性危害极大),避免只针对任务的执行创建和销毁线程,这样对系统资源就是很大的浪费;线程的创建和销毁是很耗资源的(有时甚至比你处理任务的消耗的内存和计算还要多,而且响应时间也延时严重)。对于针对任务,设计线程池的线程数目大小也是线程池设计极为关键的考量。对于软件的掌握深度,其实不容易,在于你操作的经验,在于你对于底层的理解深度;不然你掌握的效率有极大的折扣,学习任何一门科学都是这样,这就是在没有深度的条件下学习,效率就是回环线,收获很小。

线程池主要用来解决线程生命周期开销问题和资源不足问题。通过对多个任务重用线程,线程创建的开销就被分摊到了多个任务上了,而且由于在请求到达时线程已经存在,所以消除了线程创建所带来的延迟。这样,就可以立即为请求服务,使应用程序响应更快。另外,通过适当地调整线程池中的线程数目可以防止出现资源不足的情况。缺点:当任务较少的时候还是每一个任务对应创建一个线程,任务较大的时候,有大量的任务在等待。

说说Java中的java.util.concurrent.ThreadPoolExecutor:

 java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。

  在ThreadPoolExecutor类中提供了四个构造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {.....public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);...
}

从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。

  下面解释下一下构造器中各个参数的含义:

  • corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
  • maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
  • keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
  • unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //纳秒
  • workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

  ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。

  • threadFactory:线程工厂,主要用来创建线程;
  • handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务 

  具体参数的配置与线程池的关系将在下一节讲述。

  从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:

public abstract class AbstractExecutorService implements ExecutorService {protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };public Future<?> submit(Runnable task) {};public <T> Future<T> submit(Runnable task, T result) { };public <T> Future<T> submit(Callable<T> task) { };private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos)throws InterruptedException, ExecutionException, TimeoutException {};public <T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException {};public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException {};public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException {};public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException {};
}

 AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。

  我们接着看ExecutorService接口的实现:

public interface ExecutorService extends Executor {void shutdown();boolean isShutdown();boolean isTerminated();boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException;<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

  而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:

public interface Executor {void execute(Runnable command);
}

到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。

  Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;

  然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;

  抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;

  然后ThreadPoolExecutor继承了类AbstractExecutorService。

  在ThreadPoolExecutor类中有几个非常重要的方法:

execute()
submit()
shutdown()
shutdownNow()

 execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

  submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

  shutdown()和shutdownNow()是用来关闭线程池的。

  还有很多其他的方法:

  比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等获取与线程池相关属性的方法,有兴趣的朋友可以自行查阅API。

-----------------------------至此 Java.util.ThreadPoolExecutor 源码解析完成--------------------------------

其实现原理:

1.线程池状态

在ThreadPoolExecutor中定义了一个volatile变量,另外定义了几个static final变量表示线程池的各个状态:

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

runState表示当前线程池的状态,它是一个volatile变量用来保证线程之间的可见性;下面的几个static final变量表示runState可能的几个取值。当创建线程池后,初始时,线程池处于RUNNING状态;如果调用了shutdown()方法,则线程池处于SHUTDOWN状态,此时线程池不能够接受新的任务,它会等待所有任务执行完毕;如果调用了shutdownNow()方法,则线程池处于STOP状态,此时线程池不能接受新的任务,并且会去尝试终止正在执行的任务;当线程池处于SHUTDOWN或STOP状态,并且所有工作线程已经销毁,任务缓存队列已经清空或执行结束后,线程池被设置为TERMINATED状态。

2.任务的执行

在了解将任务提交给线程池到任务执行完毕整个过程之前,我们先来看一下ThreadPoolExecutor类中其他的一些比较重要成员变量:

private final BlockingQueue<Runnable> workQueue;              //任务缓存队列,用来存放等待执行的任务
private final ReentrantLock mainLock = new ReentrantLock();   //线程池的主要状态锁,对线程池状态(比如线程池大小//、runState等)的改变都要使用这个锁
private final HashSet<Worker> workers = new HashSet<Worker>();  //用来存放工作集private volatile long  keepAliveTime;    //线程存活时间
private volatile boolean allowCoreThreadTimeOut;   //是否允许为核心线程设置存活时间
private volatile int   corePoolSize;     //核心池的大小(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int   maximumPoolSize;   //线程池最大能容忍的线程数private volatile int   poolSize;       //线程池中当前的线程数private volatile RejectedExecutionHandler handler; //任务拒绝策略private volatile ThreadFactory threadFactory;   //线程工厂,用来创建线程private int largestPoolSize;   //用来记录线程池中曾经出现过的最大线程数private long completedTaskCount;   //用来记录已经执行完毕的任务个数

corePoolSize在很多地方被翻译成核心池大小,其实我的理解这个就是线程池的大小。举个简单的例子:
假如有一个工厂,工厂里面有10个工人,每个工人同时只能做一件任务。
因此只要当10个工人中有工人是空闲的,来了任务就分配给空闲的工人做;当10个工人都有任务在做时,如果还来了任务,就把任务进行排队等待;
如果说新任务数目增长的速度远远大于工人做任务的速度,那么此时工厂主管可能会想补救措施,比如重新招4个临时工人进来;
然后就将任务也分配给这4个临时工人做;如果说着14个工人做任务的速度还是不够,此时工厂主管可能就要考虑不再接收新的任务或者抛弃前面的一些任务了。
当这14个工人当中有人空闲时,而新任务增长的速度又比较缓慢,工厂主管可能就考虑辞掉4个临时工了,只保持原来的10个工人,毕竟请额外的工人是要花钱的。
这个例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。
也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
不过为了方便理解,在本文后面还是将corePoolSize翻译成核心池大小。
largestPoolSize只是一个用来起记录作用的变量,用来记录线程池中曾经有过的最大线程数目,跟线程池的容量没有任何关系。

下面我们进入正题,看一下任务从提交到最终执行完毕经历了哪些过程。

在ThreadPoolExecutor类中,最核心的任务提交方法是execute()方法,虽然通过submit也可以提交任务,但是实际上submit方法里面最终调用的还是execute()方法,所以我们只需要研究execute()方法的实现原理即可:

public void execute(Runnable command) {if (command == null)throw new NullPointerException();if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {if (runState == RUNNING && workQueue.offer(command)) {if (runState != RUNNING || poolSize == 0)ensureQueuedTaskHandled(command);}else if (!addIfUnderMaximumPoolSize(command))reject(command); // is shutdown or saturated}
}

上面的代码可能看起来不是那么容易理解,下面我们一句一句解释:

  首先,判断提交的任务command是否为null,若是null,则抛出空指针异常;

  接着是这句,这句要好好理解一下:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

   由于是或条件运算符,所以先计算前半部分的值,如果线程池中当前线程数不小于核心池大小,那么就会直接进入下面的if语句块了。

  如果线程池中当前线程数小于核心池大小,则接着执行后半部分,也就是执行:

addIfUnderCorePoolSize(command)

  如果执行完addIfUnderCorePoolSize这个方法返回false,则继续执行下面的if语句块,否则整个方法就直接执行完毕了。

  如果执行完addIfUnderCorePoolSize这个方法返回false,然后接着判断:

if (runState == RUNNING && workQueue.offer(command))

  如果当前线程池处于RUNNING状态,则将任务放入任务缓存队列;如果当前线程池不处于RUNNING状态或者任务放入缓存队列失败,则执行:

addIfUnderMaximumPoolSize(command)

  如果执行addIfUnderMaximumPoolSize方法失败,则执行reject()方法进行任务拒绝处理。

  回到前面:

if (runState == RUNNING && workQueue.offer(command))

  这句的执行,如果说当前线程池处于RUNNING状态且将任务放入任务缓存队列成功,则继续进行判断:

if (runState != RUNNING || poolSize == 0)

  这句判断是为了防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是这样就执行:

ensureQueuedTaskHandled(command)

  进行应急处理,从名字可以看出是保证 添加到任务缓存队列中的任务得到处理。

  我们接着看2个关键方法的实现:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < corePoolSize && runState == RUNNING)t = addThread(firstTask);        //创建线程去执行firstTask任务   } finally {mainLock.unlock();}if (t == null)return false;t.start();return true;
}

  这个是addIfUnderCorePoolSize方法的具体实现,从名字可以看出它的意图就是当低于核心池大小时执行的方法。下面看其具体实现,首先获取到锁,因为这地方涉及到线程池状态的变化,先通过if语句判断当前线程池中的线程数目是否小于核心池大小,有朋友也许会有疑问:前面在execute()方法中不是已经判断过了吗,只有线程池当前线程数目小于核心池大小才会执行addIfUnderCorePoolSize方法的,为何这地方还要继续判断?原因很简单,前面的判断过程中并没有加锁,因此可能在execute方法判断的时候poolSize小于corePoolSize,而判断完之后,在其他线程中又向线程池提交了任务,就可能导致poolSize不小于corePoolSize了,所以需要在这个地方继续判断。然后接着判断线程池的状态是否为RUNNING,原因也很简单,因为有可能在其他线程中调用了shutdown或者shutdownNow方法。然后就是执行

t = addThread(firstTask);

  这个方法也非常关键,传进去的参数为提交的任务,返回值为Thread类型。然后接着在下面判断t是否为空,为空则表明创建线程失败(即poolSize>=corePoolSize或者runState不等于RUNNING),否则调用t.start()方法启动线程。

  我们来看一下addThread方法的实现:

private Thread addThread(Runnable firstTask) {Worker w = new Worker(firstTask);Thread t = threadFactory.newThread(w);  //创建一个线程,执行任务   if (t != null) {w.thread = t;            //将创建的线程的引用赋值为w的成员变量       workers.add(w);int nt = ++poolSize;     //当前线程数加1       if (nt > largestPoolSize)largestPoolSize = nt;}return t;
}

  在addThread方法中,首先用提交的任务创建了一个Worker对象,然后调用线程工厂threadFactory创建了一个新的线程t,然后将线程t的引用赋值给了Worker对象的成员变量thread,接着通过workers.add(w)将Worker对象添加到工作集当中。

  下面我们看一下Worker类的实现:

private final class Worker implements Runnable {private final ReentrantLock runLock = new ReentrantLock();private Runnable firstTask;volatile long completedTasks;Thread thread;Worker(Runnable firstTask) {this.firstTask = firstTask;}boolean isActive() {return runLock.isLocked();}void interruptIfIdle() {final ReentrantLock runLock = this.runLock;if (runLock.tryLock()) {try {if (thread != Thread.currentThread())thread.interrupt();} finally {runLock.unlock();}}}void interruptNow() {thread.interrupt();}private void runTask(Runnable task) {final ReentrantLock runLock = this.runLock;runLock.lock();try {if (runState < STOP &&Thread.interrupted() &&runState >= STOP)boolean ran = false;beforeExecute(thread, task);   
            //beforeExecute方法是ThreadPoolExecutor类的一个方法,没有具体实现,用户可以根据//自己需要重载这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等           try {task.run();ran = true;afterExecute(task, null);++completedTasks;} catch (RuntimeException ex) {if (!ran)afterExecute(task, ex);throw ex;}} finally {runLock.unlock();}}public void run() {try {Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this);   //当任务队列中没有任务时,进行清理工作       }}
}

  它实际上实现了Runnable接口,因此上面的Thread t = threadFactory.newThread(w);效果跟下面这句的效果基本一样:

Thread t = new Thread(w);

  相当于传进去了一个Runnable任务,在线程t中执行这个Runnable。

  既然Worker实现了Runnable接口,那么自然最核心的方法便是run()方法了:

public void run() {try {Runnable task = firstTask;firstTask = null;while (task != null || (task = getTask()) != null) {runTask(task);task = null;}} finally {workerDone(this);}
}

  从run方法的实现可以看出,它首先执行的是通过构造器传进来的任务firstTask,在调用runTask()执行完firstTask之后,在while循环里面不断通过getTask()去取新的任务来执行,那么去哪里取呢?自然是从任务缓存队列里面去取,getTask是ThreadPoolExecutor类中的方法,并不是Worker类中的方法,下面是getTask方法的实现:

Runnable getTask() {for (;;) {try {int state = runState;if (state > SHUTDOWN)return null;Runnable r;if (state == SHUTDOWN)  // Help drain queuer = workQueue.poll();else if (poolSize > corePoolSize || allowCoreThreadTimeOut) 
                //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,//则通过poll取任务,若等待一定的时间取不到任务,则返回nullr = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);elser = workQueue.take();if (r != null)return r;if (workerCanExit()) {    //如果没取到任务,即r为null,则判断当前的worker是否可以退出if (runState >= SHUTDOWN) // Wake up othersinterruptIdleWorkers();   //中断处于空闲状态的workerreturn null;}// Else retry} catch (InterruptedException ie) {// On interruption, re-check runState}}
}

在getTask中,先判断当前线程池状态,如果runState大于SHUTDOWN(即为STOP或者TERMINATED),则直接返回null。

  如果runState为SHUTDOWN或者RUNNING,则从任务缓存队列取任务。

  如果当前线程池的线程数大于核心池大小corePoolSize或者允许为核心池中的线程设置空闲存活时间,则调用poll(time,timeUnit)来取任务,这个方法会等待一定的时间,如果取不到任务就返回null。

  然后判断取到的任务r是否为null,为null则通过调用workerCanExit()方法来判断当前worker是否可以退出,我们看一下workerCanExit()的实现:

private boolean workerCanExit() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();boolean canExit;//如果runState大于等于STOP,或者任务缓存队列为空了//或者  允许为核心池线程设置空闲存活时间并且线程池中的线程数目大于1try {canExit = runState >= STOP ||workQueue.isEmpty() ||(allowCoreThreadTimeOut &&poolSize > Math.max(1, corePoolSize));} finally {mainLock.unlock();}return canExit;
}

  也就是说如果线程池处于STOP状态、或者任务队列已为空或者允许为核心池线程设置空闲存活时间并且线程数大于1时,允许worker退出。如果允许worker退出,则调用interruptIdleWorkers()中断处于空闲状态的worker,我们看一下interruptIdleWorkers()的实现:

void interruptIdleWorkers() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (Worker w : workers)  //实际上调用的是worker的interruptIfIdle()方法w.interruptIfIdle();} finally {mainLock.unlock();}
}

  从实现可以看出,它实际上调用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

void interruptIfIdle() {final ReentrantLock runLock = this.runLock;if (runLock.tryLock()) {    
   //注意这里,是调用tryLock()来获取锁的,因为如果当前worker正在执行任务,锁已经被获取了,是无法获取到锁的                               
   //如果成功获取了锁,说明当前worker处于空闲状态try {if (thread != Thread.currentThread())  thread.interrupt();} finally {runLock.unlock();}}
}

  这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程去任务缓存队列里面取任务来执行。

  我们再看addIfUnderMaximumPoolSize方法的实现,这个方法的实现思想和addIfUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {Thread t = null;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (poolSize < maximumPoolSize && runState == RUNNING)t = addThread(firstTask);} finally {mainLock.unlock();}if (t == null)return false;t.start();return true;
}

  看到没有,其实它和addIfUnderCorePoolSize方法的实现基本一模一样,只是if语句判断条件中的poolSize < maximumPoolSize不同而已。

  到这里,大部分朋友应该对任务提交给线程池之后到被执行的整个过程有了一个基本的了解,下面总结一下:

  1)首先,要清楚corePoolSize和maximumPoolSize的含义;

  2)其次,要知道Worker是用来起到什么作用的;

  3)要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:

  • 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
  • 如果当前线程池中的线程数目>=corePoolSize(其实就是=coresize),则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务,直到达到maximumPoolSize;
  • 如果当前线程池中的线程数目达到maximumPoolSize,且排队队列已经满了,则会采取任务拒绝策略进行处理;
  • 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

3.线程池中的线程初始化

  默认情况下,创建线程池之后,线程池中是没有线程的,需要提交任务之后才会创建线程。

  在实际中如果需要线程池创建之后立即创建线程,可以通过以下两个方法办到:

  • prestartCoreThread():初始化一个核心线程;
  • prestartAllCoreThreads():初始化所有核心线程

  下面是这2个方法的实现:

public boolean prestartCoreThread() {return addIfUnderCorePoolSize(null); //注意传进去的参数是null
}public int prestartAllCoreThreads() {int n = 0;while (addIfUnderCorePoolSize(null))//注意传进去的参数是null++n;return n;
}

  注意上面传进去的参数是null,根据第2小节的分析可知如果传进去的参数为null,则最后执行线程会阻塞在getTask方法中的

r = workQueue.take();

  即等待任务队列中有任务。

4.任务缓存队列及排队策略

  在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。

  workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:

  1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;

  2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;

  3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。

5.任务拒绝策略

  当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

6.线程池的关闭

  ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
  • shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

7.线程池容量的动态调整

  ThreadPoolExecutor提供了动态调整线程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:设置核心池大小
  • setMaximumPoolSize:设置线程池最大能创建的线程数目大小

  当上述参数从小变大时,ThreadPoolExecutor进行线程赋值,还可能立即创建新的线程来执行任务。

8.    合理的配置线程池

要想合理的配置线程池,就必须首先分析任务特性,可以从以下几个角度来进行分析:

  1. 任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
  2. 任务的优先级:高,中和低。
  3. 任务的执行时间:长,中和短。
  4. 任务的依赖性:是否依赖其他系统资源,如数据库连接。

任务性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务配置尽可能少的线程数量,如配置Ncpu+1个线程的线程池。IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,如果等待的时间越长CPU空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。

建议使用有界队列,有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点,比如几千。有一次我们组使用的后台任务线程池的队列和线程池全满了,不断的抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞住,任务积压在线程池里。如果当时我们设置成无界队列,线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然我们的系统所有的任务是用的单独的服务器部署的,而我们使用不同规模的线程池跑不同类型的任务,但是出现这样问题时也会影响到其他任务。

9.    线程池的监控

通过线程池提供的参数进行监控。线程池里有一些属性在监控线程池的时候可以使用

  • taskCount:线程池需要执行的任务数量。
  • completedTaskCount:线程池在运行过程中已完成的任务数量。小于或等于taskCount。
  • largestPoolSize:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。
  • getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。
  • getActiveCount:获取活动的线程数。

通过扩展线程池进行监控。通过继承线程池并重写线程池的beforeExecute,afterExecute和terminated方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:

view sourceprint?
1 protected void beforeExecute(Thread t, Runnable r) { }

java 线程池 -- (Java并发)相关推荐

  1. java线程池_Java 并发编程 线程池源码实战

    作者 | 马启航 杏仁后端工程师.「我头发还多,你们呢?」 一.概述 笔者在网上看了好多的关于线程池原理.源码分析相关的文章,但是说实话,没有一篇让我觉得读完之后豁然开朗,完完全全的明白线程池,要么写 ...

  2. JAVA线程池_并发队列工作笔记0002---认识线程池_在线程池中使用队列

    技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 上面是线程的执行周期 这个线程的生命周期,可以看到时间都浪费在了创建和销毁的这里了. 实际上执行业 ...

  3. JAVA线程池_并发队列工作笔记0004---Callable原理_多线程执行Callable任务

    技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 注意之前咱们说的线程池队列中都是用的实现了Runnbale接口的线程, 这里咱们用的是实现了Cal ...

  4. JAVA线程池_并发队列工作笔记0003---线程池的分类_可缓存线程池_定长线程池_定时线程池_单例线程池

    技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 这里说线程池的分类 有可缓存类型, 定长类型, 定时类型, 单例类型, 这里我这次用Executo ...

  5. JAVA线程池_并发队列工作笔记0001---认识阻塞队列_非阻塞队列

    技术交流QQ群[JAVA,C++,Python,.NET,BigData,AI]:170933152 注意这个ConcurrentLinkedQueue这个是个非阻塞队列, 这个队列是没有长度限制的, ...

  6. 四种Java线程池用法解析

    四种Java线程池用法解析 本文为大家分析四种Java线程池用法,供大家参考,具体内容如下 http://www.jb51.net/article/81843.htm 1.new Thread的弊端 ...

  7. 面试题:四种Java线程池用法解析 !=!=未看

    1.new Thread的弊端 执行一个异步任务你还只是如下new Thread吗? 1 2 3 4 5 6 7 8 new Thread(new Runnable() {   @Override   ...

  8. java线程池使用最全详解

    线程池使用 前言 在执行一个异步任务或并发任务时,往往是通过直接new Thread()方法来创建新的线程,这样做弊端较多,更好的解决方案是合理地利用线程池,线程池的优势很明显,如下: 降低系统资源消 ...

  9. java线程池拒绝策略_Java核心知识 多线程并发 线程池原理(二十三)

    线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后 启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕, 再从队列中取出任务来执行.他 ...

  10. java线程池_Java多线程并发:线程基本方法+线程池原理+阻塞队列原理技术分享...

    线程基本方法有哪些? 线程相关的基本方法有 wait,notify,notifyAll,sleep,join,yield 等. 线程等待(wait) 调用该方法的线程进入 WAITING 状态,只有等 ...

最新文章

  1. PowerDesigner的一下简单使用技巧
  2. 线程同步 – lock和Monitor
  3. 使用java底层实现邮件的发送(含测试,源码)
  4. 【Code-Snippet】TextView
  5. VS2010 + C#4.0使用 async + await
  6. 华为云linux用户名,玩转华为云服务器ECS:001 如何登录并创建新用户
  7. 第一个神经网络代码分享
  8. 鼠标双击检测_雷蛇巴塞利斯蛇无线游戏鼠标评测:青出于蓝胜于蓝
  9. 【转】飞鸽端口号被占用时的解决方法
  10. keystone系列二:HTTP协议
  11. lua 远程调试 【zeroBrane 使用mobdebug】(good转)
  12. java贪吃蛇程序v1
  13. python转exe遇到的坑及解决方案
  14. 根据《程序员竞争力矩阵》的自我评价
  15. visio2019和Mathtype7.0版本冲突,出现VBE6EXT.OLB不能被加载
  16. vs2015遇到找不到kernel32.lib,无法解析的外部符号 __imp__printf的问题
  17. Linux自启进程管理工具,Linux进程管理工具--God-详解(1)-入门
  18. 【Babel】1186- 保姆级教学!这次一定学会 Babel 插件开发!
  19. 利用qq邮箱作为个人邮件服务器发送邮件
  20. java timer schedule_Java Timer的使用,timer.schedule定时执行

热门文章

  1. xfce4面板消失了解决方案+xfce4的面板保存设置以及读取
  2. oracle的脚本语言,Oracle 无法执行SQL脚本语句
  3. python 单例模式的实现方法_python中单例模式的四种实现方式
  4. 微软开源 MS-DOS 1.25 和 MS-DOS 2.0
  5. 在python下比celery更加简单的异步任务队列RQ
  6. Spring Cloud自定义Hystrix请求命令
  7. 【uva 1617】Laptop(算法效率--贪心,2种理解)
  8. python web cgi
  9. Robot Framework 使用1-环境配置及简单网站兼容性测试(转)
  10. 更改UISearchBar button属性