1.Fork-Join

1.1 分而治之的设计思想

将一个难以直接解决的大问题,分割成一些规模较小的相同问题,以便各个击破,分而治之。

分治策略:

对于一个规模为n的问题,若该问题可以解决,则直接解决,否则将其分解为K个模块较小的子问题,这些子问题相互独立且与原问题形式相同(子问题相互之间有管理就变成了动态规范算法),递归的解决浙西子问题,然后将子问题的解 合并得到原问题的解。

1.2 Fork/Join框架:

工作密取:

在“生产者—消费者”模式中,生产者和消费者共享一个队列,而在工作密取的情境中,每个消费者都有一个双端队列,在消费者完成了自己队列中的工作时,可以去其他消费者队列的队尾取来工作,而并不会干扰其他消费者的工作。在工作密取情境中,消费者从自己队列的队头取自己的工作,从其他消费者的队尾取别人的工作来完成。

工作密取非常适合于消费者同时也是生产者的情形,当消费者执行工作时发现有更多的工作要做,则可以将这些工作放到自己队列的末尾,也可以送到其他消费者队列的队尾;当自己队列没有工作要做时,可以去其他消费者队列取工作来完成,这样每个消费者都会保持忙碌的状态。

ForkJoinPool中维护着多个线程(一般为CPU核数)在不断地执行Task,每个线程除了执行自己职务内的Task之外,还会根据自己工作线程的闲置情况去获取其他繁忙的工作线程的Task,如此一来就能能够减少线程阻塞或是闲置的时间,提高CPU利用率。

1.3 Fork/Join使用的标准范式

ForkJoinPool pool = new ForkJoinPool();MyTask myTask = new MyTask();pool.invoke(myTask);result = myTask.join();

=================================================

//invoke方法:
public <T> T invoke(ForkJoinTask<T> task) {    if (task == null)        throw new NullPointerException();    externalPush(task);    return task.join();}

MyTask为我们自己的任务 可继承自

其中常用的有:

1)RecursiveAction ,用于没有返回结果的任务

2)RecursiveTask<V> , 用于有返回结果的任务

Task要通过ForkJoinPool来执行,使用submit 或 invoke 提交,两者的区别是:invoke是同步执行,调用之后需要等待任务完成,才能执行后面的代码;submit(有返回值) ,execute(无返回值) 是异步执行。

Task提交后完成后 可以通过task的join()和 get()方法获得返回结果

在我们自己实现的compute方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用invokeAll方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。

get():等待任务执行完成,并返回计算结果

如果当前线程是ForkJoinWorkerThread,调用doJoin方法获取结果

如果当前线程不是ForkerJoinWorkerThread,调用externalInterruptibleAwaitDone方法。

任务执行完成返回后,如果任务完成状态是CANCELLED,抛出CancellationException异常。如果任务完成状态是EXCEPTIONAL,将任务执行过程中抛出的异常包装成ExecutionExcepiton重新抛出。

ForkJoinTask的几种任务状态
volatile int status; // accessed directly by pool and workersstatic final int DONE_MASK   = 0xf0000000;  // mask out non-completion bits  static final int NORMAL      = 0xf0000000;  // must be negative //已完成static final int CANCELLED   = 0xc0000000;  // must be < NORMAL //被取消static final int EXCEPTIONAL = 0x80000000;  // must be < CANCELLED //出现异常static final int SIGNAL      = 0x00010000;  // must be >= 1 << 16 //信号static final int SMASK       = 0x0000ffff;  // short bits for tags
/*** Waits if necessary for the computation to complete, and then* retrieves its result.** @return the computed result* @throws CancellationException if the computation was cancelled* @throws ExecutionException if the computation threw an* exception* @throws InterruptedException if the current thread is not a* member of a ForkJoinPool and was interrupted while waiting*/public final V get() throws InterruptedException, ExecutionException {int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?doJoin() : externalInterruptibleAwaitDone();Throwable ex;if ((s &= DONE_MASK) == CANCELLED)throw new CancellationException();if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)throw new ExecutionException(ex);return getRawResult();}
/** * Implementation for join, get, quietlyJoin. Directly handles * only cases of already-completed, external wait, and * unfork+exec.  Others are relayed to ForkJoinPool.awaitJoin. * * @return status upon completion */private int doJoin() {    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;    return (s = status) < 0 ? s :        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?        (w = (wt = (ForkJoinWorkerThread)t).workQueue).        tryUnpush(this) && (s = doExec()) < 0 ? s :        wt.pool.awaitJoin(w, this, 0L) :        externalAwaitDone();}
 
/** * Implementation for invoke, quietlyInvoke. * * @return status upon completion */private int doInvoke() {    int s; Thread t; ForkJoinWorkerThread wt;    return (s = doExec()) < 0 ? s :        ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?        (wt = (ForkJoinWorkerThread)t).pool.        awaitJoin(wt.workQueue, this, 0L) :        externalAwaitDone();}

join():阻塞当前线程并等待获取结果,得到结果正常则返回值,得到结果异常 则抛出异常!

/*** Returns the result of the computation when it {@link #isDone is* done}.  This method differs from {@link #get()} in that* abnormal completion results in {@code RuntimeException} or* {@code Error}, not {@code ExecutionException}, and that* interrupts of the calling thread do <em>not</em> cause the* method to abruptly return by throwing {@code* InterruptedException}.** @return the computed result*/public final V join() {int s;if ((s = doJoin() & DONE_MASK) != NORMAL)reportException(s);return getRawResult();}

pool.invoke() 本身也是调用了task.join() 可以返回结果

task.invok() 与task.join()类似 只不过调用的是doInvoke() 而非doJoin()

public class UseForkJoin {public static final int MAX = 100;private static class SumTask extends RecursiveTask<Integer> {/*** 自定义的任务大小*/private int perSize = MAX / 10;/*** 起始数*/private int fromIndex;/*** 结尾数*/private int toIndex;public SumTask(int fromIndex, int toIndex) {this.fromIndex = fromIndex;this.toIndex = toIndex;}@Overrideprotected Integer compute() {if (toIndex - fromIndex < perSize) {int count = 0;for (int i = fromIndex; i <= toIndex; i++) {count = count + i;}return count;} else {int mid = (fromIndex + toIndex) / 2;SumTask left = new SumTask(fromIndex, mid);SumTask right = new SumTask(mid + 1, toIndex);invokeAll(left, right);//此处invokeAll会不断的调用执行compute方法 直到满足toIndex-fromIndex<perSize (递归)return left.join() + right.join();}}}public static void main(String[] args) {//范式ForkJoinPool pool = new ForkJoinPool();SumTask task = new SumTask(0, 100);
//        Integer invoke = pool.invoke(task);
        pool.invoke(task);Integer join = task.join();System.out.println(join);}
}

2.CountDownLatch

CountDownLatch这个类能够使一个线程等待其他线程完成各自的工作后再执行。如主线程在其他初始化线程启动完成后再执行。 闭锁:闭锁是一种同步工具,可以延迟线程直到其达到其终止状态。

CountDowmLatch是通过一个计数器来实现的,计数器的初始值为初始任务量。每当完成一个任务后,计数器的值就会减1(countDown()方法).,当计数器的值达到0时,表示所有任务已完成 然后 闭锁上 等待 await() 方法的线程就可以恢复执行任务。

public class UserCountDownLatch {static CountDownLatch latch = new CountDownLatch(6);private static class CountThread extends Thread {@Overridepublic void run() {System.out.println("CountThread running...");SleepTools.second(1);System.out.println("CountThread end");latch.countDown();}}private static class BusThread extends Thread {@Overridepublic void run() {try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}for (int i = 0; i < 4; i++) {System.out.println("BusThread" + Thread.currentThread().getId()+ " do business-----");}}}private static class afterThread implements Runnable {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " init something...");latch.countDown();System.out.println("init complete!");System.out.println("after init do something...");SleepTools.second(2);System.out.println("after end...");latch.countDown();}}public static void main(String[] args) {new BusThread().start();for (int i = 0; i < 3; i++) {new CountThread().start();}new Thread(new afterThread()).start();System.out.println("main is running...");SleepTools.second(3);System.out.println("main end!");latch.countDown();}
}

 3.CyclicBarrier

栅栏类似于闭锁,它能阻塞一组线程直到某个事件的发生。栅栏与闭锁的关键区别在于,所有的线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

public class UseCyclicBarrier {//    static CyclicBarrier barrier = new CyclicBarrier(4);static CyclicBarrier barrier = new CyclicBarrier(4, new CollectThread());private static ConcurrentHashMap<String, Long> resultMap = new ConcurrentHashMap<>();/*** 汇总线程(barrierAction) 当屏障都执行(触发)完成后执行该线程任务* the command to execute when the barrier is tripped*/private static class CollectThread implements Runnable {@Overridepublic void run() {StringBuffer result = new StringBuffer();for (Map.Entry<String, Long> workResult : resultMap.entrySet()) {result.append(workResult.getValue() + ",");}System.out.println(" the result = " + result);System.out.println("do other thing........");}}/*** 相互等待的线程,所有的线程调用await()以后才一同执行之后的业务逻辑* barrier可重复调用,在次调用 完成 会再次触发 CollectThread*/private static class SubThread implements Runnable {@Overridepublic void run() {long id = Thread.currentThread().getId();resultMap.put(Thread.currentThread().getId() + "", id);try {SleepTools.ms(1000);System.out.println("Thread " + id + "is running...");barrier.await();System.out.println("Thread " + id + "is end...");//barrier可重复调用,在次调用 完成 会再次触发 CollectThread
                //barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}public static void main(String[] args) {/*** 主线程中并没有调用CollectThread*/for (int i = 0; i < 4; i++) {Thread thread = new Thread(new SubThread());thread.start();}}
}

4.Semaphore 信号量

Semaphore类是一个计数信号量,必须由获取它的线程释放, 通常用于限制可以访问某些资源(物理或逻辑的)线程数目。

一个信号量有且仅有3种操作,且它们全部是原子的:初始化、增加和减少 
增加可以为一个进程解除阻塞-> acquire()
减少可以让一个进程进入阻塞-> release()

public class UseSemaphore2 {static Semaphore semaphore = new Semaphore(10);/*** 存放资源的容器*/private static LinkedList<Integer> list = new LinkedList<>();/*** 释放资源*/public void release() {synchronized (list) {list.addLast(new Random().nextInt(100));}/*** 将许可证放回,释放资源  通知没有拿到资源的一方执行*/semaphore.release();System.out.println(Thread.currentThread().getId() + " 释放了资源。。。" + semaphore.availablePermits());}/*** 获取资源*/public void fetch() {/*** acquire()拿不到许可证 时会等待,拿到了继续执行* 获取到许可证后 许可证总量减少*/try {semaphore.acquire();SleepTools.second(1);} catch (InterruptedException e) {e.printStackTrace();}synchronized (list) {if (list.size() > 0) {list.removeFirst();}}System.out.println(Thread.currentThread().getId() + " 获取资源。。。" + semaphore.availablePermits());}private static class BusThread implements Runnable {private UseSemaphore2 semaphore;public BusThread(UseSemaphore2 semaphore) {this.semaphore = semaphore;}@Overridepublic void run() {semaphore.fetch();semaphore.release();}}public static void main(String[] args) {UseSemaphore2 semaphore = new UseSemaphore2();for (int i = 0; i < 50; i++) {new Thread(new BusThread(semaphore)).start();}}}

 5.Exchanger 

可用于两个线程之间交换信息。可简单地将Exchanger对象理解为一个包含两个格子的容器,通过exchanger方法可以向两个格子中填充信息。当两个格子中的均被填充时,该对象会自动将两个格子的信息交换,然后返回给线程,从而实现两个线程的信息交换。

public class UseExchange {private static final Exchanger<Set<String>> exchange = new Exchanger<Set<String>>();public static void main(String[] args) {new Thread(new Runnable() {@Overridepublic void run() {Set<String> setA = new HashSet<String>();//存放数据的容器try {setA.add("A1");setA.add("A2");setA.add("A3");setA = exchange.exchange(setA);//交换set/*处理交换后的数据*/Iterator<String> iterator = setA.iterator();while (iterator.hasNext()) {System.out.println("after exchange setA:" + iterator.next());}} catch (InterruptedException e) {}}}).start();new Thread(new Runnable() {@Overridepublic void run() {Set<String> setB = new HashSet<String>();//存放数据的容器try {setB.add("B1");setB.add("B2");setB.add("B3");setB.add("B4");setB = exchange.exchange(setB);//交换set/*处理交换后的数据*/Iterator<String> iterator = setB.iterator();while (iterator.hasNext()) {System.out.println("after exchange setB:" + iterator.next());}} catch (InterruptedException e) {}}}).start();}
}

5.Callable,Future,FutureTask

前面说过 启动一个线程有两种方式, 继承自Thread 或是 实现Runnable,然后交给Thread运行,但是这两种方式启动线程都没有返回结果。

Future :在 java.util.concurrent包中提供了Future 对于具体的Runnable或者Callable任务的执行结果进行取消,查询是否完成,获取结果。必要时通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Callable : 同样位于java.util.concurrent包下,是一个泛型接口,通过 call()方法能够返回泛型结果 或是抛出异常。和Runnable有相似之处,但Runnable并不能返回结果 或是抛出异常。

/*** A task that returns a result and may throw an exception.* Implementors define a single method with no arguments called* {@code call}.** <p>The {@code Callable} interface is similar to {@link* java.lang.Runnable}, in that both are designed for classes whose* instances are potentially executed by another thread.  A* {@code Runnable}, however, does not return a result and cannot* throw a checked exception.** <p>The {@link Executors} class contains utility methods to* convert from other common forms to {@code Callable} classes.** @see Executor* @since 1.5* @author Doug Lea* @param <V> the result type of method {@code call}*/
@FunctionalInterface
public interface Callable<V> {/*** Computes a result, or throws an exception if unable to do so.** @return computed result* @throws Exception if unable to compute a result*/V call() throws Exception;
}

FutureTask:由于FutureTask仅仅是一个接口,无法直接用来创建对象因此有了FutureTask

/*** A {@link Future} that is {@link Runnable}. Successful execution of* the {@code run} method causes completion of the {@code Future}* and allows access to its results.* @see FutureTask* @see Executor* @since 1.6* @author Doug Lea* @param <V> The result type returned by this Future's {@code get} method*/
public interface RunnableFuture<V> extends Runnable, Future<V> {/*** Sets this Future to the result of its computation* unless it has been cancelled.*/void run();
}

由于FutureTask实现了RunnableFuture ,RunnableFuture 同时继承了Runnbale和Future

所以我们需要通过FutureTask把一个Callable包装成Runnable,然后再通过这个FutureTask拿到Callable运行后的返回值。

public class UserCallable {

    private static class CallThread implements Callable<Integer> {        @Override        public Integer call() throws Exception {            int result = 0;            for (int i = 0; i <= 100; i++) {                result += i;            }            System.out.println("Call result:" + result);            return result;        }    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {        FutureTask<Integer> task = new FutureTask<Integer>(new CallThread());        new Thread(task).start();        Integer integer = task.get();        System.out.println("main result:" + integer);    }}

参考:http://enjoy.ke.qq.com

转载于:https://www.cnblogs.com/cangshublogs/p/10752002.html

并发编程(二)线程并发工具类相关推荐

  1. java并发编程中常用的工具类 Executor

    /***************************************************  * TODO: description .  * @author: gao_chun  * ...

  2. Java并发编程(二十三)------并发设计模式之生产者消费者模式

    参考文章:Java实现生产者消费者问题与读者写者问题 目录 1. 生产者消费者问题 1.1 wait() / notify()方法 1.2 await() / signal()方法 1.2.1 对sy ...

  3. 【Java 并发编程】线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )

    文章目录 前言 一.ThreadPoolExecutor 构造参数 二.newCachedThreadPool 参数分析 三.newFixedThreadPool 参数分析 四.newSingleTh ...

  4. 《转载》Python并发编程之线程池/进程池--concurrent.futures模块

    本文转载自 Python并发编程之线程池/进程池--concurrent.futures模块 一.关于concurrent.futures模块 Python标准库为我们提供了threading和mul ...

  5. Java高并发编程:线程池

    这里首先介绍了java5中的并发的小工具包:java.util.concurrent.atomic,然后介绍了线程池的概念,对使用java5的方式创建不同形式的线程进行了演示,之后介绍了两个 对象:C ...

  6. 并发编程-05线程安全性之原子性【锁之synchronized】

    文章目录 线程安全性文章索引 脑图 概述 原子性synchronized 修饰的4种对象 修饰代码块 作用范围及作用对象 Demo 多线程下 同一对象的调用 多线程下不同对象的调用 修饰方法 作用范围 ...

  7. 19、Java并发编程:线程间协作的两种方式:wait、notify、notifyAll和Condition

    Java并发编程:线程间协作的两种方式:wait.notify.notifyAll和Condition 在前面我们将了很多关于同步的问题,然而在现实中,需要线程之间的协作.比如说最经典的生产者-消费者 ...

  8. [转]Java并发编程:线程池的使用

    Java并发编程:线程池的使用 在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了, ...

  9. 【并发编程二十】协程(coroutine)_协程库

    [并发编程二十]协程(coroutine) 一.线程的缺点 二.协程 三.优点 四.个人理解 五.协程库 1.window系统 2.unix系统(包括linux的各个版本) 2.1.makeconte ...

  10. Java并发编程:线程池的使用

    在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统 ...

最新文章

  1. 雷军的100亿计划:不服就干,生死看淡
  2. antd 能自适应吗_ACC自适应巡航能当自动驾驶用吗?答:不能
  3. 循环神经网络(RNN)相关知识
  4. 开源xen对比_女实习生在Xen Project上摇摆开源
  5. 企业应该如何运用商业智能
  6. 20 万台 QQ 服务器全面上云!
  7. AspSpider再次开放asp.net2.0 免费空间注册
  8. ADS仿真遇到error如何查找原因
  9. 计算机怎么制作个人简历表步骤图片,教你如何制作个人简历表格!ppt课件
  10. Github系列教程一 ————开门
  11. logo设计软件哪个好用?小白也能学会的logo设计小技巧
  12. 经典再现,看到就是赚到。尚硅谷雷神 - SpringBoot 2.x 学习笔记 - 核心功能篇
  13. 解决图片放在src上面加载不出来,但是放在浏览器地址可以访问
  14. ASP.NET Core 中文文档 第四章 MVC(3.7 )局部视图(partial)
  15. DNS服务器未响应惊叹号,怎么办呢电脑连接网络出现感叹号?
  16. jenkins 配置代理
  17. LDO + 稳压管的扩压电路试用笔记
  18. 这么优秀的Excel工具类,你难道不用?
  19. Java数据类型之Java数据类型的划分方式
  20. 盥洗台的全球与中国市场2022-2028年:技术、参与者、趋势、市场规模及占有率研究报告

热门文章

  1. 惊喜不止小米9!小米又一4800万新机确认:不给友商机会?
  2. 炮轰小米后柔宇科技再发长文声明:无意碰瓷炒作
  3. Java并发编程之Semaphore信号量
  4. python json.dumps 中的ensure_ascii 参数引起的中文编码问题
  5. 创科视觉软件说明书_【拓斯达 | GGII】20192023年中国机器视觉行业调研
  6. linux中进程unit是什么意思,Linux系统之进程及服务的控制
  7. Golang实践录:简单的代码片段
  8. centos6.5卸载java,CentOS 5.2卸载自带Java1.4.2 安装JDK6
  9. 是哪个app_互联网APP创业哪个好
  10. macsfancontrol 设置方法_重庆市材料好的消防应急筒灯使用方法