目录

  • 一、 基础
    • 1. 线程介绍
    • 2. 创建并启动线程
    • 3. 函数式接口编程
    • 4. Thread 构造器
    • 5. 守护线程
      • 线程关系
    • 6. join
    • 7. interrupt
    • 8. 优雅的结束线程
    • 9. 线程安全、数据共享
      • synchronized核心
    • 10. 死锁
    • 11. 线程间的通讯(生产者与消费者)
    • 12. sleep与wait的区别
    • 13. 综合案例--数据采集
    • 14. 显式锁(实现自定义锁)
    • 15. 钩子方法处理系统退出工作
    • 16. ThreadException与stackTrace(了解)
    • 17. ThreadGroup
    • 18. 手写线程池
  • 二、多线程中的设计模式
    • 1. 多线程中的单例模式
    • 2. WaitSet
    • 3. volatile详解
    • 4. 多线程中的观察者模式
    • 5. 多线程中的单线程执行模式
    • 6. 读写锁的分离
    • 7. 不可变对象设计模式
    • 8. Future设计模式
    • 9. Guarded Suspension设计模型
    • 10. 线程保险箱 ThreadLocal
    • 11. 上下文设计模式
    • 12. Balking设计模式
    • 13. CountDown设计模式
    • 14. Thread-Pre-Message设计模式
    • 15. Two-phase terminatioin
  • 三、ClassLoader
    • 1. 类加载的三个过程
    • 2. 类的引用
    • 3. 初始化
    • 4. JVM内置类加载器
    • 5. 类加载器模式:双亲委托代理模式
    • 6. 自定义类加载器
    • 7. 加密解密类加载器
    • 8. 打破双亲委派机制
    • 9. 类加载器的命名空间与运行时包
    • 10. 线程上下文类加载器
  • 四、Java并发包
    • 1. 原子类型
      • CAS(乐观锁)
    • CAS锁
    • CAS的优缺点
    • Unsafe
    • 2. Java并发包工具
      • CountDownLatch
    • 3. Executors框架
    • 4. 并发集合
    • 5. Google Guava
  • 工具整理

一、 基础

1. 线程介绍

概念

  • 程序:静态的概念(资源分配的单位)
  • 进程:运行的程序(调度执行的单位)
  • 线程:一个程序中有多个事件同时执行,每个事件一个线程,多个线程共享代码和数据空间

2. 创建并启动线程

当JVM启动时,会创建一个非守护线程 main,作为整个程序的入口,以及多个与系统相关的守护线程

package concurrency.chapter1;public class TryConcurrency {public static void main(String[] args) {new Thread(){@Overridepublic void run() {write();}}.start();new Thread(){@Overridepublic void run() {read();}}.start();}private static void write(){System.out.println("start to write");try {Thread.sleep(1000*10L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("write success!");}private static void read(){System.out.println("start to read");try {Thread.sleep(1000*10L);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("read success!");}
}

线程生命周期(Thread.State)

除了新生状态和死亡状态,任意状态都可能发生线程死亡。

  • NEW:新生状态,作为普通java对象,尚未启动
  • RUNNABLE:在JVM中执行(包含就绪状态和运行状态)
  • BLOCKED:被阻塞
  • WAITING:正在等待另一个线程执行特定动作
  • TIMED_WAITING:正在等待另一个线程达到指定运行时间
  • TERMINATED:死亡状态

传统的多线程共享数据方法是使用static修饰,但是由于static修饰的数据将在整个程序结束之后才释放,因此比较耗资源。可参考内存模型,类加载
而且这种方式会发生并发问题。

实例:银行有多个柜台,柜台共用一个叫号系统

package concurrency.chapter2;/*** 叫号的柜台*/
public class TicketWindow extends Thread{private static final int MAX=500;private static int index=1;private String name;TicketWindow(String name){this.name=name;}@Overridepublic void run() {while (index<=MAX){System.out.println(name+" 叫号:"+(index++));}}
}
package concurrency.chapter2;/*** 银行叫号*/
public class Bank {public static void main(String[] args) {final TicketWindow t1 = new TicketWindow("一号柜台");final TicketWindow t2 = new TicketWindow("二号柜台");final TicketWindow t3 = new TicketWindow("三号柜台");t1.start();t2.start();t3.start();}
}

上述方式通过继承Thread的形式实现多线程,但是显然,业务数据与被混在了线程类当中,这种方式显得混乱,我们使用更好的办法——使用runnable。

package concurrency.chapter2;public class Bank2 {public static void main(String[] args) {final TicketWindow2 ticketWindow2 = new TicketWindow2();Thread t1 = new Thread(ticketWindow2,"1号柜台");Thread t2 = new Thread(ticketWindow2,"2号柜台");Thread t3 = new Thread(ticketWindow2,"3号柜台");t1.start();t2.start();t3.start();}
}
package concurrency.chapter2;/*** 叫号的柜台*/
public class TicketWindow2 implements Runnable{private static final int MAX=500;private static int index=1;public void run() {while (index<=MAX){System.out.println(Thread.currentThread().getName()+" 叫号:"+(index++));try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}}
}

依旧有线程安全问题。

3. 函数式接口编程

Thread支持函数式接口编程,Runnable接口就是一个函数式接口。

首先看看什么是函数式接口编程。

Java中使用@FunctionalInterface注解标注函数式接口。

所谓的函数式接口,当然首先是一个接口,然后就是在这个接口里面只能有一个抽象方法

这种类型的接口也称为SAM接口,即Single Abstract Method interfaces
特点

  • 接口有且仅有一个抽象方法
  • 允许定义静态方法
  • 允许定义默认方法
  • 允许java.lang.Object中的public方法

@FunctionalInterface注解不是必须的,如果一个接口符合"函数式接口"定义,那么加不加该注解都没有影响。加上该注解能够更好地让编译器进行检查。如果编写的不是函数式接口,但是加上了@FunctionInterface,那么编译器会报错

例子

// 正确的函数式接口
@FunctionalInterface
public interface TestInterface {// 抽象方法public void sub();// java.lang.Object中的public方法public boolean equals(Object var1);// 默认方法public default void defaultMethod(){}// 静态方法public static void staticMethod(){}
}// 错误的函数式接口(有多个抽象方法)
@FunctionalInterface
public interface TestInterface2 {void add();void sub();
}

函数式接口其实就是策略模式中的“策略”

计算纳税款的实例:
纳税款的计算方式可能多变,因此将计算方式抽象出来作为一个接口(策略),然后可以实现动态的改变改计算方法。

接口:传入工资

package concurrency.func;@FunctionalInterface
public interface SimpleCalculator {double calculated(double money);
}

计算器:关联上述接口

package concurrency.func;public class Calculator {private double money;private final SimpleCalculator calculator;Calculator(double money,SimpleCalculator calculator){this.calculator=calculator;this.money=money;}public double calculated(){return calculator.calculated(money);}
}

客户端

public class CalculatorMain {public static void main(String[] args) {final Calculator calculator = new Calculator(10000, (m)->m*0.2 );System.out.println(calculator.calculated());}
}

通过传入不同的lambda表达式,实现不同的计算方法。

通过函数式接口编程,将银行叫号整合到一个类中。

public class Bank3 {public final static int MAX=50;public static int index=1;public static void main(String[] args) {final Runnable runnable = ()->{while (index<=50){System.out.println(Thread.currentThread().getName()+" 柜台叫号 "+(index++));try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}};new Thread(runnable,"1号柜台").start();new Thread(runnable,"2号柜台").start();new Thread(runnable,"3号柜台").start();}
}

上述代码同样有严重的并发问题。

4. Thread 构造器

构造器
1.Thread()
只做命名工作,命名方式为
Thread-i (i从0开始)

2.Thread(Runnable)
因为runnable只有一个run方法,因此相当于声明一个run方法。如果Runnable为空,则线程什么也不做。

3.Thread(String)
传入线程名字,由于Runnable为空,因此什么也不做。

4.Thread(Runnable,String)
声明run方法,以及线程名字,此时就是一个可工作的线程。

5.ThreadGroup的概念:
在构造器中可以传入一个ThreadGroup,当ThreadGroup为空时,查看源码后可知会设定CurrentThread的ThreadGroup为当前的ThreadGroup。
CurrentThread就是启动当前线程的线程。
比如在main函数中启动的线程,那么CurrentThread就是main,main的ThreadGroup名为main

6.stacksize概念
首先得知道基本的内存模型

堆和方法区是线程共享,而虚拟机栈是线程私有,stacksize主要影响虚拟机栈。
栈作为内存结构,肯定有限制大小,通过改变stacksize,能够更改自定义线程的虚拟机栈大小。
在官方文档中有说明,stacksize高度依赖平台,不同的运行平台,stacksize可能不起作用。
如果不传stacksize,则默认为0,表示会被忽略。
stacksize被JVM使用,Java中没有直接引用。

5. 守护线程

Daemon:守护线程,会跟随父线程结束。
非守护线程(默认),父线程结束之后,依旧运行。

这么理解: “守护”的对象指的是系统,而非线程本身。当父线程结束之后子线程依旧还在跑,可能出现一些意外的情况,因此需要对系统进行守护。

问题:
1.非守护线程outer中有个长耗时的守护线程inner,那么当outer结束时,inner是什么状态?
inner是守护线程,会随父线程结束,因此当outer结束后,inner未执行完就结束了。

2.守护线程outer中有个长耗时的非守护线程inner,那么当outer结束时,inner是什么状态?
inner是非守护线程,不会随父线程结束,因此当outer结束后它会继续运行。

无论外层的线程是什么类型,只关注本身的类型即可。

通过T.setDaemon(true)设为守护线程。
注意,只有在start之前设置才生效。

线程关系

以下内容为个人实验结果,可能存在偏颇之处

线程之间的关系分为三种:
最外层:线程之间是平等的,线程开始之后,就有了自己独立的运行空间,不会受执行该线程的线程所影响
举例:守护线程中执行非守护线程,当守护线程随着main结束之后,内部的非守护线程依旧在执行,既不会阻塞守护线程,也不会跟着守护线程结束。

package concurrency.chapter1;public class Try {public static void main(String[] args) throws InterruptedException {// 守护线程中套一个长时间的非守护线程,// 一般来说,当main结束后,守护线程会结束,但是由于其内还有个非守护线程,那么它会被阻塞住吗?Thread t1 = new Thread(()->{Thread t2 = new Thread(()->{while (true){System.out.println("尽管父线程都结束了,但是我还在跑");try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}}});t2.setDaemon(false);t2.start();System.out.println("t1结束");});t1.setDaemon(true);t1.start();// 模拟t1在工作,// 如果不加主线程睡眠,那么程序会直接结束// 猜测:t2的创建需要耗时间,还没等t2创建,main就结束了,也导致守护线程t1结束Thread.sleep(100);}
}

关系二:线程之间存在执行与被执行关系
举例:CurrentThread、Thread的静态方法受执行位置所影响、join等都是这个关系的体现。
比较重要的应用就是内部为守护线程时,内部线程的运行时间受外部线程所影响。

关系三:ThreadGroup
这个通常是人为设置,将多个线程放在一个组中进行管理。

6. join

让CurrentThread等待join线程执行完毕,CurrentThread才继续前进。

在main方法中使用Thread.CurrentThread.join(),相当于让main等待自己结束,这会陷入死循环。

7. interrupt

当线程处于block状态(wait、join、sleep)时,调用interrupt会让线程接收一个InterruptException(如果线程中没有捕获该异常,即便interrupt了,线程也不会收到中断信号)

interrupt不会中断线程,而是将线程的状态改为中断。通过在线程内捕获该状态,然后边写处理代码,实现中断。

block状态也有对象之分:x.wait、x.sleep,当x是谁(或者在哪个线程内)调用,则wait、sleep对象就是x(或当前调用的线程),但是x.join,对象仅仅指CurrentThread,如在main中调用thread1.join(),join的对象是main,即main进入join状态,而非thread1。
根据上述描述,interrupt可以打断block状态的线程,当线程处于join时,虽然可以进行打断,但是注意进入join状态的线程是哪个。

可以试试以下实验:

package concurrency.chapter3;public class Interrupt {public static void main(String[] args) {// 启动测试线程对象 tThread t = new Thread(()->{while (true){}});t.start();// 由于t.join()之后的代码都不执行,因此新建一个临时线程用于监控t.join()之后的状态new Thread(()->{try {// sleep保证t.join()执行完毕Thread.sleep(1000);// 拿到main线程ThreadGroup group = Thread.currentThread().getThreadGroup();Thread[] threads = new Thread[group.activeCount()];group.enumerate(threads);for (Thread thread:threads){if (thread.getName().equals("main")){System.out.println("main state:"+thread.getState());System.out.println("t state:"+ t.getState());}}} catch (InterruptedException e) {e.printStackTrace();}}).start();try {// 在main中调用t.join()System.out.println("t.join() invoked in main()");t.join();} catch (InterruptedException e) {e.printStackTrace();}}
}

结果为

t.join() invoked in main()
main state:WAITING
t state:RUNNABLE

说明t.join()之后,main进入join阻塞状态,而非t。

因此,我们通过以下代码想实现通过join打断t是不行的,因此进入join的不是t,而是CurrentThread

package concurrency.chapter3;public class Interrupt {public static void main(String[] args) {// 启动测试线程对象 tThread t = new Thread(()->{while (true){}});t.start();// 由于t.join()之后的代码都不执行,因此新建一个临时线程用于监控t.join()之后的状态new Thread(()->{System.out.println("在临时线程中interrupt->t");t.interrupt();}).start();try {// 在main中调用t.join()System.out.println("t.join() invoked in main()");t.join();} catch (InterruptedException e) {System.out.println("t在join过程中被interrupt");}}
}

以上代码相当于改变了t的interrupt状态为中断状态,但是没有对象去捕获该异常进行处理。
因此,上述代码通过join捕获interrupt异常,只能捕获到CurrentThread。

8. 优雅的结束线程

方式一:flag

package concurrency.chapter4;public class Stop1 {public static class Test extends Thread{private volatile boolean start=true;@Overridepublic void run() {while (start){}}public void shutdown(){this.start=false;}}public static void main(String[] args) {Test test = new Test();test.start();// 模拟test工作耗时try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}test.shutdown();}
}

方式二:使用interrupt+block

package concurrency.chapter4;public class Stop2 {public static void main(String[] args) {Thread thread = new Thread(()->{while (true) {try {Thread.sleep(0);} catch (InterruptedException e) {System.out.println("结束进程");return;}}});thread.start();try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}thread.interrupt();}
}

方式三:Thread.interrupted()
Thread.interrupted()用于检测当前线程是否被interrupt,相当于thread.isInterrupted(),只是一个是静态方法,一个是实例方法。
之所以多一个静态方法,是因为在lambda或者匿名内部类中不能使用实例方法。

package concurrency.chapter4;public class Stop2 {public static void main(String[] args) {Thread thread = new Thread(()->{while (true) {if (Thread.interrupted()){System.out.println("结束进程");return;}}});thread.start();try {Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}thread.interrupt();}
}

考虑以下情况,在线程中,有个事件本身就block了(比如预计10秒,但是一个小时也没结束),这时候,它无法读取到flag,也监听不到interrupted,该如何结束它?
利用守护线程与非守护线程的概念。
父线程结束,守护线程也会结束,因此,我们可以用一个父线程来执行目标线程,并将目标线程设为守护线程。通过控制父线程的结束,实现目标线程的结束。

方式四:利用守护线程
**父线程ThreadService **

package concurrency.chapter4;public class ThreadService {private Thread executeThread;private volatile boolean finished=false;private String taskName;ThreadService(String taskName){this.taskName=taskName;}// 将执行任务设为守护线程public void execute(Runnable task){executeThread = new Thread(()->{Thread runner = new Thread(task);runner.setDaemon(true);runner.start();// executeThread等待runner执行完毕try {runner.join();} catch (InterruptedException e) {// 打断,结束程序体}finally{finished=true;}});executeThread.start();}public void shutdown(long mills){long currentTime = System.currentTimeMillis();// 如果没有结束while (!finished){// 如果超时if (System.currentTimeMillis()-currentTime>mills){// 并非interrupt()结束了线程,而是interrupt()控制了线程执行语句// 调用本行代码时,会进入第21行,// 由于executeThread本身没有其他处理逻辑,因此executeThread结束,同时runner也跟着结束executeThread.interrupt();System.out.println(this.taskName+" 任务超时!强行结束");break;}// 由于finished加了volatile关键字,因此当线程结束时,此处的finished也会被观测为true// 所以如果没有超时,while也能够自动断开}finished=false;}
}

客户端实例

package concurrency.chapter4;public class Stop3 {public static void main(String[] args) {ThreadService threadService = new ThreadService("test线程");long start = System.currentTimeMillis();threadService.execute(()->{// 执行一个非常重的任务
//            while (true){//            }try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}});threadService.shutdown(1000);long end = System.currentTimeMillis();System.out.println(end-start);}
}

9. 线程安全、数据共享

同步锁 synchronized
可锁对象:

  1. 实例对象:this,synchronized作为实例方法关键字时,
  2. 类对象:class,synchronized作用于静态方法、修饰静态对象时
  3. 块对象:锁住对应字节码

死锁:相互等待对方的资源,一般发生在锁套锁的情况。

synchronized核心

实例锁:锁特定实例,this
类锁:锁类,class

每次执行之前模拟以下过程:判断是否需要锁-->锁竞争-->得到锁-->执行-->释放锁①synchronized修饰非静态方法-->锁this-->1.1 多个线程访问同一个对象的该方法-->同步-->1.2 同一个对象,一个线程访问synchronized方法,另一个对象访问非synchronized方法-->异步-->1.3 同一个对象,一个线程访问synchronized方法,另一个对象访问另一个synchronized方法-->同步
结论:一个实例只有一个this锁,
对于1.1,存在锁竞争的过程,因此同步
对于1.2,非synchronized方法不需要锁竞争,因此异步
对于1.3,尽管是两个不同的synchronized方法,但是是同一个锁,也需要锁竞争,因此同步②synchronized修饰静态方法-->锁class-->2.1 一个线程访问synchronized静态方法,另一个对象访问非synchronized静态方法-->异步-->2.2 一个线程访问synchronized静态方法,另一个对象访问另一个synchronized静态方法-->同步
对于2.1,非synchronized方法不需要锁竞争,因此异步
对于2.2,尽管是两个不同的synchronized方法,但是是同一个锁,也需要锁竞争,因此同步③定义静态变量lock,通过synchronized(lock){}对代码块进行加锁-->锁lock变量-->3.1 一个线程访问synchronized静态方法,另一个线程访问包含synchronized(lock){}的静态方法-->异步-->3.2 一个线程访问synchronized静态方法,另一个线程访问包含synchronized(当前类名.class){}的静态方法-->同步
对于3.1,一个锁是class,一个锁是lock,不存在锁竞争,因此异步
对于3.2,两个锁都是当前类名.class,存在锁竞争,因此异步
注意,有种lock的写法是 private static 当前类名 lock = new 当前类名();
此时的lock与当前类名.class依旧不是同一个锁。④定义静态变量int i,一个线程运行synchronized方法修改i,另一个线程运行非synchronized方法修改i,此时是异步,而且数据不安全。
--> synchronized没有锁住资源,只锁住了代码,在其他入口访问同一份资源依旧会出现数据不同步问题。⑤定义静态代码块内的synchronized
static{synchronized(xx){}
}
-->静态代码块会阻塞该类中的所有资源,因为加载静态代码块属于类的初始化过程综上,其实synchronized的核心就在于加锁过程,我们需要判断当前锁是否存在、是否是同一个锁对象,进而判断是否存在锁竞争,从而得知是否是同步、异步。

上述内容皆为实验所得,如有遗漏,敬请留言。

10. 死锁

死锁:相互等待对方的资源,一般发生在锁套锁的情况。
A等待B,B等待C,C等待A

常见于以下情况:
在使用第三方service时,第三方service需要传入我们自己写的类。我们自己写的类又加了锁,就可能出现锁套锁的情况。

检测死锁的方法:
如果线程迟迟不动,cpu也不存在异常,
打开cmd
jps 查看所有java进程
jstack 进程号 如果有死锁,控制台会通知

11. 线程间的通讯(生产者与消费者)

模型一:单生产者+单消费者

package concurrency.chapter6;// 生产者消费者模型
public class ProductorConsumerModel {private int i;private boolean needProduct=true;void product(){synchronized (this) {// 如果需要生产,那么就生产if (needProduct) {System.out.println("生产:" + (i++));// 生产完了通知消费者消费this.notify();needProduct=false;}else{// 如果不需要生产,那么就让生产者等待。try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}void comsumed(){synchronized (this) {// 如果不需要生产,那就消费if (!needProduct) {System.out.println("消费:" + (--i));// 消费完了通知生产者生产this.notify();needProduct=true;}else {// 需要生产,就让消费者等待try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}public static void main(String[] args) {ProductorConsumerModel model = new ProductorConsumerModel();new Thread(()->{while (true){model.product();}}).start();new Thread(()->{while (true) {model.comsumed();}}).start();}
}

注意,xx.wait()是让执行这个方法的线程等待,直到被通知。xx.notify()是唤醒被当前锁锁住的对象

这个模型只允许构建一个生产者线程和一个消费者线程,多个生产者消费者的时候会出现假死锁状态,也会出现虚假唤醒
因为不能确定notify作用于哪个对象,导致所有线程进入wait状态

模型二:多消费者-多生产者
把模型一中的notify改为notifyAll()即可(注意被唤醒并且被执行的线程是从上次阻塞的位置从下开始运行,也就是从wait()方法后开始执行。
因此判断是否进入某一线程的条件 是用while判断,而不是用If判断判断。)

官网文档建议,在多线程中有关wait的判断语句都使用while,避免虚假唤醒。

package concurrency.chapter6;// 生产者消费者模型
public class ProductorConsumerModel2 {private int i;private boolean needProduct=true;void product(){synchronized (this) {// 如果需要生产,那么就生产while (needProduct) {System.out.println(Thread.currentThread().getName() + "生产:" + (i++));// 生产完了通知消费者消费this.notifyAll();needProduct = false;}// 如果不需要生产,那么就让生产者等待。try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}}void comsumed(){synchronized (this) {// 如果不需要生产,那就消费while (!needProduct) {System.out.println(Thread.currentThread().getName()+"消费:" + (--i));// 消费完了通知生产者生产this.notifyAll();needProduct=true;}// 需要生产,就让消费者等待try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) {ProductorConsumerModel2 model = new ProductorConsumerModel2();new Thread(()->{while (true){model.product();}},"P1").start();new Thread(()->{while (true){model.product();}},"P2").start();new Thread(()->{while (true){model.product();}},"P3").start();new Thread(()->{while (true){model.product();}},"P4").start();new Thread(()->{while (true) {model.comsumed();}},"C1").start();new Thread(()->{while (true) {model.comsumed();}},"C2").start();}
}

方式三:将数据、生产者、消费者解耦(重点掌握)

package concurrency.chapter6;import concurrency.chapter4.ThreadService;import java.util.stream.Stream;// 生产者消费者模型
public class ProductorConsumerModel4 {public static void main(String[] args) {Data data = new Data();Stream.of("P1","P2").forEach(name->new Thread(()->{Productor productor = new Productor(data);while (true){productor.producted();}},name).start());Stream.of("C1","C2").forEach(name->new Thread(()->{Consumer consumer = new Consumer(data);while (true){consumer.consumed();}},name).start());}
}// 数据容器
class Data{private int i=0;private boolean needPush;synchronized public void push() {while (needPush){System.out.println(Thread.currentThread().getName()+"生产: "+ (++i));needPush=!needPush;notifyAll();}try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}synchronized public void pop() {while (!needPush){System.out.println(Thread.currentThread().getName()+"消费: "+ (i--));needPush=!needPush;notifyAll();}try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}
}// 生产者
class Productor{private Data data;Productor(Data data){this.data=data;}public void producted(){data.push();}
}// 消费者
class Consumer{private Data data;Consumer(Data data){this.data=data;}public void consumed(){data.pop();}

说明:原理其实非常简单:生产者和消费者的速度都是一样的,那么理论情况下,生产一个,就被消费一个。因此,需要同步的对象就是当前这个被生产/被消费的数据,与之相关的操作就是“生产”与“消费”,因此只需要让这两个操作去竞争同一个锁就行了

思考:如果是一个生产者和一个消费者,并且生产者和消费者都在抢一个锁(对数据的写入和写出权),这不还是单线程么?
此外,如果队列入口一个锁,出口一个锁,生产者和消费者各抢各的锁,用容器上限来判断是否生产\消费,那么多个生产者和消费者的速度是不是还是跟单个生产者\消费者一样?
毕竟队列只有一个入口和出口,同一时刻只允许一个生产者和消费者操作。

回答:对于“每个线程访问被锁的对象”这个操作来说,的确是串行的(单线程、排队),但是这个操作只是每个线程工作中的一部分。比如说访问一个数据队列,大家都排队,这里会耗时,但是当排队拿到数据之后,各个线程的行为是并行的,大大提高了效率。

如果访问被锁的对象是线程的主要目的,自然看起来就跟单线程没什么区别。
以下是线程个数的速度测试
100ms下,可以生成并消耗多少个产品

package concurrency.chapter6;import concurrency.chapter4.ThreadService;import java.time.Duration;
import java.time.Instant;
import java.util.Currency;
import java.util.stream.Stream;/*** @author sxuer*/ // 生产者消费者模型
public class ProductorConsumerModel4 {static volatile int i=0;public static void main(String[] args) throws InterruptedException {Data data = new Data();Stream.of("P1").forEach(name->new Thread(()->{Productor productor = new Productor(data);while (i++<100000){productor.producted();}},name).start());Stream.of("C1").forEach(name->new Thread(()->{Consumer consumer = new Consumer(data);while (i++<100000){consumer.consumed();}},name).start());Thread.sleep(5000);System.out.println("运行结束");}
}// 数据容器
class Data{private int i=0;private boolean needPush;synchronized public void push() {while (needPush){System.out.println(Thread.currentThread().getName()+"生产: "+ (++i));needPush=!needPush;notifyAll();}try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}synchronized public void pop() {while (!needPush){System.out.println(Thread.currentThread().getName()+"消费: "+ i);needPush=!needPush;notifyAll();}try {this.wait();} catch (InterruptedException e) {e.printStackTrace();}}
}// 生产者
class Productor{private Data data;Productor(Data data){this.data=data;}public void producted(){data.push();}
}// 消费者
class Consumer{private Data data;Consumer(Data data){this.data=data;}public void consumed(){data.pop();}
}

测试结果如下

2+2 25059\25065 指的是2个生产者+2个消费者在100000减至0期间内生产并消费的产品个数为25059(第一次测试)、25065(第二次测试)

    /*** 2+2 25059\25065* 1+2 33467\33435* 2+1 33433\33390* 1+1 49999\49999*/

可以看到1+1是最快的,因为上述简单的生产者消费者模型访问产品是线程的主要主要操作,2+2存在频繁的线程切换导致效率最低。

12. sleep与wait的区别

1.sleep是Thread方法,wait是Object方法
2.sleep不会释放锁,wait会释放锁
3.sleep不依赖于锁,wait的使用依赖于锁
4.sleep不需要唤醒,wait需要(wait(time)除外)

13. 综合案例–数据采集

线程切换是需要开销的,多线程效率是一个开口向下的抛物线,当线程过多的时候,效率会越来越慢。
案例:对n台机器进行数据采集工作,显然,我们需要定义一定数量的线程,当某个线程结束后,再启动一个线程去采集,保证线程的数量不超过设定的最大值。

假设有10台机器,线程最大数为5。

package concurrency.chapter7;import java.util.*;
import java.util.stream.Stream;public class DataCapture {final private static int MAXSIZE=5;// FIFO队列,用于控制运行时的线程个数final static private LinkedList<Object> CONTROLS = new LinkedList<>();public static void main(String[] args) {// 由于流是一次性的,我们用一个容器临时保存线程,保证在后面能够让所有线程joinList<Thread> threads = new ArrayList<>();// 创建10个线程,注意我们能够同时运行的线程最大个数为5,因此通过wait对线程的运行进行控制Stream.of("M1","M2","M3","M4","M5","M6","M7","M8","M9","M10").map(DataCapture::threadCreate).forEach(t->{t.start();threads.add(t);});// 拿到所有线程,进行jointhreads.forEach(t->{try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});Optional.of("所有线程工作结束").ifPresent(System.out::println);}private static Thread threadCreate(String name){return new Thread(()->{// 运行时控制线程个数,进队列抢锁synchronized (CONTROLS) {// 如果当前个数大于MAXSIZE了(也就是第六个)就让其waitwhile (CONTROLS.size() >= MAXSIZE) {try {System.out.println(Thread.currentThread().getName() + ": 正在等待!");CONTROLS.wait();} catch (InterruptedException e) {e.printStackTrace();}}CONTROLS.addLast(new Object());// 这句通知由队列发出(即在synchronized中),能够清楚看到工作顺序System.out.println(Thread.currentThread().getName()+": 开始工作");}// 工作时都在自己的工作空间,不需要进行synchronized// 模拟工作耗时try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}System.out.println(Thread.currentThread().getName()+": 工作结束");// 出队列抢锁synchronized (CONTROLS){// 工作完毕,自己退出队列,并通知其他线程CONTROLS.removeFirst();CONTROLS.notifyAll();}},name);}
}

在个数判断中必须使用while (CONTROLS.size() >= MAXSIZE) {,大于等于号,才能保证同时运行5个线程,我也不知道为啥。按照逻辑来说应该是用大于号,望懂得朋友留言告知。

14. 显式锁(实现自定义锁)

API接口设计

package concurrency.chapter8;import java.util.Collection;// 显式锁API接口
public interface Lock {// 超时异常class TimeOutException extends Exception{TimeOutException(String msg){super(msg);}}// 加锁void lock() throws InterruptedException;// 按时加锁void lock(long mills) throws InterruptedException,TimeOutException;// 解锁void unlock();// 查看阻塞线程Collection<Thread> getBlockedThreads();// 查看阻塞个数int getBlockedSize();
}

实现

package concurrency.chapter8;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;// 通过boolean值去操控锁
package concurrency.chapter8;import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeoutException;// 通过boolean值去操控锁
public class BooleanLock implements Lock {// 如果LOCK为true,说明锁被人持有,否则说明锁为空闲状态private boolean LOCK;// 保证当前操作锁的对象式currentThread,不然其他线程也能自由调用lock和unlockprivate Thread currentThread;// 保存当前被阻塞的线程private Collection<Thread> blockedThreads = new ArrayList<>();// synchronized不能在接口中声明,在这里进行标注@Overridesynchronized public void lock() throws InterruptedException {// 如果锁被人持有,那就等待while (LOCK){this.wait();this.blockedThreads.add(Thread.currentThread());}// 结束while之后说明锁被当前线程拿到,那就更改锁状态this.blockedThreads.remove(Thread.currentThread());LOCK=true;this.currentThread = Thread.currentThread();}// synchronized加的锁不具备超时的能力,因此我们自定义超时锁@Overridesynchronized public void lock(long mills) throws InterruptedException, TimeOutException {if (mills<=0){lock();return;}// flag用于判断是否超时long flag = mills;// timeout表示超时时间long timeout = System.currentTimeMillis()+mills;// 如果锁被持有,就让其等待while (LOCK){if (flag<=0){throw new TimeOutException(Thread.currentThread().getName()+"Time out");}blockedThreads.add(Thread.currentThread());this.wait();flag = timeout-System.currentTimeMillis();}// 拿到锁this.LOCK=true;blockedThreads.remove(Thread.currentThread());this.currentThread = Thread.currentThread();}@Overridesynchronized public void unlock() {// 如果锁被人持有,并且当前试图解锁的也是当前线程,那就释放锁while (LOCK && this.currentThread==Thread.currentThread()){System.out.println("锁已被释放");this.notifyAll();LOCK=false;}// 如果锁处于释放状态,那就不做操作}@Overridepublic Collection<Thread> getBlockedThreads() {// unmodifiableCollection 在返回的过程中,不允许对其进行修改return Collections.unmodifiableCollection(blockedThreads);}@Overridepublic int getBlockedSize() {return blockedThreads.size();}
}

客户端测试

package concurrency.chapter8;import com.sun.org.apache.xpath.internal.operations.Bool;import java.util.stream.Stream;public class LockTest {public static void main(String[] args) {Lock lock = new BooleanLock();// 普通lock测试Stream.of("T1","T2","T3").forEach(name->{new Thread(()->{// 当前线程拿到锁,开始工作try {lock.lock();System.out.println(Thread.currentThread().getName()+"  拿到锁");work();} catch (InterruptedException e) {e.printStackTrace();}finally {//结束后释放锁lock.unlock();}},name).start();});// main函数视图释放锁,但是我们有验证,实际操作不了lock.unlock();// 超时lock测试Stream.of("T4","T5","T6").forEach(name->{new Thread(()->{// 当前线程拿到锁,开始工作try {lock.lock(100L);System.out.println(Thread.currentThread().getName()+"  拿到锁");work();} catch (InterruptedException e) {e.printStackTrace();} catch (Lock.TimeOutException e) {System.out.println(Thread.currentThread().getName()+" 超时!");} finally {//结束后释放锁lock.unlock();}},name).start();});// main函数视图释放锁,但是我们有验证,实际操作不了lock.unlock();}// 模拟工作private static void work(){try {System.out.println(Thread.currentThread().getName()+"  正在工作");Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}}
}

15. 钩子方法处理系统退出工作

当系统被终止的时候,往往还有很多连接资源没有关闭,比如数据库连接、网络连接等等,因此我们在终止程序的时候,需要一些关闭各种资源的操作——用钩子方法。

package concurrency.chapter9;public class HookToExit {public static void main(String[] args) {// 钩子方法Runtime.getRuntime().addShutdownHook(new Thread(()->{System.out.println("系统正在退出");//进行退出资源回收nofigyAndRelease();}));System.out.println("开始工作");while (true){}}private static void nofigyAndRelease() {//这里处理资源回收或通知System.out.println("关闭缓存");System.out.println("关闭数据源连接");System.out.println("关闭socket");System.out.println("系统已退出");}
}

也能在nofigyAndRelease可以捕获异常

上述代码在linux下测试可以看到清楚的效果。
无论是手动shutdowm还是kill进程,都能处理退出工作。
注意kill -9 是强制终结命令,无法完成退出处理工作。

16. ThreadException与stackTrace(了解)

package concurrency.chapter9;import java.util.Optional;
import java.util.stream.Stream;public class ThreadException {private static int A=1;private static int B=0;public static void main(String[] args){Thread t = new Thread(()->{// 这个异常无法抛出,只能try-catchtry {Thread.sleep(100);A/=B;} catch (InterruptedException e) {e.printStackTrace();}});t.start();// 这里捕获异常并处理t.setUncaughtExceptionHandler((group,e)->{System.out.println(group);System.out.println(e);});System.out.println("打印方法调用栈");Stream.of(Thread.currentThread().getStackTrace()).filter(e->!e.isNativeMethod()).forEach(e-> Optional.of("类名:"+e.getClassName()+"  方法名:"+e.getMethodName()+"  行数:"+e.getLineNumber()).ifPresent(System.out::println));}}

17. ThreadGroup

如果不设置group,会将线程加到当前group中。
ThreadGroup是树形结构,允许子节点也是ThreadGroup。

group.destroy(),销毁线程组,如果期内还有活跃的线程,抛出异常。

group.enumerate(Thread[] list【, boolean recurse】);
拷贝线程到list中,recurse指定是否需要对其内的其他线程组进行递归拷贝(深拷贝)
不加recurse则默认为true

group.interrupt() 打断其内的所有线程(递归式)

group.setDaemon()
线程组的Daemon跟线程不一样,线程组的Daemon指的是,如果线程组被destroy或其内的所有线程都执行结束,那就销毁该线程。
也就是说,非Daemon线程组需要手动回收
如果创建ThreadGroup时制定了parentThreadGroup,则Daemon与parent一致。

18. 手写线程池

概念:
1.任务队列:所有执行线程去任务队列里面拿任务执行
2.线程队列:线程池有一个线程队列
3.拒绝策略(抛出异常、丢弃、阻塞、临时队列):当任务数量超过线程池设定的可接受数量时,进行拒绝的处理策略
4.容量:线程池的线程队列个数可以动态变化。

package chapter10;import java.util.*;// 线程池的简单实现
public class ThreadPool extends Thread {//定义线程池大小private int size;//定义线程池扩容最大容量private final static int MAXZISE=30;//可提交任务的最大值private final int taskQueueMaxSize;//线程池默认大小public final static int DEFAULT_SIZE=10;//任务队列,用于存放外部传进来的所有任务,执行一个就删除一个private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList<>();// 默认的任务最大个数public final static int DEFAULT_TASK_QUEUE_SIZE=2000;// 用于存放线程池的所有线程private final static List<WorkerTask> WORKER_THREAD_QUEUE = new ArrayList<>();// 用于线程命名自增private static volatile int seq;// 用于线程命名前缀private final static String THREAD_PREFIX = "THREAD_POOL-";private final static ThreadGroup GROUP = new ThreadGroup("THREAD_POOL");// 默认的拒绝策略,超过数量直接抛出异常public final static DiscardPolicy DEFAULT_DISCARD_POLICY = ()->{throw new DiscardException("提交任务数量过多,任务被拒绝!");};// 提供给外部传入private DiscardPolicy discardPolicy;// 线程池的状态private boolean isDead;//空构造,设为默认大小public ThreadPool(){this(DEFAULT_SIZE,DEFAULT_TASK_QUEUE_SIZE,DEFAULT_DISCARD_POLICY);}public ThreadPool(int size, int taskQueueMaxSize, DiscardPolicy discatdPolicy){this.size=size;this.taskQueueMaxSize = taskQueueMaxSize;this.discardPolicy = discatdPolicy;// 初始化线程池init();}private void init() {// 初始化创建size个工作线程for (int i=0;i<size;i++){createWorkTask();}this.start();}// 对外提供接口,加入任务public void submit(Runnable runnable){// 队列入口,抢锁synchronized (TASK_QUEUE){// 拒绝策略判断,任务队列的个数大于最大值if (TASK_QUEUE.size()> taskQueueMaxSize){discardPolicy.discard();}TASK_QUEUE.addLast(runnable);// 加入队列之后就通知其他休眠的(类似生产者消费者模型)TASK_QUEUE.notifyAll();}}// 线程池的状态监控:扩容、缩容等@Overridepublic void run(){while (!isDead){// 扩容extend();try {Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}// 缩容unextend();}}// 扩容private void extend(){synchronized (WORKER_THREAD_QUEUE){// 如果提交的任务个数TASK_QUEUE.size()/2 > WROKER_THREAD_QUEUE,就再创建一个工作线程if (TASK_QUEUE.size()<<1 > WORKER_THREAD_QUEUE.size() && WORKER_THREAD_QUEUE.size()<MAXZISE){System.out.println("扩容前:size:"+size);createWorkTask();size++;System.out.println("扩容后:size:"+size);}}}//缩容private void unextend(){synchronized (WORKER_THREAD_QUEUE){// 如果提交的任务个数TASK_QUEUE.size()/2 < WROKER_THREAD_QUEUE,就减少一个工作线程if (TASK_QUEUE.size()<<1 < WORKER_THREAD_QUEUE.size() && TASK_QUEUE.size()>0){System.out.println("缩容前:size:"+size);for (Iterator<WorkerTask> iterator = WORKER_THREAD_QUEUE.iterator();iterator.hasNext();){WorkerTask task = iterator.next();if (task.state==TaskState.BLOCKED){task.interrupt();task.close();size--;System.out.println("缩容后:size:"+size);}}}}}// 创建自定义的工作线程private void createWorkTask(){WorkerTask task = new WorkerTask(GROUP,THREAD_PREFIX+(seq++));// 线程池中的线程在线程池创建后就被启动,具体的状态根据任务需求会自动更改task.start();// 加到工作队列中WORKER_THREAD_QUEUE.add(task);}// 拒绝策略,提供接口,让外部也能实现自定义拒绝策略public interface DiscardPolicy{void discard() throws DiscardException;}// 通过抛出异常,抛出拒绝public static class DiscardException extends RuntimeException{public DiscardException(String msg){super(msg);}}// 关闭线程池public void shutdown() throws InterruptedException {// 如果任务队列还有,那就稍等一会while (!TASK_QUEUE.isEmpty()){Thread.sleep(1);}// 如果任务队列里面没有任务了,那就结束,需要循环结束int initVal = WORKER_THREAD_QUEUE.size();while (initVal>0){for (WorkerTask task :WORKER_THREAD_QUEUE) {//如果任务执行完毕,那么状态必为阻塞if (task.state==TaskState.BLOCKED) {task.close(); //state设为DADEtask.interrupt();//打断,退出任务循环initVal--;// 如果不是,就先等待一下,不要疯狂运行}else {System.out.println(TASK_QUEUE.size());Thread.sleep(1);}}}isDead=true;System.out.println("--------------线程池已被关闭----------------");}// 定义任务状态,空闲、运行、阻塞、死亡private enum TaskState {FREE,RUNNING,BLOCKED,DEAD}// 封装Thread对象,让其拥有我们定义的任务状态等其他信息private static class WorkerTask extends Thread{//初始化为空闲状态private volatile TaskState state = TaskState.FREE;// 获取任务状态public TaskState getTaskState(){return this.state;}//调用父类的group的构造方法public WorkerTask(ThreadGroup group,String name){super(group,name);}//run,让其执行完之后不能销毁,而是放回池中@Overridepublic void run() {//如果线程状态没有死亡,就去队列中获取任务,保证工作线程在系统运行期间永不消亡OUTER:while (this.state!=TaskState.DEAD){// 声明任务Runnable runnable;//出队,抢锁synchronized (TASK_QUEUE){// 如果队列为空,就让线程等待,释放锁while (TASK_QUEUE.isEmpty()){try {// 进入wait,修改状态state = TaskState.BLOCKED;TASK_QUEUE.wait();} catch (InterruptedException e) {//如果线程被打断,就重新去获取任务,保证工作线程永不消亡break OUTER;}}// 如果队列不为空,就拿到第一个任务runnable = TASK_QUEUE.removeFirst();}// 拿到任务释放锁,开始工作if (runnable!=null){// 开始执行,修改状态state = TaskState.RUNNING;runnable.run();// 执行完毕,修改状态state = TaskState.FREE;}}}// 关闭任务public void close(){this.state = TaskState.DEAD;}}public static void main(String[] args) throws InterruptedException {// 初始化一个线程池ThreadPool pool = new ThreadPool();// 提交40个任务for (int i = 0; i < 40; i++) {pool.submit(()->{System.out.println("任务被线程"+Thread.currentThread().getName()+"执行");try {// 模拟工作Thread.sleep(500);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("任务被线程"+Thread.currentThread().getName()+"执行完毕");});}// 等待线程工作Thread.sleep(20000);pool.shutdown();}
}

二、多线程中的设计模式

1. 多线程中的单例模式

最佳的两个方式
1.静态内部类

public class Test {private Test(){}private static class Instance{final private static Test INSTANCE=new Test();}public static Test get_Instance(){return Instance.INSTANCE;}
}

2.枚举

public class Test {private Test(){}static enum  Instance{INSTANCE;private Test test;Instance(){test=new Test();}public Test get_Instance(){return test;}}public static void main(String[] args) {Test test = Instance.INSTANCE.get_Instance();Test test2 = Instance.INSTANCE.get_Instance();System.out.println(test);System.out.println(test2);}
}

2. WaitSet

WaitSet是逻辑结构,实际并不存在。

  • 所有对象都有WaitSet。
  • 所有进入wait状态的线程,都会被加入到对应锁对象的WaitSet中
  • WaitSet中的线程被唤醒时,不一定按FILO顺序。

3. volatile详解

volatile实现 内存可见性和有序性,不支持原子性。

假如一个线程中对某个变量只有“读”操作,那么这个线程不会从主存中获取该变量,而是一直从线程操作空间(或者说线程缓存)中获取该变量。

底层解决线程数据共享的方法:
CPU高速缓存一致性协议(常用)、给数据总线加锁

核心思想:

  • 当cpu写入数据时,如果发现该变量被其他线程共享,则通知其他线程,该变量无效
  • 当其他线程访问该变量时,只从主存中获取。

Java实现原子性
对基本数据类型变量读取和复制,通常情况下保证了原子性

a=10   满足原子性
b=a     不满足原子性  1.read a;2. assign b;
c++     不满足原子性  1.read c;2.add;3.assign c;
c=c+1 不满足原子性  1.read c;2.add;3.assign c;

为什么是通常情况下保证原子性?
凡是分步操作的,都可能不满足原子性。
而有些较大的数据,可能是分步赋值的;如32位机器上,32位的值可能是先赋前16位,再赋后16位。

Java实现可见性
volatile关键字修饰的变量,总是在主存中获取值,保证了可见性。

Java实现有序性
volatile关键字修饰的变量,禁止重排序,保证了有序性。

4. 多线程中的观察者模式

观察者模式的基础看这里

以下实现方式是博主自己结合观察者模式改造而成,水平有限,偏颇之处,敬请指出。

通过观察者模式,对线程进行监控。

注意,以下代码是“单个观察者”(线程监听器)对“多个线程”进行观察,而非标准观察者模式中【多个观察者+单个被观察者】

首先是观察者(线程监听器)的抽象接口

public interface LifeCycleListener {void onEvent(ObservableRunnable.RunnableEvent event);
}

onEvent就是被通知方法(回调方法)

实际观察者

package concurrency.chapter11;import java.util.HashMap;
import java.util.Map;// 实际观察者
public class ThreadLifeCycleObserver  implements LifeCycleListener {// 由于多个被观察者使用同一个观察者,因此观察者的观察方法需要被锁住private final Object LOCK = new Object();// 保存每个线程上一次的线程状态private final Map<Thread, Thread.State> threadStateMap= new HashMap<>();@Overridepublic void onEvent(Thread event) {// 将当前线程状态保存synchronized (threadStateMap){threadStateMap.put(event,event.getState());}// 监控线程的未捕获异常event.setUncaughtExceptionHandler((group,e)->{System.out.println("线程异常!\t状态:"+event.getState()+"\t所属线程组:"+group +"\t异常原因:"+e);try {Thread.sleep(10);} catch (InterruptedException ex) {ex.printStackTrace();}});synchronized (LOCK){// 拿到上一次的线程状态final Thread.State lastState = threadStateMap.get(event);// 如果线程状态改变了,就发布通知if (event.getState()!=lastState){// 这里监控线程的各个状态,并处理switch (event.getState()){case NEW:System.out.println("通知:"+event.getName()+"由状态【"+lastState+"】"+"变为"+"【"+event.getState()+"】");break;case BLOCKED:System.out.println("通知:"+event.getName()+"由状态【"+lastState+"】"+"变为"+"【"+event.getState()+"】");break;case WAITING:System.out.println("通知:"+event.getName()+"由状态【"+lastState+"】"+"变为"+"【"+event.getState()+"】");break;case RUNNABLE:System.out.println("通知:"+event.getName()+"由状态【"+lastState+"】"+"变为"+"【"+event.getState()+"】");break;case TERMINATED:System.out.println("通知:"+event.getName()+"由状态【"+lastState+"】"+"变为"+"【"+event.getState()+"】");break;case TIMED_WAITING:System.out.println("通知:"+event.getName()+"由状态【"+lastState+"】"+"变为"+"【"+event.getState()+"】");break;default:break;}}}}}

接着是被观察者,也就是我们的线程,在客户端创建即可。

然后就是让观察者与被观察者建立联系,类似中介

package concurrency.chapter11;import java.util.List;// 中介
public class Mediator {// 依赖观察者private final LifeCycleListener listener;// 依赖所有被观察者private List<Thread> threads;// 注入观察者与被观察者public Mediator(List<Thread> threads, LifeCycleListener listener){this.listener=listener;this.threads = threads;}// 遍历所有被观察者,调用观察者的通知方法protected void notifyChange(){for (Thread event:threads) {new Thread(()->{do {listener.onEvent(event);}while (event.getState()!= Thread.State.TERMINATED);}).start();}}
}

客户端

package concurrency.chapter11;import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;public class ThreadLifeCycleClient {public static void main(String[] args) {// 创建观察者List<Thread> threads = new ArrayList<>();Stream.of("T1","T2","T3").forEach(name-> threads.add(new Thread(()->{int i = 0;while (i < 10) {System.out.println(Thread.currentThread().getName()+"正在运行:" + i++);try {Thread.sleep(1);
//                     模拟异常int j=1/0;} catch (InterruptedException e) {e.printStackTrace();}};},name)));// 开启观察new Mediator(threads,new ThreadLifeCycleObserver()).notifyChange();// 启动观察者threads.forEach(Thread::start);}
}

5. 多线程中的单线程执行模式

Single Thread Execution,也就是说在同一时刻只能有一个线程访问共享资源,也就是说共享资源同一时间只能被一个线程访问。

先完成以下代码

package concurrency.chapter12;// 门:共享资源、临界区
public class Gate {private int counter = 0;private String name="Nobody";private String address="Nowhere";// 线程通过这个门,打卡记录public void pass(String name,String address){counter++;this.name=name;this.address=address;if (!verify()){System.out.println("被阻拦!"+toString());};}// 验证打卡信息是否正确private boolean verify() {// 此处定义简单的逻辑规则,当名字的长度等于地址的长度就通过return this.name.length() == this.address.length();}@Overridepublic String toString() {return "No."+counter+":"+name+","+address;}
}
package concurrency.chapter12;// 线程用户
public class User extends Thread {private final String name;private final String address;private final Gate gate;public User(String name, String address, Gate gate) {this.name = name;this.address = address;this.gate = gate;}@Overridepublic void run() {System.out.println(name+"开始");while (true){this.gate.pass(name,address);}}
}
package concurrency.chapter12;import java.util.stream.Stream;public class Client {public static void main(String[] args) {// 共享的资源,门Gate gate = new Gate();// 按照筛选规则,他们都能通过检查门User u1 = new User("a","a",gate);User u2 = new User("bb","bb",gate);User u3 = new User("ccc","ccc",gate);Stream.of(u1,u2,u3).forEach(Thread::start);}
}

按照筛选规则,三个user都能通过检查门,但是运行后发现

一瞬间就被阻拦了。
很容易得知,由于gate是线程的共享资源,在这里面却没有任何锁,肯定会发生并发问题。
但是是如何发生的?

分析:
1.a, ccc被阻拦
原因:当a,a进入Gate时,此时来了个ccc,ccc,将a,a修改成a,ccc,因此被阻拦
2.a, a被阻拦:这个看似明明符合规则,依旧被阻拦
原因:当a,a进入Gate时,此时来了个bb,bb,将a,a修改成a,bb,因此被阻拦,但是在打印的时候,又来了个a,a,将a,bb修改成a,a,因此我们看到a,a被阻拦

因此,我们给Gate中的pass加个锁即可。

并发问题三要素:共享资源+临界值+资源竞争

6. 读写锁的分离

读+写,可能发生并发问题
写+写,可能发生并发问题
读+读,不会发生并发问题

对于一个资源,存在读或者写,如果能够将读写锁分离,当读+读时,并行化,就能提高效率。

package concurrency.chapter13;public class ReadWriteLock {// 正在读的用户个数private int readingReaders =0;// 正在写的用户个数private int writingWriters=0;// 正在读的被阻塞用户个数private int blockingReaders=0;// 正在写的被阻塞用户个数private int blockingWriters=0;// 写操作是否有更高的优先权(如果为false,大概几十秒写操作才能抢到一个锁)private boolean preferWriter = true;public ReadWriteLock(){// 默认preferWriter设为true;this(true);}public ReadWriteLock(boolean preferWriter){this.preferWriter = preferWriter;}// 当用户打算“读”的时候,抢这个锁// 抢锁这个行为本身也应该是串行的,因此加上synchronizedpublic synchronized void readLock() throws InterruptedException{// 既然抢锁,肯定writingthis.blockingReaders++;try{// 如果有人在写,就不允许读// 如果写具有更高权限,那么当存在“写”线程在等待时,就让读线程让出锁while(writingWriters>0 || (preferWriter&&blockingWriters>0)){this.wait();}// wait线程抢到锁之后从这开始执行,所以readers++this.readingReaders++;}finally {// reader执行完毕,让被阻塞的reader--// 此处不让readingReaders--,是因为只有当释放锁的时候才--this.blockingReaders--;}}// 释放锁public synchronized void readUnlock(){// 释放锁之后,读者减少一个this.readingReaders--;this.notifyAll();}// 当用户打算“写”的时候,抢这个锁public synchronized void writeLock() throws InterruptedException{// 既然抢锁,肯定writingthis.blockingWriters++;try{// 如果有人在写,或者有人在读,就不允许写while(writingWriters>0 || readingReaders>0){this.wait();}// wait线程抢到锁之后从这开始执行,所以writingWriters++this.writingWriters++;}finally {// 执行完毕,让被阻塞的blockingWriters--this.blockingWriters--;}}// 释放锁public synchronized void writeUnlock(){// 释放锁之后,writingWriters减少一个this.writingWriters--;this.notifyAll();}
}

共享资源

package concurrency.chapter13;// 共享的数据,可被写和读
public class ShareData {// 操作的数据private final char[] buffer;// 锁private final ReadWriteLock LOCK = new ReadWriteLock(false);// 传入共享数据的sizepublic ShareData(int size) {this.buffer = new char[size];// 初始化数据为*for (int i=0;i<size;++i){buffer[i]='*';}}// 读操作public char[] read() throws InterruptedException{try{// 上锁LOCK.readLock();// 模拟读操作耗时Thread.sleep(1);return this.buffer;}finally {// 释放锁LOCK.readUnlock();}}// 写操作public char[] write(char c) throws InterruptedException{try{// 上锁LOCK.writeLock();for(int i=0;i<buffer.length;++i){buffer[i]=c;}// 模拟写操作耗时Thread.sleep(5);}finally {// 释放锁LOCK.writeUnlock();}return this.buffer;}
}

客户端测试(最好是自己写一个继承Thread类的Reader\Writer)

package concurrency.chapter13;import java.util.Arrays;
import java.util.stream.Stream;public class Client {public static void main(String[] args) {final ShareData shareData = new ShareData(10);new Thread(()->{while (true){try {final char[] read = shareData.read();System.out.println("reader线程:"+String.valueOf(read)+"-----------");} catch (InterruptedException e) {e.printStackTrace();}}},"R1").start();Stream.of('a','b','c','d','e','f','g','i','h','s').forEach(c->{new Thread(()->{while (true){try {final char[] write = shareData.write(c);System.out.println("writer线程:"+ String.valueOf(write)+"++++++++++++");} catch (InterruptedException e) {e.printStackTrace();}}},"W1").start();});}
}

7. 不可变对象设计模式

不可变对象一定是线程安全的,不必加锁
上述这句话很容易理解:只有修改才会导致并发问题,不可变对象不允许修改(只读),自然是线程安全的。

因此只需要知道如何使对象成为不可变对象即可。

  1. 不要使用setter方法
  2. 所有属性都加上final private
  3. 不允许对象被继承,即类本身也有final修饰。或者私有化构造方法,然后用工厂返回对象(非单例),或者单例模式(子类会隐式调用父类构造器,当父类构造器私有化时,就不允许被继承
  4. 如果这个对象引用了其他可变对象,不要允许这些可变对象被改变。(参考String的设计,凡是涉及到其他可变对象,对其加上synchronized)
  5. 参照String的设计,若非要修改,那么将返回一个新的对象。
  6. 可被重写的方法(Object提供的,如toString)加上synchronized,防止多处重写导致错误。

final其实是限制了地址不可变,集合类尽管使用了final修饰,但是还是能对里面的数据进行修改操作,因此get方法返回某个List时,使用Collection.unmodifiableList(list)。 或者返回一个复制体出去

servlet不是线程安全的

8. Future设计模式

你去买蛋糕,蛋糕制作需要半小时,你有两个选择
1.原地等待半小时,拿到蛋糕
2.老板给你一张票,你离开做其他事情,半小时后回来用票换蛋糕。

第一种方法,就导致你被阻塞。
第二种办法,就是Future设计模式:调用其他方法的时候,暂时拿不到结果,但是你不选择阻塞等待,而是拿到结果的引用并继续自己的其他工作;等结果出来了,你用该引用得到结果。

涉及对象:
未来事件、你的工作线程、做蛋糕的线程、
关系:
你从未来事件中获取结果
未来事件从做蛋糕的线程那拿到结果

未来事件的接口

// 这是一个未来的事件接口
public interface Future<T> {T get() throws InterruptedException;
}

未来事件的实现:事件是否完成、事件产出结果

public class ImplFuture<T> implements  Future<T>{// 是否完成private volatile boolean done=false;// 得到结果private T result;// 如果蛋糕完成了,就把结果传进来public void done(T result){// 修改 -- 锁synchronized (this){this.result = result;this.done = true;this.notifyAll();}}// 返回蛋糕的方法@Overridepublic T get() throws InterruptedException {synchronized (this){// 如果还未完成,就让“取数据线程”等待while (!done){this.wait();}}return result;}
}

执行任务的接口(做蛋糕等其他动作)

作为函数式接口即可,具体动作由客户端定义

// 执行任务
public interface Task<T> {T call() throws InterruptedException;
}

做一个门面,将Future和task组合到一起,方便客户端调用

package concurrency.chapter14;public class Service {// 用户创建这个Service,然后传入task即可得到Future事件public <T> Future<T> submit(Task<T> task){ImplFuture<T> future = new ImplFuture<>();// 另起一个做蛋糕的线程new Thread(()->{T result = null;try {result = task.call();} catch (InterruptedException e) {e.printStackTrace();}// 把蛋糕传递给futurefuture.done(result);}).start();return future;}
}

客户端

package concurrency.chapter14;public class Client {public static void main(String[] args) throws InterruptedException {// 会被动阻塞,直到拿到数据
//        String result = get();
//        System.out.println(result);Service service = new Service();Future<String> future = service.submit(Client::get);Thread.sleep(50);System.out.println("你去做其他事情");Thread.sleep(5000);System.out.println(future.get());}private static String get() throws InterruptedException {System.out.println("老板正在制作蛋糕,预计耗时5s");Thread.sleep(5000);return "得到蛋糕";}
}

上述版本缺点明显:当你做其他事情时,调用future.get(),如果此时蛋糕没有做好,你也会被阻塞住。

其实上述代码只是利用了线程的异步,你的工作是主线程,制作蛋糕是main线程,共享数据是蛋糕。

改进办法:在Service::submit中,增加一个回调方法。
如何增加回调方法?
回调就是指当前方法结束的时候,再调用其他方法。
submit的结束标志就是当蛋糕制作完毕时,所以,改动如下:

Service::submit

package concurrency.chapter14;import java.util.function.Consumer;public class Service {public <T> Future<T> submit(Task<T> task, Consumer consumer){ImplFuture<T> future = new ImplFuture<>();new Thread(()->{T result = null;try {result = task.call();} catch (InterruptedException e) {e.printStackTrace();}future.done(result);// 当蛋糕做完了,就进行消费,消费方式由客户决定consumer.accept(result);}).start();return future;}
}

Client

package concurrency.chapter14;public class Client {public static void main(String[] args) throws InterruptedException {// 会被动阻塞,直到拿到数据
//        String result = get();
//        System.out.println(result);Service service = new Service();// 调用get方法制作蛋糕,将蛋糕结果Future<String> future = service.submit(Client::get,(t)->{System.out.println("蛋糕已经做完啦,得到"+t);});Thread.sleep(50);System.out.println("你去做其他事情");Thread.sleep(5000);System.out.println("你此时也可以继续从future.get()中获取蛋糕:"+future.get());}private static String get() throws InterruptedException {System.out.println("老板正在制作蛋糕,预计耗时5s");Thread.sleep(5000);return "布丁蛋糕";}
}

9. Guarded Suspension设计模型

guarded suspension 确保挂起
A在做饭,B敲门送快递,A说等一下,B被挂起。

模拟实例:
客户端发送请求,服务器接收请求。
服务器一次只能处理一个请求,当服务端正在处理请求的时候,就将客户端发送的请求加到队列中,等待服务端处理完当前请求

简单封装一个请求

public class Request {final private String value;public Request(String value) {this.value = value;}public String getValue() {return value;}
}

请求队列

package concurrency.chapter15;import java.util.LinkedList;public class RequestQueue {private final LinkedList<Request> queue = new LinkedList<>();public Request getRequest(){synchronized (queue){while(queue.isEmpty()){try {queue.wait();} catch (InterruptedException e) {// 如果被打断之后来到这里,最后一行removeFirst没值,导致出错// 因此这里就直接返回System.out.println("获取请求被打断");return null;}}return queue.removeFirst();}}public void putRequest(Request request){synchronized (queue){queue.addLast(request);queue.notifyAll();}}
}

客户端

package concurrency.chapter15;import java.util.Random;// 客户端发请求
public class ClientThread extends Thread{private final RequestQueue queue;private final String sendValue;public ClientThread(RequestQueue queue, String sendValue) {this.sendValue = sendValue;this.queue = queue;}// 发送请求@Overridepublic void run() {// 发送十个请求for (int i=0;i<10;i++){System.out.println("***客户端发送请求数据:"+sendValue);// 加到队列中queue.putRequest(new Request(sendValue));// 休眠一会try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}}
}

服务端

package concurrency.chapter15;import java.util.Random;// 服务端接收请求
public class ServerThread extends Thread{private final RequestQueue queue;private volatile boolean closed=false;public ServerThread(RequestQueue queue) {this.queue = queue;}// 接收请求@Overridepublic void run() {while(!closed){final Request request = queue.getRequest();//如果此时并没有请求过来if (request==null){// 就进行下一次接收System.out.println("接收到空请求");continue;}System.out.println("+++服务端接收到请求数据:"+request.getValue());//休眠一会try {Thread.sleep(10);} catch (InterruptedException e) {System.out.println("服务器被打断");return;}}}public void close(){this.closed=true;// 即便closed是true,但是执行线程可能正处于bolcked状态,因此无法感知到closed,// 因此打断this.interrupt();}
}

启动测试

package concurrency.chapter15;public class SuspensionClient {public static void main(String[] args) throws InterruptedException {final RequestQueue requestQueue = new RequestQueue();new ClientThread(requestQueue,"test").start();final ServerThread serverThread = new ServerThread(requestQueue);serverThread.start();Thread.sleep(500);serverThread.close();}
}

当一个服务工作很忙,就可以使用这个模式。
建立一个队列,将待办事务放进去,等有空的时候再去取。

10. 线程保险箱 ThreadLocal

ThreadLocal

  • 用于getsetremove当前线程的一个数据
  • initialValue方法默认返回null,可将其重写。或者使用ThreadLocal.withInitial(supplier)
  • 数据用完了一定要remove,不然可能发生内存泄露
  • ThreadLocal存储数据的方式是通过map<Thread,value>的形式存放的,因此线程跟数据是一对一关系,相当于给每个线程一个独立的数据空间。

思考:ThreadLocal中的set、get方法均没有加synchronized,它是如何实现线程安全的?

 public T get() {Thread t = Thread.currentThread();ThreadLocalMap map = getMap(t);if (map != null) {ThreadLocalMap.Entry e = map.getEntry(this);if (e != null) {@SuppressWarnings("unchecked")T result = (T)e.value;return result;}}return setInitialValue();}public void set(T value) {Thread t = Thread.currentThread();ThreadLocalMap map = getMap(t);if (map != null)map.set(this, value);elsecreateMap(t, value);}
  1. 即便有多个线程同时调用set方法,由于各个线程都拥有各自的栈内存,而Thread.currentThread()是方法中的临时对象,因此不会出现线程冲突。
  2. 大家的Thread.currentThread()都不一样,Thread.currentThread()又是作为取数据的凭证,因此不会出现数据冲突。

如果使用ThreadLocal为各个线程开辟自己的内存空间,就相当于一个保险箱,将大家的数据进行了隔离。

11. 上下文设计模式

理由LocalThread,保存content,这样就可以随时拿到上下文。

定义我们的上下文可能涉及的数据,此处包括一个name和http数据

public class Context {private String name;private String http;public void setName(String name) {this.name = name;}public String getName() {return name;}public void setHttp(String http) {this.http = http;}public String getHttp() {return http;}
}

执行任务ExecutionTask,可能有多个任务需要执行,并得到多个结果。

package concurrency.chapter16;// 执行任务,可能存在多个任务
public class ExecutionTask implements Runnable{// 查询任务private QueryFromDBAction queryFromDBAction=new QueryFromDBAction();// 请求任务private QueryFromHttpAction queryFromHttpAction=new QueryFromHttpAction();@Overridepublic void run() {final Context context = ActionContext.getInstance().getContext();queryFromDBAction.execute();queryFromHttpAction.execute();System.out.println(Thread.currentThread().getName()+"  :  "+context.getHttp()+"  :  "+context.getName());}
}

具体任务操作,拿到数据之后保存到上下文中。

public class QueryFromDBAction {public void execute(){// 获得上下文final Context context = ActionContext.getInstance().getContext();// 模拟执行耗时try {Thread.sleep(10);// 查询出nameString name="abc";context.setName(name+Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}
}public class QueryFromHttpAction {public void execute(){//获得上下文final Context context = ActionContext.getInstance().getContext();// 模拟执行耗时try {Thread.sleep(10);// 模拟获取数据Integer http=101;context.setHttp(http+Thread.currentThread().getName());} catch (InterruptedException e) {e.printStackTrace();}}
}

利用ThreadLocal获取上下文,单例即可

// 用于获取上下文,单例模式
public final class ActionContext {// 创建保存上下文的ThreadLocalprivate static final ThreadLocal<Context> threadLocal = ThreadLocal.withInitial(Context::new);private ActionContext(){}private static class ContextHolder{private final static ActionContext ActionContext = new ActionContext();}public static ActionContext getInstance(){return ContextHolder.ActionContext;}public Context getContext(){return threadLocal.get();}
}

客户端测试

public class Client {public static void main(String[] args) {IntStream.rangeClosed(1,5).forEach((i)->{new Thread(new ExecutionTask()).start();});}
}

12. Balking设计模式

Balking:犹豫、阻碍
例一:
你去饭店吃饭,吃到途中想要再点一个小菜,于是你举起手示意服务员,其中一个服务员看到了你举手正准备走过来的时候,发现距你比较近的服务员已经准备要受理你的请求,于是中途放弃了
例二:
在系统资源的加载或者某些数据的初始化时,在整个系统生命周期中,资源可能只被加载一次。如果有两个线程都准备加载某一资源,其中一个资源发现其已经被加载了,就放弃继续加载
例三:
csdn草稿在编辑过程中每隔一段时间会自动保存一次,如果此时发现用户已经保存过一次了,那么csdn会取消本次自动保存

需求:
用户编辑一个文件,在未保存之前,数据写在缓冲区中,系统每隔一段时间会自动保存一次,用户也可以主动ctrl+s进行保存。

public class BalkingData {// 缓冲区中的数据private StringBuffer content=new StringBuffer("");// 记录缓冲区的数据是否被保存过private boolean saved;// 外部进行文件写入public void write(String newcontent) {this.content.append(newcontent);}// 提供给外部主动保存,public synchronized void save(){// 手动保存doSave(false);// 标志位重置this.saved=true;}// 每隔一段时间,系统自动调用这个方法,看看数据是否被更改public synchronized void check(){// 如果保存过,就不操作。// 这一步就是balkif (saved){// 如果发现用户手动保存过了,那就将saved初始化为false,等待下次保存this.saved=false;return;}// 如果还没保存,就触发自动保存doSave(true);// 标志位重置this.saved=false;}// 保存文件private void doSave(boolean isAuto) {// 此处操作文件流保存System.out.println("数据已被"+(isAuto?"自动保存":"手动保存")+"写入文件,保存成功!当前文本数据:【"+this.content+"】");}
}

客户端测试

public class Client {public static void main(String[] args) {final BalkingData balkingData = new BalkingData();balkingData.write("第一次写数据");// 手动保存balkingData.save();// 自动保存机制,但不触发保存balkingData.check();balkingData.write("第二次写数据");// 自动保存机制,触发保存balkingData.check();System.out.println();}
}

13. CountDown设计模式

当一些线程完成工作之后,另一些线程才开始工作。

ublic class Client {public static void main(String[] args) throws InterruptedException {// 其内数字填我们要被控制的线程个数CountDownLatch latch = new CountDownLatch(5);System.out.println("第一阶段工作开始");IntStream.rangeClosed(1,5).forEach((i)->{new Thread(()->{try {System.out.println(Thread.currentThread().getName()+"正在工作!");Thread.sleep(50);} catch (InterruptedException e) {e.printStackTrace();}// 每工作完一个线程,就countDown一次latch.countDown();},String.valueOf(i)).start();});// 在此等待第一阶段任务完成latch.await();System.out.println("第一阶段工作结束");System.out.println("第二阶段工作开始");System.out.println("...... ......");System.out.println("全部工作完成!");}
}

14. Thread-Pre-Message设计模式

其实就是:为每一个请求单独使用一个线程进行处理(往往结合线程池)
这里不做演示。

15. Two-phase terminatioin


在finally中引入一个clean方法,用于线程结束后的清理工作。

三、ClassLoader

简单来说,类加载就是将class文件中的二进制数据读取到内存中,将其放在方法区内,然后在堆区中创建一个java.lang.Class对象,用来封装在方法区的数据结构类加载的最终产品是位于堆区中的class对象。(堆中存放真实数据,方法区中声明数据结构)

1. 类加载的三个过程


注意:静态变量先于类本身被加载。
符号引用:
Object obj=new Object()
obj.xx();
// 这里的obj就是符号引用(名称),底层逻辑肯定是不认识这个名称的,因此转换为直接引用。

2. 类的引用

  1. 类的主动引用

类的主动引用一定会发生类的初始化

new一个类对象
调用类的静态域(成员和方法),不包括final常量
使用java.lang.reflect包的方法堆类进行反射调用
虚拟机启动类,如命令行编译后执行 java Test ,则Test类一定会被初始化
继承树回溯初始化,当父类没有被初始化时,优先初始化父类。
  1. 类的被动引用

类的被动引用不会发生类的初始化

访问静态域时,真正声明这个域的类才会被初始化(通过子类引用父类的静态变量,不会导致子类初始化)
通过数组定义类引用,不会导致类的初始化
引用常量不会触发初始化(常量在编译阶段就被放入method area中)

问:类加载的最终产品是位于堆区中的class对象,那么:new出来的实例对象与该对象的class对象有什么关联?
答: new出来的实例对象引用class对象;class对象引用堆中的数据;堆中的数据参照方法区中的数据结构将数据组织成该对象。
可以看出,实例对象通过class对象操作数据。class对象相当于 程序访问内部数据的外部接口

3. 初始化


注意:静态代码块中对相对位置在其之后的遍历,只允许写,不允许读!!

4. JVM内置类加载器

  • 引导类加载器(bootstrap):

    • 用于加载java最底层核心库的内容(jre/lib/rt.jar,sun.boot.class.path,java.lang.*),C语言编写
    • 加载扩展类和应用程序类加载器,并指定他们的父类加载器
  • 扩展类加载器(extensions):
    • 用于加载扩展库(jre/ext/*.jar,java.ext.dirs)
    • 由sun.misc.Launcher$ExtClassLoader实现
  • 应用程序类加载器(application)
    • 根据类路径(classpath, java.class.path)加载,一般的应用类都由其完成加载。
    • 由sun.misc.Launcher$AppClassLoader实现
  • 自定义类加载器
    • 通过继承java.lang.ClassLoader实现自定义

除了引导类使用C写的,其他都是java写的(继承Java.class.ClassLoader类)

注意:

  1. 一般的类都是app类加载器加载
  2. ext的父加载器是bootstrap,但是由于bootstrap是C写的,因此无法访问,因此ext.getParent()为null
  3. 类加载器之间不是继承关系,是包含(包装)关系

5. 类加载器模式:双亲委托代理模式

接收到加载类的请求时,先层层上递给父类(直到最高的引导类加载器),若父类无法加载,再往下放一级,重复直到加载成功。

这种模式能够保证核心库的安全,比如,用户定义了一个Object,但是不可能出现用户定义的Object覆盖java.lang.Object类的情况,被加载的一定是Object。

但并非所有的类加载器都是这种模式,tomcat服务器的类加载器恰恰相反,由子类加载,子类加载失败再层层委托给父类进行加载。

6. 自定义类加载器

自定义类加载器继承ClassLoader类,且必须重写findClass方法(源码仅仅抛出一个未找到类的异常)

核心就在于重写findClass:通过xxx.xxx.xxxClass路径,找到该class文件,然后将其作为字节数组读入内存,并通过defineClass方法将字节数组定义为一个class返回。

7. 加密解密类加载器

加密过程:
将class文件读入,然后加密,接着保存enCodeClass。

类加载器:
类加载器只允许加载enCodeClass

解密过程:
在类加载器中的findClass中对enCodeClass进行解密

8. 打破双亲委派机制

classLoader中有findClass方法和loadClass方法。
findClass默认抛出异常,loadClass则是通过双亲委派机制进行类加载。

查看loadClass方法可知:
如果父类加载器能够加载,则通过父类加载器加载;如果父类加载器加载不到,那么调用findClass方法进行加载(如果没有重写findClass,则抛出找不到异常)

因此,想要打破双亲委派,则需要重写loadClass方法(以及findClass方法)

即便打破了双亲委派,然后我们自定义一个java.lang.*中的类,依旧不允许通过自定义类加载器加载,因为这个行为java做了限制,不允许自定义加载java.lang.*

因此

  1. 同一个包下面的String,自定义类加载器依旧是加载不到的。
  2. 虽然可以打破双亲委派机制,但是由于java限制,也依旧不允许加载自定义的java.lang.*

9. 类加载器的命名空间与运行时包

  • 每个类加载器都有子的命名空间,命名空间由该类加载器及其所有父加载器所加载的类组成
  • 同一个命名空间,不会出现完整的名字

同一个加载器加载同一个class文件,那么得到的class都是同一个。

运行时包:

  • 父类加载器看不到子类加载器加载的类
  • 不同命名空间下的类加载器之间的类互相不可访问

class的卸载:

10. 线程上下文类加载器

Java设定了很多规范,比如sql,里面的class只有接口,没有具体实现。
这些具体实现都将由具体厂商来完成。

但是,sql接口是Java核心,在rt.jar中,因此会被引导类加载器加载,根据双亲委派机制,厂商的实现根本无法被载入(称为API+SPI(service provide interface)问题。 常见的SPI有JDBC、JCE、JNDI、JAXP和JBI等)。

解决这个问题,就是使用线程上下文类加载器。

线程上下文类加载器很简单,就是通过Thread.getCurrentThread().getContextClassLoader()获取到当前线程的类加载器来完成。(也可以通过set更改当前线程的类加载器)

应用实例:

  1. 通过Class.forName(“com.mysql.jdbc.Driver”);DriverManager.getConnection(" url ")将具体厂商传递给DriverManager。此处的线程上下文是app
  2. 在java.sql.driver中,new了一个Driver注册到DriverManager中。
  3. DriverManager中,通过线程上下文获取到app类加载器,然后对厂商的具体实现进行加载。

四、Java并发包

1. 原子类型

volatile只保证了可见性、有序性,但是不能保证原子性,因此,如果多线程中有某些值的变更操作,那么可能出现数据异常。
比如在多个线程中存在同一个int类型的a(volatile修饰),进行a+=1,直到a>1000结束,那么a+=1这一步可能出现因为volatile不保证原子性而导致的错误。
因为a+=1实际上包含多个步骤,在这些步骤之间,发生了线程的异步操作,就可能出现数据异常。

在java.util.concurrent.atomic中有各种类型的原子类型,使用方法简单。

CAS(乐观锁)

保证原子性使用的是CAS(compareAndSet,比较并交换)

public final int incrementAndGet() {for (;;) {int current = get();int next = current + 1;if (compareAndSet(current, next))return next;}}

这是原子类型中类似++current的操作。
compareAndSet(current,next)的意思是,检查期望值是否为current,如果是,就将其改为next。

  1. 先拿到旧值current
  2. 然后拿到新值next
  3. 比较旧值和新值是否相等,如果相等,则操作成功,否则,继续进入循环

如果在进行CAS的过程中,有另外一个线程将current更改了,那么操作就失败了,因此保证了原子性。

根据CAS,可以设计一个CAS锁

CAS锁

原理:
初始值设为0,0表示锁释放,1表示锁占用
抢锁:如果当前值是0,那就将其改为1,表示锁被抢到
释放锁:如果当前值是1,就将其改为0,表示锁为释放


自定义锁的释放注意使用currentThread进行控制,不然容易出现非使用者释放锁的情况,导致多个线程同时占用锁。

原子类型除了提供对基本类型的支持,还有AtomicReference能够通过泛型保证任意Object对象的原子性

CAS的优缺点

cas优点:如一描述在并发量不是很高时cas机制会提高效率。
cas缺点:
1、cpu开销大,在高并发下,许多线程,更新一变量,多次更新不成功,循环反复,给cpu带来大量压力。
2、只是一个变量的原子性操作,不能保证代码块的原子性。
3、ABA问题

ABA问题:内存值V=100;
threadA 将100,改为50;
threadB 将100,改为50;
threadC 将50,改为100;

场景:小牛取款,由于机器不太好使,多点了几次全款操作。后台threadA和threadB工作,
此时threadA操作成功(100->50),threadB阻塞。正好牛妈打款50元给小牛(50->100),
threadC执行成功,之后threadB运行了,又改为(100->50)。
这样就被吞了50块。

如何解决aba问题:
对内存中的值加个版本号,在比较的时候除了比较值还的比较版本号。

java:AtomicStampedReference就是用版本号实现cas机制。


Atomic中有专门实现属性原子性的类(xxxFieldUpdater),但是必须满足一些要求。

通过FieldUpdater能够实现无锁
思路:利用updater获得并发对象属性
需要修改该属性的时候,使用updater.compareAndSet实现修改。

Unsafe

之所以叫这个名字,是因为unsafe让Java变得不安全,能够实现各种不安全的操作。

unsafe的“破坏”操作

反射获取Unsafe

public static void main(String[] args) throws Exception {final Unsafe unsafe = getUnsage();System.out.println(unsafe);}private static Unsafe getUnsage() throws Exception {// 拿到Unsafe中的theUnsafe属性Field field = Unsafe.class.getDeclaredField("theUnsafe");// 允许我们访问私有的theUnsafefield.setAccessible(true);return (Unsafe) field.get(null);}

2. Java并发包工具

CountDownLatch

能够保证所有任务执行完毕再继续下一步。

  1. 传入计数个数
  2. 任务执行完毕后让个数减一
  3. 在收尾代码前进行await,当计数为0时(所有任务执行完毕),再继续往下。

使用CountDownLatch实现线程之间的状态监控
需求:有两个线程,在A阶段同时工作,在B阶段,线程一要等待线程二的某个信号才继续往下执行。

思路:线程二发出信号处让CountDownLatch减至0,线程一等待信号处(阻塞处)使用await阻塞。

await可以传入时间,能够灵活控制执行时长。
若任务超时,则await的放开阻塞;
若任务没有超时,而await的设置时长很长,也不会傻傻等待,而是立即继续往下。


CountDownLatch的另一个用法就是给离散平行任务增加逻辑层次关系。
通过计数器,让离散的任务形成簇,通过计数器递减,对该簇执行状态进行监控。

3. Executors框架

4. 并发集合

5. Google Guava

未完待续

工具整理

类名 重要方法/用途 备注
Atomicxxx compareAndSet
CountDownLatch 通过计数进行线程执行逻辑控制
CyclicBarrier 统一多个线程的执行终点
Exchange 成对的线程交换数据 只能成对操作、交换的是对象地址,而非拷贝,因此可能存在并发安全问题
Condition wait、notify的高效替代品
ForkJoinPool 任务分而治之 任务必须继承RecursiveTask类
Phaser 解决控制多个线程分阶段共同完成任务
备注
Semaphore 信号锁,设定访问上限个数,线程拿到一个,上限就减少一
ReentrantLock 可重入锁,可以实现公平锁,更灵活的synchronized
ReadWriterLock 读写分离锁,保证R-R情况下无锁,悲观读
StampedLock 不可重入,可实现乐观读

待续

Java高并发与多线程网络编程相关推荐

  1. java高并发与多线程汇总(一):基础知识(上)

    java高并发与多线程汇总 往期文章推荐:   java高并发与多线程汇总(一):基础知识(上)   java常见面试考点(四十二):序列化与反序列化   java常见面试考点(四十三):泛型   j ...

  2. Java高并发和多线程的面试笔试题——稳拿offer

    1.在java中守护线程和本地线程区别? java中的线程分为两种:守护线程(Daemon)和用户线程(User). 任何线程都可以设置为守护线程和用户线程,通过方法Thread.setDaemon( ...

  3. parallelstream启动的线程数_高并发与多线程网络学习笔记(三)线程组和线程池

    线程组 线程组的作用是:可以批量管理线程或线程组对象,有效地对线程或线程组对象进行组织. 构造函数 ThreadGroup(String name)//默认parent为当前线程组 ThreadGro ...

  4. Java高并发和多线程系列 - 1. 线程基本概念

    1. 什么是线程? 线程和进程的区别 在了解线程的概念前,我们应该先知道什么是进程? 进程是操作系统的基本概念之一, 它是正在执行的程序实例. * 下面的一些进程的基本概念你可以了解下 ------- ...

  5. java高并发(一)导学

    现在准备系统学习java高并发与多线程相关知识. 首先了解一下我们这一套知识的学习思路: 并发与高并发相关概念 CPU多级缓存 缓存一致性 乱序执行优化 java内存模型 JMM规定.抽象结构 同步操 ...

  6. Java并发编程实战_阿里P9整理分享的亿级流量Java高并发与网络编程实战PDF

    前言 为了帮助初级开发者快速掌握高并发.网络编程.微服务.海量数据的处理这些实用技术,本文以"理论+范例"的形式对各个知识点进行了详细的讲解,力争让读者在实践中快速掌握相关知识. ...

  7. 阿里P9整理分享的亿级流量Java高并发与网络编程实战PDF

    前言 有人调侃我们说: 程序员不如送外卖.送外卖是搬运食物,自己是搬运代码,都不产出新的东西-- 透支体力,又消耗健康,可替代性极强,30岁之后就要面临被优化的危险-- 想跳槽,但是更高的平台难进,同 ...

  8. 《亿级流量JAVA高并发与网络编程实战》笔记--------更新中

    <亿级流量JAVA高并发与网络编程实战>笔记 第一章 高并发概述 "高并发技术" 是一个广义的概念,是指一种高效的地实现并发需求的解决方案,是技术领域的名称,可以包含架 ...

  9. 《Java高并发核心编程.卷2,多线程、锁、JMM、JUC、高并发设计模式》

    <Java高并发核心编程.卷2,多线程.锁.JMM.JUC.高并发设计模式> 目录 第1章 多线程原理与实战 1.2 无处不在的进程和线程 1.2.1 进程的基本原理 1.2.2 线程的基 ...

最新文章

  1. 禁用java rmi_java-如何安全关闭rmi客户端?
  2. 银行卡为何要使用ISO8583格式
  3. C语言step-by-step(四)(循环控制)
  4. 配置SpringCloud Config Client连上Config Server
  5. 学习Docker从小白到入门
  6. 团队项目博客检查结果汇总
  7. OpenCV处理引起光学错觉的图像
  8. 最近对项目代码做的一些更改和感想
  9. .npy文件_Numpy库使用入门(六)文件的存取
  10. 技术员联盟Win11 64位官方全新旗舰版镜像V2021.08
  11. Leetcode好的微博
  12. 【华为云技术分享】实战笔记丨JDBC问题定位指南
  13. 光复用技术中三种重要技术_【技术文章】X射线无损检测仪在锂电池行业中的重要应用...
  14. Atitit TPL(事务处理语言 目录 1.1. 事务隔离级别 1 1.2. Savepoint技术 1 2. Tpl 1 2.1.  打开事务 START TRANSACTION; 1 2.2.
  15. 7805和78l05可以代换吗_7805引脚图稳压
  16. 计算机电脑配置ppt,计算机应用基础之word2010课件.ppt
  17. 最新微信养号、使用、解封必看的注意事项
  18. c语言中数的表示 叙述正确,关于C语言中数的表示,以下叙述正确的是(
  19. redis中的AKF理论和CAP理论详解
  20. 码农翻身——Redis:MySQL算老几?

热门文章

  1. 针对三星小型SSD移动硬盘T1的性能分析
  2. HP OEM XP的BIOS破解方法
  3. 计算机u盘驱动坏了如何的修复,U盘损坏怎么恢复?学会这两招,快速恢复里面的文件...
  4. macbook os 10.15.1中没有“任何来源”如何访问app?
  5. #今日论文推荐#北大校友发现级联光学新效应,首次验证不同类型非线性过程在量子尺度上的干涉现象,为集成量子光学带来新可能
  6. 模块化服务器供电系统,供电系统的模块化设计与模块化UPS详解.PDF
  7. python pygame实现五子棋双人联机(简约版)
  8. 3.8选择练习题之根据路程算折扣
  9. 如何在照片或者图片上增加文字
  10. 浅谈我与软件工程专业