21 并发

21.1 并发的多面性

  • 操作系统通常会将进程互相隔离开,使得进程之前相互不干涉。但是操作系统对于进程通常会有数量和开销的限制,导致进程不能无限创建。
  • Java中线程调度采用抢占式,表示调度机制会周期性的中断线程,切换上下文至另一线程,使得每个线程均有机会分到合理的时间片用以执行。

21.2 基本的线程机制

  • 任务由线程驱动,Java中定义任务均需implements Runnable接口,实现run()方法;而要运行此任务,则必须将已定义的任务附着到线程上,即:
        new Thread(new Runnable() {@Overridepublic void run() {//run something}}).start();
  • main中创建线程时,与创建其他对象有一定区别:没有捕获任何对线程对象的引用。 但实际上每个Thread对象都“注册”了他自己,因此确实有一个对此Thread对象的引用,并且在Thread对象包含的任务在run()退出之前,垃圾回收器无法清除此引用。
  • Java线程执行器: Executor 允许管理异步任务的执行,而无须显示的管理线程的生命周期,主要分类为以下几个:
        // 一次性预先执行代价高昂的线程分配,后续不再分配已不再回收,可限定线程的数量ExecutorService service1 = Executors.newFixedThreadPool(5);// 程序执行过程中创建和所需数量相同的线程,在回收旧线程时[默认60s无任务运行时中断且回收旧线程]停止创建新线程,线程可动态调整。ExecutorService service2 = Executors.newCachedThreadPool();// 仅创建一个线程,当提交多个任务时,则任务采用排队方法ExecutorService service3 = Executors.newSingleThreadExecutor();// 开启定时任务,周期性执行确定任务ExecutorService service4 = Executors.newScheduledThreadPool(4);// Timer类的定时任务,相隔1000ms输出一次Hello Wordnew Timer().schedule(new TimerTask() {@Overridepublic void run() {System.out.println("Hello World...");}}, 0, 1000);// 执行任务,无对应返回值service1.execute(new Runnable() {@Overridepublic void run() {// do someThing}});// 执行任务,有对应返回值,其中Future可认为是对应线程句柄,强制中断某个线程可使用ret.setCancel(true)。Future<String> ret = service1.submit(new Callable<String>() {@Overridepublic String call() throws Exception {// do someThingreturn "";}});// 获得返回值,get方法将阻塞主线程直至获取最终结果ret.get();// 强制中断此线程ret.setCancel(true);// 拒绝新任务再次提交,但是会继续运行之前提交的任务直至完成,在继续运行之前提交的任务时主线程继续运行service1.shutdown();// 拒绝新任务再次提交,且中断停止所有正在执行的任务,在中断之前提交的线程任务时主线程继续运行service1.shutdownNow();// 主线程阻塞100s或者之前提交的任务运行完毕后,主线程继续往下运行service1.awaitTermination(100, TimeUnit.SECONDS);
  • 线程休眠:可以调用Thread.sleep(100)或者TimeUnit.MICROSECONDS.sleep(100),以上调用均会抛出异常。但是任务中抛出的异常仅能在本任务中处理,而不能扩线程传播至主线程中
  • 线程异常处理:线程中如若出现异常,此异常一旦逃出线程的run()后,则不能在主线程中捕获,而只能向外传播至控制台,除非采用特殊步骤捕获此种异常(“Thread.UncaughtExceptionHandler类”)。如下所示:
public class TestUncaughtExceptionHandler {public static void main(String[] args) throws InterruptedException {ExecutorService executorService = Executors.newCachedThreadPool(new MyThreadFactory());executorService.execute(new MyRunable());// 线程关闭,不再接受新任务executorService.shutdown();// 等待执行器executorService线程池中的线程执行完毕后,再执行main线程,设置1000s超时时间executorService.awaitTermination(1000, TimeUnit.SECONDS);System.out.println("Task has completed.");}
}class MyRunable implements Runnable {@Overridepublic void run() {Thread t = Thread.currentThread();System.out.println("Thread name: " + t.getName() + " has begin.");System.out.println("unCaughtExceptionHandler is: " + t.getUncaughtExceptionHandler());throw new RuntimeException();}
}class MyThreadFactory implements ThreadFactory {private int index = 0;@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setName("asr-thread-" + index);t.setPriority(Thread.NORM_PRIORITY);// 设置线程的UncaughtExceptionHandler,用以处理线程run方法抛出异常,若不处理,则异常直接向外传播至控制台t.setUncaughtExceptionHandler((t1, e) -> System.out.println("Thread name: " + t1.getName() + " has exception: " + e.getMessage()));++index;return t;}
}
  • 线程优先级:在Java中,所有线程如若不指定,均采用默认优先级执行,而试图操作线程的优先级往往不科学,编码中尽量不使用setPriority()方法操作某个线程的优先级。
  • 线程让步:Java支持线程让步操作(亦即让出CPU供其他线程使用),而采用让步操作(Thread.yield())时,仅是建议CPU执行其他相同优先级的线程,注意仅是建议,而具体是否执行由CPU调用决定
  • 后台线程:指提供给后台的一种通用服务的线程。当所有的非后台线程执行完毕时,程序即终止,同时也会立即杀死所有的后台线程(亦即调用了interrupted()方法,即使后台线程中将要执行finally子句时也会立即杀死,不再执行finally子句);并且若一个线程为后台线程,那么此线程创建的任何线程均自动设置为后台线程。设置后台线程方法如下:
public class TestDeamonThread {public static void main(String[] args) throws InterruptedException {// ExecutorService采用DaemonThreadFactory,若不设置,默认采用ThreadFactoryExecutorService executorService = Executors.newCachedThreadPool(new DaemonThreadFactory());System.out.println("All daaemon thread started...");for (int i = 0; i < 10; i++) {executorService.execute(new DaemonFromFactory());}TimeUnit.SECONDS.sleep(10);}
}class DaemonFromFactory implements Runnable {@Overridepublic void run() {try {while (true) {TimeUnit.SECONDS.sleep(1);System.out.println("Thread name: " + Thread.currentThread().getName() + " has executed.");}} catch (InterruptedException e) {System.out.print("Thread name: " + Thread.currentThread().getName() + " is interrupted.");}}
}// 工厂方法,implements ThreadFactory
class DaemonThreadFactory implements ThreadFactory {private int index = 0;@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);// 设置线程为后台线程,默认为falset.setDaemon(true);t.setName("asr-thread-" + index);++index;return t;}
}

21.3 线程的同步

线程同步指多线程之间因共享资源或者依赖关系等,存在着其中一个线程等待另一个线程执行完毕后,再继续执行。

21.3.1 Join操作

Join操作目的在于线程A等待另一线程B执行完毕后,再执行线程B对应的方法。例如:Joiner线程中调用Sleeper.join()方法,那么则待 Sleeper线程执行完毕后,Joiner线程再执行。

Join方法执行过程中,可通过Sleeper.interrupt()方法打断(Sleeper线程中需处理异常),直接执行Joiner线程。

public class TestJoinOps {public static void main(String[] args) {Sleeper sleeper1 = new Sleeper("sleep1", 2);Sleeper sleeper2 = new Sleeper("sleep2", 2);Joiner joiner1 = new Joiner("joiner1", sleeper1);Joiner joiner2 = new Joiner("joiner2", sleeper2);// sleeper2调用interrupt()方法,打断sleeper2.join操作,catch sleeper2异常,授限制性joiner2操作sleeper2.interrupt();}
}class Sleeper extends Thread {private int duration;private String name;public Sleeper(String name, int duration) {super(name);this.duration = duration;start();}@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(duration);} catch (InterruptedException e) {System.out.println("Thread name: " + getName() + " is interrupted.");return;}System.out.println("Thread name: " + getName() + " is waked.");}
}class Joiner extends Thread {private Sleeper sleeper;private String name;public Joiner(String name, Sleeper sleeper) {super(name);this.sleeper = sleeper;start();}@Overridepublic void run() {try {// Joiner线程中调用sleeper.join操作,故而sleeper线程优先执行sleeper.join();} catch (InterruptedException e) {System.out.println("Thread name: " + getName() + " is interrupted.");}System.out.println("Thread name: " + getName() + " join completed.");}
}

21.3.2 锁操作

synchronized关键字:

  1. Java中每个对象都自动含有单一的锁。当任务调用synchronized标识的方法时,那么此任务将自动获得本对象的锁(对某个对象来说,所有synchronized关键字标识的方法将共享同一个锁),在调用上一个synchronized方法完成后(任务自动释放对象锁),其他任务或者下一个synchronized方法才能被调用。
  2. 一个任务可以多次获得某个对象的锁(如任务A调用对象B的synchronized方法B1,而方法B1又调用了对象B的synchronized方法B2,那么任务A就获得对象B的锁2次),JVM会跟踪对象被加锁的次数,随着完成一个synchronized方法,计数减1,直至0时,对象锁被完全释放。
  3. 同样的针对于每一个类,也有一个类锁(作为类Class对象的一部分),采用synchronized static关键字可以在类的范围内防止对static数据的并发访问。

LOCK关键字:

  1. Lock对象必须被显示创建、锁定和释放
  2. 相对于synchronized关键字,此Lock对象更加灵活(若synchronized关键字标识的方法中某些事务中途失败了,但是可能导致对象的状态已改变。而这种情况,Lock对象可以在finally子句中将对象复原)。
  3. ReentrantLock锁允许获取锁,若如获取失败不用等待可以继续做其他事情
public class TestLockOps {public static void main(String[] args) throws InterruptedException {ExecutorService service = Executors.newFixedThreadPool(10, new ThreadFactory() {private int index = 0;@Overridepublic Thread newThread(Runnable r) {Thread thread = new Thread(r);thread.setName("asr-thread-" + index);++index;return thread;}});System.out.println("Threads begain...");IntGenerator intGenerator = new IntGenerator();for (int j = 0; j < 10; j++) {service.execute(new IntConsumer(intGenerator));}service.shutdown();service.awaitTermination(1000, TimeUnit.SECONDS);}
}class IntConsumer implements Runnable {private IntGenerator generator;public IntConsumer(IntGenerator generator) {this.generator = generator;}@Overridepublic void run() {for (int i = 0; i < 10; i++) {int index = generator.next();System.out.println("Thread name: " + Thread.currentThread().getName() + ", and index is: " + index);}}
}class IntGenerator {private final Lock lock = new ReentrantLock();private int index;public int next() {// 此lock.lock代码段用以锁住此对象(此对象是共享变量)lock.lock();try {// 睡眠1s,防止线程无需调用导致index无序输出TimeUnit.SECONDS.sleep(1);++index;return index;} catch (InterruptedException e) {e.printStackTrace();} finally {// 必须放在finally子句中,且return必须放在try中,确保return之前unlock不会过早发生// 若不释放,则只会有一个线程获得锁,只有一个线程正常往下走lock.unlock();}return 0;}
}

信号量机制:

  • 与通常的Lock或者synchronized关键字不同(保证任一时刻只允许一个线程/任务访问一项资源);信号量机制(Semaphore)则允许n个任务可同时访问这个资源。
public class TestSemaphore {public static void main(String[] args) throws InterruptedException {int size = 10;// Pool中定义了信号量及计数值,保证同一时刻只有计数值个线程/任务访问Pool<Fat> pool = new Pool<Fat>(Fat.class, size);ExecutorService service = Executors.newCachedThreadPool();for (int i = 0; i < size; i++) {service.execute(new SemaphoreTask<Fat>(pool));}TimeUnit.SECONDS.sleep(1);List<Fat> list = new ArrayList<>(size);for (int i = 0; i < size; i++) {Fat fat = pool.checkOut();System.out.println("Main thread checkout task:" + fat);}}
}/*** 信号量任务* * @param <T>*/
class SemaphoreTask<T> implements Runnable {private final Pool<T> pool;public SemaphoreTask(Pool<T> pool) {this.pool = pool;}@Overridepublic void run() {try {// 获得一个元素T item = pool.checkOut();System.out.println("SemaphoreTask checkout task: " + item);TimeUnit.SECONDS.sleep(1);// 释放此元素pool.checkIn(item);System.out.println("SemaphoreTask checkin task: " + item);} catch (InterruptedException e) {e.printStackTrace();}}
}class Pool<T> {private int size;private Semaphore semaphore;private List<T> items;private boolean[] usable;public Pool(Class<T> tClass, int size) {this.size = size;// 初始化信号量,采用先进先出公平分配方式this.semaphore = new Semaphore(size, true);this.items = new ArrayList<>(size);this.usable = new boolean[size];try {for (int i = 0; i < size; i++) {items.add(tClass.newInstance());}} catch (InstantiationException | IllegalAccessException e) {e.printStackTrace();}}// 依据信号量获得元素public T checkOut() {// 获得信号量try {semaphore.acquire();return getItem();} catch (InterruptedException e) {e.printStackTrace();}return null;}// 回收元素至items列表public void checkIn(T t) {// 释放此信号量semaphore.release();releaseItem(t);}// 获得元素public synchronized T getItem() {for (int i = 0; i < this.size; i++) {if (!usable[i]) {return items.get(i);}}return null;}// 释放元素public synchronized boolean releaseItem(T t) {int index = items.indexOf(t);if (index == -1) {System.out.println("item t not existed in items...");return false;}usable[index] = true;return true;}
}class Fat {private volatile double b;private static int count = 0;private final int id = count++;public Fat() {for (int i = 0; i < 1000; i++) {b += (Math.PI + Math.E) / (double) i;}}@Overridepublic String toString() {return "Fat{" + "b=" + b + ", id=" + id + "}";}
}

21.3.3 原子操作

原子操作:原子操作指不可再分的操作,要么成功要么失败,中间不存在其他状态。在Java中:基本数据类型变量(除long和double,double和long类型长度均为64bit,JVM可将此种类型的读取和写入当做两个分离的32bit,但由于目前各种平台下的商用虚拟机几乎都选择把64位数据的读写操作作为原子操作来对待,因此在编写代码时一般也不需要将用到的long和double变量专门声明为volatile)、引用类型变量声明为volatile的任何类型变量的访问均是具备原子性。

volatile关键字:

  1. volatile关键字定义的域,更新时会立即刷入主存,同样此域也必须从主存中读取(而非volatile关键字定义的域更新无需立即刷入主存,可在寄存器/高速缓存中刷新使用,最后再刷入主存)。
  2. 同步代码块或者方法防护的域: a, 在对一个变量执行lock操作,将会清空寄存器/高速缓存中此变量的值,在执行引擎使用这个变量前需要重新执行load(从主存)或assign操作初始化变量的值。b,对一个变量执行unlock操作之前,必须先把此变量同步回主内存中(执行store和write操作)。c, Java内存规范要求lock和unlock必须成对出现(详见wiki JAVA内存模型)。
  3. volatile关键字和同步代码块都要求读取从主存读取,volatile定义的域更新后的值必须立即写入主存;而同步代码块定义的域保证最多只有一个线程访问,在unlock操作前,必须将更新后的值写入主存
  4. 但当一个域的值依赖于此域上一个状态的值时,volatile关键字无法工作(因为像 i++/i-- 操作非原子操作,存在getfield操作和putfield操作等做个操作,导致多个线程访问时存在状态错乱)。如下述所示:
public class TestAtomicOps {public static void main(String[] args) throws InterruptedException {ExecutorService service = Executors.newCachedThreadPool();NumberGenerate generate = new NumberGenerate();for (int i = 0; i < 10; i++) {service.execute(new AtomicTask(generate));}service.shutdown();service.awaitTermination(10000, TimeUnit.SECONDS);}
}class AtomicTask implements Runnable {private NumberGenerate generate;public AtomicTask(NumberGenerate generate) {this.generate = generate;}@Overridepublic void run() {for (int i = 0; i < 100000; i++) {System.out.println("Thread name: " + Thread.currentThread().getName() + ", and next number is: " + generate.nextNumber());}}
}class NumberGenerate {// 当volatile修饰的值状态更新时不依赖于上个状态(是原子操作时),多个线程无需显示指定同步关键字private volatile int number;// synchronized关键字修饰方法,若不存在导致多线程更新number值时错乱;若存在此关键字,即使无volatile修饰,也不会乱public synchronized int nextNumber() {return ++number;}
}

原子类:Java中也提供了AtomicInteger, AutomicLong等方法,用于保证对于int类型和long类型的所有操作均保证原子化,包含自增和自减操作。

21.3.4 线程协作

线程协作通常是指多个线程为完成某个具体任务而合作起来。
wait()和notify()/notifyAll():

  1. 在一个已获取锁的方法中,调用sleep()方法和yield()方法后,并不释放锁。
  2. 在一个已获取锁的方法中,调用wait()方法可以将锁释放,执行线程将被挂起(也即意味着其他获取锁的方法可以不受影响的执行)。
  3. 调用wait()方法时,可设置等待一段时间后继续运行,亦可等待notify/notifyAll方法唤醒后运行。
  4. wait()方法、notify()/notifyAll()方法中会自动将锁释放,故而调用以上方法时必须在同步方法(synchronized标识方法)或者同步块(synchronized块和lock块)中执行
  5. notify()方法和notifyAll()方法区别,notify方法只会在众多等待同一个锁的任务中唤醒其中一个任务,而notifyAll方法则会唤醒所有等待同一个锁的任务。
public class TestWaitNotifyOps {public static void main(String[] args) throws InterruptedException {ExecutorService service = Executors.newCachedThreadPool();// Car为共享资源Car car = new Car();service.execute(new Buffed(car));service.execute(new WaxOn(car));TimeUnit.MINUTES.sleep(1);service.shutdownNow();}
}class Car {private boolean waxOn = false;private int waitForWaxNum;private int waitForBuffNum;// 涂蜡操作public synchronized void waxed() {waxOn = true;notify();}// 抛光操作public synchronized void buffed() {waxOn = false;notifyAll();}// 等待涂蜡操作public synchronized void waitForWax() throws InterruptedException {// 必须为while循环操作while (!waxOn) {++waitForWaxNum;// System.out.println("waitForWaxNum:" + waitForWaxNum);wait();}}// 等待抛光操作public synchronized void waitForBuff() throws InterruptedException {while (waxOn) {++waitForBuffNum;System.out.println("waitForBuffNum:" + waitForBuffNum);wait();}}
}class WaxOn implements Runnable {private final Car car;public WaxOn(Car car) {this.car = car;}@Overridepublic void run() {try {// while循环,一直运行,当线程中断时退出while (!Thread.interrupted()) {System.out.println("waxOn! ");car.waxed();TimeUnit.SECONDS.sleep(2);car.waitForBuff();}} catch (InterruptedException e) {System.out.println("WaxOn Exception: " + e.getMessage());}}
}class Buffed implements Runnable {private final Car car;public Buffed(Car car) {this.car = car;}@Overridepublic void run() {try {// while循环,一直运行,当线程中断时退出while (!Thread.interrupted()) {System.out.println("Buffed! ");car.buffed();TimeUnit.SECONDS.sleep(1);car.waitForWax();}} catch (InterruptedException e) {System.out.println("Buffed Exception: " + e.getMessage());}}
}

同步队列:
同步队列在任一时刻都只允许一个任务插入或移除元素,生产者 -> 同步队列 -> 消费者;若队列为空时,消费者线程阻塞,队列为满时,生产者线程阻塞。

public class TestBlockQueue {public static void main(String[] args) throws InterruptedException {BlockingQueue<Toast> toastQueue = new LinkedBlockingQueue<>();BlockingQueue<Toast> buffQueue = new LinkedBlockingQueue<>();BlockingQueue<Toast> finishQueue = new LinkedBlockingQueue<>();ExecutorService service = Executors.newCachedThreadPool();service.execute(new Toaster(toastQueue));service.execute(new Butter(toastQueue, buffQueue));service.execute(new Jammer(buffQueue, finishQueue));service.execute(new Consumer(finishQueue));TimeUnit.MINUTES.sleep(2);service.shutdownNow();}
}class Toast {public enum Status {DRY,BUTTERED,JAMMED;}public Status status = Status.DRY;public int id;public Toast(int id) {this.id = id;}public void butter() {this.status = Status.BUTTERED;}public void jam() {this.status = Status.JAMMED;}public Status getStatus() {return status;}public int getId() {return id;}@Overridepublic String toString() {return "Toast{" + "status=" + status + ", id=" + id + '}';}
}/*** 制作吐司任务*/
class Toaster implements Runnable {// 制作完成的吐司加入BolockQueueprivate final BlockingQueue<Toast> toasterQueue;private int toastId;public Toaster(BlockingQueue<Toast> blockingQueue) {this.toasterQueue = blockingQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()) {TimeUnit.SECONDS.sleep(2);Toast toast = new Toast(++toastId);System.out.println("Toaster : create toast, id is " + toast.getId() + ", and status is: " + toast.getStatus());toasterQueue.add(toast);}} catch (InterruptedException e) {System.out.println("Toaster : " + "InterruptedException");}}
}/*** 涂黄油任务*/
class Butter implements Runnable {// 制作好的吐司private final BlockingQueue<Toast> toasterQueue;// 涂黄油完成的任务private final BlockingQueue<Toast> butterQueue;public Butter(BlockingQueue<Toast> toasterQueue, BlockingQueue<Toast> butterQueue) {this.toasterQueue = toasterQueue;this.butterQueue = butterQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()) {TimeUnit.SECONDS.sleep(1);// 从制作好的吐司列表中取出第一个吐司Toast toast = toasterQueue.take();toast.butter();System.out.println("Butter : toast has buttered, id is " + toast.getId() + ", and status is: " + toast.getStatus());// 涂好黄油的吐司插入黄油列表中butterQueue.add(toast);}} catch (InterruptedException e) {System.out.println("Butter : " + "InterruptedException");}}
}class Jammer implements Runnable {private final BlockingQueue<Toast> butterQueue;private final BlockingQueue<Toast> finishQueue;public Jammer(BlockingQueue<Toast> butterQueue, BlockingQueue<Toast> finishQueue) {this.butterQueue = butterQueue;this.finishQueue = finishQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()) {TimeUnit.SECONDS.sleep(1);// 从涂好黄油的吐司列表中取出头部吐司Toast toast = butterQueue.take();toast.jam();System.out.println("Jammer : toast has jamed, id is " + toast.getId() + ", and status is: " + toast.getStatus());// 涂好果酱的吐司插入完成列表finishQueue.add(toast);}} catch (InterruptedException e) {System.out.println("Jammer : " + "InterruptedException");}}
}class Consumer implements Runnable {private final BlockingQueue<Toast> finishQueue;private int consumerId;public Consumer(BlockingQueue<Toast> finishQueue) {this.finishQueue = finishQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()) {TimeUnit.SECONDS.sleep(1);Toast toast = finishQueue.take();++consumerId;System.out.println("Consumer : toast has consumed, id is " + toast.getId() + ", and status is: " + toast.getStatus());if (toast.getId() != consumerId) {System.out.println("Consumer : has error, id is illegal, id is:" + toast.getId()+ ", and consumerId is:" + consumerId);}}} catch (InterruptedException e) {System.out.println("Consumer : " + "InterruptedException");}}
}

延迟队列:

  • 延迟队列是一种无界队列,用以存放实现了Delayed接口的对象(任务的执行时间依赖于设置的延迟时间是否到期)。
  • 延迟队列中任务的执行不依赖于任务的入队顺序/入队时间,而依赖于任务设定的延迟时间,只有任务延期时间到期时,任务才能消费并执行。
public class TestDelayQueue {public static void main(String[] args) {ExecutorService service = Executors.newCachedThreadPool();Random random = new Random();List<DelayTask> taskList = new ArrayList<>();DelayQueue<DelayTask> taskQueue = new DelayQueue<>();for (int i = 0; i < 10; i++) {DelayTask delayTask = new DelayTask("task" + i, random.nextInt(100));taskList.add(delayTask);taskQueue.add(delayTask);}printQueueInfo(taskQueue, taskList);service.execute(new DelayTaskConsumer("consumer", taskQueue));service.shutdown();}public static void printQueueInfo(DelayQueue<DelayTask> queue, List<DelayTask> list) {System.out.println("Print List of DelayTask");list.forEach(System.out::println);System.out.println("\n" + "Print DelayQueue of DelayTask");// DelayQueue转换为Array时仅能保证全部包含,但不保证有序(iterator方法亦然)DelayTask[] tasks = queue.toArray(new DelayTask[0]);Arrays.stream(tasks).forEach(System.out::println);}
}class DelayTaskConsumer implements Runnable {private final String name;private final BlockingQueue<DelayTask> tasksQueue;public DelayTaskConsumer(String name, BlockingQueue<DelayTask> queue) {this.name = name;this.tasksQueue = queue;}@Overridepublic void run() {System.out.println(this.name + " has exec delayedTasks...");System.out.println(this.name + " delayed task size:" + tasksQueue.size());try {while (!tasksQueue.isEmpty()) {tasksQueue.take().run();}System.out.println("Delayed tasks has all completed...");} catch (InterruptedException e) {e.printStackTrace();}}
}/*** 延时任务定义*/
class DelayTask implements Delayed {private final String name;private final int delayTimeMillSecond;private final long expiredTime;public DelayTask(String taskName, int delayTimeMs) {this.delayTimeMillSecond = delayTimeMs;this.name = taskName;expiredTime = System.nanoTime() + TimeUnit.NANOSECONDS.convert(this.delayTimeMillSecond, TimeUnit.MILLISECONDS);}/*** 此方法非常重要,待getDelay方法返回值为0时,Delayed任务才会出队列,其中System.nanoTime()方法会一直变动。* * @param unit 时间工具类* @return 延时时间*/@Overridepublic long getDelay(@NonNull TimeUnit unit) {return unit.convert(this.expiredTime - System.nanoTime(), TimeUnit.NANOSECONDS);}@Overridepublic int compareTo(@NonNull Delayed o) {DelayTask that = (DelayTask) o;return this.delayTimeMillSecond - that.getDelayTimeMillSecond();}public void run() throws InterruptedException {System.out.println("Name: " + this.name + ", delayTime is:" + this.delayTimeMillSecond + " has begin...");TimeUnit.SECONDS.sleep(1);}public int getDelayTimeMillSecond() {return this.delayTimeMillSecond;}@Overridepublic String toString() {return "DelayTask{" + "name='" + name + '\'' + ", delayTimeMillSecond=" + delayTimeMillSecond + '}';}
}

优先级队列:

  • 优先级队列(PriorityBlockingQueue)与延迟队列相似,但不是按照任务设置的延迟时间,而是按照任务的优先级顺序执行。

输入/输出管道:
在同步队列引入之前,线程通信更多时候采用管道概念,同步与阻塞由管道保证(类似与BloakingQueue)。Java中对应为PipedWrite类和PipedReader类。

public class TestPipeIo {public static void main(String[] args) throws InterruptedException {// channel for write and readPipedWriter writer = new PipedWriter();Sender sender = new Sender(writer);Receiver receiver = new Receiver(writer);ExecutorService executorService = Executors.newFixedThreadPool(5);executorService.execute(sender);executorService.execute(receiver);TimeUnit.MINUTES.sleep(2);executorService.shutdownNow();}
}/*** 消息发送类*/
class Sender implements Runnable {// 定义一个管道写入类private PipedWriter writer;public Sender(PipedWriter writer) {this.writer = writer;}@Overridepublic void run() {try {for (char c = 'a'; c <= 'z'; c++) {writer.write(c);System.out.println("Sender: char: " + c);TimeUnit.SECONDS.sleep(2);}} catch (IOException | InterruptedException e) {System.out.println("Sender IOException or InterruptedException...");}}
}/*** 消息接收类*/
class Receiver implements Runnable {// 定义一个管道读入类PipedReader reader;// 创建管道读入类必须与管道写入类关联起来public Receiver(PipedWriter writer) {try {this.reader = new PipedReader(writer);} catch (IOException e) {System.out.println("Receiver: IOException...");}}@Overridepublic void run() {try {while (!Thread.interrupted()) {char ch = (char) reader.read();System.out.println("Receiver: char: " + ch);}} catch (IOException e) {System.out.println("Receiver: IOException...");}}
}

21.3.5 重要构件类

并发编程、多线程交互中,Java已提供了非常便利的实现类。

21.3.5.1 CountDown相关类

CountDownLatch类:

  • CountDownLatch类用于同步一个或者多个任务,目的在于让其他任务等待标记为CountDownLatch的任务执行结束。*
  • 可以向CountDownLatch对象设置一个初始计数值,任何在这个对象上调用wait()方法的任务均需要等待,直至此计数值为0.*
    使用方法如下:
public class TestCountDown {public static void main(String[] args) {ExecutorService service = Executors.newCachedThreadPool();// CountDownLatch公共变量CountDownLatch countDownLatch = new CountDownLatch(100);for (int i = 0; i < 10; i++) {service.execute(new TaskB(countDownLatch, i));}for (int j = 0; j < 100; j++) {service.execute(new TaskA(countDownLatch, j));}service.shutdown();}
}class TaskA implements Runnable {private int id = 0;private CountDownLatch downLatch;public TaskA(CountDownLatch latch, int id) {this.downLatch = latch;this.id = id;}@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(1);System.out.println("TaskA: " + id + " completed...");// CountDownLatch变量减1操作,当变量downLatch的初始值为0时,调用wait()方法的任务才能执行downLatch.countDown();} catch (InterruptedException e) {System.out.println("TaskA: " + id + " InterruptedException is:" + e.getMessage());}}
}class TaskB implements Runnable {private int id;private CountDownLatch downLatch;public TaskB(CountDownLatch latch, int id) {this.id = id;this.downLatch = latch;}@Overridepublic void run() {try {// CountDownLatch变量等待至0downLatch.await();System.out.println("TaskB: " + id + " has begin...");} catch (InterruptedException e) {System.out.println("TaskB: " + id + " InterruptedException is:" + e.getMessage());}}
}

CycliBarrier类:

  • CycliBarrier工具类与CountDownLatch类似,有一个类似于“栈栏”的概念,当多个线程同时进入“栈栏”时之后(因各个线程开始时间与执行时间不一致),多个单独的线程同时往下走。
public class TestCircleBarrier {public static void main(String[] args) throws ExecutionException, InterruptedException {int horseSize = 10;int finishLine = 75;ExecutorService service = Executors.newCachedThreadPool();List<Horse> horses = new ArrayList<>();for (int i = 0; i < horseSize; i++) {Horse horse = new Horse(i);horses.add(horse);}// horseSize+1代表HorseRace任务也作为栈栏任务,保证所有任务同时准备好之后,再执行下述定义任务CyclicBarrier barrier = new CyclicBarrier(horseSize + 1, () -> System.out.println("Horse Race begin..."));// 提交赛马任务for (Horse horse : horses) {horse.setBarrier(barrier);service.execute(horse);}// 提交赛马统计任务,用以统计胜利者new Thread(new HorseRace(horses, service, barrier, finishLine)).start();}
}class HorseRace implements Runnable {private final int finishLine;private final List<Horse> horses;private final ExecutorService service;private final CyclicBarrier barrier;HorseRace(List<Horse> horses, ExecutorService service, CyclicBarrier barrier, int finishLine) {this.finishLine = finishLine;this.horses = horses;this.service = service;this.barrier = barrier;}@Overridepublic void run() {try {// 待所有赛马均已准备好,且已运行了barrier的自定义程序后,再执行下述统计过程barrier.await();} catch (InterruptedException | BrokenBarrierException e) {System.out.println("HourseRace has exception: " + e.getMessage());}while (!Thread.interrupted()) {try {for (Horse horse : horses) {System.out.println("Horse " + horse.getId() + " tracks:" + horse.getRecords());if (horse.getStride() >= this.finishLine) {System.out.println("Horse " + horse.getId() + " won");service.shutdownNow();return;}}TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {System.out.println("HourseRace has exception: " + e.getMessage());}}}
}class Horse implements Runnable {private final StringBuilder recordStride;private int id = 0;private CyclicBarrier barrier;private int stride;private Random rand = new Random();public Horse(int id) {this.id = id;this.recordStride = new StringBuilder();}public void setBarrier(CyclicBarrier barrier) {this.barrier = barrier;}@Overridepublic void run() {try {this.recordStride.append(0);System.out.println("Horse " + id + " has begin, and stride: " + stride);TimeUnit.SECONDS.sleep(rand.nextInt(5));// 待所有赛马准备好,处于同一个起跑线时barrier.await();while (!Thread.interrupted()) {this.stride += rand.nextInt(3);this.recordStride.append(" ").append(stride);TimeUnit.MILLISECONDS.sleep(rand.nextInt(200));}} catch (InterruptedException | BrokenBarrierException e) {System.out.println("Horse " + id + " has exception: " + e.getMessage());}}public String getRecords() {return recordStride.toString();}public int getStride() {return this.stride;}public int getId() {return this.id;}
}

Java编程思想-并发相关推荐

  1. java 编程思想 并发_java编程思想-java中的并发(一)

    一.基本的线程机制 并发编程使我们可以将程序划分为多个分离的.独立运行的任务.通过使用多线程机制,这些独立任务中的每一个都将由执行线程来驱动. 线程模型为编程带来了便利,它简化了在单一程序中同时jia ...

  2. Java编程思想读书笔记一:并发

    1. Thread.yield( )方法 当调用yield()时,即在建议具有相同优先级的其他线程可以运行了,但是注意的是,仅仅是建议,没有任何机制保证你这个建议会被采纳 .一般情况下,对于任何重要的 ...

  3. 《JAVA编程思想》学习笔记:第21章(并发)

    目录 Java编程思想(一)第1~4章:概述 Java编程思想(二)第5章:初始化和清理 Java编程思想(三)第6章:访问权限 Java编程思想(四)第7章:复用类 Java编程思想(五)第8章:多 ...

  4. java 四舍五入_《JAVA编程思想》5分钟速成:1-4章:概述

    前言: 1.面向对象的特征有哪些方面? 2.Math.round(11.5) 等于多少? Math.round(-11.5)等于多少? 3.float f=3.4;是否正确? 4.short s1 = ...

  5. Java编程思想 (1~10)

    [注:此博客旨在从<Java编程思想>这本书的目录结构上来检验自己的Java基础知识,只为笔记之用] 第一章 对象导论 1.万物皆对象 2.程序就是对象的集合 3.每个对象都是由其它对象所 ...

  6. java编程思想怎么样_读完java编程思想后的思考?

    谢邀,这本书真的给我带来很多思考. 我的java入门不是java编程思想,学校的教材是一本紫色的书,已经忘了叫什么名字了,里面内容倒挺新还讲了些javafx.但那本书实在是太浅并且结构混乱,以至于我和 ...

  7. Java编程思想(第4版)(评注版)

    传世经典书丛  Java编程思想(第4版)(评注版)  (美)埃克尔(Eckel, B.)著 刘中兵评注 ISBN 978-7-121-13521-7 2011年6月出版 定    价:108.00元 ...

  8. 《Java编程思想》读书笔记

    前言:三年之前就买了<Java编程思想>这本书,但是到现在为止都还没有好好看过这本书,这次希望能够坚持通读完整本书并整理好自己的读书笔记,上一篇文章是记录的第十七章到第十八章的内容,这一次 ...

  9. 【java】《java编程思想》 读书笔记

    之前主要用的C++的比较多,之前花了快2个月的实际认真系统全面的学习了以下java的基础语法,<java编程思想>这本书翻译水平确实不是很好,很多话读着会比较拗口.推荐读之前,先去网上搜索 ...

最新文章

  1. 词法分析是否需要处理负数
  2. Go channel 的妙用
  3. Polly组件对微服务场景的价值
  4. Java 8 Lambda表达式的函数式编程– Monads
  5. JavaScript知识笔记(二)——事件
  6. TypeError: HashUpdate fail
  7. java window的对象方法,[Java教程]如何真正重写window对象的方法_星空网
  8. 成都刘女士的第一场锤子科技发布会 | 现场特写
  9. 在ASP.NET MVC中使用“RadioButtonList”和“CheckBoxList”
  10. matlab均值滤波实现
  11. 抽象代数 Abstract Algebra 学习笔记
  12. ip地址解析(scala)
  13. 5分绩点转4分_5分绩点转4分
  14. [POI2008]MAF-Mafia
  15. Java程序员的规划之路
  16. 一段用c#操作datatable的代码
  17. 影像的滑动窗口裁切与拼接(附代码)
  18. AlphaFold2源码解析(10)--补充信息1(residue_constants)
  19. 如何进行实时频谱分析仪的二次开发——MATLAB、C++、labview开发环境的部署
  20. DockWidget

热门文章

  1. 区块链落地中的九大问题与解法
  2. 点阵图像与矢量图像的计算机记录原理,图形图像学
  3. 语法错误和语义错误区别
  4. 【新书推荐】【2019.12】二十一世纪的人工智能(第三版)
  5. 前端:a 鼠标悬浮变小手
  6. Linux sed识别HTML标签
  7. 火车票软件哪个好用_火车票用哪个软件买好?
  8. 网站能分为哪些类型?
  9. [goa]golang微服务框架学习(二)-- 代码自动生成
  10. Agile 之 Scrum