JUC学习之共享模型之工具

  • 线程池
    • 1. 自定义线程池
  • ThreadPoolExecutor
  • newFixedThreadPool----固定大小的线程池
    • 自定义线程工厂
  • newCachedThreadPool
  • newSingleThreadExecutor
  • 提交任务相关的方法
    • submit方法
    • invokeAll方法
    • invokeAny方法
  • 关闭线程池
    • shutdown
    • shutdownNow
    • 其它方法
    • 使用演示
  • 异步模式之工作线程
    • 1. 定义
    • 2. 饥饿
    • 创建多少线程池合适
      • CPU 密集型运算
      • I/O 密集型运算
    • 任务调度线程池
      • ScheduledExecutorService
      • scheduleAtFixedRate
    • scheduleWithFixedDelay
    • scheduleAtFixedRate和scheduleWithFixedDelay 的区别
    • 正确处理执行任务异常
      • 方法1:主动捉异常
      • 方法2:使用 Future
    • 定期执行
  • Tomcat 线程池
    • Connector 配置
    • Executor 线程配置
  • Fork/Join
    • 概念
    • 使用

线程池

1. 自定义线程池


步骤1:自定义拒绝策略接口

package Pool;@FunctionalInterface // 拒绝策略
interface RejectPolicy<T>
{void reject(BlockingQueue<T> queue, T task);
}

步骤2:自定义任务队列

package Pool;import lombok.extern.slf4j.Slf4j;import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;//阻塞队列
//TODO:泛型可以扩展阻塞队列的扩展性
@Slf4j
public class BlockingQueue<T>
{//1.任务队列private Deque<T> deque=new ArrayDeque<>();//2.锁private ReentrantLock lock=new ReentrantLock();//3.生产者条件变量private Condition fullWaitSet=lock.newCondition();//4.消费者条件变量private Condition emptyWaitSet=lock.newCondition();//5.容量private int capcity;public BlockingQueue(int capcity){this.capcity = capcity;}//带超时的阻塞获取----从队列头部获取一个任务public T poll(long timeout, TimeUnit timeUnit){//先上锁lock.lock();try {//将timeout统一转换为纳秒long nanos = timeUnit.toNanos(timeout);//TODO:队列为空陷入超时等待,否则返回一个任务while(deque.isEmpty()){try {//等待超时if(nanos<=0){return null;}//返回剩余等待时间//TODO:这里之所以会返回一个剩余时间,是因为存在被唤醒后抢不到锁的可能,因此会再次陷入休眠等待nanos = emptyWaitSet.awaitNanos(nanos);}catch (InterruptedException e){e.printStackTrace();}}//返回任务T first = deque.removeFirst();//唤醒等待中的生产者线程fullWaitSet.signal();return first;}finally{//解锁lock.unlock();}}//阻塞获取任务---无限等待,直到被唤醒public T take(){//加锁lock.lock();try{//当前没有任务while(deque.isEmpty()){try{emptyWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}//返回任务T first = deque.removeFirst();//唤醒等待中的生产者线程fullWaitSet.signal();return first;}finally{//解锁lock.unlock();}}//阻塞添加public void put(T task){lock.lock();try{//队列满了while(deque.size()==capcity){try{log.debug("等待加入任务队列 {}...",task);fullWaitSet.await();} catch (InterruptedException e) {e.printStackTrace();}}//当前队列未满log.debug("加入队列 {}",task);//加入队列尾部deque.addLast(task);//唤醒等待中的消费者线程emptyWaitSet.signal();}finally{lock.unlock();}}//带超时的阻塞添加public boolean offer(T task,long timeout,TimeUnit timeUnit){lock.lock();try{long nanos = timeUnit.toNanos(timeout);while(deque.size()==capcity){if(nanos<=0)return false;log.debug("等待加入的任务队列 {} ...",task);try {nanos = fullWaitSet.awaitNanos(nanos);} catch (InterruptedException e) {e.printStackTrace();}}log.debug("加入任务队列 {}",task);deque.addLast(task);emptyWaitSet.signal();return true;} finally {lock.unlock();}}public int size(){lock.lock();try{return deque.size();}finally {lock.unlock();}}public void tryPut(RejectPolicy<T> rejectPolicy,T task){lock.lock();try{//判断队列是否满了if(deque.size()==capcity){//采用指定的拒绝策略rejectPolicy.reject(this,task);}else{//有空闲log.debug("加入任务队列 {}",task);deque.addLast(task);emptyWaitSet.signal();}}finally {lock.unlock();}}
}

步骤3:自定义线程池

package Pool;import lombok.extern.slf4j.Slf4j;import java.util.HashSet;
import java.util.concurrent.TimeUnit;//线程池
@Slf4j
public class ThreadPool
{//任务队列private BlockingQueue<Runnable> taskQueue;//线程集合private HashSet<Worker> workers=new HashSet<>();//核心线程数private int coreSize;//获取任务的超时时间private long timeout;//超时时间单位private TimeUnit timeUnit;//拒绝策略private RejectPolicy<Runnable> rejectPolicy;//执行任务public void execute(Runnable task){//当任务数没有超过coreSize时,直接交给worker对象执行//如果任务数超过coreSize时,加入任务队列暂存if(workers.size()<coreSize){Worker worker=new Worker(task);log.debug("新增worker{},{}",worker,task);workers.add(worker);worker.start();}else{//    taskQueue.put(task);// 1) 死等// 2) 带超时等待// 3) 让调用者放弃任务执行// 4) 让调用者抛出异常// 5) 让调用者自己执行任务taskQueue.tryPut(rejectPolicy, task);}}public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit, int queueCapcity,RejectPolicy<Runnable> rejectPolicy) {this.coreSize = coreSize;this.timeout = timeout;this.timeUnit = timeUnit;this.taskQueue = new BlockingQueue<>(queueCapcity);this.rejectPolicy = rejectPolicy;}public class Worker extends Thread{private Runnable task;public Worker(Runnable task) {this.task = task;}@Overridepublic void run() {// 执行任务// 1) 当 task 不为空,执行任务// 2) 当 task 执行完毕,再接着从任务队列获取任务并执行// while(task != null || (task = taskQueue.take()) != null) {while(task != null || (task = taskQueue.poll(timeout, timeUnit)) != null) {try {log.debug("正在执行...{}", task);task.run();} catch (Exception e) {e.printStackTrace();} finally {task = null;}}synchronized (workers) {log.debug("worker 被移除{}", this);workers.remove(this);}}}
}

步骤4:测试

package Pool;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.TimeUnit;@Slf4j
public class Test
{public static void main(String[] args) {ThreadPool threadPool = new ThreadPool(1,1000, TimeUnit.MILLISECONDS, 1, (queue, task)->{// 1. 死等
// queue.put(task);// 2) 带超时等待
// queue.offer(task, 1500, TimeUnit.MILLISECONDS);// 3) 让调用者放弃任务执行
// log.debug("放弃{}", task);// 4) 让调用者抛出异常
// throw new RuntimeException("任务执行失败 " + task);// 5) 让调用者自己执行任务task.run();});for (int i = 0; i < 4; i++) {int j = i;threadPool.execute(() -> {try {Thread.sleep(1000L);} catch (InterruptedException e) {e.printStackTrace();}log.debug("{}", j);});}}
}

ThreadPoolExecutor

  • 线程池状态

ThreadPoolExecutor 使用 int 的高 3 位来表示线程池状态,低 29 位表示线程数量


从数字上比较,TERMINATED > TIDYING > STOP > SHUTDOWN > RUNNING

最高位是符号位,1表示负数,0表示整数

这些信息存储在一个原子变量 ctl 中,目的是将线程池状态与线程个数合二为一,这样就可以用一次 cas 原子操作进行赋值

// c 为旧值, ctlOf 返回结果为新值
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))));// rs 为高 3 位代表线程池状态, wc 为低 29 位代表线程个数,ctl 是合并它们
private static int ctlOf(int rs, int wc) { return rs | wc; }

  • 构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
  • corePoolSize 核心线程数目 (最多保留的线程数)
  • maximumPoolSize 最大线程数目
  • keepAliveTime 生存时间 - 针对救急线程
  • unit 时间单位 - 针对救急线程
  • workQueue 阻塞队列
  • threadFactory 线程工厂 - 可以为线程创建时起个好名字
  • handler 拒绝策略

工作方式:


  • 线程池中刚开始没有线程,当一个任务提交给线程池后,线程池会创建一个新线程来执行任务。
  • 当线程数达到 corePoolSize 并没有线程空闲,这时再加入任务,新加的任务会被加入workQueue 队列排
    队,直到有空闲的线程。
  • 如果队列选择了有界队列,那么任务超过了队列大小时,会创建 maximumPoolSize - corePoolSize 数目的线
    程来救急。
  • 如果线程到达 maximumPoolSize 仍然有新任务这时会执行拒绝策略。拒绝策略 jdk 提供了 4 种实现,其它
    著名框架也提供了实现
  • AbortPolicy 让调用者抛出 RejectedExecutionException 异常,这是默认策略
  • CallerRunsPolicy 让调用者运行任务
  • DiscardPolicy 放弃本次任务
  • DiscardOldestPolicy 放弃队列中最早的任务,本任务取而代之
  • Dubbo 的实现,在抛出 RejectedExecutionException 异常之前会记录日志,并 dump 线程栈信息,方
    便定位问题
  • Netty 的实现,是创建一个新线程来执行任务
  • ActiveMQ 的实现,带超时等待(60s)尝试放入队列,类似我们之前自定义的拒绝策略
  • PinPoint 的实现,它使用了一个拒绝策略链,会逐一尝试策略链中每种拒绝策略
  • 当高峰过去后,超过corePoolSize 的救急线程如果一段时间没有任务做,需要结束节省资源,这个时间由
    keepAliveTime 和 unit 来控制


根据这个构造方法,JDK Executors 类中提供了众多工厂方法来创建各种用途的线程池


newFixedThreadPool----固定大小的线程池

    public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}

可以看到newFixedThreadPool底层还是通过ThreadPoolExecutor的构造参数传递不同参数实现

注意这里ThreadPoolExecutor返回的是线程池对象

特点

  • 核心线程数 == 最大线程数(没有救急线程被创建),因此也无需超时时间
  • 阻塞队列是无界的,可以放任意数量的任务

评价 适用于任务量已知,相对耗时的任务

package Pool;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;@Slf4j
public class Test
{public static void main(String[] args){ExecutorService executorService = Executors.newFixedThreadPool(2);executorService.execute(()->{log.debug("线程名称: {}",Thread.currentThread().getName());log.debug("鸡汤来喽.....");});executorService.execute(()->{log.debug("线程名称: {}",Thread.currentThread().getName());log.debug("啊哈哈哈哈哈哈...");});executorService.execute(()->{log.debug("线程名称: {}",Thread.currentThread().getName());log.debug("代号: 穿山甲");});}}


具体如何结束运行的核心线程,请看下面结束线程的部分


线程名称依赖于线程工厂来实现,jdk提供了线程工厂的默认实现



自定义线程工厂

package Pool;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;@Slf4j
public class Test
{public static void main(String[] args){ExecutorService executorService = Executors.newFixedThreadPool(2, new ThreadFactory() {private AtomicInteger atomicInteger=new AtomicInteger(1);@Overridepublic Thread newThread(Runnable r) {return new Thread(r,"myPool "+atomicInteger.getAndIncrement());}});executorService.execute(()->{log.debug("线程名称: {}",Thread.currentThread().getName());log.debug("鸡汤来喽.....");});executorService.execute(()->{log.debug("线程名称: {}",Thread.currentThread().getName());log.debug("啊哈哈哈哈哈哈...");});executorService.execute(()->{log.debug("线程名称: {}",Thread.currentThread().getName());log.debug("代号: 穿山甲");});}}


newCachedThreadPool

    public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}

特点

  • 核心线程数是 0, 最大线程数是 Integer.MAX_VALUE,救急线程的空闲生存时间是 60s,意味着全部都是救急线程(60s 后可以回收)
  • 救急线程可以无限创建
  • 队列采用了 SynchronousQueue 实现特点是,它没有容量,没有线程来取是放不进去的(一手交钱、一手交货)
package Pool;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;import static java.lang.Thread.sleep;@Slf4j
public class Test
{public static void main(String[] args) throws InterruptedException {SynchronousQueue<Integer> integers = new SynchronousQueue<>();new Thread(() -> {try {log.debug("putting {} ", 1);integers.put(1);log.debug("{} putted...", 1);log.debug("putting...{} ", 2);integers.put(2);log.debug("{} putted...", 2);} catch (InterruptedException e) {e.printStackTrace();}},"t1").start();sleep(1);new Thread(() -> {try {log.debug("taking {}", 1);integers.take();} catch (InterruptedException e) {e.printStackTrace();}},"t2").start();sleep(1);new Thread(() -> {try {log.debug("taking {}", 2);integers.take();} catch (InterruptedException e) {e.printStackTrace();}},"t3").start();}}

输出

11:48:15.500 c.TestSynchronousQueue [t1] - putting 1
11:48:16.500 c.TestSynchronousQueue [t2] - taking 1
11:48:16.500 c.TestSynchronousQueue [t1] - 1 putted...
11:48:16.500 c.TestSynchronousQueue [t1] - putting...2
11:48:17.502 c.TestSynchronousQueue [t3] - taking 2
11:48:17.503 c.TestSynchronousQueue [t1] - 2 putted...

评价 整个线程池表现为线程数会根据任务量不断增长,没有上限,当任务执行完毕,空闲 1分钟后释放线程。 适合任务数比较密集,但每个任务执行时间较短的情况


newSingleThreadExecutor

    public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}

使用场景:

  • 希望多个任务排队执行。线程数固定为 1,任务数多于 1 时,会放入无界队列排队。任务执行完毕,这唯一的线程 也不会被释放。

区别:

  • 自己创建一个单线程串行执行任务,如果任务执行失败而终止那么没有任何补救措施,而线程池还会新建一个线程,保证池的正常工作
package Pool;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;import static java.lang.Thread.sleep;@Slf4j
public class Test
{public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newSingleThreadExecutor();executorService.execute(()->{log.debug("当前线程:"+Thread.currentThread().getName());int i=1/0;});executorService.execute(()->{log.debug("当前线程:"+Thread.currentThread().getName());System.out.println("鸡汤来喽....");});executorService.execute(()->{log.debug("当前线程:"+Thread.currentThread().getName());System.out.println("我非常的相信");});}}

  • Executors.newSingleThreadExecutor() 线程个数始终为1,不能修改

FinalizableDelegatedExecutorService 应用的是装饰器模式,只对外暴露了 ExecutorService 接口,因此不能调用 ThreadPoolExecutor 中特有的方法


FinalizableDelegatedExecutorService继承至父类DelegatedExecutorService



这里返回的对象强制类型转换后,也无法转换为ThreadPoolExecutor

  • Executors.newFixedThreadPool(1) 初始时为1,以后还可以修改

对外暴露的是 ThreadPoolExecutor 对象,可以强转后调用 setCorePoolSize 等方法进行修改


提交任务相关的方法

// 执行任务
void execute(Runnable command);// 提交任务 task,用返回值 Future 获得任务执行结果
<T> Future<T> submit(Callable<T> task);// 提交 tasks 中所有任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)throws InterruptedException;// 提交 tasks 中所有任务,带超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException;// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消
<T> T invokeAny(Collection<? extends Callable<T>> tasks)throws InterruptedException, ExecutionException;// 提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消,带超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;

submit方法

Callable与Runnable的区别: Callable有返回结果并且可以跑出异常,Runnable没有

future是JDK提供的,通过保护性暂停模式实现的guardObject对象,用来在主线程中接收线程池中返回的结果

package Pool;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.*;@Slf4j
public class Test
{public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newSingleThreadExecutor();//TODO:callable的泛型String确定了返回的返回结果的类型Future<String> ret = executorService.submit(new Callable<String>() {@Overridepublic String call() throws Exception {log.debug("当前线程{}",Thread.currentThread().getName());return "ok";}});String s = ret.get();System.out.println("返回的结果为: "+s);}}



invokeAll方法

package Pool;import lombok.extern.slf4j.Slf4j;import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;@Slf4j
public class Test
{public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newSingleThreadExecutor();List<Future<Object>> futures = executorService.invokeAll(Arrays.asList(() -> {log.debug("鸡汤来喽");return 1;}, () -> {log.debug("王大队长,就喜欢开玩笑");return 2;}));futures.forEach(x->{try {System.out.println("结果: "+x.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}});}}


invokeAny方法

提交 tasks 中所有任务,哪个任务先成功执行完毕,返回此任务执行结果,其它任务取消

package Pool;import lombok.extern.slf4j.Slf4j;import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;@Slf4j
public class Test
{public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newSingleThreadExecutor();String invokeAny = (String) executorService.invokeAny(Arrays.asList(() -> {log.debug("1 begin");Thread.sleep(1000);log.debug("1 end");return "1";}, () -> {log.debug("2 begin");Thread.sleep(500);log.debug("2 end");return "2";}));System.out.println(invokeAny);}}


关闭线程池

shutdown

/*
线程池状态变为 SHUTDOWN
- 不会接收新任务
- 但已提交任务会执行完
- 此方法不会阻塞调用线程的执行
*/
void shutdown();
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 修改线程池状态advanceRunState(SHUTDOWN);// 仅会打断空闲线程interruptIdleWorkers();onShutdown(); // 扩展点 ScheduledThreadPoolExecutor} finally {mainLock.unlock();}// 尝试终结(没有运行的线程可以立刻终结,如果还有运行的线程也不会等,让那些线程运行完后,自己结束)tryTerminate();
}

shutdownNow

/*
线程池状态变为 STOP
- 不会接收新任务
- 会将队列中的任务返回
- 并用 interrupt 的方式中断正在执行的任务
*/
List<Runnable> shutdownNow();
public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 修改线程池状态advanceRunState(STOP);// 打断所有线程interruptWorkers();// 获取队列中剩余任务tasks = drainQueue();} finally {mainLock.unlock();}// 尝试终结tryTerminate();return tasks; }

其它方法

// 不在 RUNNING 状态的线程池,此方法就返回 true
boolean isShutdown();
// 线程池状态是否是 TERMINATED
boolean isTerminated();
// 调用 shutdown 后,由于调用线程并不会等待所有任务运行结束,
//因此如果它想在线程池 TERMINATED 后做些事情,可以利用此方法等待
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

使用演示

package Pool;import lombok.extern.slf4j.Slf4j;import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReference;@Slf4j
public class Test
{public static void main(String[] args) throws InterruptedException, ExecutionException {ExecutorService executorService = Executors.newFixedThreadPool(2);executorService.submit(()->{log.debug("1 begin");Thread.sleep(1000);log.debug("1 end");return "task 1";});executorService.submit(()->{log.debug("2 begin");Thread.sleep(1000);log.debug("2 end");return "task 2";});executorService.submit(()->{log.debug("3 begin");Thread.sleep(1000);log.debug("3 end");return "task 3";});//TODO:空闲线程说拜拜,干活线程干完下班//这里会把正在执行的任务和队列中的任务全部执行完毕//executorService.shutdown();//TODO:中断正在执行的任务,将队列中断任务返回//List<Runnable> runnables = executorService.shutdownNow();//log.debug("队列中剩余的任务为{}",runnables);//TODO:等待线程池中的任务执行完毕,但是有等待的超时时间//如果等待超时,主线程会继续往下面执行//如果所有任务都执行完了,主线程会继续往下面执行executorService.awaitTermination(1,TimeUnit.SECONDS);log.debug("主线程往下执行中...");}}

future.get()方法也可以阻塞主线程,直到获取到对应线程的执行结果


异步模式之工作线程

1. 定义

让有限的工作线程(Worker Thread)来轮流异步处理无限多的任务。也可以将其归类为分工模式,它的典型实现
就是线程池,也体现了经典设计模式中的享元模式。

例如,海底捞的服务员(线程),轮流处理每位客人的点餐(任务),如果为每位客人都配一名专属的服务员,那
么成本就太高了(对比另一种多线程设计模式:Thread-Per-Message)

注意,不同任务类型应该使用不同的线程池,这样能够避免饥饿,并能提升效率

例如,如果一个餐馆的工人既要招呼客人(任务类型A),又要到后厨做菜(任务类型B)显然效率不咋地,分成
服务员(线程池A)与厨师(线程池B)更为合理,当然你能想到更细致的分工更好


2. 饥饿

固定大小线程池会有饥饿现象

  • 两个工人是同一个线程池中的两个线程
  • 他们要做的事情是:为客人点餐和到后厨做菜,这是两个阶段的工作

客人点餐:必须先点完餐,等菜做好,上菜,在此期间处理点餐的工人必须等待

后厨做菜:没啥说的,做就是了

  • 比如工人A 处理了点餐任务,接下来它要等着 工人B 把菜做好,然后上菜,他俩也配合的蛮好
  • 但现在同时来了两个客人,这个时候工人A 和工人B 都去处理点餐了,这时没人做饭了,饥饿
package Pool;import lombok.extern.slf4j.Slf4j;import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;@Slf4j
public class TestDeadLock {static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");static Random RANDOM = new Random();static String cooking() {return MENU.get(RANDOM.nextInt(MENU.size()));}public static void main(String[] args) {ExecutorService executorService = Executors.newFixedThreadPool(2);//TODO:提交一个点餐任务executorService.execute(() -> {log.debug("处理点餐...");//TODO:处理点餐任务前,先提交一个做菜任务Future<String> f = executorService.submit(() ->{log.debug("做菜");return cooking();});//TODO:菜做好了,才能上菜,结束点餐任务try{log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});/* executorService.execute(() ->{log.debug("处理点餐...");Future<String> f = executorService.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) { e.printStackTrace();}});*/}
}

输出

17:21:27.883 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:21:27.891 c.TestDeadLock [pool-1-thread-2] - 做菜
17:21:27.891 c.TestDeadLock [pool-1-thread-1] - 上菜: 烤鸡翅

当注释取消后,可能的输出

17:08:41.339 c.TestDeadLock [pool-1-thread-2] - 处理点餐...
17:08:41.339 c.TestDeadLock [pool-1-thread-1] - 处理点餐...

解决方法可以增加线程池的大小,不过不是根本解决方案,还是前面提到的,不同的任务类型,采用不同的线程
池,例如:

package Pool;import lombok.extern.slf4j.Slf4j;import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;@Slf4j
public class TestDeadLock {static final List<String> MENU = Arrays.asList("地三鲜", "宫保鸡丁", "辣子鸡丁", "烤鸡翅");static Random RANDOM = new Random();static String cooking() {return MENU.get(RANDOM.nextInt(MENU.size()));}public static void main(String[] args) {ExecutorService waiterPool = Executors.newFixedThreadPool(1);ExecutorService cookPool = Executors.newFixedThreadPool(1);waiterPool.execute(() -> {log.debug("处理点餐...");Future<String> f = cookPool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});waiterPool.execute(() -> {log.debug("处理点餐...");Future<String> f = cookPool.submit(() -> {log.debug("做菜");return cooking();});try {log.debug("上菜: {}", f.get());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}});}
}

输出

17:25:14.626 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:25:14.630 c.TestDeadLock [pool-2-thread-1] - 做菜
17:25:14.631 c.TestDeadLock [pool-1-thread-1] - 上菜: 地三鲜
17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 处理点餐...
17:25:14.632 c.TestDeadLock [pool-2-thread-1] - 做菜
17:25:14.632 c.TestDeadLock [pool-1-thread-1] - 上菜: 辣子鸡丁

创建多少线程池合适

  • 过小会导致程序不能充分地利用系统资源、容易导致饥饿
  • 过大会导致更多的线程上下文切换,占用更多内存

CPU 密集型运算

通常采用 cpu 核数 + 1 能够实现最优的 CPU 利用率,+1 是保证当线程由于页缺失故障(操作系统)或其它原因导致暂停时,额外的这个线程就能顶上去,保证 CPU 时钟周期不被浪费

I/O 密集型运算

CPU 不总是处于繁忙状态,例如,当你执行业务计算时,这时候会使用 CPU 资源,但当你执行 I/O 操作时、远程
RPC 调用时,包括进行数据库操作时,这时候 CPU 就闲下来了,你可以利用多线程提高它的利用率。

经验公式如下

线程数 = 核数 * 期望 CPU 利用率 * 总时间(CPU计算时间+等待时间) / CPU 计算时间

例如 4 核 CPU 计算时间是 50% ,其它等待时间是 50%,期望 cpu 被 100% 利用,套用公式

4 * 100% * 100% / 50% = 8

例如 4 核 CPU 计算时间是 10% ,其它等待时间是 90%,期望 cpu 被 100% 利用,套用公式

4 * 100% * 100% / 10% = 40

任务调度线程池

在『任务调度线程池』功能加入之前,可以使用 java.util.Timer 来实现定时功能,Timer 的优点在于简单易用,但
由于所有任务都是由同一个线程来调度,因此所有任务都是串行执行的,同一时间只能有一个任务在执行,前一个
任务的延迟或异常都将会影响到之后的任务

package Pool;import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;import java.util.Timer;
import java.util.TimerTask;import static java.lang.Thread.sleep;@Slf4j
public class TestDeadLock {public static void main(String[] args){//TODO:定时器Timer timer = new Timer();//TODO:定时器任务一TimerTask task1 = new TimerTask() {@SneakyThrows@Overridepublic void run() {log.debug("task 1");sleep(2000);}};//TODO:定时器任务二TimerTask task2 = new TimerTask() {@Overridepublic void run() {log.debug("task 2");}};// 使用 timer 添加两个任务,希望它们都在 1s 后执行// 但由于 timer 内只有一个线程来顺序执行队列中的任务,因此『任务1』的延时,影响了『任务2』的执行timer.schedule(task1, 1000);timer.schedule(task2, 1000);}
}

输出

20:46:09.444 c.TestTimer [main] - start...
20:46:10.445 c.TestTimer [Timer-0] - task 1
20:46:12.447 c.TestTimer [Timer-0] - task 2

ScheduledExecutorService

   ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);// 添加两个任务,希望它们都在 1s 后执行executor.schedule(() -> {System.out.println("任务1,执行时间:" + new Date());try { Thread.sleep(2000); } catch (InterruptedException e) { }}, 1000, TimeUnit.MILLISECONDS);executor.schedule(() -> {System.out.println("任务2,执行时间:" + new Date());}, 1000, TimeUnit.MILLISECONDS);

输出

任务1,执行时间:Thu Jan 03 12:45:17 CST 2019
任务2,执行时间:Thu Jan 03 12:45:17 CST 2019

scheduleAtFixedRate

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {log.debug("running...");
}, 1, 1, TimeUnit.SECONDS);

输出

21:45:43.167 c.TestTimer [main] - start...
21:45:44.215 c.TestTimer [pool-1-thread-1] - running...
21:45:45.215 c.TestTimer [pool-1-thread-1] - running...
21:45:46.215 c.TestTimer [pool-1-thread-1] - running...
21:45:47.215 c.TestTimer [pool-1-thread-1] - running...

scheduleAtFixedRate 例子(任务执行时间超过了间隔时间):

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleAtFixedRate(() -> {log.debug("running...");sleep(2);
}, 1, 1, TimeUnit.SECONDS);

输出分析:一开始,延时 1s,接下来,由于任务执行时间 > 间隔时间,间隔被『撑』到了 2s

21:44:30.311 c.TestTimer [main] - start...
21:44:31.360 c.TestTimer [pool-1-thread-1] - running...
21:44:33.361 c.TestTimer [pool-1-thread-1] - running...
21:44:35.362 c.TestTimer [pool-1-thread-1] - running...
21:44:37.362 c.TestTimer [pool-1-thread-1] - running...

scheduleWithFixedDelay

ScheduledExecutorService pool = Executors.newScheduledThreadPool(1);
log.debug("start...");
pool.scheduleWithFixedDelay(()-> {log.debug("running...");sleep(2);
}, 1, 1, TimeUnit.SECONDS);

输出分析:一开始,延时 1s,scheduleWithFixedDelay 的间隔是 上一个任务结束 <-> 延时 <-> 下一个任务开始 所
以间隔都是 3s

21:40:55.078 c.TestTimer [main] - start...
21:40:56.140 c.TestTimer [pool-1-thread-1] - running...
21:40:59.143 c.TestTimer [pool-1-thread-1] - running...
21:41:02.145 c.TestTimer [pool-1-thread-1] - running...
21:41:05.147 c.TestTimer [pool-1-thread-1] - running...

scheduleAtFixedRate和scheduleWithFixedDelay 的区别

  • fixedRate就是每隔多长时间执行一次。(开始------->X时间------>再开始)。如果间隔时间小于任务执行时间,上一次任务执行完成下一次任务就立即执行。如果间隔时间大于任务执行时间,就按照每隔X时间运行一次。
  • 而fixedDelay是当任务执行完毕后一段时间再次执行。(开始—>结束(隔一分钟)开始----->结束)。上一次执行任务未完成,下一次任务不会开始

评价 整个线程池表现为:线程数固定,任务数多于线程数时,会放入无界队列排队。任务执行完毕,这些线
程也不会被释放。用来执行延迟或反复执行的任务


正确处理执行任务异常

方法1:主动捉异常

@Slf4j
public class TestDeadLock {public static void main(String[] args){ExecutorService pool = Executors.newFixedThreadPool(1);pool.submit(() -> {try {log.debug("task1");int i = 1 / 0;} catch (Exception e) {//TODO:这里可以将异常信息写入日志文件中log.error("error:", e);}});}
}

log4j的配置

#日志最低级别为debug,输出到控制台和指定文件
log4j.rootLogger = debug,stdout,logFile#输出到控制台
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n#输出到指定文件,最低输出级别为error
log4j.appender.logFile=org.apache.log4j.FileAppender
log4j.appender.logFile.Threshold=ERROR
log4j.appender.logFile.ImmediateFlush=true
log4j.appender.logFile.Append=true
log4j.appender.logFile.File=C:/Users/zdh/Desktop/日志/error.txt
log4j.appender.logFile.layout=org.apache.log4j.PatternLayout
log4j.appender.logFile.layout.ConversionPattern=[%-5p] %d(%r) --> [%t] %l: %m %x %n

输出


方法2:使用 Future

future的get()方法,在方法正在执行完毕后,得到的是方法的返回值,如果方法执行过程中抛出异常,那么get()方法得到的是抛出的异常信息

        ExecutorService pool = Executors.newFixedThreadPool(1);Future<Boolean> f = pool.submit(() -> {log.debug("task1");int i = 1 / 0;return true;});log.debug("result:{}", f.get());


定期执行

如何让每周四 18:00:00 定时执行任务?

package schedule;import java.time.*;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;/*** @author 大忽悠* @create 2022/1/5 12:16*/
public class Main {public static void main(String[] args) {// 获得当前时间LocalDateTime now = LocalDateTime.now();// 获取本周四 18:00:00.000LocalDateTime thursday =now.with(DayOfWeek.THURSDAY).withHour(18).withMinute(0).withSecond(0).withNano(0);// 如果当前时间已经超过 本周四 18:00:00.000, 那么找下周四 18:00:00.000if (now.compareTo(thursday) >= 0) {thursday = thursday.plusWeeks(1);}// 计算时间差,即延时执行时间long initialDelay = Duration.between(now, thursday).toMillis();// 计算间隔时间,即 1 周的毫秒值long oneWeek = 7 * 24 * 3600 * 1000;ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);System.out.println("开始时间:" + new Date());executor.scheduleAtFixedRate(() -> {System.out.println("执行时间:" + new Date());}, initialDelay, oneWeek, TimeUnit.MILLISECONDS);}
}


Tomcat 线程池

Tomcat 在哪里用到了线程池呢

  • LimitLatch 用来限流,可以控制最大连接个数,类似 J.U.C 中的 Semaphore 后面再讲
  • Acceptor 只负责【接收新的 socket 连接】
  • Poller 只负责监听 socket channel 是否有【可读的 I/O 事件】
  • 一旦可读,封装一个任务对象(socketProcessor),提交给 Executor 线程池处理
  • Executor 线程池中的工作线程最终负责【处理请求】

Tomcat 线程池扩展了 ThreadPoolExecutor,行为稍有不同

  • 如果总线程数达到 maximumPoolSize
  • 这时不会立刻抛 RejectedExecutionException 异常
  • 而是再次尝试将任务放入队列,如果还失败,才抛出 RejectedExecutionException 异常

源码 tomcat-7.0.42

    public void execute(Runnable command, long timeout, TimeUnit unit) {submittedCount.incrementAndGet();try {super.execute(command);} catch (RejectedExecutionException rx) {if (super.getQueue() instanceof TaskQueue) {final TaskQueue queue = (TaskQueue)super.getQueue();try {//尝试放入队列if (!queue.force(command, timeout, unit)) {submittedCount.decrementAndGet();throw new RejectedExecutionException("Queue capacity is full.");}} catch (InterruptedException x) {submittedCount.decrementAndGet();Thread.interrupted();throw new RejectedExecutionException(x);}} else {submittedCount.decrementAndGet();throw rx;}}}

TaskQueue.java

public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");//offer()带超时的阻塞添加return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task
is rejected
}

Connector 配置

Executor 线程配置

  • 这里阻塞队列长度默认是整数最大值,可以认为是无界队列,如果服务器压力特别大,可能会造成任务的堆积

Fork/Join

概念

Fork/Join 是 JDK 1.7 加入的新的线程池实现,它体现的是一种分治思想,适用于能够进行任务拆分的 cpu 密集型
运算

所谓的任务拆分,是将一个大任务拆分为算法上相同的小任务,直至不能拆分可以直接求解。跟递归相关的一些计
算,如归并排序、斐波那契数列、都可以用分治思想进行求解

Fork/Join 在分治的基础上加入了多线程,可以把每个任务的分解和合并交给不同的线程来完成,进一步提升了运
算效率

Fork/Join 默认会创建与 cpu 核心数大小相同的线程池


使用

提交给 Fork/Join 线程池的任务需要继承 RecursiveTask(有返回值)或 RecursiveAction(没有返回值),例如下面定义了一个对 1~n 之间的整数求和的任务

package schedule;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;/*** @author 大忽悠* @create 2022/1/5 16:29*/
@Slf4j(topic = "forkJoin")
public class ForkJoin
{public static void main(String[] args) {//空参构造,线程池大小默认为cpu核心数ForkJoinPool pool = new ForkJoinPool(4);//打印执行结果System.out.println(pool.invoke(new AddTask1(5)));}
}@Slf4j(topic = "addTask")
class AddTask1 extends RecursiveTask<Integer> {int n;public AddTask1(int n) {this.n = n;}@Overridepublic String toString() {return "{" + n + '}';}@Overrideprotected Integer compute() {// 如果 n 已经为 1,可以求得结果了if (n == 1) {log.debug("join() {}", n);return n;}// 将任务进行拆分(fork)AddTask1 t1 = new AddTask1(n - 1);//调出一个线程来执行AddTask1(n-1)的任务t1.fork();log.debug("fork() {} + {}", n, t1);// 合并(join)结果---join方法会阻塞,直到t1任务执行结束int result = n + t1.join();log.debug("join() {} + {} = {}", n, t1, result);return result;}
}

结果

[ForkJoinPool-1-worker-0] - fork() 2 + {1}
[ForkJoinPool-1-worker-1] - fork() 5 + {4}
[ForkJoinPool-1-worker-0] - join() 1
[ForkJoinPool-1-worker-0] - join() 2 + {1} = 3
[ForkJoinPool-1-worker-2] - fork() 4 + {3}
[ForkJoinPool-1-worker-3] - fork() 3 + {2}
[ForkJoinPool-1-worker-3] - join() 3 + {2} = 6
[ForkJoinPool-1-worker-2] - join() 4 + {3} = 10
[ForkJoinPool-1-worker-1] - join() 5 + {4} = 15
15

用图来表示


改进

package schedule;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;/*** @author 大忽悠* @create 2022/1/5 16:29*/
@Slf4j(topic = "forkJoin")
public class ForkJoin
{public static void main(String[] args) {//空参构造,线程池大小默认为cpu核心数ForkJoinPool pool = new ForkJoinPool(4);//打印执行结果System.out.println(pool.invoke(new AddTask3(1,5)));}
}@Slf4j(topic = "addTask")
class AddTask3 extends RecursiveTask<Integer> {int begin;int end;public AddTask3(int begin, int end) {this.begin = begin;this.end = end;}@Overridepublic String toString() {return "{" + begin + "," + end + '}';}@Overrideprotected Integer compute() {// 5, 5if (begin == end) {log.debug("join() {}", begin);return begin;}// 4, 5if (end - begin == 1) {log.debug("join() {} + {} = {}", begin, end, end + begin);return end + begin;}// 1 5int mid = (end + begin)/2; // 3AddTask3 t1 = new AddTask3(begin, mid); // 1,3t1.fork();AddTask3 t2 = new AddTask3(mid + 1, end); // 4,5t2.fork();log.debug("fork() {} + {} = ?", t1, t2);int result = t1.join() + t2.join();log.debug("join() {} + {} = {}", t1, t2, result);return result;}
}
[DEBUG] 2022-01-05 16:45:16,776 method:schedule.AddTask3.compute(ForkJoin.java:58)
fork() {1,2} + {3,3} = ?
[DEBUG] 2022-01-05 16:45:16,776 method:schedule.AddTask3.compute(ForkJoin.java:48)
join() 1 + 2 = 3
[DEBUG] 2022-01-05 16:45:16,776 method:schedule.AddTask3.compute(ForkJoin.java:58)
fork() {1,3} + {4,5} = ?
[DEBUG] 2022-01-05 16:45:16,776 method:schedule.AddTask3.compute(ForkJoin.java:48)
join() 4 + 5 = 9
[DEBUG] 2022-01-05 16:45:16,779 method:schedule.AddTask3.compute(ForkJoin.java:43)
join() 3
[DEBUG] 2022-01-05 16:45:16,780 method:schedule.AddTask3.compute(ForkJoin.java:60)
join() {1,2} + {3,3} = 6
[DEBUG] 2022-01-05 16:45:16,780 method:schedule.AddTask3.compute(ForkJoin.java:60)
join() {1,3} + {4,5} = 15
15

用图来表示


JUC学习之共享模型之工具上之线程池浅学相关推荐

  1. 【并发编程】(学习笔记-共享模型之管程)-part3

    文章目录 并发编程-共享模型之管程-3 1.共享带来的问题 1-1 临界区 Critical Section 1-2 竞态条件 Race Condition 2.synchronized 解决方案 2 ...

  2. java并发编程(7) 共享模型之工具 - stampedLock、semaphore、CountdownLatch、CyclicBarri

    文章目录 前言 1. stampedLock 1. 概述 2. 代码 1. 读读 2. 读写 3. 注意 2. Semaphore 1. 基本使用 2. 应用场景 3. 原理 3. Countdown ...

  3. 如何使用Arthas定位线上 Dubbo 线程池满异常

    点击上方蓝色"程序猿DD",选择"设为星标" 回复"资源"获取独家整理的学习资料! 来源 | 公众号「Kirito的技术分享」 前言 本文是 ...

  4. 【Linux入门】多线程(线程概念、生产者消费者模型、消息队列、线程池)万字解说

    目录 1️⃣线程概念 什么是线程 线程的优点 线程的缺点 线程异常 线程异常 Linux进程VS线程 2️⃣线程控制 创建线程 获取线程的id 线程终止 等待线程 线程分离 3️⃣线程互斥 进程线程间 ...

  5. Arthas | 定位线上 Dubbo 线程池满异常

    作者 | 徐靖峰  阿里云高级开发工程师 前言 Dubbo 线程池满异常应该是大多数 Dubbo 用户都遇到过的一个问题,本文以 Arthas 3.1.7 版本为例,介绍如何针对该异常进行诊断,主要使 ...

  6. 《Java并发编程的艺术》——Java中的并发工具类、线程池、Execute框架(笔记)

    文章目录 八.Java中的并发工具类 8.1 等待多线程完成的CountDownLatch 8.2 同步屏障CyclicBarrier 8.2.1 CyclicBarrier简介 8.2.2 Cycl ...

  7. 【Tools】神经网络、深度学习和机器学习模型可视化工具——Netron

    背景 有时候我们写完深度学习模型后,想看看代码实现的模型和我们预期是否一致,但是没有一个好的工具.最近发现有一个软件Netron支持对一部分深度学习模型可视化,源码地址.Netron 支持 ONNX. ...

  8. JUC并发编程共享模型之无锁(五)

    5.1 问题引出 public interface Account {// 获取余额Integer getBalance();void withdraw(Integer amount);/*** 方法 ...

  9. 共享模型之工具(一)

    1.线程池 1.1.线程池产生背景 1>.线程是一种系统资源,每创建一个新的线程都需要占用一定的内存(分配栈内存),在高并发场景下,某一时刻有大量请求访问系统,如果针对每个请求(任务)都创建一个 ...

  10. 共享模型之工具(二)

    1.自定义线程池 1>.在实际开发过程中建议不要使用JDK提供的方式创建线程池,因为底层不方便优化,在请求量非常大的情况下可能会出现OOM,我们需要手动实现一个线程池; 2>.代码实现: ...

最新文章

  1. 监督分类空白处也被分类了_用于半监督短文本分类的异构图注意网络
  2. 理解DataSet的数据缓存机制
  3. 01 MySQL锁概述
  4. 【细说软件工程】《软件工程》Software Engineering
  5. PHP开发绝对不能违背的安全铁则!
  6. 【收藏】编译安装keepalived
  7. 在ArcGIS调坐标系引发的一系列问题
  8. 大型程序是如何开发的_小程序开发好之后如何引流
  9. 数据结构之内部排序二
  10. 事务处理与SQL查询
  11. 【JVM】Java虚拟机
  12. 在线英文名随机生成器
  13. python调用C语言ctypes详解
  14. Servlet/JSP面试题目-----近期总结
  15. R-FCN算法及Caffe代码详解
  16. MATLAB——DEMATEL代码(转载)
  17. LM2903器件使用说明
  18. 不使用任何中间变量如何将a、b的值进行交换(三种方法)
  19. oracle recover database,recover database until cancel和 recover database区别
  20. 谷歌日语输入法、中文输入法之间的切换

热门文章

  1. python实现循环赛日程表问题的算法_循环赛日程表问题
  2. sd卡分区工具PM9.0汉化版
  3. 伴随矩阵和逆矩阵的关系证明
  4. GROW GM65 条码二维码扫描识别模块 兼容大部分条码和二维码
  5. GeoTools-WKT\GeoJson相互转换
  6. IntelliJ Idea编译报错:请使用 -source 7 或更高版本以启用 diamond 运算符
  7. Echarts中国地图json文件,去除诸岛
  8. 网络安全-解密WinRAR捆绑恶意程序并自动上线MSF的原理
  9. 不开机win7计算机还原,win7忘记开机密码一键还原操作不了怎么办
  10. Ant design分析后台首页