【README】

并发后半部分(并发2,从21.4.3中断开始)参见: https://blog.csdn.net/PacosonSWJTU/article/details/106878087 ;

本文章包括了并发1和并发2 ,期初新建并发2是为了方便编写文档,因为并发内容实属太多,所以分了2次post;

【21.2】基本线程机制

并发编码使我们可以将程序划分为多个分离的,独立运行的任务;

cpu将轮流给每个任务分配其占用时间。

【21.2.1】定义任务

线程可以驱动任务,一种描述任务的方式是使用 Runnable 接口;

方式1:直接调用 Runnalbe接口的run 方法创建线程驱动任务;

/*** 用 Runnable 接口定义任务*/
public class LiftOff implements Runnable {protected int countDown = 10;private static int taskCount = 0;private final int id = taskCount++;public LiftOff(){}public LiftOff(int countDown) {this.countDown = countDown;}public String status() {return "#" + id + "(" + (countDown > 0? countDown: "liftoff") + "), "; }@Override public void run() {while(countDown-- > 0 ) {System.out.println(status());Thread.yield(); // 当前线程转入可运行状态,把cpu时间片让步给其他线程 }}public static void main(String[] args) {LiftOff obj = new LiftOff();obj.run(); // 这里直接调用 Runnalbe接口的run 方法创建线程驱动任务}
}

运行结果:

#0(9),
#0(8),
#0(7),
#0(6),
#0(5),
#0(4),
#0(3),
#0(2),
#0(1),
#0(liftoff),

【21.2.2】Thread类

开启线程的第2种方式,使用Thread来驱动任务;调用thread 的start方法,start方法会调用 runnable 接口实现类的run 方法;

/*** 开启线程的第2种方式,使用Thread来驱动任务 */
public class BasicThreads {public static void main(String[] args) {Thread t = new Thread(new LiftOff());t.start();System.out.println("waiting for liftOff"); }
}

main 方法通过主线程来驱动,而 LiftOff中run方法的逻辑通过main方法分发的子线程来驱动;

运行结果:

waiting for liftOff
#0(9),
#0(8),
#0(7),
#0(6),
#0(5),
#0(4),
#0(3),
#0(2),
#0(1),
#0(liftoff),

启动多个线程运行多个任务,可以看到线程切换的动作

/*** 启动多个线程运行多个任务,可以看到线程切换的动作 */
public class MoreBasicThreads {public static void main(String[] args) {for (int i=0; i< 5; i++) {new Thread(new LiftOff()).start();}System.out.println("waiting for lift off");}
}
/*waiting for lift off
#3(9),
#3(8),
#4(9),
#4(8),
#4(7),
#0(9),
#0(8),
#0(7),
#0(6),
#0(5),
#0(4),
#0(3),
#1(9),
#1(8),
#1(7),
#2(9),
#2(8),
#1(6),
#1(5),
#1(4),
#1(3),
#1(2),
#0(2),
#0(1),
#0(liftoff),
#4(6),
#3(7),
#4(5),
#4(4),
#4(3),
#1(1),
#1(liftoff),
#2(7),
#4(2),
#4(1),
#3(6),
#4(liftoff),
#2(6),
#3(5),
#2(5),
#2(4),
#2(3),
#3(4),
#3(3),
#3(2),
#3(1),
#3(liftoff),
#2(2),
#2(1),
#2(liftoff),
*/

通过主线程显式创建多个子线程的问题:

主线程创建多个子线程,每个子线程Thread 都注册了他自己,内存存在对他的引用,所以在子线程退出其 run 方法之前,垃圾回收器无法清除它, 这不便于内存回收与分配;

【21.2.3】使用 Executor 执行器(CachedThreadPool、FixedThreadPool、SingleThreadExecutor )

1、使用 Executor 执行器管理Thread线程对象, 可以简化并发编程,且处理线程占用的内存回收事宜;

Executor允许你管理异步任务的执行, 而无需显式地管理线程的生命周期 。

Executor执行器在 java5或6中是启动任务的优选方法;

荔枝1、基于 newCachedThreadPool实现线程池

/*** page657/线程池* shutdown 方法调用可以防止新任务被提交给 Executor, 当前线程* 将继续运行在 shutdown被调用之前提交的所有任务 * @author */
public class CachedThreadPool {public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();for (int i=0; i<5; i++) {exec.execute(new LiftOff());}/*shutdown 方法调用可以防止新任务被提交给 Executor, 当前线程* 将继续运行在 shutdown被调用之前提交的所有任务 */exec.shutdown();}
}
/*#1(9),
#2(9),
#2(8),
#2(7),
#3(9),
#4(9),
#4(8),
#4(7),
#4(6),
#0(9),
#4(5),
#4(4),
#3(8),
#2(6),
#1(8),
#1(7),
#1(6),
#1(5),
#2(5),
#3(7),
#3(6),
#4(3),
#0(8),
#0(7),
#4(2),
#4(1),
#3(5),
#3(4),
#3(3),
#3(2),
#3(1),
#3(liftoff),
#2(4),
#1(4),
#1(3),
#1(2),
#2(3),
#4(liftoff),
#0(6),
#2(2),
#1(1),
#1(liftoff),
#2(1),
#0(5),
#2(liftoff),
#0(4),
#0(3),
#0(2),
#0(1),
#0(liftoff),
*/

常见的是: 单个Executor 执行器被用来创建和管理系统中的所有任务;

荔枝2、基于 newFixedThreadPool实现线程池

newFixedThreadPool 可以一次性预先执行代价高昂的线程分配,因此可以限制线程数量

/*** page 657 * newFixedThreadPool线程池 */
public class FixedThreadPool {public static void main(String[] args) {// newFixedThreadPool 可以一次性预先执行代价高昂的线程分配,因此可以限制线程数量 ExecutorService exec = Executors.newFixedThreadPool(5);for (int i=0; i<5; i++) {exec.execute(new LiftOff());}/*shutdown 方法调用可以防止新任务被提交给 Executor, 当前线程* 将继续运行在 shutdown被调用之前提交的所有任务 */exec.shutdown();}
}
/*#0(9),
#2(9),
#2(8),
#3(9),
#3(8),
#4(9),
#1(9),
#1(8),
#4(8),
#4(7),
#3(7),
#3(6),
#3(5),
#3(4),
#3(3),
#3(2),
#3(1),
#3(liftoff),
#2(7),
#0(8),
#2(6),
#4(6),
#1(7),
#4(5),
#4(4),
#4(3),
#2(5),
#0(7),
#0(6),
#0(5),
#0(4),
#0(3),
#2(4),
#4(2),
#4(1),
#4(liftoff),
#1(6),
#2(3),
#0(2),
#0(1),
#0(liftoff),
#2(2),
#1(5),
#1(4),
#2(1),
#1(3),
#2(liftoff),
#1(2),
#1(1),
#1(liftoff),
*/

注意:CachedThreadPool 与 FixedThreadPool 线程池的区别: CacheThreadPool 在程序执行过程中通常会创建于所需数量相同的线程,然后在他回收旧线程时停止创建新线程, CachedThreadPool 是合理的的 Executor执行器的首选。

而FixedThreadPool 是可以限制线程数量的线程池,只有当 CachedThreadPool 出现问题时,才需要切换到 FixedThreadPool;

荔枝3: SingleThreadPool 是 线程数量为1的 FixedThreadPool。

如果向 SingleThreadPool 提交多个任务, 这些任务将排队,每个任务都会在下一个任务开始之前结束,所有任务使用相同的线程。因为 SingleThreadExecutor 会序列化所有提交给他的任务,并会维护他自己隐藏的悬挂任务队列。

/*** page 657 /* newSingleThreadExecutor 类似于线程数量为1的 FixedThreadPool */
public class SingleThreadExecutor {public static void main(String[] args) {ExecutorService exec = Executors.newSingleThreadExecutor();for (int i=0; i<5; i++) {exec.execute(new LiftOff());}exec.shutdown(); }
}
/*#0(9),
#0(8),
#0(7),
#0(6),
#0(5),
#0(4),
#0(3),
#0(2),
#0(1),
#0(liftoff),
#1(9),
#1(8),
#1(7),
#1(6),
#1(5),
#1(4),
#1(3),
#1(2),
#1(1),
#1(liftoff),
#2(9),
#2(8),
#2(7),
#2(6),
#2(5),
#2(4),
#2(3),
#2(2),
#2(1),
#2(liftoff),
#3(9),
#3(8),
#3(7),
#3(6),
#3(5),
#3(4),
#3(3),
#3(2),
#3(1),
#3(liftoff),
#4(9),
#4(8),
#4(7),
#4(6),
#4(5),
#4(4),
#4(3),
#4(2),
#4(1),
#4(liftoff),
*/

【21.2.4】 从任务中产生返回值

若任务在执行完成时需要返回值,则使用 Callable 而不是 Runnable 来描述任务;

/*** page 658 * newCachedThreadPool:任务执行完成后可以返回执行结果 */
public class CallableDemo {public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();ArrayList<Future<String>> results = new ArrayList<>();for (int i=0; i<10; i++) {// submit 方法会产生 Future 对象, 他用 Callable返回结果的特定类型进行了参数化 results.add(exec.submit(new TaskWithResult(i))); // 驱动或运行任务使用 ExecutorService.submit() 方法 }for (Future<String> fu : results) {try {System.out.println(fu.get()); // 结果值 } catch (Exception e) {e.printStackTrace();} finally {exec.shutdown(); }}System.out.println("我是main线程");}
}
/*result of task with result 0
result of task with result 1
result of task with result 2
result of task with result 3
result of task with result 4
result of task with result 5
result of task with result 6
result of task with result 7
result of task with result 8
result of task with result 9
我是main线程
*/
class TaskWithResult implements Callable<String> {private int id; public TaskWithResult(int id) {this.id = id ;}@Overridepublic String call() throws Exception {return "result of task with result " + id;}
}

显然, 在子线程全部返回前,主线程是阻塞的,因为 主线程打印的消息在所有子线程返回结果之后;

【21.2.5】休眠

方法1: sleep方法 让线程休眠给定时间,然后又重新回到可运行状态;

方法2:yield方法表示:当前线程的重要任务已经运行完毕了, 让出占用的cpu时间片给其他线程;


/*** page659  * Thread.sleep 线程休眠 */
public class SleepingTask extends LiftOff {public void run() {try {while (countDown-- > 0) {System.out.println(status());Thread.sleep(1000); // 当前线程休眠1秒钟 (老式方法)TimeUnit.SECONDS.sleep(1);// 休眠1秒钟 (推荐方法)}} catch (Exception e) {e.printStackTrace();}}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();for (int i=0; i<3; i++) {exec.execute(new SleepingTask());}exec.shutdown();}
}
/*#0(9),
#2(9),
#1(9),
#2(8),
#0(8),
#1(8),
#1(7),
#2(7),
#0(7),
#2(6),
#0(6),
#1(6),
#2(5),
#1(5),
#0(5),
#0(4),
#1(4),
#2(4),
#2(3),
#0(3),
#1(3),
#1(2),
#2(2),
#0(2),
#2(1),
#0(1),
#1(1),
#1(liftoff),
#2(liftoff),
#0(liftoff),
*/

Thread.sleep(1000); // 当前线程休眠1秒钟 (老方法)
TimeUnit.SECONDS.sleep(1);// 休眠1秒钟 (新方法,java5或6推荐)

【21.2.6】优先级

线程优先级将线程重要性传递给了调度器,优先级低的线程仅仅是执行频率较低。

试图通过控制线程优先级是一种错误。因为cpu的时间片划分是未知的,可能碰到中断,如io,所以不建议使用优先级,这里仅仅看下代码;

/*** page660 * 线程优先级*/
public class SimplePriority implements Runnable {private int countDown = 5; private volatile double d; // volative 确保变量不被任何编译器优化(指令优化) private int priority;public SimplePriority(int priority) {this.priority = priority;}public String toString() {return Thread.currentThread() + ":" + countDown;}@Override public void run() {Thread.currentThread().setPriority(priority); // 设置线程优先级while(true) {for (int i=1; i<100000; i++) {d += (Math.PI + Math.E) / (double) i;if (i % 1000 == 0) {Thread.yield(); // 当前线程释放cpu时间片给其他线程}System.out.println(this);if (--countDown ==0) return ; }}}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();for (int i=0; i< 5; i++) {exec.execute(new SimplePriority(Thread.MIN_PRIORITY)); // 最小优先级}exec.execute(new SimplePriority(Thread.MAX_PRIORITY)); // 最高优先级 exec.shutdown(); }
}
/*Thread[pool-1-thread-3,1,main]:5
Thread[pool-1-thread-3,1,main]:4
Thread[pool-1-thread-6,10,main]:5
Thread[pool-1-thread-6,10,main]:4
Thread[pool-1-thread-5,1,main]:5
Thread[pool-1-thread-1,1,main]:5
Thread[pool-1-thread-4,1,main]:5
Thread[pool-1-thread-2,1,main]:5
Thread[pool-1-thread-2,1,main]:4
Thread[pool-1-thread-2,1,main]:3
Thread[pool-1-thread-2,1,main]:2
Thread[pool-1-thread-2,1,main]:1
Thread[pool-1-thread-4,1,main]:4
Thread[pool-1-thread-1,1,main]:4
Thread[pool-1-thread-1,1,main]:3
Thread[pool-1-thread-1,1,main]:2
Thread[pool-1-thread-1,1,main]:1
Thread[pool-1-thread-5,1,main]:4
Thread[pool-1-thread-6,10,main]:3
Thread[pool-1-thread-3,1,main]:3
Thread[pool-1-thread-3,1,main]:2
Thread[pool-1-thread-6,10,main]:2
Thread[pool-1-thread-6,10,main]:1
Thread[pool-1-thread-5,1,main]:3
Thread[pool-1-thread-5,1,main]:2
Thread[pool-1-thread-4,1,main]:3
Thread[pool-1-thread-5,1,main]:1
Thread[pool-1-thread-3,1,main]:1
Thread[pool-1-thread-4,1,main]:2
Thread[pool-1-thread-4,1,main]:1
*/

【21.2.7】让步

1、当前线程调用 Thread.yield方法将给线程调度器一个暗示:

我的工作已经完成了, 可以让别的线程使用cpu时间片了,但这里仅仅是一个暗示,没有任何机制保证它将被采纳;

注意: yield方法经常被误用。

【21.2.8】 后台线程

1、后台线程:指在程序运行的时候在后台提供一种通用服务的线程,并且这种线程并不属于程序中不可或缺的部分。

当所有非后台线程结束时, 程序也就终止了,同时会杀死进程中的所有后台线程。

2、后天线程 daemon 荔枝:

/*** page662 * 后台线程  daemon.setDaemon(true); 后台线程不影响非后台线程的结束,如main主线程就是非后台线程 * 当所有非后台线程结束时,程序终止了,同事会杀死进程中的所有后台线程 */
public class SimpleDaemons implements Runnable {@Override public void run() {try {while(true) {Thread.sleep(1000);System.out.println(Thread.currentThread() + " " + this);}} catch (Exception e){e.printStackTrace();}}public static void main(String[] args) {for (int i=0; i<10; i++) {Thread daemon = new Thread(new SimpleDaemons());daemon.setDaemon(true); // 在 start 方法前调用,设置为后台线程  daemon.start(); }System.out.println("all daemons started");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}}
}

3、通过 ThreadFactory 线程工厂创建后台线程

/*** page662 * 编写定制的 ThreadFactory 可以定制由 Executor 创建的线程属性*/
public class DaemonThreadFactory implements ThreadFactory {@Overridepublic Thread newThread(Runnable r) {Thread t = new Thread(r);t.setDaemon(true);// 设置为后台线程return t; }
}
/*** page 663 * 基于 ThreadFactory 创建后台线程 */
public class DaemonFromFactory implements Runnable {@Overridepublic void run() {try {while(true) {
//              Thread.sleep(1000);TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread() + " " + this);}} catch (Exception e) {e.printStackTrace();} }public static void main(String[] args) { /*DaemonThreadFactory创建的全是后台线程*/ExecutorService exec = Executors.newCachedThreadPool(new DaemonThreadFactory());     for (int i=0; i<10; i++) {exec.execute(new DaemonFromFactory());}System.out.println("all daemons started");try {
//          Thread.sleep(3000);TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace();} }
}
/*all daemons started
Thread[Thread-8,5,main] diy.chapter21.DaemonFromFactory@12b68fd4
Thread[Thread-7,5,main] diy.chapter21.DaemonFromFactory@1ce6edff
Thread[Thread-9,5,main] diy.chapter21.DaemonFromFactory@50f7270b
Thread[Thread-0,5,main] diy.chapter21.DaemonFromFactory@4c19ebce
Thread[Thread-5,5,main] diy.chapter21.DaemonFromFactory@70c680f2
Thread[Thread-2,5,main] diy.chapter21.DaemonFromFactory@3812167c
Thread[Thread-3,5,main] diy.chapter21.DaemonFromFactory@2b0a5613
Thread[Thread-6,5,main] diy.chapter21.DaemonFromFactory@15547dba
Thread[Thread-1,5,main] diy.chapter21.DaemonFromFactory@16c1c280
Thread[Thread-4,5,main] diy.chapter21.DaemonFromFactory@58bbe46a
Thread[Thread-8,5,main] diy.chapter21.DaemonFromFactory@12b68fd4
Thread[Thread-7,5,main] diy.chapter21.DaemonFromFactory@1ce6edff
Thread[Thread-2,5,main] diy.chapter21.DaemonFromFactory@3812167c
Thread[Thread-1,5,main] diy.chapter21.DaemonFromFactory@16c1c280
Thread[Thread-0,5,main] diy.chapter21.DaemonFromFactory@4c19ebce
Thread[Thread-6,5,main] diy.chapter21.DaemonFromFactory@15547dba
Thread[Thread-9,5,main] diy.chapter21.DaemonFromFactory@50f7270b
Thread[Thread-3,5,main] diy.chapter21.DaemonFromFactory@2b0a5613
Thread[Thread-5,5,main] diy.chapter21.DaemonFromFactory@70c680f2
Thread[Thread-4,5,main] diy.chapter21.DaemonFromFactory@58bbe46a
Thread[Thread-8,5,main] diy.chapter21.DaemonFromFactory@12b68fd4
Thread[Thread-7,5,main] diy.chapter21.DaemonFromFactory@1ce6edff
*/

4、 自定义线程执行器

/***  自定义线程执行器-DaemonThreadPoolExecutor *  ThreadPoolExecutor extends AbstractExecutorService*  AbstractExecutorService implements ExecutorService*  interface ExecutorService extends Executor*/
public class DaemonThreadPoolExecutor extends ThreadPoolExecutor {public DaemonThreadPoolExecutor() {super(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new DaemonThreadFactory());}
} 
/*** Creates a new {@code ThreadPoolExecutor} with the given initial* parameters and default rejected execution handler.** @param corePoolSize the number of threads to keep in the pool, even*        if they are idle, unless {@code allowCoreThreadTimeOut} is set* @param maximumPoolSize the maximum number of threads to allow in the*        pool* @param keepAliveTime when the number of threads is greater than*        the core, this is the maximum time that excess idle threads*        will wait for new tasks before terminating.* @param unit the time unit for the {@code keepAliveTime} argument* @param workQueue the queue to use for holding tasks before they are*        executed.  This queue will hold only the {@code Runnable}*        tasks submitted by the {@code execute} method.* @param threadFactory the factory to use when the executor*        creates a new thread* @throws IllegalArgumentException if one of the following holds:<br>*         {@code corePoolSize < 0}<br>*         {@code keepAliveTime < 0}<br>*         {@code maximumPoolSize <= 0}<br>*         {@code maximumPoolSize < corePoolSize}* @throws NullPointerException if {@code workQueue}*         or {@code threadFactory} is null*/public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler);}

5、后台线程创建的任何线程都将被自动设置为后台线程

/*** page663* 后台线程创建的任何线程都将被自动设置为后台线程 */
public class Daemons {public static void main(String[] args) {Thread d = new Thread(new Daemon());d.setDaemon(true); // 主线程是后台线程 ,其创建的10个线程也是后台线程 d.start();System.out.println("d.isDaemon() = " + d.isDaemon() + ", "); try {TimeUnit.SECONDS.sleep(1); // 睡眠1秒 } catch (InterruptedException e) {e.printStackTrace(); } }
}
// 后台线程
class Daemon implements Runnable {private Thread[] t = new Thread[10];@Override public void run() { for (int i=0; i<t.length; i++) {t[i] = new Thread(new DaemonSpawn());t[i].start();System.out.println("daemon spawn" + i + "started");}while(true) {Thread.yield(); // 一直处于可运行状态,但无法获取cpu时间片运行 }}
}
// 任务
class DaemonSpawn implements Runnable {public void run() {while(true) {Thread.yield(); // 把cpu时间片让给其他线程 }}
}

6、后台线程在不执行 finally 子句的情况下就会终止其 run 方法

/*** page664 * 后台线程在不执行 finally 子句的情况下就会终止其 run 方法* 即终止run方法时,不会执行finally子句中的代码;* 但把 t.setDaemon(true) 给删除掉,则会执行 finally 子句中的代码   */
public class ADaemon implements Runnable {@Overridepublic void run() {try {System.out.println("starting daemon");TimeUnit.SECONDS.sleep(1);} catch (Exception e) {e.printStackTrace();} finally {System.out.println("this should always run? "); // 没有执行 }}/*** 一旦main 方法退出, jvm就会立即关闭所有的后台进程;* 因为不能以优雅的方式来关闭后台线程,故非后台线程的 Executors 是一种更好的方式, * 因为 Executors 控制的所有任务可以同时被关闭;* @param args*/public static void main(String[] args) {Thread t = new Thread(new ADaemon());t.setDaemon(true);t.start(); }
}
/*starting daemon
*/

非后台的Executor 通常是一种更好的方式,因为 Executor控制的所有任务可以同时被关闭;

【21.2.9】编码的变体

1、上面描述任务的方式都是通过实现 Runnable接口, 还有一种方式是继承 Thread 类;

/*** page665 * 创建线程的第2种方式, 继承了Thread,就不能继承其他类了 */
public class SimpleThread extends Thread {private int countDown = 5; private static int threadCount = 0;public SimpleThread() {super(Integer.toString(++threadCount));start();}public String toString() {return "#" + getName() + "(" + countDown + "), ";}@Override public void run() {while(true) {System.out.println(this);if (--countDown == 0) {return ;}}}public static void main(String[] args) {for (int i=0; i< 5; i++) {new SimpleThread(); }}
}
/*#2(5),
#5(5),
#5(4),
#5(3),
#5(2),
#5(1),
#4(5),
#1(5),
#3(5),
#1(4),
#1(3),
#1(2),
#4(4),
#2(4),
#2(3),
#2(2),
#4(3),
#1(1),
#3(4),
#4(2),
#2(1),
#4(1),
#3(3),
#3(2),
#3(1),
*/

2、第2种方式,比较常见是 实现 Runnable接口;

/*** page666 * 与继承Thread不同的是,这里是实现 Runnable接口 , * 在构造器中启动线程可能有问题, 因为另一个任务可能会在构造器结束之前开始执行, * 这意味着该任务能够访问处于不稳定状态的对象,  * 这是优选Executor 而不是显式创建Thread对象的另一个原因*/
public class SelfManaged implements Runnable {private int countDown = 5; private Thread t = new Thread(this);public SelfManaged() {t.start();}public String toString() {return Thread.currentThread().getName() + "(" + countDown +")";}@Overridepublic void run() {while(true) {System.out.println(this);if (--countDown ==0) {return ;}}} public static void main(String[] args) {for(int i=0; i<5; i++) {new SelfManaged();}}
}

小结: 在构造器中启动线程可能有问题, 因为另一个任务可能会在构造器结束之前开始执行,  这意味着该任务能够访问处于不稳定状态的对象。 这是优选Executor 而不是显式创建Thread对象的另一个原因。

3、通过内部类隐藏线程代码

/*** 使用内部类隐藏线程代码 */
public class ThreadVariations {public static void main(String[] args) {new InnerThread1("InnerThread1");new InnerThread2("InnerThread2");new InnerRunnable1("InnerRunnable1");new InnerRunnable2("InnerRunnable2");new ThreadMethod("ThreadMethod").runTaks();}
}
// 一个单独的方法开启线程运行任务
class ThreadMethod {private int countDown = 5;private Thread t;private String name;public ThreadMethod(String name) {this.name = name;} public void runTaks() {if (t == null) {t = new Thread(name) {public void run() {try {while (true) {System.out.println(this);if (--countDown == 0)return;sleep(1000);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return getName() + ": " + countDown;}};t.start();}}
}// 匿名内部类继承 Thread
class InnerThread2 {private int countDown = 5;private Thread t;public InnerThread2(String name) {t = new Thread(name) {public void run() {try {while (true) {System.out.println(this);if (--countDown == 0)return;sleep(1000);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return getName() + ":" + countDown;}};t.start();}
}/*** 匿名内部类实现 Runnable接口*/
class InnerRunnable2 {private int countDown = 5;private Thread t;public InnerRunnable2(String name) {t = new Thread(new Runnable() { // 匿名内部类实现 Runnable接口@Overridepublic void run() {try {while (true) {System.out.println(this);if (--countDown == 0)return;TimeUnit.MILLISECONDS.sleep(1000);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return Thread.currentThread().getName() + ":"+ countDown;}}, name);t.start();}}/*** 内部类继承线程Thread*/
class InnerThread1 {private int countDown = 5;private Inner inner;public InnerThread1(String name) {inner = new Inner(name);}// 内部类private class Inner extends Thread {Inner(String name) {super(name);start();}@Overridepublic void run() {try {while (true) {System.out.println(this);if (--countDown == 0)return;sleep(1000);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return getName() + ":" + countDown;}}
}/*** 内部类实现 Runnable接口*/
class InnerRunnable1 {private int countDown = 5;private Inner inner;public InnerRunnable1(String name) {inner = new Inner(name);}private class Inner implements Runnable {Thread t;Inner(String name) {t = new Thread(this, name);t.start();}public void run() {try {while (true) {System.out.println(this);if (--countDown == 0)return;TimeUnit.MILLISECONDS.sleep(10);}} catch (Exception e) {e.printStackTrace();}}public String toString() {return t.getName() + ":" + countDown;}}
}
/*InnerThread1:5
InnerThread2:5
InnerRunnable1:5
InnerRunnable2:5
ThreadMethod: 5
InnerRunnable1:4
InnerRunnable1:3
InnerRunnable1:2
InnerRunnable1:1
InnerThread2:4
InnerThread1:4
InnerRunnable2:4
ThreadMethod: 4
InnerThread2:3
InnerThread1:3
InnerRunnable2:3
ThreadMethod: 3
InnerThread2:2
InnerThread1:2
InnerRunnable2:2
ThreadMethod: 2
InnerThread2:1
InnerThread1:1
InnerRunnable2:1
ThreadMethod: 1
*/

【21.2.11】加入一个线程

Thread.join() 方法: 若线程A 在另一个线程B上调用 B.join(),则线程A将被挂起,直到目标线程B结束才恢复;

也可以在join()方法上加个超时参数,如果目标线程在超时时间内没有结束的话,join还是返回;

/*** page670 * join() 方法: 在 线程a上调用 线程b的join方法,则线程a阻塞* ,直到线程b运行结束,线程a才继续运行  * 注意:java.util.concurrent类库包含 CyclicBarrier工具类,比join更加适合使线程让步*/
public class Joining {public static void main(String[] args) {Sleeper s1 = new Sleeper("s1", 1500);Sleeper s2 = new Sleeper("s2", 1500);Joiner j1 = new Joiner("j1", s1);Joiner j2 = new Joiner("j2", s2);// s2 被强制中断s2.interrupt(); }
}
/*** 睡眠线程*/
class Sleeper extends Thread {private int duration ;public Sleeper(String name, int sleepTime) {super(name);this.duration = sleepTime;start();}@Overridepublic void run() {try {sleep(duration);} catch (Exception e) {System.out.println(getName() + " was interrupted, isInterrupted() = " + isInterrupted());return ;}System.out.println("线程 " + getName() + "  已经被唤醒");}
}class Joiner extends Thread {private Sleeper sleeper;public Joiner(String name, Sleeper sleeper) {super(name); this.sleeper = sleeper;start();}public void run() {try {sleeper.join(); // 主线程 Joiner 调用 其他线程 sleeper的join 方法, sleeper没有执行完, 主线程一直阻塞  } catch (Exception e) {System.out.println("interrupted");}System.out.println("线程  " + getName() + " join 完成"); }
}
/*s2 was interrupted, isInterrupted() = false
线程  j2 join 完成
线程 s1  已经被唤醒
线程  j1 join 完成
*/

【21.2.12】创建有响应的用户界面

/*** page 671 * 有响应的用户界面* (要想程序有响应,就需要把计算程序放在 run方法里,这样他才能让出cpu时间片给其他线程)*/
public class ResponsiveUI extends Thread {private static volatile double d = 1; public ResponsiveUI()  {setDaemon(true); // 把当前线程设置为后台线程 start();}@Overridepublic void run() {while(true) {d = d+ (Math.PI + Math.E) / d;}}public static void main(String[] args) throws Exception {// 创建无响应的ui
//      new UnresponsiveUI();// 创建响应式 ui new ResponsiveUI();System.in.read(); System.out.println(d); }
}
/*** 无响应的ui */
class UnresponsiveUI {private volatile double d = 1; public UnresponsiveUI() throws Exception { while(d>0) {d = d+ (Math.PI + Math.E) / d;}System.in.read(); // 永远不会执行到这里 (此谓无响应)}
}

【21.2.13】线程组

【21.2.14】捕获子线程(Thread, Runnable子类或线程池子线程)异常

看个荔枝:下面的程序总会抛出异常:


/*** page 672 * 线程异常(捕获线程异常)*/
public class ExceptionThread implements Runnable {@Overridepublic void run() {throw new RuntimeException();}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();try {// 把子线程抛出异常的代码放到 主线程的try-catch 块里,主线程的try-catch块无法捕获的  exec.execute(new ExceptionThread()); } catch (Exception e) {System.out.println("抛出了异常"); }}
}
/*
Exception in thread "pool-1-thread-1" java.lang.RuntimeExceptionat diy.chapter21.ExceptionThread.run(ExceptionThread.java:13)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)*/

可见,把main方法放在 try-catch 中没有作用;

荔枝2:

/*** page 672 * 捕获异常 - 还是没有捕获异常 */
public class NavieExceptionHandler {public static void main(String[] args) {try {ExecutorService exec = Executors.newCachedThreadPool();exec.execute(new ExceptionThread());} catch(RuntimeException e) {System.out.println("Exception has been handled");}}
}
/*
Exception in thread "pool-1-thread-1" java.lang.RuntimeExceptionat diy.chapter21.ExceptionThread.run(ExceptionThread.java:13)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)*/

为了解决无法捕获线程异常的问题,需要修改Executor产生线程的方式。设置未捕获异常处理器去捕获子线程抛出的异常。(干货——非常重要,如何捕获子线程异常)

/*** 设置未捕获异常处理器去捕获子线程抛出的异常  */
public class CaptureUncaughtException {public static void main(String[] args) {// 传入带有未捕获异常处理器的线程工厂到 线程池以改变线程池创建线程的方式 ExecutorService executorService = Executors.newCachedThreadPool(new MyHandlerThreadFactory());// 运行任务 executorService.execute(new MyExceptionThread()); }
}
// 线程类
class MyExceptionThread implements Runnable {@Override public void run() {Thread t = Thread.currentThread(); System.out.println("run() by " + t);// getUncaughtExceptionHandler 表示获取未捕获异常处理器 System.out.println("异常处理器 = " + t.getUncaughtExceptionHandler());// 抛出运行时异常 throw new RuntimeException(); }
}
// 类-未捕获异常处理器
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {// uncaughtException方法会在线程因未捕获的异常而临近死亡时被调用  @Override public void uncaughtException(Thread t, Throwable e) {System.out.println("我是未捕获异常处理器-MyUncaughtExceptionHandler,我捕获到的异常信息为" + e); }
}
// 线程处理器工厂
class MyHandlerThreadFactory implements ThreadFactory {// 定义创建线程的方式 @Overridepublic Thread newThread(Runnable r) {System.out.println(this + " 创建新线程");Thread t = new Thread(r, ""+System.currentTimeMillis());System.out.println("新线程信息==" + t);// 为线程设置未捕获异常处理器 t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());System.out.println("异常处理器= " + t.getUncaughtExceptionHandler());return t; }
}
/*diy.chapter21.MyHandlerThreadFactory@4e25154f 创建新线程
新线程信息==Thread[Thread-0,5,main]
异常处理器= diy.chapter21.MyUncaughtExceptionHandler@70dea4e
run() by Thread[Thread-0,5,main]
异常处理器 = diy.chapter21.MyUncaughtExceptionHandler@70dea4e
diy.chapter21.MyHandlerThreadFactory@4e25154f 创建新线程
新线程信息==Thread[Thread-1,5,main]
异常处理器= diy.chapter21.MyUncaughtExceptionHandler@15547dba
caught java.lang.RuntimeException
*/

设置默认的未捕获异常处理器;

/*** page 674 * 设置默认的未捕获异常处理器* 这个处理器-MyUncaughtExceptionHandler* 只有在不存在线程专有的未捕获异常处理器的情况下才会被调用 */
public class SettingDefaultHandler {public static void main(String[] args) {Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());ExecutorService executorService = Executors.newCachedThreadPool();executorService.execute(new ExceptionThread()); }
}
/*
* 我是未捕获异常处理器-MyUncaughtExceptionHandler,我捕获到的异常信息为java.lang.RuntimeException
*/ 

【21.3】共享受限资源(临界资源)

【21.3.1】不正确地访问资源

多个线程在没有并发控制前提下同时访问和修改共享资源,导致资源状态异常, 不满足业务场景;

【21.3.2】解决共享资源竞争

1、java提供了关键字 synchronized 关键字,为防止资源冲突提供了内置支持。当任务要执行被 synchronized 关键字保护起来的代码片段的时候,他将检查锁是否可用,然后获取锁,执行代码,释放锁;

共享资源一般指的是对象,但也可以是 文件,输入输出端口,或者打印机;

对于同一个对象来说,其所拥有的的 synchronized 方法共享同一个锁,这可以被用来防止多个任务同时访问同一个对象内存; 

注意: 在使用并发时,将域设置为  private 非常重要, 否则 synchronized 关键字就无法防止其他任务直接访问域了;

2、对于静态 static数据,synchronized static 方法可以在类的方位内防止对 static数据的并发访问;

/** 使用Synchronized 同步 * page 678* */
public class SynchronizedEvenGenerator extends IntGenerator {private int currentEventValue = 0;// synchronized 关键字修饰方法以防止多个线程同时访问该方法@Overridepublic synchronized int next() { ++currentEventValue;Thread.yield();++currentEventValue;return currentEventValue;}public static void main(String[] args) {EvenChecker.test(new MutextEvenGenerator());}
}

3、什么时候需要同步呢?

如果你正在写一个变量, 他可能接下来将被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量,那么你必须使用同步, 并且,读写线程都必须用相同的监视器锁同步;

注意:每个访问临界资源的方法都必须被同步,否则他们就不会正确的工作; 

4、使用显式的Lock 对象

lock() 与 unlock() 方法之间的代码就是临界资源;

/** 使用显式的 Lock对象  * page 678  * */
public class MutextEvenGenerator extends IntGenerator{private int currentEventValue = 0;private Lock lock = new ReentrantLock();@Overridepublic int next() {lock.lock(); // 加锁 try {++currentEventValue;Thread.yield();++currentEventValue;return currentEventValue; } finally {lock.unlock();  // 解锁 }}public static void main(String[] args) {EvenChecker.test(new MutextEvenGenerator());}
}

干货(如何编写基于Lock的并发控制代码):当你在使用 Lock对象时,将这里所示的惯用法内部化是很重要的。紧接着的对 lock() 方法的调用,你必须吧临界资源代码放置在 finally 子句中带有 unlock() 的 try-finally 语句中。 很在意, return 语句必须在 try 子句中出现,以确保 unlock()方法不会过早发生,从而将数据暴露给第2个任务;

5、synchronzied 与 Lock 对别:

如果使用 synchronized 关键字时,某些事物失败了,那么就会抛出一个异常; 而你没有机会去做任何清理工作,以维护系统使其处于良好状态;

但显式的Lock对象, 你就可以使用 finally 子句将系统维护在正确的状态了 ;

小结: 通常情况下,推荐使用 synchronized 关键字,因为写的代码量更少,并且用户出现错误的可能性也会降低;

只有在特殊情况下,才会使用 显式的 Lock对象 ;

/*** page 679* 尝试获取锁 */
public class AttemptLocking {/* 可重入锁  */private ReentrantLock lock = new ReentrantLock();// 获得锁,未设置超时时间 public void untimed() {boolean captured = lock.tryLock(); // 获得锁 try {System.out.println("trylock() : " + captured);} finally {if (captured) {lock.unlock(); // 在 finally 子句中解锁 }}}// 获得锁,设置了超时时间 public void timed() {boolean captured = false; try {captured = lock.tryLock(2, TimeUnit.SECONDS); // 基于超时控制尝试获取锁,超时时间为3秒  } catch (Exception e) {throw new RuntimeException(); }try {System.out.println(" tryLock(2, TimeUnit.SECONDS): " + captured);} finally {if (captured) { // 是否获得锁,若获得锁,则解锁lock.unlock();  // 解锁 }}}public static void main(String[] args) {final AttemptLocking al = new AttemptLocking();al.untimed();al.timed();new Thread() { // 子线程 {setDaemon(true);} // 设置其为后台线程  public void run() {al.lock.lock();System.out.println("acquired:");}}.start();Thread.yield(); // 当前线程让出 cpu时间片 try { TimeUnit.SECONDS.sleep(1); // 主线程睡眠1秒,让 子线程先获得锁 } catch (InterruptedException e) {e.printStackTrace();}al.untimed(); // 没有设置超时时间的获得锁al.timed();  // 设置了超时时间获得锁 }
}
/*
trylock() : truetryLock(2, TimeUnit.SECONDS): true
acquired:
trylock() : falsetryLock(2, TimeUnit.SECONDS): false
*/

Lock小结: 显式的Lock 对象在加锁和释放锁方面,相对于内建的 synchronized 锁来说,还赋予了你更细粒度的控制力;

【21.3.3】原子性与易变性

1、原子操作不需要进行同步控制, 原子操作是不能被线程调度机制中断的 操作;

2、但是有个问题:

原子性可以应用于 除了 long 和 double 之外的所有基本类型的操作。 因为jvm 可以将64位(long和double遍历)的读取与写入当做两个分离的32位操作来执行,这就产生了在一个读取和写入操作中间发送上下文切换,从而导致不同 的任务可以看到不正确结果的可能。(干货——这也叫做字撕裂)

但是, 当你定义long 或double变量时,如果使用 volatile关键字,就会获得原子性(干货——原子性+volatile 可以获得原子性)

3、可见性或可视性问题: 在多处理器系统上, 相对单处理器系统而言,可视性问题远比原子性问题多得多。即一个任务作出的修改,即使在不中断的意义上讲是原子性的,但对其他任务也可能是不可见的 ;因此不同的任务对应用的状态是不同的视图;

4、volatile关键字: 确保了应用中的可视性或可见性;只要对 volatile域产生了写操作,那么所有的读操作都可以看这个这个修改。即便使用了本地缓存, 情况也是如此。因为volatile域会立即被写入主存中,而读取操作就发生在主存中(干货——因为volatile域会立即被写入主存中,而读取操作就发生在主存中,故 volatile可以保证可见性)

补充1: 理解原子性与易变性是两个不同的概念很重要。 在非 volatile域上的原子操作不必刷新到主存去,因此其他读取该域的任务也没有必要(当然也不会)看到这个新值;如果多个任务同时访问某个域,那么这个域就应该是 volatile的,否则,这个域就应该同步访问;

补充2:当一个域的值依赖于他之前的值时,如递增一个计数器,则 volatile无法工作;如果某个域的值收到其他域的值的限制,那么volatile也无法工作,如 Range类的 lower 和 upper 边界就必须遵循lower <= upper的限制;

小结:使用volatile而不是 synchronized的唯一安全的情况是类中只有一个可变的域。再次提醒,你的第一选择应该是  synchronized , 这是最安全的方式,而其他方式都是有风险的(干货)

/*** page 682 * 原子性测试 */
public class AtomicityTest implements Runnable {private int i = 0; public int getValue() { // 这里的 getValue没有 synchronized 来修饰,有并发问题 return i ;}// 偶数增量 private synchronized void evenIncrement() {i++; i++; // 当执行第1次自加时, 线程切换 导致 i为奇数 }@Overridepublic void run() {while(true) {evenIncrement();}}public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();AtomicityTest at = new AtomicityTest();exec.execute(at);while(true) {int val = at.getValue();if (val % 2 != 0) {System.out.println(val); // 1 或 3  或 7 System.exit(0);  }}}
}
//1 或 3  或 7

上面的程序, evenIncrement 和 getValue方法都应该是  synchronized 关键字来修饰,否则会有并发问题;

/*** page 683* 序列号生成器 */
public class SerialNumberGenerator {private static volatile int serialNumber = 0;public static int nextSerialNumber() { // 非线程安全,因为方法没有被设置为同步 // 先返回,然后再自增  return serialNumber++; }
}
*** page 683 * 数字检测器 */
public class SerialNumberChecker {private static final int SIZE = 10;private static CircularSet serials = new CircularSet(1000);private static ExecutorService exec = Executors.newCachedThreadPool();// 序列数检测器,保证序列数是唯一的 static class SerialChecker implements Runnable {@Overridepublic void run() {while(true) {int serial = SerialNumberGenerator.nextSerialNumber();if (serials.contains(serial)) { // 序列数字集合是否存在该数字,存在,结束System.out.println("duplicate:" + serial);System.exit(0); // 终止当前虚拟机 }serials.add(serial); // 否则, 把该数字添加到集合 }}}public static void main(String[] args) {for (int i=0; i<SIZE; i++) {exec.execute(new SerialChecker());}if (args.length > 0) {try {TimeUnit.SECONDS.sleep(new Integer(args[0])); // 睡眠 System.out.println("no duplicates detected.");System.exit(0); } catch (Exception e) {e.printStackTrace();} }}/*duplicate:39705duplicate:39704duplicate:38934duplicate:39706duplicate:39707
*/
}
/*** 循环集合  */
class CircularSet {private int[] array ;private int len;private int index = 0;// 构造方法 public CircularSet(int size) {array = new int[size];len = size; for (int i=0; i<size; i++) {array[i] = -1;}}// 同步加法 public synchronized void add(int i) {array[index] = i ;index = ++index % len; }// 同步是否包含 public synchronized boolean contains(int val) {for (int i=0; i<len ; i++) {if (array[i] == val) return true;  }return false; }
}
//duplicate:1680
//duplicate:1681
//duplicate:1619
//duplicate:1682

这里存在线程安全问题, 因为 SerialNumberGenerator.nextSerialNumber 方法 不是synchronized 修饰, 所以才会有重复的数字,所以才会终止,如果把该方法修改为 synchronized, 则程序不会终止;

【21.3.4】原子类: AtomicInteger, AutomicLong, AtomicReference

提供了原子性条件更新操作: compareAndSet(expectValue, updateValue);

荔枝: AtomicInteger

/*** page 684 * 原子类-AtomicInteger-提供原子性条件更新操作: compareAndSet 方法 */
public class AtomicIntegerTest implements Runnable {private AtomicInteger i = new AtomicInteger(0);public int getValue() {return i.get(); }// 原子性加法,用 AtomicInteger 替换了 synchronized 方法 private void evenIncrement() { i.addAndGet(2); // 原子操作, 线程安全 }@Overridepublic void run() {while(true) {evenIncrement(); }}public static void main(String[] args) {Timer timer = new Timer(); // 定时器 timer.schedule(new TimerTask() {@Overridepublic void run() {System.err.println("aborting");System.exit(0); // 终止当前虚拟机 }}, 5000); // 5秒后运行该子线程 ExecutorService exec = Executors.newCachedThreadPool(); // 线程池 AtomicIntegerTest ait = new AtomicIntegerTest();exec.execute(ait);while(true) {int val = ait.getValue();if (val % 2 != 0) {System.out.println(val); // 永远不会执行到这里,因为 val 一定为偶数System.exit(0);   // 终止当前虚拟机  }}}
}
/*aborting*/

强调:Atomic类被设计用来构建 java.util.concurrent中的类,因此只有在特殊情况下才在自己的代码中使用他们, 即便使用了也需要确保不存在其他可能出现单的问题。通常依赖于锁更安全一些(要么是 synchronized关键字, 要么是显式的Lock对象)

【21.3.5】临界区

1、有时临界区是方法内部的部分代码而不是整个方法;synchronized 被用来指定某个对象,此对象的锁被用来对花括号内部代码进行同步控制; 同步控制块如下:

synchronized(synchObject) {

临界区代码

}

2、使用 synchronized来创建临界区

public class CriticalSection {static void testApproaches(PairManager manager1, PairManager manager2) {ExecutorService exec = Executors.newCachedThreadPool();PairManipulator pm1 = new PairManipulator(manager1);PairManipulator pm2 = new PairManipulator(manager2);PairChecker checker1 = new PairChecker(manager1);PairChecker checker2 = new PairChecker(manager2);exec.execute(pm1);exec.execute(pm2);exec.execute(checker1);exec.execute(checker2);try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e ) {System.out.println("sleep interrupted");}System.out.println("pm1:" + pm1 + ", pm2 = " + pm2);System.exit(0);}public static void main(String[] args) {PairManager manager1 = new PairManager1();PairManager manager2 = new PairManager2();testApproaches(manager1, manager2); }/*pm1:pair:x = 3, y = 3, checkCounter = 1, pm2 = pair:x = 7, y = 7, checkCounter = 3*/
}
// 命名内部类
class Pair {private int x, y ;public Pair(int x, int y) {this.x = x; this.y = y; }public Pair () {this(0, 0);}public int getX() {return x; }public int getY() {return y;}// x 自增1 public void incrementX() {x++;}// y 自增1 public void incrementY() {y++; }public String toString() {return "x = " + x + ", y = " + y; }// 自定义异常类 public class PairValuesNotEqualException extends RuntimeException {public PairValuesNotEqualException() {super("pair values not equal: " + Pair.this);}}// 检测x与y是否相等 public void checkState() {if (x != y) {throw new PairValuesNotEqualException(); }}
}
// 对子管理器
abstract class PairManager {AtomicInteger checkCounter = new AtomicInteger(0);protected Pair p = new Pair();private List<Pair> storage = Collections.synchronizedList(new ArrayList<Pair>()); // 加锁的list// 同步获取 pair 对象 public synchronized Pair getPair() {return new Pair(p.getX(), p.getY()); }protected void store(Pair p) {storage.add(p);try {TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {}}public abstract void increment();
}
class PairManager1 extends PairManager {public synchronized void increment() {p.incrementX(); // 让 x 自增 p.incrementY();  // y 自增 this.store(p);}
}
class PairManager2 extends PairManager {public synchronized void increment() {Pair temp; synchronized(this) { // 同步控制块 p.incrementX(); // 让 x 自增 p.incrementY();  // y 自增 temp = this.getPair(); }this.store(temp); }
}
/*** 对子操纵器 */
class PairManipulator implements Runnable {private PairManager pm ;public PairManipulator(PairManager pm) {this.pm = pm;}@Overridepublic void run() {while(true) {pm.increment();}}public String toString() {return "pair:" + pm.getPair() + ", checkCounter = " + pm.checkCounter.get(); }
}
// 对子检测器
class PairChecker implements Runnable {private PairManager pm ; public PairChecker(PairManager pm) {this.pm =  pm; }@Overridepublic void run() {while(true) {pm.checkCounter.incrementAndGet();pm.getPair().checkState();}}
}

3、使用 Lock来创建临界区

// 使用Lock对象创建临界区
class ExplicitPairManager1 extends PairManager {private Lock lock = new ReentrantLock();public synchronized void increment() {lock.lock(); // 加锁 try {p.incrementX();p.incrementY();store(getPair());} finally {lock.unlock(); // 解锁 }}
}

【21.3.6】在其他对象上同步

1、synchronized 块必须给定一个在其上进行同步的对象,并且最合理的方式 是 ,使用其方法正在被调用的当前对象,如 synchronize(this);

class DualSynch {private Object syncObject = new Object();
//  public synchronized void f() {public synchronized void f() {synchronized(syncObject) { // 对 syncObject 进行同步控制,获取的是 syncObject 对象的锁 for (int i = 0; i < 5; i++) {print("f()");Thread.yield();}}}public void g() {synchronized (syncObject) { // 对 syncObject 进行同步控制,获取的是 syncObject 对象的锁  for (int i = 0; i < 5; i++) {print("g()");Thread.yield();}}}
}
public class SyncObject {public static void main(String[] args) {final DualSynch ds = new DualSynch();new Thread() {public void run() {ds.f();}}.start();ds.g();}
}
/*
g()
g()
g()
g()
g()
f()
f()
f()
f()
f()
*/

【21.3.7】线程本地存储 ThreadLocal类实现

1、可以为使用相同变量的每个不同线程都创建不同的存储;有5个线程都是用变量x多表示的对象,name线程本地存储就会生成5块内存;

class Accessor implements Runnable {private final int id;public Accessor(int idn) {id = idn;}public void run() {while (!Thread.currentThread().isInterrupted()) {ThreadLocalVariableHolder.increment();System.out.println(this);Thread.yield();}}public String toString() {return Thread.currentThread().getName() + ", #" + id + ": " + ThreadLocalVariableHolder.get();}
}public class ThreadLocalVariableHolder {/** 使用 ThreadLocal 根除不同线程对变量的共享 */private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {private Random rand = new Random(47);protected synchronized Integer initialValue() {return rand.nextInt(10000);}};public static void increment() {value.set(value.get() + 1);}public static int get() {return value.get();}public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();for (int i = 0; i < 5; i++)exec.execute(new Accessor(i));TimeUnit.SECONDS.sleep(3); // Run for a whileexec.shutdownNow(); // All Accessors will quit}
}
/*pool-1-thread-3, #2: 25714
pool-1-thread-3, #2: 25715
pool-1-thread-3, #2: 25716
pool-1-thread-3, #2: 25717*/

【21.4】终结任务

/*** 终结任务-装饰性花园 =获取多个大门进入公园的总人数,每个大门有计数器 * 本测试案例的作用在于: 并发控制使得,每个线程的计数总和等于 主线程的总计数值*/
public class OrnamentalGarden {public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool(); // 线程池 for(int i=0; i<5; i++) {exec.execute(new Entrance(i)); // 运行任务 }TimeUnit.SECONDS.sleep(3); // 睡眠3秒 Entrance.cancel(); // 所有线程停止运行,即所有大门关闭 exec.shutdown(); // 关闭线程池 // ExecutorService.awaitTermination 方法等待每个任务结束,// 如果所有任务在超时时间达到之前全部结束,则返回true,否则返回false if (!exec.awaitTermination(1000, TimeUnit.MICROSECONDS)) {System.out.println("some tasks were not terminated");}System.out.println("total :" + Entrance.getTotalCount()); // 获取count,总计数值 System.out.println("sum of entrances:"  + Entrance.sumEntrances()); // 所有大门人数相加}
}class Count {private int count = 0;private Random random = new Random(47);public synchronized int increment() {// 自增同步方法 int temp = count;if (random.nextBoolean()) {// 获取随机布尔值,50%机会为true Thread.yield(); // 当前线程让出cpu时间片 }return (count = ++temp); // count 自增}public synchronized int value() { // 同步方法-获取count  return count; }
}
/*** 入口=大门 */
class Entrance implements Runnable {private static Count count = new Count();// 总计数器,所有大门人数计数器  private static List<Entrance> entrances = new ArrayList<>(); // 入口列表 private int number = 0; // 每个大门的人数 private int id = 0;public static volatile boolean canceled = false;// 停止线程运行,默认为false  public static void cancel() {canceled = true; }public Entrance(int id) {this.id = id;entrances.add(this);}@Overridepublic void run() {while(!canceled) {synchronized(this) {// 同步块-获取当前对象的锁 ++number; // number 自增 , 每个大门人数自增 }System.out.println(this + ", total:" + count.increment()); // 总计数器自增,即所有大门人数计数器自增 try {TimeUnit.MICROSECONDS.sleep(1000);// 睡眠1秒 } catch (InterruptedException e) {System.out.println("sleep interrupted");}}System.out.println("stopping " + this); }public synchronized int getValue() { // 同步方法-获取本大门的人数 return number; }public String toString() {return "entrance " + id + ": " + getValue();}public static int getTotalCount() { // 获取总计数值 return count.value();}public static int sumEntrances() { // 获取总和int sum = 0;for (Entrance e : entrances) {sum += e.getValue(); // 所有大门人数求和 }return sum; }
}
/*stopping entrance 1: 1693
stopping entrance 2: 1695
stopping entrance 4: 1695
total :8473
sum of entrances:8473
*/

【21.4.2】在阻塞时终结

1、线程状态:

新建,当线程被创建时,他会短暂处于这个状态, 此时线程已经分配了资源,并完成了初始化;

就绪, 只要调度器把cpu时间片分配该这个线程,他就可以运行;

阻塞:线程能够运行, 但有个条件阻止他运行。线程可以由阻塞状态重新进入就绪状态;

死亡:处于死亡或终止状态的线程不在是可调度的,并且也不会得到 cpu时间片,他的任务已经结束;

2、进入阻塞状态:进入阻塞状态的原因:

2.1、调用 sleep 方法使线程进入休眠状态;

2.2、调用wait()方法使线程挂起, 直到线程得到了 notify或notifyAll方法调用(或 SE5中的java.util.concurrent类库中的 signal或signalAll() 方法调用), 线程才会进入就绪状态;

2.3、任务在等待某个输入输出完成;

2.4、任务试图在某个对象上调用其同步控制方法, 但是对象锁不可用,因为另一个任务已经获取了这个锁;

【21.4.3】中断

1、Thread类包含 interrupt方法,可以终止被阻塞的任务。这个方法将设置线程的中断状态。 如果一个线程被阻塞,或者视图执行一个阻塞操作,那么设置这个线程的中断状态将抛出 InterruptedException异常。当抛出该异常或该任务调用了 Thread.interrupted() 方法时, 中断状态将被复位,设置为true;

2、如果调用 Executor上调用 shutdownNow方法,那么它将发送一个 interrupte方法调用给他启动的所有线程。

通过调用submit()而不是executor() 来启动任务,就可以持有该任务的上下文。

【21.4.3】中断

1、Thread类包含 interrupt方法,可以终止被阻塞的任务。这个方法将设置线程的中断状态。 如果一个线程被阻塞,或者视图执行一个阻塞操作,那么设置这个线程的中断状态将抛出 InterruptedException异常。当抛出该异常或该任务调用了 Thread.interrupted() 方法时, 中断状态将被复位,设置为true;

2、如果调用 Executor上调用 shutdownNow方法,那么它将发送一个 interrupte方法调用给他启动的所有线程。

通过调用submit()而不是executor() 来启动任务,就可以持有该任务的上下文。submit()方法将返回一个泛型Future<?>, 持有这个Future的关键在于,你可以调用该对象的cancle() 方法, 并因此可以使用他来中断某个特定任务。如果把true传给 cancel方法,他就会拥有在该线程上调用 interrupt方法以停止这个线程的权限。

/*** 中断由线程池管理的某个线程 */
public class Interrupting {private static ExecutorService exec = Executors.newCachedThreadPool();// 线程池 static void test(Runnable r) throws InterruptedException {// 使用 ExecutorService.submit() 而不是 ExecutorService.execute()方法来启动任务,就可以持有该任务的上下文 ,submit() 方法返回 Future 对象   // exec.execute(r); 不用 execute() 方法 Future<?> f = exec.submit(r);TimeUnit.MILLISECONDS.sleep(1000); // 睡眠1秒 System.out.println("interrupting " + r.getClass().getName()); // 正在中断某个线程 // 调用Future.cancel() 方法来中断某个特定任务 // 把true传给cancel() 方法,该方法就拥有在该线程上调用interrupt() 方法以停止这个线程的权限 // cancel 是一种中断由 Executor启动的单个线程的方式 f.cancel(true);System.out.println("interrupt sent to " + r.getClass().getName()); // 中断信号发送给线程 System.out.println("====================================== seperate line ==================================== ");}public static void main(String[] args) throws Exception {test(new SleepBlocked());test(new IOBlocked(System.in));test(new SynchronizedBlocked()); TimeUnit.SECONDS.sleep(3);System.out.println("aborting with System.exit(0)");System.exit(0);// 终止当前虚拟机进程,所以有部分打印信息无法没有正常输出}
}
// 睡眠式阻塞线程, 可中断的阻塞
class SleepBlocked implements Runnable {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(3);// 睡眠3秒} catch (InterruptedException e ) { // 捕获中断异常System.out.println("interrupted exception in SleepBlocked ");}System.out.println("exiting SleepBlocked.run()");}
}
// IO式阻塞线程 , 不可中断的阻塞
class IOBlocked implements Runnable {private InputStream in;public IOBlocked(InputStream is) {in = is; }@Overridepublic void run() {try {System.out.println("waiting for read();");in.read(); // 等待输入流输入数据  } catch (IOException e) { // IO 异常 , 但执行结果没有报 IO 异常 if (Thread.currentThread().isInterrupted()) {System.out.println("interrupted from blocked IO");} else {throw new RuntimeException();}}System.out.println("exiting IOBlocked.run()");}
}
// 线程同步式阻塞,不可中断的阻塞
class SynchronizedBlocked implements Runnable {public synchronized void f() {while(true) {Thread.yield(); // 让出cpu时间片  }}public SynchronizedBlocked() { // 构造器开启一个线程 new Thread() { // 匿名线程调用f() 方法,获取 SynchronizedBlocked 对象锁,且不释放;其他线程只能阻塞 public void run() {f();// f() 为同步方法 }}.start(); }@Override public void run() {System.out.println("trying to call 同步f()");f(); // 调用f() 同步方法 , 让出cpu时间片 System.out.println("exiting SynchronizedBlocked.run()"); // 这里永远不会执行 }
}
/*interrupting diy.chapter21.SleepBlocked
interrupt sent to diy.chapter21.SleepBlocked
====================================== seperate line ====================================
interrupted exception in SleepBlocked
exiting SleepBlocked.run()
waiting for read();
interrupting diy.chapter21.IOBlocked
interrupt sent to diy.chapter21.IOBlocked
====================================== seperate line ====================================
trying to call 同步f()
interrupting diy.chapter21.SynchronizedBlocked
interrupt sent to diy.chapter21.SynchronizedBlocked
====================================== seperate line ====================================
aborting with System.exit(0)
*/

小结:

序号 阻塞方式 是否可以中断
1 sleep
2 IO
3 synchronized获取锁

所以,对于IO操作线程或synchronized操作的线程,其具有锁住多线程程序的潜在危险。

如何解决呢? 关闭任务在其上发生阻塞的底层资源;

/*** 无法中断线程,但可以关闭任务阻塞所依赖的资源。* 这里只能够中断 基于socket输入流的io线程,因为socket输入流可以关闭;* 但无法中断基于系统输入流的io线程,因为系统输入流无法关闭;*/
public class CloseResource {public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool(); // 线程池 ServerSocket server = new ServerSocket(8080); // 服务端套接字 InputStream socketInput = new Socket("localhost", 8080).getInputStream();/* 启动线程 */exec.execute(new IOBlocked(socketInput));exec.execute(new IOBlocked(System.in));TimeUnit.MILLISECONDS.sleep(1000); // 睡眠1秒 System.out.println("shutting down all threads");exec.shutdownNow(); // 发送一个interrupte() 信号给exec启动的所有线程 TimeUnit.SECONDS.sleep(1); // 睡眠1秒 System.out.println("closing " + socketInput.getClass().getName());socketInput.close(); // 关闭io线程依赖的资源  TimeUnit.SECONDS.sleep(1);System.out.println("closing " + System.in.getClass().getName());System.in.close();  // 关闭io线程依赖的资源  }
}
/**
waiting for read();
waiting for read();
shutting down all threads
closing java.net.SocketInputStream
interrupted from blocked IO
exiting IOBlocked.run()
closing java.io.BufferedInputStream
*/

3、nio类提供了更人性化的IO中断,被阻塞的nio通道会自动响应中断;

/*** page 698* nio中断 */
public class NIOInterruption {public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool(); // 线程池 ServerSocket ss = new ServerSocket(8080); // 服务器套接字 // InetAddress:类的主要作用是封装IP及DNS, // InetSocketAddress类主要作用是封装端口 他是在在InetAddress基础上加端口,但它是有构造器的。InetSocketAddress isa = new InetSocketAddress("localhost", 8080);SocketChannel sc1 = SocketChannel.open(isa); // 套接字通道 SocketChannel sc2 = SocketChannel.open(isa); // 套接字通道 // 使用 ExecutorService.submit() 而不是 ExecutorService.execute()方法来启动任务,就可以持有该任务的上下文 ,submit() 方法返回 Future 对象 Future<?> f = exec.submit(new NIOBlocked(sc1));// 以submit方式启动线程 exec.execute(new NIOBlocked(sc2)); // 以 execute方式启动线程 exec.shutdown(); // 关闭所有线程  TimeUnit.SECONDS.sleep(1); // 睡眠1秒// 调用Future.cancel() 方法来中断某个特定任务 // 把true传给cancel() 方法,该方法就拥有在该线程上调用interrupt() 方法以停止这个线程的权限 // cancel 是一种中断由 Executor启动的单个线程的方式f.cancel(true); // sc2.close(); // }
}
// NIO 新io式阻塞
class NIOBlocked implements Runnable {private final SocketChannel sc;public NIOBlocked(SocketChannel sc) {this.sc = sc; }@Overridepublic void run() {try {System.out.println("waiting for read() in " + this);sc.read(ByteBuffer.allocate(1));} catch (ClosedByInterruptException e1) {System.out.println("ClosedByInterruptException, this = " + this);} catch (AsynchronousCloseException e2) {System.out.println("AsynchronousCloseException, this = " + this);} catch (IOException e3) {throw new RuntimeException(e3); }System.out.println("exiting NIOBlocked.run() " + this);}
}
/**
waiting for read() in diy.chapter21.NIOBlocked@3856c761
waiting for read() in diy.chapter21.NIOBlocked@55de2e48
ClosedByInterruptException, this = diy.chapter21.NIOBlocked@55de2e48
exiting NIOBlocked.run() diy.chapter21.NIOBlocked@55de2e48
AsynchronousCloseException, this = diy.chapter21.NIOBlocked@3856c761
exiting NIOBlocked.run() diy.chapter21.NIOBlocked@3856c761
*/

4、被互斥所阻塞: 一个任务能够调用在同一个对象中的其他的 synchronized 方法,而这个任务已经持有锁了 ;

/*** 被互斥所阻塞* 同步方法f1 和 f2 相互调用直到 count为0 * 一个任务应该能够调用在同一个对象中的其他 synchronized 方法,因为这个任务已经获取这个对象的锁* 2020/04/16  */
public class MultiLock {public synchronized void f1(int count) { // 同步方法 f1 if(count-- > 0) {System.out.println("f1() calling f2() with count = " + count);f2(count); // 调用 f2 }}public synchronized void f2(int count) { // 同步方法f2 if(count-- > 0) {System.out.println("f2() calling f1() with count = " + count);f1(count); // 调用f1 }}public static void main(String[] args) {final MultiLock multiLock = new MultiLock();new Thread() {public void run() {multiLock.f1(5);}}.start(); }
}
/**
f1() calling f2() with count = 4
f2() calling f1() with count = 3
f1() calling f2() with count = 2
f2() calling f1() with count = 1
f1() calling f2() with count = 0
*/

5、java se5 并发类库中添加了一个特性,在 ReentrantLock  可重入锁上阻塞的任务具备可以被中断的能力;

/*** 可重入锁的可中断式加锁  * page 700 */
public class Interrupting2 {public static void main(String[] args) throws Exception {Thread t = new Thread(new Blocked2());t.start();TimeUnit.SECONDS.sleep(1);System.out.println("issuing t.interrupt()"); // 2 t.interrupt(); // 中断线程 }
}
/*** 阻塞互斥量 */
class BlockedMutex {private Lock lock = new ReentrantLock(); // 可重入锁 public BlockedMutex() {lock.lock(); // 构造器即加锁,且从不会释放锁 }public void f() {try {lock.lockInterruptibly(); // 可中断式加锁 System.out.println("lock acquired in f()");} catch(InterruptedException e) {System.out.println("interrupted from lock acquisition in f()"); // 3 可中断阻塞,捕获中断异常 }}
}
class Blocked2 implements Runnable {BlockedMutex blocked = new BlockedMutex(); @Overridepublic void run() {System.out.println("waiting for f() in Blocked Mutex"); // 1 blocked.f();System.out.println("broken out of blocked call"); // 4 }
}
/*** waiting for f() in Blocked Mutex
issuing t.interrupt()
interrupted from lock acquisition in f()
broken out of blocked call*/

【21.4.4】检查中断

1、在线程上调用 interrupt方法去中断线程执行时,能够中断线程的前提是: 任务要进入到阻塞操作中,已经在阻塞操作内部;否则,调用 interrupt方法是无法中断线程的;需要通过其他方式;

其他方式是: 由中断状态来表示, 其状态可以通过调用 interrupt 来设置。通过 Thread.interrupted() 来检查中断  。

/*** 通过 Thread.interrupted() 来检查中断  * page 701*/
public class InterruptingIdiom {public static void main(String[] args) throws Exception {if(args.length != 1) {System.out.println("InterruptingIdiom-傻瓜式中断");}Thread t = new Thread(new Blocked3()); t.start();TimeUnit.SECONDS.sleep(3); // 睡眠  t.interrupt();  // 中断 }
}class NeedsCleanup {private final int id;public NeedsCleanup(int id) {this.id = id;System.out.println("NeedsCleanup " + id);}public void cleanup() {System.out.println("clean up " +id);}
}
/*** 在run()方法中创建的 NeedsCleanup 资源都必须在其后面紧跟 try-finally 子句, * 以确保 清理资源方法被调用 */
class Blocked3 implements Runnable {private volatile double d = 0.0; @Overridepublic void run() {try {int index = 1;// interrupted方法来检查中断状态 while(!Thread.interrupted()) { // 只要当前线程没有中断 System.out.println("========== 第 " + index++ + " 次循环 =========="); NeedsCleanup n1 = new NeedsCleanup(1);try {System.out.println("sleeping-睡眠一秒");TimeUnit.SECONDS.sleep(1);NeedsCleanup n2 = new NeedsCleanup(2);try {System.out.println("calculating-高强度计算");for (int i=1; i<250000; i++) {d = d + (Math.PI + Math.E) / d;}System.out.println("finished time-consuming operation 完成耗时操作.");  } finally {n2.cleanup(); // 清理 }} finally{n1.cleanup(); // 清理 }}System.out.println("exiting via while() test-从while循环退出 "); // 从while循环退出 } catch (InterruptedException e) {System.out.println("exiting via InterruptedException-从中断InterruptedException退出 "); // 从中断退出 }}}
/***
InterruptingIdiom-傻瓜式中断
========== 第 1 次循环 ==========
NeedsCleanup 1
sleeping-睡眠一秒
NeedsCleanup 2
calculating-高强度计算
finished time-consuming operation 完成耗时操作.
clean up 2
clean up 1
========== 第 2 次循环 ==========
NeedsCleanup 1
sleeping-睡眠一秒
NeedsCleanup 2
calculating-高强度计算
finished time-consuming operation 完成耗时操作.
clean up 2
clean up 1
========== 第 3 次循环 ==========
NeedsCleanup 1
sleeping-睡眠一秒
clean up 1
exiting via InterruptedException-从中断InterruptedException退出 */

【21.5】线程间的协作

1、当任务协作时,关键问题是任务间的握手。握手可以通过 Object.wait() Object.notify() 方法来安全实现。当然了 java se5 的并发类库还提供了具有 await() 和 signal() 方法的Condition对象;

【21.5.1】wait()方法与notifyAll() 方法

1、wait() 方法会在等待外部世界产生变化的时候将任务挂起,并且只有在 nofity() 或notifyall() 发生时,即表示发生了某些感兴趣的事务,这个任务才会被唤醒去检查锁产生的变化。wait()方法提供了一种在任务之间对活动同步的方式。

还有,调用wait() 方法将释放锁,意味着另一个任务可以获得锁,所以该对象上的其他synchronized方法可以在线程A wait期间,被其他线程调用;

2、有两种形式的 wait() 调用

形式1: wait方法接收毫秒数作为参数,在wait()期间对象锁是释放的;通过 notify() notifyAll() 方法,或者时间到期后,从 wait() 恢复执行;

形式2:wait方法不接受任何参数,这种wait将无线等待下去,直到线程接收到 notify或 notifyAll方法;

补充1:wait方法,notify方法, notifyAll方法,都是基类Object的一部分,因为这些方法操作的锁也是对象的一部分,而所有对象都是OBject的子类;

补充2:实际上,只能在同步控制方法或同步控制块里调用 wait, notify, notifyAll方法(因为不操作锁,所有sleep方法可以在非同步控制方法里调用)。如果在非同步方法中调用 wait, notify, notifyAll方法, 编译可以通过,但运行就报 IllegalMonitorStateException 异常,异常意思是: 在调用wait, notify, notifyAll方法前,必须获取对象的锁;

(干货——只能在同步控制方法或同步控制块里调用 wait, notify, notifyAll方法) 

【荔枝】涂蜡与抛光: 抛光任务在涂蜡完成之前,是不能执行其工作的;而涂蜡任务在涂另一层蜡之前,必须等待抛光任务完成;
抛光 WaxOn, WaxOff, 使用了wait和notifyAll方法来挂起和重启这些任务;

/*** 汽车上蜡与抛光* (抛光任务在涂蜡完成之前,是不能执行其工作的;而涂蜡任务在涂另一层蜡之前,必须等待抛光任务完成;) * page 705 */
public class WaxOMatic {public static void main(String[] args) throws Exception {Car car = new Car();ExecutorService exec = Executors.newCachedThreadPool();exec.execute(new WaxOff(car)); // 先上蜡 exec.execute(new WaxOn(car)); // 后抛光 TimeUnit.SECONDS.sleep(1); // 睡眠5秒 exec.shutdown(); // 线程池关闭 }
}
//  汽车
class Car {private boolean waxOn = false; // 是否上蜡// 已上蜡 /*** notifyAll() 和 wait() 方法只能在 synchronized方法或synchronized块中执行,因为获取或释放锁 */public synchronized void waxed() { // 上蜡 waxOn = true; notifyAll(); // 唤醒所有调用 wait() 方法锁阻塞的线程 // 为了使该任务从 wait() 中唤醒,线程必须重新获得之前进入wait()时释放的锁。// 在这个锁变得可用之前,这个任务是不会被唤醒的。}public synchronized void buffed() { // 抛光 waxOn = false; notifyAll();}public synchronized void waitForWaxing() throws InterruptedException { // 等待上蜡 while (waxOn == false) { // 若没有上蜡,则等待 wait(); // 线程被挂起, 当前线程持有的car对象锁被释放 }}public synchronized void waitForBuffing() throws InterruptedException { // 等待抛光 while(waxOn == true) { // 若已上蜡,则等待抛光wait(); // 线程被挂起, 当前线程持有的car对象锁被释放}}
}
class WaxOn implements Runnable { // 上蜡线程(本线程先执行第1次上蜡,等待抛光,抛光线程第1次执行抛光后,本线程执行第2次上蜡......)  private Car car; public WaxOn(Car c) {this.car = c; }@Overridepublic void run() {try {while(!Thread.interrupted()) {System.out.println("wax on !");TimeUnit.MILLISECONDS.sleep(200);car.waxed(); // 先上蜡完成 (把waxOn设置为true),唤醒等待上蜡的线程 car.waitForBuffing(); // 再等待抛光,当waxOn为ture,则抛光线程一直等待   }} catch (InterruptedException e ) {System.out.println("exiting via interrupt");}System.out.println("ending wax on task"); }
}
class WaxOff implements Runnable { // 抛光线程(本线程先等待上蜡,上蜡线程第1次执行后,本线程立即执行第1次抛光,接着本线程等待第2次上蜡......) private Car car; public WaxOff(Car c) {this.car = c; }@Overridepublic void run() {try {while(!Thread.interrupted()) {car.waitForWaxing(); // 先等待上蜡 , 当waxOn为false,则上蜡线程一直等待 System.out.println("wax off !"); // TimeUnit.MILLISECONDS.sleep(200);car.buffed(); // 抛光完成后,把waxOn设置为false,唤醒等待抛光的线程   }} catch (InterruptedException e ) {System.out.println("exiting via interrupt");}System.out.println("ending wax off task"); }
}
/*** wax on !
wax off !
wax on !
wax off !
...... */

补充:前面的实例强调必须用一个检查感兴趣的条件的while循环包围wait方法。这很重要,因为:(为啥要用while包裹wait呢)

前面的示例强调必须用一个检查感兴趣的条件的while循环包围wait()。这很重要,原因如下:
原因1:可能有多个任务出于相同的原因在等待同一个锁,而第一个唤醒任务可能会改变这种状况;如果属于这种情况,那么任务应该被再次挂起,直到其感兴趣的条件发生变化;
原因2:在本任务从其 wait()中被唤醒的时刻,有可能会有某个其他任务已经做出了改变,从而使得本任务在此时不能执行,或者执行其操作已显得无关紧要;此时,应该再次执行wait()将其重新挂起;
(个人理解——比如有2个任务A,B都在等待资源R可用而阻塞,当R可用时,任务A和B均被唤醒,但任务A被唤醒后立即拿到了临界资源或获取了锁,则任务B仍然需要再次阻塞,这就是while的作用)
原因3:有可能某些任务出于不同的原因在等待你的对象上的锁(必须使用notifyAll唤醒);在这种情况下,需要检查是否已经由正确的原因唤醒,如果不是,则再次调用wait方法;

用while 包围wait方法的本质:检查所有感兴趣的条件,并在条件不满足的情况下再次调用wait方法,让任务再次阻塞;

3、错失的信号:当两个线程使用 notify/wait() 或 notifyAll()/ wait() 方法进行协作时,有可能会错过某个信号;即 notify或 notifyAll发出的信号,带有wait的线程无法感知到。

荔枝:

// T1:
synchronized(sharedMonitor) {<setup condition for T2>sharedMonitor.notify() // 唤醒所有等待线程
}
// T2:
while(someCondition) {// point 1 synchronized(sharedMonitor) {sharedMonitor.wait(); // 当前线程阻塞 }
} 

当T2 还没有调用 wait方法时,T1就发送了notify信号; 这个时候T2线程肯定接收不到这个信号;T1发送信号notify后,T2才调用wait方法,这时,T2将永久阻塞下去;因为他错过了T1的notify信号;

T2正确的写法如下:

// T2正确的写法如下:
synchronized(sharedMonitor) {while(someCondition) {sharedMonitor.wait(); // 当前线程阻塞 }
}

如果T1先执行后释放锁;此时T2获取锁且检测到 someCondition已经发生了变化,T2不会调用wait() 方法;

如果T2先执行且调用了wait()方法, 释放了锁; 这时T1后执行,然后调用notify()唤醒阻塞线程, 这时T2可以收到T1的 notify信号,从而被唤醒, 由T1修改了 someCondition的条件, 所以T2 不会进入while循环;

【21.5.2】notify与notifyAll方法
1、notify()方法:在使用 notify方法时,在众多等待同一个锁的任务中只有一个会被唤醒,如果你希望使用notify,就必须保证被唤醒的是恰当的任务。
2、notifyAll将唤醒所有正在等待的任务。这是否意味着在任何地方,任何处于wait状态中的任务都将被任何对notifyAll的调用唤醒呢。事实上,当notifyAll因某个特定锁而被调用时,只有等待这个锁的任务才会被唤醒;

/*** page 707* notify 与 notifyAll的对比, * notify 唤醒单个阻塞线程,而notifyAll唤醒所有阻塞线程*/
public class NotifyVsNotifyAll {public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();// 线程池 for (int i =0; i<5; i++) {exec.execute(new Task()); // 运行5个任务, 只要task 任务一运行就会阻塞,除非被唤醒 }exec.execute(new Task2()); // 运行第6个任务, 只要task2 任务一运行就会阻塞,除非被唤醒 Timer timer = new Timer(); // 定时器 // 定时调度, 延迟400毫秒开始执行,两次运行的时间间隔为500毫秒 timer.scheduleAtFixedRate(new TimerTask() {boolean prod = true; @Overridepublic void run() {if (prod) {System.out.println("\n notify() ");Task.blocker.prod(); // 唤醒单个阻塞线程 prod = false ;} else {System.out.println("\n notifyAll()");Task.blocker.prodAll(); // 唤醒所有阻塞线程 prod = true ;}}}, 400, 500);TimeUnit.SECONDS.sleep(5);timer.cancel(); // 关闭定时器,关闭所有线程,正在运行的任务除外  System.out.println("timer canceled");TimeUnit.MILLISECONDS.sleep(500); // 睡眠500毫秒 System.out.println("task2.blocker.prodAll()");Task2.blocker.prodAll(); // task2 唤醒所有阻塞线程 TimeUnit.MILLISECONDS.sleep(500); // 睡眠500毫秒System.out.println("\n shutting down");exec.shutdownNow(); // 关闭线程池 }
}
// 阻塞器
class Blocker {synchronized void waitingCall() {try {while(!Thread.interrupted()) {wait(); // 期初所有线程均阻塞,等待 notify 或 notifyAll 来唤醒 System.out.println(Thread.currentThread() + " ");}} catch (InterruptedException e ) {}}synchronized void prod() {notify();// 唤醒单个阻塞线程 }synchronized void prodAll() {notifyAll(); // 唤醒所有阻塞线程 }
}
// 任务
class Task implements Runnable {static Blocker blocker = new Blocker();// 阻塞器 @Overridepublic void run() {blocker.waitingCall();// wait() 方法阻塞 }
}
// 任务2
class Task2 implements Runnable {static Blocker blocker = new Blocker(); // 阻塞器 @Overridepublic void run() {blocker.waitingCall(); // wait() 方法阻塞  }
}
/*notify()
Thread[pool-1-thread-1,5,main] notifyAll()
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main] notify()
Thread[pool-1-thread-1,5,main] notifyAll()
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-5,5,main] notify()
Thread[pool-1-thread-1,5,main] notifyAll()
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main] notify()
Thread[pool-1-thread-1,5,main] notifyAll()
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-2,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-5,5,main] notify()
Thread[pool-1-thread-1,5,main] notifyAll()
Thread[pool-1-thread-1,5,main]
Thread[pool-1-thread-5,5,main]
Thread[pool-1-thread-4,5,main]
Thread[pool-1-thread-3,5,main]
Thread[pool-1-thread-2,5,main]
timer canceled
task2.blocker.prodAll()
Thread[pool-1-thread-6,5,main] shutting down */

补充:

// 阻塞器
class Blocker {synchronized void waitingCall() {try {while(!Thread.interrupted()) {wait(); // 期初所有线程均阻塞,等待 notify 或 notifyAll 来唤醒 System.out.println(Thread.currentThread() + " ");}} catch (InterruptedException e ) {}}synchronized void prod() {notify();// 唤醒单个阻塞线程 }synchronized void prodAll() {notifyAll(); // 唤醒所有阻塞线程 }
}

Blocker.waitingCall 方法中的while循环, 有两种方式可以离开这个循环:
方式1:发生异常而离开;
方式2:通过检查 interrupted标志离开;

【21.5.3】生产者与消费者
1、对于一个饭店,有一个厨师和服务员。服务员必须等待厨师准备好膳食。当厨师准备好时,他会通知服务员,之后服务员上菜,然后返回继续等待下一次上菜。
这是一个任务协作的荔枝,厨师代表生产者,服务员代表消费者。两个任务必须在膳食被生产和消费时进行握手,而系统必须以有序的方式关闭。

/*** page 709* 生产者(厨师chef)-与消费者(服务员WaitPerson)  */
public class Restaurant { // 餐馆 Meal meal ; ExecutorService exec = Executors.newCachedThreadPool(); // 线程池 WaitPerson waitPerson = new WaitPerson(this); // 服务员 Chef chef = new Chef(this); // 厨师 // 构造器中通过线程池 运行厨师和服务员的任务 public Restaurant() {exec.execute(chef);exec.execute(waitPerson);}public static void main(String[] args) {new Restaurant(); }
}
class Meal { // 膳食 private final int orderNum;  // 订单号 public Meal(int orderNum) {this.orderNum = orderNum; }@Override public String toString() {return "meal " + orderNum; }
}
class WaitPerson implements Runnable { // 服务员(消费者)private Restaurant restaurant; // 餐馆 public WaitPerson(Restaurant restaurant) {this.restaurant = restaurant;} @Overridepublic void run() {try {while(!Thread.interrupted()) {synchronized (this) {while(restaurant.meal == null) { // 没有菜可以上,服务员等待 wait(); // 阻塞,直到 notify 或 notifyAll 唤醒  }}System.out.println("服务器取到餐=WaitPerson got " + restaurant.meal);synchronized (restaurant.chef) { // 厨师 restaurant.meal = null; restaurant.chef.notifyAll(); // 唤醒所有阻塞在 chef对象上的线程 }}} catch (InterruptedException e) {System.out.println("WaitPerson interrupted(服务员线程中断)");}}
}
class Chef implements Runnable {// 厨师(生产者) private Restaurant restaurant; private int count = 0;public Chef(Restaurant r) {restaurant = r; }@Overridepublic void run() {try {while(!Thread.interrupted()) {synchronized (this) {while(restaurant.meal != null) { // 菜没有被端走,厨师等待  wait(); // 阻塞,直到 notify 或 notifyAll 唤醒}}if (++count == 10) { // 厨师只做10个菜 System.out.println("out of food, closing。厨师只做10个菜,关闭线程池");restaurant.exec.shutdownNow();  // 关闭餐馆的线程池,该池运行着厨师和服务员任务 }System.out.println("厨师说,上菜了,order up!");synchronized (restaurant.waitPerson) {restaurant.meal = new Meal(count); // 厨师生产一个菜 restaurant.waitPerson.notifyAll(); // 唤醒服务员端菜 }TimeUnit.MILLISECONDS.sleep(100);}} catch (InterruptedException e) {System.out.println("chef interrupted");}}
}
/*
厨师说,上菜了,order up!
服务器取到餐=WaitPerson got meal 1
厨师说,上菜了,order up!
服务器取到餐=WaitPerson got meal 2
厨师说,上菜了,order up!
服务器取到餐=WaitPerson got meal 3
厨师说,上菜了,order up!
服务器取到餐=WaitPerson got meal 4
厨师说,上菜了,order up!
服务器取到餐=WaitPerson got meal 5
厨师说,上菜了,order up!
服务器取到餐=WaitPerson got meal 6
厨师说,上菜了,order up!
服务器取到餐=WaitPerson got meal 7
厨师说,上菜了,order up!
服务器取到餐=WaitPerson got meal 8
厨师说,上菜了,order up!
服务器取到餐=WaitPerson got meal 9
out of food, closing。厨师只做10个菜,关闭线程池
厨师说,上菜了,order up!
WaitPerson interrupted(服务员线程中断)
chef interrupted
*/

2、代码解说, Restraurant是 WaitPerson和Chef的焦点,作为连接两者的桥梁。他们知道在为哪个Restraurant工作,因为他们必须和这家饭店打交道,以便放置或拿取膳食。
2.1、(干货)再次提问:如果在等待一个订单,一旦你被唤醒,这个订单就必定是可以获得的吗?
答案不是的。因为在并发应用中,某个其他的任务可能会在WaitPerson被唤醒时,会突然插足并拿走订单,唯一安全的方式是使用下面这种惯用的wait() 方法,来保证在退出等待循环前,条件将得到满足。如果条件不满足,还可以确保你可以重返等待状态。
while(conditionIsNotMet) {
    wait();
}
2.2、shutdownNow()将向所有由  ExecutorService启动的任务发送 interrupt信号。但是在Chef中,任务并没有在获得该interrupt信号后立即关闭,因为当任务试图进入一个可中断阻塞操作时, 这个中断只能抛出 InterruptException。然后当 Chef 试图调用sleep()时,抛出了 InterruptedException。如果移除对sleep()的调用,那么这个任务将回到run()循环的顶部,并由于Thread.interrupted() 测试而退出,同时并不抛出异常。

3、使用显式的Lock和 Condition 对象  
使用互斥并允许任务挂起的基本类是 Condition,调用Condition的await() 可以挂起一个任务;调用signal() 可以唤醒一个任务;调用signalAll() 可以唤醒所有在这个Condition上被其自身挂起的任务。
(干货——与notifyAll()相比,signalAll()方法是更安全的方式)

/*** page 711 * 使用显式的Lock 和 Condition对象 */
public class WaxOMatic2 {public static void main(String[] args) throws InterruptedException {Car2 car = new Car2(); ExecutorService executorService = Executors.newCachedThreadPool();executorService.execute(new WaxOff2(car)); // 抛光executorService.execute(new WaxOn2(car)); // 打蜡 TimeUnit.SECONDS.sleep(1); // 睡眠5秒 executorService.shutdownNow(); }
}
class Car2 {private Lock lock = new ReentrantLock(); // 可重入锁 private Condition condition = lock.newCondition(); // 获取锁的条件 private boolean waxOn = false; // 期初时,没有上蜡public void waxed() { // 上蜡 lock.lock(); // 加锁 try { waxOn = true;  // 上蜡完成 condition.signalAll(); // 唤醒所有等待线程 } finally {lock.unlock(); // 解锁 }}public void buffed() { // 抛光 lock.lock(); // 加锁 try {waxOn = false; // 抛光完成,待上蜡 condition.signalAll();} finally {lock.unlock(); // 解锁 }}public void waitForWaxing() throws InterruptedException { // 等待上蜡 lock.lock();try {while(waxOn == false) { // 还未上蜡,等待上蜡condition.await(); // 挂起 }} finally {lock.unlock();}}public void waitForBuffing() throws InterruptedException { // 等待抛光 lock.lock();try {while(waxOn == true) { // 上蜡完成,等待抛光 condition.await(); // 挂起 }} finally {lock.unlock(); }}
}
class WaxOn2 implements Runnable { // 打蜡任务 private Car2 car ; public WaxOn2(Car2 c) {this.car = c; }@Overridepublic void run() {try {while(!Thread.interrupted()) {System.out.println("wax on ");TimeUnit.MILLISECONDS.sleep(200);car.waxed(); // 打蜡完成 car.waitForBuffing(); // 等待抛光 }} catch(InterruptedException e ) {System.out.println("WaxOn2 exiting via interrupt");}System.out.println("WaxOn2 ending wax on task");}
}
class WaxOff2 implements Runnable  {// 打蜡结束,开始抛光任务 private Car2 car; public WaxOff2(Car2 c) {this.car = c; }@Overridepublic void run() {try {while(!Thread.interrupted()) {car.waitForWaxing();  // 等待打蜡 System.out.println("wax off");TimeUnit.MILLISECONDS.sleep(200);car.buffed(); // 抛光完成 }} catch(InterruptedException e ) {System.out.println("WaxOff2 exiting via interrupt");}System.out.println("WaxOff2 ending wax off task");}
}
/*
wax on
wax off
wax on
wax off
wax on
WaxOff2 exiting via interrupt
WaxOff2 ending wax off task
WaxOn2 exiting via interrupt
WaxOn2 ending wax on task
*/

代码解说:每个对lock()的调用都必须紧跟一个try-finally子句,用来保证在所有情况下都可以释放锁。在使用内建版本时,任务在可以调用 await(), signal(), signalAll() 方法前,必须拥有这个锁。
(干货——不推荐使用Lock和Condition对象来控制并发)使用Lock和Condition对象来控制并发比较复杂,只有在更加困难的多线程问题中才使用他们;

【21.5.4】生产者与消费者队列
1、wait()和notifyAll() 是一种低级的方式来解决任务协作问题;也可以使用同步队列这种高级方式来解决,同步队列在任何时刻都只允许一个任务插入或移除元素。
2、同步队列 BlockingQueue,两个实现,LinkedBlockingQueue,无界队列, ArrayBlockingQueue-固定尺寸,放置有限数量的元素;
3、若消费者任务试图从队列中获取元素,而该队列为空时,队列可以挂起消费者任务让其阻塞;并且当有更多元素可用时,队列可以唤醒消费者任务。
阻塞队列可以解决非常多的问题,且比 wait()与notifyAll()简单得多。
【看个荔枝】

/*** 阻塞队列 * page 714 */
public class TestBlockingQueues {static void getKey() {try {// 从控制台读入用户输入new BufferedReader(new InputStreamReader(System.in)).readLine(); } catch (IOException e) {throw new RuntimeException(e); }}static void getKey(String msg) {System.out.println(msg);getKey(); }static void test(String msg, BlockingQueue<LiftOff> queue) {System.out.println(msg);LiftOffRunner runner = new LiftOffRunner(queue);Thread t = new Thread(runner);t.start();for (int i=0; i<3; i++) {runner.add(new LiftOff(3)); // 添加5个发射任务到阻塞队列 } getKey("press enter " + msg);t.interrupt(); // 线程中断 System.out.println("finished " + msg + " test");}public static void main(String[] args) {test("LinkedBlockingQueue", new LinkedBlockingQueue<LiftOff>()); // 链表阻塞队列,无界 test("ArrayBlockingQueue", new ArrayBlockingQueue<LiftOff>(3)); // 数组阻塞队列,固定长度 test("SynchronousQueue", new SynchronousQueue<LiftOff>()); // 同步队列  }
}
// lift off 发射,起飞
class LiftOffRunner implements Runnable {private BlockingQueue<LiftOff> rockets; // 阻塞队列,火箭队列public LiftOffRunner(BlockingQueue<LiftOff> queue) {this.rockets = queue; }public void add(LiftOff lo) { // LiftOff 发射起飞任务 try {rockets.put(lo); // 往队列里面放入 发射起飞任务  } catch (InterruptedException e) {System.out.println("interupted during put()");}}@Overridepublic void run() {try {while(!Thread.interrupted()) {LiftOff rocket = rockets.take(); // 从队列中取出任务,运行,没有任务,则阻塞 rocket.run(); }} catch (InterruptedException e) {System.out.println("waking from task()");}System.out.println("exiting LiftOffRunner"); }
}
/*
LinkedBlockingQueue
press enter LinkedBlockingQueue
#0(2),
#0(1),
#0(liftoff),
#1(2),
#1(1),
#1(liftoff),
#2(2),
#2(1),
#2(liftoff), finished LinkedBlockingQueue test
waking from task()
exiting LiftOffRunner
ArrayBlockingQueue
press enter ArrayBlockingQueue
#3(2),
#3(1),
#3(liftoff),
#4(2),
#4(1),
#4(liftoff),
#5(2),
#5(1),
#5(liftoff), finished ArrayBlockingQueue test
waking from task()
exiting LiftOffRunner
SynchronousQueue
#6(2),
#6(1),
#6(liftoff),
#7(2),
#7(1),
#7(liftoff),
#8(2),
press enter SynchronousQueue
#8(1),
#8(liftoff), finished SynchronousQueue test
waking from task()
exiting LiftOffRunner*/

【吐司BlockingQueue】
1、一台机器有3个任务: 一个制作吐司,一个给吐司抹黄油,另一个在抹过黄油的吐司上涂果酱;

/*** 吐司制作程序-* 一台机器有3个任务:第1制作吐司, 第2抹黄油,第3涂果酱,阻塞队列-LinkedBlockingQueue  * page 715 */
public class ToastOMatic {public static void main(String[] args) throws Exception {ToastQueue dryQueue = new ToastQueue(); // 烘干的吐司队列 ToastQueue butterQueue = new ToastQueue(); // 涂黄油的吐司队列 ToastQueue finishQueue = new ToastQueue(); // 制作完成的吐司队列 /* 线程池 */ExecutorService exec = Executors.newCachedThreadPool();exec.execute(new Toaster(dryQueue)); // 吐司exec.execute(new Butterer(dryQueue, butterQueue)); // 黄油 exec.execute(new Jammer(butterQueue, finishQueue)); // 果酱 exec.execute(new Eater(finishQueue)); // 吃 TimeUnit.SECONDS.sleep(1);exec.shutdownNow(); // 发出中断信号, 线程报  InterruptedException 中断异常 , 所有线程均结束exec.shutdown(); }
}
class Toast { // 吐司类  public enum Status{DRY, BUTTERED, JAMMED}; // 枚举类 dry-烘干, butter-黄油,jam-果酱 private Status status = Status.DRY; private final int id ;public Toast(int id) { // 编号 this.id = id; }public void butter() { // 抹黄油结束 status = Status.BUTTERED;}public void jam() { // 涂果酱结束 status = Status.JAMMED;}public Status getStatus() {return status; }public int getId() {return id; }public String toString() {return "toast " + id + " : " + status; }
}
class ToastQueue extends LinkedBlockingQueue<Toast> {} // 吐司队列  class Toaster implements Runnable { // 第1个工序: 做吐司 private ToastQueue toastQueue; // 吐司队列 private int count = 0; // 计数器  private Random rand = new Random(47);public Toaster(ToastQueue toastQueue) {this.toastQueue = toastQueue; } @Override public void run() {try {while(!Thread.interrupted()) { // 只要任务不中断  TimeUnit.MILLISECONDS.sleep(100+ rand.nextInt(500));Toast t = new Toast(count++) ; System.out.println(t);toastQueue.put(t); // 往队列添加吐司 }} catch (InterruptedException e) {System.out.println("toaster interrupted");}System.out.println("toaster off");  // 吐司制作完成  }
}
class Butterer implements Runnable { // 第2个工序:黄油 private ToastQueue dryQueue, butterQueue;public Butterer(ToastQueue dryQueue, ToastQueue butterQueue) { // 已烘干吐司队列, 已抹黄油的吐司队列 this.dryQueue = dryQueue;this.butterQueue = butterQueue; }@Overridepublic void run() {try {while(!Thread.interrupted()) {Toast t = dryQueue.take(); // 获取烘干的吐司 t.butter(); // 抹黄油 System.out.println(t);butterQueue.put(t); // 往黄油队列添加 }} catch (InterruptedException e) {System.out.println("butterer interrupted ");}System.out.println("butterer off");}
}
class Jammer implements Runnable { // 第3个工序,涂果酱 private ToastQueue butterQueue, finishQueue; public Jammer(ToastQueue butterQueue, ToastQueue finishQueue) {this.butterQueue = butterQueue;this.finishQueue = finishQueue;}@Overridepublic void run() {try {while(!Thread.interrupted()) {Toast t = butterQueue.take(); // 从抹黄油队列中获取吐司  t.jam(); // 涂果酱 System.out.println(t);finishQueue.put(t); // 添加到完成队列 }} catch (InterruptedException e ) {System.out.println("jammer interrupted");}System.out.println("jammer off"); // 涂果酱完成  }
}
class Eater implements Runnable { // 消费者,吃吐司 private ToastQueue finishQueue; private int counter = 0;public Eater(ToastQueue finishQueue) {this.finishQueue = finishQueue;}@Overridepublic void run() {try {while(!Thread.interrupted()) {Toast t = finishQueue.take(); // 从吐司制作完成队列中获取吐司 if (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) {System.out.println(">>>> Error: " + t);System.exit(1);} else {System.out.println("chomp !" + t); // chomp-大声咀嚼,吃吐司 }}} catch (InterruptedException e) {System.out.println("eater interrupted");}System.out.println("eat off"); // 吃饱回家 }
}
/*toast 0 : DRY
toast 0 : BUTTERED
toast 0 : JAMMED
chomp !toast 0 : JAMMED
toast 1 : DRY
toast 1 : BUTTERED
toast 1 : JAMMED
chomp !toast 1 : JAMMED
toast 2 : DRY
toast 2 : BUTTERED
toast 2 : JAMMED
chomp !toast 2 : JAMMED
eater interrupted
eat off
toaster interrupted
toaster off
butterer interrupted
butterer off
jammer interrupted
jammer off* */

【21.5.5】任务间使用管道进行输入输出
1、通过输入输出在线程间通信很常用。提供线程功能的类库以管道的形式对线程间的输入输出提供了支持,分别是PipedWriter和PipedReader类,分别允许任务向管道写和允许不同任务从同一个管道读取。
这种模式可以看做是 生产者-消费者问题的变体,管道就是一个封装好了的解决方案。管道可以看做是一个阻塞队列,存在于多个引入 BlockingQueue之间的java版本中。

/*** 任务间使用管道进行输入输出 * page 718 */
public class PipedIO {public static void main(String[] args) throws Exception {Sender sender = new Sender(); // 发送器Receiver receiver = new Receiver(sender); // 接收器 ExecutorService exec = Executors.newCachedThreadPool(); // 线程池  exec.execute(sender); // 发送 exec.execute(receiver); // 接收 TimeUnit.SECONDS.sleep(3);exec.shutdown(); // 关闭线程池 }
}
//发送者任务
class Sender implements Runnable { private Random rand = new Random(47); // 随机数 private PipedWriter out = new PipedWriter(); // 管道输出对象 public PipedWriter getPipedWriter() {return out; }@Overridepublic void run() {try {while(true) {for (char c = 'A'; c <= 'z'; c++) {out.write(c); // 把字符输出到管道 TimeUnit.MILLISECONDS.sleep(rand.nextInt(500));}}} catch (IOException e) {System.out.println("\n" + e + " sender write exception ");} catch (InterruptedException e2) {System.out.println("\n" + e2 + " sender sleep interrupted. ");}}
}
// 接收者任务
class Receiver implements Runnable {private PipedReader in ; public Receiver(Sender sender) throws IOException {in = new PipedReader(sender.getPipedWriter()); // 管道输入对象 }@Overridepublic void run() {try {while(true ) {System.out.print("read:" + (char)in.read() + ", ");// 从管道读取数据 }} catch (IOException e ) {System.out.println("\n" + e + " receiver read exception.");}}
}

代码解说1: 当Receiver调用read() 方法时,如果没有更多的数据,管道将自动阻塞;
补充1:注意sender和receiver是在main()中启动的,即对象构造彻底完成以后。如果你启动一个没有构造完成的对象,在不同的平台上管道可能会产生不一致的行为。(BlockingQueue使用起来更加健壮且容易)(干货)
补充2:在shudownNow() 被调用时,PipedReader与普通IO之间的区别是:PipiedReader是可以中断的。 如果将 in.read() 修改为System.in.read(), 那么interrupt调用将不能打断read()调用。 (干货)

【21.6】死锁

1、作为哲学家,他们很穷,只能买5根筷子(通俗讲,筷子数量与哲学家数量相同);他们坐在桌子周围,每人之间放一根筷子,当一个哲学家要就餐的时候,这个哲学家必须同时拥有左边和右边的筷子。
如果一个哲学家左边或右表已经有人在使用筷子了,那么这个哲学家就必须等待,直至可以得到必须的筷子。
【代码-Chopstick】

/*** 死锁-筷子* page719  */
public class Chopstick {private boolean taken = false;// 拿起筷子 public synchronized void take() throws InterruptedException {while (taken) {wait();}taken = true; }// 放下筷子  public synchronized void drop() {taken = false; notifyAll(); }
}

【代码-Philosopher】

/*** 哲学家* page 718 */
public class Philosopher implements Runnable {private Chopstick left; // 左筷private Chopstick right;  // 右筷 private int id = 1; // 编号 private int ponderFactor = 0; // 思考因素 private Random rand = new Random(47); // 随机数发生器 private void pause() throws InterruptedException { // 暂停 if (ponderFactor == 0) return ; TimeUnit.MILLISECONDS.sleep(rand.nextInt(ponderFactor * 250)); // 睡眠 }public Philosopher(Chopstick left, Chopstick right, int ident, int ponder) {this.left = left; this.right = right; this.id = ident; this.ponderFactor = ponder; } @Overridepublic void run() {try {while (!Thread.interrupted()) {System.out.println(this + " " + "thinking"); // 思考 pause(); // 暂停 System.out.println(this + " " + "grabbing right"); // 拿走右边筷子 right.take(); System.out.println(this + " " + "grabbing left"); // 拿走左边筷子 left.take();System.out.println(this + " eating"); // 吃饭 pause(); // 暂停 right.drop(); //放下右边筷子left.drop(); //放下左边筷子  }} catch (InterruptedException e) {System.out.println(this + " exiting via interrupt. " ); }}@Overridepublic String toString() {return "Philosopher-哲学家" + id;  }
}

【代码-DeadLockDiningPhilosophers】

/*** 发生死锁的哲学家晚餐* page720 */
public class DeadLockDiningPhilosophers {public static void main(String[] args) throws Exception {int ponder = 0; // 把 ponder-思考时间 调整为0,发生死锁 int size = 5; ExecutorService exec = Executors.newCachedThreadPool(); // 线程池  Chopstick[] sticks = new Chopstick[size]; // 筷子数组 for (int i = 0; i < sticks.length; i++) {sticks[i] = new Chopstick(); // 初始化数组 }for (int i = 0; i < sticks.length; i++) {exec.execute(new Philosopher(sticks[i], sticks[(i+1)%size], i+1, ponder)); //  执行哲学家任务 }System.out.println("press enter to quit");System.in.read(); exec.shutdownNow(); }
}
/** 死锁发生了
Philosopher-哲学家2 thinking
Philosopher-哲学家4 thinking
press enter to quit
Philosopher-哲学家1 thinking
Philosopher-哲学家3 thinking
Philosopher-哲学家3 grabbing right
Philosopher-哲学家1 grabbing right
Philosopher-哲学家5 thinking
Philosopher-哲学家4 grabbing right
Philosopher-哲学家4 grabbing left
Philosopher-哲学家2 grabbing right
Philosopher-哲学家5 grabbing right
Philosopher-哲学家1 grabbing left
Philosopher-哲学家3 grabbing left
Philosopher-哲学家5 grabbing left
Philosopher-哲学家2 grabbing left
*/

代码解说:如果philosopher花费更多的时间思考而不是进餐(ponder值越大,思考时间越长),那么他们请求共享资源的可能性就会小许多,这样你就会确信该程序不会死锁,尽管他们并非如此。

2、死锁发生条件: 当以下4个条件同事满足时,死锁发生;(干货——死锁发生的4个条件,同时满足)
条件1:互斥条件。 任务使用的资源至少有一个是不能共享的;这里,一根筷子一次就只能被一个哲学家使用;
条件2:有任务请求被其他任务占用的共享资源。至少有一个任务,它必须持有一个资源且正在等待获取一个当前被别的任务持有的资源;即,要发生死锁,哲学家必须拿着一根筷子,且等待另一根;
条件3:资源不能被任务抢占。任务必须把资源释放当做普通事件;即哲学家不会从其他哲学家那里抢筷子;
条件4:必须有循环等待。一个任务等待其他任务所持有的资源,后者又在等待另一个任务所持有的资源,这样一直下去,直到有一个任务在等待第一个任务所持有的资源,使得大家都被锁住。;
在 DeadLockDiningPhilosophers 程序中,每个哲学家都试图先得到右边的筷子,然后得到左边的筷子,所以发生了循环等待;

3、要防止死锁,只需要破坏其中一个条件即可。最容易的方法是破坏第4个条件。
因为每个哲学家都先拿右边筷子,后拿左边筷子。 如果最后一个哲学家先拿左边筷子,后拿右边筷子,那么这个哲学家将永远不会阻止其右边的哲学家拿起筷子。即破坏了第4个条件。

【21.7】新类库中的构件
【21.7.1】 CountDownLatch
1、作用:被用来同步一个或多个任务,强制他们等待由其他任务执行的一组操作完成;
2、向CountDownLatch 对象设置一个初始值,任何在这个对象上调用wait() 方法都将阻塞,直到这个计数值到达0;调用 countDown()来减小这个计数值;
3、CountDownLatch 只能被初始化一次或触发一次,计数值不能被重置。如果需要重置,使用 CyclicBarrier;
4、典型用法:将一个程序分为n个相互独立的可解决任务,并创建值为0的 CountDownLatch;当每个任务完成时,都会在这个锁上调用 await方法,将自己拦住,直到锁存器计数为0结束;
【代码-CountDownLatchDemo】

/*** count down-倒计时* latch-锁存器  * page723 */
public class CountDownLatchDemo {static final int size = 5;public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();CountDownLatch latch = new CountDownLatch(size); // 倒计时锁存器 for (int i = 0; i < size; i++) {exec.execute(new WaitingTask(latch)); // 运行10个等待任务,直到锁存器计数值为0  }for (int i = 0; i < size; i++) {exec.execute(new TaskPortion(latch)); // 运行任务,使得锁存器计数值递减 }System.out.println("launched all tasks"); // 启动所有任务 exec.shutdown();  // 待所有线程执行完成,则线程池自动关闭 }
}
class TaskPortion implements Runnable { // 任务部分 private static int counter = 0; // 计数器private final int id = counter++;private static Random rand = new Random(47);private final CountDownLatch latch; // 递减锁存器 public TaskPortion(CountDownLatch latch) {this.latch = latch ; }@Overridepublic void run() {try {this.doWork(); // 做任务-睡眠 latch.countDown(); // 减少锁存器计数 } catch (InterruptedException e ) {System.out.println(this + " interrupted"); } }public void doWork() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(rand.nextInt(2000));System.out.println(this + " completed ");}public String toString() { return String.format("%1$-3d", id);}
}
class WaitingTask implements Runnable { // 等待任务 private static int counter = 0;private final int id = counter++;private final CountDownLatch latch; // 递减式锁存器public WaitingTask(CountDownLatch latch) {this.latch = latch;}@Overridepublic void run() {try {latch.await(); // 使得当前线程等待或把自己拦住,直到锁存器latch计数减至0, 或者本线程中断 System.out.println("通过锁存器障碍-latch barrier passed for " + this);} catch (InterruptedException e ) {System.out.println(this + " interrupted");}}@Override public String toString() {return String.format("waiting task %1$-3d", id); }
}
/*launched all tasks
1   completed
2   completed
4   completed
0   completed
3   completed
通过锁存器障碍-latch barrier passed for waiting task 2
通过锁存器障碍-latch barrier passed for waiting task 4
通过锁存器障碍-latch barrier passed for waiting task 3
通过锁存器障碍-latch barrier passed for waiting task 1
通过锁存器障碍-latch barrier passed for waiting task 0
*/

【21.7.2】CyclicBarrier
1、定义:可以将锁存器计数重置;
2、应用场景:创建一组任务,并行地执行工作,然后再进行下一个步骤之前等待,直到所有任务都完成。它使得所有的并行任务都讲在栅栏处列队等待,可以一致向前移动;
3、区别: CountDownLatch 只能使用一次,而 CyclicBarrier 可以循环重复使用;

/*** 循环障碍-赛马比赛 * page724*/
public class HorseRace {static final int FINISH_LINE = 10; private List<Horse> horses = new ArrayList<>(); private ExecutorService exec = Executors.newCachedThreadPool(); // 线程池 private CyclicBarrier barrier; public HorseRace(int number, final int pause) {// 当有number个线程等待时,barrier就会断开。当其断开时,给定的任务就会执行// ,由最后一个进入 barrier的线程提供资源执行。 barrier = new CyclicBarrier(number, new Runnable() { @Override public void run() { // 这个任务会多次执行,因为 当有number个线程等待时会发生多次 StringBuilder builder = new StringBuilder(); for (int i = 0; i < FINISH_LINE; i++) {builder.append("="); // 追加字符串  }System.out.println("builder");for (Horse horse : horses) {System.out.println(horse.tracks()) ; // 调用跟踪方法  } for (Horse horse2 : horses) { // 遍历任务 if (horse2.getStrides() >= FINISH_LINE) { System.out.println(horse2 + " own !"); exec.shutdownNow(); // 这里会终止掉所有线程的执行  return ;}}try {TimeUnit.MILLISECONDS.sleep(pause);} catch (InterruptedException e) {System.out.println("barrier-action sleep interrupted");}} }); for (int i = 0; i < number; i++) {Horse horse = new Horse(barrier);horses.add(horse);exec.execute(horse); // 执行赛马任务  }}public static void main(String[] args) {int number = 5; // 5个马 int pause = 100;// 暂停时间 new HorseRace(number, pause);  }
}
class Horse implements Runnable { // 赛马任务 private static int counter = 0; // 计数器 private final int id = counter++; // 编号 private int strides = 0; // 步数 private static Random rand = new Random(47);private static CyclicBarrier barrier; // 栅栏 public Horse(CyclicBarrier cyclicBarrier) {this.barrier = cyclicBarrier;}public synchronized int getStrides () {return strides; }@Overridepublic void run() {try {while(!Thread.interrupted()) {synchronized(this) {strides += rand.nextInt(3); // 步数自加 } barrier.await(); // 当前线程等待,直到所有线程在该 barrier等待为止 } } catch (InterruptedException e) {throw new RuntimeException(e);} catch (BrokenBarrierException e2) {throw new RuntimeException(e2);} }@Override public String toString() {return "horse " + id + " ";  }public String tracks() { // 跟踪方法 StringBuilder builder = new StringBuilder();for (int i = 0; i < getStrides(); i++) {builder.append("*"); }builder.append(id); return builder.toString(); }
}
/*builder
**0
**1
*2
**3
*4
builder
***0
****1
***2
**3
**4
builder
***0
*****1
***2
****3
**4
builder
*****0
******1
****2
*****3
***4
builder
*******0
*******1
****2
*****3
****4
builder
*******0
*********1
******2
******3
*****4
builder
********0
**********1
******2
******3
******4
horse 1  own !
*/

【21.7.3】 DelayQueue
1、定义:无界的阻塞队列 BlockingQueue,用于放置实现了 Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走;
2、这种队列是有序的,即队头对象的延迟到期的时间最长。如果没有任何延迟到期,那么就不会有任何头元素,并且poll() 方法将返回null;所以,不能将null放置到该队列中;
【代码-DelayQueueDemo】

/*** 阻塞队列演示* page 726 */
public class DelayQueueDemo {public static void main(String[] args) {Random rand = new Random(47);ExecutorService exec = Executors.newCachedThreadPool(); // 线程池 DelayQueue<DelayedTask> queue = new DelayQueue<>(); // 延迟队列 for (int i = 0; i < 5; i++) {queue.put(new DelayedTask(rand.nextInt(5000))); // 往队列添加延迟任务,延迟时间为5000内的随机数,小于5000毫秒  }queue.add(new DelayedTask.EndSentinel(5000, exec)); // 再添加一个延迟任务(该任务,负责关闭线程池),延迟时间为5000毫秒exec.execute(new DelayedTaskConsumer(queue)); // 执行任务 }
}
class DelayedTask implements Runnable, Delayed {private static int counter = 0; // 计数器 private final int id = counter++;private final int delta; // 三角洲 private final long trigger; // 触发器  protected static List<DelayedTask> sequence = new ArrayList<>(); // 列表 public DelayedTask(int delayInMilliseconds) {delta = delayInMilliseconds; // 延迟毫秒数  trigger = System.nanoTime() + TimeUnit.NANOSECONDS.convert(delta, TimeUnit.MILLISECONDS); // 时间单位转换,有精度丢失 sequence.add(this);}@Override public long getDelay(TimeUnit unit) { // 获取延迟时间 return unit.convert(trigger - System.nanoTime(), TimeUnit.NANOSECONDS);}@Overridepublic int compareTo(Delayed o) { // 时间比较 DelayedTask that = (DelayedTask) o; if (trigger < that.trigger) return -1; if (trigger > that.trigger) return  1; return 0;}@Overridepublic void run() {System.out.println(this + " ");}@Overridepublic String toString() {return String.format("[%1$-4d]", delta) + " task " + id ; }public String summary() { return "(" + id + ": " + delta + ")";}public static class EndSentinel extends DelayedTask { // 哨兵 private ExecutorService exec; public EndSentinel(int delay, ExecutorService exec) {super(delay);this.exec = exec; }@Overridepublic void run() { for (DelayedTask task : sequence) { // 获取每个延迟任务 System.out.println("task.summary() " + task.summary());}System.out.println(this  + " calling shutdownNow() 立即关闭线程池-关闭所有线程");exec.shutdownNow();  // 关闭线程池 }}
}
class DelayedTaskConsumer implements Runnable { // 延迟任务消费者  private DelayQueue<DelayedTask> queue; public DelayedTaskConsumer(DelayQueue<DelayedTask> queue) {this.queue = queue; }@Overridepublic void run() {try {while(!Thread.interrupted()) {queue.take().run(); // 获取队列任务并运行 }} catch (InterruptedException e) {System.out.println("DelayedTaskConsumer interrupted");}System.out.println("finish DelayedTaskConsumer");}
}
/*[555 ] task 1
[961 ] task 4
[1693] task 2
[1861] task 3
[4258] task 0
task.summary() (0: 4258)
task.summary() (1: 555)
task.summary() (2: 1693)
task.summary() (3: 1861)
task.summary() (4: 961)
task.summary() (5: 5000)
[5000] task 5 calling shutdownNow() 立即关闭线程池-关闭所有线程
finish DelayedTaskConsumer
*/

代码解说: 上述控制台输出信息为:

[555 ] task 1
[961 ] task 4
[1693] task 2
[1861] task 3
[4258] task 0
task.summary() (0: 4258)
task.summary() (1: 555)
task.summary() (2: 1693)
task.summary() (3: 1861)
task.summary() (4: 961)
task.summary() (5: 5000)
小结1:其中 555 是最小的延迟时间,即 DelayedTaskConsumer 将最紧急的任务从队列中取出,然后运行它;
小结2:在 DelayedQueue中, 任务创建顺序与执行没有关系,任务是按照所期望的延迟顺序来执行的;
如上, task1 最先执行,但其是第2个创建的任务task.summary 是从 sequence取值的,sequence记录了创建顺序;

【21.7.4】 PriorityBlockingQueue 优先级阻塞队列
1、定义:其执行顺序是按照优先级顺序来执行的;
【代码-PriorityBlockQueueDemo】

/*** 优先级阻塞队列演示* page728 */
public class PriorityBlockQueueDemo {public static void main(String[] args) {ExecutorService exec = Executors.newCachedThreadPool();  // 线程吃 PriorityBlockingQueue<Runnable> queue = new PriorityBlockingQueue<>(); // 优先级阻塞队列 exec.execute(new PrioritizedTaskProducer(queue, exec)); // 任务生产者 -创建41个任务exec.execute(new PrioritizedTaskConsumer(queue));}
}
// 优先级任务
class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {private Random rand = new Random(47);private static int counter = 0;private final int id = counter++; // 计数器 private  int priority = 0; // 优先级  protected static List<PrioritizedTask> sequence = new ArrayList<>(); public PrioritizedTask(int priority) {this.priority = priority; sequence.add(this); }@Overridepublic int compareTo(PrioritizedTask o) { // 比较 // 值越大,优先级越高,越先执行    return priority < o.priority ? 1 : (priority > o.priority ? -1 : 0) ;}@Override public void run() { try {TimeUnit.MILLISECONDS.sleep(rand.nextInt(250)); // 做任务就是睡眠 } catch (InterruptedException e) {System.out.println("PrioritizedTask Interrupted"); }System.out.println(this); }@Overridepublic String toString() {return String.format("toString() = 线程优先级[%1$-3d]", priority) + " task-线程编号- " + id; }public String summary() {return "(summary() = 线程编号:" + id + ": 线程优先级:" + priority +")";  }public static class EndSentinel extends PrioritizedTask { // 哨兵任务 private ExecutorService exec; public EndSentinel(ExecutorService exec) {super(-1); // 优先级为-1, 值越小,越后执行 this.exec = exec; }@Overridepublic void run() {int count = 0;for (PrioritizedTask task : sequence) { // 遍历每个任务,打印任务详情 System.out.println(task.summary());if (++count % 5 == 0) {System.out.println(" --------------------我是换行符--------------------  ");}} System.out.println();System.out.println(this + " calling shutdownNow() 关闭线程池 ");this.exec.shutdownNow();  // 关闭所有任务 }}
}
// 任务生产者 -创建16个任务
class PrioritizedTaskProducer implements Runnable {private Random rand = new Random(47);private Queue<Runnable> queue; // 任务队列  (优先级队列 )private ExecutorService exec; // 线程池 public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService exec) {this.queue = queue; this.exec = exec;  }@Overridepublic void run() { // 任务生产者  for (int i = 0; i < 10; i++) { queue.add(new PrioritizedTask(rand.nextInt(10))); // 往队列添加任务,优先级小于10,比10先执行 Thread.yield(); // 当前线程让出cpu 时间片 }try {for (int i = 0; i < 3; i++) {TimeUnit.MILLISECONDS.sleep(250); // 当前线程睡眠 queue.add(new PrioritizedTask(10)); // 再往队列添加3个任务 ,其优先级为10,已添加13个}for (int i = 0; i < 2; i++) {queue.add(new PrioritizedTask(i)); // 再添加2个任务,其优先级为i,已添15个}// 添加任务EndSentinel,该任务会遍历每个任务,打印任务详情,并会关闭线程池queue.add(new PrioritizedTask.EndSentinel(exec));} catch (InterruptedException e) {System.out.println("PrioritizedTaskProducer interrupted");}System.out.println("finish PrioritizedTaskProducer");}
}
// 任务消费者
class PrioritizedTaskConsumer implements Runnable {private PriorityBlockingQueue<Runnable> queue;  // PriorityBlockingQueue-优先级阻塞队列 public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> queue) {this.queue = queue;  }@Override  public void run() {try {while(!Thread.interrupted()) {// 从优先级队列中取出任务并执行, PrioritizedTask优先级任务 的优先级值越大,优先级越高,越先执行 queue.take().run();  }} catch (InterruptedException e) {System.out.println("PrioritizedTaskConsumer interrupted");}System.out.println("finish PrioritizedTaskConsumer");}
}
/*
toString() = 线程优先级[9  ] task-线程编号- 5
toString() = 线程优先级[8  ] task-线程编号- 0
toString() = 线程优先级[8  ] task-线程编号- 6
toString() = 线程优先级[7  ] task-线程编号- 9
toString() = 线程优先级[5  ] task-线程编号- 1
toString() = 线程优先级[3  ] task-线程编号- 2
toString() = 线程优先级[2  ] task-线程编号- 8
toString() = 线程优先级[1  ] task-线程编号- 4
toString() = 线程优先级[1  ] task-线程编号- 3
toString() = 线程优先级[0  ] task-线程编号- 7
toString() = 线程优先级[10 ] task-线程编号- 10
toString() = 线程优先级[10 ] task-线程编号- 11
finish PrioritizedTaskProducer
toString() = 线程优先级[10 ] task-线程编号- 12
toString() = 线程优先级[1  ] task-线程编号- 14
toString() = 线程优先级[0  ] task-线程编号- 13
(summary() = 线程编号:0: 线程优先级:8)
(summary() = 线程编号:1: 线程优先级:5)
(summary() = 线程编号:2: 线程优先级:3)
(summary() = 线程编号:3: 线程优先级:1)
(summary() = 线程编号:4: 线程优先级:1)--------------------我是换行符--------------------
(summary() = 线程编号:5: 线程优先级:9)
(summary() = 线程编号:6: 线程优先级:8)
(summary() = 线程编号:7: 线程优先级:0)
(summary() = 线程编号:8: 线程优先级:2)
(summary() = 线程编号:9: 线程优先级:7)--------------------我是换行符--------------------
(summary() = 线程编号:10: 线程优先级:10)
(summary() = 线程编号:11: 线程优先级:10)
(summary() = 线程编号:12: 线程优先级:10)
(summary() = 线程编号:13: 线程优先级:0)
(summary() = 线程编号:14: 线程优先级:1)--------------------我是换行符--------------------
(summary() = 线程编号:15: 线程优先级:-1)toString() = 线程优先级[-1 ] task-线程编号- 15 calling shutdownNow() 关闭线程池
finish PrioritizedTaskConsumer
*/

代码解说:
toString() = 线程优先级[9  ] task-线程编号- 5
toString() = 线程优先级[8  ] task-线程编号- 0
toString() = 线程优先级[8  ] task-线程编号- 6
toString() = 线程优先级[7  ] task-线程编号- 9
toString() = 线程优先级[5  ] task-线程编号- 1
toString() = 线程优先级[3  ] task-线程编号- 2
toString() = 线程优先级[2  ] task-线程编号- 8
toString() = 线程优先级[1  ] task-线程编号- 4
toString() = 线程优先级[1  ] task-线程编号- 3
toString() = 线程优先级[0  ] task-线程编号- 7
根据输出信息,可以看出,优先级高的线程先执行,其执行顺序与线程创建顺序无关;

【21.7.5】使用 ScheduledExecutor的温室控制器 (干货——ScheduledExecutor计划调度器可用于系统后台的周期性或定时跑批,如每日凌晨跑批,采用cron 表达式)
1、背景:每个期望的温室事件都是一个在预定时间运行的任务。ScheduledThreadPoolExecutor 提供了解决该问题的服务。
2、如何解决:通过使用schedule() 方法运行一次任务或者 scheduleAtFixedRate()(每隔规则的时间重复执行任务),你可以将Runnable对象设置为在将来的某个时刻执行。
【代码-GreenHouseScheduler】

/*** 温室调度器(定时调度,非常重要) * page 730 */
public class GreenHouseScheduler {private volatile boolean light = false; private volatile boolean water = false;private String thermostat = "day"; // 调温器-day-白天 ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(10); // 调度线程池/*** 创建并执行一次操作,该操作在给定的延迟后变为启用状态。*/public void schedcule(Runnable event, long delay) { // 调度方法 scheduler.schedule(event,  delay,  TimeUnit.MILLISECONDS); }public void repeat(Runnable event, long initialDelay, long period) { // 重复执行任务 /*创建并执行一个周期性操作,该操作将在给定的初始延迟后首先启用,然后在给定的周期内启用; 即执行将在initialDelay,initialDelay + period,initialDelay + 2 *期间等之后开始,如果任务的任何执行遇到异常,则后续执行将被抑制,否则,该任务将仅通过执行者的取消或终止而终止 。如果此任务的任何执行花费的时间超过其周期,则后续执行可能会延迟开始,但不会同时执行。*/scheduler.scheduleAtFixedRate(event, initialDelay, period, TimeUnit.MILLISECONDS); } class LightOn implements Runnable { // 开灯任务 @Overridepublic void run() {light = true; System.out.println("turning on lights");}}class LightOff implements Runnable {// 关灯灯任务@Overridepublic void run() {light = false; System.out.println("turning off lights");}}class WaterOn implements Runnable { // 浇水任务 @Overridepublic void run() {water = true; System.out.println("turning on water");}}class WaterOff implements Runnable { // 停止浇水任务 @Overridepublic void run() {water = false; System.out.println("turning off water");}}class ThermostatNight implements Runnable { // 把温度调整为夜晚模式 @Overridepublic void run() {System.out.println("Thermostat to night setting ");setThermostat("night");}}class ThermostatDay implements Runnable { // 把温度调整为夜晚白昼模式 @Overridepublic void run() {System.out.println("Thermostat to day setting ");setThermostat("day");}}class Bell implements Runnable { // 钟声响起任务 @Overridepublic void run() {System.out.println("bing"); }}class Terminate implements Runnable { // 终止任务,关闭线程池 @Overridepublic void run() {System.out.println("terminating");scheduler.shutdownNow(); // 关闭线程池 // 声明 同步列表 data // List<DataPoint> data = Collections.synchronizedList(new ArrayList<DataPoint>()); new Thread() {public void run() {for (DataPoint d : data) { // 遍历并打印数据点 System.out.println(d);}}}.start(); }}static class DataPoint { // 数据点 Calendar time;  // 日期float temperature; // 温度float humidity; // 湿度 public DataPoint(Calendar time, float temperature, float humidity) {this.time = time;this.temperature = temperature;this.humidity = humidity;}@Overridepublic String toString() {return time.getTime() +  String.format(", temperature: %1$.1f humidity: %2$.2f", temperature, humidity); }}private Calendar lastTime = Calendar.getInstance();{lastTime.set(Calendar.MINUTE, 30);lastTime.set(Calendar.SECOND, 30); }private float lastTemp = 65.0f; // 最新温度private int tempDirection = 1; // 温度方向, 正private float lastHumidity = 50.0f;  // 最新湿度 private int humidityDirection = 1;  // 湿度方向,正 private Random rand = new Random(47);/*Collections.synchronizedList - 返回由指定列表支持的同步(线程安全)列表。 为了保证串行访问,至关重要的是,对后备列表的所有访问都必须通过返回的列表来完成。*/List<DataPoint> data = Collections.synchronizedList(new ArrayList<DataPoint>());// 收集数据任务  class CollectData implements Runnable {@Override public void run() {System.out.println("collecting data");synchronized (GreenHouseScheduler.this) {lastTime.set(Calendar.MINUTE, lastTime.get(Calendar.MINUTE)+ 30); // 设置最新时间 if (rand.nextInt(5) == 4) {tempDirection *= -1; // 改变温度方向, 正数加, 负数减 }lastTemp += tempDirection * (1.0f + rand.nextFloat()); // 最新温度 if (rand.nextInt(5) == 4) {humidityDirection *= -1;  // 改变湿度方向, 正数加, 负数减}lastHumidity += humidityDirection * rand.nextFloat(); // 计算最新湿度data.add(new DataPoint((Calendar)lastTime.clone(), lastTemp, lastHumidity)); // 添加数据信息  }}}// main public static void main(String[] args) {GreenHouseScheduler scheduler = new GreenHouseScheduler(); // 温室调度器 scheduler.schedcule(scheduler.new Terminate(), 2000);// 3000毫秒后执行Terminate任务,关闭线程池  scheduler.repeat(scheduler.new Bell(), 0, 1000); // 0毫秒,0+1000毫秒,0+1000+1000毫秒.... 后执行Bell任务  scheduler.repeat(scheduler.new ThermostatNight(), 0, 1000); // 0毫秒,0+1000毫秒,0+1000+1000毫秒.... 后执行ThermostatNight任务scheduler.repeat(scheduler.new LightOn(), 0, 200); // 0毫秒,0+200毫秒,0+200+200.... 后执行LightOn任务scheduler.repeat(scheduler.new LightOff(), 0, 200); // 0毫秒,0+2毫秒,0+200+200毫秒.... 后执行LightOff任务scheduler.repeat(scheduler.new WaterOn(), 0, 200); // 0毫秒,0+200毫秒,0+200+200毫秒.... 后执行WaterOn任务scheduler.repeat(scheduler.new WaterOff(), 0, 200); // 0毫秒,0+200毫秒,0+200+200毫秒.... 后执行WaterOff任务scheduler.repeat(scheduler.new ThermostatDay(), 0, 200); // 0毫秒,0+200毫秒,0+200+200毫秒.... 后执行ThermostatDay 任务scheduler.repeat(scheduler.new CollectData(), 500, 500); // 0毫秒,0+500毫秒,0+500+500毫秒.... 后执行CollectData任务}public String getThermostat() { return thermostat;}public void setThermostat(String thermostat) {this.thermostat = thermostat;}
}
/*
bing
Thermostat to night setting
turning on lights
turning off lights
turning on water
turning off water
Thermostat to day setting
turning on lights
turning off lights
turning on water
Thermostat to day setting
turning off water
turning on lights
turning on water
turning off lights
Thermostat to day setting
turning off water
collecting data
turning on lights
turning off lights
turning off water
turning on water
Thermostat to day setting
turning on lights
turning off lights
turning on water
Thermostat to day setting
turning off water
bing
Thermostat to night setting
turning on lights
turning off lights
turning off water
collecting data
turning on water
Thermostat to day setting
turning on lights
Thermostat to day setting
turning off water
turning on water
turning off lights
turning on lights
turning off water
Thermostat to day setting
turning off lights
turning on water
collecting data
turning on lights
turning off lights
turning on water
turning off water
Thermostat to day setting
turning on lights
turning on water
turning off lights
turning off water
Thermostat to day setting
bing
turning on lights
Thermostat to day setting
Thermostat to night setting
terminating
collecting data
turning off water
turning on water
turning off lights
Sun Jul 05 17:00:30 CST 2020, temperature: 66.4 humidity: 50.05
Sun Jul 05 17:30:30 CST 2020, temperature: 68.0 humidity: 50.47
Sun Jul 05 18:00:30 CST 2020, temperature: 69.7 humidity: 51.42
Sun Jul 05 18:30:30 CST 2020, temperature: 70.8 humidity: 50.87
*/
public class ScheduledThreadPoolExecutorextends ThreadPoolExecutorimplements ScheduledExecutorService

代码解说:volatile和 synchronized 都得到了应用,以防止任务之间的相互干涉。在持有 DataPoint的List中的所有方法都是 synchronized,这是因为在List被创建时,使用了 Collections工具 synchronizedList();

【21.7.6】Semaphore
1、正常的锁(current.locks或 synchronized锁)在任何时刻都只允许一个任务访问一项资源,而计数信号量允许n个任务同时访问这个资源;你还可以将信号量看做是在向外分发使用资源的许可证,尽管实际上没有使用任何许可证对象。
2、看个荔枝:线程池。 管理着数量有限的对象,当要使用对象时可以签出他们,而在用户使用完成时,可以将它们签回;
【代码-Pool】

/*** Pool对象池-通过信号量Semaphore来管理 Semaphore-信号量(他管理着数量有限的对象,当要使用对象时可以签出他们,* 在用户使用完成时,将他们签回) page 733*/
public class Pool<T> {private int size;private List<T> items = new ArrayList<>();private volatile boolean[] checkOut;private Semaphore available; // 计数信号量public Pool(Class<T> classObj, int size) {this.size = size;checkOut = new boolean[size];available = new Semaphore(size, true);for (int i = 0; i < size; i++) {try {items.add(classObj.newInstance());} catch (Exception e) {throw new RuntimeException(e);}}}// 如果没有任何信号量许可证可用, available将阻塞调用。public T checkOut() throws InterruptedException {// 从此信号量获取许可,直到一个可用或线程中断为止,将一直阻塞。available.acquire(); // 获取return getItem();}// 如果被签入的对象有效,则向信号量返回一个许可证public void checkIn(T x) { // 嵌入对象if (releaseItem(x)) { // 释放对象成功// 释放许可证,将其返回到信号灯。available.release(); //}}private synchronized T getItem() { // 签出对象for (int i = 0; i < size; i++) {if (!checkOut[i]) { // 签出状态为 false,则返回该对象checkOut[i] = true;return items.get(i);}}return null;}private synchronized boolean releaseItem(T item) { // 释放对象int index = items.indexOf(item);if (index == -1)return false;if (checkOut[index]) {checkOut[index] = false;return true;}return false;}
}
/*** 创建代价高昂的对象类型,构造器运行起来很耗时* page 734 */
public class Fat {private volatile double d; private static int counter = 0;private final int id = counter++;public Fat() {for (int i = 0; i < 10000; i++) {d += (Math.PI + Math.E) / (double) i ;}}public void operation() {System.out.println(this);}@Override public String toString() {return "fat id: " + id; }
}
/*** 信号量演示(非常重要的荔枝) * page 734 * (*    一旦池中的所有对象被签出,semaphore 将不允许执行任何签出操作; *    blocked的run()方法因此会被阻塞, 2秒钟后,cancel()方法会被调用, 以此来挣脱Future的束缚* )*/
public class SemaphoreDemo {final static int size = 5; public static void main(String[] args) throws InterruptedException {final Pool<Fat> pool = new Pool<>(Fat.class, size); // 对象池,通过信号量来管理 ExecutorService exec = Executors.newCachedThreadPool(); // 线程池 for (int i = 0; i < size; i++) {exec.execute(new CheckOutTask<Fat>(pool)); // 运行签出任务 }System.out.println("all checkout tasks created");List<Fat> list = new ArrayList<>();for (int i = 0; i < size; i++) {Fat f = pool.checkOut(); // 签出对象 System.out.println(i + " : main() thread check out");f.operation(); // "fat id: " + id list.add(f);}Future<?> blocked = exec.submit(new Runnable() {@Overridepublic void run() {try {pool.checkOut();// 开启单个线程,签出对象 } catch (InterruptedException e) {System.out.println("checkout() interrupted. "); }}});TimeUnit.SECONDS.sleep(2); //睡眠2秒 blocked.cancel(true); // 尝试取消执行此任务 for (Fat f : list) {pool.checkIn(f);}for (Fat f : list) { // 冗余的签入将被pool 忽略 pool.checkIn(f);} exec.shutdown(); // 关闭线程池 }
}
// 创建一个任务,先签出Fat对象,持有一段时间后,再签入,以此来测试Pool这个类
class CheckOutTask<T> implements Runnable { // 签出任务 private static int counter = 0;private final int id = counter++;private Pool<T> pool;public CheckOutTask(Pool<T> pool) {this.pool = pool; }@Overridepublic void run() {try {T item = pool.checkOut(); // 签出对象,获取信号量许可证System.out.println(this + " checked out " + item);System.out.println(this + " checking in " + item);pool.checkIn(item);  // 签入对象,释放许可证归还给信号量 } catch (InterruptedException e) {System.out.println("CheckOutTask interrupted");}}@Overridepublic String toString() {return "checkout task " + id + " "; }
}
/*checkout task 1  checked out fat id: 1
checkout task 4  checked out fat id: 4
checkout task 4  checking in fat id: 4
all checkout tasks created
checkout task 3  checked out fat id: 3
checkout task 3  checking in fat id: 3
checkout task 0  checked out fat id: 0
checkout task 0  checking in fat id: 0
checkout task 2  checked out fat id: 2
checkout task 2  checking in fat id: 2
0 : main() thread check out
fat id: 4
1 : main() thread check out
checkout task 1  checking in fat id: 1
fat id: 0
2 : main() thread check out
fat id: 1
3 : main() thread check out
fat id: 2
4 : main() thread check out
fat id: 3
checkout() interrupted. */

【21.7.7】 Exchanger 交换器
1、定义:Exchanger 是在两个任务之间交换对象的栅栏。当这些任务进入栅栏时,各自拥有一个对象,当它们离开时,它们都拥有之前由对象持有的对象;
2、应用场景:一个任务在创建对象,这些对象的生产代价很高昂;而另一个任务在消费这些对象。通过这种方式,可以有更多的对象在被创建的同时被消费;

/*** 任务交换演示 * page 736*/
public class ExchangerDemo {static int size = 5; static int delay = 5; // secondspublic static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool(); // 线程池 Exchanger<List<Fat>> xc = new Exchanger<>(); // 交换器 List<Fat> producerList = new CopyOnWriteArrayList<>(); // 生产者列表 List<Fat> consumerList = new CopyOnWriteArrayList<>(); // 消费者列表 exec.execute(new ExchangerProducer<Fat>(xc, BasicGenerator.create(Fat.class), producerList));// 运行交换任务生产者 exec.execute(new ExchangerConsumer<Fat>(xc, consumerList)); // 运行交换任务消费者   TimeUnit.MILLISECONDS.sleep(5);exec.shutdownNow();  }
}
// 任务交换生产者
class ExchangerProducer<T> implements Runnable {private Generator<T> generator;private Exchanger<List<T>> exchanger;private List<T> holder; ExchangerProducer(Exchanger<List<T>> exchanger, Generator<T> gen, List<T> holder) {this.exchanger = exchanger; this.generator = gen; this.holder = holder; }@Overridepublic void run() {try {while (!Thread.interrupted()) {for (int i = 0; i < ExchangerDemo.size; i++) {holder.add(generator.next()); } System.out.println("producer, before exchange, holder = " + holder);// exchange方法-等待另一个线程到达此交换点(除非当前线程被中断),然后将给定对象传送给它,并在返回时接收其对象。holder = exchanger.exchange(holder);System.out.println("producer, after exchange, holder = " + holder);}} catch (InterruptedException e) {System.out.println("ExchangerProducer interrupted");}}
}
// 任务交换消费者
class ExchangerConsumer<T> implements Runnable {private Exchanger<List<T>> exchanger;private List<T> holder; private volatile T value ;ExchangerConsumer(Exchanger<List<T>> ex, List<T> holder) {this.exchanger = ex; this.holder = holder; }@Overridepublic void run() {try {while(!Thread.interrupted()) {System.out.println("consumer, before exchange, holder = " + holder);// exchange方法-等待另一个线程到达此交换点(除非当前线程被中断),然后将给定对象传送给它,并在返回时接收其对象。holder = exchanger.exchange(holder);System.out.println("consumer, after exchange, holder = " + holder);for (T x : holder) {value = x;  holder.remove(x); }}} catch (InterruptedException e) {System.out.println(" ExchangerConsumer interrupted");}System.out.println("final value: " + value);}
}
/*
consumer, before exchange, holder = []
producer, before exchange, holder = [fat id: 0, fat id: 1, fat id: 2, fat id: 3, fat id: 4]
producer, after exchange, holder = []
consumer, after exchange, holder = [fat id: 0, fat id: 1, fat id: 2, fat id: 3, fat id: 4]
consumer, before exchange, holder = []
producer, before exchange, holder = [fat id: 5, fat id: 6, fat id: 7, fat id: 8, fat id: 9]
producer, after exchange, holder = []
consumer, after exchange, holder = [fat id: 5, fat id: 6, fat id: 7, fat id: 8, fat id: 9]
consumer, before exchange, holder = []
producer, before exchange, holder = [fat id: 10, fat id: 11, fat id: 12, fat id: 13, fat id: 14]
producer, after exchange, holder = []
consumer, after exchange, holder = [fat id: 10, fat id: 11, fat id: 12, fat id: 13, fat id: 14]
consumer, before exchange, holder = []
producer, before exchange, holder = [fat id: 15, fat id: 16, fat id: 17, fat id: 18, fat id: 19]
producer, after exchange, holder = []
consumer, after exchange, holder = [fat id: 15, fat id: 16, fat id: 17, fat id: 18, fat id: 19]
consumer, before exchange, holder = []ExchangerConsumer interrupted
final value: fat id: 19
producer, before exchange, holder = [fat id: 20, fat id: 21, fat id: 22, fat id: 23, fat id: 24]
ExchangerProducer interrupted */

代码解说:
在main方法中,创建了用于两个任务的单一的Exchanger 交换器,以及两个用于互换的 CopyOnWriteArrayList。这个特定的list变体允许在列表被遍历时调用remove()方法,而不抛出异常 ModificationExcetpion。ExchangeProduer 填充这个list, ExchangerConsumer消费这个list,然后将这个满列表交换为 ExchangerConsumer传递给它的空列表。因为有了 Exchanger,填充一个列表和消费另一个列表可以同时发生了。

(干货——引入了CopyOnWriteArrayList,允许在列表被遍历时调用remove()方法,而不抛出异常 ModificationExcetpion)

【21.8】仿真

【21.9】性能调优

1、比较 synchronize 与 Lock的性能

/*** page 748* 启动单个任务比较 synchronized关键字和 Lock和Atomic类的区别 */
public class SimpleMicroBenchmark {static long test(Incrementable incr) {long start = System.nanoTime();for (long i=0; i < 100000000L; i++) {incr.increment();}return System.nanoTime() - start; }public static void main(String[] args) {long synchTime = test(new SynchronizingTest());long lockTime = test(new LockingTest());System.out.printf("synchronized 花费多少纳秒: %1$10d \n", synchTime); // 203974196 System.out.printf("lock 花费多少纳秒: %1$10d \n", lockTime); // 164559713  System.out.printf("lock/synchronized: %1$.3f\n", lockTime/(double)synchTime); // 0.807 }
}
abstract class Incrementable {protected long counter = 0;public abstract void increment();
}
// synchronized 关键字性能测试
class SynchronizingTest extends Incrementable {public synchronized void increment() {++counter;}
}
// lock锁性能测试
class LockingTest extends Incrementable {private Lock lock = new ReentrantLock();public void increment() {lock.lock();try {++counter;} finally {lock.unlock();}}
}
/*
synchronized 花费多少纳秒: 2166063742
lock 花费多少纳秒: 1725775548
lock/synchronized: 0.797
*/

以上代码是单线程,不具体代表性;我们需要构建复杂程序,或多个任务来测试;

【代码-SynchronizationComparison】

/*** page 749 * 启动多个任务测试 synchronize, lock , Atomic类性能  (经过验证,Atomic原子类同步性能最佳 )*/
public class SynchronizationComparison{static BaseLine baseLine = new BaseLine(); // 基线 static SynchronizedTest synch = new SynchronizedTest(); // synchronize测试 static LockTest lock = new LockTest(); // ReentrantLock 可重入锁测试 static AtomicTest atomic = new AtomicTest(); // 原子类测试 static void test() { System.out.println("=============================="); System.out.printf("%-12s : %13d\n", "Cycles", Accumulator.cycles);// 运行任务测试总时长baseLine.timedTest(); // 基本测试,无任何同步方法  synch.timedTest(); // synchronize同步 lock.timedTest(); // lock 同步 atomic.timedTest(); // Atomic 类同步 // 比较两种模拟器性能  Accumulator.report(synch, baseLine); // 1.65 Accumulator.report(lock, baseLine); // 2.31 Accumulator.report(atomic, baseLine); //   0.91 Accumulator.report(synch, lock); // 0.71 Accumulator.report(synch, atomic); // 1.82 Accumulator.report(lock, atomic); // 2.54 }public static void main(String[] args) {int iteration = 5; System.out.println("warm up");baseLine.timedTest();for (int i = 0; i < iteration; i++) {test();Accumulator.cycles *= 2; }Accumulator.exec.shutdown(); }
}
abstract class Accumulator { // 模拟器 public static long cycles = 50000L; // 循环次数 private static final int N = 4; public static ExecutorService exec = Executors.newFixedThreadPool(N*2); // 线程池
//  CyclicBarrier也叫同步屏障,在JDK1.5被引入,可以让一组线程达到一个屏障时被阻塞
//  ,直到最后一个线程达到屏障时,所以被阻塞的线程才能继续执行。 private static CyclicBarrier barrier = new CyclicBarrier(N*2+1);  // 同步屏障 protected volatile int index = 0;protected volatile long value = 0;protected long duration = 0; // 持续时间 protected String id = "error";protected final static int SIZE = 100000 ;protected static int[] preLoaded = new int[SIZE]; static {Random rand = new Random(47); // 随机数发生器 for (int i=0; i< SIZE; i++) {preLoaded[i] = rand.nextInt(); // 预加载}}public abstract void accumulate(); // 模拟方法,抽象,由子类实现 public abstract long read(); // 读取方法 private class Modifier implements Runnable { // 修改器 @Overridepublic void run() { // 模板方法模式,由子类提供实现 for(long i=0; i<cycles; i++) {accumulate(); // 调用模拟方法 }try {barrier.await(); // 屏障阻塞,直到给定数量的线程都等待为止 } catch (Exception e) {throw new RuntimeException(e);}}}private class Reader implements Runnable { // 读取器 private volatile long value; @Overridepublic void run() {for(long i=0; i<cycles; i++) {value = read(); }try {barrier.await();  // 屏障阻塞,直到给定数量的线程都等待为止  } catch (Exception e) {throw new RuntimeException(e);}}}public void timedTest() { // 时间测试 long start = System.nanoTime(); for (int i=0; i<N; i++) {exec.execute(new Modifier()); // 执行修改器 exec.execute(new Reader()); // 执行读取器 }try {// 程序中必须有一个 CyclicBarrier, 因为需要确保所有任务在声明每个测试完成之前都已经完成  barrier.await();  // 屏障阻塞,直到给定数量的线程都等待为止  } catch (Exception e) {throw new RuntimeException(e);}duration = System.nanoTime() - start; // 总时长 System.out.printf("%-13s:%13d\n", id, duration);}public static void report(Accumulator acc1, Accumulator acc2) { // 报告, System.out.printf("%-22s: %.2f\n", acc1.id + "/" + acc2.id, acc1.duration/(double)acc2.duration); }
}
class BaseLine extends Accumulator { // 基本测试,无任何同步方法   {id = "baseline"; }@Override public void accumulate() {if (index >= SIZE-1) index = 0;value += preLoaded[(index++)%SIZE];}@Overridepublic long read() {return value;}
}
class SynchronizedTest extends Accumulator { // synchronize同步 测试 {id = "Synchronized";  }@Overridepublic synchronized void accumulate() {if (index >= SIZE-1) index = 0;value += preLoaded[index++];}@Overridepublic long read() {return value;}
}
class LockTest extends Accumulator { // ReentrantLock可重入锁同步 测试  {id = "lock";}private Lock lock = new ReentrantLock(); @Overridepublic void accumulate() {lock.lock();try {if (index >= SIZE-1) index = 0;value += preLoaded[index++];} finally {lock.unlock();}}@Overridepublic long read() {lock.lock();try {return value; } finally {lock.unlock(); }}
}
class AtomicTest extends Accumulator { // Atomic原子类同步 测试  {id = "atomic";}private AtomicInteger index = new AtomicInteger(0);private AtomicLong value = new AtomicLong(0);@Overridepublic void accumulate() {int i = index.getAndIncrement();value.getAndAdd(preLoaded[i%SIZE]);if (++i >= SIZE-1) {index.set(0);}}@Overridepublic long read() {return value.get();}
}
/*
warm up
baseline     :     12811667
==============================
Cycles       :         50000
baseline     :     10401913
Synchronized :     18486698
lock         :     26550332
atomic       :      8931189
Synchronized/baseline : 1.78
lock/baseline         : 2.55
atomic/baseline       : 0.86
Synchronized/lock     : 0.70
Synchronized/atomic   : 2.07
lock/atomic           : 2.97
==============================
Cycles       :        100000
baseline     :     18458982
Synchronized :     28172394
lock         :     36321361
atomic       :     14323233
Synchronized/baseline : 1.53
lock/baseline         : 1.97
atomic/baseline       : 0.78
Synchronized/lock     : 0.78
Synchronized/atomic   : 1.97
lock/atomic           : 2.54
==============================
Cycles       :        200000
baseline     :     36408153
Synchronized :     50424697
lock         :     71790482
atomic       :     28702992
Synchronized/baseline : 1.38
lock/baseline         : 1.97
atomic/baseline       : 0.79
Synchronized/lock     : 0.70
Synchronized/atomic   : 1.76
lock/atomic           : 2.50
==============================
Cycles       :        400000
baseline     :     68541253
Synchronized :    103632938
lock         :    144097706
atomic       :     53405164
Synchronized/baseline : 1.51
lock/baseline         : 2.10
atomic/baseline       : 0.78
Synchronized/lock     : 0.72
Synchronized/atomic   : 1.94
lock/atomic           : 2.70
==============================
Cycles       :        800000
baseline     :    137235667
Synchronized :    180808536
lock         :    283742763
atomic       :    108986327
Synchronized/baseline : 1.32
lock/baseline         : 2.07
atomic/baseline       : 0.79
Synchronized/lock     : 0.64
Synchronized/atomic   : 1.66
lock/atomic           : 2.60
*/

代码解说:程序中有一个CyclicBarrier 循环屏障,因为我们希望确保所有的任务在声明每个测试完成之前都已经完成了;

【互斥技术总结】
1、Atomic:如果涉及多个Atomic对象,你就有可能会被强制要求放弃这种用法;因为Atomic对象只有在非常简单的情况下才有用,这些情况通常包括你只有一个要被修改的Atomic对象,并且这个对象独立于其他所有的对象。更安全的做法:只有在性能方面的需求能够明确指示时,再替换为 Atomic,否则还是推荐使用 synchronized; (干货——Atomic类的使用场景)
2、推荐使用 synchronize进行并发控制:因为 synchronize关键字所产生的代码,与Lock所需的 加锁-try-finally-解锁惯用方法锁产生的代码相比,可读性提高了很多;所以推荐使用 synchronize。就如我在本书其他地方提到的,代码被阅读次数远多于被编写的次数。在编程时,与其他人交流相对于与计算机交流而言,要重要得多,因此代码的可读性至关重要。因此,从 synchronized 入手,只有在性能调优时才替换为 Lock对象这种做法,具有实际意义的。(干货——推荐使用 synchronize进行并发控制)

【21.9.2】免锁容器
1、CopyOnWriteArrayList:写入将导致创建整个底层数组的副本,而源数组将保留在原地,使得复制的数组在被修改时,读取操作可以安全执行; CopyOnWriteArrayList好处是当多个迭代器同时遍历和修改这个列表时,不会抛出 ConcurrentModificationException;CopyOnWriteArraySet 使用了CopyOnWriteArrayList 来实现其免锁行为;
2、ConcurrentHashMap 与ConcurrentLinkedQueue 使用了类似的技术,允许并发的读取和写入,但是容器中只有部分内容而不是整个容器可以被复制和修改。然后,任何修改在完成之前,读取者仍旧不能看到他们。ConcurrentHashMap 不会抛出 ConcurrentModificationException异常。
3、乐观锁: 只要你主要是从免锁容器中读取,那么它就会比 synchronized 快很多,因为获取和释放锁的开销省掉了;

4、比较并发控制的list容器
(干货——测试并发编程下的list性能: CopyOnWriteArrayList性能 优于  SynchronizedList)
【代码——Tester】

/*** 性能测试器* page 756* @param <C>*/
public abstract class Tester<C> {static int testReps = 10;static int testCycles = 10;static int containerSize = 10; // 容器大小/** 抽象方法-初始化容器 */abstract C containerInitializer();/** 抽象方法-开启读取和写入任务 */abstract void startReadersAndWriters();C testContainer;String testId;/** 读取线程个数 */int nReaders;/** 写入线程个数 */ int nWriters;volatile long readResult = 0;volatile long readTime = 0;volatile long writeTime = 0;/*CountDownLatch的作用也是如此,在构造CountDownLatch的时候需要传入一个整数n,* 在这个整数“倒数”到0之前,主线程需要等待在门口,而这个“倒数”过程则是由各个执行线程驱动的,* 每个线程执行完一个任务“倒数”一次。总结来说,CountDownLatch的作用就是等待其他的线程都执行完任务,* 必要时可以对各个任务的执行结果进行汇总,然后主线程才继续往下执行。*/CountDownLatch endLatch; // latch-门栓 static ExecutorService exec = Executors.newCachedThreadPool(); // 线程池 Integer[] writeData;/** 构造器 */ Tester(String testId, int nReaders, int nWriters) {this.testId = testId + " , " + nReaders + " reader thread, " + nWriters + " writer thread"; this.nReaders = nReaders;this.nWriters = nWriters; writeData = Generated.array(Integer.class, new RandomGenerator.Integer(), containerSize);for (int i=0; i< testReps; i++) {runTest();readTime = 0 ;writeTime = 0;}}void runTest() {endLatch = new CountDownLatch(nReaders+ nWriters);testContainer = containerInitializer();startReadersAndWriters();try {endLatch.await(); // 门栓等待,直到所有线程都执行完成 } catch (InterruptedException ex) {System.out.println("endLatch interrupted");}System.out.printf("%-100s %14d %14d\n", testId, readTime, writeTime);if (readTime !=0 && writeTime != 0) {System.out.printf("%-100s %14d\n", "readTime + writeTime = ", readTime + writeTime);}}abstract class TestTask implements Runnable {/** 开启线程,运行test方法 */abstract void test();/** 存放结果,在synchronzid 静态块里执行  */abstract void putResults();long duration; public void run() {long startTime = System.nanoTime();test();duration = System.nanoTime() - startTime;synchronized (Tester.this) {putResults();}endLatch.countDown(); // 门栓减1,直到减为0,则门栓不等待 }}public static void initMain(String[] args) {testReps = new Integer(3);testCycles = new Integer(3);containerSize = new Integer(3);System.out.printf("%-100s %14s %14s\n", "type", "readTime", "write Time"); }
}

【代码——ListComparisons——比较列表】

/*** 测试并发编程下的list性能: CopyOnWriteArrayList性能 优于  SynchronizedList* page 758  */
public class ListComparisons {public static void main(String[] args) {Tester.initMain(null);new SynchronizedArrayListTest(10, 0);new SynchronizedArrayListTest(9, 1);new SynchronizedArrayListTest(5, 5);new CopyOnWriteArrayListTest(10,  0);new CopyOnWriteArrayListTest(9,  1);new CopyOnWriteArrayListTest(5,  5);Tester.exec.shutdown();}
}
/** List测试类  */
abstract class ListTest extends Tester<List<Integer>> {ListTest(String testId, int nReaders, int nWriters) {super(testId, nReaders, nWriters);}class Reader extends TestTask { // 读取任务 long result = 0;void test() {for (int i = 0; i < testCycles; i++) {for (int j = 0; j < containerSize; j++) {result += testContainer.get(j);}}}void putResults() {readResult += result;readTime += duration; }}class Writer extends TestTask { // 写入任务 @Overridevoid test() {for (int i = 0; i < testCycles; i++) {for (int j = 0; j < containerSize; j++) {testContainer.set(i, writeData[j]);}}}@Overridevoid putResults() {writeTime += duration; }}/** 运行读取任务和写入任务  */void startReadersAndWriters() {for (int i = 0; i < nReaders; i++) {exec.execute(new Reader());}for (int i = 0; i < nWriters; i++) {exec.execute(new Writer());}}
}
/** 同步list-SynchronizedList*/
class SynchronizedArrayListTest extends ListTest {List<Integer> containerInitializer() {return Collections.synchronizedList(new ArrayList<Integer>(new CountingIntegerList(containerSize)));}SynchronizedArrayListTest(int nreaders, int nwriters) {super("synched arraylist", nreaders, nwriters);}
}
/** 同步list-CopyOnWriteArrayList*/
class CopyOnWriteArrayListTest extends ListTest {@OverrideList<Integer> containerInitializer() {/*CopyOnWriteArrayList好处是当多个迭代器同时遍历和修改这个列表时* ,不会抛出 ConcurrentModificationException;*/ return new CopyOnWriteArrayList<Integer>(new CountingIntegerList(containerSize));} CopyOnWriteArrayListTest(int nreaders, int nwriters) {super("CopyOnWriteArrayListTest", nreaders, nwriters);}
}
/*
type                                                                                                       readTime     write Time
t = null
t = null
t = null
synched arraylist , 10 reader thread, 0 writer thread                                                         86062              0
synched arraylist , 10 reader thread, 0 writer thread                                                        140764              0
synched arraylist , 10 reader thread, 0 writer thread                                                        535339              0
t = null
t = null
t = null
synched arraylist , 9 reader thread, 1 writer thread                                                         238497          20422
readTime + writeTime =                                                                                       258919
synched arraylist , 9 reader thread, 1 writer thread                                                         188900           4376
readTime + writeTime =                                                                                       193276
synched arraylist , 9 reader thread, 1 writer thread                                                         192182           3647
readTime + writeTime =                                                                                       195829
t = null
t = null
t = null
synched arraylist , 5 reader thread, 5 writer thread                                                          86791          74393
readTime + writeTime =                                                                                       161184
synched arraylist , 5 reader thread, 5 writer thread                                                         605721         540446
readTime + writeTime =                                                                                      1146167
synched arraylist , 5 reader thread, 5 writer thread                                                          39385          76216
readTime + writeTime =                                                                                       115601
t = null
t = null
t = null
CopyOnWriteArrayListTest , 10 reader thread, 0 writer thread                                                  63456              0
CopyOnWriteArrayListTest , 10 reader thread, 0 writer thread                                                  48866              0
CopyOnWriteArrayListTest , 10 reader thread, 0 writer thread                                                  44126              0
t = null
t = null
t = null
CopyOnWriteArrayListTest , 9 reader thread, 1 writer thread                                                   30997          35738
readTime + writeTime =                                                                                        66735
CopyOnWriteArrayListTest , 9 reader thread, 1 writer thread                                                   71475          21516
readTime + writeTime =                                                                                        92991
CopyOnWriteArrayListTest , 9 reader thread, 1 writer thread                                                   24434          21151
readTime + writeTime =                                                                                        45585
t = null
t = null
t = null
CopyOnWriteArrayListTest , 5 reader thread, 5 writer thread                                                   19692         680843
readTime + writeTime =                                                                                       700535
CopyOnWriteArrayListTest , 5 reader thread, 5 writer thread                                                   48503         622496
readTime + writeTime =                                                                                       670999
CopyOnWriteArrayListTest , 5 reader thread, 5 writer thread                                                   14587         756695
readTime + writeTime =                                                                                       771282*/

代码解说: synchronized ArrayList 无论读者和写入者的数量是多少,都具有大致相同的性能——读取者与其他读取者竞争锁的方式与写入者相同。但 CopyOnWriteArrayList 在没有写入者时,速度会快很多。通过测试,CopyOnWriteArrayList 性能优于 synchronized list,对列表写入的影响并没有超过短期同步整个列表的影响。

5、比较并发控制的map容器性能
(干货——测试并发编程下的Map性能: CurrentHashMap性能优于 synchronizedHashMap)

/*** 测试并发编程下的Map性能: CurrentHashMap性能优于synchronizedHashMap   * page 758*/
public class MapComparisons {public static void main(String[] args) {Tester.initMain(null);         new SynchronizedHashMapTest(10, 0);new SynchronizedHashMapTest(9, 1);new SynchronizedHashMapTest(5, 5);new ConcurrentHashMapTest(10, 0);new ConcurrentHashMapTest(9, 1);new ConcurrentHashMapTest(5, 5);Tester.exec.shutdown();}
}
/** Map测试 */
abstract class MapTest extends Tester<Map<Integer, Integer>> {MapTest(String testId, int nReaders, int nWriters) {super(testId, nReaders, nWriters); }/** 读取器 */class Reader extends TestTask {long result = 0;void test() {for (int i = 0; i < testCycles; i++) {for (int j = 0; j < containerSize; j++) {result += testContainer.get(j);}}}@Overridevoid putResults() {readResult += result; readTime += duration; }}/** 写入器 */ class Writer extends TestTask {long result = 0;void test() {for (int i = 0; i < testCycles; i++) {for (int j = 0; j < containerSize; j++) {testContainer.put(j, writeData[j]);}}}@Overridevoid putResults() {writeTime += duration;  }}/** 运行读取与写入任务 */ void startReadersAndWriters() {for (int i = 0; i < nReaders; i++) {exec.execute(new Reader());}for (int i = 0; i < nWriters; i++) {exec.execute(new Writer());}}
}
/** 同步块SynchronizedHashMap*/
class SynchronizedHashMapTest extends MapTest {Map<Integer, Integer> containerInitializer() {return Collections.synchronizedMap(new HashMap<Integer, Integer>(MapData.map(new CountingGenerator.Integer(), new CountingGenerator.Integer(), containerSize)));}SynchronizedHashMapTest(int nreaders, int nwriters) {super("SynchronizedHashMapTest", nreaders, nwriters);}
}
/** 同步HashMap */
class ConcurrentHashMapTest extends MapTest {Map<Integer, Integer> containerInitializer() {return new ConcurrentHashMap<Integer, Integer>(              MapData.map(new CountingGenerator.Integer(), new CountingGenerator.Integer(), containerSize));}ConcurrentHashMapTest(int nreaders, int nwriters) {super("ConcurrentHashMapTest", nreaders, nwriters);}
}
/*
type                                                                                                       readTime     write Time
t = null
t = null
t = null
SynchronizedHashMapTest , 10 reader thread, 0 writer thread                                                  207497              0
SynchronizedHashMapTest , 10 reader thread, 0 writer thread                                                  487569              0
SynchronizedHashMapTest , 10 reader thread, 0 writer thread                                                  253084              0
t = null
t = null
t = null
SynchronizedHashMapTest , 9 reader thread, 1 writer thread                                                   126177          31361
readTime + writeTime =                                                                                       157538
SynchronizedHashMapTest , 9 reader thread, 1 writer thread                                                   131281          25892
readTime + writeTime =                                                                                       157173
SynchronizedHashMapTest , 9 reader thread, 1 writer thread                                                    98095           7658
readTime + writeTime =                                                                                       105753
t = null
t = null
t = null
SynchronizedHashMapTest , 5 reader thread, 5 writer thread                                                    37559          56889
readTime + writeTime =                                                                                        94448
SynchronizedHashMapTest , 5 reader thread, 5 writer thread                                                    49961          74393
readTime + writeTime =                                                                                       124354
SynchronizedHashMapTest , 5 reader thread, 5 writer thread                                                    64910         122531
readTime + writeTime =                                                                                       187441
t = null
t = null
t = null
ConcurrentHashMapTest , 10 reader thread, 0 writer thread                                                     46679              0
ConcurrentHashMapTest , 10 reader thread, 0 writer thread                                                     48867              0
ConcurrentHashMapTest , 10 reader thread, 0 writer thread                                                     41572              0
t = null
t = null
t = null
ConcurrentHashMapTest , 9 reader thread, 1 writer thread                                                      48134           8023
readTime + writeTime =                                                                                        56157
ConcurrentHashMapTest , 9 reader thread, 1 writer thread                                                      55795           8023
readTime + writeTime =                                                                                        63818
ConcurrentHashMapTest , 9 reader thread, 1 writer thread                                                      28080           7294
readTime + writeTime =                                                                                        35374
t = null
t = null
t = null
ConcurrentHashMapTest , 5 reader thread, 5 writer thread                                                      17506          60170
readTime + writeTime =                                                                                        77676
ConcurrentHashMapTest , 5 reader thread, 5 writer thread                                                      17506          41207
readTime + writeTime =                                                                                        58713
ConcurrentHashMapTest , 5 reader thread, 5 writer thread                                                      14221          58348
readTime + writeTime =                                                                                        72569*/

【21.9.3】乐观加锁
1、原理:在执行某项计算时,实际上没有使用互斥,但在计算完成时,准备更新时,需要使用 compareAndSet的方法。你把旧值和新值一起提交给这个方法,如果旧值与他在Atomic对象中发现的值不一致,那么这个操作就会失败,这意味着某个其他任务已经于此操作前修改了这个对象。
2、通常情况下,我们使用 synchronized 或 lock来防止多个任务同时修改同一个对象,但这里我们是乐观的,因为我们保持数据为为锁定状态,并希望没有任何其他任务插入修改他。使用 Atomic替代 synchronized或 Lock,可以获得性能上的好处;
3、注意:compareAndSet() 方法操作失败会发生什么? 建议程序做好补偿机制;
【代码——FastSimulation】

/*** page 760 * 乐观加锁测试 */
public class FastSimulation {/** 元素个数 */static final int N_ELEMENTS = 100;/** 每个元素的基因个数 */static final int N_GENES = 30;/** 进化者 */static final int N_EVOLVERS = 50;// 进化者 /** 网格  */ static final AtomicInteger[][] GRID = new AtomicInteger[N_ELEMENTS][N_GENES];  static Random rand = new Random(47);static class Evolver implements Runnable { // 进化者任务 @Overridepublic void run() {while(!Thread.interrupted()) { // 如果线程不中断,重复执行  int element = rand.nextInt(N_ELEMENTS); // 获取元素 for (int i = 0; i < N_GENES; i++) {/** 前一个元素 */int previous = element - 1; if (previous <0) previous = N_ELEMENTS - 1;/** 下一个元素 */ int next = element + 1; if (next >= N_ELEMENTS) next = 0;/** 旧值 */int oldValue = GRID[element][i].get();/** 新值 */ int newValue = oldValue + GRID[previous][i].get() + GRID[next][i].get();newValue /= 3; // Atomic.compareAndSet 方法比较重要:// 比较当前值与传入的 old 值是否相同,相同则更新为 新值 if (!GRID[element][i].compareAndSet(oldValue, newValue)) {System.out.printf("oldvalue, changed from %d  to %d \n", oldValue, newValue);  }}}}}public static void main(String[] args) throws Exception {ExecutorService exec = Executors.newCachedThreadPool();for (int i=0; i<N_ELEMENTS; i++) {for (int j = 0; j < N_GENES; j++) {GRID[i][j] = new AtomicInteger(rand.nextInt(1000));}}for (int i = 0; i < N_EVOLVERS; i++) {exec.execute(new Evolver());}exec.shutdown(); }
}
/*
oldvalue, changed from 670  to 676
oldvalue, changed from 352  to 351
oldvalue, changed from 455  to 454
oldvalue, changed from 424  to 423
............
*/

【21.9.4】ReadWriteLock 读写锁
1、定义: ReadWriteLock对向数据结构相对不频繁写入,但是有多个任务要经常读取这个数据结构的这类情况进行了优化。
ReadWriteLock 使得你可以同时有多个读取这,只要他们都不试图写入。如果写锁已经被其他任务持有,那么任何读取者都不能访问,直到这个写锁被释放为止。
2、ReadWriteLock是否能够提高程序性能是不确定的,取决于数据被读取与修改的频率,读取和写入操作的时间,有多少线程竞争以及是否在多处理器上运行等因素。
最好的方法就是用实验来证明是否性能提升。
【代码——ReaderWriterList】

/*** 读写任务列表-可重入锁测试 page 763*/
public class ReaderWriterList<T> {private ArrayList<T> lockedList;/** 可重入读写锁 */private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);public ReaderWriterList(int size, T initialValue) {/** Collections.nCopies-返回由指定对象(这里是initialValue,初始值)的n(size)个副本组成的不可变列表。* 新分配的数据对象很小(它包含对数据对象的单个引用)。 该方法与List.addAll方法结合使用以增长列表很有用。* 返回的列表是可序列化的。*/lockedList = new ArrayList<T>(Collections.nCopies(size, initialValue));}/** 写入 */public T set(int index, T element) {Lock wLock = lock.writeLock(); // 获取写锁wLock.lock(); // 加锁try {System.out.println("写锁数量 = " + lock.getWriteHoldCount());return lockedList.set(index, element);} finally {wLock.unlock(); // 解锁}}public T get(int index) {Lock rLock = lock.readLock(); // 获取读锁rLock.lock();// 加锁try {if (lock.getReadLockCount() > 1) {System.out.println("读锁数量 = " + lock.getReadLockCount());}return lockedList.get(index);} finally {rLock.unlock(); // 解锁}}public static void main(String[] args) {new ReaderWriterListTest(5, 1);}
}
/*** 读写任务列表测试*/
class ReaderWriterListTest {ExecutorService exec = Executors.newCachedThreadPool();private final static int SIZE = 10;private static Random rand = new Random(47);private ReaderWriterList<Integer> list = new ReaderWriterList<Integer>(SIZE, 0);/** 构造器,启动读取和写入任务 */public ReaderWriterListTest(int readers, int writers) {for (int i = 0; i < readers; i++) {exec.execute(new Reader()); // 读取任务}for (int i = 0; i < writers; i++) {exec.execute(new Writer()); // 写入任务}}/** 写入任务 */private class Writer implements Runnable {@Overridepublic void run() {try {for (int i = 0; i < SIZE; i++) {list.set(i, rand.nextInt()); // 获取写锁TimeUnit.MICROSECONDS.sleep(100);}} catch (InterruptedException e) {e.printStackTrace();}System.out.println("writer finished, shutting down");exec.shutdown();}}/** 读取任务 */private class Reader implements Runnable {@Overridepublic void run() {try {for (int i = 0; i < SIZE; i++) {list.get(i); // 获取读锁TimeUnit.MICROSECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();}System.out.println("reader finished, shutting down");}}
}
/** 读锁数量 = 3 读锁数量 = 3 读锁数量 = 4 读锁数量 = 3 写锁数量 = 1 写锁数量 = 1 写锁数量 = 1 写锁数量 = 1 写锁数量* = 1 读锁数量 = 2 读锁数量 = 2 写锁数量 = 1 写锁数量 = 1 读锁数量 = 3 读锁数量 = 4 读锁数量 = 4 读锁数量 = 4* 写锁数量 = 1 写锁数量 = 1 写锁数量 = 1 reader finished, shutting down reader finished,* shutting down writer finished, shutting down reader finished, shutting down* reader finished, shutting down reader finished, shutting down*/

【21.10】活动对象
1、有一种可替换的方式被称为活动对象或行动者。之所以称这些对象是活动的,是因为每个对象都维护着它自己的工作器线程和消息队列,并且所有对这种对象的请求都将进入队列排队,任何时刻都只能运行其中一个。然而,有个活动对象,可以串行化消息而不是方法,这意味着不需要防备一个任务在其循环的中间被中断这种问题。
2、当你向一个活动对象发送消息时,这条消息会转变为一个任务,该任务会被插入到这个对象的队列中,等待在以后的某个时刻运行。Future在实现这种模式时将派上用场。

【看个荔枝】有两个方法,可以将方法调用排进队列;

/*** 活动对象演示* page 764 */
public class ActiveObjectDemo {/** 线程池  */ private ExecutorService exec = Executors.newSingleThreadExecutor();private Random rand = new Random(47);/** 暂停方法,睡眠 */private void pause(int factor) {try {TimeUnit.MILLISECONDS.sleep(100+ rand.nextInt(factor));} catch (InterruptedException e) {System.out.println("sleep interrupt");} } /** 调用int方法  */public Future<Integer> calculateInt(final int x, final int y) {return exec.submit(new Callable<Integer>() {public Integer call() {System.out.println("starting x = " + x + ", y =  " + y);pause(500);return  x+ y ;}});}public Future<Float> calculateFloat(final float x, final float y) {return exec.submit(new Callable<Float>() {public Float call() {System.out.println("starting  x = " + x + ", y = " + y);pause(2000);return x+ y; }});}public void shutdown() {exec.shutdown(); }public static void main(String[] args) {ActiveObjectDemo d1 = new ActiveObjectDemo(); /*在计算机中就是当你想要对一块内存进行修改时,我们不在原有内存块中进行写操作* ,而是将内存拷贝一份,在新的内存中进行写操作* ,写完之后呢,就将指向原来内存指针指向新的内存,原来的内存就可以被回收掉嘛!*/List<Future<?>> results = new CopyOnWriteArrayList<Future<?>>();for (float f = 0.0f; f < 1.0f; f += 0.2f) {results.add(d1.calculateFloat(f, f));}for (int i = 0; i < 5; i++) {results.add(d1.calculateInt(i, i));}System.out.println("========== all asynch calls made ========== ");int index = 0;while(results.size() >0) { // while 循环,再放一层for循环,因为 可能f.isDone() 为false  for (Future<?> f: results) {if (f.isDone()) {try {System.out.println("f.get(" + ++index +") = " + f.get()); } catch (Exception e) {throw new RuntimeException(e);}results.remove(f);}} d1.shutdown(); }}
}
/*
========== all asynch calls made ==========
starting  x = 0.0, y = 0.0
f.get(1) = 0.0
starting  x = 0.2, y = 0.2
f.get(2) = 0.4
starting  x = 0.4, y = 0.4
f.get(3) = 0.8
starting  x = 0.6, y = 0.6
f.get(4) = 1.2
starting  x = 0.8, y = 0.8
f.get(5) = 1.6
starting x = 0, y =  0
starting x = 1, y =  1
f.get(6) = 0
f.get(7) = 2
starting x = 2, y =  2
f.get(8) = 4
starting x = 3, y =  3
f.get(9) = 6
starting x = 4, y =  4
f.get(10) = 8
*/

代码解说:使用 CopyOnWriteArrayList 可以移除为了防止 ConcurrentModificationException而复制List的这种需求;
小结:
小结1:为了能够在不经意间就可以防止线程之间的耦合,任何传递给活动对象方法调用的参数都必须是只读的其他活动对象,或者是不连续对象。即没有连接任何其他任务的对象。
小结2;有个活动对象, 你可以干一下事情:
事情1、每个对象都可以拥有自己的工作器线程;
事情2、每个对象都将维护对他自己的域的全部控制权; 这比普通类要严格一些,普通类只是拥有防护它们的域的选择权;
事情3、所有在活动对象之间的通信都将以在这些对象之间的消息形式发生;
事情4、活动对象之间的所有消息都要排队;

【21.11】总结
【21.11.1】java线程进行并发编码的基础知识;
1、可以运行多个独立任务;
2、必须考虑当这些任务关闭时,可能出现的所有问题;
3、任务可能会在共享资源上彼此干涉。互斥(或锁)是用来防止这种冲突的基本工具;
4、如果任务设计得不够合理,就有可能会死锁;
【21.11.2】什么时候使用并发,什么时候应该避免并发非常关键,使用并发的原因如下:
1、要处理很多任务,它们交织在一起,应用并发能够更有效地使用计算机;
2、要能够更好地组织代码;
3、要便于用户使用;

【21.11.3】 线程的好处
1、轻量级的上下文切换:轻量级的线程上下万切换只需要100条指令,重量级的进程上下文切换需要上千条指令;
2、因为一个给定进程内的所有线程共享相同的内存空间,轻量级的上下文切换只是改变了程序的执行序列和局部变量,而进程切换必须改变所有内存空间;(干货——线程的上下文切换是轻量级的,进程的上下文切换是重量级的)

【21.11.4】多线程的缺陷
1、等待共享资源的时候性能降低;
2、需要处理线程的额外cpu花费;
3、糟糕的程序设计导致不必要的复杂度;
4、有可能产生一些病态行为,如饿死,竞争,死锁,活锁(多个运行各自任务的线程使得整体无法完成);
5、不同平台导致的不一致性;

think-in-java(21)并发相关推荐

  1. Java 高并发_JAVA并发编程与高并发解决方案 JAVA高并发项目实战课程 没有项目经验的朋友不要错过!...

    JAVA并发编程与高并发解决方案 JAVA高并发项目实战课程 没有项目经验的朋友不要错过! 1.JPG (37.82 KB, 下载次数: 0) 2018-12-3 09:40 上传 2.JPG (28 ...

  2. thinking-in-java(21)并发2

    think-in-java 并发前半部分(并发1)参见:  https://blog.csdn.net/PacosonSWJTU/article/details/104855730 [21.4.3]中 ...

  3. 【实战Java高并发程序设计6】挑战无锁算法

    我们已经比较完整得介绍了有关无锁的概念和使用方法.相对于有锁的方法,使用无锁的方式编程更加考验一个程序员的耐心和智力.但是,无锁带来的好处也是显而易见的,第一,在高并发的情况下,它比有锁的程序拥有更好 ...

  4. JAVA高并发程序设计(葛一鸣著)读书笔记

    本文为JAVA高并发程序设计(葛一鸣著)读书笔记.这本书对于刚刚入门的童鞋来讲可能有点深,我推荐可以先看看Java多线程编程核心技术(高洪岩著)一书. 第一章 走入并行世界 什么是同步和异步? 同步就 ...

  5. Java高并发,如何解决,什么方式解决

     对于我们开发的网站,如果网站的访问量非常大的话,那么我们就需要考虑相关的并发访问问题了.而并发问题是绝大部分的程序员头疼的问题, 但话又说回来了,既然逃避不掉,那我们就坦然面对吧~今天就让我们一 ...

  6. java高并发案例详细讲解

    对于我们开发的网站,如果网站的访问量非常大的话,那么我们就需要考虑相关的并发访问问题了.而并发问题是绝大部分的程序员头疼的问题, 但话又说回来了,既然逃避不掉,那我们就坦然面对吧~今天就让我们一起来研 ...

  7. Java高并发编程案例

    文章目录 synchronized关键字 对象加锁 修饰方法 锁定静态方法 同步和非同步方法同时调用 脏读 可重入锁 异常释放锁 同步监视器变化 volatile 线程之间的可见性 对比synchro ...

  8. 关于Java高并发的问题

    前言: 对于我们开发的网站,如果网站的访问量非常大的话,那么我们就需要考虑相关的并发访问问题了.而并发问题是绝大部分的程序员头疼的问题, 为了更好的理解并发和同步,我们需要先明白两个重要的概念:同步和 ...

  9. 《实战Java高并发程序设计》读后感

    写在前面无关的内容 白驹过隙,看下日历已经毕业4年多,加上在大学里的4年,算算在计算机界也躺了八年,按照格拉德韦尔的1万小时定律差不多我也该成为行业的专家了,然后并没有.当看着"什么是Jav ...

最新文章

  1. 阿里员工绩效只拿3.25!自我反省:平时假装努力!晚上没加班!去厕所时间太长!还老买彩票!...
  2. Silverlight 应用程序之间在客户端通信
  3. C++新旧类型转换小记
  4. c++的程序的文件结构
  5. java中的case1怎么说_Java Cas20ServiceTicketValidator類代碼示例
  6. android应用程序的生命周期,Android应用程序的生命周期.doc
  7. PAT 1073 多选题常见计分法(20)(代码+思路)
  8. java easyui分页源码_SpringMVC+easyUI中datagrid分页实现_2014.5.1
  9. 基于各国贷款数据的可视化分析(含python代码)
  10. mongodb中的3t客户端的时间延长做法
  11. android远程桌面源码,ARDC Android远程桌面助手 简介(示例代码)
  12. 寄存器的七种寻址方式
  13. 【数据结构】循环链表(circular linked list) 双向链表(doubly linked list)
  14. 验证码短信是如何实现的?怎么用短信平台发送验证码短信?
  15. 从软件测试培训班出来后找工作的这段经历,教会了我这五件事...
  16. 十二经纳干支歌,十二经纳地支歌
  17. 音视频开发-SRS 4.0流媒体服务器系列
  18. 亲测 阿里云虚拟主机部署FastAadmin
  19. mima.php密码找回,mima.php
  20. 电脑xp传照片显示服务器错误,xp系统显示“服务器错误500”的两种解决方法

热门文章

  1. Educational Codeforces Round 111 (Rated for Div. 2) D. Excellent Arrays 组合数学
  2. P4768 [NOI2018] 归程 Kruskal重构树 + 倍增 + 最短路
  3. Codeforces Round #716 (Div. 2) D. Cut and Stick 主席树 + 思维
  4. Educational Codeforces Round 112 (Rated for Div. 2)
  5. [C++]试一试结构体struct node的构造函数
  6. P4299 首都(LCT、重心)
  7. AT3913-XOR Tree【状压dp】
  8. P4450-双亲数,P5221-Product,P6055-[RC-02]GCD【莫比乌斯反演,杜教筛】
  9. CF1251F-Red-White Fence【NTT】
  10. jzoj6803-NOIP2020.9.26模拟tom【构造】