Java并发编程

  • 一.线程基础
    • 1.进程和线程
    • 2.创建线程
    • 3.线程状态
    • 4.线程停止
    • 5.守护线程
  • 二.并发编程
    • 1.线程同步
    • 2.死锁
    • 3.ReentrantLock
    • 4.Condition
    • 5.JUC包的集合
    • 6.并发辅助类
    • 7.ReadWriteLock
    • 8.BlockingQueue
    • 9.线程池
    • 10.函数式接口
    • 11.异步回调 CompletableFuture使用
    • 12.JMM
    • 13.CAS
    • 14.CAS的ABA问题
    • 15.AQS

本文参考了许多文章,并标注了出处,如果有侵权,请及时告知

一.线程基础

1.进程和线程

进程是执行程序的一次执行,是一个动态的概念。是系统资源分配的单位

线程是CPU调度和执行的单位,一个进程可以包含多个线程,至少有一个线程

Java线程的状态:

1.初始(NEW):尚未启动的线程的状态

2.运行(RUNNABLE):可运行线程的线程状态。处于可运行状态的线程正在Java虚拟机中执行,但它可能正在等待来自操作系统的其他资源,比如处理器,就绪状态和运行中都是RUNNABLE

3.阻塞(BLOCKED):等待监视器锁(synchronized)的阻塞线程的线程状态。处于阻塞状态的线程正在等待监视器锁进入同步块/方法,或者在调用Object.wait后重新进入同步块/方法

4.等待(WAITING):等待线程的线程状态。线程由于调用以下方法之一而处于等待状态:

  • Object.wait with no timeout
  • Thread.join with no timeout
  • LockSupport.park
  • Condition.await

处于等待状态的线程正在等待另一个线程执行特定的操作。例如,在一个对象上调用object. wait()的线程正在等待另一个线程在该对象上调用object. notify()或object. notifyall()。调用了thread .join()的线程正在等待指定的线程结束

5.超时等待(TIMED_WAITING):具有指定等待时间的等待线程的状态。由于调用了以下方法中的一个,并且指定了一个正的等待时间,线程处于计时等待状态:

  • Thread.sleep
  • Object.wait with timeout
  • Thread.join with timeout
  • LockSupport.parkNanos
  • LockSupport.parkUntil

6.终止(TERMINATED):终止线程的线程状态。线程已完成执行

可以通过getState()方法获取线程的当前状态

2.创建线程

java创建线程的三种方式

  • 继承Thread类
  • 实现Runnable接口
  • 实现Callable接口

2.1 继承Thread类
继承Thread类,重写run方法,创建线程对象,调用start

// 创建线程方式一:继承Thread类,重写run方法,创建线程对象,调用start
public class TestThread01 extends Thread{@Overridepublic void run() {for (int i = 0; i < 20; i++) {System.out.println("继承Thread线程"+"-"+i);}}public static void main(String[] args) {TestThread01 testThread01 = new TestThread01();testThread01.start();for (int i = 0; i < 20; i++) {System.out.println("main主线程"+"-"+i);}}
}

2.2 实现Runnable接口
实现runnable接口,重写run方法,创建线程对象,对象丢入Thread类,调用start方法

public class TestThread02 implements Runnable{// 重写run方法@Overridepublic void run() {for (int i = 0; i < 20; i++) {System.out.println("实现runnable接口-"+i);}}public static void main(String[] args) {// 创建线程对象TestThread02 testThread02 = new TestThread02();// 对象丢入Thread类,代理Thread thread = new Thread(testThread02);// 调用start方法thread.start();for (int i = 0; i < 20; i++) {System.out.println("main主线程-"+i);}}
}

2.3 Lambda 表达式

public class ThreadTest {public static void main(String[] args) {Thread thread = new Thread(()->{System.out.println("一个线程"+Thread.currentThread().getName());});thread.start();System.out.println("我是main线程");}
}

3.线程状态

阻塞:当一个线程试图获取对象锁(非java.util.concurrent库中的锁,即synchronized),而该锁被其他线程持有,则该线程进入阻塞状态。不需要由另一个线程来显式唤醒自己,不响应中断,处于阻塞状态的线程正在等待监视器锁进入同步块/方法,或者在调用Object.wait后重新进入同步块/方法

等待:当一个线程等待另一个线程通知调度器一个条件时,该线程进入等待状态。它的特点是需要等待另一个线程显式地唤醒自己,实现灵活,语义更丰富,可响应中断。例如调用:Object.wait()、Thread.join()、sleep()

3.1 sleep()

sleep(毫秒),指定以毫秒为单位的时间,使线程在该时间内进入超时等待状态,期间得不到CPU的时间片,等到时间过去,线程重新进入就绪状态。注意sleep是暂停线程,它不会释放锁,也就是说如果当前线程持有对某个对象的锁,则即使调用sleep方法,其他线程也无法访问这个对象

3.2 wait()和notify()

wait() 和 notify() 方法

两个方法搭配使用,wait()使线程进入等待状态,当调用notify()唤醒时,线程进入就绪状态。wait()内可加或不加参数,加参数时是以毫秒为单位,当到了指定时间或调用notify()方法时,进入可执行状态。属于Object类,而不属于Thread类

wait()会先释放锁住的对象,然后再执行等待的动作。由于wait()所等待的对象必须先锁住,因此,它只能用在同步化程序段或者同步化方法内,否则,会抛出异常IllegalMonitorStateException

面试题:如何证明sleep不会释放锁,而wait会释放锁?

sleep测试

public class SleepTest {static Marry marry = new Marry();public static void main(String[] args) throws InterruptedException {new Thread(()->{  // 一个争夺锁的线程synchronized (marry){System.out.println("before sleep");marry.happy();try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("after sleep");}}).start();new Thread(()->{  // 另外一个争夺锁的线程synchronized (marry){System.out.println("before test");marry.happy();System.out.println("after test");}}).start();}}class Marry{public void happy(){System.out.println("Marry Me!!!");}
}

无论执行多少次,输出结果总是如此,这说明调用sleep()时不会释放锁

wait()测试

public class SleepTest {public static Marry marry = new Marry();public static void main(String[] args) throws InterruptedException {new Thread(()->{  // 一个争夺锁的线程synchronized (marry){System.out.println("before wait()");marry.happy();try {marry.wait();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("after wait()");}}).start();new Thread(()->{  // 另外一个争夺锁的线程synchronized (marry){System.out.println("before notify()");marry.happy();marry.notify();System.out.println("after notify()");}}).start();}}class Marry{public void happy(){System.out.println(Thread.currentThread().getName()+"Marry Me!!!");}
}

wait()会释放锁,然后线程Thread-1拿到了对象的锁,随后进行notify()唤醒,notify()或者notifyAll()调用时并不会真正释放对象锁,必须等到synchronized方法或者语法块执行完才真正释放锁,执行结果也可以证明这一点!

面试题:为什么wait()和notify()要在同步代码块中使用?

wait是让使用wait方法的对象等待,暂时先把对象锁给让出来,给其它持有该锁的对象用,其它对象用完后再告知(notify)等待的那个对象可以继续执行了,因此,只有在synchronized块中才有意义,否则等待-通知将无意义,Java也给了我们硬性规定,不这样用会抛出异常

3.3 join()

我们可以认为join()方法让其他线程插队,强制执行其他线程。join()方法只会使主线程(或者说调用t.join()的线程)进入等待池处于等待状态并等待t线程执行完毕后才会进入就绪状态,并不影响同一时刻处在运行状态的其他线程

public class TestJoin {public static void main(String[] args) throws InterruptedException {Thread thread = new Thread(()->{for (int i = 0; i < 200; i++) {System.out.println("我是thread线程"+i);}});thread.start();for (int i = 0; i < 20; i++) {if(i == 10){thread.join();}System.out.println("我是main线程"+i);}}
}

可以看到主线程一直等待thread线程结束才继续执行

3.4 线程礼让

礼让线程,当前线程释放CPU资源,使当前正在执行的线程变为就绪状态,并非阻塞,CPU重新根据规则调度线程,所以礼让不一定成功,它跟sleep方法类似,同样不会释放锁

// 两个线程开启
public class TestYield {public static void main(String[] args) {threadStart("A");threadStart("B");}public static void threadStart(String name){Thread thread = new Thread(()->{System.out.println(Thread.currentThread().getName()+"线程开始执行了");Thread.yield(); // 线程会礼让System.out.println(Thread.currentThread().getName()+"线程停止执行");},name);thread.start();}
}

4.线程停止

4.1 使用退出标志位

volatile(不稳定的):每次访问时,都从内存中重新读取该值,若改变了该值,会被写回内存

public class TestThread02 implements Runnable{private volatile boolean exit = false; //标志位// 重写run方法@Overridepublic void run() {while(!exit){System.out.println("run.....");}}public static void main(String[] args) throws InterruptedException {// 创建线程对象TestThread02 testThread02 = new TestThread02();// 对象丢入Thread类,代理Thread thread = new Thread(testThread02);// 调用start方法开启线程thread.start();// 主线程等待5000ms让线程运行一段时间Thread.sleep(5000);// 修改标志位testThread02.exit = true;}
}

4.2 interrupt()方式停止

Thread.interrupt()的作用是通知线程应该中断了,到底中断还是继续运行,应该由被通知的线程自己处理

当对一个线程,调用 interrupt() 时

1.如果线程处于sleep, wait, join 等状态,那么线程将立即退出状态,并抛出一个InterruptedException异常
2. 如果是synchronized状态,那么是不可中断的,他们会忽略

3.lock()即可以响应中断、也可以不响应中断

4.park()可以响应中断,但响应了还是会继续等待,不会中断运行,所以从宏观角度来看,是不响应中断的

5.如果线程处于正常活动状态,那么会将该线程的中断标志设置为true,被设置中断标志的线程将继续正常运行,不受影响

isInterrupted()和interrupted() 都是返回值为boolean类型的方法

1)isInterrupted()

此方法只会读取线程的中断标志位,检测调用该方法的线程是否被中断,中断返回true,并不会重置true为false

2)interrupted()

此方法读取线程的中断标志位,中断返回true,会重置true为中断标志位为false

5.守护线程

线程分为用户线程守护线程

虚拟机必须确保用户线程执行完毕,不用等待守护线程执行完毕

public class ThreadTest{public static void main(String[] args) {// 创建用户线程Thread master = new Thread(new Master());// 创建守护线程Thread daemon = new Thread(new Daemon());daemon.setDaemon(true); // 关键,将该线程设置为守护线程master.start();daemon.start();}}
// 用户线程
class Master implements Runnable{@Overridepublic void run() {for (int i = 1; i <= 5; i++) {System.out.println("今年你"+i+"岁");}}
}
// 守护线程
class Daemon implements Runnable{@Overridepublic void run() {while (true){System.out.println("我会一直守护你");}}
}

可以看到,守护线程不会一直执行,虚拟机只会关心用户线程是否完成!

二.并发编程

1.线程同步

当有一个线程在对内存进行操作时,其他线程都不可以对这个内存地址进行操作,直到该线程完成操作, 其他线程才能对该内存地址进行操作,而其他线程又处于等待状态。“同”字应是指协同、协助、互相配合,所以并发编程的核心问题之一就是如何确保线程同步!

线程不同步实例

public class UnsafeThread {public static void main(String[] args) {Unsafe unsafe = new Unsafe(5000);Thread thread = new Thread(unsafe);Thread thread02 = new Thread(unsafe);thread.start();thread02.start();}
}class Unsafe implements Runnable{private int money = 8000;private int get;Unsafe(int get){this.get = get;}@Overridepublic void run() {if(money - get < 0){System.out.println("余额不足");return;}try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}money = money - get;System.out.println(Thread.currentThread().getName()+"取走了5000"+"余额为:" + money);}
}

一共8000元,两个线程却分别取走了5000元,线程不同步会出现严重的问题!

synchronized同步方法及同步代码块

synchronized底层原理看这篇文章
https://blog.51cto.com/c959c/5332533

同步方法:public synchronized void method(args){}

说明:synchronized通过锁控制对方法的访问,访问该方法需要获得对应的锁,并且是独占该锁,直到方法返回才释放锁,其他想要获得锁的线程会被阻塞,而后在争夺到锁资源后恢复为RUNNABLE状态,但是这个过程涉及操作系统用户模式和内核模式的切换,代价其实挺高的


可以看到加锁后保证了线程同步,但缺点也是显而易见的,加锁会严重影响效率

同步代码块:synchronized(Obj) {}

Obj称为同步监视器,Obj可以是任何对象,但是最好是共享资源

同步监视器的执行过程

1.A线程访问,锁定同步监视器,执行代码
2.B线程想要访问,发现同步监视器被锁定,等待
3.A访问结束,解锁
4.B获得锁,锁定并访问

synchronized修饰的静态方法,当调用者访问该方法,锁的是Class文件,也就是类锁,对所有实例对象都上锁。当synchronized修饰普通方法,当调用者访问该方法,锁的是当前的类,也就是类锁

2.死锁

多个线程各自占有一些共享资源,并且互相等待其他线程占有的资源,而导致两个或者多个线程都在等待对方释放资源,都停止执行的情况

public class DeadLock {public static void main(String[] args) {new Shoes(true,"张三").start();new Shoes(false,"李四").start();}
}
// 右鞋
class RightShoe{}
// 左鞋
class LeftShoe{}
class Shoes extends Thread{private boolean flag;private String name;// 保证左右鞋各有一只static RightShoe rightShoe = new RightShoe();static LeftShoe leftShoe = new LeftShoe();Shoes(boolean flag,String name){this.flag = flag;this.name = name;}@Overridepublic void run() {if(flag == true){ // flag为true先拿右鞋synchronized (rightShoe){ // 锁定右鞋System.out.println(name+"拿到了右鞋");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (leftShoe){System.out.println(name+"拿到了左鞋");}}}else{synchronized (leftShoe){System.out.println(name+"拿到了左鞋");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}synchronized (rightShoe){System.out.println(name+"拿到了右鞋");}}}}
}

3.ReentrantLock

参考文章:https://blog.csdn.net/qq_38737992/article/details/89607758

  • Lock是JUC包下一个接口,实现类:ReentrantLock(重入锁), ReentrantReadWriteLock.ReadLock,ReentrantReadWriteLock.WriteLock
  • Lock锁,可以得到和 synchronized一样的效果,即实现原子性、有序性和可见性。相较于synchronized,Lock锁手动获取锁和释放锁、可中断的获取锁、超时获取锁,Synchronized是自动上锁、解锁
  • Synchronized和Lock锁都是可重入的(获取锁后再进入相关代码块无需再申请锁),都是非公平锁(多个线程去获取锁的时候,会直接去尝试获取,获取不到,再去进入等待队列,如果能获取到,就直接获取到锁),区别于公平锁(多个线程按照申请锁的顺序去获得锁,线程会直接进入队列去排队,永远都是队列的第一位才能得到锁)
  • Synchronized适合少量的同步代码,而Lock锁适合大量的同步代码

使用Lock锁(try-catch-finally)

 Lock l = ...;l.lock(); // 上锁try {// access the resource protected by this lock} finally {l.unlock(); // 解锁}

Lock锁常用API

public interface Lock {void lock(); // 获得锁// 获得锁,可中断// 两个线程同时通过lock.lockInterruptibly()想获取某个锁时,假若此时线程A获取到了锁,而线程B只有在等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程void lockInterruptibly() throws InterruptedException;// 锁在空闲的才能获取锁(未获得锁不会等待)。举个例子:当两个线程同时通过lock.trylock()想获取某个锁时,假若此时线程A获取到了锁,而线程B不会等待,直接放弃获取锁boolean tryLock();// 释放锁void unlock();}

Lock锁代码演示

public class LockTest {public static void main(String[] args) {TicketSale ticketSale = new TicketSale();// 三个线程共享资源new Thread(()->{for (int i = 0; i < 200; i++) {ticketSale.sale();}},"张三").start();new Thread(()->{for (int i = 0; i < 200; i++) {ticketSale.sale();}},"李四").start();new Thread(()->{for (int i = 0; i < 200; i++) {ticketSale.sale();}},"王五").start();}
}
class TicketSale{private int number = 200;// 只有200张票Lock lock = new ReentrantLock();// 创建锁public void sale(){lock.lock(); // 上锁try{ // 业务while(number >0){System.out.println(Thread.currentThread().getName()+"拿到了第"+number--+"张票");}}catch (Exception e){e.printStackTrace();}finally { // 释放锁lock.unlock();}}}

解决线程安全问题使用ReentrantLock就可以,但是ReentrantLock是独占锁,某时只有一个线程可以获取该锁,而实际中会有写少读多的场景,显然ReentrantLock满足不了这个需求,所以ReentrantReadWriteLock应运而生

ReentrantReadWriteLock采用读写分离的策略,读锁能同时被多个线程持有,而写锁是独占锁同一时刻只能有一个线程持有

获取读写锁的前提条件

线程获取读锁的前提条件

没有写锁

线程获取写锁的前提条件

没有其他线程的读锁(向获取写锁的线程可以持有读锁)
没有其他线程的写锁

锁降级:线程获取写入锁后可以获取读取锁,然后释放写入锁,这样就从写入锁变成了读取锁,从而实现锁降级特性

4.Condition

以生产者消费者模型为例 (判断等待-业务-通知)

synchronized生产者消费者

// 我们实现生产者消费者,当数字为0,生产者就要+1,否则等待消费者消费
// 当数字不为0,消费者消费,否则就要等待生产者+1
public class A {public static void main(String[] args) {Data data = new Data();new Thread(()->{try {for (int i = 0; i < 10; i++) {data.incement();}} catch (InterruptedException e) {e.printStackTrace();}},"线程生产者A").start();new Thread(()->{try {data.decemnet();} catch (InterruptedException e) {e.printStackTrace();}},"线程消费者B").start();new Thread(()->{try {for (int i = 0; i < 10; i++) {data.incement();}} catch (InterruptedException e) {e.printStackTrace();}},"线程生产者C").start();new Thread(()->{try {for (int i = 0; i < 10; i++) {data.decemnet();}} catch (InterruptedException e) {e.printStackTrace();}},"线程消费者D").start();}}
class Data{private int number; // 数字public synchronized void incement() throws InterruptedException {while(number != 0){ // 要用while,if只会判断一次,官方已经说明this.wait();}number++;System.out.println(Thread.currentThread().getName()+"+1工作完毕");this.notifyAll();}public synchronized void decemnet() throws InterruptedException {while (number == 0){this.wait();}number--;System.out.println(Thread.currentThread().getName()+"-1工作完毕");this.notifyAll();}
}


虚假唤醒:线程可以唤醒,而不会被通知,中断或者超时,即所谓的虚假唤醒。所以等待应该总是出现在while循环中。多线程环境下,有多个线程执行了wait()方法,需要其他线程执行notify()或者notifyAll()方法去唤醒它们,假如多个线程都被唤醒了,但是只有其中一部分是有用的唤醒操作,其余的唤醒都是无用功;对于不应该被唤醒的线程而言,便是虚假唤醒

面试题:如何应对虚假唤醒问题

同步代码块中的等待判断条件用while而不用if,因为线程被唤醒后,执行开始的地方是wait()之后,而if不会判断条件,while会重新判断条件,while保证了逻辑的正常

notify()和notifyAll()区别

调用notify时,只有一个等待线程会被唤醒而且它不能保证哪个线程会被唤醒,这取决于线程调度器。虽然如果你调用notifyAll方法,那么等待该锁的所有线程都会被唤醒,但是在执行剩余的代码之前,所有被唤醒的线程都将争夺锁定,这就是为什么在循环上调用wait,因为如果多个线程被唤醒,那么线程是将获得锁定将首先执行,它可能会重置等待条件,这将迫使后续线程等待。因此,notify和notifyAll之间的关键区别在于notify()只会唤醒一个线程,而notifyAll方法将唤醒所有线程,wait()和notify()只能用在同步化程序段或者同步化方法

Condition精确通知

上述生产者消费者模式中,我们可以看到A生产者可以通知B,D两个消费者

我们可以使用Lock锁和Condition实现精确通知唤醒

lock锁用来锁定,condition用来实现等待和唤醒

public class LockTest {public static void main(String[] args) {DataSource dataSource = new DataSource();new Thread(()->{for (int i = 0; i < 10; i++) {try {dataSource.incement();} catch (InterruptedException e) {e.printStackTrace();}}},"生产者A").start();new Thread(()->{for (int i = 0; i < 10; i++) {try {dataSource.decemnet();} catch (InterruptedException e) {e.printStackTrace();}}},"消费者B").start();}
}
class DataSource{private int number; // 数字Lock lock = new ReentrantLock(); // 创建锁Condition condition = lock.newCondition(); // 通过lock创建Condition对象Condition condition2 = lock.newCondition();public void incement() throws InterruptedException {lock.lock();// 上锁try {while(number != 0){ // 要用while,if只会判断一次,官方已经说明condition.await(); // 阻塞等待}number++;System.out.println(Thread.currentThread().getName()+"+1工作完毕");condition2.signal(); // 通知condition2,也就是消费者} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock(); // 释放锁}}public void decemnet() throws InterruptedException {lock.lock();try {while (number == 0){condition2.await(); // 消费者阻塞等待}number--;System.out.println(Thread.currentThread().getName()+"-1工作完毕");condition.signal(); // 通知生产者} catch (InterruptedException e) {e.printStackTrace();} finally {lock.unlock();}}
}

5.JUC包的集合

5.1 CopyOnWriteArrayList

简介

我们都知道,ArrayList是线程不安全的,多个线程操作会产生问题,JUC包下有个CopyOnWriteArrayList类,可以保证线程安全,并且读的性能很高

底层原理

CopyOnWrite意思就是写的时候复制,就对一块内存进行修改时,不直接在原有内存块中进行写操作,而是将内存拷贝一份,在新的内存中进行写操作,写完之后,再将原来指向的内存指针指到新的内存,原来的内存就可以被回收

读写分离:CopyOnWriteArrayList读取是完全不用加锁的,写入不会阻塞读取操作,只有写-写的时候会同步。写的时候会复制新内存,而此时读的时候读的是旧内存,实现了读写分离,也正是如此,读的时候未必读到的都是最新数据。使用volatile来修饰array,使得最新数据能够及时的写到主内存中,而其他线程在读取array时,是从主内存中读取而非工作内存,这是分布式中的最终一致性概念

在“添加/修改/删除”数据时,会先“获取互斥锁”,再修改完毕之后,先将数据更新到“volatile数组”中,然后再“释放互斥锁”;这样,就达到了保护数据的目的

5.2 CopyOnWriteArrayLSet

内部调用CopyOnWriteArrayList的addIfAbsent方法,如果元素已存在,就不会添加进入,也就保证了元素唯一性

CopyOnWriteArraySet的其他方法都是使用CopyOnWriteArrayList的方法

6.并发辅助类

6.1 CountDownLatch

CountDownLatch是一个同步工具类,它允许一个或多个线程一直等待,直到其他线程执行完后再执行后续代码

原理
CountDownLatch是通过一个计数器来实现的,计数器的初始化值为线程的数量。每当一个线程完成了自己的任务后,计数器的值就相应得减1。当计数器到达0时,表示所有的线程都已完成任务,然后在闭锁上等待的线程就可以恢复执行任务

代码示例

public class CountDownLatchDemo {public static void main(String[] args) {for (int i = 0; i < 3; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName());}).start();}System.out.println("主线程");}
}

从执行结果可以看到,主线程的普通输出语句在三个线程之前就输出了,我想让三个线程执行完成后再输出主线程就要用CountDownLatch

public class CountDownLatchDemo {public static void main(String[] args) throws InterruptedException {// 创建CountDownLatch,并指定线程个数3CountDownLatch countDownLatch = new CountDownLatch(3);for (int i = 0; i < 3; i++) {new Thread(()->{System.out.println(Thread.currentThread().getName());countDownLatch.countDown(); // 每执行一个线程,计数器就减1}).start();}countDownLatch.await(); // 只有当计数器为0,才会继续执行System.out.println("主线程");}
}

6.2 CyclicBarrier

利用CyclicBarrier类可以实现一组线程相互等待,当所有线程都到达某个屏障点后再进行后续的操作

public class CyclicBarrierDemo {public static void main(String[] args) {CyclicBarrier cyclicBarrier = new CyclicBarrier(2,()->{System.out.println("我穿好鞋子了"); // 指定等待的线程个数和后续操作});new Thread(()->{System.out.println("我穿好右鞋子了");try {cyclicBarrier.await();  // 每执行一个线程要等待,只有到了指定个数后才会进行后续操作} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}).start();new Thread(()->{System.out.println("我穿好左鞋子了");try {cyclicBarrier.await();} catch (InterruptedException e) {e.printStackTrace();} catch (BrokenBarrierException e) {e.printStackTrace();}}).start();}
}

只有同时穿好左右鞋子才算穿好

6.3 Semaphore

Semaphore 通常我们叫它信号量, 可以用来控制同时访问特定资源的线程数量,通过协调各个线程,以保证合理的使用资源

可以把它简单的理解成我们停车场入口立着的那个显示屏,每有一辆车进入停车场显示屏就会显示剩余车位减1,每有一辆车从停车场出去,显示屏上显示的剩余车辆就会加1,当显示屏上的剩余车位为0时,停车场入口的栏杆就不会再打开,车辆就无法进入停车场了,直到有一辆车从停车场出去为止

通常用于那些资源有明确访问数量限制的场景,常用于限流

acquire()
获取一个令牌,在获取到令牌、或者被其他线程调用中断之前线程一直处于阻塞状态

release()
释放一个令牌,唤醒一个获取令牌不成功的阻塞线程

public class SemaphoreDemo {public static void main(String[] args) {Semaphore semaphore = new Semaphore(3); // 只有3个车位for (int i = 0; i < 6; i++) {new Thread(()->{try {semaphore.acquire(); // 获得信号量System.out.println(Thread.currentThread().getName()+"抢到了车位");// 做操作TimeUnit.SECONDS.sleep(2);System.out.println(Thread.currentThread().getName()+"离开了车位");} catch (InterruptedException e) {e.printStackTrace();}finally {semaphore.release();// 释放信号量}}).start();}}
}

可以看到每次只有三个线程抢到了车位

7.ReadWriteLock

ReadWriteLock管理一组锁,一个是只读的锁,一个是写锁。读锁可以在没有写锁的时候被多个线程同时持有,写锁是独占的

所有读写锁的实现必须确保写操作对读操作的内存影响。换句话说,一个获得了读锁的线程必须能看到前一个释放的写锁所更新的内容

Java并发库中ReetrantReadWriteLock实现了ReadWriteLock接口并添加了可重入的特性

无锁的代码

public class ReadWriteLockDemo {public static void main(String[] args) {MyCache myCache = new MyCache();for (int i = 0; i < 6; i++) { // 多个线程进行写入操作int temp = i;new Thread(()->{myCache.put(String.valueOf(temp),temp+"对应的value");}).start();}for (int i = 0; i < 6; i++) { // 多个线程进行读取操作int temp = i;new Thread(()->{myCache.get(String.valueOf(temp));}).start();}}
}class MyCache{private volatile Map<String,String> map = new HashMap<>();// 写入操作public void put(String key,String value){System.out.println(Thread.currentThread().getName()+"将要写入");map.put(key,value);System.out.println(Thread.currentThread().getName()+"写入完成");}// 读取操作public String get(String key){System.out.println(Thread.currentThread().getName()+"将要读取");String res = map.get(key);System.out.println(Thread.currentThread().getName()+"读取完成");return res;}
}

可以看到1在写入的时候被其他线程插队

加读写锁的实例

public class ReadWriteLockDemo {public static void main(String[] args) {MyCache myCache = new MyCache();for (int i = 0; i < 6; i++) {int temp = i;new Thread(()->{myCache.put(String.valueOf(temp),temp+"对应的value");}).start();}for (int i = 0; i < 6; i++) {int temp = i;new Thread(()->{myCache.get(String.valueOf(temp));}).start();}}
}class MyCache{private volatile Map<String,String> map = new HashMap<>();// 创建读写锁ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();// 写入操作public void put(String key,String value){try {readWriteLock.writeLock().lock(); // 上写锁,写的时候不允许其他线程写System.out.println(Thread.currentThread().getName()+"将要写入");map.put(key,value);} catch (Exception e) {e.printStackTrace();} finally {System.out.println(Thread.currentThread().getName()+"写入完成");readWriteLock.writeLock().unlock();// 释放写锁}}// 读取操作public String get(String key){String res = null;try {readWriteLock.readLock().lock();System.out.println(Thread.currentThread().getName()+"将要读取");res = map.get(key);} catch (Exception e) {e.printStackTrace();} finally {System.out.println(Thread.currentThread().getName()+"读取完成");readWriteLock.readLock().unlock();}return res;}
}

可以看到读取和写入都保证了线程同步

读写锁比互斥锁允许对于共享数据更大程度的并发。每次只能有一个写线程,但是同时可以有多个线程并发地读数据。ReadWriteLock适用于读多写少的并发情况

当有读锁时,写锁就不能获得;而当有写锁时,除了获得写锁的这个线程可以获得读锁外,其他线程不能获得读锁

8.BlockingQueue

在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的

8.1 ArrayBlockingQueue

ArrayBlockingQueue底层是使用一个数组实现队列的,并且在构造ArrayBlockingQueue时需要指定容量,也就意味着底层数组一旦创建了,容量就不能改变了,因此ArrayBlockingQueue是一个容量限制的阻塞队列

ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);// 容量为3

ArrayBlockingQueue的并发阻塞是通过ReentrantLock和Condition来实现的,ArrayBlockingQueue内部只有一把锁ReentrantLock,意味着同一时刻只有一个线程能进行入队或者出队的操作

8.2 LinkedBlockingQueue

一个单向链表+两把锁+两个条件,两把锁,一把用于入队,一把用于出队,有效的避免了入队与出队时使用一把锁带来的竞争,在入队与出队都高并发的情况下,性能比ArrayBlockingQueue高很多,用了链表,最大容量为整数最大值,可看做容量无限

LinkedBlockingQueue中维持两把锁,一把锁用于入队,一把锁用于出队,这也就意味着,同一时刻,只能有一个线程执行入队,其余执行入队的线程将会被阻塞;同时,可以有另一个线程执行出队,其余执行出队的线程将会被阻塞。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInterger类型的变量表示当前队列中含有的元素个数,所以可以确保两个线程之间操作底层队列是线程安全的

为了维持底部数据的统一,引入了AtomicInteger的一个count变量,表示队列中元素的个数。count只能在两个地方变化,一个是入队的方法(可以+1),另一个是出队的方法(可以-1),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步

8.3 BlockingQueue四组API

方式 抛出异常 有返回值,不抛出异常 阻塞 超时等待
添加 add offer put offer(e,timeout,unit)
删除 remove poll take poll(timeout,unit)
队首元素 element peek - -

9.线程池

池化技术,通俗来讲就是:提前准备好资源,要用就从这里拿,用完再换回来。这样做的好处:对于那写创建销毁很费时的资源,可以减少这方面时间的消耗,优化资源利用

线程池
当执行大量异步任务时线程池能提供较好的性能(线程的复用)
线程池提供了一种资源限制和管理的手段(如线程的个数)

9.1 线程池三大方法

  • Executors.newSingleThreadExecutor(); 创建只有一个线程的线程池
  • Executors.newCachedThreadPool(); 创建一个最大线程数为Integer.MAX_VALUE的线程池
  • Executors.newFixedThreadPool(5); 创建一个核心线程数和最大线程数为传入参数的线程池

newSingleThreadExecutor()只会创建一个线程

public class PoolDemo {public static void main(String[] args) {ExecutorService threadPool = Executors.newSingleThreadExecutor();for (int i = 0; i < 20; i++) {threadPool.execute(()->{System.out.println(Thread.currentThread().getName());});}threadPool.shutdown(); // 关闭线程池}
}


Executors.newFixedThreadPool(5)会创建指定参数个数的线程

public class PoolDemo {public static void main(String[] args) {ExecutorService threadPool = Executors.newFixedThreadPool(5);for (int i = 0; i < 20; i++) {threadPool.execute(()->{System.out.println(Thread.currentThread().getName());});}threadPool.shutdown();}
}


Executors.newCachedThreadPool()会创建默认最大个数的线程池

public class PoolDemo {public static void main(String[] args) {ExecutorService threadPool = Executors.newCachedThreadPool();for (int i = 0; i < 20; i++) {threadPool.execute(()->{System.out.println(Thread.currentThread().getName());});}threadPool.shutdown();}
}

9.2 线程池七大参数

上面三种创建线程池的方法,通过源码可以看到,都是创建了一个ThreadPollExecutor对象

再继续深入,发现ThreadPollExecutor()有7个参数

  • corePoolSize -保留在池中的线程数,即使它们是空闲的,除非设置了allowCoreThreadTimeOut

  • maximumPoolSize -池中允许的最大线程数

  • keepAliveTime—当线程的数量大于核心时,这是剩余空闲线程在终止前等待新任务的最大时间

  • unit - keepAliveTime参数的时间单位

  • workQueue—在任务执行之前用来保存任务的队列。这个队列将只保存由execute方法提交的Runnable任务

  • threadFactory -执行器创建新线程时使用的工厂

  • handler:拒绝策略,当任务数已达到最大线程数,并且阻塞队列已满时执行的拒绝策略

比如现在corePoolSize为4,maximumPoolSize为10,已经被使用完了,现在又来了新的数据,那么它们进入到workQueue,当workQueue满了以后,才会开始创建第5个线程,直到10个,当数据来的少时,多余4个的线程在keepAliveTime时间内没用,就销毁

9.3 四大拒绝策略

  • AbortPolicy:如果阻塞队列满,则不再处理新的任务,并且抛出异常
  • CallerRunsPolicy:如果阻塞队列满,则将这个任务返回给创建这个任务的线程去处理,不抛出异常
  • DiscardOldestPolicy:尝试和最早的任务进行竞争,不抛出异常
  • DiscardPolicy:如果阻塞队列满了,则丢掉任务,不抛出异常

由于使用Executors创建线程池会导致OOM,所以我们一般通过ThreadPoolExeccutor的方式创建线程池

FixedThreadPool和SingleThreadPool允许请求的队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,导致OOM
CachedThreadPool和ScheduledThreadPool允许创建的最大线程为Integer.MAX_VALUE,可能导致创建大量的线程,导致OOM

public class ThreadPoolDemo {public static void main(String[] args) {ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,5,3,TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());for (int i = 0; i < 20; i++) {poolExecutor.execute(()->{System.out.println(Thread.currentThread().getName());});}}
}

9.4 定义最大线程

CPU密集型

我们可以定义最大线程数为当前电脑的CPU核数,保证CPU的最高效率

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2,Runtime.getRuntime().availableProcessors(), 3,TimeUnit.SECONDS,new LinkedBlockingQueue<>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());

IO密集型

适用于IO十分占用资源的情况,一般设置最大线程数为IO占用线程的2倍,以便能让其他的任务可以得到处理

10.函数式接口

函数式接口(Functional Interface)就是一个有且仅有一个抽象方法,但是可以有多个非抽象方法的接口

interface MathOperation {  // 函数式接口int operation(int a, int b);
}

函数式接口可以被隐式转换为 lambda 表达式

(parameters) -> expression
或
(parameters) ->{ statements; }

Lambda表达式(也可称为闭包)就是一个匿名函数(匿名方法)

函数型接口

特点:有传入参数,有返回值

public class FunctionInter {public static void main(String[] args) {Function function = (age)->{return "我今年"+age+"岁";};System.out.println(function.apply(10));}
}

断定性接口

特点:有传入参数,进行判断,返回一个布尔值

public class FunctionInter {public static void main(String[] args) {Predicate<Integer> predicate = (num)->{return num == 0;};System.out.println(predicate.test(1));}
}


消费型接口

特点:只有输入,没有返回值

public class FunctionInter {public static void main(String[] args) {Consumer consumer = (str)->{System.out.println(str);};consumer.accept("zhang");}
}


供给型接口

特点:没有参数,只有返回值

public class FunctionInter {public static void main(String[] args) {Supplier supplier = ()->{return 1024;};System.out.println(supplier.get());}
}

11.异步回调 CompletableFuture使用

假设你是jack,老师让你去班里找个人rose

同步:调用者必须等待当前逻辑处理完后才能往下执行随后的逻辑程序

如果你找不到rose你就一直等她回来

异步:调用者不需要等待异步逻辑全部执行完毕,而可以继续往下执行调用者其它逻辑程序

如果你找不到rose你不会等,你就去做别的事情,然后你告诉她同桌,她回来告诉你,她同桌通知你她回来就是回调

这里我们主要说一下Future的实现类CompletableFuture

API1 — supplyAsync() 开启一个有返回值的异步任务

supplyAsync是一个供给型接口,也就是没有输入,只有输出

相似API — runAsync() 开启一个没有返回值的异步任务


场景一

我们假定一个场景,小白去餐厅吃饭,步骤是这样的

1.小白点餐
2.厨师炒菜
3.服务员上菜
4.小白开吃

我们要求2和3要先后执行,而且在2.3执行的时候,小白可以去做自己的事情,等到上菜后,再开吃,符合异步回调的场景

具体怎么实现呢?

API2 — thenCompose() 连接两个异步任务,一前一后,是个函数接口,有输入有输出

join开始执行任务,返回任务的执行结果

public class CompletableFutureDemo {public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白进入餐厅");System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白点了一份炒牛肉");// 异步调用 有返回值  CompletableFuture会进入另外一个线程!!!CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{System.out.println("线程"+Thread.currentThread().getName() + "执行了=>厨师开始炒牛肉");// 等了3秒炒完了try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}return "厨师炒完了";// thenCompose上一个线程执行完成才开始,dish是上一个的返回值}).thenCompose(dish -> CompletableFuture.supplyAsync(()->{System.out.println("线程"+Thread.currentThread().getName() + "执行了=>"+dish+",服务员上菜");// 等了3秒上完了try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}return "服务员上菜完成";}));// join返回任务的执行结果System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白在等菜ing.......小白开始打LOLM,玩了一把杰斯");System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白在等菜ing.......小白又玩了把狮子狗");System.out.println(String.format("线程"+Thread.currentThread().getName()+"执行了%s,小白开吃",cf.join()));}
}


场景二

假设小白下次来又点了米饭,但是米饭没有熟的,那么炒菜和蒸米饭要同时进行

1.小白点菜和米饭
2.厨师炒菜 服务员蒸米饭 同时进行
3.服务员上菜和米饭
4.小白开吃

怎么保证厨师炒菜和服务员蒸米饭同时进行呢?

API3 — thenCombine() 合并两个异步任务,任务同时执行,会开启新的线程,同时根据两个任务的结果输出返回值

相似API

runAfterBoth() 合并两个异步任务,不需要任务的结果,没有返回值

thenAcceptBoth() 合并两个异步任务,需要任务的结果,没有返回值

public class CompletableFutureDemo2 {public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白进入餐厅");System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白点了一份炒牛肉");// 异步调用 有返回值  CompletableFuture会进入另外一个线程!!!CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{System.out.println("线程"+Thread.currentThread().getName() + "执行了=>厨师开始炒牛肉");// 等了3秒炒完了try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}return "厨师炒完了";// thenCombine和上一个同时执行}).thenCombine(CompletableFuture.supplyAsync(()->{System.out.println("线程"+Thread.currentThread().getName() + "执行了=>服务员开始蒸米饭");try {Thread.sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}return "服务员蒸好了";}),(str1,str2) ->{System.out.println("线程"+Thread.currentThread().getName() + "执行了=>服务员开始打米饭");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "服务员打了米饭拿了牛肉,开始上菜和米饭";});// join返回任务的执行结果System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白在等菜ing.......小白开始打LOLM,玩了一把杰斯");System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白在等菜ing.......小白又玩了把狮子狗");System.out.println(String.format("线程"+Thread.currentThread().getName()+"执行了%s ,小白开吃",cf.join()));}
}

API4 — thenApply():用来对上一个线程的返回值做处理,有返回值

相似API

thenAccept() 需要参数 没有返回值

thenRun() 不需要参数,没有返回值

public class ApplyTest {public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白进入餐厅");System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白点了一份炒牛肉");// 异步调用 有返回值  CompletableFuture会进入另外一个线程!!!CompletableFuture<String> cf = CompletableFuture.supplyAsync(()->{System.out.println("线程"+Thread.currentThread().getName() + "执行了=>厨师开始炒牛肉");// 等了3秒炒完了try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}return "厨师炒完了";// thenCompose上一个线程执行完成才开始,dish是上一个的返回值}).thenApply(str ->{System.out.println("线程"+Thread.currentThread().getName() + "执行了=>,服务员上菜");try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}return "服务员上菜";});// join返回任务的执行结果System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白在等菜ing.......小白开始打LOLM,玩了一把杰斯");System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白在等菜ing.......小白又玩了把狮子狗");System.out.println(String.format("线程"+Thread.currentThread().getName()+"执行了%s,小白开吃",cf.join()));}
}

场景三

假设小明吃完了,要坐公交车回家,那么有两路公交车可以到家,101和102,小白决定谁先来就坐谁回家

API5 — applyToEither() 多个线程谁先结束就用谁的结果

相似API

acceptEither() 需要结果,没有返回值

runAfterEither() 不需要结果,也没有返回值

API6 — exceptionally() 用于处理异常,有输入有输出

相似API

handle 如果前面正常执行,获得正常结果,如果前面是异常,也会获得异常结果,并且有返回值

whenComplete() 如果前面正常执行,获得结果,如果前面是异常,也会获得结果,但是没有返回值

public class CompletableFutureDemo4 {public static void main(String[] args) throws ExecutionException, InterruptedException {System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白走出餐厅");System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白等待 101 或者 102 公交车");// 异步调用 有返回值  CompletableFuture会进入另外一个线程!!!CompletableFuture<Object> cf = CompletableFuture.supplyAsync(()->{System.out.println("线程"+Thread.currentThread().getName() + "执行了=>101 在路上ing....");// 等3秒车会来try {Thread.sleep(3000);} catch (InterruptedException e) {e.printStackTrace();}return "101 来了";//}).applyToEither(CompletableFuture.supplyAsync(()->{System.out.println("线程"+Thread.currentThread().getName() + "执行了=>102 在路上ing....");// 等4秒车会来try {Thread.sleep(4000);} catch (InterruptedException e) {e.printStackTrace();}return "102 来了";}),res ->{System.out.println("线程"+Thread.currentThread().getName() + "执行了=>"+ res +",小白上了公交车");System.out.println("车出现故障了.....");throw new RuntimeException();}).exceptionally(e ->{System.out.println("线程"+Thread.currentThread().getName() + "执行了=>小白开始打出租车");// 等2秒车会来try {Thread.sleep(2000);} catch (InterruptedException ex) {ex.printStackTrace();}return "出租车来了";});// join返回任务的执行结果System.out.println("线程"+Thread.currentThread().getName()+"执行了=>小白在等车ing.......小白打LOLM,玩了一把女警");System.out.println(String.format("线程"+Thread.currentThread().getName()+"%s,小白上车",cf.join()));}
}

关于xxxAsync相关的API

以上API都会有xxxAsync的后缀,这和没有后缀的什么区别呢?

方法不以Async结尾,意味着Action使用相同的线程执行。而有Async可可能使用新的线程执行任务。也可能会使用其他线程执行,因为如果是使用相同的线程池,可能会被同一个线程再次选中执行,也就是线程复用

关于Async的重载方法

方法以Async结尾的API还会有一个传入线程池的参数选项,我们可以根据需要传入一个构造器!

API — allof

批量执行任务,参数是一个CompletableFuture类型的数组

for(int i = 0;i < 8;i++){int finalI = i;CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {System.out.println(finalI);return finalI;});cfList.add(cf);
}CompletableFuture[] array = cfList.toArray(new CompletableFuture[cfList.size()]);CompletableFuture.allOf(array).join();

12.JMM

参考《深入理解Java 虚拟机》

java内存模型,是一种概念,和jvm内存模型不是一回事,主要目的是定义程序中各种变量的访问规则,即关注在虚拟机中把变量值存储到内存和从内存中取出变量值的这样的底层细节

这里的变量包括了实例字段、静态字段、构成数组对象的元素,不包括局部变量和方法参数,因为后者是线程私有的,不会被共享

学过计算机组成原理我们都知道,CPU和内存之间会有一层cache,来解决内存和CPU处理速度不一致的问题

read:作用于主内存变量,它把一个变量的值从主内存中传输到线程的工作内存中,等待load

load:作用于工作内存变量,把read操作从主内存中得到的变量值放入工作内存的变量副本中

use:作用于工作内存变量,它把工作内存中一个变量的值传递给执行引擎,每当虚拟机遇到一个需要使用变量的值的字节码指令将会执行这个操作

assign:作用于工作内存的变量,它把一个从执行引擎接收到的值赋给工作内存的变量

store:作用于工作内存的变量,把工作内存中一个变量的值传送给主内存中,以便随后的write操作

write:最用于主内存的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中

lock:作用于主内存的变量,把一个变量标识为一条线程独占的状态

unlock:作用于主内存的变量,把一个处于锁定的变量释放出来,释放后的变量才可以被其他线程锁定

这里,主内存主要对应JVM 堆中的对象实例数据部分,工作内存则对应JVM 栈中的部分区域

有关8种操作的规则

1.read和load,store和write不可以单独出现

2.不允许一个线程丢弃它最近的assign操作,即变量在工作内存改变了之后必须同步回主内存,也不允许一个线程无原因的(没有发生过assign)把数据从线程的工作内存同步回主内存

3.一个变量在use、store之前,必须先执行load和assign

4.一个变量可以被同一个线程多次lock,但必须执行相同次数的unlock,才会被解锁

5.对一个变量lock后,那将清空工作内存中此变量的值,在执行引擎使用这个变量前,需要重新执行load或者assign操作以初始化这个值

6.unlock之前必须有store和write

如上图所示,当线程B读取了变量a并且修改了它,写回了主存,但是线程A读的值依旧是以前的

为此,volatile关键字起了作用。它保证了①某一个共享变量对所有线程的可见性,即当一条线程修改了这个变量的值,新值对于其他线程来说是立即得知的

②增加内存屏障防止多个指令之间的重排序

volatile变量的运算不一定是安全的,是不保证原子性的,我们通过加锁或者原子类的方式实现原子性

public class VolatileTest {public static volatile int num = 0;public static void main(String[] args) {for(int i = 0;i < 20;i++){new Thread(()->{for(int j = 0;j < 500;j++){num++;}}).start();}while (Thread.activeCount() > 2)Thread.yield();System.out.println(num);}
}

得到的结果多次不是10000,可见volatile不保证原子性

可以通过原子类解决,所谓原子类,指的是java.util.concurrent.atomic包下移Atomic开头的包装类,例如AtomicInteger就是用于Integer类型的原子性操作

public class VolatileTest {static AtomicInteger num = new AtomicInteger();public static void main(String[] args) {for(int i = 0;i < 20;i++){new Thread(()->{for(int j = 0;j < 500;j++){num.getAndIncrement();}}).start();}while (Thread.activeCount() > 2)Thread.yield();System.out.println(num);}
}

13.CAS

参考公众号:程序员小灰

上述原子类的底层实现以及Lock系列类的底层实现就是CAS,什么是CAS?Compare And Swap,比较并且交换

CAS机制当中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B

更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B

看个例子

1.在内存地址V当中,存储着值为10的变量

2.此时线程1想要把变量的值增加1。对线程1来说,旧的预期值A=10,要修改的新值B=11

3.在线程1要提交更新之前,另一个线程2抢先一步,把内存地址V中的变量值率先更新成了11

4.线程1开始提交更新,首先进行A和地址V的实际值比较(Compare),发现A不等于V的实际值,提交失败

5.线程1重新获取内存地址V的当前值,并重新计算想要修改的新值。此时对线程1来说,A=11,B=12。这个重新尝试的过程被称为自旋

6.如果这一次比较幸运,没有其他线程改变地址V的值。线程1进行Compare,发现A和地址V的实际值是相等的


7.线程1进行SWAP,把地址V的值替换为B,也就是12


从思想上来说,Synchronized属于悲观锁,悲观地认为程序中的并发情况严重,所以严防死守。CAS属于乐观锁,乐观地认为程序中的并发情况不那么严重,所以让线程不断去尝试更新

关于悲观锁和乐观锁

悲观锁(Pessimistic Lock): 就是很悲观,每次去拿数据的时候都认为别人会修改。所以每次在拿数据的时候都会上锁。这样别人想拿数据就被挡住,直到悲观锁被释放,悲观锁中的共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程

但是在效率方面,处理加锁的机制会产生额外的开销,还有增加产生死锁的机会。另外还会降低并行性,如果已经锁定了一个线程A,其他线程就必须等待该线程A处理完才可以处理

数据库中的行锁,表锁,读锁(共享锁),写锁(排他锁),以及syncronized实现的锁均为悲观锁

悲观并发控制实际上是先取锁再访问的保守策略,为数据处理的安全提供了保证

乐观锁(Optimistic Lock): 就是很乐观,每次去拿数据的时候都认为别人不会修改。所以不会上锁,但是如果想要更新数据,则会在更新前检查在读取至更新这段时间别人有没有修改过这个数据。如果修改过,则重新读取,再次尝试更新,循环上述步骤直到更新成功(当然也允许更新失败的线程放弃操作),乐观锁适用于多读的应用类型,这样可以提高吞吐量

相对于悲观锁,在对数据库进行处理的时候,乐观锁并不会使用数据库提供的锁机制。一般的实现乐观锁的方式就是记录数据版本(version)或者是时间戳来实现,不过使用版本记录是最常用的。CAS也是乐观锁的体现

原文链接:https://blog.csdn.net/qq_14996421/article/details/106351873

CAS的缺点

1.CPU开销较大

在并发量比较高的情况下,如果许多线程反复尝试更新某一个变量,却又一直更新不成功,循环往复,会给CPU带来很大的压力

2.不能保证代码块的原子性

CAS机制所保证的只是一个变量的原子性操作,而不能保证整个代码块的原子性。比如需要保证3个变量共同进行原子性的更新,就不得不使用Synchronized了

3.ABA问题

这是CAS机制最大的问题所在

CAS的底层源码

AtomicInteger当中常用的自增方法getAndIncrement(); 该方法是自增1的一个原子方法

1.先看一下AtomicInteger类的源码

private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;static {try {valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));} catch (Exception ex) { throw new Error(ex); }
}private volatile int value;

什么是unsafe呢?Java语言不像C,C++那样可以直接访问底层操作系统,但是JVM为我们提供了一个后门,这个后门就是unsafe。unsafe为我们提供了硬件级别的原子操作

至于valueOffset对象,是通过unsafe.objectFieldOffset方法得到,所代表的是AtomicInteger对象value成员变量在内存中的偏移量。我们可以简单地把valueOffset理解为value变量的内存地址,我们的原子操作就是针对value来的

2.我们执行+1操作

AtomicInteger atomicInteger = new AtomicInteger(5);
atomicInteger.getAndIncrement();

再深入,发现getAndIncrement()只是一个外壳,实际又调用了getAndAddInt方法,这里出现了一个unsafe和valueOffset

public final int getAndIncrement() {return unsafe.getAndAddInt(this, valueOffset, 1);}

也就是说,在执行+1之前,我们通过当前对象var1以及value的内存偏移量获取到了value目前内存中最新的值,也就是期望值,记为var5,接下来我们即将更新了,会再次判断,如果当前var1以及value的内存偏移量所获取的值还是我期望的var5,我才会更新(var4是1),否则我就不断尝试,下列的do-while就是一种自旋锁(不断尝试,不行旋转回来再试)

public final int getAndAddInt(Object var1, long var2, int var4) {int var5;do {var5 = this.getIntVolatile(var1, var2);} while(!this.compareAndSwapInt(var1, var2, var5, var5 + var4));return var5;}

14.CAS的ABA问题

参考公众号:程序员小灰

1.假设内存中有一个值为A的变量,存储在地址V当中

2.此时有三个线程想使用CAS的方式更新这个变量值,每个线程的执行时间有略微的偏差。线程1和线程2已经获得当前值,线程3还未获得当前值


3.接下来,线程1先一步执行成功,把当前值成功从A更新为B;同时线程2因为某种原因被阻塞住,没有做更新操作;线程3在线程1更新之后,获得了当前值B

4.再之后,线程2仍然处于阻塞状态,线程3执行,成功把当前值从B更新成了A

5.最后,线程2终于恢复了运行状态,由于阻塞之前已经获得了“当前值”A,并且经过compare检测,内存地址V中的实际值也是A,所以成功把变量值A更新成了B

这个过程中,线程2获取到的变量值A是一个旧值,尽管和当前的实际值相同,但内存地址V中的变量已经经历了A->B->A的改变

场景

但是我们放在一个实际的场景中,假如银行卡里有100元,我想取走50元,不过出现了一点小问题,我的取50元被提交了两次,交给了线程1和线程2

线程1想把100变成50,OK,成功

线程2也想把100变成50,不过阻塞了

这时候别人给我转了50元,线程3把50元变成100元了,此时我卡里还有100元,我就开心的回家了!

这时候线程2好了,它以为我这100还是最初的100,一下给变成50了,那50元就离奇消失了!!!

解决办法也很简单,变量加个版本号就可以了!

复原场景

但是我们放在一个实际的场景中,假如银行卡里有100元,我想取走50元,不过出现了一点小问题,我的取50元被提交了两次,交给了线程1和线程2

线程1想把100[版本1]变成50[版本2],OK,成功

线程2也想把100变成50,不过阻塞了

这时候别人给我转了50元,线程3把50元[版本2]变成100元[版本3]了,此时我卡里还有100元[版本3],我就开心的回家了!

这时候线程2好了,它比较版本号,发现虽然值都是100,但是版本号不同,失败

15.AQS

参考原文:https://www.cnblogs.com/waterystone/p/4920797.html

ReentrantLock/Semaphore/CountDownLatch…等底层就是AbstractQueuedSynchronizer(AQS),也即抽象队列式同步器

基本思想

它维护了一个volatile int state 代表共享资源和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列),称为CHL队列,是一个双向链表

state的访问方式有三种:

getState()
setState()
compareAndSetState()

线程1首先通过getState()获取到了state为0,于是它会CAS为1,这样线程1就占用了资源(会记录加锁线程为线程1),线程2尝试获取,但是此时state不为0,由于ReetranLock是可重入的,它会判断是不是自己加的锁(想知道现在是不是重入),它判断不是自己加的锁,于是获取资源失败,加入到同步队列尾部

核心方法

AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)

不同的自定义同步器争用共享资源的方式也不同。自定义同步器在实现时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。自定义同步器实现时主要实现以下几种方法:

  • isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它
  • tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false
  • tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false
  • tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源
  • tryReleaseShared(int):共享方式。尝试释放资源,如果释放后允许唤醒后续等待结点返回true,否则返回false

以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的,但state要累加,这就是可重入的概念但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的

再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark主调用线程,然后主调用线程就会从await()函数返回,继续后余动作

题外话 - 关于LockSupport

有两个常用的API

  • park() 没有得到许可,挂起线程
  • unpark() 给线程许可,让他继续执行
LockSupport.unpark(Thread.currentThread());
LockSupport.park();
System.out.println("unpark");Thread.currentThread().resume();
Thread.currentThread().suspend();
System.out.println("suspend");

看上面的代码,park虽然停止线程,但是由于前面已经给当前线程许可了(注意,许可就是一个int值,只有0和1两种选择,1代表有许可,而且即使多次给许可多次unpark也只能是1,不会累加!),所以线程并不会卡死(park会消耗许可使许可变为0,然后不会阻塞,继续执行,假如此时再执行一次park就会阻塞,因为没有凭证了),会输出unpark。但是susupend就不一样了,直接卡死线程了,不会输出,也正是这样,AQS是用park()和unpark()控制线程运行状态的

一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock

核心方法流程

结点Node

图来自知乎

这里我们说下Node。Node结点是对每一个等待获取资源的线程的封装,其包含了需要同步的线程本身及其等待状态,如是否被阻塞、是否等待唤醒、是否已经被取消等。包括变量waitStatus则表示当前Node结点的等待状态,共有5种取值CANCELLED、SIGNAL、CONDITION、PROPAGATE、0

  • CANCELLED(1):表示当前结点已取消调度。当timeout或被中断(响应中断的情况下),会触发变更为此状态,进入该状态后的结点将不会再变化

  • SIGNAL(-1):表示后继结点在等待当前结点唤醒。后继结点入队时,会将前继结点的状态更新为SIGNAL

  • CONDITION(-2):表示结点等待在Condition上,当其他线程调用了Condition的signal()方法后,CONDITION状态的结点将从等待队列转移到同步队列中,等待获取同步锁

  • PROPAGATE(-3):共享模式下,前继结点不仅会唤醒其后继结点,同时也可能会唤醒后继的后继结点

  • 0:新结点入队时的默认状态

注意,负值表示结点处于有效等待状态,而正值表示结点已被取消。所以源码中很多地方用>0、<0来判断结点的状态是否正常

tryAcquire()

1.调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回,每个线程获取锁时会尝试直接抢占加塞一次(非公平锁的体现),而CLH(三个人名的首字母所以叫CLH)队列中可能还有别的线程在等待

2.没成功,则addWaiter()将该线程加入到同步队列的尾部,并标记为独占模式,此后就是公平锁的体现了,通过tryAcquire()和addWaiter(),该线程获取资源失败,已经被放入同步队列尾部了

3.然后就是acquireQueued()了,返回值是一个布尔,来说明该线程是否被中断过。这方法是一个自旋方法,成功获取到了资源会返回一个布尔值,如果在整个等待过程中被中断过,则返回true,否则返回false。如果由于某些情况失败,会取消在队列中的等待

自旋方法里面主要有两个if语句(源码可以看上面的链接,根据其源码的注释加自己的理解)

  • if判断前驱是不是head,即该结点已成老二,那么便有资格去尝试获取资源,可能是老大释放完资源唤醒自己的,这种情况会拿到资源。当然也可能被interrupt了,拿不到资源继续park

  • if判断自己能否休息,有三种情况,至于为什么这样做,个人可以理解,解释清楚有点麻烦,大家可以自己理解一下

    • (1)前驱刚好是signal,直接返回,放心休息,因为前面的会通知它啊,返回true
    • (2)前面的节点处于不正常的状态CANCELLED,那么该节点会加塞,找一个处于非CANCELLED的节点,加入到他们后面,返回false
    • (3)如果前驱正常,那就把前驱的状态设置成SIGNAL,告诉它拿完号后通知自己一下,返回false

4.如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上

等待队列

特别的,调用condition的await方法,将会使当前线程进入等待队列并释放锁(先加入等待队列再释放锁),同时线程状态转为等待状态

调用await方法时,相当于同步队列的首节点移到condition的等待队列中

调用condition的signal方法时,将会把等待队列的首节点移到同步队列的尾部,然后唤醒该节点。被唤醒,并不代表就会从await方法返回,也不代表该节点的线程能获取到锁,它一样需要加入到锁的竞争acquireQueued方法中去,只有成功竞争到锁,才能从await方法返回

tryRelease

release()是独占模式下线程释放共享资源的顶层入口。它会释放指定量的资源,如果彻底释放了(即state=0),它会唤醒等待队列里的其他线程来获取资源,用unpark()唤醒等待队列中最前边的那个未放弃线程

tryAcquireShared()

tryAcquireShared()尝试获取资源,成功则直接返回

负值代表获取失败;0代表获取成功,但没有剩余资源;正数表示获取成功,还有剩余资源,其他线程还可以去获取。所以这里acquireShared()的流程就是:

失败则通过doAcquireShared()进入等待队列,直到获取到资源为止才返回

跟独占模式比,还有一点需要注意的是,这里只有线程是head.next时(“老二”),才会去尝试获取资源,有剩余的话还会唤醒之后的队友。那么问题就来了,假如老大用完后释放了5个资源,而老二需要6个,老三需要1个,老四需要2个。老大先唤醒老二,老二一看资源不够,他是把资源让给老三呢,还是不让?答案是否定的!老二会继续park()等待其他线程释放资源,也更不会去唤醒老三和老四了。独占模式,同一时刻只有一个线程去执行,这样做未尝不可;但共享模式下,多个线程是可以同时执行的,现在因为老二的资源需求量大,而把后面量小的老三和老四也都卡住了。当然,这并不是问题,只是AQS保证严格按照入队顺序唤醒罢了(保证公平,但降低了并发)

tryreleaseShared()

此方法的流程也比较简单,一句话:释放掉资源后,唤醒后继。跟独占模式下的release()相似,但有一点稍微需要注意:独占模式下的tryRelease()在完全释放掉资源(state=0)后,才会返回true去唤醒其他线程,这主要是基于独占下可重入的考量;而共享模式下的releaseShared()则没有这种要求,共享模式实质就是控制一定量的线程并发执行,那么拥有资源的线程在释放掉部分资源时就可以唤醒后继等待结点。例如,资源总量是13,A(5)和B(7)分别获取到资源并发运行,C(4)来时只剩1个资源就需要等待。A在运行过程中释放掉2个资源量,然后tryReleaseShared(2)返回true唤醒C,C一看只有3个仍不够继续等待;随后B又释放2个,tryReleaseShared(2)返回true唤醒C,C一看有5个够自己用了,然后C就可以跟A和B一起运行

ReentrantReadWriteLock读锁的tryReleaseShared()只有在完全释放掉资源(state=0)才返回true,所以自定义同步器可以根据需要决定tryReleaseShared()的返回值

Java并发 JUC 一文快速入门相关推荐

  1. 一文快速入门分库分表中间件 Sharding-JDBC (必修课)

    书接上文 <一文快速入门分库分表(必修课)>,这篇拖了好长的时间,本来计划在一周前就该写完的,结果家庭内部突然人事调整,领导层进行权利交接,随之宣布我正式当爹,紧接着家庭地位滑落至第三名, ...

  2. Java基础-SSM之mybatis快速入门篇

    Java基础-SSM之mybatis快速入门篇 作者:尹正杰 版权声明:原创作品,谢绝转载!否则将追究法律责任. 其实你可能会问什么是SSM,简单的说就是spring mvc + Spring + m ...

  3. 学习笔记:Java 并发编程①_基础知识入门

    若文章内容或图片失效,请留言反馈. 部分素材来自网络,若不小心影响到您的利益,请联系博主删除. 视频链接:https://www.bilibili.com/video/av81461839 视频下载: ...

  4. 多目标跟踪(MOT)最新综述,一文快速入门

    多目标跟踪(MOT)最新综述,一文快速入门 0 写在前面 去年暑期实习的时候,误打误撞进了一家自动驾驶公司,做了多目标跟踪的工作,工作也是秋招时靠着相关工作拿到了几个算法岗offer,后来毕业课题也换 ...

  5. java azure blob 查询_快速入门:适用于 Java 的 Azure Blob 存储客户端库 v8 | Microsoft Docs...

    您现在访问的是微软AZURE全球版技术文档网站,若需要访问由世纪互联运营的MICROSOFT AZURE中国区技术文档网站,请访问 https://docs.azure.cn. 快速入门:使用 Jav ...

  6. java cookbook中文版_Java Client快速入门指南

    适用于与Amazon S3兼容的云存储的Minio Java SDK Minio Java Client SDK提供简单的API来访问任何与Amazon S3兼容的对象存储服务. 本快速入门指南将向你 ...

  7. Java NIO 非阻塞网络编程快速入门

    NIO 非阻塞网络编程快速入门 案例: 编写一个 NIO 入门案例,实现服务器端和客户端之间的数据简单通讯(非阻塞) 目的:理解 NIO 非阻塞网络编程机制 import java.net.InetS ...

  8. Java 诊断利器Arthas:快速入门

    简介     Arthas 是Alibaba开源的Java诊断工具,深受开发者喜爱.在线排查问题,无需重启:动态跟踪Java代码:实时监控JVM状态.     Arthas 支持JDK 6+,支持Li ...

  9. 一文快速入门分库分表(必修课)

    之前有不少刚入坑 Java 的粉丝留言,想系统的学习一下分库分表相关技术,可我一直没下定决心搞,眼下赶上公司项目在使用 sharding-jdbc 对现有 MySQL 架构做分库分表的改造,所以借此机 ...

  10. java 判断类型_如何快速入门Java编程学习(干货)

    一.初识Java 1.生活中的程序: 从起床到教室上课的过程 穿衣打扮>起床>洗漱>出宿舍>>吃早餐>到教室 按照特定的顺序去完成某一件事的过程我们叫做生活中的程序 ...

最新文章

  1. 我在 Spring 的 BeanUtils 踩到的那些坑,千万不要犯!
  2. 浏览器兼容性--IE11以及Edge等下载文件的中文名出现乱码,前后端解决方案
  3. openstack rabbitmq
  4. OSI七层模型与TCP/IP五层模型详解
  5. Java案例:简易记事本
  6. android中的多渠道打包,Android 多渠道打包简析
  7. 在Struts 2中使用JSON Ajax
  8. 程序防止SqlServer使用SqlServer Profiler跟踪
  9. 拓端tecdat|python关联规则学习:FP-Growth算法对药品进行“菜篮子”分析
  10. mysql集群session_集群session解决方案
  11. 小生不才,真实记录爬取链家网2584条租房信息,聊一聊框架爬取大量数据防止被ban的事
  12. Python 绘制游戏窗口
  13. 计算机导论中逻辑与或非的公式,12.函数与公式之逻辑函数(or,and,not,if)
  14. RSRP为什么是负数
  15. 在 Linux 中追加到内容的末尾
  16. c++海岛战争(无关机代码)
  17. sql显示服务器连接不上,sql服务器连接不上
  18. 第二十二章:如何管理信息系统
  19. android手机Down版本
  20. 计算机网络应用是什么专业类别,网络工程专业属于什么类别

热门文章

  1. 补码,负数比整数多表示一个
  2. h2支持mysql函数,H2数据库用户自定义函数方法及范例
  3. 网吧用服务器做虚拟化,网吧内共享服务器搭建
  4. 网吧服务器多长时间维护,网吧服务器常用设置维护工具
  5. 计算机学科发表顶级期刊论文,计算机类顶级期刊_计算机顶级期刊_mcs数学与计算机期刊...
  6. 通过fileProvider接收外部App传递文件路径的一些坑
  7. pyqt5-事件机制
  8. RTX3060(30系显卡)Windows10部署Pytorch深度学习环境步骤与心得
  9. MySQL 添加索引报错:BLOB/TEXT column used in key specification without a key length
  10. ROS 发布消息和订阅消息 for Python