之前博客的所有内容是对单个线程的操作,例如有Thread和Runnable的使用以及ThreadGroup等的使用,但是对于在有些场景下我们需要管理很多的线程,而对于这些线程的管理有一个统一的管理工具叫做线程池,线程池就是管理很多的线程的一个集合。这篇分享中提出的一个就是关于线程池的概念。

线程池原理

  从JDK1.5开始,utils包提供了一个类ExecutorService,这个类是对线程池的实现,关于线程Thread来说,它的整个生命周期中都是需要很好的管理,但是我们频繁的创建或者是销毁线程极大的浪费了系统的资源,那么就需要将线程进行重复的利用。基于这个需求设计出了线程池。

  所谓的线程池,从字面上理解就是存放线程的一个容器。当某个任务需要执行的时候,在线程池里的线程就会主动调用这个任务。但是还需要知道这个线程池的资源也是有限的,什么时候对于线程池的资源进行回收,什么时候进行线程的补充,什么如果达到容量之后继续提交任务会是什么状态等等。
  基于上面的描述可以知道一个线程池主要具备的要素有以下一些

  1. 任务队列,用来缓存提交的任务
  2. 线程池数量管理功能,在线程池执行的任务要在一个可以控制的范围内。例如在使用线程池的时候最为经典的三个参数,第一个参数就是初始化线程的数量,第二参数就是可以扩展的线程池的最大容量,第三个参数就是线程池所能执行的核心任务的数量。其中这个核心线程的数量表示当线程池闲置的时候也要维护的线程的数量。、
  3. 任务的执行策略,对于正在执行的线程如果达到了线程池的最大容量,之后需要对于后来的任务进行拒绝操作。或者是通知提交者没有资源可以执行了。
  4. 线程工厂。通过线程工厂可以定制对应的需要执行的是什么样的线程。
  5. 队列大小,也就是当线程池资源已经占满的时候,需要在队列里面继续缓存任务,但是这个任务并不是可以无限缓存的所以对对应的任务队列也要进行大小的限制。
  6. 保持存活的时间KeepLive,对于各个线程来说都要有自己所要维护的任务的时间间隔,如果在这个时间内没有获取到资源就会执行回收。

自定义线程池各功能策略实现

  通过上面的分析,我们知道了线程的几个重要的因素。下面就来自己实现一个线程池。

1.创建线程池接口
public interface ThreadPool {//提交执行任务到线程池中void execute(Runnable runnable);//关闭线程池void shutdown();//获取线程池初始化大小int getInitSize();//获取线程池最大链接数int getMaxSize();//获取线程池需要维护的核心线程数int getCoreSize();//获取线程池中用于缓存任务队列的大小int getQueueSize();//获取线程中活跃线程的大小int getActiveCount();//判断线程是否已经被shutdownboolean isShutdown();
}
2.创建线程队列接口
public interface RunnableQueue {//当有新任务的时候首先进入到offer中void offer(Runnable runnable);//通过take方法获取到任务Runnable take();//获取任务队列中任务的数量int size();
}
3.实现一个线程工厂

定义了一个函数式接口

@FunctionalInterface
public interface ThreadFactory {Thread createThread(Runnable runnable);
}
4.实现决绝策略
@FunctionalInterface
public interface DenyPolicy {void reject(Runnable runnable,ThreadPool threadPool);class DiscardDenyPolicy implements DenyPolicy{@Overridepublic void reject(Runnable runnable, ThreadPool threadPool) {//do nothing}}class AbortDenyPolicy implements DenyPolicy{@Overridepublic void reject(Runnable runnable, ThreadPool threadPool) {throw new RunnableDenyException("The runnable "+runnable+" will be abort.");}}class RunnerDenyPolicy implements DenyPolicy{@Overridepublic void reject(Runnable runnable, ThreadPool threadPool) {if (!threadPool.isShutdown()){runnable.run();}}}
}
public class RunnableDenyException extends RuntimeException{public RunnableDenyException(String message) {super(message);}
}
5.实现线程池调用策略
public class InternalTask implements Runnable {private final RunnableQueue runnableQueue;private volatile boolean running = true;public InternalTask(RunnableQueue runnableQueue) {this.runnableQueue = runnableQueue;}@Overridepublic void run() {//如果当前任务正在执行并且没有被中断,则需要不断的从Queue中获取任务到run方法中while (running&& !Thread.currentThread().isInterrupted()) {try{Runnable task = runnableQueue.take();task.run();}catch (Exception e){running = false;break;}}}public void stop(){this.running = false;}
}

自定义线程池各功能详细实现

线程队列的设计
  做完基本设计之后剩下的就是对于基本功能的实现,首先是对外部线程队列的实现,对这个线程队列来说,有几个需要注意点,第一个就是队列的大小,第二个就是队列的拒绝策略。也就是是说当队列达到上限之后应该怎么处理。

public class LinkedRunableQueue implements RunnableQueue {//任务队列最大容量private final int limit;private final DenyPolicy denyPolicy;private final LinkedList<Runnable> runnableList = new LinkedList<>();private final ThreadPool threadPool;public LinkedRunableQueue(int limit, DenyPolicy denyPolicy, ThreadPool threadPool) {this.limit = limit;this.denyPolicy = denyPolicy;this.threadPool = threadPool;}@Overridepublic void offer(Runnable runnable) {synchronized (runnableList){if (runnableList.size() >= limit){denyPolicy.reject(runnable,threadPool);}else {runnableList.addLast(runnable);runnableList.notifyAll();}}}@Overridepublic Runnable take() {synchronized (runnableList){while (runnableList.isEmpty()){try {runnableList.wait();} catch (InterruptedException e) {e.printStackTrace();}}return runnableList.removeFirst();}}@Overridepublic int size() {synchronized (runnableList){return runnableList.size();}}
}

其中两个比较关键的方法一个是offer一个take,在offer方法中如果队列达到上限会执行拒绝策略,否则将继续往队列中放入执行的任务,同时唤醒take的任务线程。take会不断的从队列中获取任务,当队列为空的时候会进入阻塞状态,这个有可能在阻塞的过程中会被中断,所以处理异常的时候要对异常进行thow处理。也就是说对异常进行抛出而不是catch。

线程池的设计

根据前面的设计来开一个线程池有很多的控制属性、例如线程池大小、核心线程数、最大线程数、等等。

public class BasicThreadPool extends Thread implements ThreadPool {private final int initSize;private final int maxSize;private final int coreSize;private int activeCount;private final ThreadFactory threadFactory;private final RunnableQueue runnableQueue;private volatile boolean isShutdown = false;private final Queue<ThreadTask> threadQueue = new ArrayDeque<>();private final static DenyPolicy DEFAULT_DENY_POLICY = new DenyPolicy.DiscardDenyPolicy();private final static ThreadFactory DEFAULT_THREAD_FACTORY = new DefaultThreadFactory();private final long keepAliveTime;private final TimeUnit timeUnit;public BasicThreadPool(int initSize,int maxSize,int coreSize,int queueSize){this(initSize,maxSize,coreSize,DEFAULT_THREAD_FACTORY,queueSize,DEFAULT_DENY_POLICY,10,TimeUnit.SECONDS);}public BasicThreadPool(int initSize, int maxSize, int coreSize, ThreadFactory threadFactory,int queueSize, DenyPolicy denyPolicy,long keepAliveTime, TimeUnit timeUnit) {this.initSize = initSize;this.maxSize = maxSize;this.coreSize = coreSize;this.threadFactory = threadFactory;this.runnableQueue = new LinkedRunableQueue(queueSize,denyPolicy,this);this.keepAliveTime = keepAliveTime;this.timeUnit = timeUnit;this.init();}private void init(){start();for (int i = 0; i < initSize; i++) {newThread();}}private void newThread() {InternalTask internalTask = new InternalTask(runnableQueue);Thread thread = this.threadFactory.createThread(internalTask);ThreadTask threadTask = new ThreadTask(thread,internalTask);threadQueue.offer(threadTask);this.activeCount++;thread.start();}private void removeThread(){ThreadTask threadTask = threadQueue.remove();threadTask.internalTask.stop();this.activeCount--;}@Overridepublic void run() {while (isShutdown&& !isInterrupted()){try {timeUnit.sleep(keepAliveTime);} catch (InterruptedException e) {isShutdown  = true;break;}synchronized (this){if (isShutdown){break;}if (runnableQueue.size()>0&& activeCount<coreSize){for (int i = initSize; i < coreSize ; i++) {newThread();}continue;}if (runnableQueue.size()>0&& activeCount<maxSize){for (int i = coreSize; i < maxSize ; i++) {newThread();}}if (runnableQueue.size()==0&& activeCount>coreSize){for (int i = coreSize; i < activeCount ; i++) {removeThread();}}}}}private static class ThreadTask{Thread thread;InternalTask internalTask;public ThreadTask(Thread thread,InternalTask internalTask){this.thread = thread;this.internalTask = internalTask;}}@Overridepublic void execute(Runnable runnable) {if (this.isShutdown){throw  new IllegalStateException("The thread pool is destory");}this.runnableQueue.offer(runnable);}@Overridepublic void shutdown() {synchronized (this){if (isShutdown){return;}isShutdown =true;threadQueue.forEach(threadTask -> {threadTask.internalTask.stop();threadTask.thread.interrupt();});this.interrupt();}}@Overridepublic int getInitSize() {if (isShutdown){throw new IllegalStateException("The thread pool is destory");}return this.initSize;}@Overridepublic int getMaxSize() {if (isShutdown){throw new IllegalStateException("The thread pool is destory");}return this.maxSize;}@Overridepublic int getCoreSize() {if (isShutdown){throw new IllegalStateException("The thread pool is destory");}return this.coreSize;}@Overridepublic int getQueueSize() {if (isShutdown){throw new IllegalStateException("The thread pool is destory");}return this.runnableQueue.size();}@Overridepublic int getActiveCount() {synchronized (this){return this.activeCount;}}@Overridepublic boolean isShutdown() {return this.isShutdown;}private static class DefaultThreadFactory implements ThreadFactory{private static final AtomicInteger GROUP_COUNTER = new AtomicInteger(1);private static final ThreadGroup group = new ThreadGroup("TestGroup-"+GROUP_COUNTER.getAndDecrement());private static final AtomicInteger COUNTER = new AtomicInteger(0);@Overridepublic Thread createThread(Runnable runnable) {return new Thread(group,runnable,"thread-pool-"+COUNTER.getAndDecrement());}}
}

对于一个线程池来说,既要有对参数的控制,还要有对活动的控制,当线程执行的时候需要有一个执行线程的方法,将线程放入到线程队列中去执行。

在执行线程的过程中线程池还有自我维护的功能,也就是说,在执行操作的过程中的健壮性

  1. 当任务队列中出现任务积压的时候,并且当前活动的线程少于核心线程的时候,则需要重新建立核心线程和初始化的线程,并且要加入到线程活动队列中,防止马上进行最大线程的扩展,导致系统崩溃。
  2. 任务队列中出现任务积压,并且当前活动线程少于最大线程数的时候,则需要新建最大线程和核心线程并且将其加入到活动队列中。
  3. 当线程池不忙的时候,回收到核心线程数即可,也就是removeThread方法中需要考虑的问题。

线程池的销毁策略
线程池作为需要多个线程同时访问的对象,难免会出现线程安全问题,使用同步方式是为了防止线程池自我保护而导致数据不匹配的问题。而线程池的销毁主要是对整个线程池的停止工作。也就是说需要停止线程池中所有的线程执行操作。并且将开关变量设置为true。

由于所有的方法都需要多线程访问,所以为了线程安全,都是使用同步的方式进行操作。

自定义线程池的使用

public class ThreadPoolTest {public static void main(String[] args) throws InterruptedException {final ThreadPool threadPool = new BasicThreadPool(2,6,4,1000);for (int i = 0; i < 20; i++) {threadPool.execute(()->{try {TimeUnit.SECONDS.sleep(10);System.out.println(Thread.currentThread().getName() + "is running and done.");} catch (InterruptedException e) {e.printStackTrace();}});}for (;;){System.out.println("getActiveCount:"+threadPool.getActiveCount());System.out.println("getQueueSize:"+threadPool.getQueueSize());System.out.println("getCoreSize:"+threadPool.getCoreSize());System.out.println("getMaxSize:"+threadPool.getMaxSize());System.out.println("==============================");TimeUnit.SECONDS.sleep(1);}}
}

结果如下

总结

结合自定义的线程池的实现,对线程池技术的基本原理有了一个更加深刻的认识。通过自定义的线程池,对于以后使用JDK自带的ExecutorService线程池就有了原理上的认识,当然这个线程池比我现在实现的这个线程池实现的功能强大,但是基本的原理都是一样的。希望可以对大家有所帮助。

Java高并发编程详解系列-线程池原理自定义线程池相关推荐

  1. Java高并发编程详解系列-Java线程入门

    根据自己学的知识加上从各个网站上收集的资料分享一下关于java高并发编程的知识点.对于代码示例会以Maven工程的形式分享到个人的GitHub上面.   首先介绍一下这个系列的东西是什么,这个系列自己 ...

  2. Java高并发编程详解系列-7种单例模式

    引言 在之前的文章中从技术以及源代码的层面上分析了关于Java高并发的解决方式.这篇博客主要介绍关于单例设计模式.关于单例设计模式大家应该不会陌生,作为GoF23中设计模式中最为基础的设计模式,实现起 ...

  3. Java高并发编程详解系列-线程上下文设计模式及ThreadLocal详解

    导语   在之前的分享中提到过一个概念就是线程之间的通信,都知道在线程之间的通信是一件很消耗资源的事情.但是又不得不去做的一件事情.为了保证多线程线程安全就必须进行线程之间的通信,保证每个线程获取到的 ...

  4. Java高并发编程详解系列-线程安全数据同步

    在多线程中最为复杂和最为重要的就是线程安全.多个线程访问同一个对象的时候会导致线程安全问题.通过加锁可以避免这种问题.但是在串行执行的过程中又不用考虑线程安全问题,而使用串行程序效率低没有办法将CPU ...

  5. Java高并发编程详解系列-线程通信

      进程间的通信,又被称为是进程内部的通信,我们都知道每个进程中有多个线程在执行,多个线程要互斥的访问共享资源的时候会发送对应的等待信号或者是唤醒线程执行等信号.那么这些信号背后还有什么样的技术支持呢 ...

  6. Java高并发编程详解系列-线程异常处理

    前面的博客中主要描述的关于线程的概念,通过源码分析了解线程的基本操作方式,但是如何在线程运行期间获取异常信息呢?这就要使用到一个Hook线程了 线程运行时的异常   在Thread类中,关于线程运行时 ...

  7. Java高并发编程详解系列-Future设计模式

    导语   假设,在一个使用场景中有一个任务需要执行比较长的时间,通常需要等待任务执行结束之后或者是中途出错之后才能返回结果.在这个期间调用者只能等待,对于这个结果Future设计模式提供了一种凭据式的 ...

  8. Java高并发编程详解系列-类加载

    之前在写关于JVM的时候提到过类加载机制,类加载机制也是在Java面试中被经常问道的一个问题,在这篇博客中就来了解一下关于类加载的知识. 类加载   在JVM执行Java程序的时候实际上执行的编译好的 ...

  9. Java高并发编程详解系列-ThreadAPI简单说明

    之前的两篇分享中,简单的从概念上和简单的操作上了解了Thread,这篇分享中主要是看看对于Thread的所有API的使用方式,通过解析源码的方式来了解关于Thread的细节的使用方式 引言   首先在 ...

最新文章

  1. python转换维度
  2. ios滚动条影响父页面 vue_父元素设置overflow:scroll时vuedraggable组件出现奇怪效果的解决方案...
  3. 反射获取成员变量并使用【应用】
  4. 线性代数应该这样讲(二)
  5. Bootstrap3 表单控件的状态
  6. [转载] python float()
  7. 块层介绍 第二篇: request层
  8. 矩阵的秩的一些定理证明
  9. 这几个垂直类小众导航网站,你绝对不会想错过
  10. 信息检索与利用(第三版)第一章
  11. [书籍翻译]12周撰写期刊文章 学术出版成功指南——第 1 周:设计写作计划
  12. 如何在 arm 官网上找到合适的手册
  13. u盘修复linux系统,电脑怎么修复u盘安装linux的器
  14. 卧槽,ChatGPT 太强了吧!
  15. IDEA输入字母间距变大报红
  16. 编程入门篇之零基础入门(通用)
  17. 基于Echarts和百度地图的HTML插件
  18. Setting property ‘source‘ to ‘org.eclipse.jst.jee.server
  19. Error in render: TypeError: Cannot read property 'XXXXX' of undefined
  20. 无线网络技术导论笔记(第二讲)

热门文章

  1. android10适配机型华为,EMUI10适配机型进度公布 这10款机型将率先尝鲜
  2. Spring重点面题总结
  3. mysql 分区表_MySQL 分区分表应用场景分析和分区中可能遇到的坑点
  4. linux系统下如何使用U盘、光盘、软盘?如何挂载U盘,光盘镜像?
  5. 在 CentOS 5.4 下编译安装MySQL时
  6. 深刻理解HDFS工作机制
  7. 安装程序无法创建新的系统分区
  8. drawRect方法在UIImageView的派生类中不被调用
  9. 苏宁智慧家庭助跑智慧零售
  10. javascript经典问题总结及代码实例(未完待续)