


  SingleThreadPool
  CachedPool
  FixedThreadPool
  ScheduledPool


  WorkStealingPool


SingleThreadPool 这个线程池里面只有一个线程。这样可以保证 我们扔进去的任务是被顺序执行的


package com.mashibing.juc.c_026_01_ThreadPool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class T07_SingleThreadPool {public static void main(String[] args) {ExecutorService service = Executors.newSingleThreadExecutor();for (int i = 0; i < 5; i++) {final int j = i;service.execute(() -> {System.out.println(j + " " + Thread.currentThread().getName());});}}


CachedPool 核心线程数为0,最大线程数是Integer.MAX_VALUE


SynchronousQueue 是一个阻塞队列,其中每个插入操作必须等待另一个线程执行相应的删除操作,反之亦然。同步队列没有任何内部容量,甚至容量都不是1。您无法查看同步队列,因为只有当您试图删除某个元素时,它才会出现;你不能插入元素(使用任何方法),除非另一个线程试图删除它;你不能迭代,因为没有东西可以迭代。队列的头是第一个排队插入线程试图添加到队列中的元素;如果没有这样的排队线程,那么就没有可删除的元素,poll()将返回null。对于其他集合方法(例如包含),SynchronousQueue充当空集合。此队列不允许空元素。

CachedPool 这个线程池,当任务到来时,如果有线程空闲,我就用现有的线程;如果所有线程忙,就启动一个新线程。


package com.mashibing.juc.c_026_01_ThreadPool;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class T08_CachedPool {public static void main(String[] args) throws InterruptedException {ExecutorService service = Executors.newCachedThreadPool();System.out.println(service);for (int i = 0; i < 2; i++) {service.execute(() -> {try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName());});}System.out.println(service);TimeUnit.SECONDS.sleep(80);System.out.println(service);}


java.util.concurrent.ThreadPoolExecutor@6aaa5eb0[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@6aaa5eb0[Running, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 0]
java.util.concurrent.ThreadPoolExecutor@6aaa5eb0[Running, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]Process finished with exit code 0


FixedThreadPool 是一个固定线程数的线程池。



/*** 线程池的概念* nasa*/
package com.mashibing.juc.c_026_01_ThreadPool;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;public class T09_FixedThreadPool {public static void main(String[] args) throws InterruptedException, ExecutionException {long start = System.currentTimeMillis();getPrime(1, 200000);long end = System.currentTimeMillis();System.out.println(end - start);final int cpuCoreNum = 4;ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);MyTask t1 = new MyTask(1, 80000); //1-5 5-10 10-15 15-20MyTask t2 = new MyTask(80001, 130000);MyTask t3 = new MyTask(130001, 170000);MyTask t4 = new MyTask(170001, 200000);Future<List<Integer>> f1 = service.submit(t1);Future<List<Integer>> f2 = service.submit(t2);Future<List<Integer>> f3 = service.submit(t3);Future<List<Integer>> f4 = service.submit(t4);start = System.currentTimeMillis();f1.get();f2.get();f3.get();f4.get();end = System.currentTimeMillis();System.out.println(end - start);}static class MyTask implements Callable<List<Integer>> {int startPos, endPos;MyTask(int s, int e) {this.startPos = s;this.endPos = e;}@Overridepublic List<Integer> call() throws Exception {List<Integer> r = getPrime(startPos, endPos);return r;}}static boolean isPrime(int num) {for (int i = 2; i <= num / 2; i++) {if (num % i == 0) return false;}return true;}static List<Integer> getPrime(int start, int end) {List<Integer> results = new ArrayList<>();for (int i = start; i <= end; i++) {if (isPrime(i)) results.add(i);}return results;}



CachedThreadPool 和 FixedThreadPool 的选用

如果任务来的速度忽快忽慢,但是我要保证任务来的时候有人来做这个任务,那么我们可以使用 CachedThreadPool,保证任务不会堆积。





package com.mashibing.juc.c_026_01_ThreadPool;import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;public class T10_ScheduledPool {public static void main(String[] args) {ScheduledExecutorService service = Executors.newScheduledThreadPool(4);service.scheduleAtFixedRate(() -> {try {TimeUnit.MILLISECONDS.sleep(new Random().nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName());}, 0, 500, TimeUnit.MILLISECONDS);}


package com.mashibing.juc.c_026_01_ThreadPool;import java.util.concurrent.*;public class T14_MyRejectedHandler {public static void main(String[] args) {ExecutorService service = new ThreadPoolExecutor(4, 4,0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(6),Executors.defaultThreadFactory(),new MyHandler());}static class MyHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {//log("r rejected")//save r kafka mysql redis//try 3 timesif (executor.getQueue().size() < 10000) {//try put again();}}}



// 1. `ctl`,可以看做一个int类型的数字,高3位表示线程池状态,低29位表示worker数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 2. `COUNT_BITS`,`Integer.SIZE`为32,所以`COUNT_BITS`为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 3. `CAPACITY`,线程池允许的最大线程数。1左移29位,然后减1,即为 2^29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;// Packing and unpacking ctl
// 5. `runStateOf()`,获取线程池状态,通过按位与操作,低29位将全部变成0
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 6. `workerCountOf()`,获取线程池worker数量,通过按位与操作,高3位将全部变成0
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 7. `ctlOf()`,根据线程池状态和线程池worker数量,生成ctl值
private static int ctlOf(int rs, int wc) { return rs | wc; }/** Bit field accessors that don't require unpacking ctl.* These depend on the bit layout and on workerCount being never negative.*/
// 8. `runStateLessThan()`,线程池状态小于xx
private static boolean runStateLessThan(int c, int s) {return c < s;
// 9. `runStateAtLeast()`,线程池状态大于等于xx
private static boolean runStateAtLeast(int c, int s) {return c >= s;


public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {// 基本类型参数校验if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();// 空指针校验if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;// 根据传入参数`unit`和`keepAliveTime`,将存活时间转换为纳秒存到变量`keepAliveTime `中this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;


public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();// worker数量比核心线程数小,直接创建worker执行任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}// worker数量超过核心线程数,任务直接进入队列if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();// 线程池状态不是RUNNING状态,说明执行过shutdown命令,需要对新加入的任务执行reject()操作。// 这儿为什么需要recheck,是因为任务入队列前后,线程池的状态可能会发生变化。if (! isRunning(recheck) && remove(command))reject(command);// 这儿为什么需要判断0值,主要是在线程池构造方法中,核心线程数允许为0else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 如果线程池不是运行状态,或者任务进入队列失败,则尝试创建worker执行任务。// 这儿有3点需要注意:// 1. 线程池不是运行状态时,addWorker内部会判断线程池状态// 2. addWorker第2个参数表示是否创建核心线程// 3. addWorker返回false,则说明任务执行失败,需要执行reject操作else if (!addWorker(command, false))reject(command);


private boolean addWorker(Runnable firstTask, boolean core) {retry:// 外层自旋for (;;) {int c = ctl.get();int rs = runStateOf(c);// 这个条件写得比较难懂,我对其进行了调整,和下面的条件等价// (rs > SHUTDOWN) || // (rs == SHUTDOWN && firstTask != null) || // (rs == SHUTDOWN && workQueue.isEmpty())// 1. 线程池状态大于SHUTDOWN时,直接返回false// 2. 线程池状态等于SHUTDOWN,且firstTask不为null,直接返回false// 3. 线程池状态等于SHUTDOWN,且队列为空,直接返回false// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;// 内层自旋for (;;) {int wc = workerCountOf(c);// worker数量超过容量,直接返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 使用CAS的方式增加worker数量。// 若增加成功,则直接跳出外层循环进入到第二部分if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctl// 线程池状态发生变化,对外层循环进行自旋if (runStateOf(c) != rs)continue retry;// 其他情况,直接内层循环进行自旋即可// else CAS failed due to workerCount change; retry inner loop} }boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;// worker的添加必须是串行的,因此需要加锁mainLock.lock();try {// Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.// 这儿需要重新检查线程池状态int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// worker已经调用过了start()方法,则不再创建workerif (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();// worker创建并添加到workers成功workers.add(w);// 更新`largestPoolSize`变量int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}// 启动worker线程if (workerAdded) {t.start();workerStarted = true;}}} finally {// worker线程启动失败,说明线程池状态发生了变化(关闭操作被执行),需要进行shutdown相关操作if (! workerStarted)addWorkerFailed(w);}return workerStarted;


private final class Workerextends AbstractQueuedSynchronizerimplements Runnable
{/*** This class will never be serialized, but we provide a* serialVersionUID to suppress a javac warning.*/private static final long serialVersionUID = 6138294804551838833L;/** Thread this worker is running in.  Null if factory fails. */final Thread thread;/** Initial task to run.  Possibly null. */Runnable firstTask;/** Per-thread task counter */volatile long completedTasks;/*** Creates with given first task and thread from ThreadFactory.* @param firstTask the first task (null if none)*/Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;// 这儿是Worker的关键所在,使用了线程工厂创建了一个线程。传入的参数为当前workerthis.thread = getThreadFactory().newThread(this);}/** Delegates main run loop to outer runWorker  */public void run() {runWorker(this);}// 省略代码...


final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;// 调用unlock()是为了让外部可以中断w.unlock(); // allow interrupts// 这个变量用于判断是否进入过自旋(while循环)boolean completedAbruptly = true;try {// 这儿是自旋// 1. 如果firstTask不为null,则执行firstTask;// 2. 如果firstTask为null,则调用getTask()从队列获取任务。// 3. 阻塞队列的特性就是:当队列为空时,当前线程会被阻塞等待while (task != null || (task = getTask()) != null) {// 这儿对worker进行加锁,是为了达到下面的目的// 1. 降低锁范围,提升性能// 2. 保证每个worker执行的任务是串行的w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interrupt// 如果线程池正在停止,则对当前线程进行中断操作if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();// 执行任务,且在执行前后通过`beforeExecute()`和`afterExecute()`来扩展其功能。// 这两个方法在当前类里面为空实现。try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {// 帮助gctask = null;// 已完成任务数加一 w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {// 自旋操作被退出,说明线程池正在结束processWorkerExit(w, completedAbruptly);}



WorkStealingPool 偷任务的线程池:每一个线程都有自己独立的任务队列,如果某一个线程执行完自己的任务之后,要去别的线程那里偷任务,分担别的线程的任务。

WorkStealingPool 本质上还是一个 ForkJoinPool

package com.mashibing.juc.c_026_01_ThreadPool;import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;public class T11_WorkStealingPool {public static void main(String[] args) throws IOException {ExecutorService service = Executors.newWorkStealingPool();System.out.println(Runtime.getRuntime().availableProcessors());service.execute(new R(1000));service.execute(new R(2000));service.execute(new R(2000));service.execute(new R(2000)); //daemonservice.execute(new R(2000));//由于产生的是精灵线程(守护线程、后台线程),主线程不阻塞的话,看不到输出System.in.read();}static class R implements Runnable {int time;R(int t) {this.time = t;}@Overridepublic void run() {try {TimeUnit.MILLISECONDS.sleep(time);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(time + " " + Thread.currentThread().getName());}}



package com.mashibing.juc.c_026_01_ThreadPool;import java.io.IOException;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.RecursiveTask;public class T12_ForkJoinPool {static int[] nums = new int[1000000];static final int MAX_NUM = 50000;static Random r = new Random();static {for (int i = 0; i < nums.length; i++) {nums[i] = r.nextInt(100);}System.out.println("stream api---" + Arrays.stream(nums).sum()); //stream api,单线程的计算方式}static class AddTask extends RecursiveAction {int start, end;AddTask(int s, int e) {start = s;end = e;}@Overrideprotected void compute() {if (end - start <= MAX_NUM) {long sum = 0L;for (int i = start; i < end; i++) sum += nums[i];System.out.println("from:" + start + " to:" + end + " = " + sum);} else {int middle = start + (end - start) / 2;AddTask subTask1 = new AddTask(start, middle);AddTask subTask2 = new AddTask(middle, end);subTask1.fork();subTask2.fork();}}}// 带有返回值的任务拆分static class AddTaskReturn extends RecursiveTask<Long> {private static final long serialVersionUID = 1L;int start, end;AddTaskReturn(int s, int e) {start = s;end = e;}@Overrideprotected Long compute() {if (end - start <= MAX_NUM) {long sum = 0L;for (int i = start; i < end; i++) sum += nums[i];return sum;}int middle = start + (end - start) / 2;AddTaskReturn subTask1 = new AddTaskReturn(start, middle);AddTaskReturn subTask2 = new AddTaskReturn(middle, end);subTask1.fork();subTask2.fork();return subTask1.join() + subTask2.join();}}public static void main(String[] args) throws IOException {/*ForkJoinPool fjp = new ForkJoinPool();AddTask task = new AddTask(0, nums.length);fjp.execute(task);*/T12_ForkJoinPool temp = new T12_ForkJoinPool();ForkJoinPool fjp = new ForkJoinPool();AddTaskReturn task = new AddTaskReturn(0, nums.length);fjp.execute(task);long result = task.join();System.out.println(result);//System.in.read();}


流式API的底层也是使用 ForkJoinPool 来实现的。nums.parallelStream().forEach这种并行流处理起来效率会更高一些。

package com.mashibing.juc.c_026_01_ThreadPool;import java.util.ArrayList;
import java.util.List;
import java.util.Random;public class T13_ParallelStreamAPI {public static void main(String[] args) {List<Integer> nums = new ArrayList<>();Random r = new Random();for (int i = 0; i < 10000; i++) nums.add(1000000 + r.nextInt(1000000));//System.out.println(nums);long start = System.currentTimeMillis();nums.forEach(v -> isPrime(v));long end = System.currentTimeMillis();System.out.println(end - start);//使用parallel stream apistart = System.currentTimeMillis();nums.parallelStream().forEach(T13_ParallelStreamAPI::isPrime);end = System.currentTimeMillis();System.out.println(end - start);}static boolean isPrime(int num) {for (int i = 2; i <= num / 2; i++) {if (num % i == 0) return false;}return true;}


