concurrent之atomic相关

概述

java.util.concurrent.atomic原子操作类包里面提供了一组原子变量类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。实际上是借助硬件的相关指令来实现的,不会阻塞线程(或者说只是在硬件级别上阻塞了)。可以对基本数据、数组中的基本数据、对类中的基本数据进行操作。原子变量类相当于一种泛化的volatile变量,能够支持原子的和有条件的读-改-写操作。

java.util.concurrent.atomic中的类可以分成4组:

  • 标量类:AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference 
  • 数组类:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray 
  • 更新器类:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater 
  • 复合变量类:AtomicMarkableReference,AtomicStampedReference

标量类

AtomicBoolean、AtomicInteger、AtomicLong、AtomicReference这四种基本类型用来处理布尔,整数,长整数,对象四种数据,其内部实现不是简单的使用synchronized,而是一个更为高效的方式CAS (compare and swap) + volatile和native方法,从而避免了synchronized的高开销,执行效率大为提升。其实例各自提供对相应类型单个变量的访问和更新。每个类也为该类型提供适当的实用工具方法。

以为例,源码如下:

public class AtomicInteger extends Number implements java.io.Serializable {// ……private static final Unsafe unsafe = Unsafe.getUnsafe();private static final long valueOffset;//……private volatile int value;public AtomicInteger(int initialValue) {value = initialValue;}public AtomicInteger() {}public final int get() {return value;}public final void set(int newValue) {value = newValue;}public final void lazySet(int newValue) {unsafe.putOrderedInt(this, valueOffset, newValue);}public final int getAndSet(int newValue) {for (;;) {int current = get();if (compareAndSet(current, newValue))return current;}}public final boolean compareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update);}public final boolean weakCompareAndSet(int expect, int update) {return unsafe.compareAndSwapInt(this, valueOffset, expect, update);}
}

虽然原子的标量类扩展了基本类型的类,但是并没有扩展基本类型的包装类,如Integer或Long,事实上它们也不能直接扩展。因为基本类型的包装类是不可以修改的,而原子变量类是可以修改的。在原子变量类中没有重新定义hashCode或equals方法,每个实例都是不同的,他们也不宜用做基于散列容器中的键值。

数组类

AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray类进一步扩展了原子操作,对这些类型的数组提供了支持。这些类在为其数组元素提供volatile访问语义方面也引人注目,这对于普通数组来说是不受支持的。其内部并不是像AtomicInteger一样维持一个volatile变量,而是全部由native方法实现。数组变量进行volatile没有意义,因此set/get就需要unsafe来做了,但是多了一个index来指定操作数组中的哪一个元素。

更新器类

AtomicReferenceFieldUpdater,AtomicIntegerFieldUpdater和AtomicLongFieldUpdater 是基于反射的实用工具,可以提供对关联字段类型的访问,可用于获取任意选定volatile字段上的compareAndSet操作。它们主要用于原子数据结构中,该结构中同一节点的几个 volatile 字段都独立受原子更新控制。这些类在如何以及何时使用原子更新方面具有更大的灵活性,但相应的弊端是基于映射的设置较为拙笨、使用不太方便,而且在保证方面也较差。

注:netty5.0中类ChannelOutboundBuffer统计发送的字节总数,由于使用volatile变量已经不能满足,所以使用AtomicIntegerFieldUpdater 来实现的,看下面代码:

//定义
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER =AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize");private volatile long totalPendingSize;//使用
long oldValue = totalPendingSize;
long newWriteBufferSize = oldValue + size;
while (!TOTAL_PENDING_SIZE_UPDATER.compareAndSet(this, oldValue, newWriteBufferSize)) {oldValue = totalPendingSize;newWriteBufferSize = oldValue + size;
}

复合变量类

AtomicMarkableReference 类将单个布尔值与引用关联起来。维护带有标记位的对象引用,可以原子方式更新带有标记位的引用类型。 
  AtomicStampedReference 类将整数值与引用关联起来。维护带有整数“标志”的对象引用,可以原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更新数据和版本号,可以解决使用CAS进行原子更新时,可能出现的ABA问题。
ABA问题:

JDK1.5之后,Atomic包提供的类AtomicStampedReference就解决了这个问题。这个类的compareAndSet方法先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。

public class AtomicStampedReference<V> {private static class Pair<T> {final T reference;final int stamp;private Pair(T reference, int stamp) {this.reference = reference;this.stamp = stamp;}static <T> Pair<T> of(T reference, int stamp) {return new Pair<T>(reference, stamp);}}private volatile Pair<V> pair;public AtomicStampedReference(V initialRef, int initialStamp) {pair = Pair.of(initialRef, initialStamp);}public V getReference() {return pair.reference;}public int getStamp() {return pair.stamp;}public V get(int[] stampHolder) {Pair<V> pair = this.pair;stampHolder[0] = pair.stamp;return pair.reference;}public boolean weakCompareAndSet(V   expectedReference,V   newReference,int expectedStamp,int newStamp) {return compareAndSet(expectedReference, newReference,expectedStamp, newStamp);}public boolean compareAndSet(V   expectedReference,V   newReference,int expectedStamp,int newStamp) {Pair<V> current = pair;returnexpectedReference == current.reference &&expectedStamp == current.stamp &&((newReference == current.reference &&newStamp == current.stamp) ||casPair(current, Pair.of(newReference, newStamp)));}
}

concurrent之collections相关

java.util.concurrent包,此包下的集合都不允许添加null元素。具体集合见下表:

对应接口 特性 并发技术 使用场景
ConcurrentHashMap Map 非阻塞、线程安全 CAS,synchronized,volatile  
ConcurrentSkipListMap Map 线程安全   构造函数支持排序,甚至可按照复合类型字段排序
ConcurrentSkipListSet Map 线程安全   构造函数支持排序,甚至可按照复合类型字段排序
CopyOnWriteArrayList List 线程安全   1、读写分离,读和写分开,需要读和写时都是对拷贝的副本进行操作。 
2、最终一致性 
缺点:
1、由于写操作的时候,需要拷贝数组,会消耗内存,如果原数组的内容比较多的情况下,可能导致young gc或者full gc
2、不能用于实时读的场景,像拷贝数组、新增元素都需要时间,所以调用一个set操作后,读取到数据可能还是旧的,虽然CopyOnWriteArrayList 能做到最终一致性,但是还是没法满足实时性要求;
3、CopyOnWriteArrayList 合适读多写少的场景,不过这类慎用 
因为谁也没法保证CopyOnWriteArrayList 到底要放置多少数据,万一数据稍微有点多,每次add/set都要重新复制数组,这个代价实在太高昂了。在高性能的互联网应用中,这种操作分分钟引起故障。
CopyOnWriteArraySet Set 线程安全   1、读写分离,读和写分开,需要读和写时都是对拷贝的副本进行操作。 
2、不存储重复对象
3、最终一致性
ConcurrentLinkedQueue Queue 线程安全、FIFO   线程安全,但一边遍历一边poll还是不行的
ConcurrentLinkedDeque Deque 线程安全、链表   链表式操作,可对队首队尾直接操作
ArrayBlockingQueue Queue、Collection 有界、阻塞、线程安全、FIFO   生产者、消费者场景比较合适,并且支持FIFO
LinkedBlockingQueue Queue 阻塞、线程安全、FIFO、链表    
LinkedTransferQueue Queue 阻塞、线程安全、FIFO   LinkedTransferQueue实现了一个重要的接口TransferQueue,该接口含有下面几个重要方法:
1. transfer(E e):若当前存在一个正在等待获取的消费者线程,即立刻移交之;否则,会插入当前元素e到队列尾部,并且等待进入阻塞状态,到有消费者线程取走该元素。
2. tryTransfer(E e):若当前存在一个正在等待获取的消费者线程(使用take()或者poll()函数),使用该方法会即刻转移/传输对象元素e;若不存在,则返回false,并且不进入队列。这是一个不阻塞的操作。
3. tryTransfer(E e, long timeout, TimeUnit unit):若当前存在一个正在等待获取的消费者线程,会立即传输给它;否则将插入元素e到队列尾部,并且等待被消费者线程获取消费掉;若在指定的时间内元素e无法被消费者线程获取,则返回false,同时该元素被移除。
4. hasWaitingConsumer():判断是否存在消费者线程。
5. getWaitingConsumerCount():获取所有等待获取元素的消费线程数量。
LinkedBlockingDeque Deque 阻塞、线程安全、FIFO、链表    
DelayQueue Queue 阻塞、线程安全、FIFO、链表    
PriorityBlockingQueue Queue 阻塞、优先级   要使用FIFO,您需要插入一个新的FIFOEntry(anEntry),而不是普通的entry对象
SynchronousQueue Queue 线程安全、链表   只允许一个值添加、取出,无容量

ConcurrentHashMap

ConcurrentHashMap 1.7

还在jdk1.7的时候ConcurrentHashMap的底层数据结构其实是由Segment数组和多个HashEntry组成,如下图所示:

Segment数组的意义就是将一个大的table分割成多个小的table来进行加锁,也就是上面的提到的锁分段技术,而每一个Segment元素存储的是HashEntry数组+链表,这个和HashMap的散列表+链表的数据存储结构是一样的。这里的每个table就像我们之前所说的HashTable一样。

同时从源码中我们可以看出,Segment继承了ReentrantLock(独占锁)类。使大table中的每个小table都有了一个锁。

static final class Segment<K,V> extends ReentrantLock implements Serializable

ConcurrentHashMap 1.8

在jdk1.8中,ConCurrentHashMap的数据结构底层是:散列表+链表+红黑树,其变化与HashMap在1.8的变化是一样的。

concurrentHashMap特点:

  • ConcurrentHashMap支持高并发情况下对哈希表的访问和更新。
  • ConcurrentHashMap与HashTable相似,与HashMap不同。
  • ConcurrentHashMap的所有操作都是线程安全的。
  • 它不允许null用作键或值
  • get操作没有上锁。是非阻塞的。所以在并发情况下可以与阻塞的put或remove函数交迭。但在聚合操作下比如putAll和clean,并发情况下由于线程调度的原因get函数可能只能检索到插入和删除的一些Entries(函数还未执行完)。
  • 与get函数的处理相类似的还有Iterators, Spliterators,Enumerations,在其创建时或之后,倘若ConcurrentHashMap再发生改变就不会再抛ConcurrentModificationException了。取而代之的是在其改变时new新的数据从而不影响原有的数据,Iterator会在其完成后再将头指针替换为新的数据,这样Iterator线程可以使用原来老的数据,而写线程也可以并发的完成改变,更重要的,这保证了多个线程并发执行的连续性和扩展性,是性能提升的关键。
  • 不过,迭代器被设计成每次仅由一个线程使用。
  • 同时需要注意:size,isEmpty,containsValue等函数的使用,在ConcurrentHashMap实例并发情况下是无意义的。它只能反映该实例的一个暂态,除非此时它并未发生并发修改。

ConcurrrentHashMap关键属性

  • volatile Node<K,V>[] table:装载Node的数组,作为ConcurrentHashMap的数据容器,采用懒加载的方式,直到第一次插入数据的时候才会进行初始化操作,数组的大小总是为2的幂次方(因为继承自HashMap)。
  • transient volatile Node<K,V>[] nextTable:扩容时使用,平时为null,只有在扩容的时候才为非null。逻辑机制和ArrayList底层的数组扩容一致。
  • transient volatile long baseCount:元素数量基础计数器,该值也是一个阶段性的值(产出的时候可能容器正在被修改)。通过CAS的方式进行更改。
  • transient volatile int sizeCtl:散列表初始化和扩容的大小都是由该变量来控制。当为负数时,它正在被初始化或者扩容。
  • static final sun.misc.Unsafe U:在ConcurrentHashMap的实现中可以看到大量的U.compareAndSwapXXXX的方法去修改ConcurrentHashMap的一些属性。这些方法实际上是利用了CAS算法保证了线程安全性,这是一种乐观策略,假设每一次操作都不会产生冲突(变量实际值!=期望值),当且仅当冲突发生的时候再去尝试。

Java中transient关键字的作用,简单地说,就是让某些被修饰的成员属性变量不被序列化,这一看好像很好理解,就是不被序列化,那么什么情况下,一个对象的某些字段不需要被序列化呢?如果有如下情况,可以考虑使用关键字transient修饰:

  • 类中的字段值可以根据其它字段推导出来,如一个长方形类有三个属性:长度、宽度、面积(示例而已,一般不会这样设计),那么在序列化的时候,面积这个属性就没必要被序列化了;
  • 其它,看具体业务需求吧,哪些字段不想被序列化;

ConcurrentHashMap构造函数

// 构造一个空map映射,初始容量16
public ConcurrentHashMap() {
}// 初始化时明确给定一个初始容量,减少resize次数
public ConcurrentHashMap(int initialCapacity) {if (initialCapacity < 0)throw new IllegalArgumentException();// tableSizeFor 函数返回一个最接近入参 initialCapacity 容量的2进制整数int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?MAXIMUM_CAPACITY :tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));this.sizeCtl = cap;
}// 创建一个与给定的 Map 映射具有相同元素的 ConcurrentHashMap
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {// 我们前面所述 sizeCtl 两个含义,构造和扩容。// 此处必然是构造容量为 DEFAULT_CAPACITY = 16 的 ConcurrentHashMapthis.sizeCtl = DEFAULT_CAPACITY;putAll(m);
}public void putAll(Map<? extends K, ? extends V> m) {// 初始化数组容量,防止直接迭代insert导致频繁扩容tryPresize(m.size());for (Map.Entry<? extends K, ? extends V> e : m.entrySet())putVal(e.getKey(), e.getValue(), false);
}// 构造一个空的 Map 映射,并给定其初始容量与加载因子
public ConcurrentHashMap(int initialCapacity, float loadFactor) {this(initialCapacity, loadFactor, 1);
}// 构造一个空的 Map 映射,并给定其初始容量,加载因子与预估的并发更新的线程数
public ConcurrentHashMap(int initialCapacity,float loadFactor, int concurrencyLevel) {if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)throw new IllegalArgumentException();if (initialCapacity < concurrencyLevel)   // Use at least as many bins// 该情况下,一个更新线程负责一个HashEntryinitialCapacity = concurrencyLevel;   // as estimated threads// 确定 table 的真实容器大小 = 元素数量 initialCapacity / 元素密度 loadFactor// 比如你要存30个元素,构造Map的时候传入30和0.75,那么table真实容量就应该是 30/0.75。保证你要存的元素数量是table容器的0.75倍long size = (long)(1.0 + (long)initialCapacity / loadFactor);int cap = (size >= (long)MAXIMUM_CAPACITY) ?MAXIMUM_CAPACITY : tableSizeFor((int)size);this.sizeCtl = cap;
}

常见API

initTable()

private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0)// sizeCtl变量小于0,即我们前面所提到的为-1。说明此时其他线程已经修改过sizeCtl变量的值,并赋值为-1,说明此时正有线程在构造ConcurrentHashMap对象。此时我们应该终止当前线程的构造操作。// 1. 保证只有一个线程正在进行初始化操作// 小知识:Thread.yield()函数在程序只有一个线程运行的时候,会继续运行不再暂停唯一的线程对象。Thread.yield(); // lost initialization race; just spinelse if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {// 通过CAS函数比较并设置SIZECTL(sizeCtl)常量的值,我们前面对sizeCtl变量解析的时候说过,当ConcurrentHashMap在构造的时候sizeCtl为-1。try {if ((tab = table) == null || tab.length == 0) {// 2. 得出数组的大小int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")// 3. 这里才真正的初始化数组Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;// 4. 计算数组中可用的大小:实际大小n*0.75(加载因子)sc = n - (n >>> 2);//无符号右移两位 n-(1/4)n}} finally {sizeCtl = sc;}break;}}return tab;
}

有可能存在一个情况是多个线程同时走到这个方法中,为了保证能够正确初始化,在第1步中会先通过if进行判断,若当前已经有一个线程正在初始化即sizeCtl值变为-1,这个时候其他线程在If判断为true从而调用Thread.yield()让出CPU时间片。正在进行初始化的线程会调用U.compareAndSwapInt方法将sizeCtl改为-1即正在初始化的状态。

putVal(K key, V value, boolean onlyIfAbsent)

当且仅当table中不存在该key对应的Entry时才插入该Entry。否则不替换table中原有的Entry中的value。

final V putVal(K key, V value, boolean onlyIfAbsent) {if (key == null || value == null) throw new NullPointerException();// 1. 计算key的hash值,与HashMap处理逻辑一样int hash = spread(key.hashCode());int binCount = 0;for (Node<K,V>[] tab = table;;) {Node<K,V> f; int n, i, fh;// 2. 如果当前table还没有初始化先调用initTable方法将tab进行初始化if (tab == null || (n = tab.length) == 0)tab = initTable();// 3. tab中索引为i的位置的元素为null,则直接使用CAS将值插入即可else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null)))break;                   // no lock when adding to empty bin}/* 4. 如果要插入的位置是一个forwordingNode节点,表示正在扩容,那么当前线程帮助扩容这里我有个问题:为什么该位置会是forwordingNode节点 */else if ((fh = f.hash) == MOVED)tab = helpTransfer(tab, f);else {V oldVal = null;// 5. 进行到这一步,说明要插入的位置有值,需要对该桶加锁。synchronized (f) {// 确定f是tab中的头节点if (tabAt(tab, i) == f) {// fh = 桶首元素的hast值。如果头结点的哈希值大于等于0,说明要插入的节点在(链表)中。否则有可能该桶的数据结构不是链表而是红黑树if (fh >= 0) {binCount = 1;// 开始迭代找key的节点,f = 桶首元素for (Node<K,V> e = f;; ++binCount) {K ek;// 如果某一节点的key的哈希值及key与参数相等,替换该节点的valueif (e.hash == hash &&((ek = e.key) == key ||(ek != null && key.equals(ek)))) {oldVal = e.val;if (!onlyIfAbsent)e.val = value;break;}Node<K,V> pred = e;// 没有找到则继续向后迭代,当迭代到最后一个元素还没有找到时,将该Entry插入到链表尾。if ((e = e.next) == null) {pred.next = new Node<K,V>(hash, key,value, null);break;}}}// 6. 如果要插入的节点在红黑树中,则按照树的方式插入或替换节点else if (f instanceof TreeBin) {Node<K,V> p;binCount = 2;if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,value)) != null) {oldVal = p.val;if (!onlyIfAbsent)p.val = value;}}}}// 7. 如果binCount不为0,说明插入或者替换操作完成了if (binCount != 0) {// 判断节点数量是否大于等于8,如果是就需要把链表转化成红黑树if (binCount >= TREEIFY_THRESHOLD)// 链表转成红黑树 treeifyBin(tab, i);if (oldVal != null)return oldVal;break;}}}// 8. 对当前容量大小进行检查,如果超过了临界值(实际大小*加载因子)就需要扩容 // 能执行到这一步,说明节点不是被替换的,是被插入的,否则在binCount判断 !=0 的时候就要被return了。addCount(1L, binCount);return null;
}private final Node<K,V>[] initTable() {Node<K,V>[] tab; int sc;while ((tab = table) == null || tab.length == 0) {if ((sc = sizeCtl) < 0)Thread.yield(); // lost initialization race; just spinelse if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {try {if ((tab = table) == null || tab.length == 0) {int n = (sc > 0) ? sc : DEFAULT_CAPACITY;@SuppressWarnings("unchecked")Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];table = tab = nt;sc = n - (n >>> 2);}} finally {sizeCtl = sc;}break;}}return tab;
}

整体流程下来就是:

  1. 计算key哈希值
  2. 根据哈希值计算在table中的位置
  3. 根据哈希值执行插入或替换操作
    1. 如果这个位置没有值,直接将键值对放进去,不需要加锁。(CAS)
    2. 如果要插入的位置是一个forwordingNode节点,表示正在扩容,那么当前线程帮助扩容
    3. 加锁。以下操作都需要加锁。
    4. 如果要插入的节点在链表中,遍历链表中的所有节点,如果某一节点的key哈希值和key与参数相等,替换节点的value,记录被替换的值;如果遍历到了最后一个节点,还没找到key对应的节点,根据参数新建节点,插入链表尾部
    5. 如果要插入的节点在树中,则按照树的方式插入或替换节点。如果是替换操作,记录被替换的值
  4. 判断节点数量是否大于8,如果大于就需要把链表转化成红黑树
  5. 如果操作3中执行的是替换操作,返回被替换的value。程序结束。
  6. 能执行到这一步,说明节点不是被替换的,是被插入的,所以要将map的元素数量加1。

get(Object key)

据我们之前的类头注释所译。get函数没有必要加锁的。但是可以看到的是ConcurrentHashMap对其Node类的next属性加上了volatile关键字进行修饰。来保证并发情况下其他线程若正在与get函数同步的修改该节点的next属性,保证了它的可见性。

Node类

Node类实现了Map.Entry接口,主要存放key-value对,并且具有next域。

static class Node<K,V> implements Map.Entry<K,V> {final int hash;final K key;volatile V val;volatile Node<K,V> next;......
}

另外可以看出很多属性都是用volatile进行修饰的,也是为了保证并发情况下的该属性的可见性。同事对hash和key用final进行修饰也是提供了这两个常用变量的缓存,性能上有所提高。

CAS

在ConcurrentHashMap中会有大量的CAS操作来修改它的一些属性和操作。所以先来看一些常用的CAS操作是如何保证线程安全的.

tabAt:获取table数组中索引为i的Node元素。

 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);}

casTabAt:利用CAS操作设置table数组中索引为i的元素

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,Node<K,V> c, Node<K,V> v) {return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

总结

  • 底层结构是散列表(数组+链表)+红黑树,这一点和HashMap是一样的。
  • Hashtable是将所有的方法进行同步,效率低下。而ConcurrentHashMap作为一个高并发的容器,它是通过synchronized+CAS算法来进行实现线程安全的。使用3个CAS操作来确保node的一些操作的原子性,这种方式代替了锁。
    • 采用synchronized而不是ReentrantLock。
    • CAS算法是乐观锁的一种
  • ConcurrentHashMap的key和Value都不能为null
  • get方法是非阻塞,无锁的。重写Node类,通过volatile修饰next来实现每次获取都是最新设置的值。相比于在jdk1.7中的变化就是不采用segment而采用node,锁住node来实现减小锁粒度。
  • sizeCtl的不同值来代表不同含义,起到了控制的作用。

ConcurrentLinkedQueue

ConcurrentLinkedQueue是并发大师Doug Lea(如果看了jdk的concurrent包的源码,相信读者对此人不会陌生)根据Michael-Scott提出的非阻塞链接队列算法的基础上修改而来,它是一个基于链表的无界线程安全队列,它采用先入先出的规则对节点进行排序,当我们添加一个节点的时候,它会添加到队列的尾部;当我们获取一个元素的时,它会返回队列头部的元素。它通过使用head和tail引用延迟更改的方式,减少CAS操作,在满足线程安全的前提下,提高了队列的操作效率。

  • offer,add区别:

一些队列有大小限制,因此如果想在一个满的队列中加入一个新项,多出的项就会被拒绝。这时新的 offer 方法就可以起作用了。它不是对调用 add() 方法抛出一个 unchecked 异常,而只是得到由 offer() 返回的 false。

  • poll,remove区别:

remove() 和 poll() 方法都是从队列中删除第一个元素。remove() 的行为与 Collection 接口的版本相似,但是新的 poll() 方法在用空集合调用时不是抛出异常,只是返回 null。因此新的方法更适合容易出现异常条件的情况。

  • peek,element区别:

element() 和 peek() 用于在队列的头部查询元素。与 remove() 方法类似,在队列为空时, element() 抛出一个异常,而 peek() 返回 null

CopyOnWriteArrayList

其对并发场景下操作提供了更好的支持,其内部使用ReentrantLock进行并发的控制,它主要具有以下特性:

  1. 所有元素都存储在数组里面, 只有当数组进行remove, update时才在方法上加上ReentrantLock, 拷贝一份snapshot的数组, 只改变snapshot中的元素,最后再赋值到CopyOnWriteArrayList中。
  2. 所有的get方法只是获取数组对应下标上的元素(无需加锁控制)。

以add方法为例:

public boolean add(E e) {final ReentrantLock lock = this.lock;//1、获取锁lock.lock();try {//2、获取当前数组元素Object[] elements = getArray();//3、获取数组长度int len = elements.length;//4、复制旧数组数据到新数组Object[] newElements = Arrays.copyOf(elements, len + 1);//5、将新元素添加到新数组的尾部newElements[len] = e;//6、将新数组更新为当前数组setArray(newElements);return true;} finally {//7、释放锁lock.unlock();}
}

整体操作比较简单。

LinkedBlockingQueue

LinkedBlockingQueue是一个使用链表实现的阻塞队列,支持多线程并发操作,可保证数据的一致性。

与ArrayBlockingQueue相同的是,LinkedBlockingQueue也实现了元素“先进先出(FIFO)”规则,也使用ReentrantLock来保证数据的一致性;

与ArrayBlockingQueue不同的是,LinkedBlockingQueue通常被认为是“无界”的,在默认情况下LinkedBlockingQueue的链表长度为Integer.MAX_VALUE。

成员变量

对于ArrayBlockingQueue来说,当队列在进行入队和出队时,永远只能有一个操作被执行。因为该队列只有一把锁,所以在多线程执行中并不允许同时出队和入队。

与ArrayBlockingQueue不同的是,LinkedBlockingQueue拥有两把锁,一把读锁,一把写锁,可以在多线程情况下,满足同时出队和入队的操作。

在ArrayBlockingQueue中,由于出队入队使用了同一把锁,无论元素增加还是减少,都不会影响到队列元素数量的统计,所以使用了int类型的变量作为队列数量统计。

但是,在LinkedBlockingQueue中则不同。上面说了,在LinkedBlockingQueue中使用了2把锁,在同时出队入队时,都会涉及到对元素数量的并发修改,会有线程安全的问题。因此,在LinkedBlockingQueue中使用了原子操作类AtomicInteger,底层使用CAS(compare and set)来解决数据安全问题。

public class LinkedBlockingQueue<E> extends AbstractQueue<E>implements BlockingQueue<E>, java.io.Serializable {//队列容量大小,默认为Integer.MAX_VALUEprivate final int capacity;//队列中元素个数:(与ArrayBlockingQueue的不同)//出队和入队是两把锁private final AtomicInteger count = new AtomicInteger(0);//队列--头结点private transient Node<E> head;//队列--尾结点private transient Node<E> last;//与ArrayBlockingQueue的不同,两把锁//读取锁private final ReentrantLock takeLock = new ReentrantLock();//出队等待条件private final Condition notEmpty = takeLock.newCondition();//插入锁private final ReentrantLock putLock = new ReentrantLock();//入队等待条件private final Condition notFull = putLock.newCondition();
}

构造函数

既可以是有界又可以是无解的 :

//默认构造函数:
public LinkedBlockingQueue() {//默认队列长度为Integer.MAX_VALUEthis(Integer.MAX_VALUE);
}//指定队列长度的构造函数:
public LinkedBlockingQueue(int capacity) {//初始化链表长度不能为0if (capacity <= 0) throw new IllegalArgumentException();this.capacity = capacity;//设置头尾结点,元素为nulllast = head = new Node<E>(null);
}

插入元素(入队)

LinkedBlockingQueue的插入获取和ArrayBlockingQueue基本类似,都包含有阻塞式和非阻塞式。

put(E e)是阻塞式插入,如果队列中的元素与链表长度相同,则此线程等待,直到有空余空间时,才执行。

//向队列尾部插入元素:队列满了线程等待
public void put(E e) throws InterruptedException {//不能插入为null元素:if (e == null) throw new NullPointerException();int c = -1;//创建元素结点:Node<E> node = new Node(e);final ReentrantLock putLock = this.putLock;final AtomicInteger count = this.count;//加插入锁,保证数据的一致性:putLock.lockInterruptibly();try {//当队列元素个数==链表长度while (count.get() == capacity) {//插入线程等待:notFull.await();}//插入元素:enqueue(node);//队列元素增加:count+1,但返回+1前的count值:c = count.getAndIncrement();//容量还没满,唤醒生产者线程// (例如链表长度为5,此时第五个元素已经插入,c=4,+1=5,所以超过了队列容量,则不会再唤醒生产者线程)if (c + 1 < capacity)notFull.signal();} finally {//释放锁:putLock.unlock();}//当c=0时,即意味着之前的队列是空队列,消费者线程都处于等待状态,需要被唤醒进行消费if (c == 0)//唤醒消费者线程:signalNotEmpty();
}

offer(E e)是非阻塞式插入,队列中的元素与链表长度相同时,直接返回false,不会阻塞线程。

//向队列尾部插入元素:返回true/false
public boolean offer(E e) {//插入元素不能为空if (e == null) throw new NullPointerException();final AtomicInteger count = this.count;//如果队列元素==链表长度,则直接返回falseif (count.get() == capacity)return false;int c = -1;//创建元素结点对象:Node<E> node = new Node(e);final ReentrantLock putLock = this.putLock;//加锁,保证数据一致性putLock.lock();try {//队列元素个数 小于 链表长度if (count.get() < capacity) {//向队列中插入元素:enqueue(node);//增加队列元素个数:c = count.getAndIncrement();//容量还没满,唤醒生产者线程:if (c + 1 < capacity)notFull.signal();}} finally {//释放锁:putLock.unlock();}//此时,代表队列中还有一条数据,可以进行消费,唤醒消费者线程if (c == 0)signalNotEmpty();return c >= 0;
}

获取元素(出队)

take():阻塞式出队,获取队列头部元素,如果队列中没有元素,则此线程的等待,直到队列中有元素才执行。

//从队列头部获取元素,并返回。队列为null,则一直等待
public E take() throws InterruptedException {E x;int c = -1;final AtomicInteger count = this.count;final ReentrantLock takeLock = this.takeLock;//设置读取锁:takeLock.lockInterruptibly();try {//如果此时队列为空,则获取线程等待while (count.get() == 0) {notEmpty.await();}//从队列头部获取元素:x = dequeue();//减少队列元素-1,返回count减少前的值;c = count.getAndDecrement();//队列中还有可以消费的元素,唤醒其他消费者线程if (c > 1)notEmpty.signal();} finally {//释放锁:takeLock.unlock();}//队列中出现了空余元素,唤醒生产者进行生产。// (链表长度为5,队列在执行take前有5个元素,执行到此处时候有4个元素了,但是c的值还是5,所以会进入到if中来)if (c == capacity)signalNotFull();return x;
}

poll():非阻塞式出队,当队列中没有元素,则返回null.

//获取头部元素,并返回。队列为空,则直接返回null
public E poll() {final AtomicInteger count = this.count;//如果队列中还没有元素,则直接返回 nullif (count.get() == 0)return null;E x = null;int c = -1;final ReentrantLock takeLock = this.takeLock;//加锁,保证数据的安全takeLock.lock();try {//此时在判断,队列元素是否大于0if (count.get() > 0) {//移除队头元素x = dequeue();//减少队列元素个数c = count.getAndDecrement();//此时队列中,还有1个元素,唤醒消费者线程继续执行if (c > 1)notEmpty.signal();}} finally {//释放锁:takeLock.unlock();}//队列中出现了空余元素,唤醒生产者进行生产。// (链表长度为5,队列在执行take前有5个元素,执行到此处时候有4个元素了,但是c的值还是5,所以会进入到if中来)if (c == capacity)signalNotFull();return x;
}

ArrayBlockingQueue与LinkedBlockingQueue对比

ArrayBlockingQueue底层基于数组实现,需要使用者指定队列长度,是一个不折不扣的有界队列。

LinkedBlockingQueue底层基于链表实现,无需使用者指定队列长度(可自定义),当使用默认大小时候,是一个无界队列。

ArrayBlockingQueue由于默认必须设置队列长度,所以在使用时会能更好的预测系统性能;而LinkedBlockingQueue默认无参构造,无需指定队列长度,所以在使用时一定要多加注意,当队列中元素短时间内暴增时,可能会对系统产生灾难性影响。

但是,LinkedBlockingQueue的一大优点也是ArrayBlockingQueue所不具备的,那么就是在多个CPU的情况下,LinkedBlockingQueue可以做到同一时刻既消费、又生产。故LinkedBlockingQueue的性能也要优于ArrayBlockingQueue。

生产者消费者实现:

使用LinkedBlockingQueue简单模拟消费者生产者实现;

public class LinkedBlockingQueueTest {static class Apple{String colour;public Apple(String colour){this.colour = colour;}public String getColour() {return colour;}public void setColour(String colour) {this.colour = colour;}}//生产者static class Producer implements Runnable{LinkedBlockingQueue<Apple> queueProducer ;Apple apple;public Producer( LinkedBlockingQueue<Apple> queueProducer,Apple apple){this.queueProducer = queueProducer;this.apple = apple;}public void run() {try {System.out.println("生产"+apple.getColour()+"的苹果");queueProducer.put(apple);} catch (InterruptedException e) {e.printStackTrace();}}}//消费者static class Consumer implements Runnable{LinkedBlockingQueue<Apple> queueConsumer ;public Consumer(LinkedBlockingQueue<Apple> queueConsumer){this.queueConsumer = queueConsumer;}public void run() {try {Apple apple = queueConsumer.take();System.out.println("消费"+apple.getColour()+"的苹果");} catch (InterruptedException e) {e.printStackTrace();}}}public static void main(String[] args) throws InterruptedException {LinkedBlockingQueue<Apple> queue = new LinkedBlockingQueue<Apple>();Apple appleRed = new Apple("红色");Apple appleGreen = new Apple("绿色");Producer producer1 = new Producer(queue,appleRed);Producer producer2 = new Producer(queue,appleGreen);Consumer consumer = new Consumer(queue);producer1.run();producer2.run();consumer.run();Thread.sleep(10000);}
}

concurrent之executors相关

构造函数

在Java中,线程池的概念是Executor这个接口,具体实现为ThreadPoolExecutor类,学习Java中的线程池,就可以直接学习他了

//五个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue)//六个参数的构造函数-1
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory)//六个参数的构造函数-2
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler)//七个参数的构造函数
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
  • int corePoolSize => 该线程池中核心线程数最大值

线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程

核心线程默认情况下会一直存活在线程池中,即使这个核心线程啥也不干(闲置状态)。

如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果不干活(闲置状态)的话,超过一定时间(时长下面参数决定),就会被销毁掉

  • int maximumPoolSize:该线程池中线程总数最大值

线程总数 = 核心线程数 + 非核心线程数。

  • long keepAliveTime:非核心线程闲置超时时长

一个非核心线程,如果不干活(闲置状态)的时长超过这个参数所设定的时长,就会被销毁掉

如果设置allowCoreThreadTimeOut = true,则会作用于核心线程

TimeUnit unit:eepAliveTime的单位,TimeUnit是一个枚举类型

BlockingQueue<Runnable> workQueue

该线程池中的任务队列:维护着等待执行的Runnable对象

当所有的核心线程都在干活时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务

常用的workQueue类型:

  1. SynchronousQueue:这个队列接收到任务的时候,会直接提交给线程处理,而不保留它,如果所有线程都在工作怎么办?那就新建一个线程来处理这个任务!所以为了保证不出现<线程数达到了maximumPoolSize而不能新建线程>的错误,使用这个类型队列的时候,maximumPoolSize一般指定成Integer.MAX_VALUE,即无限大

  2. LinkedBlockingQueue:这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize

  3. ArrayBlockingQueue:可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则发生错误

  4. DelayQueue:队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务

ThreadFactory threadFactory:创建线程工厂:

new ThreadFactory() {private final AtomicInteger mCount = new AtomicInteger(1);public Thread new Thread(Runnable r) {return new Thread(r,"AsyncTask #" + mCount.getAndIncrement());}
}

RejectedExecutionHandler handler

拒绝策略:

  1. AbortPolicy -- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。
  2. CallerRunsPolicy -- 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。
  3. DiscardOldestPolicy -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
  4. DiscardPolicy -- 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝 的任务。

执行策略

上面介绍参数的时候其实已经说到了ThreadPoolExecutor执行的策略,这里给总结一下,当一个任务被添加进线程池时:

  1. 线程数量未达到corePoolSize,则新建一个线程(核心线程)执行任务
  2. 线程数量达到了corePools,则将任务移入队列等待
  3. 队列已满,新建线程(非核心线程)执行任务
  4. 队列已满,总线程数又达到了maximumPoolSize,就会(RejectedExecutionHandler)抛出异常

线程池类型

FixedThreadPool()

ExecutorsnewFixedThreadPool方法创建。它是一种线程数量固定的线程池,当线程处于空闲状态时,他们并不会被回收,除非线程池被关闭。当所有的线程都处于活动状态时,新的任务都会处于等待状态,直到有线程空闲出来。FixedThreadPool只有核心线程,且该核心线程都不会被回收,这意味着它可以更快地响应外界的请求

特点:

  1. 可控制线程最大并发数(同时执行的线程数)
  2. 超出的线程会在队列中等待
public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
}

CachedThreadPool()

ExecutorsnewCachedThreadPool方法创建,不存在核心线程,只存在数量不定的非核心线程,而且其数量最大值为Integer.MAX_VALUE。当线程池中的线程都处于活动时(全满),线程池会创建新的线程来处理新的任务,否则就会利用新的线程来处理新的任务,线程池中的空闲线程都有超时机制,默认超时时长为60s,超过60s的空闲线程就会被回收。和FixedThreadPool不同的是,CachedThreadPool的任务队列其实相当于一个空的集合,这将导致任何任务都会被执行,因为在这种场景下SynchronousQueue是不能插入任务的,SynchronousQueue是一个特殊的队列,在很多情况下可以理解为一个无法储存元素的队列。从CachedThreadPool的特性看,这类线程比较适合执行大量耗时较小的任务。当整个线程池都处于闲置状态时,线程池中的线程都会因为超时而被停止回收,几乎是不占任何系统资源。

特点:

  1. 线程数无限制
  2. 有空闲线程则复用空闲线程,若无空闲线程则新建线程
  3. 一定程序减少频繁创建/销毁线程,减少系统开销

创建方法:

ExecutorService cachedThreadPool = Executors.newCachedThreadPool();

public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}

ScheduledThreadPool()

通过ExecutorsnewScheduledThreadPool方式创建,核心线程数量是固定的,而非核心线程是没有限制的,并且当非核心线程闲置时它会被立即回收,ScheduledThreadPool这类线程池主要用于执行定时任务和具有固定时期的重复任务。

特点:

  1. 支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {return new ScheduledThreadPoolExecutor(corePoolSize);
}//ScheduledThreadPoolExecutor():
public ScheduledThreadPoolExecutor(int corePoolSize) {super(corePoolSize, Integer.MAX_VALUE,DEFAULT_KEEPALIVE_MILLIS, MILLISECONDS,new DelayedWorkQueue());
}

SingleThreadExecutor()

通过ExecutorsnewSingleThreadExecutor方法来创建。这类线程池内部只有一个核心线程,它确保所有的任务都在同一个线程中按顺序执行。SingleThreadExecutor的意义在于统一所有外界任务一个线程中,这使得这些任务之间不需要处理线程同步的问题

特点:

  1. 有且仅有一个工作线程执行任务
  2. 所有任务按照指定顺序执行,即遵循队列的入队出队规则
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}

使用案例

案例一:countDownLatch

package com.br.lucky.utils;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;public class FatureTest {//1、配置线程池private static ExecutorService es = Executors.newFixedThreadPool(20);//2、封装响应Featureclass BizResult{public String orderId;public String  data;public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getData() {return data;}public void setData(String data) {this.data = data;}}//3、实现Callable接口class BizTask implements Callable {private String orderId;private Object data;//可以用其他方式private CountDownLatch countDownLatch;public BizTask(String orderId, Object data, CountDownLatch countDownLatch) {this.orderId = orderId;this.data = data;this.countDownLatch = countDownLatch;}@Overridepublic Object call() {try {//todo businessSystem.out.println("当前线程Id = " + this.orderId);BizResult br = new BizResult();br.setOrderId(this.orderId);br.setData("some key about your business" + this.getClass());return br;}catch (Exception e){e.printStackTrace();}finally {//线程结束时,将计时器减一countDownLatch.countDown();}return null;}}/*** 业务逻辑入口*/public List<Future> beginBusiness() throws InterruptedException {//模拟批量业务数据List<String> list = new ArrayList<>();for (int i = 0 ; i < 1000 ; i++) {list.add(String.valueOf(i));}//设置计数器CountDownLatch countDownLatch = new CountDownLatch(list.size());//接收多线程响应结果List<Future> resultList = new ArrayList<>();//begin threadfor( int i = 0 ,size = list.size() ; i<size; i++){//todo something befor threadresultList.add(es.submit(new BizTask(list.get(i), null, countDownLatch)));}//wait finishcountDownLatch.await();return resultList;}public static void main(String[] args) throws InterruptedException {FatureTest ft = new FatureTest();List<Future> futures = ft.beginBusiness();System.out.println("futures.size() = " + futures.size());//todo some operateSystem.out.println(" ==========================end========================= " );}}

案例二:future

package com.br.lucky.utils;import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;public class FatureTest {/**
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("demo-pool-%d").build();ExecutorService pool = new ThreadPoolExecutor(5, 200,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.CallerRunsPolicy());for(int i=0;i<1000;i++) {pool.execute(() -> {//todo 业务逻辑在此});}
*///1、配置线程池private static ExecutorService es = Executors.newFixedThreadPool(20);//2、封装响应Featureclass BizResult{public String orderId;public String  data;public String getOrderId() {return orderId;}public void setOrderId(String orderId) {this.orderId = orderId;}public String getData() {return data;}public void setData(String data) {this.data = data;}}//3、实现Callable接口class BizTask implements Callable {private String orderId;private Object data;public BizTask(String orderId, Object data) {this.orderId = orderId;this.data = data;}@Overridepublic Object call() {try {//todo businessSystem.out.println("当前线程Id = " + this.orderId);BizResult br = new BizResult();br.setOrderId(this.orderId);br.setData("some key about your business" + this.getClass());Thread.sleep(3000);return br;}catch (Exception e){e.printStackTrace();}return null;}}/*** 业务逻辑入口*/public List<Future> beginBusiness() throws InterruptedException, ExecutionException {//模拟批量业务数据List<String> list = new ArrayList<>();for (int i = 0 ; i < 100 ; i++) {list.add(String.valueOf(i));}//接收多线程响应结果List<Future> resultList = new ArrayList<>();//begin threadfor( int i = 0 ,size = list.size() ; i<size; i++){//todo something befor threadFuture future = es.submit(new BizTask(list.get(i), null));resultList.add(future);}for (Future f : resultList) {f.get();}System.out.println(" =====多线程执行结束====== ");//wait finishreturn resultList;}public static void main(String[] args) throws InterruptedException, ExecutionException {FatureTest ft = new FatureTest();List<Future> futures = ft.beginBusiness();System.out.println("futures.size() = " + futures.size());//todo some operateSystem.out.println(" ==========================end========================= " );}}

参考文档:

https://blog.csdn.net/lmb55/article/details/79547685

https://www.cnblogs.com/zhuwenjoyce/p/9578382.html

https://www.jianshu.com/p/5a9a814c420e

https://blog.csdn.net/wtopps/article/details/85267115

https://www.jianshu.com/p/d2bbb300ce95

https://www.jianshu.com/p/210eab345423

https://www.jianshu.com/p/edd7cb4eafa0

线程和并发(三)阻塞队列和线程池相关推荐

  1. java 阻塞锁_Java实现锁、公平锁、读写锁、信号量、阻塞队列、线程池等常用并发工具...

    锁的实现 锁的实现其实很简单,主要使用Java中synchronized关键字. public class Lock { private volatile boolean isLocked = fal ...

  2. Java多线程闲聊(四):阻塞队列与线程池原理

    Java多线程闲聊(四)-阻塞队列与线程池原理 前言 复用永远是人们永恒的主题,这能让我们更好地避免重复制造轮子. 说到多线程,果然还是绕不开线程池,那就来聊聊吧. 人们往往相信,世界是存在一些规律的 ...

  3. 【JUC】第五章 JUC 阻塞队列、线程池

    第五章 JUC 阻塞队列.线程池 文章目录 第五章 JUC 阻塞队列.线程池 一.阻塞队列 1.简介 2.BlockingQueue 的方法 3.常见的 BlockingQueue 二.线程池 1.简 ...

  4. Java线程详解(15)-阻塞队列和阻塞栈

    Java线程:新特征-阻塞队列 阻塞队列是Java5线程新特征中的内容,Java定义了阻塞队列的接口java.util.concurrent.BlockingQueue,阻塞队列的概念是,一个指定长度 ...

  5. 让人抓头的Java并发(四) 阻塞队列--CPU飙升排查案例

    在上一篇文章中给大家介绍了牛批的AQS,大致讲解了JUC中同步的思路.本来还没想好这一篇应该写点什么,刚好上周某个同事的代码出现问题,排查后发现是使用阻塞队列不当导致的,所以本篇决定介绍下阻塞队列. ...

  6. java并发队列_Java并发教程–阻塞队列

    java并发队列 如第3部分所述,Java 1.5中引入的线程池提供了核心支持,该支持很快成为许多Java开发人员的最爱. 在内部,这些实现巧妙地利用了Java 1.5中引入的另一种并发功能-阻塞队列 ...

  7. Java并发教程–阻塞队列

    如第3部分所述,Java 1.5中引入的线程池提供了核心支持,该支持很快成为许多Java开发人员的最爱. 在内部,这些实现巧妙地利用了Java 1.5中引入的另一种并发功能-阻塞队列. 队列 首先,简 ...

  8. 【Java 并发编程】线程池机制 ( ThreadPoolExecutor 线程池构造参数分析 | 核心线程数 | 最大线程数 | 非核心线程存活时间 | 任务阻塞队列 )

    文章目录 前言 一.ThreadPoolExecutor 构造参数 二.newCachedThreadPool 参数分析 三.newFixedThreadPool 参数分析 四.newSingleTh ...

  9. 【Java 并发编程】线程池机制 ( 线程池执行任务细节分析 | 线程池执行 execute 源码分析 | 先创建核心线程 | 再放入阻塞队列 | 最后创建非核心线程 )

    文章目录 一.线程池执行任务细节分析 二.线程池执行 execute 源码分析 一.线程池执行任务细节分析 线程池执行细节分析 : 核心线程数 101010 , 最大小成熟 202020 , 非核心线 ...

最新文章

  1. OracleJDBC
  2. mysql base64
  3. 让Elasticsearch飞起来:性能优化实践干货
  4. Java的数据类型及其封装器类
  5. python多进程内存共享_Python—并发编程04多进程内存共享,python,间,的
  6. python列表冒号逗号常规用法
  7. leetcode —— 面试题62. 圆圈中最后剩下的数字
  8. 比_thread高级的threading模块,对比释放锁例子
  9. vs2010 c++项目创建简易教程
  10. 驾驶机动车在高速公路上倒车、逆行、穿越中央分隔带掉头的一次记6分。
  11. 845.数组中的最长山脉
  12. 利用ESP定律进行脱壳 ——合天网安实验室学习笔记
  13. 制作浏览器javascript书签
  14. 【C语言】0x1F<<11等于0还是0xF800 ?
  15. 信息收集之主动信息收集(一)
  16. WPS如何并排放置两张图片_WPS表格:如何批量将所有图片大小修改成一致?
  17. 使用HttpClient模拟POST请求
  18. LeetCode56. 合并区间(Java贪心解法)
  19. 明确数据分析目标的 3 个步骤!
  20. JUCE 0基础小白学习历程day1--基础了解

热门文章

  1. 易语言mysql数据库分页_易语言分页读数据库 mysql数据库分页
  2. 基于JAVA社区流浪猫狗救助网站计算机毕业设计源码+数据库+lw文档+系统+部署
  3. android BT 遥控器配置
  4. 腾讯发布首个全自研机器狗Max,有腿又有轮,会“拜年讨红包”
  5. 第一个合作开发的游戏项目--飞机大战(cocos creater)
  6. 2023年软件测试学什么?需要懂代码?经常加班吗?--测试老司机写给迷茫的你
  7. Java AES密码盐加密
  8. Win10/win11安装tensorflow,不用anaconda(高中生都能看懂版)/CUDA、cuDNN安装教程
  9. 牛客练习赛63---牛牛的树行棋
  10. 二极管损坏的原因有哪些?