目录

1.基础知识

2.简单应用

3.异常机制

4.丰富的扩展

一.基础知识

构造函数。

public ThreadPoolExecutor(   
int corePoolSize,    指的是保留的线程池大小
int maximumPoolSize,    指的是线程池的最大大小
long keepAliveTime,    指的是空闲线程结束的超时时间
TimeUnit unit,   
BlockingQueue<Runnable> workQueue)  表示存放任务的队列  

工作过程:

1 、线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
2 、当调用 execute() 方法添加一个任务时,线程池会做如下判断:
  a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
  b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
  c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
  d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
3 、当一个线程完成任务时,它会从队列中取下一个任务来执行。
4 、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

二.简单应用

    public void threadPool1Test() throws InterruptedException {BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();  ThreadPoolExecutor exe = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);  try {int threadNum = 0;for (int i = 0; i < 10; i++) {threadNum++;final int currentThreadNum = threadNum;exe.execute(new Runnable() {@Overridepublic void run() {try {System.out.println("子线程[" + currentThreadNum + "]开启");Thread.sleep(1000 * 10);} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("子线程[" + currentThreadNum + "]结束");}}});}//不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务System.out.println("已经开启所有的子线程");exe.shutdown();System.out.println("shutdown():启动一次顺序关闭,执行以前提交的任务,但不接受新任务。");//判断线程池所有任务是否执行完毕while (true) {if (exe.isTerminated()) {System.out.println("所有的子线程都结束了!");break;}Thread.sleep(1000);System.out.println("线程池中线程数目:"+exe.getPoolSize()+",队列中等待执行的任务数目:"+ exe.getQueue().size()+",已执行玩别的任务数目:"+exe.getCompletedTaskCount());    System.out.println("线程队列大小为-->"+queue.size());  }} catch (InterruptedException e) {e.printStackTrace();} finally {System.out.println("主线程结束");}}

1、BlockingQueue 只是一个接口,常用的实现类有 LinkedBlockingQueue 和 ArrayBlockingQueue。用 LinkedBlockingQueue 的好处在于没有大小限制。这样的话,因为队列不会满,所以 execute() 不会抛出异常,而线程池中运行的线程数也永远不会超过 corePoolSize 个,keepAliveTime 参数也就没有意义了。
2、shutdown() 方法不会阻塞。调用 shutdown() 方法之后,主线程就马上结束了,而线程池会继续运行直到所有任务执行完才会停止。如果不调用 shutdown() 方法,那么线程池会一直保持下去,以便随时添加新的任务。

二.异常处理

线程超出了线程池的总容量(线程队列大小+最大线程数)

    @Testpublic void threadPool2Test() throws InterruptedException {ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(4));//executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println(String.format("Task %d rejected.", r.hashCode()));//System.out.println("DemoTask Rejected : " + ((DemoThread) r).getName());System.out.println("Waiting for a second !!"); try {Thread.sleep(1000);executor.execute(r);//executor.getQueue().put(r);} catch (InterruptedException e) {}}});///for (int i = 0; i < 11; i++) {final int taskNum = i;Runnable myTask = new Runnable() {@Overridepublic void run() {System.out.println("正在执行task " + taskNum);try {Thread.currentThread().sleep(100 * (new Random()).nextInt(8));} catch (InterruptedException e) {e.printStackTrace();}System.out.println("task " + taskNum + "执行完毕");}};executor.execute(myTask);}// 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
        executor.shutdown();while (true) {if (executor.isTerminated()) {System.out.println("所有的子线程都结束了!");break;}Thread.sleep(1000);System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" + executor.getQueue().size()+ ",已执行玩别的任务数目:" + executor.getCompletedTaskCount());}}

ThreadPoolExecutor 提供 4 个现有的策略,分别是:

ThreadPoolExecutor.AbortPolicy:表示拒绝任务并抛出异常

ThreadPoolExecutor.DiscardPolicy:表示拒绝任务但不做任何动作
ThreadPoolExecutor.CallerRunsPolicy:表示拒绝任务,并在调用者的线程中直接执行该任务
ThreadPoolExecutor.DiscardOldestPolicy:表示先丢弃任务队列中的第一个任务,然后把这个任务加进队列。
使用CallerRunsPolicy,会将所有线程都执行到,也不可避免的会使用主线程来加载一个线程任务。一次同时创建maximumPoolSize个线程,加上主线程,就是一次执行maximumPoolSize+1个线程任务,等执行完后才会,再执行maximunPoolSize+1个线程任务,当然创建的maximumPoolSize个线程也会复用。
显然这种策略保证了每一个线程任务都会执行,但牺牲了性能。而,DiscardPolicy和DiscardOldestPolicy都会抛弃容纳不了的线程任务,保证了性能。

a. ThreadPoolExecutor可以设置一个“拒绝策略”,这是指当一个task被拒绝添加到线程池中时,采取的处理措施,
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
 b.task非常重要,“等得起”,但是“丢不起”
    System.out.println("Waiting for a second !!");
   try {
      Thread.sleep(1000);
       executor.execute(r);
   } catch (InterruptedException e) {
   }
 c.利用RejectedExecutionHandler来阻塞submit()?
    首先 submit()方法是调用了workQueue的offer()方法来塞入task,而offer()方法是非阻塞的,当workQueue已经满的时候,offer()方法会立即返回false,并不会阻塞在那里等待workQueue有空出位置,所以要让submit()阻塞,关键在于改变向workQueue添加task的行为,
  if (!executor.isShutdown()) {
      try {
          executor.getQueue().put(r);
       } catch (InterruptedException e) {
       }
   }
   调用了getQueue()方法,得到了workQueue,再调用其put()方法,将task放到workQueue中,而这个put()方法是阻塞的
   当workQueue满时,submit()一个task会导致调用我们自定义的RejectedExecutionHandler,而我们自定义的RejectedExecutionHandler会保证该task继续被尝试   用阻塞式的put()到workQueue中。

d.重写了offer()方法的BlockingQueue

由于submit()是调用workQueue的offer()方法来添加task的,而offer()是非阻塞的,所以,如果我们自己实现一个BlockingQueue,其offer()方法是阻塞的

public class LimitedQueue<E> extends LinkedBlockingQueue<E> {public LimitedQueue(int maxSize) {super(maxSize);}@Overridepublic boolean offer(E e) {// turn offer() and add() into a blocking calls (unless interrupted)try {put(e);return true;} catch (InterruptedException ie) {Thread.currentThread().interrupt();}return false;}
}

四.丰富的可扩展性

1.扩展生成器,如线程的创建前beforeExecute,创建后afterExecute,退出terminated;   
                         线程的创建策略, 通过重构ThreadFactory 重构线程名,设置守护线程等;

线程的拒绝策略,日志,拒绝,重试等

线程运行抛出异常机制

class ExtThreadPoolExecutor extends ThreadPoolExecutor {public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue) {super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);super.setThreadFactory(new ExtThreadFactory());super.setRejectedExecutionHandler(new ExtRejectedExecutionHandler());}@Overrideprotected void beforeExecute(Thread t, Runnable r) {super.beforeExecute(t, r);System.out.println("Perform beforeExecute() ");}@Overrideprotected void terminated() {System.out.println("线程池退出 ");}@Overrideprotected void afterExecute(Runnable r, Throwable t) {super.afterExecute(r, t);if (t != null) {System.out.println("Perform exception handler logic");}System.out.println("Perform afterExecute() ");}/
//    @Override
//    public void execute(Runnable task) {
//        super.execute(wrap(task,clientTrace(),Thread.currentThread().getName()));
//    }
//    @Override
//    public Future<?> submit(Runnable task) {
//        return super.submit(wrap(task,clientTrace(),Thread.currentThread().getName()));
//    }private Exception clientTrace(){return new Exception("client stack trace");}private Runnable wrap(final Runnable task ,final Exception clientStack,String clientThreadName ){return new Runnable(){@Overridepublic void run() {try {task.run();} catch (Exception e) {clientStack.printStackTrace();throw e;}task.run();}};}///private class ExtThreadFactory implements ThreadFactory {private AtomicInteger count = new AtomicInteger(0);@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);String threadName = ExtThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);System.out.println("create "+threadName);//t.setDaemon(true);
            t.setName(threadName);return t;}}//
    private class ExtRejectedExecutionHandler implements RejectedExecutionHandler {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {System.out.println("DemoTask Rejected : " + ((DemoThread) r).getName());System.out.println("Waiting for a second !!");if (!executor.isShutdown()) {try {Thread.sleep(1000);// executor.execute(r);
                     executor.getQueue().put(r);// 当task被拒绝添加到线程池中时,ThreadPoolExecutor会采用“丢弃”策略来对待这个任务,即这个task被丢弃了。//    System.out.println("Lets add another time : " + ((DemoThread) r).getName());} catch (InterruptedException e) {}}}}
}

2.扩展生成器

class DemoThread implements Runnable {private String name = null;public DemoThread(String name) {this.name = name;}public String getName() {return this.name;}@Overridepublic void run() {try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Executing : taskname: " + name +", Thread id: " +Thread.currentThread().getId());}
}

    public static void main(String[] args) {BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);ExtThreadPoolExecutor pool = new ExtThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS,blockingQueue);for (int i = 1; i < 100; i++) {System.out.println("提交第" + i + "个任务!");pool.execute(new DemoThread(Integer.valueOf(i).toString() ) );pool.submit(new DemoThread(Integer.valueOf(i).toString() ));}// 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了// exec.destory();try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}}

转载于:https://www.cnblogs.com/dengzy/p/5811643.html

thread_ThreadPoolExecutor相关推荐

最新文章

  1. Android Studio 工具栏消失了 设置回来
  2. boost::EccentricityProperty用法的测试程序
  3. 798·锣鼓巷·牛街
  4. react避免子组件渲染_如何与React一起使用正确的方法来避免一些常见的陷阱
  5. 数据库人员面试:SQL Server常用测试题
  6. 调研邀请:我们到底需要什么样的低代码平台?
  7. deepin8、9安装docker并添加用户,解决报错:aptsources.distro.NoDistroTemplateException
  8. 将angular转化为手机app_手机照片快速转化为PBR材质流程
  9. GoogleCpp风格指南 8)格式 _part1
  10. linux shell实现守护进程 看门狗 脚本
  11. Apache——阿帕奇简介
  12. g20杭州峰会 太极计算机,从杭州G20峰会晚会中的太极表演看中国气派
  13. 路由跟踪命令 查看DNS IP Mac等
  14. [c++]-uint8_t,uint16_t,uint32_t,uint64_t代表含义及其标准定义
  15. C++中的友元——编程界的老王
  16. 5 年京东后端研发程序员,从开始的3k到现在的36k,我终于熬出头
  17. 妹子找你修电脑,按照这几步操作,你就是黑客大佬!
  18. ERROR:Session/line number was not unique in database. History logging moved to new session.
  19. 关于虚拟机镜像无法检测
  20. 如何制定企业5s管理制度手册?

热门文章

  1. Python简单的拼写检查
  2. [Windows Server 2012] 安装IIS8.5及FTP
  3. Android 呼吸灯流程分析
  4. MFC:ID命名和数字约定
  5. 1052. Linked List Sorting
  6. LoadRunner性能测试技术培训
  7. ASP.NET中的事件处理
  8. 好用的pdf预览插件
  9. 2018推荐的android手机,外媒推荐:2018年下半年最值得期待的5款安卓手机
  10. Linux内核套接字(Socket)的设计与实现