一、并发编程面临的挑战

并发编程的目的是为了让程序运行更快,但并不是启动多线程就能让程序最大限度地并发执行。还会面临上下文切换、死锁、软硬件的资源限制等问题。

上下文切换

CPU通过给每个线程分配CPU时间片来实现多线程并发执行。时间片一般几十毫秒,因此CPU通过不停地切换线程执行,让我们感官上觉得多个线程是同时执行的。

通过时间片分配算法来循环执行任务,当前任务执行一个时间片后切到下一时间片,但在切换前会保存上一任务的状态,以便下次切回来时可以再加载该任务状态。

任务从保存到再加载的过程就是一次上下文切换。

线程有创建和上下文切换的开销。上下文切换开销可通过无锁并发编程、CAS算法、使用最少线程和使用协程来减少。

无锁并发编程:减少多线程锁的竞争,如数据ID按hash算法取模分段,不同线程处理不同段的数据(JUC下的ConcurrentHashMap);

CAS算法:Atomic包即使用的CAS算法来更新数据;

使用最少线程:避免创建不必要的线程,如通过查看dump线程信息发现大量等待的线程时减少工作线程数;

协程:在单线程里实现多任务调度,并在单线程里维持多个任务间的切换。

死锁避免

a、避免一个线程同时获取多个锁;

b、避免一个线程在锁内同时占用多个资源,尽量保证每个锁只占用一个资源;

c、尝试使用定时锁,使用Lock.tryLock(timeout)来替代使用内部锁机制

d、对于数据库锁,加锁和解锁都必须在一个数据库连接里,否则会出现解锁失败的情况。

资源限制

即并发执行时程序执行速度受限于软硬件。如带宽的上传/下载速度、硬盘读写速度和CPU的处理速度、数据库连接数、socket连接数等。

资源限制可能导致本来串行的任务在并发执行时不起作用,仍在串行地执行,而增加了上下文切换和资源调度时间,反而变得更慢了。

硬件上可考虑集群多机运行,如不同机器处理不同数据。软件可考虑资源池进行资源复用,如数据库连接池的连接复用。

二、JAVA并发机制的底层实现原理

Java代码编译后生成Java字节码,经过类加载器加载到JVM中,JVM执行字节码,最终转化为汇编指令在CPU上执行,所以java并发机制依赖于JVM的实现和CPU指令。

volatile

volatile是轻量级的synchronized,其“可见性”保证共享变量修改的立即可见,不会引起线程上下文切换和调度。一个字段被声明为volatile,则java 线程内存模型确保所有线程都能看到这个变量的值是一致的。

内存屏障:处理器指令,实现对内存操作的顺序限制。

原子操作:不可中断的一个或一系列操作。

volatile修饰的共享变量在转换为汇编语言后会出现一个lock前缀指令,使得线程的工作内存中的数据会被写会主内存中,同时使得其他工作内存的该变量的数据失效,只能从主内存中读取。

synchronized

普通同步方法,锁是当前实例对象;静态同步方法,锁是类对象;同步方法块,锁是synchronized括号内设置的对象。锁的是对象。

同步代码块采用monitorenter、monitorexit指令显式的实现。

同步方法则使用ACC_SYNCHRONIZED标记符隐式的实现。

monitorenter

每一个对象都有一个monitor,一个monitor只能被一个线程拥有。当一个线程执行到monitorenter指令时会尝试获取相应对象的monitor,获取规则如下:

  • 如果monitor的进入数为0,则该线程可以进入monitor,并将monitor进入数设置为1,该线程即为monitor的拥有者。

  • 如果当前线程已经拥有该monitor,只是重新进入,则进入monitor的进入数加1,所以synchronized关键字实现的锁是可重入的锁。

  • 如果monitor已被其他线程拥有,则当前线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor。

monitorexit

只有拥有相应对象的monitor的线程才能执行monitorexit指令。每执行一次该指令monitor进入数减1,当进入数为0时当前线程释放monitor,此时其他阻塞的线程将可以尝试获取该monitor。

对象头(32bit的JVM中)

HotSpot虚拟机中,对象在内存中存储的布局可以分为三块区域:对象头(Header)、实例数据(Instance Data)和对齐填充(Padding)。  synchronized用的锁是存在于Java对象头里的。如果对象是数组,则用3字宽(word,32bitJVM中1字宽为4字节),如果是非数组类型,则2字宽。

Mark Word: 锁状态、25bit hashCode、4bit 分代年龄、1bit 是否是偏向锁、2bit 锁标志位

Class MetaData Address:对象类型数据的指针。

Array length:数组长度(是数组的话)。

运行期间Markword的数据会随着锁标志位的变化而变化。

锁状态(JDK1.6引入)

级别由低到高:无锁状态、偏向锁、轻量级锁和重量级锁。状态随着竞争逐渐升级,但不能降级。这种锁只能升级不能降级的策略,其目的是为了提高获得锁和释放锁的效率

偏向锁

无锁竞争的情况下为了减少锁竞争的资源开销,引入偏向锁。当线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程ID,以后该线程在进入和退出时不需要进行CAS操作来加锁和解锁,只需测试下对象头Markword里是否存储着指向当前线程的偏向锁。成功则表示获得了锁。失败则再测试下Markword偏向锁标识是否置为1,没有则使用CAS竞争锁,否则尝试CAS将对象头的偏向锁指向当前线程等到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁。撤销偏向锁时,在没有正在执行的字节码的情况下,会先暂停拥有偏向锁的线程,然后检查持有偏向锁的线程是否活着,若是线程不处于活动状态,则将对象头置为无锁状态;如果仍然活着,拥有偏向锁的栈会被执行,遍历偏向对象的锁记录,栈中的锁记录和对象头的Markword要么重新偏向于其他线程,要么恢复到无锁的状态或标记对象不适合作为偏向锁,最后唤醒暂停的线程。

-XX:-UserBiasedLocking=false关闭偏向锁,程序默认进入轻量级锁状态。

轻量级锁

轻量级锁所适应的场景是线程交替执行同步块的情况。

加锁:线程执行同步块前,JVM先在当前线程的栈帧中创建用于存储锁记录的空间,并将对象头重的Markword复制到锁记录中,即displaced Markword。然后线程尝试使用CAS将对象头重的Markword替换为指向锁记录的指针。如果成功,则当前线程获得锁,失败则表示其他线程竞争锁,当前线程便尝试自旋来获取锁。

解锁:使用原子的CAS操作将displaced Markword替换回对象头,如成功表示没有竞争发生。失败表示当前锁存在竞争,锁就会膨胀成重量级锁。

自旋会消耗CPU,为避免无用自旋,一旦锁升级成重量级锁,就不会再恢复到轻量级锁状态。该状态下,其他线程试图获取锁时都会被阻塞。持有锁的线程释放后会唤醒这些线程,开启新一轮的夺锁之争。

锁粗化(Lock Coarsening): 也就是减少不必要的紧连在一起的unlock,lock操作,将多个连续的锁扩展成一个范围更大的锁,如方法内多个连续的StringBuffer.append()操作。

锁消除(Lock Elimination): 锁削除是指虚拟机即时编译器在运行时,对一些代码上要求同步,但是被检测到不可能存在共享数据竞争的锁进行削除,如方法内多个连续的StringBuffer.append()操作的作用域在方法内,sb的引用不会逃逸到方法外部。

适应性自旋(Adaptive Spinning): 自适应意味着自旋的时间不再固定了,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也很有可能再次成功,进而它将允许自旋等待持续相对更长的时间,比如100个循环。另一方面,如果对于某个锁,自旋很少成功获得过,那在以后要获取这个锁时将可能省略掉自旋过程,以避免浪费处理器资源。


 CAS

CAS,即比较并交换,是一个原子操作,它比较一个内存位置的值并且只有相等时修改这个内存位置的值为新的值,保证了新的值总是基于最新的信息计算的,如果有其他线程在这期间修改了这个值则CAS失败。CAS返回是否成功或者内存位置原来的值用于判断是否CAS成功。

JVM中的CAS操作是利用了处理器提供的CMPXCHG指令实现的。JUC包下的并发框架通过自旋CAS来实现原子操作。

优点:竞争不大的时候系统开销小。

缺点ABA问题:加版本号,JDK1.5引入AtomicStampedReference来解决ABA问题:当前引用是否等于预期引用,当前标志是否等于预期标志,全相等才以原子方式将该引用和该标志的值设置为给定的新值。循环时间长开销大。如长时间自旋CAS会给CPU带来非常大的执行开销。只能保证一个共享变量的原子操作。可将多个共享变量合成一个来进行处理,如JDK1.5引入的AtomicReference来保证引用对象之间的原子性。

三、Java内存模型

线程的通信是指线程之间以何种机制来交换信息。线程之间的通信机制有两种,共享内存和消息传递。在共享内存的并发模型里,线程之间通过写-读内存中的共享变量来隐式进行通信,典型的共享内存通信方式就是通过共享对象进行通信。在消息传递的并发模型里,线程之间没有共享变量,线程之间必须通过明确的发送消息来显式进行通信,在java中典型的消息传递方式就是wait()和notify()。

堆内存在线程间共享。Java线程间的通信由Java内存模型JMM控制,JMM决定一个线程对共享变量的写入何时对另一线程可见。线程之间的共享变量存储在主内存中,每个线程都有一个私有的本地内存,本地内存中存储了该线程以读写共享变量的副本。

共有以下操作:lock/unlock(主内存中)、read/load(从主内存到工作内存,前主后工)、use/assign(工作内存,传给执行引擎/执行引擎计算结果的赋值)、store/write(从工作内存到主内存,先工后主)。其中从read起都是原子操作,具有原子性;可通过volatile来保证可见性和有序性,防止指令重排

先行发生原则

程序顺序规则:按控制流顺序,线程中书写在前的操作先行发生于后面的操作;

管程锁定原则:一个锁的解锁,先行发生于随后对这个锁的加锁;

volatile变量原则:对一个volatile变量的写,先行发生于后面对这个变量的读;

传递性:A先行发生于B,B先行发生于C,则A先行发生于C。

JUC包实现:首先声明共享变量为volatile;然后使用CAS的原子条件更新来实现线程之间的同步。

final的内存语义

编译器和处理器要遵守两个重排序规则:

  • 在构造函数内对一个final域的写入,与随后把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序。
  • 初次读一个包含final域的对象的引用,与随后初次读这个final域,这两个操作之间不能重排序。

final域为引用类型:

  • 增加了如下规则:在构造函数内对一个final引用的对象的成员域的写入,与随后在构造函数外把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序。

final语义在处理器中的实现:

  • 会要求编译器在final域的写之后,构造函数return之前插入一个StoreStore障屏。
  • 读final域的重排序规则要求编译器在读final域的操作前面插入一个LoadLoad屏障

双重检查锁定与延迟初始化

即类似于懒汉式单例模式及双重校验的例子。序列化可破坏volatile的单例模式。(饿汉式即类初始化就生成实例)

// 懒汉式,双重检验
public class SafeDoubleCheckedLocking {private volatile static Instance instance;public static Instance getInstance() {if (instance == null) {synchronized (SafeDoubleCheckedLocking.class) {if (instance == null)instance = new Instance();//instance为volatile,现在没问题了}}return instance;}
}
// 饿汉式
1.线程安全;:类加载时已创建实例,不存在线程安全的问题。
2.在类加载的同时已经创建好一个静态对象,调用时反应速度快
缺点
资源效率不高,可能getInstance()永远不会执行到,但执行该类的其他静态方法或者加载了该类(class.forName),那么这个实例仍然初始化class Singleton{private Singleton(){}//final可有可无,有了更好private static final Singleton s=new Singleton();public static Singleton getInstance(){return s;}
}

四、Java并发编程基础

操作系统运行一个程序时,会为其创建一个进程。如运行一个Java程序就会为其创建一个Java进程。线程是操作系统调度的最小单元,也叫轻量级进程,如main线程。一个进程可以有多个线程,这些线程拥有各自的计数器、局部变量等属性,并可访问共享内存变量。处理器在这些线程上高速切换,让我们觉得这些线程是在同时执行。

线程优先级

线程分配到的时间片多少决定了线程使用处理器资源的多少,而线程优先级就是决定线程需要多活少分配一些处理器资源的线程属性。priority变量来控制优先级,由低到高为1-10,默认为5。优先级高的线程分配时间片的数量要多于优先级低的线程。针对频繁阻塞的设置高优先级,而偏重计算的设置较低优先级,确保处理器不会被占用。

线程状态

初始状态:刚被创建,但还未调用start方法;

运行状态:准备就绪和运行两种,统称为运行中;

阻塞状态:线程阻塞于锁;block(synchronized未获取到锁,Lock是等待,用了LockSupport)

等待状态:线程进入等待,需其他线程唤醒或中断;wait会释放锁;

超时等待:可在指定时间自行返回;

终止状态:表明当前线程执行完毕。

线程的构建方式

// 1、继承Thread类,重写run方法,创建线程对象并启动
public class MyThread extends Thread {public void run() {...}
}public static void main(String[] args) {new MyThread().start();
}// 2、实现runnable接口,重写run方法,创建实现类实例,并作为target来创建Thread对象,并启动
// 该Thread对象才是真正的线程对象
public class MyThread implements Runnable {public void run() {...}
}
public static void main(String[] args) {new Thread(new MyThread()).start();
}// 3、实现Callable接口
// 与 Runnable 接口不一样,Callable 接口提供了一个 call() 方法作为线程执行体,call() 方法比 // run() 方法功能要强大,比如:call() 方法可以有返回值、call() 方法可以声明抛出异常。
// Java5 提供了 Future 接口来代表 Callable 接口里 call() 方法的返回值,
// 并且为 Future 接口提供了一个实现类 FutureTask,这个实现类既实现了 Future 接口,
// 还实现了 Runnable 接口,因此可以作为 Thread 类的 target。
// 在 Future 接口里定义了几个公共方法来控制它关联的 Callable 任务。
// 使用 Callable 和 Future 创建线程的一般步骤如下:
// 创建实现 Callable 接口的类 myCallable;以 myCallable 为参数创建 FutureTask 对象;
// 将 FutureTask 作为参数创建 Thread 对象;调用线程对象的 start() 方法。
public class MyCallable implements Callable { @Override public Integer call() throws Exception { System.out.println(Thread.currentThread().getName()); return 99;  }
}
public static void main(String[] args) { FutureTask futureTask = new FutureTask<>(new MyCallable()); // 创建线程 Thread thread = new Thread(futureTask); // 启动线程 thread.start(); // 结果返回 try { Thread.sleep(1000); System.out.println("返回的结果是:" + futureTask.get()); } catch (Exception e) { e.printStackTrace(); }
}// 4.  使用线程池创建线程
// Executors 提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService 接口。
//主要有四种:newFixedThreadPool、newCachedThreadPool、
// newSingleThreadExecutor、newScheduledThreadPool
public class MyRunnable implements Runnable{ @Override public void run() { System.out.println(Thread.currentThread().getName()); }
}
public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); MyRunnable myRunnable = new MyRunnable(); for(int i = 0; i < 10; i++){ executorService.execute(myRunnable); } executorService.shutdown();}
}

Daemon线程

Daemon线程,即守护线程,是一种支持型线程,主要被用作程序中后台调度及支持性工作。

当一个Java虚拟机不存在非daemon线程时,Java虚拟机会立即退出。

Thread.setDaemon(true)。需要在线程启动之前设置,不能在启动之后设置。

在构建daemon线程时,不难依靠finally块中的内容来确保执行关闭或清理资源的逻辑。

线程启动和终止

构造线程:Thread t = new Thread();

启动线程:t.start();

中断:其他线程通过调用该线程的interrupt()方法对其进行中断操作。isInterrupted()检查自身中断情况。suspend()、resume()、stop()完成暂停、恢复、停止工作,但已过期,不建议使用。如suspend()调用后不会释放锁而进入睡眠状态,容易引发死锁。stop()方法在终结一个线程时不会保证线程资源的正常释放(通常不会给予完全释放的机会),会导致程序可能工作在不确定状态下。

线程间通信

Object类提供了线程间通信的方法:wait()notify()notifyaAl(),它们是多线程通信的基础,而这种实现方式的思想自然是线程间通信。

注意: wait和 notify必须配合synchronized使用,wait方法释放锁,notify方法不释放锁

等待/通知机制,即一个线程A调用了对象O的wait进入等待,而另一个线程B调用对象O的notify,使得A收到通知后从对象O的wait状态返回,进而执行后面的一些操作。

线程调用wait()方法,释放它对锁的拥有权,同时会在等待的位置加一个标志,为了以后使用notify()或者notifyAll()方法  唤醒它时,它好能从当前位置获得锁的拥有权,变成就绪状态,要确保调用wait()方法的时候拥有锁,即,wait()方法的调用必须放在synchronized方法或synchronized块中。  在哪里等待被唤醒时,就在那里开始执行。

notif()方法:notify()方法会唤醒一个等待当前对象的锁的线程。唤醒在此对象监视器上等待的单个线程。notifAll()方法:notifyAll()方法会唤醒在此对象监视器上等待的所有线程。

当执行notify/notifyAll方法时,会唤醒一个处于等待该对象锁的线程,然后继续往下执行,直到执行完退出对象锁锁住的区域(synchronized修饰的代码块)后再释放锁。

从这里可以看出,notify/notifyAll()执行后,并不立即释放锁,而是要等到执行完临界区中代码后,再释放。故,在实际编程中,我们应该尽量在线程调用notify/notifyAll()后,立即退出临界区。即不要在notify/notifyAll()后面再写一些耗时的代码。

Thread.join()

如果一个线程A执行了thread.join(),即当前线程A等待thread线程终止之后才从thread.join()返回。

ThreadLocal

即线程变量,一个以ThreadLocal对象为键、任意对象为值的存储结构。这个结构被附带到线程上。即一个线程可以根据一个ThreadLocal对象查询绑定在这个线程上的一个值.set(T)来设置值。.get()来获取值ThreadLocal<Long> local = new ThreadLocal<Long>();

线程池技术

面对成千上万的任务创建对应的线程,会使得操作系统频繁的进行线程上下文切换,无故增加系统负载,同时线程的创建和销毁都是需要消耗系统资源的,无疑造成了资源的浪费。线程池技术,在预先创建若干数量的线程,且不能由用户直接对线程的创建进行控制的前提下,重复使用固定或较为固定数目的线程来完成任务的执行。一方面消除了频繁创建和消亡线程的系统开销,另一方面面对过量任务的提交能平缓的劣化。

TreadPool。线程池的本质就是使用一个线程安全的工作队列连接工作者线程和客户端线程。客户端线程将任务放入工作队列后便返回,而工作线程则不断地从工作队列上取出工作并执行。当工作队列为空时,所有工作线程均处于等待在工作队列上,当有客户端提交了一个任务之后就会通知任意一个工作线程。随着大量的任务被提交,更多的工作线程会被唤醒。

Java中的锁(JUC,java.util.concurrent包下)

锁是用来控制多线程访问共享资源的方式。synchronized关键字会隐式地加锁解锁。

Lock接口(JDK1.5引入)

Lock接口相比synchronized,拥有显示地进行锁的获取与释放、可中断锁的获取以及超时获取锁等同步特性。

Lock lock = new ReentrantLock();
lock.lock();
try {...
} finally {// finally中释放锁,是为了保证获取锁之后最终都能够释放lock.unlock();
}

Lock接口可以支持:

1、尝试非阻塞地获取锁:当前线程获取锁时,如果锁没被其他线程获取,则成功获取并持有锁;

2、能被中断地获取锁:获取锁的线程能响应中断,中断异常将会被抛出,同时锁释放

3、超时获取锁:在指定截止时间之前获取锁,如果截止时间到了仍未获取锁,则返回。

Lock API

1、void lock():获取锁,电泳该方法当前线程会获取锁,获得锁后,从该方法返回;

2、void lockInterruptibly():可中断地获取锁,和lock()方法的不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程;

3、boolean tryLock():尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返回false;

4、boolean tryLock(long time,TimeUnit unit) throws InterruptedException:超时获取锁,当前线程在以下三种情况下会返回:

①:当前线程在超时时间内获得了锁

②:当前线程在超时时间内被中断

③:超时时间结束,返回false

5、void unlock():释放锁;

6、Condition newCondition():获取等待通知组件,该组件和当前的锁绑定,当前线程只有获得了锁,才能调用该组件的wait()方法,而调用后,当前线程将释放锁;

Lock接口的实现基本上都是通过聚合了一个同步器的子类来完成线程访问控制的。

队列同步器AbstractQueuedSynchronizer

队列同步器,简称同步器,是用来构建锁或者其他同步组件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

同步器主要使用方式是继承并实现抽象方法,通过getState()、setState(int newState)和compareAndSetState(int expect, int newState)来管理同步状态,能够确保状态的改变是线程安全的。同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,就可以方便得实现不同类型的同步组件(ReentrantLock、ReentrantReadWriteLock、CountDownLatch等)。

锁定义了使用者和锁交互的接口,隐藏实现细节;同步器面相的是锁的实现者,简化锁实现方式,屏蔽同步状态管理、线程排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者与实现者所需关注的领域。

同步器的接口

同步器设计基于模版方法模式的。同步器提供的模版方法基本上分三类:独占式获取与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况

同步队列

当前线程同步状态获取失败时,同步器会将当前线程及等待状态等信息构造成一个节点并加入同步队列,同时阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中节点保存获取同步状态失败的线程引用、等待状态及前驱和后继节点。每次失败的线程都会加入到队列的尾部。加入到队列的过程必须保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法CompareAndSetTail(Node expect,Node update),它需要传递当前线程认为的尾节点和当前节点,只有设置成功后,当前节点才能正式与之前的尾节点建立关联。

同步器队列遵循FIFO,首节点是获取锁成功的节点(只有前驱节点是头节点才能尝试获取同步状态),首节点的线程在释放锁时,会唤醒后续节点,而后继节点在成功获取到锁后,会把自己设置成首节点,设置首节点是由获取锁成功的线程来完成的,由于只有一个线程能成功获取到锁,所以设置首节点不需要CAS。 

import java.util.concurrent.locks.AbstractQueuedSynchronizer;
public class MyLock {private static final Sync STATE_HOLDER = new Sync();/*** 通过Sync内部类来持有同步状态, 当状态为1表示锁被持有,0表示锁处于空闲状态*/private static class Sync extends AbstractQueuedSynchronizer {/*** 是否被独占, 有两种表示方式*  1. 可以根据状态,state=1表示锁被占用,0表示空闲*  2. 可以根据当前独占锁的线程来判断,即getExclusiveOwnerThread()!=null 表示被独占*/@Overrideprotected boolean isHeldExclusively() {return getExclusiveOwnerThread() != null;}/*** 尝试获取锁,将状态从0修改为1,操作成功则将当前线程设置为当前独占锁的线程*/@Overrideprotected boolean tryAcquire(int arg) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;}/*** 释放锁,将状态修改为0*/@Overrideprotected boolean tryRelease(int arg) {if (getState() == 0) {throw new UnsupportedOperationException();}setExclusiveOwnerThread(null);setState(0);return true;}}/*** 下面的实现Lock接口需要重写的方法,基本是就是调用内部内Sync的方法*/public void lock() {STATE_HOLDER.acquire(1);}public void unlock() {STATE_HOLDER.release(1);}
}

独占式同步状态的获取与释放

通过调用同步器的acquire(int arg)方法来实现,对中断不敏感,进入同步队列后,后续对线程进行中断操作,线程也不会从同步队列中移除

被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现

同步器通过“死循环”来保证节点的正确添加,只有通过CAS将节点设置为尾节点后才会正确返回,否则不断尝试设置,即进入一个自旋过程。

通过调用同步器的release(int arg)来释放同步状态。头节点释放以后,会唤醒其后继节点,使得后继节点去尝试获取同步状态。

共享式同步状态的获取与释放

通过调用同步器的acquireShared(int arg)可以共享式地获取同步状态,成功获取同步状态并退出自旋的条件是tryAcquireShared(int arg)方法的返回值大于等于0.通过调用releaseShared(int arg)方法释放同步状态。通过自旋CAS来安全释放状态。

独占式超时获取同步状态

通过调用同步器的doAcquireNanos(int arg, long nanosTimeOut)可以超时获取同步状态,时间范围内获取成功则返回true,否则返回false。nanosTimeOut -=now(当前唤醒时间) - lastTime(上次唤醒时间)。大于0则继续睡眠nanosTimeOut纳秒,反之表示超时。在超时非常短的情况下,同步器会进入无条件快速自旋,因为此时再等待超时就没太大意义,无法做到十分精确。

重入锁ReentrantLock(排他锁)

重入锁,即支持重进入的锁,一个线程对资源的重复加锁。还支持获取锁时的公平和非公平性选择。synchronized支持隐式的重进入。

实现重进入:1、获取锁的线程是否为当前占据锁的线程;2、锁了多少次,则需释放多少次。

ReentrantLock是通过组合自定义同步器来实现锁的获取与释放的,默认是非公平的。

公平与非公平获取锁的区别:公平与否是针对锁而言的。如公平锁,则锁的获取顺序等于请求锁的绝对时间顺序,即FIFO,可通过是否还有前驱节点来判断。非公平锁,只要CAS设置同步状态成功,则表示当前线程获取到了锁。

非公平锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量,带来更小的性能开销。

读写锁ReentrantReadWriteLock(实现ReadWriteLock)

读写锁在同一时刻允许多个读线程访问,但写线程时,所有读线程与其他写线程被阻塞。

​​​​​​​读写锁维护了一个读锁一个写锁,通过读写分离使得并发性相比一般的排他锁有很大的提升。也支持非公平和公平,支持可重入(读锁可以再次获取读锁,写锁可以再次获取写锁,也可以获取读锁)、锁降级(遵循获取写锁、获取读锁再释放写锁的次序,写锁能降级为读锁)。

读写锁将状态变量切割为两部分,高16位为读,低16位为写。读锁和写锁都实现了 java.util.concurrent.locks.Lock 接口,所以除了支持 lock() 方法外,tryLock()、lockInterruptibly() 等方法也都是支持的。但是有一点需要注意,那就是只有写锁支持条件变量,读锁是不支持条件变量的,读锁调用 newCondition() 会抛出 UnsupportedOperationException 异常。另外,官方文档中还提到了,读写锁支持最多65535个递归写锁和65535个读锁。如果超过这个限制会导致锁定方法抛出错误。

Condition接口

Condition对象由Lock对象创建出来的,定义了等待/通知两种类型的方法。signal()/await()。获取一个condition必须通过lock的newCondition()方法。

Java并发容器和框架

ConcurrentHashMap

concurrentHashMap是线程安全且高效的。ConcurrentHashMap锁分段技术使得多个线程访问不同段的数据时,不同段的读写不会产生锁竞争的问题,在保证线程安全的同时降低了锁的粒度​​​​​​​,从而有效提升高并发访问效率。在进行读操作时不需要加锁,而在写操作时通过锁分段技术只对所操作的段加锁而不影响客户端对其它段的访问。

并发编程中HashMap可能会导致死循环,导致CPU利用率接近100%;

HashMap 扩容的时候会调用 resize() 方法,这里的并发操作容易在一个桶上形成环形链表。这样当获取一个不存在的 key 时,计算出的 index 正好是环形链表的下标就会出现死循环,其next节点永不为空。

而线程安全的HashTable效率又非常低。

HashTable使用synchronized来保证线程安全,但竞争激烈的情况下会进入阻塞轮询状态,使得其效率非常低下。

ConcurrentHashMap由Segment数组和HashEntry数组组成。Segment是一种可重入锁,继承于 ReentrantLock;HashEntry用于存储键值对。当对HashEntry数组的数据进行修改时,首先要获取对应的Segment锁。

初始化segment数组

initialCapacity、loadFactor、concurrencyLevel等几个参数来初始化segment数组、段偏移量segmentShift、段掩码segmentMask和每个segment中的HashEntry数组来实现的。

if (concurrencyLevel > 65535) {concurrencyLevel = 65535;
}
int sshift =0;
int ssize = 1;
while (ssize < concurrencyLevel) {++sshift;ssize <<= 1;
}
segmentShift = 32-sshift;
segmentMask= ssize -1;
this.segments = Segement.newArray(ssize);

concurrencyLevel最大值为65535,则segment数组最大长度为65536,对应的二进制位是16位。segment数组的长度是通过concurrencyLevel计算得到的。所以为了通过按位与的散列算法来定位数组下标,因此必须保证segment数组的长度是大于等于concurrencyLevel的最小的2的N次方。如concurrencyLevel大于8时,数组长度都是16。

初始化segmentShift、segmentMask

默认情况下concurrencyLevel=16,sshift=4.段偏移量segmentShift用于定位参与散列运算的位数,等于32-sshift。段掩码segmentMask=ssize-1。ssize最大值为65536.segmentShift最大16个。segmentMask最大65535.

初始化每个segment

  if (initialCapacity > MAXIMUM_CAPACITY)initialCapacity = MAXIMUM_CAPACITY;int c = initialCapacity / ssize;    // 总的桶数/总的段数if (c * ssize < initialCapacity)++c;int cap = 1;     // 每个段所拥有的桶的数目(2的幂次方)while (cap < c)cap <<= 1;for (int i = 0; i < this.segments.length; ++i)      // 初始化segments数组this.segments[i] = new Segment<K,V>(cap, loadFactor);

cap即segment数组中HashEntry数组的长度,等于initialCapacity除以ssize的倍数c,如果c大于1就取c的2的N次方,所以cap不是1就是2的N次方。segment的容量threshold=(int)cap*loadFactor。默认情况下initialCapacity=16,loadFactor=0.75。这样cap=1,threshold=0;

定位segment

首先对key进行hash得到一个哈希值,再对哈希值进行再散列从而定位到segment上。这样再哈希的目的是为了减少散列冲突,使元素能均匀分布在不同的segment上,从而提高存取效率。

 final Segment<K,V> segmentFor(int hash) {return segments[(hash >>> segmentShift) & segmentMask];}

put操作:从下面的源码我们可以看到,ConcurrentHashMap不同于HashMap,它既不允许key值为null(无法hashCode),也不允许value值为null。

public V put(K key, V value) {if (value == null) throw new NullPointerException();int hash = hash(key.hashCode());return segmentFor(hash).put(key, hash, value, false);}V put(K key, int hash, V value, boolean onlyIfAbsent) {lock();    // 上锁,保证线程安全try {int c = count;// ensure capacity,先判断是否需要扩容,// 而hashMap是在插入元素后再判断,可能会进行一次无效扩容if (c++ > threshold) rehash();HashEntry<K,V>[] tab = table;    // table是Volatile的int index = hash & (tab.length - 1);    // 定位到段中特定的桶HashEntry<K,V> first = tab[index];   // first指向桶中链表的表头HashEntry<K,V> e = first;// 检查该桶中是否存在相同key的结点while (e != null && (e.hash != hash || !key.equals(e.key)))  e = e.next;V oldValue;if (e != null) {        // 该桶中存在相同key的结点oldValue = e.value;if (!onlyIfAbsent)e.value = value;        // 更新value值}else {         // 该桶中不存在相同key的结点oldValue = null;++modCount;     // 结构性修改,modCount加1// 创建HashEntry并将其链到表头tab[index] = new HashEntry<K,V>(key, hash, first, value);  //write-volatile,count值的更新一定要放在最后一步(volatile变量) count = c;      }return oldValue;    // 返回旧值(该桶中不存在相同key的结点,则返回null)} finally {unlock();      // 在finally子句中解锁}
}

 针对段的扩容rehashConcurrentHashMap的重哈希实际上是对ConcurrentHashMap的某个段的重哈希,因此ConcurrentHashMap的每个段所包含的桶位自然也就不尽相同。

先会创建一个原来2倍容量的数组,然后将原数组的元素进行再散列后插入到新数组中。由于扩容是按照2的幂次方进行的,所以扩展前在同一个桶中的元素,现在要么还是在原来的序号的桶里,或者就是原来的序号再加上一个2的幂次方,就这两种选择。

void rehash() {HashEntry<K,V>[] oldTable = table;    // 扩容前的tableint oldCapacity = oldTable.length;if (oldCapacity >= MAXIMUM_CAPACITY)   // 已经扩到最大容量,直接返回return;// 新创建一个table,其容量是原来的2倍HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);   threshold = (int)(newTable.length * loadFactor);   // 新的阈值int sizeMask = newTable.length - 1;     // 用于定位桶for (int i = 0; i < oldCapacity ; i++) {// We need to guarantee that any existing reads of old Map can//  proceed. So we cannot yet null out each bin.HashEntry<K,V> e = oldTable[i];  // 依次指向旧table中的每个桶的链表表头if (e != null) {    // 旧table的该桶中链表不为空HashEntry<K,V> next = e.next;int idx = e.hash & sizeMask;   // 重哈希已定位到新桶if (next == null)    //  旧table的该桶中只有一个节点newTable[idx] = e;else {    // Reuse trailing consecutive sequence at same slotHashEntry<K,V> lastRun = e;int lastIdx = idx;for (HashEntry<K,V> last = next;last != null;last = last.next) {int k = last.hash & sizeMask;// 寻找k值相同的子链,该子链尾节点与父链的尾节点必须是同一个if (k != lastIdx) {lastIdx = k;lastRun = last;}}// JDK直接将子链lastRun放到newTable[lastIdx]桶中newTable[lastIdx] = lastRun;// 对该子链之前的结点,JDK会挨个遍历并把它们复制到新桶中for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {int k = p.hash & sizeMask;HashEntry<K,V> n = newTable[k];newTable[k] = new HashEntry<K,V>(p.key, p.hash,n, p.value);}}}}table = newTable;   // 扩容完成
}

get操作:get的高效在于整个get过程中不需要加锁,除非读到的值是空才会加锁重读。我们知道ConcurrentHashMap不同于HashMap,它既不允许key值为null,也不允许value值为null。但是,此处怎么会存在键值对存在且的Value值为null的情形呢?JDK官方给出的解释是,这种情形发生的场景是:初始化HashEntry时发生的指令重排序导致的,也就是在HashEntry初始化完成之前便返回了它的引用。这时,JDK给出的解决之道就是加锁重读。​​​​​​​

public V get(Object key) {// 先经过一次散列,再进行了散列运算   int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash);
}// segmentFor(hash).get(key, hash);V get(Object key, int hash) {// read-volatile,首先读count变量(当前segment大小)if (count != 0) { // 获取桶中链表头结点// 定位entry使用的hash算法: int index = hash & (tab.length -1);HashEntry<K,V> e = getFirst(hash);   while (e != null) {// 查找链中是否存在指定Key的键值对if (e.hash == hash && key.equals(e.key)) { V v = e.value;// 如果读到value域不为 null,直接返回if (v != null)  return v;   // 如果读到value域为null,说明发生了重排序,加锁后重新读取// recheckreturn readValueUnderLock(e); }e = e.next;}}// 如果不存在,直接返回nullreturn null;
}V readValueUnderLock(HashEntry<K,V> e) {lock();try {return e.value;} finally {unlock();}
}
  • 用HashEntery对象的不变性来降低读操作对加锁的需求;

  • 用Volatile变量协调读写线程间的内存可见性;

  • 若读时发生指令重排序现象,则加锁重读;

size操作:先通过2次不锁住segment的方式来统计各个segment的大小,如果统计过程中count发生了变化,则采用加锁的方式锁住所有segment的put、remove和clean方法,再来统计所有segment的大小。Segment包含一个modCount成员变量,在会引起段发生结构性改变的所有操作(put操作、 remove操作和clean操作)里,都会将变量modCount进行加1,因此,JDK只需要在统计size前后比较modCount是否发生变化就可以得知容器的大小是否发生变化。

clear操作只是把ConcurrentHashMap中所有的桶置空,每个桶之前引用的链表依然存在,只是桶不再引用这些链表而已,而链表本身的结构并没有发生任何修改。因此,正在遍历某个链表的读线程依然可以正常执行对该链表的遍历。

put操作如果需要插入一个新节点到链表中时会在链表头部插入这个新节点,此时链表中的原有节点的链接并没有被修改。也就是说,插入新的健/值对到链表中的操作不会影响读线程正常遍历这个链表。

remove操作:删除节点C之后的所有节点原样保留到新链表中;删除节点C之前的每个节点被克隆到新链表中(它们在新链表中的链接顺序被反转了)。因此在执行remove操作时,原始链表并没有被修改,也就是说,读线程不会受同时执行 remove 操作的并发写线程的干扰。

​​​​​​​ConcurrentLinkedQueue

ConcurrentLinkedQueue,一个基于链接节点的无界线程安全队列,采用FIFO对节点排序,元素添加到尾部,获取元素时返回头部,采用CAS来实现。

入队:一是定位出尾节点;二是CAS算法将入队节点设置成尾节点的next节点,不成功则重试;

出队:返回一个节点元素,并清空该节点对元素的引用。

阻塞队列

阻塞队列,支持两个附加操作的队列。这两个附加操作支持阻塞的插入和移除方法。阻塞的插入即当队列满时,队列会阻塞插入元素的线程,直到队列不满;阻塞的移除即在队列为空时获取元素的线程会等待队列非空。常用于生产者和消费者的场景。

有6种实现:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue、DelayQueue、PriorityBlockingQueue、LinkedTransferQueue;

ArrayBlockingQueue:基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列。

ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

LinkedBlockingQueue:基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。

需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

ArrayBlockingQueue和LinkedBlockingQueue是两个最普通也是最常用的阻塞队列,一般情况下,在处理多线程间的生产者消费者问题,使用这两个类足以。

DelayQueue:DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。

使用场景:DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。

PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

SynchronousQueue:一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。

声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

阻塞队列的典型使用场景就是 生产者/消费者模式。利用 ArrayBlockingQueue 来实现一个多线程版本的生产者/消费者模式。

public class BlockingQueueDemo {static ArrayBlockingQueue<String> abq = new ArrayBlockingQueue(3);public static void main(String[] args) {// 生产者for (int i = 0; i < 3; i++) {new Thread(() -> producer(), "producerThread" + i).start();}// 消费者for (int i = 0; i < 3; i++) {new Thread(() -> consumer(), "consumerThread" + i).start();}}private static void consumer() {while (true) {try {String msg = abq.take();System.out.println(Thread.currentThread().getName() + " ->receive msg:" + msg);} catch (InterruptedException e) {e.printStackTrace();}}}private static void producer() {for (int i = 0; i < 100; i++) {try {abq.put("[" + i + "]");System.out.println(Thread.currentThread().getName() + " ->send msg:" + i);} catch (InterruptedException e) {e.printStackTrace();}}}}

阻塞队列的优点:

1、降低多线程开发的难度:阻塞队列本身是线程安全的,队列可以安全地从一个线程向另外一个线程传递数据,所以我们的生产者/消费者直接使用线程安全的队列就可以,而不需要自己去考虑更多的线程安全问题。这也就意味着,考虑锁等线程安全问题的重任从 你 转移到了 队列 上,降低了我们开发的难度和工作量。

2、 隔离代码,实现业务代码解耦:队列还能起到一个隔离的作用。比如说我们开发一个银行转账的程序,那么生产者线程不需要关心具体的转账逻辑,只需要把转账任务,如账户和金额等信息放到(put)队列中就可以,而不需要去关心银行这个类如何实现具体的转账业务。而作为银行这个类来讲,它会去从队列里(take)取出来将要执行的具体的任务,再去通过自己的各种方法来完成本次转账。这样就实现了具体任务与执行任务类之间的解耦,任务被放在了阻塞队列中,而负责放任务的线程是无法直接访问到我们银行具体实现转账操作的对象的,实现了隔离,提高了安全性。

阻塞功能使得生产者和消费者两端的能力得以平衡,当有任何一端速度过快时,阻塞队列便会把过快的速度给降下来。实现阻塞最重要的两个方法是 take 方法和 put方法。

Java中13个原子操作类

JDK1.5开始提供的JUC下的atomic包下,一共13个类,4种类型的原子更新方式,分别为原子更新基本类型、原子更新数组、原子更新引用和原子更新属性(字段)。Atomic包里的类基本上都是使用Unsafe实现的包装类。值用volatile修饰。CAS会带来ABA问题,所以直接使用原子类也有ABA问题。但是使用AtomicStampedReference  可以解决这个问题。

原子更新基本类型(3个)

AtomicBoolean、AtomicInteger、AtomicLong。

例如AtomicInteger。addAndGet(原子方式将输入值和AtomicInteger的value相加)、compareAndSet、getAndIncrement(以原子方式+1,但返回自增前的值)、lazySet(int newValue)(最终设置为新值,但可能其他线程在一段时间内读到的还是旧值)、getAndSet(以原子方式设置新值,并返回旧值)。其他数据类型可以将其和int等进行转换,再使用对应的CAS操作。

原子更新数组类型(4个)

AtomicIntegerArray 、AtomicLongArray、AtomicReferenceArray(引用类型数组)、AtomicBooleanArray。

int addAndGet(int i, int data):原子方式将输入值与数组中索引为i的元素相加。

boolean compareAndSet(int i, int expect, int update);

 构造方法传入的是个数组,原子类会复制一个数组。所以对内部数组元素进行修改时,不会影响传入的数组。

原子更新引用类型(3个)

AtomicReference、AtomicReferenceFieldUpdater(引用类型里的字段)、AtomicMarkableReference(带有标记位的引用类型)。

原子更新字段类(3个)

AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicStampedReference(带版本号的引用类型,可解决CAS的ABA问题)

因原子更新字段类都是抽象类,所以必须使用静态方法newUpdater()创建一个更新器,并设置想要更新的类和属性。其次,更新类的字段必须使用public volatile修饰符。

Java中的并发工具类

CountDownLatch等待多线程完成

CountDownLatch允许一个或多个多线程等待其他线程完成操作。

CountDownLatch的构造函数接收一个int类型的参数作为计数器,想等待N个点完成,就传入N,调用countDown方法时就减1。

同步屏障CyclicBarrier

可循环使用的屏障。让一组线程到达一个屏障时被阻塞,直到最后一个线程到达时解除,所有被屏障拦截的线程才会继续执行。

new CyclicBarrier(int count),表示要拦截的线程数量,每个线程调用await方法告诉屏障到达,当前线程被阻塞。如果数量与线程数不相符,则会永远等待。new CyclicBarrier(int count,Runnable action)表示在到达屏障时,可优先执行action。

// 构造函数
public CyclicBarrier(int parties)
public CyclicBarrier(int parties, Runnable barrierAction)//重要方法
public int await() throws InterruptedException, BrokenBarrierException
public int await(long timeout, TimeUnit unit) throws InterruptedException, BrokenBarrierException, TimeoutException

应用场景:多线程计算数据,最后合并计算结果的场景

public class CyclicBarrierDemo {static class TaskThread extends Thread {CyclicBarrier barrier;public TaskThread(CyclicBarrier barrier) {this.barrier = barrier;}@Overridepublic void run() {try {Thread.sleep(1000);System.out.println(getName() + " 到达栅栏 A");barrier.await();System.out.println(getName() + " 冲破栅栏 A");Thread.sleep(2000);System.out.println(getName() + " 到达栅栏 B");barrier.await();System.out.println(getName() + " 冲破栅栏 B");} catch (Exception e) {e.printStackTrace();}}}public static void main(String[] args) {int threadNum = 5;CyclicBarrier barrier = new CyclicBarrier(threadNum, new Runnable() {@Overridepublic void run() {System.out.println(Thread.currentThread().getName() + " 完成最后任务");}});for(int i = 0; i < threadNum; i++) {new TaskThread(barrier).start();}}}打印结果:
Thread-1 到达栅栏 A
Thread-3 到达栅栏 A
Thread-0 到达栅栏 A
Thread-4 到达栅栏 A
Thread-2 到达栅栏 A
Thread-2 完成最后任务
Thread-2 冲破栅栏 A
Thread-1 冲破栅栏 A
Thread-3 冲破栅栏 A
Thread-4 冲破栅栏 A
Thread-0 冲破栅栏 A
Thread-4 到达栅栏 B
Thread-0 到达栅栏 B
Thread-3 到达栅栏 B
Thread-2 到达栅栏 B
Thread-1 到达栅栏 B
Thread-1 完成最后任务
Thread-1 冲破栅栏 B
Thread-0 冲破栅栏 B
Thread-4 冲破栅栏 B
Thread-2 冲破栅栏 B
Thread-3 冲破栅栏 B

CyclicBarrier 与 CountDownLatch 区别

1、CountDownLatch是线程组之间的等待,即一个(或多个)线程等待N个线程完成某件事情之后再执行;而CyclicBarrier则是线程组内的等待,即每个线程相互等待,即N个线程都被拦截之后,然后依次执行。

2、CountDownLatch是减计数方式,而CyclicBarrier是加计数方式

3、CountDownLatch计数为0无法重置,而CyclicBarrier计数达到初始值,则可以重置

4、CountDownLatch不可以复用,而CyclicBarrier可以复用

Java中的线程池

线程池可带来的好处

1、降低资源消耗:降低线程创建和销毁带来的消耗;

2、提高响应速度:任务到达时,不需等待线程创建就能立即执行;

3、提高线程的可管理性:线程池统一分配、调优和监控。

什么是线程池

 java.util.concurrent.Executors提供了一个 java.util.concurrent.Executor接口的实现用于创建线程池。多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。

一个线程池包括以下四个基本组成部分:
1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。

线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目。

 线程池实现原理

ThreadPoolExcutor的execute()方法流程如下所示:

创建新线程执行时都需获取全局锁。 按照如上设计思路,是为了在执行execute()方法时,尽可能避免全局锁。在ThreadPoolExcutor完成预热之后(当前运行的线程数大于等于corePoolSize),几乎所有的execute()方法调用都时执行步骤2进入阻塞队列,不需获取全局锁。

工作队列即阻塞队列线程池创建线程时,会将线程封装成工作线程Worker,worker完成任务后,还会循环读取工作队列里的任务来执行

为何使用阻塞队列?!因为线程无限制创建会导致内存占用过多而产生OOM,并造成CPU过度切换;同时线程的创建和销毁消耗较高;线程池创建线程需要获取mainlock这个全局锁,影响并发效率,阻塞队列可以很好的缓冲;阻塞队列可以保证任务队列中没有任务时阻塞获取任务的线程,使得线程进入wait状态,释放cpu资源;当队列中有任务时才唤醒对应线程从队列中取出消息进行执行。使得在线程不至于一直占用cpu资源。

线程池的使用

线程池的创建

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}

1、corePoolSize(线程池基本大小):当向线程池提交一个任务时,若线程池已创建的线程数小于corePoolSize,即便此时存在空闲线程,也会通过创建一个新线程来执行该任务,直到已创建的线程数大于corePoolSize时就不再创建了( prestartAllCoreThreads() 方法会使得线程池提前创建并启动所有基本线程。)

2、maximumPoolSize(线程池最大数量):线程池所允许的最大线程个数。当队列满了,且已创建的线程数小于maximumPoolSize,则线程池会创建新的线程来执行任务。另外,对于无界队列,可忽略该参数。

3、keepAliveTime(线程存活保持时间)当线程池中线程数大于核心线程数时,线程的空闲时间如果超过线程存活时间,那么这个线程就会被销毁,直到线程池中的线程数小于等于核心线程数。

4、workQueue(任务队列):用于传输和保存等待执行任务的阻塞队列。

5、threadFactory(线程工厂):用于创建新线程。threadFactory创建的线程也是采用new Thread()方式,threadFactory创建的线程名都具有统一的风格:pool-m-thread-n(m为线程池的编号,n为线程池内的线程编号)。

6、handler(线程饱和策略):当线程池和队列都满了,再加入线程会执行此策略。

AbortPolicy直接抛出异常;CallerRunsPolicy只用调用者所在线程来执行任务;DiscardOldestPolicy丢弃队列里最近的一个任务,并执行当前任务;DiscardPolicy不处理丢弃掉。

提交任务

execute()和submit()方法。execute(),执行一个任务,没有返回值。submit(),提交一个线程任务,有返回值。

submit(Callable<T> task)能获取到它的返回值,通过future.get()获取(阻塞直到任务执行完)。一般使用FutureTask+Callable配合使用(IntentService中有体现)。

submit(Runnable task, T result)能通过传入的载体result间接获得线程的返回值。
submit(Runnable task)则是没有返回值的,就算获取它的返回值也是null。

Future.get方法会使取结果的线程进入阻塞状态,知道线程执行完成之后,唤醒取结果的线程,然后返回结果。

关闭线程池

调用线程池的shutdown或shutdownNow方法来关闭线程池,即遍历线程池中的工作线程,逐个调用线程的interrupt方法来中断线程。故无法响应中断的任务可能永远无法停止。shutdownNow将线程池状态设置为STOP,然后尝试停止所有工作线程或暂停的线程,并返回等待执行任务的列表;而shutdown只是将线程池的状态设置为SHUTDOWN状态,然后中断所有没有正在执行任务的线程。

CPU密集型,线程池线程个数=N(CPU)+1;IO密集型的线程个数=2*N(CPU);

Executor框架

线程池就是线程的集合,线程池集中管理线程,以实现线程的重用,降低资源消耗,提高响应速度等。线程用于执行异步任务,单个的线程既是工作单元也是执行机制,从JDK1.5开始,为了把工作单元与执行机制分离开,Executor框架诞生了,是一个用于统一创建与运行的接口。Executor框架实现的就是线程池的功能。

Executor框架包括3大部分:

(1)任务。即工作单元,包括被执行任务需要实现的接口:Runnable接口或者Callable接口;

(2)任务的执行。也就是把任务分派给多个线程的执行机制,包括Executor接口及继承自Executor接口的ExecutorService接口。

(3)异步计算的结果。包括Future接口及实现了Future接口的FutureTask类。

类与接口如下所示:

ThreadPoolExecutor

ThreadPoolExecutor通过Executors工具类来创建ThreadPoolExecutor的子类FixedThreadPool、SingleThreadExecutor、CachedThreadPool,这些子类继承ThreadPoolExecutor,并且其中的一些参数已经被配置好。

// LinkedBlockingQueue不同于ArrayBlockingQueue,
// 它如果不指定容量,默认为Integer.MAX_VALUE//FixedThreadPoll 可重用固定数量的线程池
// 核心线程数和最大线程数相等,等于传入的值大小
// 当线程池数量大于核心线程数量大小时,多余的空闲线程会被立即终止(keepAliveTime=0)
// LinkedBlockingQueue<Runnable>无界队列作为工作队列
// 适用于为满足资源管理的需求而需要限制当前线程数量的应用场景
// 适用于负载比较重的服务器
ExecutorService ftp = Executors.newFixedThreadPool(int threadNums);
ExecutorService ftp = Executors.newFixedThreadPool(int threadNums, ThreadFactory threadFactory);//SingleThreadExecutor 使用单个Worker线程的线程池
// 核心线程大小和最大线程数都是1.LinkedBlockingQueue<Runnable>无界队列作为工作队列
// 适用于需要保证顺序执行任务并在任意时间点不会有多线程是活动的应用场景
ExecutorService ste = Executors.newSingleThreadExecutor();
ExecutorService ste = Executors.newSingleThradPool(ThreadFactory threadFactory);//CachedThreadPool 会根据需要创建新线程的线程池
// 核心线程=0,最大线程数为Integer.Max_value.无界的
// 如果keepAliveTime=60L即空闲线程超过60s就会被终止。
// 使用没容量的synchronousQueue作为工作队列
// 主线程提交任务的速度高于maximumPool中线程处理任务的速度时,就会不断创建新线程
// 大小无界线程池,适用于执行很多的短期异步的小程序,或负载较轻的服务器
ExecutorService ctp = Executors.newCachedThreadPool();
ExecutorService ctp = Executors.newCachedThreadPool(ThreadFactory threadFactory);

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor类继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。主要用于在给定的延迟后执行任务或者定期执行任务。作用类似于java.util包下的Timer类,但是比Timer功能更强大、更灵活,因为Timer只能控制单个线程延迟或定期执行,而ScheduledThreadPoolExecutor对应的是多个线程的后台线程。

 ScheduledThreadPoolExecutor可以利用Executors工厂类来创建两种:ScheduledThreadPoolExecutor和SingleThreadScheduledExecutor.

// ScheduledThreadPoolExecutor
// DelayQueue无界队列。
// 适用于若干个(固定)线程延时或者定期执行任务,
// 同时为了满足资源管理的需求而需要限制后台线程数量的场景。
ScheduledExecutorService stp = Executors.newScheduledThreadPool(int threadNums);
ScheduledExecutorService stp = Executors.newScheduledThreadPool(int threadNums, ThreadFactory threadFactory);// SingleThreadScheduledExecutor
// 适用于需要单个线程延时或者定期的执行任务,
//同时需要保证各个任务顺序执行的应用场景。
ScheduledExecutorService stse = Executors.newSingleThreadScheduledExecutor(int threadNums);
ScheduledExecutorService stp = Executors.newSingleThreadScheduledExecutor(int threadNums, ThreadFactory threadFactory);

Future

Future接口和实现Future接口的FutureTask类用来表示异步计算的结果。FutureTask的Future就源自于它的异步工作机制,FutureTask就提供了这么一个异步的返回结果的机制,当执行一个FutureTask的时候,我们可以接着做别的任务,在将来的某个时间,FutureTask任务完成后会返回FutureTask对象来包装返回的结果,只要调用这个对象的get()方法即可获取返回值。

Runnable和Callable接口都可以被ThreadPoolExecutor、ScheduledThreadPoolExecutor执行。runnable不会返回结果,callable可以返回结果。runnable可以使用Executors包装成一个callable。

Java并发编程艺术相关推荐

  1. Java并发编程艺术阅读笔记(一)

    Java并发编程艺术阅读笔记(一) 1.什么是上下文切换 CPU通过时间片分配算法循环执行任务,执行完一个任务的时间片后就会切换到下一个任务.但是在切换任务之前会保存上一个任务的状态,在切换回该任务, ...

  2. java并发编程艺术——基础篇

    这篇文章目的是为了总结一下这段时间看<java并发编程艺术>学到的东西,尝试用自己的话说出来对java多线程的理解和使用. 一.什么是多线程,为什么要用多线程,多线程带来的挑战 多线程定义 ...

  3. Java并发编程艺术学习笔记(五)

    Java并发编程艺术学习笔记(五) Java并发容器和框架 Java为开发者也提供了许多开发容器和框架,可以从每节的原理分析来学习其中精妙的并发程序. 一.ConcurrentHashMap的实现原理 ...

  4. Java并发编程艺术----读书笔记(二)

    版权声明:本文为博主原创文章,未经博主允许不得转载. https://blog.csdn.net/a724888/article/details/64214595  java并发编程艺术2 jav ...

  5. 【java并发编程艺术学习】(一)初衷、感想与笔记目录

    不忘初心,方得始终. 学习java编程这么长时间,自认为在项目功能需求开发中没啥问题,但是之前的几次面试和跟一些勤奋的或者小牛.大牛级别的人的接触中,才发现自己的无知与浅薄. 学习总得有个方向吧,现阶 ...

  6. Java 并发编程艺术 读书笔记

    第 1 章 并发编程的挑战 1.1.3 如何减少上下文切换 减少上下文切换的方法有无锁并发编程.CAS 算法.使用最少线程和使用协程. 无锁并发编程.多线程竞争锁时,会引起上下文切换,所以多线程处理数 ...

  7. JAVA并发编程艺术读书笔记(1,2章节)

    第一章 并发编程的挑战 为什么要使用并发编程? 主要是为了更有效地利用资源.即使是单核的CPU也可以多线程执行程序,多线程实际上是CPU分配时间片给各个线程,因为时间片非常短,所以看起来就像在同事执行 ...

  8. Java并发编程艺术读书笔记

    1.多线程在CPU切换过程中,由于需要保存线程之前状态和加载新线程状态,成为上下文切换,上下文切换会造成消耗系统内存.所以,可合理控制线程数量. 如何控制: (1)使用ps -ef|grep appn ...

  9. 【java并发编程艺术学习】(四)第二章 java并发机制的底层实现原理 学习记录(二) synchronized...

    章节介绍 本章节主要学习 Java SE 1.6 中为了减少获得锁 和 释放锁 时带来的性能消耗 而引入的偏向锁 和 轻量级锁,以及锁的存储结构 和 升级过程. synchronized实现同步的基础 ...

最新文章

  1. 基于 HTML5 的 WebGL 技术构建 3D 场景(一)
  2. System.Net.Dns.GetHostByAddress(string) 已经过时
  3. php表白情话,向一个人表白 抖音最火99句情话告白
  4. 消防信号二总线有没电压_春晓161#地块人防工程消防电源监控系统的设计与应用...
  5. Servlet API
  6. 已经push的如何回退_如何撤回Git push 到远程分支以后的方法
  7. php 发送文本 设置头,php中header设置常见文件类型的content-type
  8. mysql数据库原理
  9. mysql++缓冲区_思考mysql内核之初级系列4--innodb缓冲区管理(摘自老杨)
  10. linux端口快速释放,Linux 快速释放端口与释放内存缓存,linux释放端口缓存
  11. 微信公众号调用扫一扫,使扫一扫支持扫PDF417格式的条码
  12. 3、那智机器人手柄各操作键的功能
  13. 2017中兴捧月算法精英挑战赛-迪杰斯特拉
  14. 机器学习笔记 - 学习使用TensorFlow和张量处理单元 (TPU) 构建图像分类模型
  15. 图书管理系统java+Oracle
  16. nodejs负载均衡(一):服务负载均衡
  17. Windows平台上一些开发软件的卸载与安装@大蟒蛇马戏团
  18. 新品发布-T3M系列宽带高速MIMO Mesh自组网电台
  19. php 小写还是大写,php中如何判断字母是大写还是小写
  20. 物 理 学 简 介(一)

热门文章

  1. Util-linux
  2. C++输入密码显示*
  3. 5.5 QR分解一:施密特正交化
  4. 【转载】关于重定向RedirectAttributes的用法
  5. android 输入法模式,android输入法的四种模式(弹出输入法式的窗口变化)
  6. 消费者洞察:数据化闭环洞察消费者
  7. Threejs教程之着色器
  8. ProgressDialog在线程里下载图片
  9. 【Docker】9、Docker-Compose安装轻量级分布式日志服务Graylog
  10. TypeError: argument 1 has unexpected type 'NoneType'(解决办法)