一. java线程池

带着问题:

  1. 线程是什么时候被创建的?
  2. 线程会一直循环取任务任务吗?怎么做的?
  3. 线程取不到任务会怎么样?
  4. 线程会被Runnable和Callable的异常干掉吗?
  5. 线程怎么干掉自己?
  6. 做完以后,结果放在哪?
  7. 线程做一个SOA请求,线程会在等吗?还是会回到线程池?请求有结果以后谁来处理结果?
  8. 系统退出前,如何优雅结束线程池的任务?
  9. Forkjoinpool是什么?适合场景与原理?

源码解读

ps:worker 继承自AQS

1) 线程池在创建的时候,池子里是没有线程的。看源码发现,构造方法里是没有创建线程、工作队列的,简单的属性赋值而已。

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

只有在有任务提交进来的时候,才会初始化队列,创建一个线程来干活。并且在没有达到核心线程数时,不管有没有空闲线程,来一个任务创建个线程。

那么线程池是怎么创建线程,怎么做到达到核心后放入队列,队列满了以后,去创建线程直到最大线程数了:Worker,队列阻塞,判断线程数量和拒绝策略

提交任务

public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;
}

step1:创建一个任务

创建一个FutureTask任务

protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);
}

FutureTask中的Callable通过一个适配器将Runnable和结果包装成的。

public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable
}

public static <T> Callable<T> callable(Runnable task, T result) {if (task == null)throw new NullPointerException();return new RunnableAdapter<T>(task, result);
}

static final class RunnableAdapter<T> implements Callable<T> {final Runnable task;final T result;RunnableAdapter(Runnable task, T result) {this.task = task;this.result = result;}public T call() {task.run();return result;}
}

step2:执行

public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false))reject(command);      

大白话描述一下:看下有没有到达核心线程数,没有的话,来个线程。线程搞定,皆大欢喜,搞定。如果失败了,什么情况,看下线程池还活着没,活着,嗯,那再看下队列能正常入队吗?插入也没问题!什么情况,老铁,不科学啊,那再检查一下线程还或者没。假设线程池还活着,看下是不是工作线程数已经为0了。如果工作线程数是为0了,就创建一个线程去干活,由于任务已经入队列,给它一个null任务。(后面分析线程怎么处理null和取任务)。如果double-check过程中,发现线程池关闭了,但remove(command)失败,也就是有线程把任务取走了。那,那就不管了,做就做呗。

接下来,看下怎么创建一个线程。会发现会用一个HashSet保存所有的Worker,并且保证放进去的Woker中的线程是启动的、活得。

addWorker 源码解读 code line:901

Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);
}/** Delegates main run loop to outer runWorker  */
public void run() {runWorker(this);
}

线程启动后会干什么事情:

Start会调用run,执行的是runWorker(该方法是线程池的方法,而不是Worker的方法:委托模式)

可以看到,如果给Woker的任务是个null,是没有问题的,他将去队列里面去取任务。

worker会加锁执行。如果没有在run之前中断,是中断不了任务的。

  • 2) 线程通过启动后,通过while循环一直从队列中循环取任务。下面以LinkedBlockingQueue为例。取和放任务都是加锁的,由两把锁控制。
final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {beforeExecute(wt, task);Throwable thrown = null;try {task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}

3) getTask():线程从队列中取不到任务,会怎么样?分两种情况,一种是一直等,一种是超时等。实现方式不一样。

先说时机:当允许核心线程在超过空闲时间后可以死去,或者线程池中的线程已经超过核心线程数时,是超时等。达到时间,取不到任务,将工作线程减1,返回null,结束循环取任务。怎样实现?都是通过ReetrantLock的Condition。阻塞后等待被唤醒。然后来一个任务,唤醒一个。(ps:非公平锁)

4) 再次看源码runWorker:线程会被callable和runnable异常干掉,超过空闲时间的线程也会被干掉,都会走到finally走退出流程。注意有个标志为completedAbruptly,来标志是异常还是超时,因为超时的时候已经被线程池中的工作线程计数值减1了,无需在推出流程中再减。然后从workers的hashset中移除这个worker(这一步是加锁操作)。

当出现异常时,异常会杀死worker中的线程吗?没有,线程还要继续下面的退出逻辑了。超时时,因为不再持有worker的引用,其将会被gc回收,线程会被回收。

5) 执行结果放在FutureTask的outcome中。上面步骤中run的时task,也就是执行的task里面的run方法:

public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}

protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}

6) 从上面源码可以知道,子线程就是阻塞的,等执行完以后,改变future状态和结果。只是如果主线程不调用get方法,不会阻塞主线程。

那么,如果调用get方法,是如何阻塞该方法的线程的了。(还记得WaitNode吗?)

调用get方法是,如果检查state不是normal状态,就会调用awitnode方法。

大白话解释:首先给你创建一个wait结点,包含了你的线程。然后把你放入到等待队列的尾部(waiters),然后调用LockSupport的park或者parkNanos阻塞。等待被唤醒。什么时候唤醒?在任务结束后,worker会调用finishCompletion,这个里面如果发现有其他线程正在排队等结果,会一个个亲手唤醒:老哥等辛苦了,醒醒:LockSupport的unpark方法。或者是你等得超时了(park时间到了),系统把你叫醒,你再绝望的看一下任务到底有没有完成,完成了,惊喜,赶快把自己的wait结点绑定的的线程置null,返回线程状态。或者是发现future是处在completing状态:马上就会完成,希望的曙光就在前面,我让出我的cpu,等待下次调度时再看是不是ok了。这两种都不是,没戏了,超时了,不能等了,把自己的waitnode里线程置null(removeWaiter没看明白),返回当前线程状态,判断这个状态是normal状态吗,不是,抛超时异常。是,返回结果。

private int awaitDone(boolean timed, long nanos)throws InterruptedException {final long deadline = timed ? System.nanoTime() + nanos : 0L;WaitNode q = null;boolean queued = false;for (;;) {if (Thread.interrupted()) {removeWaiter(q);throw new InterruptedException();}int s = state;if (s > COMPLETING) {if (q != null)q.thread = null;return s;}else if (s == COMPLETING) // cannot time out yetThread.yield();else if (q == null)q = new WaitNode();else if (!queued)queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);else if (timed) {nanos = deadline - System.nanoTime();if (nanos <= 0L) {removeWaiter(q);return state;}LockSupport.parkNanos(this, nanos);}elseLockSupport.park(this);}
}

private void finishCompletion() {// assert state > COMPLETING;for (WaitNode q; (q = waiters) != null;) {if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {for (;;) {Thread t = q.thread;if (t != null) {q.thread = null;LockSupport.unpark(t);}WaitNode next = q.next;if (next == null)break;q.next = null; // unlink to help gcq = next;}break;}}done();callable = null;        // to reduce footprint
}

7) 靠线程池做异步非阻塞的高并发,没戏。子线程该等就得等。线程池中线程就这么多,队列就这么大。如果拒绝策略选择:CallerRunPolicy,那么主线程干活,也会慢慢阻塞tomcat的所有的主线程,请求堆在tomcat中。如果是DiscardOldestPolicy,放弃最旧的:从等待队列的头部拿一个任务,直接放弃。(假设一个极限情况:任务来的太快太快,然后一直抛弃最刚开始的任务,结果之前的请求大量得不到真正的处理。)DiscardPolicy和AbortPolicy都是直接放弃最新的任务,一个不抛异常,一个抛一个拒接异常。

8) 在退出jvm时,先完成所有任务

private static ThreadPoolExecutor executor =new ThreadPoolExecutor(2, 2 * 2 + 1, 2, TimeUnit.SECONDS,new LinkedBlockingQueue<>(2000), new ThreadPoolExecutor.AbortPolicy());static Runnable consumer = () -> {try {Thread.sleep(1000);System.out.println("task " + Thread.currentThread().getId());} catch (Exception e) {}
};static {Runtime.getRuntime().addShutdownHook(new Thread() {@Overridepublic void run() {executor.shutdown();try {executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);}catch (Exception e){e.printStackTrace();}}});
}public static void main(String[] args) throws InterruptedException {for (int i = 0; i < 10; i++) {executor.submit(consumer);}Thread.sleep(1000);System.exit(0);}

线程池有:

RUNNING  -1
SHUTDOWN 0
STOP  1
TIDYING  2
TERMINATED  3

先将线程池状态变成SHUTDOWN,尝试中断空闲线程(因为线程执行任务的时候是会加锁lock的,所以这个时候对所有线程如果没有状态,但可以锁成功,就是空闲)。

然后尝试执行清理tryTerminate。上面这个例子,因为符合

(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))

这个条件,直接返回了。通过awaitTermination的源码可知,这个方法就是阻塞等待terminated状态(condition),等别人通知或者超时。

那么谁去通知它,谁在处理了?依旧是在代码runWorker中,只有在线程池为Stop、线程被中断时,线程池才会中断。这个时候时SHUTDOWN状态,则继续执行,然后进入processWorkerExit线程退出逻辑,会再次调用tryTerminate清理,这个时候顺利执行到

termination.signalAll();释放awaitTermination方法中阻塞的主线程。

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();
}

final void tryTerminate() {for (;;) {int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) { // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {terminated();} finally {ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {mainLock.unlock();}// else retry on failed CAS}
}

下一篇文章将专门分析forkjoinpool 的使用场景、示例和原理

hystrix 源码 线程池隔离_“池”的思想:从java线程池到数据库连接池的源码解读(1)...相关推荐

  1. 查看java线程是否退出_[原创]IDA调试阻止java线程异常退出

    IDA调试阻止java线程异常退出 最近在使用IDA调试分析某款产品遇见了一个头痛的问题,因为程序核心功能在native层实现的,所以主要的侧重点是分析so文件,但是在分析的时候总是出现java线程异 ...

  2. Java JDBC篇4——数据库连接池

    Java JDBC篇4--数据库连接池 1.DBCP 1.1.依赖jar包 官网:https://mvnrepository.com/artifact/org.apache.commons/commo ...

  3. SpringBoot2数据库连接池自动装配原理,以及如何配置使用其他的数据库连接池(druid)为例

    SpringBoot2数据库连接池自动装配原理 一.SpringBoot的数据库连接池的相关默认 二.SpringBoot默认的数据库连接池,以及自动装配原理 三.使用其他的数据库连接池:例如Drui ...

  4. hystrix 源码 线程池隔离_基于hystrix的线程池隔离

    hystrix进行资源隔离,其实是提供了一个抽象,叫做command,就是说,你如果要把对某一个依赖服务的所有调用请求,全部隔离在同一份资源池内 对这个依赖服务的所有调用请求,全部走这个资源池内的资源 ...

  5. java如何关闭线程池_如何优雅的关闭Java线程池

    ⾯试中经常会问到,创建⼀个线程池需要哪些参数.线程池的工作原理,却很少会问到线程池如何安全关闭的. 也正是因为⼤家不是很关注这块,即便是⼯作三四年的⼈,也会有因为线程池关闭不合理,导致应用⽆法正常st ...

  6. tomcat线程释放时间_聊下并发和Tomcat线程数(错误更正)

    本文前半部分结论存在严重错误,请看最后2015-1-20更新部分. 最近一直在解决线上一个问题,表现是: Tomcat每到凌晨会有一个高峰,峰值的并发达到了3000以上,最后的结果是Tomcat线程池 ...

  7. java集合到线程的考试_成都汇智动力-Java SE考试编程题总结

    原标题:成都汇智动力-Java SE考试编程题总结 线程和进程的区别: (1)进程是运行中的程序,拥有自己独立的内存空间和资源; (2)一个进程可以有一个或多个线程组成,且至少有一个线程称为主线程; ...

  8. java吵醒线程_一文搞懂 Java 线程中断

    在之前的一文<如何"优雅"地终止一个线程>中详细说明了 stop 终止线程的坏处及如何优雅地终止线程,那么还有别的可以终止线程的方法吗?答案是肯定的,它就是我们今天要分 ...

  9. Java中Semaphore(信号量) 数据库连接池

    计数信号量用来控制同时访问某个特定资源的操作数或同时执行某个指定操作的数量 A counting semaphore.Conceptually, a semaphore maintains a set ...

最新文章

  1. Python工具 | 9个用来爬取网络站点的 Python 库
  2. IDC与村村乐合作服务中国农村市场
  3. buu [BJDCTF 2nd]rsa1
  4. 超大文件中查找关键字
  5. Java高级工程师常见面试题(答案)
  6. 利用qiskit实现量子门以及态的初始化
  7. 洛谷p1179数字统计
  8. python—leetcode-459. 重复的子字符串
  9. 微型计算机中lo设备的含义是,专转本计算机 基础知识.doc
  10. CSS中禁止文本选中和鼠标悬入变手型(已解决)
  11. 互联网晚报 |11/23星期三 | 京东高管降薪10%至20%;75%未成年每周游戏少于3小时;惠普宣布未来三年裁员4K-6K人...
  12. 2023年国家留学基金委(CSC)有关国别申请、派出注意事项
  13. 在springboot中导入spring-web相关包导致的错误经验(一)
  14. More Effective C++ 阅读笔记 解释清晰
  15. 5.4 控制器的功能和工作原理
  16. 十分钟搞懂手机号码一键登录
  17. Effie:真正的极简主义!秒杀幕布
  18. 物联网开发笔记(84)- 使用Micropython开发ESP32开发板之控制LCD12864液晶屏和AHT10温度传感器
  19. 激光焊可以代替氩弧焊吗
  20. [Depricated]适用coremail邮件系统,第三方客户端绑定校园邮箱(南邮、河海,以iOS邮件为例)

热门文章

  1. 神经网络中的分类器该如何改成生成器?
  2. 物联网时代,隐私还有救吗?
  3. 2020 年物联网设备达 500 亿台!AI、区块链技术加持,优秀开发者稀缺!
  4. 不写一行代码就能玩转 Kaggle 竞赛?
  5. 从佛罗伦萨记账到区块链,应用才是区块链崛起的真正标志
  6. TMD 之后,再无 BAT? | 畅言
  7. 华米OV 万亿 IoT 争夺战
  8. 爬取两万多条租房数据,算算在广州你能「活到」第几集?
  9. @程序员,幼儿园小班都在学 AI,就问你慌不慌?
  10. 在乌镇拼命“洗白”的拼多多