自定义java线程池

ThreadPoolExecutor是Java并发api添加的一项功能,可以有效地维护和重用线程,因此我们的程序不必担心创建和销毁线程,也不必关注核心功能。 我创建了一个自定义线程池执行程序,以更好地了解线程池执行程序的工作方式。

功能性:

  • 它维护一个固定的线程池,即使没有任务提交也创建线程并启动线程,而ThreadPoolExecutor根据需要创建线程,即,每当将可运行对象提交给池且线程数小于核心池大小时。
  • 在ThreadPoolExecutor中,我们提供了一个等待队列,当所有线程忙于运行现有任务时,新的可运行任务将在该队列中等待。 队列填满后,将创建最大线程池大小的新线程。 在MyThreadPool中,我将可运行对象存储在链接列表中,因此每个任务都将在列表中等待并且不受限制,因此在此不使用maxPoolSize。
  • 在ThreadPoolExecutor中,我们使用Future Objects从任务中获取结果,如果结果不可用,则future.get()方法将阻塞,或者使用CompletionService。 在MyThreadPoolExecutor中,我创建了一个名为ResultListener的简单接口,用户必须提供对此的实现,如他希望如何处理输出。 每个任务完成后,ResultListener将获得带有任务输出的回调,或者在发生任何异常的情况下将调用error方法。
  • 调用shutdown方法时,MyThreadPoolExecutor将停止接受新任务并完成剩余任务。
  • 与ThreadPoolExecutor相比,我提供了非常基本的功能,我使用了简单的线程机制,如wait(),notify(),notifyAll()和join()。
  • 在性能方面,它类似于ThreadPoolExecutor,在某些情况下好一些。 如果您发现任何有趣的结果或改进方法,请告诉我。
package com.util;import java.util.concurrent.Callable;/*** Run submitted task of {@link MyThreadPool} After running the task , It calls* on {@link ResultListener}object with {@link Output}which contains returned* result of {@link Callable}task. Waits if the pool is empty.* * @author abhishek* * @param */import java.util.concurrent.Callable;
/**
* Run submitted task of {@link MyThreadPool} After running the task , It calls
* on {@link ResultListener}object with {@link Output}which contains returned
* result of {@link Callable}task. Waits if the pool is empty.
*
* @author abhishek
*
* @param <V>
*/
public class MyThread<V> extends Thread {/*** MyThreadPool object, from which the task to be run*/private MyThreadPool<V> pool;private boolean active = true;public boolean isActive() {return active;}public void setPool(MyThreadPool<V> p) {pool = p;}/*** Checks if there are any unfinished tasks left. if there are , then runs* the task and call back with output on resultListner Waits if there are no* tasks available to run If shutDown is called on MyThreadPool, all waiting* threads will exit and all running threads will exit after finishing the* task*/public void run() {ResultListener<V> result = pool.getResultListener();Callable<V> task;while (true){task = pool.removeFromQueue();if (task != null){try{V output = task.call();result.finish(output);} catch (Exception e){result.error(e);}} else{if (!isActive())break;else{synchronized (pool.getWaitLock()){try{pool.getWaitLock().wait();} catch (InterruptedException e){// TODO Auto-generated catch blocke.printStackTrace();}}}}}}void shutdown() {active = false;}
}
package com.util;
import java.util.LinkedList;
import java.util.concurrent.Callable;
/**
* This class is used to execute submitted {@link Callable} tasks. this class
* creates and manages fixed number of threads User will provide a
* {@link ResultListener}object in order to get the Result of submitted task
*
* @author abhishek
*
*
*/
public class MyThreadPool<V> {private Object waitLock = new Object();public Object getWaitLock() {return waitLock;}/*** list of threads for completing submitted tasks*/private final LinkedList<MyThread<V>> threads;/*** submitted task will be kept in this list untill they run by one of* threads in pool*/private final LinkedList<Callable<V>> tasks;/*** shutDown flag to shut Down service*/private volatile boolean shutDown;/*** ResultListener to get back the result of submitted tasks*/private ResultListener<V> resultListener;/*** initializes the threadPool by starting the threads threads will wait till* tasks are not submitted** @param size* Number of threads to be created and maintained in pool* @param myResultListener* ResultListener to get back result*/public MyThreadPool(int size, ResultListener<V> myResultListener) {tasks = new LinkedList<Callable<V>>();threads = new LinkedList<MyThread<V>>();shutDown = false;resultListener = myResultListener;for (int i = 0; i < size; i++) {MyThread<V> myThread = new MyThread<V>();myThread.setPool(this);threads.add(myThread);myThread.start();}}public ResultListener<V> getResultListener() {return resultListener;}public void setResultListener(ResultListener<V> resultListener) {this.resultListener = resultListener;}public boolean isShutDown() {return shutDown;}public int getThreadPoolSize() {return threads.size();}public synchronized Callable<V> removeFromQueue() {return tasks.poll();}public synchronized void addToTasks(Callable<V> callable) {tasks.add(callable);}/*** submits the task to threadPool. will not accept any new task if shutDown* is called Adds the task to the list and notify any waiting threads** @param callable*/public void submit(Callable<V> callable) {if (!shutDown) {addToTasks(callable);synchronized (this.waitLock) {waitLock.notify();}} else {System.out.println('task is rejected.. Pool shutDown executed');}}/*** Initiates a shutdown in which previously submitted tasks are executed,* but no new tasks will be accepted. Waits if there are unfinished tasks* remaining**/public void stop() {for (MyThread<V> mythread : threads) {mythread.shutdown();}synchronized (this.waitLock) {waitLock.notifyAll();}for (MyThread<V> mythread : threads) {try {mythread.join();} catch (InterruptedException e) {// TODO Auto-generated catch blocke.printStackTrace();}}}
}
package com.util;/*** This interface imposes finish method * which is used to get the {@link Output} object * of finished task* @author abhishek** @param */public interface ResultListener {public void finish(T obj);public void error(Exception ex);}

您可以根据需要实现此类并返回并处理任务返回的结果。

package com.util;public class DefaultResultListener implements ResultListener{@Overridepublic void finish(Object obj) {}@Overridepublic void error(Exception ex) {ex.printStackTrace();}}

例如,此类将添加task返回的数字。

package com.util;import java.util.concurrent.atomic.AtomicInteger;/*** ResultListener class to keep track of total matched count* @author abhishek* * @param */
public class MatchedCountResultListenerimplements ResultListener{/*** matchedCount to keep track of the number of matches returned by submitted* task*/AtomicInteger matchedCount = new AtomicInteger();/*** this method is called by ThreadPool to give back the result of callable* task. if the task completed successfully then increment the matchedCount by* result count*/@Overridepublic void finish(V obj) {//System.out.println('count is '+obj);matchedCount.addAndGet((Integer)obj);}/*** print exception thrown in running the task*/@Overridepublic void error(Exception ex) {ex.printStackTrace();}/*** returns the final matched count of all the finished tasks* * @return*/public int getFinalCount() {return matchedCount.get();}
}

这是一个测试类,使用CompletionService和MyThreadPoolExecutor对循环运行简单

package test;import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;import com.util.DefaultResultListener;
import com.util.MyThreadPool;public class TestClass {public static void main(String[] args) throws InterruptedException {CompletionServicethreadService;ExecutorService service = Executors.newFixedThreadPool(2);threadService = new ExecutorCompletionService(service);long b = System.currentTimeMillis();for(int i =0;i<50000;i++){threadService.submit(new MyRunable (i));}service.shutdown();System.out.println('time taken by Completion Service ' + (System.currentTimeMillis()-b));DefaultResultListener result = new DefaultResultListener();MyThreadPoolnewPool = new MyThreadPool(2,result);long a = System.currentTimeMillis();int cc =0;for(int i =0;i<50000;i++){cc = cc+i;}System.out.println('time taken without any pool ' + (System.currentTimeMillis()-a));a= System.currentTimeMillis();for(int i =0;i<5000;i++){newPool.submit(new MyRunable (i));}newPool.stop();System.out.println('time taken by myThreadPool ' + (System.currentTimeMillis()-a));}}class MyRunable implements Callable{int index = -1;public MyRunable(int index){this.index = index;}@Overridepublic Integer call() throws Exception {return index;}}

参考: 我的JCG合作伙伴 Abhishek Somani在Java,J2EE和Server博客上的Java 自定义线程池执行程序 。

翻译自: https://www.javacodegeeks.com/2013/03/my-custom-thread-pool-executor-in-java.html

自定义java线程池

自定义java线程池_我的Java自定义线程池执行器相关推荐

  1. java线程池和线程实例化_浅谈Java 线程池原理及使用方式

    一.简介 什么是线程池? 池的概念大家也许都有所听闻,池就是相当于一个容器,里面有许许多多的东西你可以即拿即用.java中有线程池.连接池等等.线程池就是在系统启动或者实例化池时创建一些空闲的线程,等 ...

  2. java 手编线程池_死磕 java线程系列之自己动手写一个线程池

    欢迎关注我的公众号"彤哥读源码",查看更多源码系列文章, 与彤哥一起畅游源码的海洋. (手机横屏看源码更方便) 问题 (1)自己动手写一个线程池需要考虑哪些因素? (2)自己动手写 ...

  3. 能跑java的服务器_一台java服务器可以跑多少个线程?

    一台java服务器能跑多少个线程?这个问题来自一次线上报警如下图,超过了我们的配置阈值. 京东自研UMP监控分析 打出jstack文件,通过IBM Thread and Monitor Dump An ...

  4. java 对象锁定_少锁定Java对象池

    java 对象锁定 自从我写任何东西以来已经有一段时间了,我一直在忙于我的新工作,其中涉及在性能调优方面做一些有趣的工作. 挑战之一是减少应用程序关键部分的对象创建. 尽管Java随着时间的推移已改进 ...

  5. java queue 线程安全_详解Java高并发——设计线程安全的类

    前言: 将现有的线程安全的组件组合为更大规模的组件或程序. 通过使用封装技术可以使得在不对整个程序进行分析的情况下就可以判断一个类是否是线程安全的. 一. 基本要素 1. 找出对象状态的所有变量 如果 ...

  6. java线程服务器_一台Java服务器跑多少个线程

    一台Java服务器能跑多少个线程?这个问题来自一次线上报警如下图,超过了我们的配置阈值. 图:京东自研UMP监控分析 打出jstack文件,通过IBM Thread and Monitor Dump ...

  7. java让线程空转_详解Java编程中对线程的中断处理

    1. 引言 当我们点击某个杀毒软件的取消按钮来停止查杀病毒时,当我们在控制台敲入quit命令以结束某个后台服务时--都需要通过一个线程去取消另一个线程正在执行的任务.Java没有提供一种安全直接的方法 ...

  8. java任务流程_死磕 java线程系列之线程池深入解析——普通任务执行流程

    (手机横屏看源码更方便) 注:java源码分析部分如无特殊说明均基于 java8 版本. 注:线程池源码部分如无特殊说明均指ThreadPoolExecutor类. 简介 前面我们一起学习了Java中 ...

  9. java 线程 暂停_如何实现Java线程的暂停和重新启用?

    展开全部 JAVA中线程开始有start方法,暂停用sleep(time)方法,线程停止用stop方法,线程等待wait方法,java 中没有线程重启一说,只能说线62616964757a686964 ...

  10. java 内存排序_详细解析Java内存,处理器重排序,编译器重排序以及它对线程的影响...

    欢迎大家搜索"小猴子的技术笔记"关注我的公众号,有问题可以及时和我交流. 我们在编写程序的时候有一个编写代码的顺序,那么计算机执行的时候就是按照我们编写代码的顺序来执行的吗?答案是 ...

最新文章

  1. 闲着无聊去体验远程面试,最后竟然被录取了...
  2. 基因测序3——三、四代测序技术来势汹汹,国产化仍在布局二代测序?
  3. 一份史上最全的深度学习资料,包括国内外顶尖学校课程以及顶会论文集
  4. void slove C语言什么意思,菜鸟求助-如何用指针法将一串字符按单词的倒序输出?如:i love yo...
  5. Core Data(4)- 使用绑定
  6. oracle错误号大全(查询ora错误号以及解决方法技巧)
  7. jQuery和MooTools的真正区别
  8. python爬取网易评论
  9. 技术胖TypeScript图文视频教程
  10. PTA 愿天下有情人都是失散多年的兄妹 (二叉树遍历)
  11. Android R 通知新特性—人与对话(气泡窗)
  12. 2017-2018 ACM-ICPC, Asia Daejeon Regional Contest H题
  13. ksoftirqd内核线程
  14. 七代处理器装win7_7代cpu能装win7旗舰版吗?七代处理器 不能装win7的解决方法
  15. 【机器学习算法实践】AdaBoost是典型的Boosting算法,加法模型多个弱分类器流水线式的提升精度,更关注那些难处理的数据
  16. 招聘Bev感知实习生
  17. MATLAB 的函数
  18. R语言——如何调用自己写的函数
  19. yolov3执行reval_voc_py3文件过程出现的错误记录及解决
  20. 智慧书——永恒的处世经典 (序)

热门文章

  1. JavaFX UI控件教程(五)之Radio Button
  2. 一文带你理解Java中Lock的实现原理
  3. MySQL month()函数
  4. 深入理解分布式系统的2PC和3PC
  5. 这 30 个常用的 Maven 命令你必须熟悉
  6. Photoshop基本操作
  7. 2015蓝桥杯省赛---java---B---3(三羊献瑞)
  8. python快捷方式图标_python – PyInstaller无法更改快捷方式图标
  9. javaweb调用python算法_请教怎么用java远程调用python? 多谢
  10. oracle中join另一个表后会查询不出一些数据_面试必备 | 8个Hive数据仓工具面试题锦集!...