一、并发简介

实现并发最直接的方式是在操作系统级别使用进程。进程是运行在它自己的地址空间内的自包含的程序。多任务操作系统可以通过周期性地将CPU 从一个进程切换到另一个进程,来实现同时运行多个进程(程序)。对于进程来说,他们之间没有任何彼此通信的需要,因为他们都是完全独立的。
但是如果将进程当做并发的唯一选择,那么进程来做并发来说的话,也有他的局限性,因为进程通常来说是有数量和开销的限制的,以避免他们在不同的并发系统之间的可应用性
java采用在顺序型语言的基础上提供对线程的支持,即线程机制是在由执行程序表示的单一进程中创建的任务。
并发编程使我们可以将程序划分为多个分离的、独立运行的任务,一个线程就是在进程中的一个单一顺序控制流,因此单个进程可以拥有多个并发执行的任务,其底层机制就是切分cpu时间。

二、基本的线程机制

1.任务

线程可以驱动任务,这可以由Runnable接口来提供,想要定义任务,只需实现Runnable接口并编写run()方法,使得该任务可以执行你的命令

/*** @program: java-demo* @description: 定义一个任务,将任务附在线程上,使线程驱动任务* @author: hs* @create: 2020-08-23 16:52**/
public class LiftOff implements Runnable{protected int countDown=10;private static int taskCount=0;private final int id=taskCount++;public LiftOff(){}public LiftOff(int countDown){this.countDown=countDown;}public String status(){return "#"+id+"("+(countDown>0?countDown:"Liftoff!")+"),";}@Overridepublic void run() {while (countDown-- >0){System.out.print(status());Thread.yield();}}
}

Thread.yield()是对线程调度器的一种建议
线程调度器:java线程机制的一部分,可以将cup从一个线程转移到另外一个线程

2.Thread类

声明注册一个线程,将Runnable对象转变为工作任务的方式是把他交给一个Thread构造器。调用Thread对象的start方法为该线程执行必需的初始化操作,然后会调用Runnable的run方法,在这个新线程中启动该任务

public class SimpleThread {/*** 使用默认线程启动任务* @param arg*//*public static void main(String []arg){LiftOff launch= new LiftOff();launch.run();}*//*** 创建线程,并启动* @param arg*/
/*    public static void main(String []arg){Thread t=new Thread(new LiftOff());t.start();System.out.println("Waiting for ListOff");}*/public static void main(String []arg){for(int i=0;i<5;i++){new Thread(new LiftOff()).start();}System.out.println("Waiting for LiftOff");}
}

在上面程序中调用start方法,start方法迅速的返回了,因为ListOff.run()是由不同的线程执行的,因此可以同时执行main方法内的其他操作

3.使用Executor执行器管理线程

Executor在客户端和任务执行之间提供了一层间接层,Executor循序你管理异步任务的执行,不需要显示地管理线程的生命周期。
ExecutorService是具有服务声明周期的executor

public class ThreadPool {/*** 动态获取线程,Executors.newCachedThreadPool()将为每一个任务都创建一个线程。** @param arg*//*public static void main(String []arg){ExecutorService exec= Executors.newCachedThreadPool();for(int i=0;i<5;i++){exec.execute(new LiftOff());}exec.shutdown();System.out.println("结束");}*//*** 获取固定数量的线程数,Executors.newFixedThreadPool(5)将一次性执行完指定数量的线程分配。** @param arg*//*public static void main(String []arg){ExecutorService exec= Executors.newFixedThreadPool(5);for(int i=0;i<5;i++){exec.execute(new LiftOff());}exec.shutdown();System.out.println("结束");}*//*** 序列化线程,Executors.newSingleThreadExecutor()箱式线程数量为1的Executors.newFixedThreadPool(1),只会创建一个线程** @param arg*//*public static void main(String []arg){ExecutorService exec= Executors.newSingleThreadExecutor();for(int i=0;i<5;i++){exec.execute(new LiftOff());}exec.shutdown();System.out.println("结束");}*//*** 以上三种创建线程的方式,单一、可变、定长都有一定问题,* 原因是FixedThreadPool和SingleThreadExecutor底层* 都是用LinkedBlockingQueue实现的,这个队列最大长度为Integer.MAX_VALUE,容易导致OOM。* OOM: out of memory,内存超出*  所以一般情况下都会采用自定义线程池的方式来定义* @param args*/public static void main(String[] args) {/*** 1、corePoolSize线程池的核心线程数* 2、maximumPoolSize能容纳的最大线程数* 3、keepAliveTime空闲线程存活时间* 4、unit 存活的时间单位* 5、workQueue 存放提交但未执行任务的队列* 6、threadFactory 创建线程的工厂类* 7、handler 等待队列满后的拒绝策略*/ExecutorService executor = new ThreadPoolExecutor(10,20,200L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(),new ThreadPoolExecutor.AbortPolicy());for(int i=0;i<5;i++){executor.execute(new LiftOff());}executor.shutdown();System.out.println("结束");}}

exec.shutdown()方法的调用是为了防止新任务被提交给这个Executor,这个程序将在Executor中所有的任务完成之后尽快退出。

4. Callable从任务中返回值

Runnable是执行工作的独立任务,他不会返回任何值。如果我们希望执行任务后返回一个值,这个时候一般使用Callable接口,实现call()方法

public class CallableDemo  {public static void main(String []arg){ExecutorService exec= Executors.newCachedThreadPool();ArrayList<Future<String>> results=new ArrayList<Future<String>>();for(int i=0;i<10;i++){results.add(exec.submit(new TaskWithResult(i)));//isDone()方法用来检查future是否已经完成results.get(0).isDone();}for(Future<String> fs:results){try{System.out.println(fs.get());}catch(InterruptedException e){System.out.println(e);return;}catch (ExecutionException e){System.out.println(e);}finally {exec.shutdown();}}}}class TaskWithResult  implements Callable<String> {private int id;public TaskWithResult(int id){this.id=id;}@Overridepublic String call() throws Exception {return "result of TaskWithResult "+id;}
}

随手记

有时使用excel导入数据时可能会遇到导入较长数值类型数据,如超过11位的银行卡号或手机号,这时Excel默认使用科学计数法,而这可能和我们的要求有些不符
可以采用这样的方法将科学计数法的数字转换成为正常的数字
BigDecimal bd = new BigDecimal(“3.40256010353E11”);
System.out.println(bd.toPlainString());

5. 休眠

sleep()方法是最简单的一种任务休眠方法,使任务中止执行给定的时间
方法有:Thread.sleep(100)、TimeUnit.MILLISECONDS.sleep(100)

6.优先级

线程的优先级就是将该线程的重要性传递给了调度器,调度器将会倾向于让优先权最高的线程先执行。当前,这并不是意味着优先权较低的线程将得不到执行(也就意味着优先权不会导致死锁)。优先级较低的线程仅仅是执行的频率较低。
注:试图操纵线程优先级通常是一个错误
设置优先级方法:Thread.currentThread().setPriority(Integer)
Thread.currentThread():获取驱动该任务的Thread对象引用
优先级需要在任务开始的地方进行设置,即:run()开头部分

7.让步

让步即可以给线程调度器一个暗示,表示可以让其他线程使用CPU了,这个暗示将通过Thread.yield()方法来做出(注:没有任何的机制保证它必须会被采纳)

8.后台线程

所谓后台线程,就是指程序运行的时候在后台提供一种通用服务的线程,并且这种线程并不属于线程中不可或缺的部分。因此,当所有非后台线程结束时,程序也就终止了,同时会杀死所有后台线程。
可以使用Thread实例对象的setDaemon()方法来设置该线程是后台线程
所有后台线程创建的任何线程默认被设置成为后台线程
可通过方法isDaemon来判断线程是否是后台线程
当非后台线程全部终止的时候,jvm会关闭所有的后台进程,哪怕后台线程中有finally程序块,也不会执行到finally,直接强硬关闭


/*** @author heshuai* @title: E_DaemonThread* @description: 基本线程机制,演示java编程思想中21.2.8节案例*                这里提到了后台(daemon)线程,也称之为服务线程,它是做什么的呢?在这种类型的线程中主要是做一些辅助性工作的,比如开启一个服务线程去时刻检查一个类的状态。*                它有几个特性:1、它由setDaemon(true)来设置后台线程,所有由后台线程创建的线程都默认是后台线程;*                           2、当所有非后台线程结束后,进程会直接停止,这里不会有序的去关闭后台线程,而是采用直接强制关闭的方式,所以当我们在后台线程的finally语句块中进行一些操作的*                           时候,会因为进程的关闭而不会执行到* @date 2021年01月30日 23:09*/
public class E_DaemonThread {/*public static void main(String[] args) {for (int i=0;i<5;++i){Thread daemon=new Thread(new SimpleDaemons());daemon.setDaemon(true);daemon.start();}System.out.println("全部后台线程已启动");try {TimeUnit.MILLISECONDS.sleep(9);} catch (InterruptedException e) {e.printStackTrace();}}
*/public static void main(String[] args) {/*** Executors 自1.5之后开始使用,可以创建ExecutorService、ScheduledExecutorService、ThreadFactory(默认一种的内部实现方式)、Callable* Executors.newCachedThreadPool();*          创建线程池对象,线程池即创建若干线程,若线程结束则不会全部杀死,而是保留一部分,当有新需要线程的时候就会直接从池子从拿出已有的线程来使用,这样就不会浪费反复创建线程资源了*          这个方法在这个工具类中一共有两个重载方法,若不设置ThreadFactory,则使用默认的线程工厂实现方式(Executors.defaultThreadFactory()),当然也可以和下面一样,使用自己自定义的线程工厂实现方式*          其他几种创建线程池的方法基本一样。不过阿里手册推荐使用自定义去创建线程池的方法( new ThreadPoolExecutor()),当然这个方法内部也是这样使用的*/ExecutorService es= Executors.newCachedThreadPool(new DaemonThreadFactory());for (int i=0; i<5;++i){// 执行es.execute(new SimpleDaemons());}System.out.println("全部后台线程已启动");try {TimeUnit.MILLISECONDS.sleep(9);} catch (InterruptedException e) {e.printStackTrace();}}/*** 自定义一个线程工厂,实现ThreadFactory*      在ThreadFactory只有一个方法newThread(),用来创建新的线程,当然这里也就是对创建线程的几个参数进行限定,如:priority、name、daemon status等(还有一个线程组,这个在java编程思想一书中提到在jdk5之后就没有用到了)**      在这个自定义ThreadFactory中,设置所有新Thread都是后台线程*/static class DaemonThreadFactory implements ThreadFactory {@Overridepublic Thread newThread(Runnable r) {Thread t=new Thread(r);// 定义线程为后台线程t.setDaemon(true);return t;}}static class SimpleDaemons implements Runnable{@Overridepublic void run(){try {System.out.println(Thread.currentThread()+":"+this);//isDaemon()判断是否为后台线程System.out.println("是否是后台线程:"+Thread.currentThread().isDaemon());TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}finally {//当非后台线程全部终止的时候,jvm会关闭所有的后台进程,并不会执行到finally,直接强硬关闭System.out.println("后台线程将关闭");}}}
}

9. 编码的变体

这部分内容简单点来说就是,上诉部分都是规规矩矩的实现Runnable或者是生成Thread线程的方式来进行执行的,但是有时候我们可能需要更加简单的方式来实现使用多线程

以下只是举一个例子,有兴趣的朋友可以看我的GitHut

  /*** 使用匿名内部类的方式声明线程*/class InnerThread2{private int countDown=5;private Thread t;public InnerThread2(String name){t=new Thread(name){@Overridepublic void run(){try{while(true){System.out.print(this);if(--countDown ==0){return;}Thread.sleep(10);}}catch (InterruptedException e){System.out.print("Interrupted");}}@Overridepublic String toString(){return getName()+":"+countDown;}};t.start();}}

10. join()

此方法是一个Thread实例对象的方法,它的作用是如果在某个线程上调用t.join()。此线程将被挂起,直到目标线程t结束才会继续执行(即t.isAlive()返回false时)

package com.kfcn.concurrent.abasics;/*** @program: concurrentTest* @description: Understanding join()* @author: hs* @create: 2020-06-10 21:51**/
public class Joining {public static void main(String []args){Sleeper sleepy=new Sleeper("Sleepy",150);Sleeper grumpy=new Sleeper("Grumpy",150);Joiner dopey=new Joiner("Dopey",sleepy);Joiner doc=new Joiner("Doc",grumpy);grumpy.interrupt();}
}class Sleeper extends Thread{private int duration;public Sleeper(String name, int sleepTime){super(name);duration=sleepTime;start();}@Overridepublic void run(){try {sleep(duration);}catch (InterruptedException e){System.out.println(getName()+" was interrupted. "+"isInterrupted(): "+isInterrupted());return;}System.out.println(getName()+"join completed");}
}class Joiner extends Thread{private Sleeper sleeper;public Joiner(String name, Sleeper sleeper){super(name);this.sleeper=sleeper;start();}@Overridepublic void run(){try {sleeper.join();}catch (InterruptedException e){System.out.println("Interrupted");}System.out.println(getName()+" join completed");}
}

11. 捕获异常

由于线程的本质特性,一般情况下不能捕获从线程中逃逸的异常,一旦异常逃出了任务的run()方法,它就会向外传播到控制台
为此,在JDK1.5之后,Thread线程提供了一个新接口Thread.UncaughtExceptionHandler,它允许你在每一个Thread对象上边都附着一个异常处理器。这个接口中的uncaughtException()方法会在线程因未捕获异常而临近死亡的时候被调用。由此在线程中未捕获的异常是通过uncaughtException()方法来捕获的


/*** @author heshuai* @title: H_CaughtExceptionThread* @description: 基本线程机制,演示java编程思想中21.2.14节案例*              捕获异常,我们都知道如果在串行开发的时候如何捕获异常,那么接下来就是如果处理线程抛出的异常* @date 2021年03月06日 12:38*/
public class H_CaughtExceptionThread {public static void main(String []args){ExecutorService exec= Executors.newCachedThreadPool(new HandlerThreadFactory());exec.execute(new ExceptionThread2());}static class ExceptionThread2 implements Runnable{@Overridepublic void run(){Thread t= Thread.currentThread();System.out.println("run() by "+t);System.out.println("eh= "+t.getUncaughtExceptionHandler());throw new RuntimeException();}}/*** 继承未捕获异常处理器* t:发生异常的线程* e:所发生的异常*/static class MyUncaughtException implements Thread.UncaughtExceptionHandler{@Overridepublic void uncaughtException(Thread t,Throwable e){System.out.println("caught: "+e );}}/*** 工厂类,生成可以捕获异常的Thread工厂类*/static class HandlerThreadFactory implements ThreadFactory {@Overridepublic Thread newThread(Runnable r){System.out.println(this+" creating new Thread ");Thread thread=new Thread(r);System.out.println("created "+thread);
//            t.setUncaughtExceptionHandler(new MyUncaughtException());// lambda表达式thread.setUncaughtExceptionHandler((Thread t,Throwable e) -> {System.out.println("caught: "+e );});System.out.println("en = "+thread.getUncaughtExceptionHandler());return thread;}}
}

自此你可以根据代码的需要在不同的线程中设置不同的异常处理器,但是如果有一些相同的异常处理方法的话,可以采用设置Thread类中的一个默认异常处理器的静态域的方式。

Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtException());

这个默认异常处理器只有在不存在线程专有未捕获异常处理器的时候才会被调用。

三、共享受限资源

有了并发就可以同时做多件事情了,但是两个或者多个线程彼此相互干涉的问题也就出现了。如果不防范这种冲突,就可能发生两个线程同时试图访问同一个银行账号,或向同一个打印机打印,改变同一个值等诸如此类的问题

1. 解决共享资源竞争

基本上所有的并发模式在线程冲突问题上,都是采用序列化访问共享资源的方案,这也就意味着给定的同一时刻只有一个任务在访问共享资源。
java提供关键字synchronized的形式,为防止资源冲突提供了内置支持。当任务执行到被synchronized关键字保护的代码片段时,它将检查锁是否可用,然后获取锁,执行代码,释放锁。
这里的锁主要分为两种,一种是对象锁(也被称之为监视器),一种是类锁,ava中有类方法,对象方法。这个与前面的锁相互对应。
对象锁,顾名思义就是针对于对象的锁,当前对象如果上锁后,就不可以被其他线程调用当前对象加锁的方法,这种互相排斥的,也被称之为互斥量(mutex,多介绍一个单词mutual)。
类锁,在类的级别上进行上锁,主要针对的是static所修饰的类方法,当前类被上锁后,可以在类的范围中防止其他线程对当前加锁的类方法的调用。
注意:

  • 所有在上synchronized修饰的资源,被称为互斥量
  • 属性不可以被synchronized修饰
  • 互斥量只针对于被synchronized修饰的资源,而不被它所修饰的共享资源并不会影响(不管类是否加锁)
  • 不管object锁还是class锁,它们的范围都是一个类的,也就是说当前类中一个加锁方法被一个线程调用,那么在这个方法退出之前,其他线程不可以调用这个类中的其他加锁方法。

其实上面也解释了工作中一些常识的理论:

  • 现在的类属性都要求是private,就是为了防止可以直接修改类属性,需要通过setget方法来进行修改
  • 类大多数都是单例的,这个在更高的层面上解决了资源竞争问题

大概就想到这么多,这节就到这。

2. Lock显式声明锁

在jdk5中提供一个显式声明互斥锁的机制,但是它和synchronized在使用层面上有明显的不同,Lock对象必须被显式的创建、锁定和释放。在代码量上,它是更多的,但是它也更加的灵活。

这里提供几个常用的方法,具体测试代码可以看我的GitHub

// 创建Lock对象
private Lock lock = new ReentrantLock();
lock.lock(); // 获得锁,若暂时获取不到则会一致等待,也就是串行处理
lock.tryLock() // 获取锁,若获取到则返回true,否则返回false,并不会等待直接返回
lock.tryLock(2,TimeUnit.SECONDS) // 等待特定时间去获取线程,若期间内可以获取则返回true并且获取到锁,否则false
lock.unlock(); // 释放锁,类似于synchronized,加锁几次就需要释放锁几次

基本上这几个方法就足够了,Lock和synchronized功能基本一致,也可以分为对象锁和类锁,并且不会互相影响。这里边有两个方法lock和tryLock,相对而言tryLock更加灵活,若一时获取不到锁,可以选择去执行其他操作,然后再回去继续等待。

3.原子性与易变性

这一节的知识有些晦涩,下面只对原子性以及可视性进行部分记录

突然项目开会,开到23:12,呼,差点都忘了

原子性:即指不可以被分割的操作,就是原子性操作。通常指的是不加锁状态下的原子性操作。
举个例子,通常基本数据类型的赋值和返回操作是原子性的,除了long、double类型,为什么呢,因为这两个类型是64位的,JVM可以将64位(long、double)的读取和写入当做两个分离的32位操作来执行,这就产生了在一个读取和写入操作中间发生上下文切换,从而导致不同任务可以看到不确定的结果的可能性(这有时被称为字撕裂,因为你可以看到部分你修改的值)。当然如果你对long、double类型的变量加上volatile关键字,那么就可以获得原子性(赋值、返回操作)。

可视性:在如今的多处理器系统中,可视性问题远比原子性问题多得多,即在一个线程中做出了对一个域进行了修改,对其他线程来说可能是不可视的,这就是可视性。
例如:修改后只是暂时性地存储在了本地处理器的缓存中,未向内存中同步,那么不同的线程可能就有不同的结果了。
可以使用volatile关键字还确保应用中的可视性,如果将一个变量声明为volatile,那么只要对这个变量进行了写操作,那么其他线程所有的读操作就都可以看到这个修改。这是因为volatile会立即被写入到主存中,而读取操作就发生在主存中。
为什么会造成不可视问题?那是因为JVM可能会对非volatile的原子性操作做移除写入和读取操作的优化,因此其他线程就看不到最新的值了。volatile就是为了告诉JVM取消这个优化,直接将对值的改变发生在内存中。

4. 原子类

这个就简单的介绍一下有哪些原子类吧,有AtomicInteger、AtomicLong、AtomicReference等,这些原子类一般来说在项目中很少用到,只有在性能调优的时候才会用到。

5. 临界区

临界区提出的意义:有时,你可能只是希望防止多个线程同时访问方法内部的部分代码而不是防止访问整个方法。这样就可以使用synchronized语句块来将代码隔离出来,这样隔离出来的代码块也叫作临界区(critical section)

synchronized(syncObject) {// ...
}

在synchronized关键字的括号中,是用来指定某个对象,此对象锁用来对花括号中的代码进行同步控制。
这块多记录一下,这个代码块也被称为同步控制块,如果想进入这个代码块,那么必须得到syncObject对象的锁,没有锁那么就只能等待获取锁了。

很明显,这样做比直接synchronized修饰整个方法可以获得更多的访问。
自然也可以使用显式锁来创建临界区,那么就是下面的代码

private Lock lock = new ReentrantLock();
lock.lock();
try {//...
} finally {lock.unlock();
}

6.线程本地存储

这块就很有意思了,有句话是怎么说的,我不能打败你,我就不搭理你。
这个概念就很类似了,防止任务在共享资源上产生冲突的第二种方式就是根除对于变量域的共享。原理呢,是线程本地存储是一种自动化的机制,可以为使用相同变量的每个不同线程都创建不同的存储。具体是使用java.lang.ThreadLocal类来实现的。

public class E_ThreadLocal {public static void main(String[] args) throws InterruptedException {ExecutorService exec = Executors.newCachedThreadPool();for(int i=0; i<5; i++){exec.execute(new Accessor(String.valueOf(i)));}TimeUnit.SECONDS.sleep(1);exec.shutdownNow();}static class Accessor implements Runnable {private String id;public Accessor(String idn){this.id = idn;}@Overridepublic String toString() {return "Accessor{" +"id='" + id + '\'' +'}'+"+++++++"+ThreadLocalVariableHolder.get();}@Overridepublic void run() {while (!Thread.currentThread().isInterrupted()){ThreadLocalVariableHolder.increment();System.out.println(this);Thread.yield();}}}static class ThreadLocalVariableHolder {// 通常ThreadLocal对象当作静态域来存储private static ThreadLocal<Integer> value = new ThreadLocal<Integer>(){private Random random = new Random(47);protected Integer initialValue() {return random.nextInt(50000);}};// 此方法非synchronized,但是因为value为线程一级的变量,那么是可以保障不会出现竞争条件的。public static void increment(){ThreadLocalVariableHolder.value.set(ThreadLocalVariableHolder.value.get()+1);}public static Integer get(){return ThreadLocalVariableHolder.value.get();}}
}

以上案例是演示如何使用ThreadLocal这个类,基本上用到了这个类里边的三个方法,set、get、initialValue,initialValue主要是做初始化值的使用使用的,而且观察他的注释,这个初始化值是属于懒初始化的。

也就是说,它只有在调用get方法之前会进行一次初始化,如果在调用get之前调用了set,那么将不会进行初始化。还有一点,很有意思,如果调用了remove方法后调用get方法,那么这个值将再此被初始化(多次进行)。类似于下边这种:

四、终结任务

1. 在阻塞的时候终结

线程的状态

  1. 新建(new):当线程被创建时,它只会短暂的处于这种状态。此时他分配了必须的系统资源,并执行了初始化。因此也代表了此时的线程有资格获得CPU时间了,之后调度器将把这个线程转变为可运行状态或者阻塞状态。
  2. 就绪(Runnable):在这种状态下,只要调度器把时间片分配给线程,线程就可以运行了,也就是说,在任意时刻,线程可以运行也可以不运行。这不同于阻塞和死亡。
  3. 阻塞(Blocked):线程能够运行,但有某个条件阻止它的运行。当线程处于阻塞状态时,调度器将忽略线程,不会分配给线程任何CPU。直到线程进入就绪的状态,它才有可能执行操作。
  4. 死亡(Dead):处于死亡或者说是终结状态的线程将不再是可调度的,并且再也不会得到CPU时间,它的任务已经结束,或是不再可运行的。任务死亡通常方式是从run()方法返回,但是任务的线程还可以被中断

有四种可能进入阻塞状态:

  1. 通过调用sleep()使任务进行休眠状态,任务在指定的时间内是不会运行的。
  2. 通过调用wait()使线程挂起,直到线程得到了notify()或notifyAll()消息(signal()或signalAll()消息),线程才会进入就绪状态。
  3. 任务在等待某个I/O完成
  4. 任务试图在某个对象上调用其同步控制方法,但是对象锁不可用,因为另外一个任务已经获取了这个锁。

2.中断

中断,中断一个正在运行的线程。这里的中断主要是指中断进入阻塞状态下的线程运行

public class A_InterruptedThread {static class SleepBlocked implements Runnable {@Overridepublic void run() {try {TimeUnit.SECONDS.sleep(100);}catch (InterruptedException e){e.printStackTrace();System.err.println("程序被打断");}System.out.println("SleepBlocked 执行完毕");}}static class IOBlocked implements Runnable {private InputStream in;public IOBlocked(InputStream inn) {this.in = inn;}@Overridepublic void run() {try {System.out.println("等待读:");in.read();}catch (IOException e){if (Thread.currentThread().isInterrupted()){System.err.println("IOBlocked程序被打断");}else{e.printStackTrace();}}System.out.println("IOBlocked 执行完毕");}}static class SynchronizedBlocked implements Runnable {public synchronized void f() {// 忙等待中,永远也不释放锁while (true)Thread.yield();}public SynchronizedBlocked() {new Thread(()->{// 开启一个线程锁住该对象f();}).start();}@Overridepublic void run() {System.out.println("尝试获取对象锁...");f();System.out.println("SynchronizedBlocked 执行完毕");}}static class ReentrantLockBlocked implements Runnable {private Lock lock = new ReentrantLock();public void f() throws InterruptedException {// 尝试获取锁,直到这个线程被中断lock.lockInterruptibly();}public ReentrantLockBlocked() {new Thread(()->{// 锁住当前对象lock.lock();}).start();}@Overridepublic void run() {System.out.println("尝试获取对象锁...");try {f();} catch (InterruptedException e) {System.out.println("ReentrantLockBlocked 被中断");}System.out.println("ReentrantLockBlocked 执行完毕");}}private static ExecutorService exec = Executors.newCachedThreadPool();private static void test(Runnable r) throws InterruptedException {// 获得线程上下文Future<?> f = exec.submit(r);TimeUnit.MILLISECONDS.sleep(10);System.out.println("将要打断线程:"+r.getClass().getName());// 中断指定线程,这个是中断单个线程的方式f.cancel(true);System.out.println("Interrupt 已经发送"+r.getClass().getName());}private static void testNormal() throws InterruptedException {test(new SleepBlocked());test(new IOBlocked(System.in)); // 不可被中断test(new SynchronizedBlocked()); // 不可被中断System.out.println("将要退出系统");System.exit(0);}private static void testIO() throws IOException, InterruptedException {ServerSocket socket1 = new ServerSocket(8080);ServerSocket socket2 = new ServerSocket(8081);InputStream socketInput1 = new Socket("localhost",8080).getInputStream();InputStream socketInput2 = new Socket("localhost",8081).getInputStream();exec.execute(new IOBlocked(socketInput1));exec.execute(new IOBlocked(socketInput2));TimeUnit.MILLISECONDS.sleep(100);System.out.println("中断由exec管理的所有的线程");exec.shutdownNow();TimeUnit.SECONDS.sleep(1);System.out.println("将关闭。。socketInput1资源");socketInput1.close(); // 关闭I/O资源来释放阻塞线程TimeUnit.SECONDS.sleep(1);System.out.println("将关闭。。socketInput2资源");socketInput2.close(); // 释放阻塞线程}public static void testReentrantLock() throws InterruptedException {exec.execute(new ReentrantLockBlocked());TimeUnit.SECONDS.sleep(1);System.out.println("将要中断ReentrantLockBlocked");exec.shutdownNow();System.out.println("中断ReentrantLockBlocked完成");}public static void main(String[] args) throws Exception {//        testNormal(); // 测试正  常情况下,三种阻塞下响应Interrupted
//        testIO(); // 测试关闭IO资源以 释放阻塞线程testReentrantLock(); // 测试互斥状态下,中断阻塞线程}
}

上面测试了对于三种不同阻塞下,调用中断操作后的不同反应,由上可以得知任何要求抛出InterruptedException的阻塞都可以直接中断的,而I/O中断就必须释放底层资源后才可以;锁等待的情况,一般情况下是不能进行中断了(但是可以中断正在占有锁的线程),而且在ReentrantLock上阻塞任务是例外的,是可以被中断的。因为可以看出来lockInterruptibly()方法是有所不同的,它是抛出InterruptedException异常的,也就是说基于上面说的它也是可以被中断的。

补充:
ReentrantLock,有四种获取锁的方法,分别是lock、tryLock、tryLock(long time, TimeUnit unit)、lockInterruptibly(),各有特点
lock:机制和synchronized类似,若锁被占用,那么将等待获取锁,直到锁被获取;
tryLock:尝试获取锁,若锁被占用,则返回false,不再等待。若是获取到锁,则返回true
tryLock(long time, TimeUnit unit):与上边相比,有一定的时间,在指定时间内,则等待获取锁,超时后,则返回false
lockInterruptibly:若锁被占用,则等待获取锁,直到锁被获取或者该线程被中断。

3.检查中断

这一节就很有意思了,上面我记住了如何中断线程,并且举例了三种阻塞情况下的中断方法,但是要知道,线程一共是有四种状态的,分别是准备、就绪、阻塞、死亡。只说了阻塞状态下的中断,那么其他三种情况呢?这种也是可以考虑一下,准备和死亡就不用说了,因为这是一个非常短的一个状态,不需要阻塞。就绪呢?
假如有一个Runnable,是一个循环,如果不被中断,那么一直是处于就绪的状态,那么如何打断呢?其实这里可以检查中断状态来判断是否中断。Thread.currentThread().isInterrupted()返回值是 boolean型,该线程被中断则返回true,否则false;
只要将这个作为一个循环条件,那么就可以在调用shutdownNow的时候,判断线程已被中断,然后退出当前线程了。
注意: 有些是操作是会清空中断状态的,比如调用它Thread.currentThread().isInterrupted()就会清空中断状态,所以有需要的话,必须将这个状态存储起来,当然它清空是为了确保并发状态下不会因为某个任务被中断而通知你两次。

五、线程之间的协作

上面提到的都是通过锁来同步两个线程的行为,从而使得一个线程不会干涉另外一个线程的资源。也就是说两个线程在交替使用某个共享资源的时候,可以通过互斥来使得任何时刻只有一个线程是可以访问这项资源的。
基于此,这一节学习的是如何使线程彼此之间进行协作,协作不同于以前的内容了,这一部分不是要解决彼此之间的干涉,而是彼此之间的协调。书中是举了个项目规划的例子,这里就举个生活中的例子吧

做西红柿炒蛋应该都是知道的,不过有一种做法是要把西红柿先用热水煮一下,煮的快熟了的状态,可以去掉外皮。
那么要完成一道西红柿炒蛋,首先可以完成什么?是否这道菜必须一步一步完成呢?哪些步骤可以同时进行?下面说一下我的见解
假设我们有三个灶台、三个锅,那么第一步我们需要将西红柿煮熟还有需要将鸡蛋炒熟,这两件事其实可以同时进行,这样可以大大减少等待时间。还有一件事那么就是需要放油入锅,放煮好的西红柿进行翻炒再放入鸡蛋,很明显这件事必须要第二步进行,而且必须是在西红柿煮熟后以及鸡蛋准备好后进行,所以哪怕我们已经有条件同时进行这三件事,但是因为做第二个步骤所需要的外部条件不符合,第二步骤也是不可以进行的。
那么真实地步骤应该是:将水放入A锅,放入西红柿;将油放入B锅,放入鸡蛋;将油放入C锅,但是因为没有配好西红柿和鸡蛋,那么需要等待(wait),等待A锅和B锅准备好并发出准备好的指令(notifyall),那么C锅就可以放入西红柿进行翻炒,再放入鸡蛋放入配料,这样一道西红柿鸡蛋就做好了。

1.wait()和notifyAll()

上面简单介绍了在什么场景下使用线程之间的协作。在程序中,我们可以通过wait()来将自身挂起,并且等待某个条件发生变化,通常这种变化 的改变超出了当前线程可控制的范围。在未接触wait()的时候,可以通过忙等待 的方式来实现,但是这是一种不良的CPU周期使用方式,因为所谓的忙等待就是一个不断循环检测某个外部条件变化的代码,这样的话,其实是一直在占有锁并且占用CPU时间段。
只有notify()notifyAll()发生时,即表示发生了某些外部条件的改变了,那么这个线程会被唤醒,也就是解除挂起状态。
wait()有三个重载方法,分别是wait(long timeout, int nanos)wait(long timeout)wait(),timeout的单位是milliseconds,意思是挂起后,在一个时间段内若没有被唤醒,则超时后自己唤醒自己。但是这个方法就比较有意思了,wait(long timeout, int nanos),我原以为它是更加对时间这里更加的细化,结果看了它的实现发现并不是这样的。

这个方法的注释中指出nanos的单位是nanoseconds,并且超时时间应该为1000000*timeout+nanos,这样的单位应该是nanoseconds了,时间粒度来说更加的细了,但是实际实现确实上图所示。这地方就不深究了,用到这个方法的时候再说吧。另外两个方法其实就够用了。

wait方法和sleep方法的区别:

  • 这个区别是很大的,虽然都是阻塞当前线程,但是wait()期间对象锁是释放的,sleep()并不会释放锁;
  • wait()可以通过notify、notifyAll或时间超时来使wait()恢复。

注:一般来说线程都是由一个工厂来管理的,并且会指定线程的核心线程数和最大线程数等参数(ThreadPoolExecutor),那么使用wait挂起线程的时候,它虽然将锁释放了,但是它依然占有着这个线程,也就是当因为wait出现死锁的状态下,并将所有的线程数都占有了,那么是有整个jvm进程停止工作的可能

notify()notifyAll()wait()他们都是属于Object对象的一部分,并且只能在同步控制方法中或者同步控制块中使用这三个方法。否则将会报错。

下面将写一个代码,实现一个打蜡抛光的过程,这个代码一共两个过程,分别是将蜡涂在车上,然后抛光它,下一层打蜡发生在抛光之后,以此反复进行,也就是多次打蜡抛光。

public class A_WaitAndNotifyAll {static class Car {private boolean waxOn = false;// 打蜡public synchronized void waxed(){this.waxOn = true;notifyAll();}// 抛光public synchronized void buffed() {this.waxOn = false;notifyAll();}// 等待打蜡执行完毕public synchronized void waitForWaxing() throws InterruptedException {while (this.waxOn == false) {wait();}}// 等待抛光执行完毕public synchronized void waitForBuffing() throws InterruptedException {while (this.waxOn == true) {wait();}}}// 打蜡流程static class WaxOn implements Runnable {private Car car;public WaxOn(Car car) {this.car = car;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()){System.out.println("打蜡...");TimeUnit.MILLISECONDS.sleep(200); // 模拟打蜡过程car.waxed();car.waitForBuffing();}}catch (InterruptedException e) {System.out.println("WaxOn 被中断");}System.out.println("WaxOn 执行完毕");}}// 抛光流程static class WaxOff implements Runnable {private Car car;public WaxOff(Car car) {this.car = car;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()){car.waitForWaxing();System.out.println("抛光...");TimeUnit.MILLISECONDS.sleep(200); // 模拟抛光过程car.buffed();}}catch (InterruptedException e) {System.out.println("WaxOff 被中断");}System.out.println("WaxOff 执行完毕");}}public static void main(String[] args) throws InterruptedException {Car car = new Car();ExecutorService executor = Executors.newCachedThreadPool();executor.execute(new WaxOn(car));executor.execute(new WaxOff(car));TimeUnit.SECONDS.sleep(6);executor.shutdownNow();}
}

上面代码基本上在关键处也是有注释的,只有个惯用的方式需要注意一下:

while (someCondition) {wait();
}

如上例,对这个进行解释一下,如果两个线程陷入挂起状态等待被唤醒,那么唤醒后,可能并不是自身需要的外部条件发生了变化,那么就需要重新进入wait状态,其实这块涉及了之前的一部分内容,因为多个线程陷入了阻塞状态,那么当他们被唤醒被重新尝试占用锁的时候,是没有办法控制某个线程可以获取到锁的,所以就有了这个惯用的做法。

2.notify()和notifyAll()

关于这两个方法和我原来想法有点歧义,特殊记录一下
当有多个线程处于wait()状态的时候,那么调用notifyAll()要比notify()要更加合理与安全,但是只有一个线程处于wait()的状态中时,那么使用notify()来代替notifyAll()是一种优化的选择。
说说这两个区别:
使用notify()时,在等待同一个锁的线程中,只有一个被唤醒(随机),因此需要保证被唤醒的那个是恰当的线程。所以当有多个线程时,就比较难以控制了,需要这众多的线程都等待相同的条件,这样外部条件发生变化时,使用notify()唤醒哪个线程都可以满足现在的条件。
notifyAll()这个就很好理解了,就是唤醒所有等待的线程,这所谓的所有等待的线程,是指等待唤醒所特定对象锁的线程。

3.生产者与消费者

public class B_ProducerAndConsumer {// 菜static class Meal {private final int orderNum;public Meal(int orderNum) {this.orderNum = orderNum;}@Overridepublic String toString() {return "Meal{" +"orderNum=" + orderNum +'}';}}// 服务生static class WaitPerson implements Runnable{private Restaurant restaurant;public WaitPerson(Restaurant restaurant) {this.restaurant = restaurant;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()){synchronized (this){// 判断当前的菜是否已经做完,若还没做完则挂起while (this.restaurant.meal== null){wait();}}System.out.println("服务员拿到餐..."+this.restaurant.meal);// 同步厨师,判断厨师是否正在尝试做菜,若是则等待厨师尝试失败后挂起,然后拿到锁将餐取走,并通知厨师synchronized (this.restaurant.chef) {this.restaurant.meal=null;this.restaurant.chef.notifyAll();}}}catch (InterruptedException e){System.out.println("WaitPerson 被中断");}}}static class Chef implements Runnable{private Restaurant restaurant;private int count = 0;public Chef(Restaurant restaurant) {this.restaurant = restaurant;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()){synchronized (this){// 若餐还在取餐口,则挂起。只有取餐口没有餐才开始做菜while (this.restaurant.meal!= null){wait();}}// 做完九道菜,将不再做菜if(++count == 10){System.out.println("已经做完,不再做菜");this.restaurant.service.shutdownNow();}else{System.out.println("开始做菜...");}// 尝试获取服务生的锁,若服务生挂起则将餐放到取餐口,并通知服务生synchronized (this.restaurant.waitPerson) {this.restaurant.meal=new Meal(count);this.restaurant.waitPerson.notifyAll();}TimeUnit.MILLISECONDS.sleep(100);}}catch (InterruptedException e){System.out.println("Chef 被中断");}}}static class Restaurant {Meal meal;private WaitPerson waitPerson = new WaitPerson(this);private Chef chef = new Chef(this);ExecutorService service = Executors.newCachedThreadPool();public Restaurant() {System.out.println("开始执行");service.execute(waitPerson);service.execute(chef);}public static void main(String[] args) {new Restaurant();}}
}

这里其实也是通过一个饭店(Restaurant)中的服务生和厨师来表示一个生产者、消费者之间交互的一个案例,其实还是线程之间的协调。

我认为这一节主要是一个概念的引入,而且在上面的注释中其实并没有将一个概念解释的很透彻,那就是取餐口。这个在代码中是什么概念呢,其实就是Restaurant的meal变量,因为不管是厨师还是服务员都是在对这个变量进行操作,进行取餐放餐。那么对于一个线程而言,只有一个单一的地点用于存放对象,从而另外一个线程可以使用这个对象。那么最经典的实现方式,其实是使用先进先出队列来存储被生产和消费的对象。

4.使用显式的Lock和Condition对象

Lock在前面是有介绍的,是显式的锁的形式。Condition是和wait()notify()notifyAll()相对应,并且也同样有三个方法与之对应,那就是await()signal()signalAll()。并且因为await()signal()signalAll()三个方法为Condition所有,所以不管是挂起还是唤醒针对的都是Condition上被其自身挂起的锁。因此与notifyAll()相比,signalAll()更加的安全。

public class C_AwaitAndSignalAll {static class Car {private Lock lock = new ReentrantLock();// 和Object中的wait、notify、notifyAll一样,如果需要操作await()、signal()、signalAll(),必须先获得锁private Condition condition = lock.newCondition();private boolean waxOn = false;// 打蜡public void waxed(){lock.lock();// 每个对Lock()的调用都必须紧跟一个try-catch子句,用来保证在任何情况下都可以释放锁。try {this.waxOn = true;condition.notifyAll();}finally {lock.unlock();}}// 抛光public void buffed() {lock.lock();try {this.waxOn = false;condition.notifyAll();}finally {lock.unlock();}}// 等待打蜡执行完毕public void waitForWaxing() throws InterruptedException {lock.lock();try {while (this.waxOn == false) {// 必须先获得锁才能对锁进行操作condition.await();}}finally {lock.unlock();}}// 等待抛光执行完毕public void waitForBuffing() throws InterruptedException {lock.lock();try {while (this.waxOn == true) {// 必须先获得锁才能对锁进行操作condition.await();}}finally {lock.unlock();}}}// 打蜡流程static class WaxOn implements Runnable {private A_WaitAndNotifyAll.Car car;public WaxOn(A_WaitAndNotifyAll.Car car) {this.car = car;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()){System.out.println("打蜡...");TimeUnit.MILLISECONDS.sleep(200); // 模拟打蜡过程car.waxed();car.waitForBuffing();}}catch (InterruptedException e) {System.out.println("WaxOn 被中断");}System.out.println("WaxOn 执行完毕");}}// 抛光流程static class WaxOff implements Runnable {private A_WaitAndNotifyAll.Car car;public WaxOff(A_WaitAndNotifyAll.Car car) {this.car = car;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()){car.waitForWaxing();System.out.println("抛光...");TimeUnit.MILLISECONDS.sleep(200); // 模拟抛光过程car.buffed();}}catch (InterruptedException e) {System.out.println("WaxOff 被中断");}System.out.println("WaxOff 执行完毕");}}public static void main(String[] args) throws InterruptedException {A_WaitAndNotifyAll.Car car = new A_WaitAndNotifyAll.Car();ExecutorService executor = Executors.newCachedThreadPool();executor.execute(new A_WaitAndNotifyAll.WaxOn(car));executor.execute(new A_WaitAndNotifyAll.WaxOff(car));TimeUnit.SECONDS.sleep(6);executor.shutdownNow();}
}

使用显式的Lock和Condition肯定是更加灵活,但是对于这个案例来说,并没有太大的区别,反而增加了复杂性,因此Lock和Condition对象只有在更加困难的多线程问题中才是必须的

5.BlockingQueue

wait()和notifyAll()方法以一种非常低级的方式解决了任务互相协作的问题,即每次交互时都握手。
可以在更高抽象级别下,来解决这个问题,那就是使用同步队列来解决线程协作问题,同步队列在任何时刻都只允许一个线程插入或者移除元素。在java.util.concurrent.BlockingQueue接口中提供了这个队列,有很多标准实现,如LinkedBlockingQueue、ArrayBlockingQueue等。
这个特点在哪里呢?
BlockingQueue在消费者试图从这个队列中获取对象时,若该队列中没有值,那么这些队列将挂起消费者线程,并且当有更多元素的时候,恢复消费者任务。

public class D_BlockingQueue {// 烤面包的实体static class Toast {public enum Status {DRY, BUTTERED, JAMMED}private Status status = Status.DRY;private final int id;public Toast(int id) {this.id = id;}public void butter() {status = Status.BUTTERED;}public void jam() {status = Status.JAMMED;}public Status getStatus() {return this.status;}public int getId(){return id;}@Overridepublic String toString() {return "Toast{" +"status=" + status +", id=" + id +'}';}}// 继承自LinkedBlockingQueue,指定类型static class ToastQueue extends LinkedBlockingQueue<Toast>{}
//    开始制作吐司,并将它放入延迟队列中static class Toaster implements Runnable{private ToastQueue toasts;private int count =0;private Random random = new Random(47);public Toaster(ToastQueue toasts) {this.toasts = toasts;}@Overridepublic void run() {try {while (!Thread.interrupted()){TimeUnit.MILLISECONDS.sleep(100+random.nextInt(500));Toast t=new Toast(count++);System.out.println(t);toasts.put(t); // 添加到LinkedBlockingQueue中,如果存在挂起的线程则立刻唤醒}}catch (InterruptedException e){System.out.println("Toaster被中断");}System.out.println("Toaster执行完毕");}}
//  将吐司涂上黄油,并将它放入butterQueue队列中static class Butter implements Runnable{private ToastQueue dryQueue,butterQueue;public Butter(ToastQueue dryQueue,ToastQueue butterQueue) {this.dryQueue = dryQueue;this.butterQueue = butterQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()){// queue中无元素,则等待获取到元素。若有则在queue的head取出移除该元素。先进先出Toast t=dryQueue.take();t.butter();System.out.println(t);butterQueue.put(t);}}catch (InterruptedException e){System.out.println("Butter被中断");}System.out.println("Butter执行完毕");}}
//  将吐司堵上果酱,并将它放入finishedQueue队列中static class Jammer implements Runnable{private ToastQueue butterQueue,finishedQueue;public Jammer(ToastQueue butterQueue,ToastQueue finishedQueue) {this.finishedQueue = finishedQueue;this.butterQueue = butterQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()){// queue中无元素,则等待获取到元素。若有则在queue的head取出移除该元素。先进先出Toast t=butterQueue.take();t.jam();System.out.println(t);finishedQueue.put(t);}}catch (InterruptedException e){System.out.println("Jammer被中断");}System.out.println("Jammer执行完毕");}}
//  将做好的吐司放到顾客面前,并判断吐司是否是按照顺序以及是否已经做完的状态static class Eater implements Runnable{private ToastQueue finishedQueue;private int counter =0;public Eater(ToastQueue finishedQueue) {this.finishedQueue = finishedQueue;}@Overridepublic void run() {try {while (!Thread.interrupted()){// queue中无元素,则等待获取到元素。若有则在queue的head取出移除该元素。先进先出Toast t=finishedQueue.take();// 检查是否是先进先出按照顺序来的以及状态是否是Status.JAMMEDif (t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED){System.err.println("出现错误"+t);System.exit(1);}else {System.out.println("吃掉它==="+t);}}}catch (InterruptedException e){System.out.println("Eater被中断");}System.out.println("Eater执行完毕");}}public static void main(String[] args) throws InterruptedException {ToastQueue dryQueue = new ToastQueue(),butteredQueue = new ToastQueue(),finishedQueue = new ToastQueue();ExecutorService service = Executors.newCachedThreadPool();service.execute(new Toaster(dryQueue));service.execute(new Butter(dryQueue,butteredQueue));service.execute(new Jammer(butteredQueue,finishedQueue));service.execute(new Eater(finishedQueue));TimeUnit.SECONDS.sleep(5);service.shutdownNow();}
}

关于这个案例详细的阐述了BlockingQueue的基本操作

6.线程间使用管道进行输入/输出

这个所谓的管道就是指PipedWriter、PipedReader,提供了线程以“管道”的形式对线程间输入/输出提供了支持,这个模型也可以看成是“生产者-消费者”问题的变体。本质上看,PipedWriter、PipedReader是一个阻塞队列。

  • PipedWriter: 允许线程向管道写入
  • **PipedReader:**允许不同线程从同一个管道中获取

案例:

public class E_Pipe_WriterAndReader {static class Sender implements Runnable{private Random random = new Random(47);private PipedWriter writer = new PipedWriter();public synchronized PipedWriter getWriter() {return writer;}@Overridepublic void run() {try {while (true) {for (char c = 'A'; c<='Z';c++){writer.write(c);TimeUnit.MILLISECONDS.sleep(random.nextInt(500));}}} catch (InterruptedException e) {System.out.println("Sender 被中断");} catch (IOException e) {System.err.println("Sender 发生异常");}}}static class Receiver implements Runnable{private PipedReader reader;public Receiver(Sender sender) throws IOException {// 指定在哪个stream中读取this.reader = new PipedReader(sender.getWriter());}@Overridepublic void run() {try {while (true){//                   若读取不到数据则自动阻塞System.out.println("收到:"+(char)reader.read());}} catch (IOException e) {System.err.println("发生异常:Receiver");}}}public static void main(String[] args) throws Exception {Sender sender = new Sender();Receiver receiver = new Receiver(sender);ExecutorService service = Executors.newCachedThreadPool();service.execute(sender);service.submit(receiver);TimeUnit.SECONDS.sleep(3);service.shutdownNow();}
}

可以看出来,在创建PipedWriter和PipedReader的时候,必须将它们相关联。可以使用如上方式,也可以在PipedWriter中进行关联,PipedReader的read方法和BlockingQueue中的take方法类似,当读取不到数据时,它将阻塞线程。
有一点需要注意:PipedReader的read方法可以被中断,并且和I/O还有一个不同点就在于它在阻塞的时候不会占有锁,I/O阻塞状态下依然占有锁

六、死锁

死锁一个很好理解的概念,因为线程分为四种状态,准备、就绪、阻塞、死亡,在阻塞状态下,一般情况下都是可以唤醒的,或通过中断,或通过超时,或通过一些外部条件。
那么一些例外的情况下,就会出现死亡的状态,比如A线程在等待B线程,B线程在等待C线程,而C线程又在等待A线程,这就是典型的死锁问题。在书中着重提到了Edsger Dijkstra提出的哲学家就餐问题。
死锁是不易发现的,这个需要有很强的经验,因为死锁会造成很多意想不到的问题,而且基本重现不了。

这里讨论一下死锁造成的影响:

  1. 肯定是对业务逻辑产生 一定的影响,造成延迟、超时
  2. 阻塞主要分为四种,分别是I/O阻塞、互斥阻塞、sleep阻塞以及wait阻塞(后面的await、BlockingQueue、PipedReader都可以归于这类),前三种在阻塞的同时都会占用锁,而最后一种会将线程挂起释放锁。那么对于前三种来说,尤其是第二种(当互斥阻塞变成循环互斥的时候,那就是死锁),那么不但占有线程池资源,而且影响其他线程对于这个公共资源的使用,从而达到使整个程序瘫痪的地步(因为其他线程在使用这个公共资源的时候,首先要等待锁释放)。最后一种,危害要少一点,但是也会一直占用线程池资源,但是不用影响其他线程的使用。

下面看一个死锁的例子,也就是Edsger Dijkstra提出的哲学家就餐问题的代码实现,为防止不了解这个就餐问题,先对这个问题用语言描述一下

哲学家就餐问题,指的是有五个哲学家并由五根筷子,他们围坐在一个圆桌上准备吃饭,每个人的左右两边都有一根筷子,哲学家想要吃饭必须拿到左右两边的两根筷子才可以就餐,他们是按照先拿右边筷子再拿左边的筷子的方式去拿的,如果所需要的筷子已经被其他的哲学家使用的话,那么它将陷入等待中,直到可以拿到筷子为止

public class A_PhilosopherRepastProblem {// 描述筷子被拿起放下的状态static class Chopstick {private boolean taken = false;public synchronized void taken() throws InterruptedException {while (taken){wait();}taken = true;}public synchronized void drop(){taken = false;notifyAll();}public synchronized void testLock(){System.err.println("获取到锁");}}static class Philosopher implements Runnable{private Chopstick left;private Chopstick right;private final int id;private final int ponderFactor;private Random random = new Random(47);public Philosopher(Chopstick left, Chopstick right, int id, int ponderFactor) {this.left = left;this.right = right;this.id = id;this.ponderFactor = ponderFactor;}// 模拟哲学家思考的过程private void pause() throws InterruptedException {if (ponderFactor ==0){return;}TimeUnit.MILLISECONDS.sleep(random.nextInt(ponderFactor*250));}@Overridepublic void run() {try {while (!Thread.interrupted()){System.out.println("哲学家开始思考...");pause();// 思考饿了,开始干饭System.out.println("哲学家开始拿右边的筷子");right.taken();System.out.println("哲学家开始拿左边的筷子");left.taken();System.out.println("哲学家开始干饭");pause();System.out.println("吃饱了,放下筷子,给其他人用");right.drop();left.drop();}} catch (InterruptedException e) {System.err.println("Philosopher 被中断了");}}}public static void main(String[] args) throws InterruptedException {// 将ponder哲学家思考时间设置足够短以达到可以尽快达到死锁状态int ponder =0;// 设置成5并不是不会出现死锁,而是几率比较低
//        int ponder =5;int size =5;ExecutorService service = Executors.newCachedThreadPool();Chopstick[] chopsticks = new Chopstick[size];// 生成五个哲学家for (int i=0;i<size;i++){chopsticks[i]=new Chopstick();}// 哲学家开始吃饭for (int i=0;i<size;i++){// 这里涉及一个叫做数据循环下标的处理方式,就是取余,这是正循环,负循环是(size+i-1)%sizeservice.execute(new Philosopher(chopsticks[i],chopsticks[(i+1)%size],i,ponder));}// 测试当造成死锁后,因为是wait阻塞,是否会导致其他线程无法获取锁// 结果其他线程依然可以获取锁int i = 0;while (true){chopsticks[i%size].testLock();TimeUnit.SECONDS.sleep(1);}// 注释掉中断线程的方法,以便达到死锁效果
//        TimeUnit.SECONDS.sleep(5);
//        service.shutdownNow();}}

上面案例详细的对哲学家就餐问题进行了演示,分析后,可以得出想要造成死锁也并不容易,必须同时满足四条条件:

  1. 互斥条件。任务使用的资源中至少有一个是不能共享的,这里,一根Chopstick一次就只能被一个Philosopher使用。
  2. 至少有一个线程它必须持有一个资源且正在等待另外一个当前被别的线程持有的资源。也就是说,要发生死锁,Philosopher必须拿着一根筷子并且等待另外一根。
  3. 资源不能被线程强占,线程必须把资源释放当做普通事件。Philosopher很有礼貌,他们不会从其他的Philosopher手中抢筷子;
  4. 必须有循环等待,这时,一个线程等待其他线程所持有的资源,后者又在等待另一个线程所持有的资源,这样下去,直到有一个线程在等待第一个线程所持有的资源。使得大家都被锁住,在上面的案例中,因为每个Philosopher都试图先得到右边的筷子,然后得到左边的筷子,所以发生了循环等待。

因为要发生死锁,必须全部满足上面的所有条件,那么解决死锁问题,就是破坏其中任意一个条件就可以了。在上例代码中,解决最简单的方式就是破坏第四个条件,在上例中Philosopher都是先拿右边的筷子,再拿左边的筷子造成了循环等待。我们只需要让其中一个Philosopher先拿左边的筷子,再拿右边的筷子,这样循环等待自然就不存在了。

七、新类库中的构件

主要是指java.util.concurent中引入了一些其他的组件类,利用这些类可以更加简单的写出符合并发的代码。

1.CountDownLatch

它被用来同步一个或多个任务,强制他们等待由其他任务执行的**“一组“**操作完成,调用它的await()来阻塞的线程,必须等待count值为0后才能唤醒,初始化时设置count的值,通过countDown()方法减少count的值来达到唤醒线程的效果。
下面简单案例

public class A_CountDownLatch {static class TaskPortion implements Runnable{private static int count = 0;private final int id = count++;private static Random random = new Random(47);private final CountDownLatch latch;public TaskPortion(CountDownLatch latch) {this.latch = latch;}@Overridepublic void run() {try {doWork();// 将CountDownLatch初始化的count值依次递减,如果count==0,那么所有等待的线程将被唤醒,再次调用这个方法将什么也不会发生。latch.countDown();} catch (InterruptedException e) {System.err.println("TaskPortion 被中断");}}private void doWork() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(random.nextInt(2000));System.out.println(this+"已完成");}@Overridepublic String toString() {return "TaskPortion{" +"id=" + id +", latch=" + latch +'}';}}static class WaitingTask implements Runnable{private static int count = 0;private final int id = count++;private final CountDownLatch latch;public WaitingTask(CountDownLatch latch) {this.latch = latch;}@Overridepublic void run() {try {// 阻塞线程,直到count变成0,如果本来就为0,那么将不再阻塞latch.await();System.out.println("在此之前一组操作已完成,现在完成最后操作:"+this);} catch (InterruptedException e) {System.err.println("WaitingTask 被中断");}}@Overridepublic String toString() {return "WaitingTask{" +"id=" + id +", latch=" + latch +'}';}}public static void main(String[] args) {int size = 100;ExecutorService service = Executors.newCachedThreadPool();CountDownLatch latch = new CountDownLatch(size);for (int i=0; i<10;i++){service.execute(new WaitingTask(latch));}for (int i=0; i<size;i++){service.execute(new TaskPortion(latch));}System.out.println("全部任务已经就绪......");service.shutdown();}
}

2.CyclicBarrier

这个CyclicBarrierCountDownLatch类似,都是创建一组线程,并行执行它们,然后在进行下一步骤前等待所有线程完成,只是有一点不同的是CountDownLatch只触发一次,也就是它不会重置count,而CyclicBarrier可以多次重用。
当然它们还有另外一个不同之处,那就是CyclicBarrier有一个barrierAction,当计数值达到0时自动执行。

下面通过一个案例来介绍CyclicBarrier,这个案例很有意思,建议自己测试一下,自己水平有限,研究两个小时才将这里研究明白。
先介绍一个这个案例,虽然这个也是书中的案例,但是我现在描述的可能和书中描述的不太一致,但是我感觉更加贴近我(中国人)的思想。

这是一场赛马游戏,每匹马几乎是同时从起来开始出发,每个阶段马根据自己的能力跑0-2步之间,可以将这个每个阶段理解成每秒,那么当一匹马跑完了这一秒的步伐了,那么他需要等待其他马也跑完这一秒,直到所有的马都在这一秒中做出了动作,那么将会查看每批马跑了多少步,如果有任意一匹马到达了终点,那么将停止比赛,否则将继续比赛。

案例:

public class B_CyclicBarrier {static class Horse implements Runnable{private static int counter = 0;private final int id= counter++;// 步幅private int strides =0;private static Random random = new Random(47);private static CyclicBarrier cyclicBarrier;public Horse(CyclicBarrier barrier) {this.cyclicBarrier = barrier;}public synchronized int getStrides() {return strides;}@Overridepublic void run() {try {while (!Thread.interrupted()){synchronized (this){// 每个阶段所跑的步数strides += random.nextInt(3);}/***  等待,直到所有的线程都到达指定的barrier处*  如果将CyclicBarrier比作CountDownLatch,那么这里就是在做countDown(),告知cyclicBarrier已经有一个线程完成了任务*  当所有的线程都完成任务后,那么将自动触发barrierAction,如果它为null,则忽略。*  当barrierAction执行完成或忽略后,count将重置,也就是又可以countDown()了,这个就是CyclicBarrier和CountDownLatch的区别,*  CyclicBarrier可以多次使用*/cyclicBarrier.await();}} catch (InterruptedException e) {System.err.println("Horse 被中断...");} catch (BrokenBarrierException e) {// 当一组线程中,其他线程超时或者中断,则报此异常System.err.println("Horse BrokenBarrierException");}}@Overridepublic String toString() {return "Horse{" +"id=" + id +", strides=" + strides +'}';}// 记录当前线程的所跑的步数public String tracks(){StringBuilder s = new StringBuilder();for (int i=0; i<getStrides(); i++){// 以“*”来代替马匹跑的步数s.append("*");}// 唯一标识这匹马s.append(id);return s.toString();}}static class HorseRace {// 这场比赛总共的步数static final int FINISH_LINE = 75;// 存放查看马匹状态private List<Horse> horses = new ArrayList<>();// 线程池private ExecutorService service = Executors.newCachedThreadPool();private CyclicBarrier barrier;/*** 构造器* @param nHorses 几匹马匹参赛* @param pause 停顿时间*/public HorseRace(int nHorses, final int pause){/*** 第一个参数parties,一组线程中有多少个线程,类似于CountDownLatch的Count* 第二个参数barrierAction:直译为栅栏行为,我也不太清楚如何翻译好,大概意思是一个业务分为两个阶段,* 第一阶段分为N个部分,可以由N个线程来同时处理,提高效率;* 第二阶段:当第一阶段全部处理完成后会自动触发第二阶段的行为,也就是第一阶段所做的事情是第二阶段的先决条件。* 注:tripped 翻译为被触发*/barrier = new CyclicBarrier(nHorses, new Runnable() {// 以下演示赛马动画效果@Overridepublic void run() {StringBuilder s = new StringBuilder();// 显示赛道的长度for (int i=0; i<FINISH_LINE; i++){s.append("=");}System.out.println(s);// 显示每个阶段后每匹马所跑的距离for (Horse horse:horses){System.out.println(horse.tracks());}// 判断是否有马已经赢得了比赛,若有则停止比赛for (Horse horse:horses){if(horse.getStrides()>=FINISH_LINE){System.out.println(horse+":赢得比赛");service.shutdownNow();return;}}// 暂停一段时间try {TimeUnit.MILLISECONDS.sleep(pause);} catch (InterruptedException e) {System.err.println("HorseRace 被中断...");}}});// 将每匹马放入赛道for (int i=0; i<nHorses; i++){Horse horse = new Horse(barrier);horses.add(horse);service.execute(horse);}}public static void main(String[] args) {int nHorses =3;int pause =200;new HorseRace(nHorses,pause);}}
}

必须介绍一下这个动画显示是什么样的,我感觉很有意思。哈哈

3.DelayQueue

这个是比较常见的组件类,在工作中实现延时任务的时候,它是一种实现方式,当然肯定不是最好的实现方式,在而今各种中间件、各种框架漫天飞的情况下,如果jdk原生支持就可以超过中间件,那么它们就没有生存空间了

以下直接上案例吧,不过有一点很重要的,这个队列只可以存储实现了Delayed接口的类,这是因为所谓的延迟需要对队列中实体的一个延迟时间做一个掌控,而这个就需要存储在这个队列中的实体实现一定的规范来支持了。

public class C_DelayQueue {static class DelayedTask implements Runnable, Delayed {private static int counter =0;// 标记IDprivate final int id =counter++;// 延迟时长,单位Millisecond,非必须private final int delta;// 触发时间,单位nanosecond,写Delayed类必须属性private final long trigger;// 顺序集protected static List<DelayedTask> sequence = new ArrayList<>();public DelayedTask(int delayMilliseconds) {this.delta = delayMilliseconds;this.trigger = System.nanoTime()+TimeUnit.NANOSECONDS.convert(delta,TimeUnit.MILLISECONDS);sequence.add(this);}// 实现自Comparable接口方法,用于在DelayedQueue中进行排序时比较方法一致,对比触发时间@Overridepublic int compareTo(Delayed o) {DelayedTask that = (DelayedTask)o;return this.trigger>that.trigger?1:(this.trigger==that.trigger?0:-1);}@Overridepublic void run() {System.out.println(this+"");}/*** 返回当前对象的剩余延迟时间,可以通过TimeUnit来指定对应的时间单位*/@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(trigger-System.nanoTime(),TimeUnit.NANOSECONDS);}@Overridepublic String toString() {return "DelayedTask{" +"id=" + id +", delta=" + delta +", trigger=" + trigger +'}';}public String summary(){return "("+id+":"+delta+")";}/*** 直译:结束监控,当调用这个类的时候,结束线程池中的所有线程*/public static class EndSentinel extends DelayedTask{private ExecutorService service;public EndSentinel(int delayMilliseconds, ExecutorService service) {super(delayMilliseconds);this.service = service;}@Overridepublic void run() {// 打印每个元素for (DelayedTask pt:sequence){System.out.print(pt.summary()+"");}System.out.println();System.out.printf(this+"\n启动 ShutDownNow\n");service.shutdownNow();}}}/*** 消费者*/static class DelayedTaskConsumer implements Runnable{private DelayQueue<DelayedTask> q;public DelayedTaskConsumer(DelayQueue<DelayedTask> q) {this.q = q;}@Overridepublic void run() {try {// 在queue中取出元素并运行while (!Thread.interrupted()){q.take().run();}} catch (InterruptedException e) {System.err.println("DelayedTaskConsumer 被中断");}System.out.println("DelayedTaskConsumer 执行完成");}}public static void main(String[] args) throws InterruptedException {Random random= new Random(47);ExecutorService service = Executors.newCachedThreadPool();DelayQueue<DelayedTask> queue = new DelayQueue<>();for (int i=0; i<20; i++){queue.put(new DelayedTask(random.nextInt(5000)));}queue.add(new DelayedTask.EndSentinel(5000, service));service.execute(new DelayedTaskConsumer(queue));// API测试testAPI();}/*** 测试常用的方法*/static void testAPI() throws InterruptedException {TimeUnit.SECONDS.sleep(6);// BlockingQueue 是可以有容量限制的BlockingQueue linkedBlocking = new LinkedBlockingDeque(1);BlockingQueue arrayBlocking = new ArrayBlockingQueue(1);// 声明DelayQueueDelayQueue<DelayedTask> queue = new DelayQueue<>();// **DelayQueue 是没有容量限制的,默认队列大小为Integer.MAX_VALUE**System.out.println(queue.remainingCapacity());// **存元素所涉及的几个API方法,不可以向队列中添加null**// add 添加元素到queue队列中,基于Collection的形式,内部实现调用的offer方法queue.add(new DelayedTask(1999));queue.add(new DelayedTask(1998));// put 添加元素到queue队列,内部实现调用的offer方法queue.put(new DelayedTask(1994));queue.put(new DelayedTask(1995));// offer 添加元素queue.offer(new DelayedTask(1996));queue.offer(new DelayedTask(1993));// **取元素**// 显示队列中到期的元素(如果没有,那么显示下一个即将到期的元素),并不会移除元素// 如果队列元素为空,那么返回null// peek:窥探、偷看; Retrieves: 检索System.err.println(queue.peek());// 检索并移除元素,如果没有到期元素,则返回null// 此poll方法实现于Queue接口,但是Queue接口定义的poll方法是取队列头元素,没有延迟作用。System.err.println(queue.poll());// 检索并移除元素,如果没有到期元素,则等待元素到期,如果队列中没有元素,则无限等待添加元素后到期。System.err.println(queue.take());TimeUnit.SECONDS.sleep(1);System.err.println(queue.poll());System.err.println(queue.peek());List<DelayedTask> list = new ArrayList<>();// 将【到期】元素添加指定集合中,这个集合必须是Collection的子类// drainTo实现自BlockingQueue,在BlockingQueue中是将可以获取的元素(即队列中存在元素)添加到指定元素System.out.println(queue.drainTo(list));Iterator<DelayedTask> iterator = list.iterator();System.out.println("=======================================");while (iterator.hasNext()){System.out.println(iterator.next());}// 检索并移除到期元素,若没有到期元素,则报NoSuchElementException异常,// 这个方法在poll方法上进行了对返回null这种情况加了一层校验System.out.println(queue.remove());// 清空队列中元素queue.clear();}
}

4.PriorityBlockingQueue

这个队列是一个很基础的优先级队列,它具有阻塞的读取操作,按照优先级从队列中拿取任务。优先级是越大越排在前面,其实上面的DelayQueue也可以看成是一个变异的优先级队列。
下面是书上的例子,但是这里其实很简单

public class D_PriorityBlockingQueue {static class PrioritizedTask implements Runnable, Comparable<PrioritizedTask>{private static int counter =0;// 标记IDprivate final int id =counter++;private Random random = new Random(47);private final int priority;protected static List<PrioritizedTask> sequence = new ArrayList<>();public PrioritizedTask(int priority) {this.priority = priority;sequence.add(this);}// 刚好和Comparable接口中定义的顺序是反的,PriorityBlockingQueue队列中存储实现compareTo的实体类@Overridepublic int compareTo(PrioritizedTask o) {PrioritizedTask that = o;// 队列中大的排在header位置return this.priority<that.priority?1:(this.priority>that.priority?-1:0);}@Overridepublic void run() {try {// 延迟一会,模拟操作TimeUnit.MILLISECONDS.sleep(random.nextInt(250));} catch (InterruptedException e) {System.out.println("PrioritizedTask 被中断");}System.out.println(this);}@Overridepublic String toString() {return "PrioritizedTask{" +"id=" + id +", priority=" + priority +'}';}public String summary(){return "("+id+":"+priority+")";}public static class EndSentinel extends PrioritizedTask{private ExecutorService service;public EndSentinel(ExecutorService service) {super(-1);this.service = service;}@Overridepublic void run() {int count =0;for (PrioritizedTask task:sequence){System.out.print(task.summary());if (++count%5==0)System.out.println();}System.out.println();System.out.println("将要调用shutdownNow");service.shutdownNow();}}}/*** 生产者*/static class PrioritizedTaskProducer implements Runnable{private Random random = new Random(47);private Queue<Runnable> queue;private ExecutorService service;public PrioritizedTaskProducer(Queue<Runnable> queue, ExecutorService service) {this.queue = queue;this.service = service;}@Overridepublic void run() {for (int i=0; i<20; i++){queue.add(new PrioritizedTask(random.nextInt(10)));Thread.yield();}try {for (int i=0; i<10; i++){TimeUnit.MILLISECONDS.sleep(250);// 插入元素,该元素必须实现了Comparablequeue.add(new PrioritizedTask(10));}for (int i=0; i<10; i++){queue.add(new PrioritizedTask(10));}queue.add(new PrioritizedTask.EndSentinel(service));} catch (InterruptedException e) {System.out.println("PrioritizedTaskProducer 被中断");}System.out.println("PrioritizedTaskProducer 结束...");}}static class PrioritizedTaskConsumer implements Runnable{private PriorityBlockingQueue<Runnable> priorityBlockingQueue;public PrioritizedTaskConsumer(PriorityBlockingQueue<Runnable> priorityBlockingQueue) {this.priorityBlockingQueue = priorityBlockingQueue;}@Overridepublic void run() {try {//                TimeUnit.SECONDS.sleep(4);while (!Thread.interrupted()){priorityBlockingQueue.take().run();}} catch (InterruptedException e) {System.out.println("PrioritizedTaskConsumer 被中断");}System.out.println("PrioritizedTaskConsumer 结束");}}public static void main(String[] args) {Random random=new Random(47);ExecutorService service = Executors.newCachedThreadPool();PriorityBlockingQueue<Runnable> priorityBlockingQueue =new PriorityBlockingQueue<>();service.execute(new PrioritizedTaskProducer(priorityBlockingQueue,service));service.execute(new PrioritizedTaskConsumer(priorityBlockingQueue));}
}

5.使用ScheduledExecutor的温室控制器

调度,这个应该就是原生支持的一个调度器,ScheduledThreadPoolExecutor是调度的一个线程池,他的内部实现依然是使用ThreadPoolExecutor来实现的。ScheduledThreadPoolExecutor中提供了一些比较好的调度方法,比如schedule()运行一次任务、scheduleAtFixedRate()每隔规则的时间重复执行任务等等。。

目前工作中对于调度这块用的比较多的是xxl-job,一个非常简单的中间件,因为是国人开发的,所以各种中文文档都有,学习成本比较低。

public class F_GreenhouseScheduler {private volatile boolean light = false;private volatile boolean water = false;private String thermostat = "Day";public synchronized void setThermostat(String thermostat) {this.thermostat = thermostat;}public synchronized String getThermostat() {return thermostat;}/*** ScheduledThreadPoolExecutor 继承自ThreadPoolExecutor,内部依旧使用ThreadPoolExecutor实现线程池的方式,10是核心线程数,最大线程是Integer.MAX_VALUE*/ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(10);public void schedule(Runnable event, long delay){// 执行一次给定时间的调度行为scheduler.schedule(event,delay, TimeUnit.MILLISECONDS);}public void repeat(Runnable event, long initialDelay, long period){/*** 执行多次给定时间调度任务,每个任务固定时间执行一次* command:需要执行的任务* initialDelay:第一次开始执行时间* period:每个任务执行间隔时间,如第二次执行时间为:initialDelay+period。第三次:initialDelay+period*2* unit: 时间单位*/scheduler.scheduleAtFixedRate(event,initialDelay,period,TimeUnit.MILLISECONDS);}class LightOn implements Runnable{@Overridepublic void run() {System.out.println("开灯");light = true;}}class LightOff implements Runnable{@Overridepublic void run() {System.out.println("关灯");light = false;}}class waterOn implements Runnable{@Overridepublic void run() {System.out.println("开水闸");water = true;}}class waterOff implements Runnable{@Overridepublic void run() {System.out.println("关水闸");water = false;}}class thermostatNight implements Runnable{@Overridepublic void run() {System.out.println("恒温器,夜间模式");setThermostat("夜间模式");}}class thermostatDay implements Runnable{@Overridepublic void run() {System.out.println("恒温器,日间模式");light = true;setThermostat("日间模式");}}class Bell implements Runnable{@Overridepublic void run() {System.out.println("响铃.........");}}class Terminate implements Runnable{@Overridepublic void run() {System.out.println("终止....");scheduler.shutdownNow();new Thread() {public void run(){for (DataPoint d: data){System.out.println(d);}}}.start();}}class DataPoint {final Calendar time;final float temperature;final float humidity;public DataPoint(Calendar time, float temperature, float humidity) {this.time = time;this.temperature = temperature;this.humidity = humidity;}@Overridepublic String toString() {return "DataPoint{" +"time=" + time +", temperature=" + temperature +", humidity=" + humidity +'}';}}private Calendar lastTime = Calendar.getInstance();{lastTime.set(Calendar.MINUTE,30);lastTime.set(Calendar.SECOND,00);}private float lastTemp =65.0f;private int tempDirection = +1;private float lastHumility= 50.0f;private int humilityDirection = +1;private Random random = new Random(47);// Collections.synchronizedList() 看了内部实现和注释说明,这个工具主要是确保线程安全,防止资源竞争。它会在每一个list方法包装// 一层synchronized(){}语句块来确保线程安全,mutex默认this,可以指定List<DataPoint> data= Collections.synchronizedList(new ArrayList<DataPoint>());class CollectData implements Runnable{@Overridepublic void run() {System.out.println("收集数据");synchronized (F_GreenhouseScheduler.this){lastTime.set(Calendar.MINUTE,lastTime.get(Calendar.MINUTE)+30);if (random.nextInt(5)==4){tempDirection = -tempDirection;}lastTemp += tempDirection*(1.0f+random.nextFloat());;if (random.nextInt(5)==4){humilityDirection = -humilityDirection;}lastHumility+=humilityDirection*random.nextFloat();data.add(new DataPoint((Calendar)lastTime.clone(),lastTemp,lastHumility));}}}
}
public class E_ScheduledExecutor {public static void main(String[] args) {F_GreenhouseScheduler gh = new F_GreenhouseScheduler();gh.schedule(gh.new Terminate(),5000);gh.repeat(gh.new Bell(),0,1000);gh.repeat(gh.new thermostatNight(),0,2000);gh.repeat(gh.new LightOn(),0,   200);gh.repeat(gh.new LightOff(),0,400);gh.repeat(gh.new waterOn(),0,600);gh.repeat(gh.new waterOff(),0,800);gh.repeat(gh.new thermostatDay(),0,1400);gh.repeat(gh.new CollectData(),0,1000);gh.repeat(gh.new Bell(),500,500);}
}

en…上边的仿真程序看了一遍,写了一遍,没看懂什么意思。不过把里边涉及到的知识点理解了,就先这样吧,不纠结了,以后处理。

6 Semaphore

Semaphore:信号旗,在书中称之为计数信号量。这个是类似于lock、synchronized的锁,但是不同的是,它是通过一个计数来管理进入资源的。
当每个线程要进入一个方法时,会调用acquire()方法为每个线程分发一个permit,如果当前已经没有permit,那么将阻塞线程,并等到拿到permit后,再释放阻塞线程,可以通过release()来添加permit。
permit的总数量设置在在初始化Semaphore对象的时候进行设置,最多不可以超过初始化时的数量;在初始化时,可以设置fair是true还是false,若为true,则保证FIFO原则(先进先出),这个FIFO指的是调用acquire()的顺序,也就是说哪个线程先调用的acquire(),那么它将先获得permit;若为false,则刚好相反,不保证FIFO,那么很有可能出现无限等待的情况,相对来讲,肯定是false的性能更好一些。
不计时的tryAcquire()是不遵守FIFO原则的,即使fair为true,如果当前有可以获取的permit,那么它将直接获取,不会管是否有其他的等待线程,如果不想破坏FIFO原则,可以使用tryAcquire(0,TimeUnit.MILLISECONDS)来代替,是一样的意思。
注:在代码中实际是没有permit对象的,permit只是一个计数的信号量,并不是一个实际对象

在把书上的例子打出来后,我发现书上的例子和Semaphore类中提供的Example非常像。。。
大师的思想都是一致的

public class G_Semaphore {static class Pool<T> {private int size;private List<T> items = new ArrayList<>();private volatile boolean[] checkedOut;private Semaphore available;public Pool(Class<T> classObject, int size) {this.size = size;checkedOut = new boolean[size];/*** 初始化一个Semaphore,指定permit数量,并且指定fair为true*/available = new Semaphore(size, true);// 加载一个对象池来使用for (int i=0; i<size; ++i){try {items.add(classObject.newInstance());} catch (IllegalAccessException e) {e.printStackTrace();} catch (InstantiationException e) {e.printStackTrace();}}}private synchronized T getItem() {for (int i=0; i<size; ++i){if (!checkedOut[i]){checkedOut[i] = true;return items.get(i);}}return null;}private synchronized boolean releaseItem(T item) {int index = items.indexOf(item);if (index == -1){return false;}if(checkedOut[index]){checkedOut[index] = false;return true;}return false;}public T checkOut() throws InterruptedException {// 获取一个permit,若没有,则阻塞线程available.acquire();/*** acquire()的一些重载方法进行必要解释* available.tryAcquire();  // 可以破坏FIFO原则,和tryLock差不多,调用时可以获得permit则获得,返回true,否则立即返回false* available.tryAcquire(0,TimeUnit.MILLISECONDS); // 准守FIFO原则,若permit没有,则等待指定时间,超时返回false; 否则返回true* available.tryAcquire(10); // 破坏FIFO原则,尝试获得10个permit,若足够可以获得,那么返回true;否则返回false* available.tryAcquire(10,0,TimeUnit.MILLISECONDS); // 遵守FIFO原则,获取指定数量的permit,获取不到等待指定时间,超时返回false,否则返回true*/return getItem();}public void checkIn(T x) {if (releaseItem(x)){// 添加(释放)一个permit,最多不会超过初始化的数量available.release();
//                available.release(10); // 添加(释放)指定数量的permit}}}static class Fat {private volatile double d;private static int counter =0;private final int id = counter++;Random random = new Random(47);public Fat() throws InterruptedException {TimeUnit.MILLISECONDS.sleep(random.nextInt(500));}public void operation(){System.out.println(this);}@Overridepublic String toString() {return "Fat{" +"d=" + d +", id=" + id +'}';}}static class CheckoutTask<T> implements Runnable{private static int counter =0;private final int id = counter++;private Pool<T> pool;public CheckoutTask(Pool<T> pool) {this.pool = pool;}@Overridepublic void run() {try {T item = pool.checkOut();TimeUnit.SECONDS.sleep(1);System.out.println(this+"将存入"+item);pool.checkIn(item);} catch (InterruptedException e) {System.out.println("CheckoutTask 被中断"+this);}}@Overridepublic String toString() {return "CheckoutTask{" +"id=" + id +'}';}}public static void main(String[] args) throws InterruptedException {final int size = 25;final Pool<Fat> pool = new Pool<>(Fat.class,size);ExecutorService service = Executors.newCachedThreadPool();for (int i =0; i<size; i++){service.execute(new CheckoutTask<Fat>(pool));}System.out.println("全部CheckoutTask被创建");List<Fat> list = new ArrayList<>();for (int i=0; i<size; i++){Fat f=pool.checkOut();System.out.println(i+" _main 线程取出对象_ "+f);f.operation();list.add(f);}Future<?> blocked = service.submit(new Runnable() {@Overridepublic void run() {try {// 因为pool中所有对象已经全部被取出,所以这里将阻塞,直到调用cancel才会终止线程pool.checkOut();} catch (InterruptedException e) {System.out.println("Future<?> blocked 被中断");}}});TimeUnit.SECONDS.sleep(2);blocked.cancel(true);System.out.println("将list中的对象放入pool中:"+list);for (Fat f:list){pool.checkIn(f);}// 多次向线程中放入对象,是没有影响的for (Fat f:list){pool.checkIn(f);}service.shutdownNow();}
}

7 Exchanger

Exchanger,这个名字和Rabbitmq中的Exchanger一样,可以翻译为交换机,当然Rabbitmq中的Exchanger是用来做消息路由,而这个java.util.concurrent.Exchanger真的知名识意,就是交换,类似于两个人的fair(集市),相当于我去卖东西,只够和一个人交易的,如果这个人还没到,那么就等着,如果这个人来了,那么就和他以物换物,他将他的货物给我,我将我的货物给他。
莫名有点黑帮交易的感觉出来了。

这个工具就是做两个线程之间交换对象所用,当然主要的是会保障线程安全。

public class H_Exchanger {static Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();public static void main(String[] args) {new Thread(new FillingLoop()).start();new Thread(new EmptyingLoop()).start();}static class FillingLoop implements Runnable {public void run() {DataBuffer currentBuffer = new DataBuffer("full");try {System.out.println("FillingLoop 开始前:"+currentBuffer);if (currentBuffer.isFull()){// 等待交换对象,若另一个线程没有到达交换点,则阻塞等待currentBuffer = exchanger.exchange(currentBuffer);System.out.println("FillingLoop 交换后:"+currentBuffer);}else {Thread.currentThread().interrupt();}} catch (InterruptedException ex) {System.out.println("FillingLoop 被中断");}}}static class EmptyingLoop implements Runnable {public void run() {DataBuffer currentBuffer =  new DataBuffer("");try {System.out.println("EmptyingLoop 开始前:"+currentBuffer);if (currentBuffer.isEmpty()) {currentBuffer = exchanger.exchange(currentBuffer);System.out.println("EmptyingLoop 交换后:"+currentBuffer);}else {Thread.currentThread().interrupt();}} catch (InterruptedException ex) {System.out.println("FillingLoop 被中断");}}}static class DataBuffer {private String state;public DataBuffer(String state) {this.state = state;}public boolean isFull() {return state.equals("full") ;}public boolean isEmpty() {return state.equals("");}@Overridepublic String toString() {return "DataBuffer{" +"state='" + state + '\'' +'}';}}
}

这个例子并不是书中的例子,因为书中的例子,这块少了一个对象,所以干脆就写了一个简单的案例来描述这个工具类的作用。

八、仿真

根据书中案例,这个仿真程序呢,是用于生产汽车的机器人组装线,每辆汽车都将分为多个阶段构建,从创建底盘开始,紧跟着是安装发动机、车厢和轮子,最后汇报汽车的完成情况。下面看例子

public class A_Assignment {public static void main(String[] args) throws InterruptedException {// 阻塞队列,实现线程间的协调CarQueue chassisQueue = new CarQueue(), finishingQueue = new CarQueue();// 线程池ExecutorService service = Executors.newCachedThreadPool();// 机器人池,类似于线程池的容器概念,每一个生产机器人需要在这个中取RobotPool pool = new RobotPool();service.execute(new EngineRobot(pool));service.execute(new DriveTrainRobot(pool));service.execute(new WheelRobot(pool));service.execute(new Assembler(chassisQueue,finishingQueue,pool));// 这里和书中案例有所不同,书中案例只有一个线程对于chassisBuilder的执行,但是考虑到生产汽车的机器人流水线主要的产能瓶颈在于生产chassisBuilder,所以使用多个线程运行来提高产能ChassisBuilder chassisBuilder = new ChassisBuilder(chassisQueue);service.execute(chassisBuilder);service.execute(chassisBuilder);service.execute(chassisBuilder);// 查看十秒钟内的产线产能TimeUnit.SECONDS.sleep(10);service.shutdownNow();// 打印最后生产出来的汽车,书中例子是放到上面线程池中执行的,是实时汇报生产的情况,但是打印到console中,有点乱,放到这里最后统一打印吧new Thread(new Reporter(finishingQueue)).start();}// car 实体static class Car {// 每辆汽车唯一标识private final int id;// 汽车组装三种状态private boolean engine = false, driveTrain = false, wheels=false;public Car(int id) {this.id = id;}public Car() {this.id = -1;}public synchronized int getId() {return id;}public synchronized void addEngine() {this.engine = true;}public synchronized void addDriveTrain() {this.driveTrain = true;}public synchronized void addWheels() {this.wheels = true;}@Overridepublic synchronized String toString() {return "Car{" +"id=" + id +", engine=" + engine +", driveTrain=" + driveTrain +", wheels=" + wheels +'}';}}// 阻塞队列static class CarQueue extends LinkedBlockingQueue<Car> {}// 流水线 机器人池static class RobotPool {// Set容器,并不是k-v的形式,Set中元素不可以重复,因此只可以有一个nullprivate Set<Robot> pool = new HashSet<Robot>();// 添加机器人public synchronized void add(Robot r) {pool.add(r);notifyAll();}/*** 雇佣机器人* @param robotType 机器人类型* @param d 流水线*/public synchronized void hire(Class<? extends Robot> robotType, Assembler d) throws InterruptedException {for (Robot r : pool){if (r.getClass().equals(robotType)){// 取出对应机器人pool.remove(r);// 指派机器人到对应流水线r.assignAssembler(d);// 开始工作r.engage();return;}}wait();// 递归hire(robotType,d);}// 返还机器人public synchronized void release(Robot robot) {this.add(robot);}}// 生产汽车底盘,生产汽车第一步static class ChassisBuilder implements Runnable{private CarQueue cars;// 累计生产汽车标识private int counter =0;public ChassisBuilder(CarQueue cars) {this.cars = cars;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()) {TimeUnit.MILLISECONDS.sleep(500);// 制作底盘Car c = new Car(incrementCounter());System.out.println("创建了一辆车_"+c);cars.put(c);}} catch (InterruptedException e) {System.out.println("ChassisBuilder 被中断");}}// 通过synchronized来防止多个线程之间的资源强占private synchronized int incrementCounter(){return this.counter++;}}// 组装汽车流水线static class Assembler implements Runnable{private CarQueue chassisQueue, finishingQueue;private Car car;// 这个组件用来同步多线程间的协调问题,这里barrierAction是null,也就是没有触发行为private CyclicBarrier barrier = new CyclicBarrier(4);private RobotPool robotPool;public Assembler(CarQueue chassisQueue, CarQueue finishingQueue, RobotPool robotPool) {this.chassisQueue = chassisQueue;this.finishingQueue = finishingQueue;this.robotPool = robotPool;}public Car car() {return car;}public CyclicBarrier barrier() {return barrier;}@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()){this.car = chassisQueue.take();// 雇佣robotPool.hire(EngineRobot.class,this);robotPool.hire(DriveTrainRobot.class,this);robotPool.hire(WheelRobot.class,this);// barrier 等待其他线程完成barrier.await();finishingQueue.put(this.car);}} catch (InterruptedException | BrokenBarrierException e) {System.out.println("Assembler 被打断");}}}// 报告汽车完成情况static class Reporter implements Runnable {private CarQueue cars;public Reporter(CarQueue cars) {this.cars = cars;}@Overridepublic void run() {try {while (!cars.isEmpty()){System.err.println(cars.take());}} catch (InterruptedException e) {System.out.println("Reporter 被中断");}System.out.println("离开 Reporter");}}// Robot 抽象基类static abstract class Robot implements Runnable {private RobotPool pool;public Robot(RobotPool pool) {this.pool = pool;}protected Assembler assembler;public Robot assignAssembler(Assembler assembler){this.assembler = assembler;return this;}// 是否开始工作private boolean engage = false;public synchronized void engage() {engage = true;notifyAll();}// 由各个子类实现的行为abstract protected void performService();@Overridepublic void run() {try {powerDown();while (!Thread.currentThread().isInterrupted()){performService();assembler.barrier().await();powerDown();}} catch (InterruptedException | BrokenBarrierException e) {System.out.println("Robot 被中断");}System.out.println("离开 Robot");}// 释放资源private synchronized void powerDown() throws InterruptedException {engage = false;assembler = null;pool.release(this);while (engage == false){wait();}}@Overridepublic String toString() {return getClass().getName();}}static class EngineRobot extends Robot{public EngineRobot(RobotPool pool) {super(pool);}@Overrideprotected void performService() {System.out.println(this+"安装 引擎");assembler.car().addEngine();}}static class DriveTrainRobot extends Robot{public DriveTrainRobot(RobotPool pool) {super(pool);}@Overrideprotected void performService() {System.out.println(this+"安装 动力装置");assembler.car().addDriveTrain();}}static class WheelRobot extends Robot{public WheelRobot(RobotPool pool) {super(pool);}@Overrideprotected void performService() {System.out.println(this+"安装 轮子");assembler.car().addWheels();}}
}

例子很长,但是还是比较好理解的。

九、性能调优

9.1 比较各类互斥技术

这一节主要是对比各个互斥技术在性能方面的表现,主要是针对synchronized、lock、atomic进行测试。

微基准测试:指在隔离的、脱离上下文环境的情况下对某个特性进行性能测试

下面这个书中案例,为了免jvm对于代码的特殊优化,而进行的必要的读取和写入操作,并且进一步复杂化程序和不可预测性,以使得jvm没有机会执行积极优化。
以下根据书中实例进行了部分调整,而且BaseLine是一个有问题的测试,因为在BaseLine中是没有互斥的,多线程在公共资源这里,造成了竞争,间接导致了一些问题,比如本实例中的ArrayIndexOutOfBoundsException,这个我尝试了优化,在不保证结果正确的情况下,解决了异常问题。

public class A_CompareConcurrentTech {// 模板方法设计模式static abstract class Accumulator {// 线程中循环执行读取和写入操作次数public static long cycles = 50000L;// 线程数量private static final int N =4;// 线程池public static ExecutorService service = Executors.newFixedThreadPool(N*2);// 协调每一个线程之间的进度private static CyclicBarrier barrier = new CyclicBarrier(N*2+1);// preLoaded 中坐标protected volatile int index =0;// preLoaded 中累加的值protected volatile long value =0;// 执行线程所花费时间protected long duration =0;// 唯一标识protected String id = "error";// preLoaded 数组中的元素个数protected final static int SIZE = 100000;// 预先设置的数据集合protected static int[] preLoaded = new int[SIZE];static {// 初始化preLoaded数组Random random = new Random(47);for (int i=0; i< SIZE; i++){preLoaded[i] = random.nextInt();}}// 模板方法public abstract void accumulate();public abstract Long read();// 写入任务private class Modifier implements Runnable{@Overridepublic void run() {for (Long i =0L;i<cycles; i++){accumulate();}try {barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}// 读取任务private class Reader implements Runnable{private volatile Long value;@Overridepublic void run() {for (Long i =0L;i<cycles; i++){value = read();}try {barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}}// 测试执行多个线程进行读取写入所花费的时间public void timedTest() {Long start = System.nanoTime();for (int i=0; i<N; i++){service.execute(new Modifier());service.execute(new Reader());}try {barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}duration = System.nanoTime()-start;System.out.printf("%s:%d\t,%s:%d\t,%s:%d\n",id,duration,"index",index,"value",value);}// 线程不同互斥机制之间的性能差异public static void report(Accumulator acc1, Accumulator acc2){System.out.printf("%s:%f\n",acc1.id+"/"+acc2.id,(double)acc1.duration/acc2.duration);}}/*** 无互斥条件,这里会导致ArrayIndexOutOfBoundsException,因为多个线程在运行到*             if(index>=SIZE) {  // index=99999,不符合条件*                 index =0;*                 return;*             }*             value+=preLoaded[index++];  // 第一个线程修改了index后,index得到100000,第二个线程因为index被volatile标识,保证了可视性,那么将会数组越界**/static class BaseLine extends Accumulator{{id = "BaseLine"; }@Overridepublic void accumulate() {// 这里将index加N来确定数组的临界点if(index+Accumulator.N>=SIZE) {index =0;return;}value+=preLoaded[index++];}@Overridepublic Long read() {return value;}}// 测试synchronized锁static class SynchronizedTest extends Accumulator{{id = "synchronized"; }@Overridepublic synchronized void accumulate() {value+=preLoaded[index++];if(index>=SIZE) {index =0;}}@Overridepublic synchronized Long read() {return value;}}// 测试Lock锁,Lock在性能表现上要比synchronized要好一些static class LockTest extends Accumulator{{id = "Lock"; }private Lock lock = new ReentrantLock();@Overridepublic void accumulate() {lock.lock();try {value+=preLoaded[index++];if(index>=SIZE) {index =0;}}finally {lock.unlock();}}@Overridepublic Long read() {lock.lock();try {return value;}finally {lock.unlock();}}}// 测试根据Atomic来保证线程安全,但是Atomic并不适合有多个原子性操作的场景,因为在多个原子性操作中间可能会发生上下文// 切换,作者希望通过这个来展示Atomic的性能优势。// 注:如果涉及到多个Atomic对象,那么就不可以使用Atomic来确保线程的一致性了,而应该使用更加常规的操作。JDK文档中也有声明,当对一个对象的临界更新被限制为只涉及单个变量时,才可以考虑使用这个方式。static class AtomicLine extends Accumulator{{id = "Atomic"; }// Atomic类private AtomicInteger index=new AtomicInteger(0);private AtomicLong value = new AtomicLong(0);@Overridepublic void accumulate() {int i =index.getAndIncrement();// 这里可能发生上下文切换,如此操作会造成最后结果不一致if (++i>=SIZE){index.set(0);return;}value.getAndAdd(preLoaded[i]);}@Overridepublic Long read() {return value.get();}}// 测试各个互斥条件的不同static class SynchronizationComparisons {static BaseLine baseLine = new BaseLine();static SynchronizedTest sync= new SynchronizedTest();static LockTest lock = new LockTest();static AtomicLine atomic = new AtomicLine();static void test() {System.out.println("===================================");System.out.printf("%s:%d\n","Cycles", Accumulator.cycles);baseLine.timedTest();sync.timedTest();lock.timedTest();atomic.timedTest();Accumulator.report(sync,baseLine);Accumulator.report(lock,baseLine);Accumulator.report(atomic,baseLine);Accumulator.report(sync,lock);Accumulator.report(sync,atomic);Accumulator.report(lock,atomic);}}public static void main(String[] args) {int iterations = 5;System.out.println("开始:");// 在开始的时候完成所有线程的创建,以免在测试过程中产生任何额外的开销,确保性能测试的准确性SynchronizationComparisons.baseLine.timedTest();for (int i=0; i<iterations;i++){SynchronizationComparisons.test();Accumulator.cycles*=2;}Accumulator.service.shutdown();}
}

如果测试过,很明显可以看出来Lock通常比使用synchronized要高效很多,而且synchronized的开销看起来变化范围比较大,而Lock相对比较统一。
上面实例中,有一个非常重要的点就是方法体是非常短的,这个符合多线程的好习惯,那就是互斥你必须互斥的部分。但是实际工作中,需要互斥的部分往往比上面要大的多,因此在这些方法体重花费的时间的百分比可能会明显大于进入和退出互斥的开销,这样就体现不到提高互斥速度而带来的所有好处了。
代码中有一个很重要的标准,那就是可读性,上面实例中可以看出来,synchronized关键字比Lock所需的“lock-try/finnal-unLock”要有更好的可读性。在程序中代码的阅读次数是远远大于被编写的次数的,尤其是在团队合作的时候,与其他人交流相对于与计算机交流而言,要重要的多。
因此可以得出一个结论,编写多线程代码时,以synchronized 入手,只有在性能调优的时候才考虑替换为Lock对象这种方式。
关于Atomic,那可能是更优解了,但是一定要符合场景,并不是所有场合下都是那么完美的。

9.2 免锁容器

首先,加强一个认知,容器是所有编程中的基础工具。
一般的容器类库是不同步的,但是在Conllections类提供了各种static的同步的装饰方法,从而来同步不同步的容器。基本上这个同步机制是基于synchronized加锁机制的。

在jdk5的时候添加了新的容器,那么就是通过使用更加灵巧的技术来消除加锁,从而提高线程安全的性能。
这些免锁容器背后的通用策略就是:对容器的修改可以与读取操作同时发生,只是读取者只能看到完成修改的结果即可。修改是在容器数据结构的某个部分的一个单独副本(有时是整个数据结构的副本)上执行的,并且这个副本在修改过程中是不可视的。只有当修改完成时,被修改的部分才会自动地与注数据结构进行交换,之后读取者就可以看到这个修改了。
是不是对这个概念有点眼熟,以前这部分内容在工作中不常见到,但是与之类似的mysql事务,应该都是非常了解的,事务基本和这个概念类似,每个事务看成一个线程,当事务的隔离级别是读已提交时,也就是第二个隔离级别,那么当一个事务对数据进行修改时,另一个事务可以对同一行数据进行查询,但是只能查询未修改的值。当第一个事务提交了,那么第二个事务就可以查看提交的值了。
书中介绍了四种免锁容器,分别是CopyOnWriteArrayListCopyOnWriteArraySetConcurrentHashMapConcurrentLinkedQueue

  • CopyOnWriteArrayList:写入将导致创建整个底层数组的副本,而源数组将保留在原地,使得复制的数组在被修改时,读取操作可以正常进行。
  • CopyOnWriteArraySet:底层使用CopyOnWriteArrayList来实现,因此与上面一致。
  • ConcurrentHashMapConcurrentLinkedQueue:允许并发的读取和写入,但是容器中不再是创建整个副本,而是只有部分内容被复制和修改。

以上分为创建整个副本和部分副本,我认为这个主要是数据结构产生的差异,对于线性表而言,必须创建整个副本才可以对整个线性表进行修改。而对于Hash表、链表,只对部分进行修改就可以了。

9.2.1 乐观锁

由上面内容,可以知道免锁容器针对于主要是读取操作的是有性能提高的,它会比synchronized要快的多,即使有少量的写入操作,依然是如此的,但是这个少量界定就很有意思了,这个需要进行测试,这个也就是下面这个书中的案例。

public class B_unLockContainer {abstract static class Tester<C> {// 重复测试次数static int testReps = 10;// 读写重复次数static int testCycles = 1000;// 容器大小static int containerSize = 1000;// 初始化容器abstract C containerInitializer();// 开始读写操作abstract void startReadersAndWriters();// 容器C testContainer;// 标识String testID;// 读取线程数int nReaders;// 写入线程数int nWriters;volatile long readResult = 0;// 读取所花费时间volatile long readTime = 0;// 写入所花费时间volatile long writeTime =0;// 协调线程CountDownLatch endLatch;static ExecutorService service = Executors.newCachedThreadPool();// 写入数据Integer[] writeData = new Integer[containerSize];public Tester(String testID, int nReaders, int nWriters) {this.testID = testID+" "+nReaders+"r "+nWriters+"w";this.nReaders = nReaders;this.nWriters = nWriters;// 初始化写入数据for (int i=0; i<containerSize; i++){writeData[i] = new Random(47).nextInt();}for (int i=0; i<testReps; i++){runTest();readTime=0;writeTime=0;}}// 执行测试,并且输出执行时间void runTest(){endLatch = new CountDownLatch(nReaders+nWriters);testContainer = containerInitializer();startReadersAndWriters();try {endLatch.await();} catch (InterruptedException e) {e.printStackTrace();}System.out.printf("%s %d %d\t",testID,readTime,writeTime);if (readTime!=0&&writeTime!=0){System.out.println("readTime+writeTime="+(readTime+writeTime));}}// 执行任务abstract class TestTask implements Runnable{// 测试读或者写abstract void test();// 统计结果abstract void putResults();long duration;@Overridepublic void run() {long startTime = System.nanoTime();test();duration = System.nanoTime()-startTime;synchronized (Tester.this){putResults();}endLatch.countDown();}}}// List集合static abstract class ListTest extends Tester<List<Integer>>{public ListTest(String testID, int nReaders, int nWriters) {super(testID, nReaders, nWriters);}// 读class Reader extends TestTask {long result =0L;@Overridevoid test() {for (long i =0L; i<testCycles; i++){for (int index=0; index<containerSize;index++){result+=(testContainer.get(index)==null?0:testContainer.get(index));}}}@Overridevoid putResults() {readResult += result;readTime += duration;}}// 写入class Writer extends TestTask {@Overridevoid test() {for (long i =0L; i<testCycles; i++){for (int index=0; index<containerSize;index++){// 使用特定元素来代替list集合中特定位置的元素testContainer.set(index,writeData[index]);}}}@Overridevoid putResults() {writeTime += duration;}}// 通过多个线程来进行读写操作@Overridevoid startReadersAndWriters() {for (int i=0;i<nReaders; i++){service.execute(new Reader());}for (int i=0;i<nWriters; i++){service.execute(new Writer());}}}// synchronized互斥机制static class SynchronizedArrayListTest extends ListTest{public SynchronizedArrayListTest(int nReaders, int nWriters) {super("SynchronizedArrayList", nReaders, nWriters);}@OverrideList<Integer> containerInitializer() {return Collections.synchronizedList(new ArrayList<Integer>(Arrays.asList(new Integer[containerSize])));}}// 免锁容器static class CopyWriteArrayListTest extends ListTest{public CopyWriteArrayListTest(int nReaders, int nWriters) {super("CopyWriteArrayList", nReaders, nWriters);}@OverrideList<Integer> containerInitializer() {return new CopyOnWriteArrayList<Integer>(Arrays.asList(new Integer[containerSize]));}}static class ListComparisons {public static void main(String[] args) {new SynchronizedArrayListTest(10,0);new SynchronizedArrayListTest(9,1);new SynchronizedArrayListTest(5,5);new SynchronizedArrayListTest(3,7);new SynchronizedArrayListTest(1,9);new CopyWriteArrayListTest(10,0);new CopyWriteArrayListTest(9,1);new CopyWriteArrayListTest(5,5);new CopyWriteArrayListTest(3,7);new CopyWriteArrayListTest(1,9);Tester.service.shutdown();}}// Map集合static abstract class MapTest extends Tester<Map<Integer,Integer>>{public MapTest(String testID, int nReaders, int nWriters) {super(testID, nReaders, nWriters);}// 读class Reader extends TestTask {long result =0L;@Overridevoid test() {for (long i =0L; i<testCycles; i++){for (int index=0; index<containerSize;index++){result+=(testContainer.get(index)==null?0:testContainer.get(index));}}}@Overridevoid putResults() {readResult += result;readTime += duration;}}// 写入class Writer extends TestTask {@Overridevoid test() {for (long i =0L; i<testCycles; i++){for (int index=0; index<containerSize;index++){// 使用特定元素来代替list集合中特定位置的元素testContainer.put(index,writeData[index]);}}}@Overridevoid putResults() {writeTime += duration;}}// 通过多个线程来进行读写操作@Overridevoid startReadersAndWriters() {for (int i=0;i<nReaders; i++){service.execute(new Reader());}for (int i=0;i<nWriters; i++){service.execute(new Writer());}}}// synchronized互斥机制static class SynchronizedHashMapTest extends MapTest{public SynchronizedHashMapTest(int nReaders, int nWriters) {super("SynchronizedHashMap", nReaders, nWriters);}@OverrideMap<Integer,Integer> containerInitializer() {return Collections.synchronizedMap(new HashMap<>());}}// 免锁容器static class ConcurrentHashMapTest extends MapTest{public ConcurrentHashMapTest(int nReaders, int nWriters) {super("ConcurrentHashMap", nReaders, nWriters);}@OverrideMap<Integer,Integer> containerInitializer() {return new ConcurrentHashMap<Integer,Integer>();}}static class MapComparisons {public static void main(String[] args) {new SynchronizedHashMapTest(10,0);new SynchronizedHashMapTest(9,1);new SynchronizedHashMapTest(5,5);new SynchronizedHashMapTest(3,7);new SynchronizedHashMapTest(1,9);new ConcurrentHashMapTest(10,0);new ConcurrentHashMapTest(9,1);new ConcurrentHashMapTest(5,5);new ConcurrentHashMapTest(3,7);new ConcurrentHashMapTest(1,9);Tester.service.shutdown();}}
}

对书中案例进行部分修改,主要是书中案例无法测试,有些类没有的原因,主要是看测试效果。以上同样是使用尽可能的复杂的代码逻辑来防止jvm优化。这里没有把结果打出来,本篇已经太长了,主要说结果吧。
像上面案例中一样,假设有十个线程同时对某个容器进行操作,那么读线程和写线程根据业务的不同肯定比例有所不同,尤其是针对于免锁容器的实现原理来看,写操作可能会影响免锁容器的性能,但是在机器中的实际情况来看,当10线程中有五个写线程,整体的性能还是高于synchronized的。
ConcurrentHashMap在这方面更加明显,因为ConcurrentHashMap使用一种不同的技术,可以明显地最小化写入所造成的影响。

9.3 乐观加锁

本来还以为这节有难度,结果这节应该算是后面来比较简单的概念了,这节就是乐观锁,和数据库中的乐观锁差不多一个概念,不管是进入方法,还是改变对象都是不加锁的,但是会在改变对象域的进行一个比较,如对象域的值依然等于当时拿出来的值,那么就进行一次原子性的更新操作。这个Atomic对象就可以提供这个样的支持。

public class C_FastSimulation {static final int N_ELEMENTS = 100000;static final int N_GENES =30;static final int N_EVOLVERS = 50;static final AtomicInteger[][] GRID = new AtomicInteger[N_ELEMENTS][N_GENES];static final AtomicInteger id = new AtomicInteger(0);static Random random = new Random(47);static class Evolver implements Runnable {@Overridepublic void run() {while (!Thread.interrupted()) {int element = random.nextInt(N_ELEMENTS);for (int i =0; i<N_GENES; i++){int previous = element-1;if(previous<0){previous = N_ELEMENTS-1;}int next = element+1;if (next>=N_ELEMENTS){next =0;}int oldValue = GRID[element][i].get();int newValue = oldValue + GRID[previous][i].get()+GRID[next][i].get();newValue/=3;// 这个类似于数据库乐观锁版本号的概念,如果给定的oldValue等于现在的值,那么进行一次原子性更新,并返回true;// 如果不等于,则返回falseif (!GRID[element][i].compareAndSet(oldValue,newValue)){System.out.println(id.incrementAndGet()+",有一个值在操作时发生了变化:"+oldValue);}}}}}public static void main(String[] args) throws InterruptedException {ExecutorService service = Executors.newCachedThreadPool();for (int i=0; i<N_ELEMENTS; i++){for (int j =0;j<N_GENES;j++){GRID[i][j] = new AtomicInteger(random.nextInt());}}for (int i=0;i<N_EVOLVERS;i++){service.execute(new Evolver());}TimeUnit.SECONDS.sleep(10);service.shutdownNow();}
}

9.4 ReadWriteLock

这个和前面的免锁容器有些共同之处,不太清楚免锁容器是不是使用了这个技术,ReadWriteLock指向数据结构相对不频繁的写入,但是有多个线程经常读取这个数据结构时可以使用当前这个锁。ReadWriteLock允许有多个读取者,只要他们不试图写入就可以了。如果写锁已经被其他线程所占有,那么任何读取者都不可以访问,直到写锁被释放。也就是说这里,只有写锁会强占资源,阻塞其他线程访问。
这个ReadWriteLock对于性能提高取决于对数据读取的频率与被修改的频率相比较的结果,当然这样做锁也会变得更加复杂,因此短操作并不能带来好处。

那么如果这个时候读锁正在占用,但是写锁要开始进入怎么办?当然是写锁等待读取操作完成,然后在拿到锁。
在最开始编写多线程程序的时候最好选择最直观的方式去完成,比如synchronized,只有在提高性能的时候才可以考虑这个方案

public class D_ReaderWriterLock {static class ReaderWriterList<T> {private ArrayList<T> lockedList;// 创建对象的时候指定是true或者false(默认),这个指的是是否保证公平竞争// 这个概念前面也提到过,但是忘记是哪个了。大概指的是先进先出(FIFO)的概念,也就是// 说当有多个线程同时竞争这个锁的时候,那么等待时间最长的那个线程将获取这个锁,这就是所谓的公平竞争。设置为true,将保证这个规则。// 但是在ReentrantReadWriteLock中是有Write锁和Read锁,可以将等待Read锁的一组线程看做一个,将等待Write锁的每个线程看做单独的一个,谁等待时间长就由谁获取锁。// false:首先肯定是更加的简单,性能更高,但是可能会出现一个线程持续等待的情况发生private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);public ReaderWriterList(int size, T initiaValue) {this.lockedList = new ArrayList<T>(// 返回一个copy指定的initiaValue对象size次的集合,也就是返回的list集合中有size个一样的initiaValue对象Collections.nCopies(size,initiaValue));}public T set(int index, T element){// 写锁Lock wlock = lock.writeLock();wlock.lock();try{// 使用指定的元素替换list集合中指定位置的元素return lockedList.set(index,element);}finally {wlock.unlock();}}public T get(int index){// 读锁Lock rlock = lock.readLock();rlock.lock();try {// 获取当前锁的读锁被持有的数量,这个是为了监听系统状态,不是为了同步使用;// lock.getReadHoldCount(); 当前线程持有锁的数量if (lock.getReadLockCount()>1){System.out.println(lock.getReadLockCount());}return lockedList.get(index);}finally {rlock.unlock();}}}static class ReaderWriterListTest {ExecutorService service = Executors.newCachedThreadPool();private final static int SIZE = 100;private static Random random = new Random(47);private ReaderWriterList<Integer> list = new ReaderWriterList<>(SIZE,0);private class Writer implements Runnable{@Overridepublic void run() {try {for (int i=0; i<20; i++){list.set(i, random.nextInt());TimeUnit.MILLISECONDS.sleep(100);}} catch (InterruptedException e) {e.printStackTrace();}System.out.println("Writer 已经完成,将要终止线程...");service.shutdownNow();}}private class Reader implements Runnable{@Overridepublic void run() {try {while (!Thread.currentThread().isInterrupted()){for (int i=0; i<SIZE; i++){list.get(i);TimeUnit.MILLISECONDS.sleep(1);}}} catch (InterruptedException e) {e.printStackTrace();}}}public ReaderWriterListTest(int readers, int writers) {for (int i=0; i<readers; i++){service.execute(new Reader());}for (int i=0; i<writers; i++){service.execute(new Writer());}}}public static void main(String[] args) {new ReaderWriterListTest(30,2);}

十 活动对象

通过上面的内容,不难发现java的线程机制是比较复杂的,写到这里,我认为我远远不能称之为熟悉多线程,最多就是基础的了解,自大一些说是整体地了解了一下,后续肯定还需继续对这些内容进行学习的。言归正传,在java中是没有任何编译器检查形式的安全防护措施,因此在这里这本书的作者提出了另外一个理论,那就是活动对象,或者运动者,当然这是中译后的称呼,书中是ActionObject。
下面对这个概念进行一个解释,其实类似于代理模式,之所以称这些对象是是“活动的”,是因为每个对象都维护着它自己的工作器线程以及消息队列,并且所有对这种对象的请求都将进入队列排队,任何时刻都只能运行其中的一个。因此可以串行化消息而不是方法,这也意味着不用再防备一个线程在其循环的过程中间被中断的这种问题了。
当你向一个活动对象发送消息时,这条消息转换为一个任务,该任务会插入到这个对象的队列中,等待以后的某个时刻运行。可以使用Future来实现这个模式。

public class A_ActionObject {// 单线程池,序列化任务private ExecutorService service = Executors.newSingleThreadExecutor();private Random random = new Random(47);// 模拟程序耗时private void pause( int factor) {try {TimeUnit.MILLISECONDS.sleep(random.nextInt(factor)+100);} catch (InterruptedException e) {e.printStackTrace();}}// 计算方法,调用时立即返回Futurepublic Future<Integer> calculateInt(final int x, final int y){return service.submit(new Callable<Integer>() {@Overridepublic Integer call() throws Exception {System.out.println("开始,x:"+x+",y:"+y);pause(500);return x+y;}});}public Future<Float> calculateFloat(final float x, final float y){return service.submit(new Callable<Float>() {@Overridepublic Float call() throws Exception {System.out.println("开始,x:"+x+",y:"+y);pause(500);return x+y;}});}// 停止处理任务public void shutdown(){service.shutdown();}public static void main(String[] args) {A_ActionObject actionObject = new A_ActionObject();// 免锁容器List<Future<?>> results = new CopyOnWriteArrayList<>();for (float f =0f; f<1.0f; f+=0.2f){results.add(actionObject.calculateFloat(f,f));}for (int i=0; i<5; i++){results.add(actionObject.calculateInt(i,i));}System.out.println("请求完成");while (results.size() > 0){for (Future<?> f:results){// 任务完成返回true,任务正常结束、异常、取消,都是完成if (f.isDone()){try {// 完成后拿到结果,如果没有完成将等待System.out.println(f.get());} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}results.remove(f);}}}actionObject.shutdown();}
}

上面这样的代码方式有什么好处呢?

  1. 每个对象都可以维护自己的工作线程
  2. 每个对象都将维护对自己域的所有控制权。(普通类只是拥有防护他们域的选择权)
  3. 所有在活动对象之间的通信都将以在这些对象之间的消息的形式发生。
  4. 活动对象之间的所有消息都是要排队的。

那又有什么缺点呢?
缺点也很明显,那就是没有直接的编译器支持这一操作,这样的编码方式太过麻烦了。

十一、21章小结

上面内容全部都是java线程的基础知识,线程的好处无外乎是提供了轻量级的上下文切换,而不是重量级的进程切换。
因为给定的进程内的所有线程共享相同的内存空间,轻量级的上下文切换只是改变了程序的执行序列和局部变量;进程切换的话,是改变所有的内存空间的。

提到优点,那么不得不提缺点了,缺点也很多,其实上面很多知识都是在帮助我们规避或减少多线程的缺点,深入了解多线程也就是深入了解多线程的不足,那么缺点是什么呢?

  1. 等待共享资源的时候性能降低;
  2. 需要处理线程的额外CPU消耗;
  3. 糟糕的程序设计导致不必要的复杂度;
  4. 有可能产生一些病态行为,比如饿死、竞争、死锁和活锁(多个运行在各自线程的任务,使得整体无法完成);
  5. 不同平台导致不一致性;这个主要指一些线程问题可能在一些系统中是不易出现的,但是在另外一些系统中又比较容易出现。

书中提到如果线程问题变得大而复杂,那么可以考虑使用Erlang这样的语言,这是专门用于线程机制的几种函数型语言之一,这也将是我以后的学习计划。预估两年后把

总结

来个小总结吧,有些感慨,这一章我看了两遍,案例写了两遍,耗时很久,估计四五个月都是有的,学习这个的过程中因为工作、其他学习计划或者什么都是把这个耽误下来,现在算是暂时告一段落了,但是还远远不能称之为学会,这一章还是要反复去看去研究,但是还有其他的安排,所以再隔一段时间再来温习这个内容。
这一章我写了大概有八万多字,这是一个学习的过程,不能用辛苦不辛苦来形容,但是我相信如果有朋友认真从上面一点点看下来,那么你肯定很辛苦,因为有些概念我并不能很好地用语言形容出来,如果有时间建议还是看一下《java编程思想》这一本书,收益良多!当然中文版确实有些翻译内容有点看的费劲,有能力还是推荐英文版。就这样,共勉!

java编程思想学习笔记——21多线程相关推荐

  1. Java编程思想学习笔记-第11章

    <?xml version="1.0" encoding="utf-8"?> Java编程思想学习笔记-第11章 Java编程思想学习笔记-第11章 ...

  2. JAVA编程思想学习笔记——第一章 对象导论

    搞了一年多java,野路子出身,发现java基础这块还是相当的薄弱!故决定学习<Java编程思想>这本书.在此把学习的知识点记录下! 面向对象的五大特性 1.万物皆为对象 2.程序是对象的 ...

  3. Java编程思想学习笔记4 - 序列化技术

    今天来学习下Java序列化和反序列化技术,笔者对<Java编程思想>中的内容,结合网上各位前辈的帖子进行了整理和补充,包括: 序列化概述 Java原生序列化技术 Hessian序列化技术 ...

  4. Java编程思想 学习笔记1

    一.对象导论 1.抽象过程 Alan Kay曾经总结了第一个成功的面向对象语言.同时也是Java所基于的语言之一的Smalltalk的五个基本特性,这些特性表现了纯粹的面向对象程序设计方式 1)万物皆 ...

  5. Java编程思想 学习笔记7

    七.复用类 1.组合语法 在新的类中产生现有类的对象.由于新的类是由现有类的对象所组成,所以这种方法叫做组合. 类中域为基本类型时能够自动被初始化为零.对象引用被初始化为null. 编译器不是简单地为 ...

  6. java编程思想 学习笔记(2)

    第二章     一切都是对象 用引用(reference)操纵对象 String s = "asdf"; String s; 但这里所创建的只是引用,并不是对象.如果此时向s 发送 ...

  7. java编程思想学习笔记(第七章:复用类)

    复用代码是java众多引人注目的功能之一.但是要想成为极具革命性的语言,仅仅能够复制代码并对之加以改变是不够的,它还必须能够做更多的事情. 7.1组合语法 将对象引用置于新类中.每一个非基本类型的对象 ...

  8. 01.Java 编程入门学习笔记20210307

    Java 编程入门学习笔记-day01 第0章:编程入门 1.计算机的概述 计算机 = 硬件 + 软件 1.1硬件:冯诺依曼体系 CPU: CPU的衡量标准:速度的计量单位是赫兹(Hz),1Hz相当于 ...

  9. JAVA编程思想读书笔记(三)--RTTI

    接上篇JAVA编程思想读书笔记(二) 第十一章 运行期类型判定 No1: 对于作为程序一部分的每个类,它们都有一个Class对象.换言之,每次写一个新类时,同时也会创建一个Class对象(更恰当的说, ...

  10. Java编程思想读书笔记(七)

    点击蓝字 关注我们 今天是端午节,在这里祝大家端午节安康.过节的同时也不要忘记知识储备,今天我 为大家带来了Java编程思想第七节多形性读书笔记.请大家一起品鉴,如果发现里面有啥写的不对的地方,请大家 ...

最新文章

  1. ggplot2可视化水平箱图并使用fct_reorder排序数据、使用na.rm处理缺失值(reorder boxplot with fct_reorder)、按照箱图的中位数从小到大排序水平箱图
  2. 【Linux】一步一步学Linux——pkill命令(126)
  3. Windows下PHP多线程扩展pthreads的安装
  4. Ibatis - Open quote is expected for attribute {1} associated with an element type '
  5. Xamarin中国峰会2019
  6. 利用 dbghelp.dll 生成 dump 文件
  7. php ecdsa secp256k1,从上的压缩派生ECDSA未压缩公钥
  8. 【个人笔记】OpenCV4 C++ 图像处理与视频分析 12课
  9. 2021 全国大学生电子设计竞赛题目
  10. 查询vue版本时报错:“文件名、目录名或卷标语法不正确“
  11. 电脑连接WiFi,浏览器打不开网页
  12. 33-Jenkins-修改插件源
  13. Lesson 38
  14. 朋友圈评论、点赞测试点
  15. (四)双击放大与缩小图片
  16. Hbase 热点问题3种解决方案
  17. CSS基础-02-基础选择器
  18. 胜博发表示玩游戏也能做公益!守望先锋为乳癌研究基金会募得一千多万美金
  19. 趣拿洞察:盒马上海实现盈利背后的“玄机”
  20. 高等工程数学第7 8章部分答案(吴)

热门文章

  1. python绘制彩色地震剖面断层解释_地震剖面上的断层分析及相关意义
  2. Ansys workbench静应力分析基本流程
  3. 51nod 1457:小K vs. 竹子
  4. 【资源下载】分享个嵌入式开发的入门教程(包含视频)
  5. mysql heartbeat_mysql管理工具之pt-heartbeat
  6. 台式机linux系统无线上网,CentOS 6.5 安装无线网卡驱动实现无线上网
  7. K8S各种各样的证书介绍
  8. 正态分布t个标准差范围内的概率
  9. 网页数据提取----网络投票软件开发
  10. LBS(基于位置服务(Location Based Service))