文章目录

  • 前言
  • 各类锁汇总
    • 相关锁知识点
    • 可重入锁与不可重入锁
    • 乐观锁与悲观锁
    • 自旋锁(含自定义自旋锁)
  • 知识补充
    • 上下文切换
    • CPU多层缓存架构
      • 介绍CPU的三级缓存
      • 缓存一致性协议
      • 导致的问题
  • 一、初识JUC
    • 1.1、JUC是什么?
    • 1.2、JUC三个包介绍
      • java.util.concurrent包
      • java.util.concurrent.locks包(含两个模板)
      • java.util.concurrent.atomic包
    • 1.3、synchronized与lock区别
  • 二、生产者消费者问题
    • 2.1、synchronized实现
    • 2.2、Lock实现(ReentrantLock)
      • 实现生产者消费者
      • 通过Condition实现精准通知唤醒
  • 三、8锁问题
  • 四、并发容器
    • 4.1、认识CopyOnWrite容器
    • 4.2、ConcurrentModificationException异常
      • 问题描述
      • 问题分析及解决(源码分析)
    • 4.3、读写集合类及并发类
      • ①CopyOnWriteArrayList
      • ②CopyOnWriteArraySet
      • ③ConcurrentHashMap
  • 五、Callable使用
  • 六、同步工具辅助类使用
    • CountDownLatch(倒计时器)
    • CyclicBarrier(循环珊栏)
    • semaphore(信号量)
    • 总结
  • 七、读写锁(ReentrantReadWriteLock)
    • 7.1、初识ReentrantReadWriteLock
    • 7.2、程序示例
  • 八、阻塞队列
    • 8.1、认识阻塞队列
    • 8.2、两个阻塞队列实现类
      • ①ArrayBlockingQueue
      • ②SynchroniousQueue(同步队列)
  • 九、线程池(重点)
    • 9.1、介绍线程池
    • 9.2、Executors创建线程池的三个方法
    • 9.3、自定义线程池(七大属性)
      • 认识ThreadPoolExecutor(详细介绍各个参数)
      • 介绍四大拒绝策略
    • 9.4、CPU密集型与IO密集型
  • 十、异步操作
    • 10.1、ForkJoin(并行计算)
      • 认识ForkJoin
      • 示例(整数累加)
    • 10.2、CompletableFuture(异步回调)
      • 介绍CompletableFuture
      • 示例(无返回值与有返回值)
    • 总结
  • 十一、CAS
    • 11.1、介绍与引出CAS
    • 11.2、CAS解决变量增值线程安全问题(AtomicInteger)
    • 11.3、两个案例的源码分析
    • 11.4、原子类中的ABA问题
      • 引出ABA问题
      • 问题模拟说明(问题源头)
      • 解决ABA问题(版本号机制,使用AtomicStampedReference)
    • 总结
  • 十二、死锁问题及排查
    • 12.1、死锁问题还原
    • 两种方式
  • 参考资料

前言

这段时间学习了下JUC,通过视频+博客文章+书籍让我对于一些并发知识的学习有了大致的了解,本章节内容demo示例见:Gitee-JUC学习。

博主文章汇总:博客目录索引(持续更新)


各类锁汇总

相关锁知识点

排他锁:同一时刻只允许一个线程访问共享资源。。Java中synchronizedReentrantLock实现。

读写锁:适用于共享资源读多写少的场景,分别读锁,写锁。Java中,如ReadWriteLock接口,包含实现类ReentrantReadWriteLock(包含读锁,写锁)。

  • 读操作(共享锁):同一时间允许多个线程对同一共享资源进行读操作。同一时刻所有线程的写操作会被阻塞。
  • 写操作(排他锁、独占锁):同一时间允许一个线程对同一共享资源进行写操作。同一时刻其他线程的读操作会被阻塞。

公平锁与非公平锁:一般情况下非公平锁性能比非公平锁高

private ReentrantLock lock = new ReentrantLock();//默认为非公平锁
private ReentrantLock lock1 = new ReentrantLock(true);//设置为公平锁
  • 非公平锁:一般默认是非公平锁,就是对于不同的线程请求获取同一个锁并不是公平来分配的,一般都是在这个锁的等待队列中随机挑选一个,并且与优先级也是有一点关系的,这种方式的话对于某些线程可能是不太公平的。
  • 公平锁:很公平的,按照时间的先后顺序,保证先到先得,后到后得,特点是不会产生饥饿现象,只要你排队最终还是能够等到资源的。
    • 哪些具有公平、非公平锁?如ReentrantLockReentrantReadWriteLock,默认是非公平锁。

互斥锁:对象互斥锁,用来保证共享数据操作的完整性,每个对象都对应于一个可称为" 互斥锁" 的标记,这个标记用来保证在任一时刻,只能有一个线程访问该对象。实现同步方法。

自旋锁:与互斥锁相同,一个执行单元要想访问被自旋锁保护的共享资源,必须先得到锁,在访问完共享资源后,必须释放锁。

  • 如果在获取自旋锁时,没有任何执行单元保持该锁,那么将立即得到锁;
  • 如果在获取自旋锁时锁已经有保持者,那么获取锁操作将自旋在那里,直到该自旋锁的保持者释放了锁。
  • 实际应用:原子包中的原子类许多方法都采用了自旋的操作,如原子引用AtomicReference类中的getAndSet(),以及通过使用cas操作我们来自定义锁,如下面一小节使用。

可重入锁与不可重入锁

可重入锁:可重入锁是可以进行反复进入的(上A锁内可以多次使用A锁,但需要注意释放否则会有阻塞),仅局限于在一个线程中。

  • synchronizedReentrantLock

不可重入锁,即若当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,就会获取不到被阻塞。

  • 可定义实现。可见 Java不可重入锁和可重入锁理解

可重入锁(递归锁)

demo见demo9中的SynchronizedTest.java、`ReentrantLockTest.java

  • 例如synchonzied关键字(可自动释放锁)、ReentrantLock(需要手动上锁解锁)都是可重入锁。
/*** @ClassName Test* @Author ChangLu* @Date 2021/4/6 9:33* @Description Synchronzied(可重入锁)测试*/
//测试synchronized本质可重入锁
public class SynchronizedTest {public static void main(String[] args) {new SynchronizedTest().use1();}//锁为LockTest实例public synchronized void use1(){System.out.println("首次上锁使用use1()方法");use2();//调用了同步方法}//该方法锁依旧为LockTest实例public synchronized void use2(){System.out.println("二次上锁使用use2()方法");}
}

/*** @ClassName ReentrantLockTest* @Author ChangLu* @Date 2021/4/6 9:40* @Description ReentrantLock(可重入锁)测试*/
public class ReentrantLockTest {//创建一个可重入锁private ReentrantLock lock = new ReentrantLock();public static void main(String[] args) {new ReentrantLockTest().use1();}public void use1() {lock.lock();//上锁try {System.out.println("use1()方法使用");use2();//使用use2()方法} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();//解锁}}public void use2() {lock.lock();//上锁try {System.out.println("use2()方法使用");} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();//解锁}}
}

说明:通过上面例子能够看到在一个线程中若某一方法使用了锁A,在其中也可以重复使用锁A,但需要注意的是ReentrantLock需要手动释放,上多少锁就要释放多少锁!synchronized不用担心因为其是自动释放锁的。

  • 获取锁次数>释放锁次数(一个线程中):由于释放其中锁相当于线程还持有这个锁,其他线程无法进入临界区,就会阻塞。
  • 获取锁次数<释放锁次数(一个线程中):会报出一个异常java.lang.IllegalMonitorStateException(非法监控状态异常)。

结论:使用可重入锁在一个线程中可以多次获得同一把锁,那么也需要释放同样把锁,否则会出现阻塞或者异常问题。

乐观锁与悲观锁

乐观锁与悲观锁

悲观锁:很悲观,每次拿数据时都会认为其他别的线程会修改该数据,因此会上锁(操作之前上锁)。抢到锁的线程执行过程中,其他想要获得该锁的线程都是阻塞挂起状态。

  • 核心:不支持多并发,是单线程操作,通过抢占时间片方式来获取锁的使用权,并发变串行。
  • 优点:保证了线程安全和数据安全。
  • 应用场景:适用于多写的场景,例如mysql中的行锁、表锁、读锁、写锁,java中的synchronized关键字。

乐观锁:很乐观,每次拿数据都会认为别的线程不会修改该数据,因此不会给数据上锁。自旋锁也是乐观锁一种。

  • 额外操作:虽说不会上锁,但会在数据更新时判断在此期间其他线程有没有对该数据做更新,最终通过线程的逐一更新获取数据的最终值。

    • 判断是否更新的操作哪些?①version版本号机制(运用于SQL命令,对应SQL语句可见面试必问的CAS,你懂了吗?)、②CAS算法。
  • 核心:支持多线程并发,每个线程在不同的时间节点对数据做更新操作,每次更新时候会判断其他线程是否对数据做更新。
  • 应用场景:适用于多读的场景,获取数据不再创建、销毁锁,减少了使用锁的情况,加大数据的吞吐量。如Redis等非关系型数据库。(Redis是单线程操作,将事务封闭在单一线程中,避免了线程的安全问题)

自旋锁(含自定义自旋锁)

自旋锁:一个执行单元要想访问被自旋锁保护的共享资源,必须先得到锁,在访问完共享资源后,必须释放锁,若是线程A首先获得了自旋锁,接着线程B也想获得该自旋锁,此时由于该自旋锁已经有保持着,那么线程B的获取锁操作会一直自旋在那里,一直到自旋锁的保持者A释放了该锁,线程B才能获取到!

JDK中自旋锁的实例说明

在原子包java.util.concurrent.atomic中的getAndSet()方法中有自旋锁的体现:

//AtomicReference类(原子包下)
public final V getAndSet(V newValue) {return (V)unsafe.getAndSetObject(this, valueOffset, newValue);
}//Unsafe类
public final Object getAndSetObject(Object var1, long var2, Object var4) {Object var5;//do while形式为自旋锁的一种体现do {var5 = this.getObjectVolatile(var1, var2);//获取对应原子类实例中的值} while(!this.compareAndSwapObject(var1, var2, var5, var4));//进行比较交换操作(cas操作),若是比较交换失败,会不断重复执行直至交换值为止!return var5;
}

说明:这是官方提供的一种自旋锁的体现,我们也可以进行自定义锁来模拟上锁、解锁操作(借助cas)。

自定义锁(自旋锁)

demo见demo9中的MySpinLock.java

程序描述:自定义自旋锁,并且自定义其中的上锁与解锁,通过使用AtomicReference中的cas操作,在上锁过程中将执行线程作为原子引用的值(null->执行线程),其是while()循环操作。解锁操作就是将原子引用类中的值设置为null(执行线程->null)即可表示解锁。

  • 若已经有一个线程占有了自旋锁,那么其他线程想要使用该锁的需要不断的进行重复请求,保持自旋的状态。
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;/*** @ClassName SpinLockTest* @Author ChangLu* @Date 2021/4/6 10:20* @Description 自定义自旋锁(借助cas)*/
public class MySpinLock {//默认其中值为nullAtomicReference<Thread> threadAtomicReference = new AtomicReference<>();//上锁public void myLock() {System.out.println(Thread.currentThread().getName()+"开始准备上锁");//只有当其中值为null时才能够进行更改,相当于进行上锁//若是一直执行不了cas操作,那么就会一直处于循环阻塞while (!threadAtomicReference.compareAndSet(null,Thread.currentThread())){//用于提示阻塞效果!!!进行延时,若是过多调用compareAndSet程序会终止System.out.println(Thread.currentThread().getName()+"阻塞等待锁中....");try {TimeUnit.MILLISECONDS.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}}System.out.println(Thread.currentThread().getName()+"已经上锁");}//解锁public void myUnlock(){//将原子引用中的线程设置为null,表示为解锁threadAtomicReference.compareAndSet(Thread.currentThread(),null);System.out.println(Thread.currentThread().getName()+"已解锁");}//测试一下自定义自旋锁通过使用cas来进行上锁、解锁public static void main(String[] args) throws InterruptedException {//自定义锁MySpinLock lock = new MySpinLock();new Thread(()->{lock.myLock();//进行上锁try {System.out.println(Thread.currentThread().getName()+"执行任务....");TimeUnit.SECONDS.sleep(3);} catch (Exception e) {e.printStackTrace();} finally {lock.myUnlock();//进行解锁}},"A").start();//确保线程A先执行TimeUnit.SECONDS.sleep(1);new Thread(()->{lock.myLock();//进行上锁try {System.out.println(Thread.currentThread().getName()+"执行任务....");} catch (Exception e) {e.printStackTrace();} finally {lock.myUnlock();//进行解锁}},"B").start();}}

  • 线程B需要在线程A释放了自旋锁之后才能够执行。

知识补充

Java无法开线程,Thread线程中start方法中实际上调用的是本地方法native void start0(),该方法是调用的本地C++方法。

线程创建规范:一般对于多线程使用时调用的方法其类不应该在Thread实现类或者是实现Runnable接口的类中。这样会造成代码的耦合性。

//一般如下写
public class Main {public static void main(String[] args)  {new Thread(()->{....}).start();}
}class OtherClass{public void test(){...}
}

Runtime可通过该类来动态获取一些虚拟机的信息,例如处理器数量等

Runtime类:运行时类。

介绍:每个Java应用程序都有一个Runtime类的Runtime ,允许应用程序与运行应用程序的环境进行接口。

//cpu密集型、IO密集型
System.out.println(Runtime.getRuntime().availableProcessors());//返回可用于Java虚拟机的处理器数量,即你电脑的核心数量

应用场景:在创建线程池时可以进行使用其来动态获取对应java虚拟机可用的处理器数量。


上下文切换

在进程中包含了多个线程,线程可以进行并发执行,即不断的进行上下文切换不同的线程执行,宏观上来看是在同时进行的,实际上是在不断交替执行,每次交替的时间很短几十毫秒而已,就会给我们一种错觉是在同时进行的。

那么我们来了解一下上下文切换做了哪些操作呢?

  • 调度操作只能由核心态来执行,所以在线程不断被调度获取对应的时间片这个过程实际上就是用户态与核心态不断切换的过程,如上首先由核心态进行调度指定的时间片,接着切换为用户态来执行指定的迅雷线程,当执行完之后挂起保存对应线程的信息又切换为核心态进行调度再切换为用户态执行QQ线程,这一切换过程指的就是上下文切换

上下文切换过程中需要进行挂起线程,保存线程信息,当重新执行被挂起线程还要进行load加载等操作,即进行切换核心态、用户态操作,实际上是特别耗资源的操作。

  • 一定有用户态与内核态的相互转化。

CPU多层缓存架构

介绍CPU的三级缓存

速度排名:CPU->内存->硬盘

CPU处理内存数据时,为了提升运行性能,设计了多级缓存策略,在每个CPU中都包含了一级缓存、二级缓存、三级缓存,若是CPU想要处理某个值时首先会去一级缓存找,若找不到去二级缓存,再找不到去三层,若还是找不到就会去主存中找:

  • CPU查找顺序:CPU->L1->L2->L3->内存->硬盘

  • 图片引用 b站IT楠老师—java多线程 视频中的课件

缓存一致性协议

在执行程序时,每一条指令都是在CPU中执行的,执行指令过程中会包含读取与写入的操作。一般来说程序运行过程中的临时数据都存放在主存(即内存)中,由于CPU执行速度很快,但每次从内存读取数据和向内存写入数据的过程与CPU执行指令速度比起来要慢的多,若是每次对数据的操作都与内存进行交互的话,会大大降低指令执行的速度,为了提升性能,CPU中出现了高速缓存(cache,现在一般都有三级缓存)!

  • CPU有了高速缓存之后,但程序在运行时,会将运算需要的数据从主存中复制一份到CPU的高速缓存中,接着进行读取与写入操作即就从其高速缓存中进行,当运算结束后,会将高速缓存中的数据刷新到主存中。

举例执行i=i+1,该条指令在单线程中并不会有问题,但是在多线程中就会出现线程安全问题!

问题描述:由于是多线程,可能多个线程会同时拷贝一份主存中的对应变量,接着在线程中不断对对应自己线程的副本进行读取写入操作,当多个线程执行完之后,重新刷新高速缓存中的值到主存,这时候就会出现缓存不一致问题

解决缓存不一致问题方法:硬件层面提供方法

  1. 通过在总线加LOCK#锁的方式。
  2. 通过缓存一致性协议。

由于使用第一种方法会造成在锁住总线期间,其他CPU无法访问内存,导致效率低下,接着就出现了缓存一致性协议!这里介绍Intel的MESI协议:

MESI协议 规定每条缓存都有一个状态位(额外两位表示),该状态位可对应四种状态:①修改态(Modified):此缓存被修改过,与主存数据不一致,为此缓存专有。②专有态(Exclusive):此缓存与主内存一致,但是其他CPU中没有。③共享态(Shared):此缓存与主内存一致,但也出现在其他缓存中。④无效态(Invalid):此缓存无效,需要从主内存重新读取

核心思想:当CPU写数据时,若发现操作的变量是共享变量(其他线程也包含该副本),会发出信号通知其他CPU将该变量的缓存行执行无效状态,当其他CPU读取这个变量时会发现自己缓存中的对应变量缓存是无效的,那么就会从内存中重新读取。

说明:这仅仅是CPU硬件层面的,对于Java的话需要去知晓对应的Java内存模型(JMM)对应的规范说明。


导致的问题

伪共享:缓存中的数据与主存中的数据不是实时同步的,各个CPU缓存之间的数据也不是同步的,在同一时间点,各个CPU所看到的同一内存地址的数据可能是不一致的。

指令重排:CPU为了提升指令执行的性能,会对一些没有依赖关系的指令进行指令重排,在多核多线程情况下就可能会因为指令重排的问题导致问题出现,即线程安全问题。


一、初识JUC

1.1、JUC是什么?

并发编程的本质:充分利用CPU的效率。

JUC:就是java.util .concurrent工具包的简称,是一个处理线程的工具包,在jdk1.5版本时就引出的。其有三个主要的包分别是:并发包、原子包、锁。

  • 第四个包是函数式接口包,在其他三个并发包中经常使用。

1.2、JUC三个包介绍

java.util.concurrent包

认识java.util.concurrent

在这个包下包含了阻塞队列(双端队列)、完成的未来、并发集合、线程池等接口;包含了对应的实现类,如一些处理并发的集合类、数组阻塞队列、并发HashMap、List、Set集合、同步工具辅助类、并行计算池等等。

TimeUnit:枚举类,表示给定粒度单位的持续时间,包含纳秒、毫秒、分钟、小时、天对应的实例。内部封装了sleep()方法,使我们设置睡眠时间更加的方便。

TimeUnit.DAYS.sleep(3);//睡三天
TimeUnit.HOURS.sleep(3);//睡3小时

java.util.concurrent.locks包(含两个模板)

认识java.util.concurrent.locks

首先看下该包中的有哪些接口与实现类:看这名字就知道是包中都是一些锁的相关内容

  • Condition:精确通知
  • Lock:lock接口,下面是对应的一些实现类
    • ReentrantLock:可重入锁(常用)
    • ReentrantReadWriteLock.ReadLock:可重用读写锁的读锁,用于处理高性能
    • ReentrantReadWriteLock.WriteLock:可重用读写锁的写锁
  • ReadWriteLock:读写锁

介绍一下Lock接口中的常用方法

  • lock():上锁。
  • tryLock():方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。
    • tryLock(long time, TimeUnit unit):方法和tryLock()方法是类似的,只不过区别在于这个方法在拿不到锁时会等待一定的时间,在时间期限之内如果还拿不到锁,就返回false。如果如果一开始拿到锁或者在等待期间内拿到了锁,则返回true。
  • unlock():释放锁。

注意点:在并发包中提供了一些锁,如可重入锁,读写锁,这些锁与synchronized关键字不同的是,其需要手动上锁与解锁,一般在try-catch-finally中的finally里释放锁,核心代码写在try中。

模板1如下:

//主要三步骤:①Lock锁实例化 ②上锁  ③释放锁
class LockDemo{Lock lock = new ReentrantLock();//①可重入锁实例化//使用lock锁public void sellTicket(){lock.lock();//②上锁try {....//核心代码} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();//③释放锁}}
}
  • 不释放锁的话会出现死锁状况。Lock锁在异常情况下不会释放锁,因此将释放锁的操作放在finally()中。

模板2如下:

if(lock.tryLock()){//尝试获取锁,可设置时间try {....//核心代码} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();//③释放锁}
}else{//如果不能获取锁,则直接做其他业务
}

上面可以说是可重入锁的小例子,我们看下可重入锁ReentrantLock的创建实例时的源码:

public class ReentrantLock implements Lock, java.io.Serializable {private final Sync sync;public ReentrantLock() {sync = new NonfairSync();//创建ReentrantLock时默认默认使用的是非公平锁}//可通过传入一个布尔值来创建公平或非公平锁//true:公平锁   false:非公平锁public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
}

java.util.concurrent.atomic包

该包中主要都有一些原子类,一般在并发中用于代替一些基本类型、引用类型的复合操作,避免出现线程安全问题:

分类如下


1.3、synchronized与lock区别

之前我们让线程进行同步主要使用的是synchronized关键字,在并发包中提供了一些锁来提升并发时的性能,并且在并发过程中能够通过锁来做更多细粒度的事情以及实现一些扩展功能,首先我们来看下synchronized关键字与使用lock实现类的区别?

1、Synchronized 是内置的java关键字;Lock 是一个java类。

2、Synchronized 无法判断获取锁的状态;Lock 可以判断是否获取到了锁(tryLock)。

3、Synchronized 会自动释放锁;Lock 必须要手动释放锁,否则出现死锁。

4、Synchronized 线程1(获得锁,阻塞) 线程2(无法获取锁一直等待下去);Lock 通过设置不同锁就不会一直等待下去。

5、Synchronized 本身是可重入锁,不可以中断,非公平锁;Lock 可重入锁,可以判断锁,可设置公平或非公平。


二、生产者消费者问题

2.1、synchronized实现

demo见demo2目录中的Synchronizeddemo1.java

程序描述:通过使用add()方法作为生产操作、minus()模拟消费操作。

class Data{private int num;//+1操作public synchronized void add() throws InterruptedException {//num≠0阻塞释放锁if (num != 0){wait();}num++;System.out.println(Thread.currentThread().getName()+"=>"+num);notifyAll();}//-1操作public synchronized void minus() throws InterruptedException {//num为0阻塞释放锁if (num == 0){wait();}num--;System.out.println(Thread.currentThread().getName()+"=>"+num);notifyAll();}
}

测试程序

public class SynchronizedDemo1 {public static void main(String[] args) {Data data = new Data();//一个线程为生产者、一个线程为消费者//仅有两个线程时不会出现问题,当出现三四个线程时会出现安全问题new Thread(()->{for (int i = 0; i < 20; i++) {try {data.add();} catch (InterruptedException e) {e.printStackTrace();}}},"A").start();new Thread(()->{for (int i = 0; i < 20; i++) {try {data.minus();} catch (InterruptedException e) {e.printStackTrace();}}},"B").start();//        new Thread(()->{//            for (int i = 0; i < 20; i++) {//                try {//                    data.add();
//                } catch (InterruptedException e) {//                    e.printStackTrace();
//                }
//            }
//        },"C").start();
//
//        new Thread(()->{//            for (int i = 0; i < 20; i++) {//                try {//                    data.minus();
//                } catch (InterruptedException e) {//                    e.printStackTrace();
//                }
//            }
//        },"D").start();}
}

说明:在上面代码中会出现线程安全问题

  1. 当有两个线程时(分为作为消费、生产且消费、生产次数一致时)不会出现线程安全问题(若次数不一致有阻塞问题);
  2. 若有四个线程时(如两个线程进行消费、两个线程进行生产)则会出现线程安全问题(也可能会有阻塞问题)。

以上代码两个线程不会出现安全问题:

若是四个线程就会出现安全问题,甚至还可能出现阻塞问题:

问题原因:首先wait()时是该线程进入阻塞并且释放锁,在四个线程(其中两个生产、两个消费),若是一个生产线程进行if()判断通过进入阻塞释放锁,此时另一个生产线程也进行if()判断进入阻塞释放锁,说明此时num>0,接着消费线程只会判断num==0才会阻塞否则进行了num--,使用notifyAll()将其他两个生产线程同时唤醒此时由于只是一层if判断所以会直接执行下面的num++,就会导致出现进行2次相加甚至是多次相加。

解决方案

  • ①使用if..else..,将下面执行操作放置到else中,不过这样的话会有很多生产、消费操作无效。
  • ②直接将if直接改为while,当被唤醒时会再次判断是否满足条件,就不会出现线程安全问题。

demo见demo2目录下的Synchronizeddemo2.java,这里仅列出更改部分代码:

class Data2{private int num;//+1操作public synchronized void add() throws InterruptedException {//使用while确保在被唤醒时不会直接执行下面num++while (num != 0){wait();}num++;System.out.println(Thread.currentThread().getName()+"=>"+num);notifyAll();}//-1操作public synchronized void minus() throws InterruptedException {//使用while确保在被唤醒时不会直接执行下面代码while (num == 0){wait();}num--;System.out.println(Thread.currentThread().getName()+"=>"+num);notifyAll();}
}

测试代码:使用四个线程来测试,效果如下

说明:可以看到修改为while()过后解决了多个生产线程、消费线程出现的线程安全问题!


2.2、Lock实现(ReentrantLock)

实现生产者消费者

使用synchronized实现生产者消费者,需要使用到wait()notify()配合进行。

现在使用并发包中的Lock锁实现时,同样也需要配合Condition类中的方法来进行阻塞、唤醒操作达到与之前同样的效果。

  • Lock锁搭配使用的await()方法、signalAll()方法都是通过Condition实例调用的。
  • Lock相关锁、Condtion类都是属于java.util.concurrent.locks包下的。

demo见demo2目录中的LockDemo1.java

程序描述:其中Condition实例是通过ReentrantLock实例(可重入锁,该实例为非公平锁)调用newCondition()获取,其await()signalAll()使用效果与之前wait()notify()方法效果一致,代码如下:

class Data3{private int num;private Lock lock = new ReentrantLock();//①可重入锁实例化Condition condition = lock.newCondition();//通过指定锁来获取Condition实例(ConditionObject实例)//生产public void add() throws InterruptedException {lock.lock();//上锁try {//核心业务while (num != 0) {condition.await();//阻塞}num++;System.out.println(Thread.currentThread().getName() + "=>" + num);condition.signalAll();//唤醒所有等待线程} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();//释放锁}}//消费public void minus() throws InterruptedException {lock.lock();//上锁try {while(num == 0){condition.await();//阻塞等待}num--;System.out.println(Thread.currentThread().getName()+"=>"+num);condition.signalAll();//唤醒所有线程} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();//释放锁}}
}
  • Lock锁使用过程一般是先上锁,接着将释放锁写在finally中,核心业务一般写在try()中,await()signalAll()用于与之前使用的wait()notifyAll()方式相同。

测试程序

public class LockDemo {public static void main(String[] args) {Data3 data = new Data3();//多个生产线程、消费线程测试new Thread(()->{for (int i = 0; i < 20; i++) {try {data.add();} catch (InterruptedException e) {e.printStackTrace();}}},"A").start();new Thread(()->{for (int i = 0; i < 20; i++) {try {data.minus();} catch (InterruptedException e) {e.printStackTrace();}}},"B").start();new Thread(()->{for (int i = 0; i < 20; i++) {try {data.add();} catch (InterruptedException e) {e.printStackTrace();}}},"C").start();new Thread(()->{for (int i = 0; i < 20; i++) {try {data.minus();} catch (InterruptedException e) {e.printStackTrace();}}},"D").start();}
}

  • 可以看到输出结果与之前的大致相同,但是线程的执行是随机的状态,通过Lock锁实现还可以对其线程的顺序进行指定。

通过Condition实现精准通知唤醒

demo见demo2目录下的LockDemo2.java

Condition(同步监视器):精准通知和唤醒线程,能够更好的控制Lock。

下面的例子目的是想通过Condition同步监视器来精确通知唤醒指定线程,来通过多线程精确执行指定任务:

//多线程下执行顺序为A->B->C
class Work{private Lock lock = new ReentrantLock();//可重入锁//创建三个同步监视器,通过对某个同步监视器进行阻塞唤醒来达到指定任务执行Condition condition1 = lock.newCondition();Condition condition2 = lock.newCondition();Condition condition3 = lock.newCondition();//通过num的值来让指定线程进入到阻塞状态private int num = 1;public void printA(){lock.lock();//上锁try {//业务(判断、执行、通知)while (num != 1){condition1.await();//进入阻塞}System.out.println(Thread.currentThread().getName()+"=>AAAAAAA");num = 2;condition2.signal();//唤醒condition2} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();//释放锁}}public void printB(){lock.lock();//上锁try {//核心业务while (num != 2){condition2.await();//进入阻塞}System.out.println(Thread.currentThread().getName()+"=>BBBBBBB");num = 3;condition3.signal();//唤醒condition3} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();//释放锁}}public void printC(){lock.lock();//上锁try {while (num != 3){condition3.await();//进入阻塞}System.out.println(Thread.currentThread().getName()+"=>CCCCCCC");num = 1;condition1.signal();//唤醒condition1} catch (Exception e) {e.printStackTrace();} finally {lock.unlock();//释放锁}}}
  • 可以将其想象为一条生产线:下单->支付->交易->物流,不过实际这种场景并不会像这么简单。

测试程序

public class LockDemo2 {public static void main(String[] args) {Work work = new Work();//三个线程进行执行new Thread(()->{for (int i = 0; i < 20; i++) work.printA();},"1").start();new Thread(()->{for (int i = 0; i < 20; i++) work.printB();},"2").start();new Thread(()->{for (int i = 0; i < 20; i++) work.printC();},"3").start();}
}


三、8锁问题

理清synchronized使用于普通方法、静态方法时使用什么作为锁的概念即可!

  • 作用于普通方法,将this(调用方法的实例本身)作为锁。如:public synchronized void method(){}
  • 作用于静态方法,将其方法类.class作为锁。如:public synchronized static void method(){}

可通过8个不同例子来进行练习:关于8锁问题详细介绍


四、并发容器

4.1、认识CopyOnWrite容器

Copy-On-Write简称COW,是一种用于程序设计中的优化策略。一开始都共享同一个内容,当要进行修改或写操作的时候,会把内容先Copy出去紧接着一个新的内容之后改变原有的共同内容,这是一种延时懒惰策略。

CopyOnWrite容器:即写时复制的容器,当我们往一个容器中添加元素时,并不会直接往当前的容器中添加,而是先将当前容器进行copy一份复制出一份新的容器,接着向新的容器中添加元素,在添加元素之后,再将新的容器引用给原有的容器变量。针对于写会上锁

  • 对于读操作并不会上锁,始终会读到原有容器中的内容。

重点:写操作是针对于新的容器添加,而读操作是针对于旧的容器读。所以CopyOnWrite容器也是一种读写分离的思想,即读和写不同的容器。

JDK1.5开始Java并发包中提供了两个使用CopyOnWrite机制实现的并发容器分别是CopyOnWriteArrayListCopyOnWriteArraySet


4.2、ConcurrentModificationException异常

问题描述

demo见demo3目录下的CopyOnWriteListTest.java

问题描述:多线程情况下使用普通集合来进行读写操作会报出异常,例如下面情况:add()表示写,sout表示读。

public class CopyOnWriteListTest {public static void main(String[] args) {List<Object> list = new ArrayList<>();//多线程下进行add操作new Thread(()->{for (int i = 0; i < 20; i++) {list.add(UUID.randomUUID().toString());System.out.println(list);}//System.out.println(list);},"A").start();new Thread(()->{for (int i = 0; i < 20; i++) {list.add(UUID.randomUUID().toString());System.out.println(list);}//System.out.println(list);},"B").start();}
}

  • java.util.ConcurrentModificationException:并发修改异常。

问题分析及解决(源码分析)

问题分析过程(通过源码定位)

ArrayList中其方法并不是线程安全的,在多线程情况下进行读写操作会出现ConcurrentModificationException并发修改异常,什么原因导致呢?

  • 我们看其中的报错,源头是println() -> valueOf() -> AbstractCollection.toString() -> Itr.next() -> Itr.checkForComodification
//ArraryList类中的Itr类里的方法
final void checkForComodification() {if (modCount != expectedModCount)//若是不相等,则会抛出并发修改异常!!!throw new ConcurrentModificationException();
}
  • 原因是modCountexpectedModCount不相等抛出的异常。那么我们此时有个问题modCountexpectedModCount指的是什么?

我们println()输出list集合实际上是遍历集合中的迭代器(Itr,实现Iterable接口),将每个元素通过StringBuilder()合并起来(关键是其中的遍历迭代器操作)。

首先看ArrayList中的迭代器Itr

public abstract class AbstractList<E> extends AbstractCollection<E> implements List<E> {protected transient int modCount = 0;//表示的是修改的次数()
}public class ArrayList<E> extends AbstractList<E> implements List<E>, RandomAccess, Cloneable, java.io.Serializable//ArrayList中的迭代器private class Itr implements Iterator<E> {int cursor;        //下一个要访问元素的索引int lastRet = -1;  //上一个访问元素的索引int expectedModCount = modCount;//期望修改的值(即使用迭代器时修改的值)//next()方法获取下一个值public E next() {//进行检查操作(重点,本部分出现情况)checkForComodification();int i = cursor;if (i >= size)throw new NoSuchElementException();Object[] elementData = ArrayList.this.elementData;//若是下一个要访问的索引>=数组中的长度(表示获取不到了,针对于多线程情况下其他线程删除操作导致原本数组长度变小抛出异常)if (i >= elementData.length)throw new ConcurrentModificationException();//索引+1cursor = i + 1;//返回对应索引下标的元素return (E) elementData[lastRet = i];}//检查方法final void checkForComodification() {//若是迭代器中修改的值与集合中修改的值不一致时说明同时进行了读写操作//报出并发修改异常if (modCount != expectedModCount)throw new ConcurrentModificationException();}}//ArrayList的写操作add()方法public boolean add(E e) {//其中我们跳过中间调用方法直接看46行的方法(指定修改次数+1)ensureCapacityInternal(size + 1);  // Increments modCount!! 增加修改次数(modCount)elementData[size++] = e;return true;}//ArrayList中的确保方法private void ensureExplicitCapacity(int minCapacity) {modCount++;//集合修改次数+1if (minCapacity - elementData.length > 0)grow(minCapacity);}
}

通过源码分析快速简洁说明:在sout输出集合过程中实际上遍历的是迭代器。迭代器在执行next()方法获取值时(相当于读操作)会首先进行判断modCount != expectedModCount

  • modCount指的是针对于集合中的修改次数(如add()remove(),执行时会modCount++,并不直接对于迭代器)。
  • expectedModCount指的是在迭代器中的修改次数(如remove(),执行时会expectedModCount=modCount)。

在上面这种情况两个线程分别包含读(迭代器next())、写(集合的add())操作,若是线程A在对迭代器中执行next()中的检查方法的同时,其他线程B对于集合add()方法执行多次,其modCount也就会多次+1,该过程中expectedModCount值并不会变,自然轮到线程A进行检查方法判断则会出现false情况了,就会报出异常(通过简单的调用次数判断就能够来发现你是否出现了并发修改异常真的是秒啊)。

源码设计者通过这种方式能够阻止在多线程并发时读写出现的问题来抛出异常,不得不配合其设计能力!!!

注意:其实并不仅仅针对于checkForComodification()会抛出异常,还有上面21行判断读取的值是否>=数组的长度也能够检测出是否出现并发修改异常!

列举出现该异常情况

1、单线程情况下,普通集合迭代器遍历时删除时不使用Itrremove()方法,而使用集合中的remove方法删除同样出现该异常情况。

2、多线程情况下,普通集合进行读(迭代器遍历,如调用toString()方法)、写(调用集合中方法)会抛出异常。

3、多线程情况下,使用Vector进行迭代器遍历、写操作时,也会抛出该异常。

  • 尽管Vector中的方法都采用了Synchronized进行了同步,若多个线程都对一个集合进行读(迭代器)、写操作会抛出该异常,因为就算是每个方法同步了,例如线程A在执行遍历迭代器next()方法结束后释放锁,线程B会执行add()方法,执行完后(modcount++)也释放锁,接着线程A再次遍历时,该迭代器中的expectedModCountmodCount会出现不等,则又会抛出异常!

  • //下面情况尽管使用Vector也会出现并发修改异常
    public class Test {//其有一个共同操纵属性modCountstatic List<Integer> list = new Vector<Integer>();public static void main(String[] args)  {list.add(1);list.add(2);list.add(3);list.add(4);list.add(5);Thread thread1 = new Thread(){public void run() {Iterator<Integer> iterator = list.iterator();while(iterator.hasNext()){Integer integer = iterator.next();System.out.println(integer);try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}}};};Thread thread2 = new Thread(){public void run() {Iterator<Integer> iterator = list.iterator();while(iterator.hasNext()){Integer integer = iterator.next();if(integer==2)iterator.remove();}};};thread1.start();thread2.start();}
    }
    

解决方案

解决方案:①在使用iterator迭代器时候使用synchronizedlock进行同步。②使用并发容器CopyOnWriteArrayList来代替ArrayListVector

若是Vector直接输出list集合则是不会出现线程安全问题的,直接输出的话实际上调用的是其toString()方法(线程安全的),其中包裹着遍历迭代器。


4.3、读写集合类及并发类

下面列举的三个类都是线程安全的,其都具有各自的特点!

①CopyOnWriteArrayList

源码分析

前面也介绍了这类容器是读写分离的,读的是旧容器,写的是新容器(写完之后重新赋值引用)。

我们看下其源码中究竟是怎么做的?

public class CopyOnWriteArrayList<E>implements List<E>, RandomAccess, Cloneable, java.io.Serializable {//默认是使用的可重入锁(非公平锁)final transient ReentrantLock lock = new ReentrantLock();private transient volatile Object[] array;final void setArray(Object[] a) {array = a;}//写操作public boolean add(E e) {final ReentrantLock lock = this.lock;//首先获取到可重入锁//上锁lock.lock();try {Object[] elements = getArray();//获取到原有的对象数组int len = elements.length;//复制一份新的对象数组Object[] newElements = Arrays.copyOf(elements, len + 1);//注意!!!写操作实际上是在新的对象数组中完成的,并且本部分内容是具有原子性的newElements[len] = e;//新的对象数组赋值完之后将新的容器引用给原来的变量名setArray(newElements);return true;//返回true表示添加成功} finally {lock.unlock();}}//读操作public E get(int index) {return get(getArray(), index);//调用36行方法}private E get(Object[] a, int index) {//直接返回读到的内容,并没有上锁或者同步方法(因为使用是对旧的容器读不会与之前的写操作造成冲突)return (E) a[index];}
}

demo见demo3目录下的CopyOnWriteListTest.java

程序描述:用于测试CopyOnWriteList是否在多线程下有线程安全问题。

/*** @ClassName CopyOnWriteList* @Author ChangLu* @Date 2021/3/26 14:10* @Description CopyOnWriteList:测试读写分离*/
public class CopyOnWriteListTest {public static void main(String[] args) {//问题描述:List<Object> list = new ArrayList<>(); 线程不安全的//解决1:Collections.synchronizedList(new ArrayList<>()) 线程安全的(实际就是包裹了一层synchronized)//解决2:new Vector<>() 线程安全的(jdk1.0就出现了,其所有方法都是同步的)//解决3:使用CopyOnWriteArrayList(读写分离)List<Object> list = new CopyOnWriteArrayList<>();//多线程下进行add操作new Thread(()->{for (int i = 0; i < 20; i++) {list.add(UUID.randomUUID().toString());System.out.println(list);}//System.out.println(list);},"A").start();new Thread(()->{for (int i = 0; i < 20; i++) {list.add(UUID.randomUUID().toString());System.out.println(list);}//System.out.println(list);},"B").start();}
}

  • 测试结果是正确的,并没有在读写操作过程中发生异常。

注意点(建议)

  1. 减少扩容开销,根据实际需要初始化CopyOnWriteMap的大小,避免多次写入时进行扩容操作。
  2. 尽量使用批量添加方法addAll(),防止每次添加时容器都会进行复制一份,减少内存开销。

优点:读写分离,提升性能。

缺点:内存占用问题与数据一致性问题。

  • 内存占用问题:每次添加一个元素时都会复制一份新的数组。
  • 数据一致性问题:不要期望每次读取数据时都能够读到最新添加进入的数据。有时可能会读到旧的容器中数据。

改进

  1. 针对于内存占用问题,可以通过压缩容器中的元素来减少对象的内存,例如10进制的数字可以压缩为更大的进制表示,如16位、32位、64位。若不使用该容器可使用其他容器如ConcurrentHashMap
  2. 若是你希望写入的数据实时被读取出来建议不要使用该并发容器。

②CopyOnWriteArraySet

demo见demo3包下的CopyOnWriteSetTest.java

程序描述:解决普通Set集合框架在多线程下出现的安全问题,可以使用CollectionssynchronizedSet()来包装或者使用CopyOnWriteArraySet(读写分离):

import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArraySet;/*** @ClassName CopyOnWriteSetTest* @Author ChangLu* @Date 2021/3/28 21:50* @Description CopyOnWriteSet(读写分离Set并解决线程安全问题)*/
public class CopyOnWriteSetTest {//private static Set<String> set = new HashSet<>();:多线程下有线程安全问题//1、Collections.synchronizedSet(new HashSet<>()):使用工具类来包装一下,线程安全//2、new CopyOnWriteArraySet():使用并发包中的cow读写分离的set,线程安全的!private static Set<String> set = new CopyOnWriteArraySet();public static void main(String[] args) {//开辟20个线程for (int i = 0; i < 20; i++) {new Thread(()->{for (int j = 0; j < 20; j++) {set.add(UUID.randomUUID().toString());}System.out.println(set);}).start();}}
}


③ConcurrentHashMap

特点描述:将数组的每一个表头锁住,在并发情况下若是获取了不同的表头,则可以进行同步执行。当一个线程占用锁访问其中的一个段数据时,其他段的数据也能够被其他线程访问,实现并发访问!

Demo见demo3中的ConcurrentHashMapTest

程序描述:使用普通集合框架如HashMap在多线程下会出现安全问题,解决安全问题可以使用HashTableCollections.synchronizedMap(new HashMap<>()),对于这两个都是直接使用的synchronized关键字来保证方法的同步,效率比较低下,之后JDK1.5出现并发包引出了ConcurrentHashMap其提高了并发内容,在内部采用了segment的结构(类似于一个类Hash Table的结构,内部维护一个链表数组)。

/*** @ClassName ConcurrentHashMapTest* @Author ChangLu* @Date 2021/3/28 22:21* @Description ConcurrentHashMap解决Map的安全问题*/
public class ConcurrentHashMapTest {//Map<String,String> map = new HashMap<>():使用普通的集合在多线程下会有线程安全问题(ConcurrentModificationException)//1、Collections.synchronizedMap(new HashMap<>()):线程安全//2、new ConcurrentHashMap<>():线程安全(并发包中引出的)private static Map<String,String> map = new ConcurrentHashMap<>();public static void main(String[] args) {//创建10个线程for (int i = 0; i < 10; i++) {new Thread(()->{//每个线程存储10个for (int j = 0; j < 20; j++) {map.put(Thread.currentThread().getName(), UUID.randomUUID().toString());System.out.println(map);}}).start();}}
}


五、Callable使用

介绍Callable

Callable接口属于java.util.concurrent包,其有①返回值。②可以抛出异常。③支持泛型自定义返回值类型。

Thread调用start()方法如何执行Callable中的call()方法

  • 由于Callable接口是使用的call()方法并包含返回值,所以需要使用一个可以说适配器吧FutureTask,该FutureTaskrun()方法内部就是调用的call()方法。

应用场景

  1. 执行多任务计算,创建一个FutureTaskList集合。
  2. 在高并发环境下确保任务只执行一次。

demo见demo3目录下的CallableTest.java

/*** @ClassName CallableTest* @Author ChangLu* @Date 2021/3/28 23:25* @Description 测试使用Callable*/
public class CallableTest {public static void main(String[] args) throws ExecutionException, InterruptedException {FutureTask<String> task = new FutureTask<String>(new MyCallable());for (int i = 0; i < 10; i++) {new Thread(task).start();}//获取返回值String s = task.get();//会有阻塞System.out.println(s);}
}class MyCallable implements Callable<String>{@Overridepublic String call() throws Exception {System.out.println("执行MyCallable()中的call()方法");return "调用成功!";}
}

细节部分

  1. FutureTask对于call()只会执行一次。
  2. 通过使用get()方法获取返回值需要等待,有阻塞。

源码分析

①为什么一个FutureTask只执行一次call()方法?

FutureTask中使用state来保存任务的状态,初始构造器设置state状态为NEW

private static final int NEW          = 0;//任务尚未开始或处于执行期间
private static final int COMPLETING   = 1;//任务即将执行完成
private static final int NORMAL       = 2;//任务执行完毕
private static final int EXCEPTIONAL  = 3;//任务执行期间出现未捕获异常
private static final int CANCELLED    = 4;//任务被取消
private static final int INTERRUPTING = 5;//任务正在被中断
private static final int INTERRUPTED  = 6;//任务已被中断public FutureTask(Callable<V> callable) {if (callable == null)throw new NullPointerException();this.callable = callable;this.state = NEW;       // ensure visibility of callable
}

run()方法中一旦执行call()方法就会改变state状态

public void run() {//会判断state是否为NEW,若不为null,则直接退出该方法if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {//执行其中的Callable中的call()方法result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)//该方法来设置返回值set(result);}} finally {runner = null;int s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}
}protected void set(V v) {if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {outcome = v;//将state改为NORMALUNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final statefinishCompletion();}
}

说明:我们可以看到当执行之后,将state更改为NORMAL,所以我们使用多线程时直接执行一次call()方法。

②为什么get()方法有阻塞

public V get() throws InterruptedException, ExecutionException {int s = state;//如果任务没有执行完,则会进行等待执行操作if (s <= COMPLETING)s = awaitDone(false, 0L);return report(s);
}

说明:可以看到若是状态是没有结束的情况,需要阻塞等待得到结果。


六、同步工具辅助类使用

CountDownLatch(倒计时器)

认识CountDownLatch—减法计数器

CountDownLatch是通过一个计数器来实现的,计数的初始值一般为你线程的数量,当调用该CountDownLatch实例的await()方法的线程会进入阻塞状态,直到计数器减到0的时候,才能继续往下执行。

主要方法

  • CountDownLatch(int count):构造器,指定计数的数量。
  • CountDownLatch(int count):计数-1,一般放置在线程中。
  • void await():调用该方法的线程会进行阻塞,直到计数器为0时才会继续往下执行。
  • boolean await(long timeout, TimeUnit unit):阻塞指定的时长。

join方法比较:可以发现该工具类使用与线程的join方法很像都是等待线程完成之后执行

  1. 相对于join()方法更加灵活。
  2. 可以手动控制在n个或单个线程中使计数器进行减一操作。
  3. join()实现原理是不停的检查join线程是否存活,若是join线程存活则会当前线程永远等待,而CountDownLatch可以设置等待时长。

应用场景:启动一个服务时,主线程需要等待多个组件加载完毕之后继续执行。

实操使用

两个demo见demo4目录中的CountDownLatchTest.javaCountDownLatchTest2.java

①线程完成任务的指定数量之后,才去执行的事如进行结果汇总

/*** @ClassName CountDownLatchTest* @Author ChangLu* @Date 2021/3/29 16:39* @Description CountDownLatch:同步工具类(完成指定数量任务之后一定要做的事)*/
public class CountDownLatchTest {public static void main(String[] args) throws InterruptedException {CountDownLatch latch = new CountDownLatch(6);for (int i = 1; i <= 6; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName()+"执行了");latch.countDown();//计数器-1},String.valueOf(i)).start();}latch.await();//主线程进入阻塞状态等待锁存器计数到0,才继续往下执行System.out.println("close door!");}
}

  • 可以看到可以将计数器-1操作放置到线程中去,指定数量的线程完成任务之后再执行结果汇总操作。

②秒杀场景,如模拟高并发,让一组线程在指定时刻(秒杀时间)来进行抢购。

依旧可以通过使用CountDownLatch来去实现,只不过这里阻塞等待放置在线程中:

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;/*** @ClassName CountDownLatchTest2* @Author ChangLu* @Date 2021/3/29 17:16* @Description CountDownLatch使用2:秒杀场景(倒计时开抢)*/
public class CountDownLatchTest2 {public static void main(String[] args) throws InterruptedException {//初始计数为1CountDownLatch latch = new CountDownLatch(1);for (int i = 0; i < 10; i++) {new Thread(()->{try {latch.await();//进行等待操作} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"开始执行!");}).start();}//模拟倒计时2秒for (int i = 3; i >=1; i--) {TimeUnit.SECONDS.sleep(1);System.out.println(i);}System.out.println("开抢!!!");//计数-1指挥线程开始行动latch.countDown();}

注意点

  1. 若是计数器始终没有到0会进入阻塞状态,可以使用await()的带参方法。

CyclicBarrier(循环珊栏)

认识CyclicBarrier

CyclicBarrier(循环屏障):通过设置一个珊栏可以拦截指定数量的线程,对应线程会进入阻塞,拦截到指定数量之后会先去执行CyclicBarrier中的run()方法之后再将珊栏中的线程全部唤醒。

  • 做的事情就是,让一组线程到达一个屏障(同步点)时被阻塞,知道指定数量的最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续执行对应任务。

方法

  • CyclicBarrier(int parties, Runnable barrierAction):有参构造器,设置循环拦截的指定数量以及添加一个runnable()方法。
  • int await():计数器会-1,没有满指定数量的线程会进入阻塞状态,一旦计数器为0时先执行指定的runnable中的方法,接着唤醒其他数量的线程。

  • 珊栏会阻塞指定数量的线程,当线程数量达到时会先执行其指定的任务,接着唤醒其他所有的线程。

实操:demo见demo4目录中的CyclicBarrierTest.java

下面是对CyclicBarrier工具类的测试使用,将每个线程当做为执行一个任务,执行await()时线程会进入阻塞,当达到珊栏中的指定数量线程之后会先执行珊栏中的任务接着唤醒其他线程,若是还有其他线程的话就会继续下一轮珊栏:

/*** @ClassName CyclicBarrierTest* @Author ChangLu* @Date 2021/3/29 17:51* @Description CyclicBarrier(循环屏障)使用*/
public class CyclicBarrierTest {public static void main(String[] args) {//创建一个循环屏障,设置为7CyclicBarrier barrier = new CyclicBarrier(7, () -> {System.out.println("集齐七颗龙珠,终极进化!");});for (int i = 1; i < 8; i++) {//i属于局部变量存在于栈,在下面Lambda表达式中若想要使用该变量应该将其存放于一个final变量中//闭包,jdk1.8可以不写final,jvm会默认加上去的int temp = i;new Thread(()->{System.out.println(Thread.currentThread().getName()+"集齐了第"+temp+"龙珠");try {//等待有7个线程执行这个等待方法时一起通过barrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"结束");}).start();}}
}

注意点:若是指定线程数量不是珊栏设置的个数倍数,那么其他的线程很有可能会一直在阻塞状态。


semaphore(信号量)

认识semaphore

semaphore(信号量,即操作系统中的pv操作):该类可以使同一段时间内指定数量得到线程。需要通过该信号量进行获取与释放。

  • 可以控制同时访问资源的线程个数,如实现一个文件允许的并发数量。

主要方法

  • Semaphore(int permits):有参构造,设置许可证的数量。
  • void acquire():获得的意思,从信号量中获取一个许可证,若是许可证满了该线程进入阻塞直到其他线程释放许可证为止。
  • void release():释放一个许可(+1)将其返回给信号量,之后唤醒等待的线程。
  • int availablePermits():返回此信号量中当前可用的许可数。
  • boolean hasQueuedThreads():查询是否有线程正在等待获取。

应用场景:限流(SpringCloud)。

示例:demo见demo4目录中的SemaphoreTest.java

(1)、假设信号量为停车场,提供三个停车位(达到限制目的),之后模拟多辆车来占用释放停车位:

/*** @ClassName SemaphoreTest* @Author ChangLu* @Date 2021/3/29 18:52* @Description Semaphore(信号量)简单使用*/
public class SemaphoreTest {public static void main(String[] args) {Semaphore semaphore = new Semaphore(3);//通过有参构造器创建指定数量的许可证//创建8个线程(相当于8辆车去占3个车位)for (int i = 0; i < 8; i++) {new Thread(()->{try {//首先需要发布semaphore.acquire();System.out.println(Thread.currentThread().getName() + "抢到了停车位");TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() + "归还了停车位");} catch (InterruptedException e) {e.printStackTrace();}//释放掉许可证semaphore.release();}).start();}}
}

注意:若是某个线程来获取许可证,许可证始终为满时,该线程可能会一直在阻塞状态。

(2)、可通过使用tryAcquire()来进行尝试获取许可证,若是没有获取到可做其他事情,修改下上面的例子

Semaphore semaphore = new Semaphore(3);//通过有参构造器创建指定数量的许可证
for (int i = 0; i < 8; i++) {new Thread(()->{//尝试获取许可证if(semaphore.tryAcquire(){System.out.println(Thread.currentThread().getName() + "抢到了停车位");TimeUnit.SECONDS.sleep(1);System.out.println(Thread.currentThread().getName() + "归还了停车位");//释放掉许可证semaphore.release();}else{//若是没有获取到许可证则做其他事情...}}).start();
}

总结

1、CountDownLatch可以看作一个减法计数器,可指定线程执行的数量,若是使用countDown()方法计数器就会减一,使用await()方法的线程会进入阻塞等待直到计数器为0时执行下面的操作。应用场景如:需完成指定任务执行重要的结果汇总、秒杀、主线程需要等待几个组件完成之后再执行。

2、CyclicBarrier珊栏屏障,每个屏障拦截指定数量的线程(通过await()方法),被拦截下来的线程会进入阻塞状态,一旦达到屏障中达到了指定数量的线程就会先执行屏障自带的方法接着释放其他阻塞线程,循环执行。应用场景:循环指定个数的线程执行并进行总的执行方法。

3、semaphore信号量,可设置指定数量的许可证,同一时间内只有指定数量的线程能够获取到许可证,其他线程若想要获取许可证的话需要其他线程先释放许可证之后才能执行。应用场景:限流。


七、读写锁(ReentrantReadWriteLock)

7.1、初识ReentrantReadWriteLock

介绍:ReentrantReadWriteLock除了提供读锁、写锁及其各自释放与获取外,还提供了一些其他和锁状态有关的方法

一般在读多写少的场景应用:

  • 读操作(共享锁):同一时间允许多个线程对同一共享资源进行读操作。同一时刻所有线程的写操作会被阻塞。
  • 写操作(排他锁):同一时间允许一个线程对同一共享资源进行写操作。同一时刻其他线程的读操作会被阻塞。

  • 获取读锁(上锁最大数量为216-1),写锁(上锁最大数量为216-1))之后,同样使用lock()unlock()来上锁与释放锁。
//无参、有参构造器。默认使用的是非公平锁
public ReentrantReadWriteLock() {this(false);
}public ReentrantReadWriteLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();readerLock = new ReadLock(this);writerLock = new WriteLock(this);
}

7.2、程序示例

示例:demo见demo4中的ReadWriteLockTest

使用不同线程来进行读写操作:

/*** @ClassName ReadWriteLockTest* @Author ChangLu* @Date 2021/3/30 11:23* @Description 读写锁ReadWriteLock测试*/
public class ReadWriteLockTest {public static void main(String[] args) {WRdata2 wRdata = new WRdata2();for (int i = 1; i <= 3; i++) {//闭包获取外部变量需要设置为final,对于这里int temp=i,jdk1.8默认会加上finalint temp = i;new Thread(()->{for (int j=(temp-1)*10;j<=(temp-1)*10+10;j++){//进行写操作wRdata.write(j);}}).start();}new Thread(()->{for (int i = 1; i <= 30; i++) {wRdata.read(i);}}).start();}
}//使用读写锁
class WRdata2{//volatile保证可见性与防止指令重排private volatile Map<String,String> map = new HashMap<>();//创建一个读写锁实例(默认是非公平锁)private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();//分别获取读锁、写锁private Lock readLock = lock.readLock();private Lock writeLock = lock.writeLock();//写操作public void write(int i){writeLock.lock();//上锁(写锁)try {System.out.println(Thread.currentThread().getName()+"执行写入操作,写入"+i);map.put(String.valueOf(i),Thread.currentThread().getName()+"执行"+i);System.out.println(Thread.currentThread().getName()+"写入"+i+"的操作完成");} catch (Exception e) {e.printStackTrace();} finally {writeLock.unlock();//释放锁(写锁)}}//读操作public void read(int i){readLock.lock();//上锁(写锁)try {String o = map.get(String.valueOf(i));System.out.println(Thread.currentThread().getName()+"读取到key="+i+"为"+o);} catch (Exception e) {e.printStackTrace();} finally {readLock.unlock();//释放锁(读锁)}}}

注意:读锁与写锁上锁的数量不应超过216-1(65535)。


八、阻塞队列

8.1、认识阻塞队列

阻塞队列都是使用的BlockingQueue接口:实现了Queue接口,也属于Collection容器

  • 阻塞思想:无法得到需要的资源时,线程等待资源进行阻塞,等到资源可用时唤醒。

一般在多线程并发处理,线程池使用阻塞队列。


8.2、两个阻塞队列实现类

①ArrayBlockingQueue

介绍:数组阻塞队列,其中的方法内部都是上锁的,并发下是安全的。

其中包含了四套API

  1. 抛出异常
  2. 不会抛出异常
  3. 阻塞等待
  4. 超时等待
方式 ①抛出异常 ②不抛出异常 ③阻塞等待 ④超时等待
添加操作 add(E e) offer(E e) put(E e) offer(E e, long timeout, TimeUnit unit)
移除操作 remove() poll() take() poll(long timeout, TimeUnit unit)
检测队首元素 element() peek()
  • 添加操作中②④都有返回值true或false,①是返回true或抛异常IllegalStateException
  • 移除操作中②③④都有返回值元素或null,①是返回元素或抛异常NoSuchElementException
  • 检查队首元素①返回元素或抛异常NoSuchElementException,②是返回元素或null

说明:对于①实际上内部实际上就是使用的offer()方法,其余的方法内部都是上锁的,是线程安全的。

四组API测试:demo见demo5中的ArrayBlockingQueueTest.java

①抛出异常API

//抛出异常
public static void test01(){//创建空间为3的阻塞队列ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);System.out.println(queue.add("a"));System.out.println(queue.add("b"));System.out.println(queue.add("c"));//添加队列成功返回——true//System.out.println(queue.add("d"));//添加队列满情况抛异常——IllegalStateExceptionSystem.out.println("-------------------");System.out.println(queue.remove());System.out.println(queue.remove());System.out.println(queue.remove());//移除队列成功返回移除——元素//System.out.println(queue.remove());//队列为空情况抛出异常——NoSuchElementExceptionSystem.out.println(queue.element());//获取队首元素,若无队首抛出异常——NoSuchElementException
}

②不抛出异常API

//不抛出异常
public static void test02(){ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);//插入队列System.out.println(queue.offer("a"));System.out.println(queue.offer("b"));System.out.println(queue.offer("c"));//插入成功过的返回——true//System.out.println(queue.offer("d"));//队列满无法插入时无异常,返回——falseSystem.out.println("-------------------");//移除队列System.out.println(queue.poll());System.out.println(queue.poll());System.out.println(queue.poll());//移除成功返回的是元素//System.out.println(queue.poll());//队列空无法移除无异常,返回——nullSystem.out.println(queue.peek());//获取队首元素,若无队首返回——null
}

③阻塞等待API测试

//阻塞等待
public static void test03() throws InterruptedException {//ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);//插入队列queue.put("a");queue.put("b");queue.put("c");//插入队列成功无返回值//queue.put("d");//队列满无法插入时会进入阻塞System.out.println("-------------------");//移除队列System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());//移除队列成功有返回值为——元素System.out.println(queue.take());//队列元素为空无法移除时会进入阻塞(直到插入元素之后停止)
}

④超时等待API测试

//超时等待
public static void test04() throws InterruptedException {ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(3);//插入队列System.out.println(queue.offer("a"));System.out.println(queue.offer("b"));System.out.println(queue.offer("c"));//插入队列成功的话返回trueSystem.out.println(queue.offer("d", 2, TimeUnit.SECONDS));//队列已满情况时阻塞等2秒后若没有元素插入结束返回——falseSystem.out.println("-------------------");//移除队列System.out.println(queue.poll());System.out.println(queue.poll());System.out.println(queue.poll());System.out.println(queue.poll(2, TimeUnit.SECONDS));//队列为空时阻塞等待2秒,若还没有移除的元素返回——null
}

②SynchroniousQueue(同步队列)

介绍SynchroniousQueue(java.util.concurrent)

可以看到该实现类与ArrayBlockingQueue的继承关系一致,是同级实现类。

SynchronousQueue:内部无容器,使用CAS实现线程的安全访问,任意时间只能有一个线程操作,只能存储一个元素。该类同样也包含公平锁、非公平锁。由于其独有机制—配对通信机制。

  • 若是put()一个元素,没有使用take()取出,那么就会进入阻塞状态。

示例:demo见demo5目录中的SynchronousQueueTest.java

目的:put()一个元素到同步队列中,若是没有take()操作就会进入阻塞。

操作:使用两个线程,第一个线程中进行三次put()操作,第二个线程进行三次拿操作其中会进行睡眠以确保进入阻塞状态有效果。

/*** @ClassName SynchronousQueueTest* @Author ChangLu* @Date 2021/3/31 21:48* @Description SynchronousQueue小案例**/
public class SynchronousQueueTest {public static void main(String[] args) {//创建一个同步队列SynchronousQueue<String> queue = new SynchronousQueue<>();//该线程用于入队操作new Thread(()->{try {queue.put("A");System.out.println(Thread.currentThread().getName()+"put A");queue.put("B");System.out.println(Thread.currentThread().getName()+"put B");queue.put("C");System.out.println(Thread.currentThread().getName()+"put C");} catch (InterruptedException e) {e.printStackTrace();}}).start();//该线程用于取队列中的元素new Thread(()->{try {TimeUnit.SECONDS.sleep(2);queue.take();TimeUnit.SECONDS.sleep(2);queue.take();TimeUnit.SECONDS.sleep(2);queue.take();} catch (InterruptedException e) {e.printStackTrace();}}).start();}
}

说明:可以看到put()一个元素之后,若是没有take()操作就会进入阻塞状态。


九、线程池(重点)

9.1、介绍线程池

线程池(知识点):三大方法、七大参数、4种策略模式

池化技术:如线程池、连接池、内存池、对象池…。对于原本创建、销毁线程是十分浪费资源的,通过使用线程池能够事先准备好一些资源可供使用,不用时关闭即可。

好处介绍

  1. 降低资源的消耗。
  2. 提高响应的速度。
  3. 方便管理线程。

通过各个线程复用能够控制最大并发数,并且更有效的管理线程。


9.2、Executors创建线程池的三个方法

认识Executors创建的三个ExecutorService实现类实例,实际上是四个方法,这里介绍三种。

Executors是一个工具类可提供创建线程池执行不同的执行服务,三个方法如下:

  • newSingleThreadExecutor():创建一个单线程化的线程池,保证所有的任务按照指定顺序执行。

    • 应用场景:一个任务一个任务执行的场景。
  • newFixedThreadPool(int nThreads):创建一个定长线程池,可控制线程的最大并发数,超出的线程会在队列中等待。
  • 应用场景:执行长期的任务,性能好很多
  • newCachedThreadPool():创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
  • 应用场景:执行很多短期异步的小程序或者负载较轻的服务器。

实际上还有一个newScheduledThreadPool(),这里没有提及该方法。

示例:demo见demo5中的ExecutorsThreeMethodTest

目的:用于测试不同的线程池对于指定的任务数量分别会有多少个线程执行。

操作:创建三个不同的线程池,分别执行20次看其中都使用了多少线程。

/*** @ClassName ExecutorsThreeMethodTest* @Author ChangLu* @Date 2021/3/31 22:43* @Description Executors三大线程池方法的使用*/
public class ExecutorsThreeMethodTest {public static void main(String[] args) {ExecutorService es1 = Executors.newSingleThreadExecutor();//单个线程ExecutorService es2 = Executors.newFixedThreadPool(5);//创建一个固定线程的线程池ExecutorService es3 = Executors.newCachedThreadPool();//创建一个可伸缩的线程池,可根据你的执行次数来分配//测试es1//testExecutors(es1);//测试es2//testExecutors(es2);//测试es2testExecutors(es3);}//传入一个通过Executors不同创建的方法所获取的ExecutorServicepublic static void testExecutors(ExecutorService es){//执行20次方法来查看其中分别使用了多少个线程try {for (int i = 0; i < 20; i++) {es.execute(()->{System.out.println(Thread.currentThread().getName()+"执行!");});}} catch (Exception e) {e.printStackTrace();} finally {es.shutdown();//开闭线程池}}}

  • es1测试,可以看到整个过程都是1个线程来执行。

  • es2测试

  • es3测试

说明:可以看到通过Executors创建的不同线程池,对于执行任务效果也都是不同的,需要注意的是阿里巴巴官方手册上说尽量自己去创建线程池而不要使用Executors工具类来创建。


9.3、自定义线程池(七大属性)

本部分demo见demo6目录中的ThreadPoolExecutorTest.java

认识ThreadPoolExecutor(详细介绍各个参数)

Executors创建的线程池源码分析

//可以看到使用工具类创建不同的线程池实际上都是创建的ThreadPoolExecutor实例,需要注意其中的参数
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

看下ThreadPoolExecutor构造器中的七大参数

  • 包含了四个构造方法,若是使用第一个构造器的话会给你默认最后两个参数(线程工厂和拒绝策略),分为为Executors.defaultThreadFactory()new AbortPolicy()
//七个参数的构造器
public ThreadPoolExecutor(int corePoolSize,//池中核心线程数量(默认开启的)int maximumPoolSize,//池中最大核心数量(当阻塞队列满时会开启)long keepAliveTime,//没有任务时等待指定时长若是没有新的任务多余的空闲线程就会终止TimeUnit unit,//keepAliveTime参数的时间单位BlockingQueue<Runnable> workQueue,//可设置的阻塞队列ThreadFactory threadFactory,//线程工厂,默认为Executors.DefaultThreadFactoryRejectedExecutionHandler handler){}//拒绝策略(四种),默认为AbortPolicy//官方文档翻译介绍
corePoolSize –保留在池中的线程数(即使它们处于空闲状态),除非设置了allowCoreThreadTimeOut
maximumPoolSize –池中允许的最大线程数
keepAliveTime –当线程数大于内核数时,这是多余的空闲线程将在终止之前等待新任务的最长时间。
unit – keepAliveTime参数的时间单位
workQueue –用于在执行任务之前保留任务的队列。 此队列将仅保存execute方法提交的Runnable任务。
threadFactory –执行程序创建新线程时要使用的工厂
handler –因达到线程边界和队列容量而被阻止执行时使用的处理程序,包含四种不同拒绝策略

各个参数细致描述:这部分暂时不去探讨拒绝策略

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,//核心线程数量5,//最大线程数量2,TimeUnit.SECONDS,//配合上面的时间长度为2秒无任务时等待(针对于非核心线程开启情况)new LinkedBlockingDeque<>(3),//阻塞队列容量为3Executors.defaultThreadFactory(),new ThreadPoolExecutor.DiscardPolicy());

看下面这个图,各个参数对照下图描述:

  • 核心线程为2:就是默认在线程池中开启2个线程(下图红色框1、2),其为核心线程。
  • 最大线程数量为5:前面定义了2个核心线程,最大有5个,那么3个为非核心线程(3、4、5)。开启时间:核心线程(1、2)都有任务并且阻塞队列此时也满了,此时再进来一个任务就会去开启非核心线程(开启几个根据额外任务定)。
  • 等待时间为2秒(第3、4参数):针对于非核心线程,若是非核心线程任务执行完成后,会默认等待连接其他任务2秒,超时的话会默认关闭。
  • 线程工厂:一般默认使用的Executors.defaultThreadFactory()即可。
  • 拒绝策略(AbortPolicy()):对拒绝任务执行的拒绝策略。拒绝任务指的是线程池中所有的线程(核心+非核心)都有任务,并且阻塞队列都满了,此时来的线程都成为拒绝任务,会根据指定的拒绝策略来执行。

说明:对于上面ThreadPoolExecutor使用的参数,对应可能发生的大致情况如下:

  • ①当任务数为1-5时只会交由核心线程执行,其3个任务会放置在阻塞队列。
  • ②当任务数为6-8时,就会开启非核心线程。
  • ③当任务数为9时,即任务9为拒绝任务会根据指定的拒绝策略来处理该任务。

测试程序:主要测试不同数量的任务会使用多少个线程池中的线程

public class ThreadPoolExecutorTest {public static void main(String[] args) {ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,//核心线程2个5,//池中最多有5个线程2,//保持连接2秒TimeUnit.SECONDS,//2秒连接new LinkedBlockingDeque<>(3));//链表阻塞队列,容量为3try {//测试不同数量的任务(更改5即可)情况有:5、8、9for (int i = 0; i < 5; i++) {int temp = i;poolExecutor.execute(()->{System.out.println(Thread.currentThread().getName()+" 执行任务"+temp);});//执行线程}} catch (Exception e) {e.printStackTrace();} finally {poolExecutor.shutdown();//关闭线程池}}
}

执行5个任务:

执行8个任务:

执行9个任务:


介绍四大拒绝策略

RejectedExecutionHandler是一个接口,其包含四个实现类,分别就对应着四种不同的拒绝策略(定义在ThreadPoolExecutor中),如下:

四种不同策略(针对于连接数一旦大于max线程+阻塞队列容量情况)
①new ThreadPoolExecutor.AbortPolicy():即抛出异常RejectedExecutionException,不执行超出边界的任务
②new ThreadPoolExecutor.CallerRunsPolicy():呼叫main线程执行任务
③new ThreadPoolExecutor.DiscardOldestPolicy():丢弃最旧的未处理请求,并且重试任务请求,除非执行器突然被关闭该任务才会被丢弃
④new ThreadPoolExecutor.DiscardPolicy():直接丢弃拒绝的任务

测试程序如下:主要测试四种不同策略,直接上9个任务

public class ThreadPoolExecutorTest {public static void main(String[] args) {ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,//核心线程2个5,//池中最多有5个线程2,//保持连接2秒TimeUnit.SECONDS,//2秒连接new LinkedBlockingDeque<>(3),//链表阻塞队列,容量为3Executors.defaultThreadFactory(),//默认的线程工厂new ThreadPoolExecutor.DiscardPolicy());//分别来测试4种不同的拒绝策略//测试线程try {for (int i = 0; i < 9; i++) {int temp = i;poolExecutor.execute(()->{System.out.println(Thread.currentThread().getName()+" 执行任务"+temp);});//执行线程}} catch (Exception e) {e.printStackTrace();} finally {poolExecutor.shutdown();//关闭线程池}}
}

下面是四种不同策略的使用效果:

  • 使用new ThreadPoolExecutor.AbortPolicy()策略,抛出异常不执行任务。

  • 使用的new ThreadPoolExecutor.CallerRunsPolicy()策略,可看到额外的任务让主线程执行。

  • 使用new ThreadPoolExecutor.DiscardOldestPolicy()策略,会丢弃在延迟队列中呆最旧的任务来执行拒绝策略的任务。

  • 使用new ThreadPoolExecutor.DiscardPolicy()策略,会直接丢弃被拒绝的任务。

注意说明:有时候CPU执行的够快时对于上面9个任务的情况也不会出现拒绝任务。


9.4、CPU密集型与IO密集型

核心是可以分别独立运行程序指令的计算单元。

线程是操作系统能够进行运算调度的最小单位。

CPU密集型

说明:指应用需要非常多的CPU计算资源(如复杂运算,逻辑处理),尽量让每一个CPU核心来去执行任务,不浪费资源,对于密集型的应用完全靠CPU的核数来工作。

线程数设置:一般来说只需要CPU核数的线程即可。

  • 返回可用于Java虚拟机的处理器数:Runtime.getRuntime().availableProcessors()

IO密集型

说明cpu使用率较低,程序中会存在大量I/O操作(如web应用中的大量网络传输、数据库缓存间的交互),一旦发生IO,线程就会处于等待状态,当IO结束,数据准备好之后线程才会继续执行。那么对于这类IO密集型的应用,我们可以多设置一些线程池中的线程数量,这样能够在等待的这段时间中,让别的线程去做其他事,提高并发处理效率。

线程数最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目线程数= CPU核心数/(1-阻塞系数),线程可以设置多一些,例如2核的可以设置20线程。

总结:对于CPU密集型往往处理需要复杂问题,则需要线程数量较少一般根据你电脑的核心数量决定,减少上下文切换资源的消耗;对于IO密集型通常对于CPU处理要求并不高,会有很多等待情况,所以需要越多线程。

  • 线程数设置到ThreadPoolExecutor构造器中的int maximumPoolSize参数中。

十、异步操作

10.1、ForkJoin(并行计算)

认识ForkJoin

使用ForkJoin目的:将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果。

  • 与归并算法中的思想很相像,可以说是同样的思路吧。

特点:工作窃取,其中维护的是双端队列。

执行流程图

  • 通过使用Fork()方法来调用其子任务,Join()方法来获取子任务运算的结果值。
  • 最终使用get()来获取所有子任务运算值的合并。

下面需要使用的类

  • ForkJoinPool:最终执行并行计算。
  • RecursiveTask<v>:需要实现该抽象类,作为参数放置到ForkJoinPoolsubmit()中执行。
  • ForkJoinTask:通过ForkJoinPoolget()获取最终的结果运算值。

源码分析思路

使用ForkJoin的思路:

①首先需要自定义一个类,该类继承RecursiveTask抽象类(递归任务)

public abstract class RecursiveTask<V> extends ForkJoinTask<V> {private static final long serialVersionUID = 5232453952276485270L;//需要去实现计算任务protected abstract V compute();....
}
  • 该方法中可配合使用fork()join()核心方法来执行调用。

    • fork():相当于递归调用计算任务。
    • join():获取计算任务的返回值。

②接着创建ForkJoinPool实例,将之前自定义任务作为参数给到ForkJoinPool的submit()方法中,即会去执行任务。

@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {public ForkJoinPool() {this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),defaultForkJoinWorkerThreadFactory, null, false);}//参数为ForkJoinTask,表示可丢入一个递归任务类,返回一个ForkJoinTask类public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {if (task == null)throw new NullPointerException();externalPush(task);return task;}
}

③通过ForkJoinPoolget()方法获取最终的值

public abstract class ForkJoinTask<V> implements Future<V>, Serializable {   //获取返回值public final V get() throws InterruptedException, ExecutionException {int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?doJoin() : externalInterruptibleAwaitDone();Throwable ex;if ((s &= DONE_MASK) == CANCELLED)throw new CancellationException();if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)throw new ExecutionException(ex);return getRawResult();}
}

获取的结果是之前自定义类时的泛型类型。

说明:大师就是大师真的设计的很精妙,暴露出一些接口方法直接让使用者把核心内容填充进去就可以提升性能高效完成任务。


示例(整数累加)

示例:demo见demo7中的ForkJoinTest.java

通过使用ForkJoin来进行1-1000000000的相加:

  • 其中的操作就是递归+归并,与归并算法执行的过程一致只不过这里使用作求和,并且采用进行并行计算。
/*** @ClassName ForkJoinTest* @Author ChangLu* @Date 2021/4/1 17:02* @Description ForkJoin使用(工作窃取特性 , 并行处理)*/
public class ForkJoinTest {//继承递归任务抽象类static class MyForkJoinTask extends RecursiveTask<Long> {private Long start;private Long end;private Long temp = 10000L;public MyForkJoinTask(Long start, Long end) {this.start = start;this.end = end;}@Overrideprotected Long compute() {//最终达到我们预期范围来去真正执行的操作if (end - start < temp) {Long sum = 0L;for (Long i = start; i <= end; i++) {sum += i;}return sum;} else {Long middle = (start + end) / 2;MyForkJoinTask task1 = new MyForkJoinTask(start, middle);task1.fork();//拆解任务MyForkJoinTask task2 = new MyForkJoinTask(middle + 1, end);task2.fork();//拆解任务return task1.join() + task2.join();//将拆解的两个任务相加获取返回值}}}//测试使用ForkJoin来并行归并处理任务public static void main(String[] args) throws ExecutionException, InterruptedException {long start = System.currentTimeMillis();//1、创建ForkJoin池ForkJoinPool forkJoinPool = new ForkJoinPool();//2、需要提交一个递归任务(自定义的递归任务),获取到ForkJoinTask实例ForkJoinTask<Long> submit = forkJoinPool.submit(new MyForkJoinTask(1L, 1000000000L));//3、获取运算结果(中途若是计算量会有阻塞情况)Long sum = submit.get();long end = System.currentTimeMillis();System.out.println("时间为:" + (end - start) / 1000.0 + "秒");System.out.println("sum=" + sum);}

说明forkjoin使用的是并行计算,其中实际上采用了递归+合并的内部操作,并且其中的一个线程一旦完成工作会直接帮另一个线程进行工作(工作窃取),提升性能。

我们来尝试其他两种方式来进行运算操作:

//方式一
//①普通计算(仅仅是主线程执行)
@Test
public void test01() {long start = System.currentTimeMillis();long sum = 0L;for (long i = 1L; i <= 1000000000L; i++) {sum += i;}long end = System.currentTimeMillis();System.out.println("时间为:" + (end - start) / 1000.0 + "秒");System.out.println("sum=" + sum);
}

  • 我去什么竟然还更快了,说明并不是什么场景都使用Forkjoin来进行并行操作才是性能最佳的,因为使用并行操作还需要进行上下文切换,并且其中内部还有更多的细节封装,例如双端队列等等,对于一些场景能够使用并行计算会达到比较好的效果。
//方式二
//②流计算(LongStream)
@Test
public void test02(){long start = System.currentTimeMillis();//①获取操作(rangeClosed()):获取LongPipeline实现类。//②中间操作(parallel()):设置该流为并行流//③结果操作(reduce()):归约,合并操作。属性:起始值  函数式(相隔两个操作数执行的操作)long sum = LongStream.rangeClosed(0L, 1000000000L).parallel().reduce(0, Long::sum);long end = System.currentTimeMillis();System.out.println("时间为:" + (end - start) / 1000.0 + "秒");System.out.println("sum=" + sum);
}

  • 好家伙使用并行流计算更快了!!!一些CPU密集型流操作就适合使用流来操作。

总结:我们可以使用ForkJoin(并行计算)去优化一些实际场景,比如说优化归并排序,其好处是能够并行执行其中的一个个子任务达到提升性能的好处!千万不要什么场景都去使用ForkJoin,否则会适得其反。


10.2、CompletableFuture(异步回调)

介绍CompletableFuture

属于java.util.concurrent并发包下的类

CompletableFuture:JDK1.8版本新引入的一个类,一个CompletableFuture就表示一个任务。其是接口Future的实现类,该类中的很多方法都使用到了JDK1.8出现的函数式接口,如ConsumerSuppilerfunctionpredicate

优点介绍:首先看这名字"未来的完成"与它实际要做的事描述的很相近,使用该类能做executorService配合futures做不了,其能够获取到任务的返回值。该类执行任务可以不返回值也可以返回值可进行设置,并且可以使用thenwhen等等方法来预先设置碰到种种情况对应要做的事情,来防止一些事情的出现。

  • 并且其中的很多方法都是返回的自己本身实例,能够进行链式方法调用!

获取无返回值或有返回值的实例方法:可以看到返回的是CompletableFuture实例

  • static CompletableFuture<Void> runAsync(Runnable runnable):使用runnable接口,无返回值,其泛型默认为Void,即返回值为空。
  • static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier):使用Supplier供给型接口能够返回一个自定义类型的值.

其中的核心方法

  • T get():执行初始设置的任务,并且获取返回值(该过程可能会有阻塞情况),若是无返回值默认为null。
  • CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action):当执行任务完成时做的操作,参数为BiConsumer接口,其中两个参数分别为返回值以及异常信息描述(若无为null)。
  • CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn):当出现异常时执行的操作,其中参数为一个函数式接口,也是带有返回值的(你可以设置当出现异常时返回对应值)。

说明:只有get()方法调用时,任务才会去执行,上面列举的其他操作你都可以看做时做的提前准备如执行的任务、完成任务的操作、出现异常的操作,并且通过get()方法能够获取到对应任务的返回值。


示例(无返回值与有返回值)

无返回值

目的:执行任务(无返回值)并且测试其是否会让主线程进行阻塞。

程序说明:三条打印语句是用来测试其中的线程执行情况,runAsync()会创建一个带有无返回值的任务并创建一个CompletableFuture实例,get()方法被调用时会执行该实例中的方法,并获取返回值(这里为null)。

/*** @ClassName CompletableFutureTest* @Author ChangLu* @Date 2021/4/3 16:54* @Description CompletableFuture(未来完成):无返回值与有返回值示例*/
public class CompletableFutureTest {//无返回值示例:使用runAsync()静态方法@Testpublic void test01() throws ExecutionException, InterruptedException {System.out.println("---1111---");//1、调用runAsync()获取CompletableFuture实例,默认返回值为Void,即为空CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName() + "执行....");});System.out.println("---2222---");//2、get():调用时才执行其中任务,并获取到运行结果(可能出现阻塞)future.get();//执行任务即可,无需返回值(该案例为null)System.out.println("---3333---");}
}

  • 可以看到执行CompletableFuture实例中的任务是由ForkJoinPool线程池中的一条线程来执行的,并且该线程执行时主线程会进行阻塞。

有返回值

目的:执行一个异步任务,当任务成功完成时返回200,任务出现异常返回404,出现异常打印异常信息。

程序说明supplyAsync()创建一个带有返回值的任务的CompletableFuture实例,接着使用whenComplete()、exceptionally()设置在完成任务时、出现异常时做出的操作。

注意:其中13行,来模拟异常,本案例会测试两种情况。

//有返回值的异步回调CompletableFuture.supplyAsync()
@Test
public void test02() throws ExecutionException, InterruptedException {System.out.println("---1111---");//1、使用supplyAsync()方法可以有返回值,可自由设置范围CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+"执行任务.....");int num = 10/0;//模拟异常return 200;//操作正常,返回200});System.out.println("---2222---");//2、给执行的任务未来添加一系列的情况处理,如完成时操作,出现异常情况(可带返回值)//①、whenComplete——当完成任务时的操作Integer resultCode = future.whenComplete((u, v) -> {System.out.println(u);//u:执行完成任务的返回值System.out.println(v);//v:若无异常返回null,若有异常返回异常全限定类名+错误描述}).exceptionally((e) -> {//②、exceptionally():抛出异常时的操作,应当有返回值e.printStackTrace();return 404;//表示执行有异常,返回错误码}).get();//③、get()——执行任务,获取到最终返回值(若案例若是无异常返回200,有异常返回400)System.out.println("获取返回值:"+resultCode);System.out.println("---3333---");}

情况1:无异常返回200

情况2:有异常返回404

  • 注意了注意了这个输出结果可以看出当执行任务时是阻塞的(主线程会进入阻塞等待),对于提前预估任务方法(如whenComplete、exceptionally)的执行线程(依旧是worker-1)与主线程能够进行互相切换执行(并不是同步)。

说明:使用CompletableFuture可以实现异步操作(执行指定任务过程是同步操作)并且可以有返回值以及可以设置一系列未来发生情况!!!


总结

1、ForkJoin用于并行计算,通过递归+合并的方式来将一个大问题分解成一个个子问题并且最终获取其值,其中使用到了fork()与join()方法来进行递归调用以及获取递归调用方法的值。针对于一些场景能够进行性能优化。

2、CompletableFuture是异步方法,通过使用runAsync()创建一个无返回值任务的实例,其实例调用get()方法时才会执行其中的任务,执行任务的线程是其他线程,并且该线程执行时主线程会进行阻塞等待。


十一、CAS

11.1、介绍与引出CAS

介绍CAS

CAS(Compare and swap,比较与交换):其是可以保证线程安全的一种较为高效的方法,要想使用CAS需要有三个数(内存地址V、旧的预期值A、更新的目标值B)。

  • CAS执行时,首先会获取到在内存地址V的值,接着会进行一个方法操作(若是内存地址V的值与预期值A相同,那么将内存地址V的值更改为B,否则就不做),整个过程是无限循环的。
  • 注意:比较与交换的方法操作是一个原子操作

好处:实际调用的是native方法其调用操作系统平台的汇编指令,不用切换线程状态,提高性能,避免用锁造成的性能开销。

缺点:①循环时间开销长。②只能保证一个变量的原子操作。③ABA问题。

  • 针对①:因为CAS中对于比较与交换方法通常是配合无限循环一起使用,若是CAS失败会不断进行尝试,长时间不成功会给CPU带来很大开销。
  • 针对②:对于一个变量执行操作可以使用CAS来保证原子操作,若是对于多个变量则无法直接保证操作的原子性。解决方案1:使用互斥锁来保证原子性;解决方案2:将多个变量封装成对象,使用AtomicReference 保证原子性。

认识了CAS之后,我们来看看什么时候可以使用到CAS以及该如何使用?

引出CAS:demo见demo8中的AtomicIntegerTest.java里的test01()方法

看下面的程序,创建了10个线程,分别对静态变量num执行10000次自增,预期的结果是100000:

/*** @ClassName AtomicIntegerTest* @Author ChangLu* @Date 2021/4/5 16:18* @Description TODO*/
public class AtomicIntegerTest {//案例一:通过i++自增复合操作来引出CAS,本案例是具有线程安全问题的private volatile static int num = 0;@Testpublic void test01() throws InterruptedException {Thread[] threads = new Thread[10];//10个线程分别进行100次自增,预期结果为num=100000for (int i = 0; i < 10; i++) {threads[i] = new Thread(() -> {for (int j = 0; j < 10000; j++) {num++;}});threads[i].start();}//让10个线程优先执行for (int i = 0; i < 10; i++) {threads[i].join();}System.out.println(num);//获取最终的num值}
}

问题描述:由于num++其实是复合操作,volatile关键字修饰并不能够保证其原子性,在多线程情况下,由于这个复合操作并不原子性的,最终就会出现值与预期不符的结果!

第一种解决方法:使用synchronizedlock来对这个复合操作设置为同步,同一时间只能有一个线程来执行复合操作,但是由于线程的切换以及锁的获取即释放同样需要很大的性能开销,若是频繁使用则会降低性能!

第二种解决方法:使用CAS,本部分的主题,通过使用CAS来保证线程安全。在Java中提供了并发原子操作类java.util.concurrent.atomic,原本intnum更改为AtomicInteger的实例num1,将原本使用i++执行+1操作的改为实例num1调用getAndIncrement()方法。


11.2、CAS解决变量增值线程安全问题(AtomicInteger)

demo见demo8中的AtomicIntegerTest.java里的test02()方法

使用JDK提供的原子包中的AtomicIntegergetAndIncrement()方法来进行+1操作:

/*** @ClassName AtomicIntegerTest* @Author ChangLu* @Date 2021/4/5 16:18* @Description TODO*/
public class AtomicIntegerTest {//案例二:改进案例一,解决num复合操作带来的线程安全问题,通过使用Atomic原子类private static AtomicInteger num1 = new AtomicInteger();//改进一:使用原子类AtomicInteger@Testpublic void test02() throws InterruptedException {Thread[] threads = new Thread[10];for (int i = 0; i < 10; i++) {threads[i] = new Thread(() -> {for (int j = 0; j < 10000; j++) {num1.getAndIncrement();//表示+1  改进二:将num++更改使用getAndIncrement()}});threads[i].start();}for (int i = 0; i < 10; i++) {threads[i].join();}System.out.println(num1.get());}
}

demo见demo8中的AtomicIntegerTest.java里的test03()方法

目的:主要用来测试Unsafe类的本地方法compareAndSwapInt()

程序说明:使用AtomicInteger类中的compareAndSet()来测试Unsafe类的本地方法compareAndSwapInt()

//案例3:测试AtomicInteger中的CAS方法compareAndSet():实际上就是直接调用的Unsafe的本地方法compareAndSwapInt(),下面是解释
//             若是当前值为期望值,那么会将当前值更改为更新值,并返回true;反之当前值不为期望值,更改失败返回false
private static AtomicInteger num2 = new AtomicInteger(1011);
@Test
public void test03(){//期望值为1011,更新值为1012System.out.println(num2.compareAndSet(1011, 1012));System.out.println(num2);//期望值为1011,更新值为1013System.out.println(num2.compareAndSet(1011, 1013));System.out.println(num2);
}


11.3、两个案例的源码分析

针对于test02()案例分析

之前通过使用AtomicInteger中的getAndIncrement()来实现+1操作,看一下其中的源码:

public class AtomicInteger extends Number implements java.io.Serializable {//Java无法直接操作内存,但是Java可以调用native本地方法来执行c++代码(扩展java程序功能),c++可以操作内存,在Unsafe类中包含了大量的native方法,其提供了提供了硬件级别的原子操作!private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long valueOffset;static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }}//执行+1操作并获取+1之后的值public final int getAndIncrement() {//传入三个参数:当前AtomicInteger实例、偏移值、增长值1return unsafe.getAndAddInt(this, valueOffset, 1);//调用了Unsafe类中的静态方法!!!}
}

上面的16行调用了Unsafe类中的静态方法,我们看下面源码:

public final class Unsafe {//Unsafe中的实现方法public final int getAndAddInt(Object var1, long var2, int var4) {int var5;do {//通过AtomicInteger实例、偏移值来获取到内存地址V中的值var5 = this.getIntVolatile(var1, var2);} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));//执行本地比较与交换方法,若是内存地址V的值与预期值A相等,就将内存地址V的值更改为var5 + var4(即为num+1,针对于上面案例)return var5;}//本地方法:比较与交换(即为CAS),执行了内存操作,效率很高//var1:AtomicInteger实例    var2:偏移值   var4:获取到的内存地址V中的值  var5:更新值//根据偏移值来确定内存地址public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
}
  • 对于6-9行do while即为自旋锁。

说明:我们可以看到compareAndSwapInt方法最终使用的是本地方法使用的是汇编指令,其中同样也进行了锁定保证CAS操作是一个原子性操作,并且具有内存屏障效果即CAS同时具有volatile读volatile写的内存语义。

  • 面试必问的CAS,你懂了吗? 包含了汇编指令的源码解读!!!

注意点:CAS高效的解决了原子操作问题,针对于调用Unsafe类中的getAndAddInt()其中会有一个dowhile循环判断的操作,一旦比较与交换失败那就会一直进入循环判断操作,造成性能开销大!

针对于test03()案例的源码分析

//AtomicInteger类
//expect为预期值,update为更新值
public final boolean compareAndSet(int expect, int update) {//直接调用的是Unsafe类中的compareAndSwapInt()本地方法return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}//Unsafe类
public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

说明:通过该案例能够测试出其本地方法compareAndSet(CAS操作)的使用。

  1. 若是当前原子类的值与期望值相同,那么会根据指定的内存地址将原子类的值更新为更新值,返回true
  2. 若是当前原子类的值与期望值不同,那么不会执行更新操作,返回false

11.4、原子类中的ABA问题

引出ABA问题

引出ABA问题

首先回顾一下CAS的使用流程(比较与交换):①首先从内存地址V中读出值S。②将值S与预期值A进行比对。③若是相同的话将内存地址V中的值更改为更新值B。

//Unsafe类中的CAS操作
public final class Unsafe {//var1:原子类的实例    var2:偏移值   var4:预期值A  var5:更新值B//通过var1与var2能够获取到内存地址V中值Spublic final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);
}

而对于Unsafe类中需要setadd方法都进行了实现,中间都会有个do while循环操作

  • do代码体中是获取对应原子类实例中的值,获取到之后会不断的进行循环执行上面的CAS本地方法操作。

    • 注意注意:是先获取当前值,之后再CAS判断操作!!!
  • 下面黄色框中都具有do while循环体与上面实现类基本一致!

出现的问题描述:在执行获取了当前值之后,CPU调度到cas操作前,在这个过程中其他线程对其值进行更改接着再改回去,之后执行cas操作时会认为没有改变过。

  • 这种情况除了AtomicStampedReference类都会有这种问题,为啥这么说呢,因为Java是值传递,当出现上面这种情况,传递的引用值可能没变,但是引用对象变了(尤其是其中的值变了,我们会毫无察觉)!

解决方式:版本号机制,通过使用AtomicStampedReference,该类中会有一个Stamp作为版本号,使用该类的话每次进行更新值操作都需要手动增加版本号来防止上面的ABA问题。


问题模拟说明(问题源头)

demo见demo8中的ABAProblemTest.java

实际模拟ABA问题的出现:

import sun.misc.Unsafe;import java.lang.reflect.Field;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;/*** @ClassName ABAProblemTest* @Author ChangLu* @Date 2021/4/5 22:35* @Description 模拟测试AtomicInteger(其他原子类)可能会出现的ABA问题*/
public class ABAProblemTest {//使用AtomicInteger来模拟出ABA问题public static void main(String[] args) {AtomicInteger atomicInteger = new AtomicInteger(10);//线程A模拟ABA问题情况new Thread(()->{//在线程B睡眠过程中执行交换操作atomicInteger.compareAndSet(10,22);atomicInteger.compareAndSet(22,10);//ABA问题就出现在这里,当上面换完之后,引用依旧不会变,再执行下面41行时不会检测到其已经进行了更新操作},"A").start();//线程B进行主要核心替换操作new Thread(()->{do{try {//模拟Unsafe中的获取值//报出异常:java.lang.SecurityException: Unsafe,无法通过自定义类来获取到该实例Integer i = Unsafe.getUnsafe().getIntVolatile(atomicInteger,getUnsafeValueOffset());System.out.println(i);TimeUnit.SECONDS.sleep(4);} catch (Exception e) {e.printStackTrace();}}while (!atomicInteger.compareAndSet(10,66));//重要!经过上面的交换操作本句代码依旧会正常执行!替换为66System.out.println(atomicInteger);},"B").start();}//通过反射来获取到Unsafe类中的valueOffset值public static long getUnsafeValueOffset() throws IllegalAccessException, NoSuchFieldException {Field field = Unsafe.class.getDeclaredField("valueOffset");field.setAccessible(true);return (long) field.get(Unsafe.getUnsafe());}
}

注意:上面代码不能够执行会报错主要理解想要表达的问题描述即可!!!

重点需要知道的一个点是:一般对应一些基本类型对应的原子包装类几乎不会产生啥影响改变不就不改变了嘛就是一个值,而对于引用类型的话就不一样了,由于Java是值传递,若是更改了其中引用对象的值,对应引用依旧不会变指的是同一个对象地址,问题来了,这时候若是直接进行cas操作会判断为同一个引用就会进行替换操作!!!


解决ABA问题(版本号机制,使用AtomicStampedReference)

前面说到对应原子引用类若是出现ABA问题的话可能会有不好的影响,如何解决呢?JDK中提供了AtomicStampedReference类,该类是一个原子标记引用类。

  • 其中来通过一个标记来实时判断当前是否要进行更改操作,其为版本号机制。

demo见demo8中的SolveABAProblemTest.java

程序描述:线程A来模拟出现ABA问题,线程B中最先获取到其中的版本号,接着睡眠来让线程A完成模拟操作,接着来测试通过使用AtomicStampedReference中的版本号来解决ABA问题:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicStampedReference;/*** @ClassName SolveABAProblemTest* @Author ChangLu* @Date 2021/4/5 23:14* @Description 通过使用AtomicStampedReference(版本号机制),利用其标记机制来放置出现ABA问题!*/
public class SolveABAProblemTest {//设置泛型为Integer,参数设置初始值为1,标记为1private static AtomicStampedReference<Integer> sr = new AtomicStampedReference<>(1,1);public static void main(String[] args) {//线程A模拟ABA问题new Thread(()->{try {TimeUnit.SECONDS.sleep(1);//睡一秒保证线程B先获取到标记} catch (InterruptedException e) {e.printStackTrace();}System.out.println("标记为"+sr.getStamp());//模拟ABA问题,从1->11  11->1boolean b1 = sr.compareAndSet(1, 11, sr.getStamp(), sr.getStamp() + 1);System.out.println(Thread.currentThread().getName()+"=>标记为"+sr.getStamp()+",当前值为"+sr.getReference()+","+b1);boolean b2 = sr.compareAndSet(11, 1, sr.getStamp(), sr.getStamp() + 1);System.out.println(Thread.currentThread().getName()+"=>标记为"+sr.getStamp()+",当前值为"+sr.getReference()+","+b1);},"A").start();//线程B,主要测试AtomicStampedReference通过标记是否有效解决ABA问题new Thread(() -> {//获取当前标记int stamp = sr.getStamp();//睡眠一会让线程A模拟ABA问题try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {e.printStackTrace();}//看一下是否能够预防ABA问题出现!注意这里的stamp使用的是之前获取到的标记号//若是true则表示更改成功,false表示更改失败System.out.println(sr.compareAndSet(1, 66, stamp, stamp + 1));System.out.println(Thread.currentThread().getName()+"=>标记为"+sr.getStamp()+",值为"+sr.getReference());}, "B").start();}
}

注意:一定要确保拿到版本号操作是在ABA操作之前(上面程序通过在线程A中加入睡眠保证线程B的操作),避免出现线程执行顺序的问题。

说明:我们可以通过使用AtomicStampedReference其中的版本号机制来防止ABA问题的出现!!!


总结

1、CAS的含义就是比较与交换,其是保证线程安全的一种高效方法比使用synchronziedlock锁性能更高,因为CAS方法实际上调用的是本地C++代码,其中包含了汇编指令,主要就是保证CAS是一个原子性操作以及具有内存屏障效果(Volatile读、写的内存语义)。

2、对于一些基本类型进行自增(复合操作)对于在多线程下是不安全的,在Java中提供了一个原子包,其中包含了对应不同的原子包装类型及引用类型,将原本的自增操作更改为对应的调用方法如+1操作的getAndIncrement()方法。

  • 一定要知道cas操作指定的是Unsafe类中的compareAndSwapInt()本地方法,因为在对应的原子类中的部分方法并不是直接调的该cas方法,而是调用如下方法:
    • 这些实现方法都是先获取到对应原子类的实例,接着再进行cas操作,并且对于cas没有成功的操作会不断循环直至成功,因为其中有个do while操作,对于失败情况则会大大降低性能,也是它的一大缺点。

3、对于普通的原子类(包装类、引用类)都会有ABA问题,即在获取原子类实例后,CPU调度到cas操作前,其他线程将原子类值A改为B,B又改为A后,此时真正执行cas操作时会依旧认为是原本的引用从而进行比较与交换。

  • 解决方式:使用版本号机制,即使用AtomicStampedReference中的版本号判断,在每次进行更新操作时增加版本号值,将版本号值作为判断的依据看是否要进行比较与交换。

十二、死锁问题及排查

12.1、死锁问题还原

死锁描述:若是在线程A获取到了锁A,线程B获取到了锁B,紧接着线程A、B中还有获取锁的操作锁B、锁A的操作,此时就很有可能出现死锁情况,即两个线程都要阻塞等待对应线程释放自己所需要的锁,造成死锁问题。

demo见demo9目录中的DeadLockProblemTest.java

import java.util.concurrent.TimeUnit;/*** @ClassName DeadLockProblemTest* @Author ChangLu* @Date 2021/4/6 11:41* @Description 死锁案例,主要用于死锁排查测试*/
public class DeadLockProblemTest implements Runnable {private String A;private String B;public DeadLockProblemTest(String a, String b) {A = a;B = b;}@Overridepublic void run() {synchronized (A){System.out.println(Thread.currentThread().getName()+"获取到了锁:"+A);//进行延时确保出现死锁问题try {TimeUnit.SECONDS.sleep(2);} catch (InterruptedException e) {e.printStackTrace();}synchronized (B){System.out.println(Thread.currentThread().getName()+"获取到了锁:"+A);}}}public static void main(String[] args) {String A = "A";String B = "B";new Thread(new DeadLockProblemTest(A,B)).start();new Thread(new DeadLockProblemTest(B,A)).start();}}

  • 可以看到两个线程都会在阻塞等待中,造成死锁问题。

两种方式

方式一:通过使用jpsjstack命令(查看堆栈信息),排查出死锁问题。

方式二:直接使用jconsole图形化界面查看死锁问题!

方式一

通过jdk自带的工具类来进行排查:

  1. jps(Java Virtual Machine Process Status Tool):显示当前系统的java进程情况及进程id,一般用来在linux/unix平台上简单查看当前java进程的一些简单情况。
  2. jstack:查看指定pid进程的堆栈工作工具。

接着我们来使用这两个命令来排查上面程序的死锁问题(保证程序执行使用下面命令):

(1)、jps -l 查看所有的java进程及进程id

(1)、jstack pid进程号:查看指定进程号的堆栈信息

说明:通过该工具能够查看是否具有死锁问题!


参考资料

[1]. 并发容器之CopyOnWriteArrayList

[2]. 并发容器之ConcurrentHashMap

[3]. ConcurrentModificationException异常原因和解决方法

[4]. FutureTask的用法及两种常用的使用场景 实际使用的场景

[5]. jdk1.8 探讨FutureTask的两个问题 只执行1次与get()获取返回值阻塞问题

[6]. CountDownLatch的两种使用场景 包含源码解析、使用场景及与join区别

[7]. CountDownLatch踩过的坑 解决dubbo线程池满的线上问题

[8]. 深入理解CyclicBarrier原理 包含源码分析

[9]. 理解Semaphore及其用法详解

[10]. 读写锁ReadWriteLock的实现原理 读写锁详细介绍,含源码

[11]. java并发之SynchronousQueue实现原理

[12]. Executors的四种线程池 简单介绍原理、应用场景及执行流程

[13]. 线程池使用:CPU密集型和IO密集型 介绍了两种类型设置线程的数量,根据应用情况来定,其中还介绍了一些场景分析

[14]. 【并发编程】IO密集型和CPU密集型任务 两种类型描述的比较清晰

[15]. Fork/Join框架基本使用 包含原理分析、执行过程,fork()与join()方法,两个例子说明,包含一个使用ForkJoin来解决归并排序问题性能更高(并行处理)

[16]. JDK1.8新特性CompletableFuture总结

[17]. 面试必问的CAS,你懂了吗? 包含源码分析最主要的是其中的汇编源码分析,三个缺点的详细说明

[18]. Java并发之CAS理解 描述简洁,也推荐看

[19]. 悲观锁 & 乐观锁的原理及应用场景 两种锁的介绍说明及应用场景

[20]. 自旋锁—百度百科

[21]. 缓存一致性协议(MESI协议) 介绍了CPU缓存一致性协议


我是长路,感谢你的耐心阅读,如有问题请指出,我会听取建议并进行修正。
欢迎关注我的公众号:长路Java,其中会包含软件安装等其他一些资料,包含一些视频教程以及学习路径分享。
编程学习qq群:891507813 我们可以一起探讨学习
注明:转载可,需要附带上文章链接

JUC快速入门各个知识点汇总相关推荐

  1. java--JUC快速入门(彻底搞懂JUC)

    java–JUC快速入门(彻底搞懂JUC) 文章目录 java--JUC快速入门(彻底搞懂JUC) 1.学习多线程之前需要知道的一些概念. 2.JUC的结构 3.Lock锁(重点) 4.集合类不安全 ...

  2. 带你快速入门AXI4总线--AXI4-Stream篇(1)----AXI4-Stream总线

    写在前面 随着对XILINX器件使用的深入,发现越来越多的IP都选配了AXI4的接口.这使得只要学会了AXI4总线的使用,基本上就能对XILINX IP的使用做到简单的上手.所以学会AXI4总线,对X ...

  3. 带你快速入门AXI4总线--AXI4-Full篇(3)----XILINX AXI4-Full接口IP源码仿真分析(Master接口)

    写在前面 接slave接口篇,本文继续打包一个AXI4-Full-Master接口的IP,学习下源码,再仿真看看波形. 带你快速入门AXI4总线--AXI4-Full篇(2)----XILINX AX ...

  4. 带你快速入门AXI4总线--AXI4-Full篇(1)----AXI4-Full总线

    写在前面 AXI4系列链接:带你快速入门AXI4总线--汇总篇(直达链接) 1.什么是AXI4-Full? AXI 表示 Advanced eXtensible Interface(高级可扩展接口), ...

  5. 树莓派从零开始快速入门系列汇总

    树莓派从零开始快速入门系列汇总 树莓派从零开始快速入门第0讲--环境安装 树莓派从零开始快速入门第1讲--命令行 树莓派从零开始快速入门第2讲--更换国内源 树莓派从零开始快速入门第3讲--文件编辑 ...

  6. C# 零基础入门知识点汇总

    C# 零基础入门 知识点汇总 前言 一,基础语法(1~10) 二,流程控制(11~20) 三,数组相关(21~30) 四,函数介绍(31~40) 五,类和对象(41~50) 六,面向对象(51~60) ...

  7. redis原理快速入门知识点总结

    redis原理快速入门知识点总结 1. 项目中缓存是如何使用的?为什么要用缓存?缓存使用不当会造成什么后果? 为什么用缓存? 1.高性能: 一些需要复杂操作耗时查出来的结果,且确定后面不怎么变化,但是 ...

  8. sklearn快速入门教程:补充内容 -- sklearn模型评价指标汇总(聚类、分类、回归)

    sklearn集成了大多数模型评价指标,这可以很大程度上方便我们的使用,尤其在对进行进行自动调参时可以方便我们进行选择. 做下这个笔记主要是为了补充之前的内容:sklearn快速入门教程:(四)模型自 ...

  9. matlab撤销上一步命令_CAD快速入门技巧:CAD软件中撤销操作的方法汇总

    在使用浩辰CAD软件画图的过程中都难免会误操作,因此CAD与WORD.EXCEL等其他软件一样,在误操作后可以通过"放弃"来取消刚进行的操作,Windows软件的常规操作也适用于C ...

最新文章

  1. android如何设置默认关闭虚拟按钮,android – Chrome自定义标签更改默认关闭按钮不起作用...
  2. Spring----自定义异常类
  3. BLE 数据包格式解析
  4. 微信小程序 setData动态设置数组中的数据
  5. 轻量级NuGet—BaGet
  6. 阿里云服务器部署django项目
  7. C语言课后习题(15)
  8. 【英语学习】【Daily English】U07 Restaurant L02 I don't think this is what I ordered?
  9. 带你全面掌握高级知识点!java修改map中的value
  10. 从门户到搜索:谁为百度打工?
  11. OpenCasCade在一个窗体中的两个picture控件中 分别显示
  12. python爬虫获取数据失败请稍后访问_Python爬取微博评论数据,竟被反爬封号了!...
  13. 立创eda入门-原理图,PCB制作
  14. 使用wxjava实现发表内容、预览信息以及推送文章
  15. idea运行项目流程
  16. 如何通过js改变css样式,如何通过JS 动态改变CSS样式
  17. Vs code快捷键及常用插件
  18. JAVA-SSH2:JSch试用
  19. error creating overlay mount to /var/lib/docker/overlay2
  20. Python调用,爬虫JS混淆——数据加密获取步骤和方法

热门文章

  1. 深度linux运行wine,如何在Deepin深度桌面环境下直接运行exe安装Wine
  2. Nginx (5):nginx URLRewrite伪静态配置
  3. 神经网络--基于mnist数据集取得最高的识别准确率
  4. LSTM+word2vec电影情感分析
  5. JAVA WEB DAY 11_ JDBC 连接池
  6. gmpy2常见函数使用
  7. bzoj5082 弗拉格 矩阵乘法
  8. 【元胞自动机】基于matlab元胞自动机模拟交通路况(含超车)【含Matlab源码 2389期】
  9. sitemesh的使用---修饰器
  10. 0基础学习数据分析师,这样做人人都是数据分析师