ConcurrentLinkedQueue非阻塞无界链表队列 

ConcurrentLinkedQueue是一个线程安全的队列,基于链表结构实现,是一个无界队列,理论上来说队列的长度可以无限扩大。

与其他队列相同,ConcurrentLinkedQueue 也采用的是先进先出(FIFO)入队规则,对元素进行排序。当我们向队列中添加元素时,新插入的元素会插入到队列的尾部;而当我们获取一个元素时,它会从队列的头部中取出。

因为 ConcurrentLinkedQueue 是链表结构,所以当入队时,插入的元素依次向后延伸,形成链表;而出队时,则从链表的第一个元素开始获取,依次递增; 
不知道,我这样形容能否让你对链表的入队、出队产生一个大概的思路!

ConcurrentLinkedQuere 
值得注意的是,在使用ConcurrentLinkedQueue时,如果涉及到队列是否为空的判断,切记不可使用size()==0的做法,因为在 size()方法中,是通过遍历整个链表来实现的,在队列元素很多的时候,size()方法十分消耗性能和时间,只是单纯的判断队列为空使用isEmpty()即可!!!

public class ConcurrentLinkedQueueTest {

public static int threadCount = 10;

public static ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();

static class Offer implements Runnable {

public void run() {

//不建议使用queue.size()==0,影响效率。可以使用!queue.isEmpty()

if(queue.size()==0){

String ele = new Random().nextInt(Integer.MAX_VALUE)+"";

queue.offer(ele);

System.out.println("入队元素为"+ele);

}

}

}

static class Poll implements Runnable {

public void run() {

if(!queue.isEmpty()){

String ele = queue.poll();

System.out.println("出队元素为"+ele);

}

}

}

public static void main(String[] agrs) { 
 ExecutorService executorService = Executors.newFixedThreadPool(4);  for(int x=0;x<threadCount;x++){

executorService.submit(new Offer());

executorService.submit(new Poll());

}

executorService.shutdown();

}

}

一种输出:

入队元素为313732926

出队元素为313732926

入队元素为812655435

出队元素为812655435

入队元素为1893079357

出队元素为1893079357

入队元素为1137820958

出队元素为1137820958

入队元素为1965962048

出队元素为1965962048

出队元素为685567162

入队元素为685567162

出队元素为1441081163

入队元素为1441081163

出队元素为1627184732

入队元素为1627184732

ConcurrentLinkedQuere

如图 ConcurrentLinkedQueue 中有两个 volatile 类型的 Node 节点分别用来存在列表的首尾节点,其中 head节点存放链表第一个 item 为 null 的节点,tail 则并不是总指向最后一个节点。Node 节点内部则维护一个变量 item用来存放节点的值,next用来存放下一个节点,从而链接为一个单向无界列表。

public ConcurrentLinkedQueue() {

head = tail = new Node<E>(null);

}

如上代码初始化时候会构建一个item为NULL的空节点作为链表的首尾节点。 ConcurrentLinkedQuere 

✓  Offer 

offer操作是在链表末尾添加一个元素,下面看看实现原理。

public boolean offer(E e) {

//e为null则抛出空指针异常

如图首先查找尾节点,q==null,p就是尾节点,所以执行p.casNext通过cas设置p的next为新增节点,

这时候p==t所以不重新设置尾节点为当前新节点。由于多线程可以调用offer方法,所以可能两个线程同时执行到了(1)进行cas,那么只有一个会成功(假如线程1成功了),成功后的链表为:

失败的线程会循环一次这时候指针为:

这时候会执行(3)所以p=q,然后在循环后指针位置为:

所以没有其他线程干扰的情况下会执行(1)执行cas把新增节点插入到尾部,没有干扰的情况下线程2 cas会成

功,然后去更新尾节点tail,由于p!=t所以更新。这时候链表和指针为:

假如线程2cas时候线程3也在执行,那么线程3会失败,循环一次后,线程3的节点状态为:

这时候p!=t ;并且t的原始值为told,t的新值为tnew ,所以told!=tnew,所以  p=tnew=tail;

然后在循环一下后节点状态:

q==null所以执行(1)。

现在就差p==q这个分支还没有走,这个要在执行poll操作后才会出现这个情况。poll后会存在下面的状态

这个时候添加元素时候指针分布为:

所以会执行(2)分支 结果  p=head

然后循环,循环后指针分布:

所以执行(1),然后p!=t所以设置tail节点。现在分布图:

自引用的节点会被垃圾回收掉。

✓  add 

add操作是在链表末尾添加一个元素,下面看看实现原理。

其实内部调用的还是offer

public boolean add(E e) {

return offer(e);

}

✓  poll操作 

poll操作是在链表头部获取并且移除一个元素,下面看看实现原理。

public E poll() {

restartFromHead:

//死循环

for (;;) {

//死循环

for (Node<E> h = head, p = h, q;;) {

//保存当前节点值

E item = p.item;

//当前节点有值则cas变为null(1)

if (item != null && p.casItem(item, null)) {

//cas成功标志当前节点以及从链表中移除

if (p != h) // 类似tail间隔2设置一次头节点(2)

updateHead(h, ((q = p.next) != null) ? q : p);

return item;

}

//当前队列为空则返回null(3)

else if ((q = p.next) == null) {

updateHead(h, p);

return null;

}

//自引用了,则重新找新的队列头节点(4)

else if (p == q)

continue restartFromHead;

else//(5)

p = q;

}

}

}

final void updateHead(Node<E> h, Node<E> p) {

if (h != p && casHead(h, p))

h.lazySetNext(h);

}

当队列为空时候:

可知执行(3)这时候有两种情况,第一没有其他线程添加元素时候(3)结果为true然后因为h!=p为false

所以直接返回null。第二在执行q=p.next前,其他线程已经添加了一个元素到队列,这时候(3)返回false,然后

执行(5)p=q,然后循环后节点分布:

这时候执行(1)分支,进行cas把当前节点值值为null,同时只有一个线程会成功,cas成功 标示该节点

从队列中移除了,然后p!=h,调用updateHead方法,参数为h,p;h!=p所以把p变为当前链表head节点,然后h节点的next指向自己。现在状态为:

cas失败 后 会再次循环,这时候分布图为:

这时候执行(3)返回null.

现在还有个分支(4)没有执行过,那么什么时候会执行那?

这时候执行(1)分支,进行cas把当前节点值值为null,同时只有一个线程A会成功,cas成功 标示该

节点从队列中移除了,然后p!=h,调用updateHead方法,假如执行updateHead前另外一个线程B开始poll这时候它p指向为原来的head节点,然后当前线程A执行updateHead这时候B线程链表状态为:

所以会执行(4)重新跳到外层循环,获取当前head,现在状态为:

✓  peek操作 

peek操作是获取链表头部一个元素(只读取不移除),下面看看实现原理。

代码与poll类似,只是少了castItem.并且peek操作会改变head指向,offer后head指向哨兵节点,第

一次peek后head会指向第一个真的节点元素。

public E peek() {

restartFromHead:

for (;;) {

for (Node<E> h = head, p = h, q;;) {

E item = p.item;

if (item != null || (q = p.next) == null) {

updateHead(h, p);

return item;

}

else if (p == q)

continue restartFromHead;

else

p = q;

}

}

}

✓  size 

获取当前队列元素个数,在并发环境下不是很有用,因为使用CAS没有加锁所以从调用size函数到返回结

果期间有可能增删元素,导致统计的元素个数不精确。

public int size() {

int count = 0;

for (Node<E> p = first(); p != null; p = succ(p))

if (p.item != null)

// 最大返回Integer.MAX_VALUE

if (++count == Integer.MAX_VALUE)

break;

return count;

}

//获取第一个队列元素(哨兵元素不算),没有则为null

Node<E> first() {

restartFromHead:

for (;;) {

for (Node<E> h = head, p = h, q;;) {

boolean hasItem = (p.item != null);

if (hasItem || (q = p.next) == null) {

updateHead(h, p);

return hasItem ? p : null;

}

else if (p == q)

continue restartFromHead;

else

p = q;

}

}

}

//获取当前节点的next元素,如果是自引入节点则返回真正头节点

final Node<E> succ(Node<E> p) {

Node<E> next = p.next;

return (p == next) ? head : next;

}

✓  remove操作 

如果队列里面存在该元素则删除给元素,如果存在多个则删除第一个,并返回true,否者返回false

public boolean remove(Object o) {

//查找元素为空,直接返回false

if (o == null) return false;

Node<E> pred = null;

for (Node<E> p = first(); p != null; p = succ(p)) {

E item = p.item;

//相等则使用cas值null,同时一个线程成功,失败的线程循环查找队列中其他元素是否有匹配的。  if (item != null &&

o.equals(item) &&

p.casItem(item, null)) {

//获取next元素

Node<E> next = succ(p);

//如果有前驱节点,并且next不为空则链接前驱节点到next,

if (pred != null && next != null)

pred.casNext(p, next);

return true;

}

pred = p;

}

return false;

}

✓  contains 

判断队列里面是否含有指定对象,由于是遍历整个队列,所以类似size 不是那么精确,有可能调用该方法

时候元素还在队列里面,但是遍历过程中才把该元素删除了,那么就会返回false.

public boolean contains(Object o) {

if (o == null) return false;

for (Node<E> p = first(); p != null; p = succ(p)) {

E item = p.item;

if (item != null && o.equals(item))

return true;

}

return false;

}

ConcurrentLinkedQuereoffer 

offer中有个 判断  t != (t = tail)假如  t=node1;tail=node2;并且node1!=node2那么这个判断是true还

是false那,答案是true,这个判断是看当前t是不是和tail相等,相等则返回true否者为false,但是无论

结果是啥执行后t的值都是tail。

下面从字节码来分析下为啥。

•  一个例子

public static void main(String[] args)  {

int t = 2;

int tail = 3;

System.out.println(t != (t = tail));

}

结果为:true

•  字节码文件

C:\Users\Simple\Desktop\TeacherCode\Crm_Test\build\classes\com\itheima\crm\util>javap -c Test001 警告: 二进制文件Test001包含com.itheima.crm.util.Test001

Compiled from "Test001.java"

public class com.itheima.crm.util.Test001 {

public com.itheima.crm.util.Test001();

Code:

0: aload_0

1: invokespecial #8                  // Method java/lang/Object."<init>":()V

4: return

public static void main(java.lang.String[]);

Code:

0: iconst_2

1: istore_1

2: iconst_3

3: istore_2

4: getstatic     #16                 // Field java/lang/System.out:Ljava/io/PrintStream;

7: iload_1

8: iload_2

9: dup

10: istore_1

11: if_icmpeq     18

14: iconst_1

15: goto          19

18: iconst_0

19: invokevirtual #22                 // Method java/io/PrintStream.println:(Z)V

22: return

}

我们从上面标黄的字节码文件中分析

一开始栈为空:

•  第0行指令作用是把值2入栈栈顶元素为2

2

•  第1行指令作用是将栈顶int类型值保存到局部变量t中

•  第2行指令作用是把值3入栈栈顶元素为3

3

•  第3行指令作用是将栈顶int类型值保存到局部变量tail中。

•  第4调用打印命令

•  第7行指令作用是把变量t中的值入栈

2

•  第8行指令作用是把变量tail中的值入栈

3

2

•  现在栈里面的元素为3、2,并且3位于栈顶

•  第9行指令作用是当前栈顶元素入栈,所以现在栈内容3,3,2

3

3

2

•  第10行指令作用是把栈顶元素存放到t,现在栈内容3,2

3

2

•  第 11 行指令作用是判断栈顶两个元素值,相等则跳转  18。由于现在栈顶严肃为 3,2 不相等所以返回true.

•  第14行指令作用是把1入栈

•  然后回头分析下!=是双目运算符,应该是首先把左边的操作数入栈,然后在去计算了右侧操作数。

ConcurrentLinkedQuere 
ConcurrentLinkedQueue使用CAS非阻塞算法实现使用CAS解决了当前节点与next节点之间的安全链接和对当前节点值的赋值。由于使用CAS没有使用锁,所以获取size的时候有可能进行offer,poll或者remove操作,导致获取的元素个数不精确,所以在并发情况下size函数不是很有用。另外第一次peek或者first时候会把head指向第一个真正的队列元素。

下面总结下如何实现线程安全的,可知入队出队函数都是操作volatile变量:head,tail。所以要保证队列线程安全只需要保证对这两个 Node 操作的可见性和原子性,由于 volatile 本身保证可见性,所以只需要看下多线程下如果保证对着两个变量操作的原子性。

对于offer操作是在tail后面添加元素,也就是调用tail.casNext方法,而这个方法是使用的CAS操作,只有一个线程会成功,然后失败的线程会循环一下,重新获取tail,然后执行casNext方法。对于poll也是这样的。

➢ ConcurrentHashMap非阻Hash 

ConcurrentHashMap 是 Java 并发包中提供的一个线程安全且高效的 HashMap 实现,ConcurrentHashMap在并发编程的场景中使用频率非常之高,本文就来分析下ConcurrentHashMap的实现原理,并对其实现原理进行分析。

ConcurrentLinkedQuere 

ConcurrentHashMap 是由 Segment 数组结构和 HashEntry 数组结构组成。Segment 是一种可重入锁ReentrantLock,在 ConcurrentHashMap 里扮演锁的角色,HashEntry 则用于存储键值对数据。一个ConcurrentHashMap里包含一个Segment数组,Segment的结构和HashMap类似,是一种数组和链表结构, 一个 Segment 里包含一个 HashEntry 数组,每个 HashEntry 是一个链表结构的元素, 每个 Segment 守护者一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得它对应的Segment锁。

ConcurrentLinkedQuere 
众所周知,哈希表是中非常高效,复杂度为 O(1)的数据结构,在 Java 开发中,我们最常见到最频繁使用的就是HashMap和HashTable,但是在线程竞争激烈的并发场景中使用都不够合理。

HashMap :先说HashMap,HashMap是线程不安全的,在并发环境下,可能会形成环状链表(扩容时可能造成,具体原因自行百度google或查看源码分析),导致get操作时,cpu空转,所以,在并发环境中使用HashMap是非常危险的。

HashTable :  HashTable和HashMap的实现原理几乎一样,差别无非是1.HashTable不允许key和value为null;2.HashTable 是线程安全的。但是 HashTable 线程安全的策略实现代价却太大了,简单粗暴,get/put 所有相关操作都是synchronized的,这相当于给整个哈希表加了一把大锁,多线程访问时候,只要有一个线程访问或操作该对象,那其他线程只能阻塞,相当于将所有的操作串行化,在竞争激烈的并发场景中性能就会非常差。如下图

HashTable 性能差主要是由于所有操作需要竞争同一把锁,而如果容器中有多把锁,每一把锁锁一段数据,这样在多线程访问时不同段的数据时,就不会存在锁竞争了,这样便可以有效地提高并发效率。这就是ConcurrentHashMap所采用的"分段锁"思想

ConcurrentLinkedQuere 
ConcurrentHashMap采用了非常精妙的"分段锁"策略,ConcurrentHashMap的主干是个Segment数组。

final Segment<K,V>[] segments;

Segment继承了ReentrantLock,所以它就是一种可重入锁(ReentrantLock)。在ConcurrentHashMap,

一个Segment就是一个子哈希表,Segment里维护了一个HashEntry数组,并发环境下,对于不同Segment

的数据进行操作是不用考虑锁竞争的。(就按默认的ConcurrentLeve为16来讲,理论上就允许16个线程

并发执行,有木有很酷)

所以,对于同一个Segment的操作才需考虑线程同步,不同的Segment则无需考虑。Segment类似于HashMap,

一个Segment维护着一个HashEntry数组

transient volatile HashEntry<K,V>[] table;

HashEntry是目前我们提到的最小的逻辑处理单元了。一个ConcurrentHashMap维护一个Segment数组,

一个Segment维护一个HashEntry数组。

static final class HashEntry<K,V> {

final int hash;

final K key;

volatile V value;

volatile HashEntry<K,V> next;

//其他省略

}

我们说 Segment 类似哈希表,那么一些属性就跟我们之前提到的 HashMap 差不离,比如负载因子

loadFactor,比如阈值threshold等等,看下Segment的构造方法

Segment(float lf, int threshold, HashEntry<K,V>[] tab) {

this.loadFactor = lf;//负载因子

this.threshold = threshold;//阈值

this.table = tab;//主干数组即HashEntry数组

}

我们来看下ConcurrentHashMap的构造方法

public ConcurrentHashMap(int initialCapacity,

float loadFactor, int concurrencyLevel) {

if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)   throw new IllegalArgumentException();

//MAX_SEGMENTS 为1<<16=65536,也就是最大并发数为65536

if (concurrencyLevel > MAX_SEGMENTS)

concurrencyLevel = MAX_SEGMENTS;
 //2的sshif次方等于ssize,例:ssize=16,sshift=4;ssize=32,sshif=5  int sshift = 0;

//ssize 为segments数组长度,根据concurrentLevel计算得出

int ssize = 1;

while (ssize < concurrencyLevel) {

++sshift;

ssize <<= 1;


 //segmentShift和segmentMask这两个变量在定位segment时会用到,后面会详细讲  this.segmentShift = 32 - sshift;

this.segmentMask = ssize - 1;

if (initialCapacity > MAXIMUM_CAPACITY)

initialCapacity = MAXIMUM_CAPACITY; 
 //计算cap的大小,即Segment中HashEntry的数组长度,cap也一定为2的n次方.

int c = initialCapacity / ssize;

if (c * ssize < initialCapacity)

++c;

int cap = MIN_SEGMENT_TABLE_CAPACITY;

while (cap < c)

cap <<= 1; 
 //创建segments数组并初始化第一个Segment,其余的Segment延迟初始化  Segment<K,V> s0 =

new Segment<K,V>(loadFactor, (int)(cap * loadFactor),

(HashEntry<K,V>[])new HashEntry[cap]);

Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];

UNSAFE.putOrderedObject(ss, SBASE, s0);

this.segments = ss;

}

初始化方法有三个参数,如果用户不指定则会使用默认值,initialCapacity 为 16,loadFactor 为 0.75(负载因

子,扩容时需要参考),concurrentLevel为16。

从上出来,SegmentssizeconcurrentLevel一定等于

concurrentLevelssize 大于或 concurrentLevel 最小 2 的次幂比如:默

concurrentLevel  16,则 ssize 16concurrentLevel 14ssize16;若 concurrentLevel

17ssize32Segment2要是便列算来定

Segmentindex 

其实,put方法对segment也会有所体现

public V put(K key, V value) {

Segment<K,V> s;

//concurrentHashMap不允许key/value为空

if (value == null)

throw new NullPointerException();

//hash函数对key的hashCode重新散列,避免差劲的不合理的hashcode,保证散列均匀

int hash = hash(key);

//返回的hash值无符号右移segmentShift位与段掩码进行位运算,定位segment

int j = (hash >>> segmentShift) & segmentMask;

if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck

(segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment

s = ensureSegment(j);

return s.put(key, hash, value, false);

}

从源码看出,put的主要逻辑也就两步:

1.定位segment并确保定位的Segment已初始化

2.调用Segment的put方法。

PssegmentShiftsegmentMask 
  segmentShift和segmentMask这两个全局变量的主要作用是用来定位Segment,int j =(hash >>> segmentShift) & segmentMask。

segmentMask:段掩码,假如 segments 数组长度为 16,则段掩码为 16-1=15;segments 长度为32,段掩码为32-1=31。这样得到的所有bit位都为1,可以更好地保证散列的均匀性 
segmentShift:2 的 sshift 次方等于 ssize,segmentShift=32-sshift。若 segments 长度为 16,segmentShift=32-4=28;若 segments 长度为 32,segmentShift=32-5=27。而计算得出的 hash 值最大为32位,无符号右移segmentShift,则意味着只保留高几位(其余位是没用的),然后与段掩码segmentMask位运算来定位Segment。

ConcurrentLinkedQuere

✓  Get 

public V get(Object key) {

Segment<K,V> s;

HashEntry<K,V>[] tab;

int h = hash(key);
 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;  //先定位Segment,再定位HashEntry
 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&   (tab = s.table) != null) { 
 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile

(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);

e != null; e = e.next) {

K k;

if ((k = e.key) == key || (e.hash == h && key.equals(k)))

return e.value;

}

}

return null;

}

get 无需其中使用 volatile volatile 

不会 

来看下 concurrentHashMap 代理到 Segment 上的 put 方法,Segment 中的 put 方法是要加锁的。

只不过是锁粒度细了而已。

✓  Put 

final V put(K key, int hash, V value, boolean onlyIfAbsent) {

HashEntry<K,V> node = tryLock() ? null : 
 scanAndLockForPut(key,  hash,  value);//tryLock 不成功时会遍历定位到的 HashEnry 位置的链表

(遍历主要是为了使 CPU 缓存链表),若找不到,则创建 HashEntry。tryLock 一定次数后(MAX_SCAN_RETRIES 变量决定),则lock。若遍历过程中,由于其他线程的操作导致链表头结点变化,则需要重新遍历。

V oldValue;

try {

HashEntry<K,V>[] tab = table; 
 int index = (tab.length - 1) & hash;//定位HashEntry,可以看到,这个hash值在定位Segment

时和在Segment中定位HashEntry都会用到,只不过定位Segment时只用到高几位。

HashEntry<K,V> first = entryAt(tab, index);

for (HashEntry<K,V> e = first;;) {

if (e != null) {

K k;

if ((k = e.key) == key ||

(e.hash == hash && key.equals(k))) {

oldValue = e.value;

if (!onlyIfAbsent) {

e.value = value;

++modCount;

}

break;

}

e = e.next;

}

else {

if (node != null)

node.setNext(first);

else

node = new HashEntry<K,V>(hash, key, value, first);

int c = count + 1; 
  //若c超出阈值threshold,需要扩容并rehash。扩容后的容量是当前容量的2倍。这样可以最大程

度避免之前散列好的entry重新散列,具体在另一篇文章中有详细分析,不赘述。扩容并rehash的这个过程是比较消耗资源的。

if (c > threshold && tab.length < MAXIMUM_CAPACITY)

rehash(node);

else

setEntryAt(tab, index, node);

++modCount;

count = c;

oldValue = null;

break;

}

}

} finally {

unlock();

}

return oldValue;

}

ConcurrentLinkedQuere

ConcurrentHashMap 作为一种线程安全且高效的哈希表的解决方案,尤其其中的"分段锁"的方案,相比

HashTable的全表锁在性能上的提升非常之大。本文对ConcurrentHashMap的实现原理进行了详细分析,并解读了

部分源码,希望能帮助到有需要的童鞋。

➢ ConcurrentSkipListMap非阻Hash表集合 

大家都是知道 TreeMap,它是使用树形结构的方式进行存储数据的线程不安全的 Map 集合(有序的哈希表),

并且可以对Map中的Key进行排序,Key中存储的数据需要实现Comparator接口或使用CompareAble接口的子类来实现排序。

ConcurrentSkipListMap也是和TreeMap,它们都是有序的哈希表。但是,它们是有区别的: 
第一,它们的线程安全机制不同,TreeMap 是非线程安全的,而 ConcurrentSkipListMap 是线程安全的。第二,ConcurrentSkipListMap是通过跳表实现的,而TreeMap是通过红黑树实现的。

那现在我们需要知道说明是跳表。

SkipList 
Skip list(跳表)是一种可以代替平衡树的数据结构,默认是按照Key值升序的。Skip list让已排序的数据分布在多层链表中,以 0-1 随机数决定一个数据的向上攀升与否,通过“空间来换取时间”的一个算法,在每个节点中增加了向前的指针,在插入、删除、查找时可以忽略一些不可能涉及到的结点,从而提高了效率。

从概率上保持数据结构的平衡比显示的保持数据结构平衡要简单的多。对于大多数应用,用Skip list要比用树算法相对简单。由于Skip list比较简单,实现起来会比较容易,虽然和平衡树有着相同的时间复杂度(O(logn)),但是skip list的常数项会相对小很多。Skip list在空间上也比较节省。一个节点平均只需要1.333个指针(甚至更少)。

下图为Skip list结构图(以7,14,21,32,37,71,85序列为例)

SkipList性质

(1) 由很多层结构组成,level是通过一定的概率随机产生的。

(2) 每一层都是一个有序的链表,默认是升序,也可以根据创建映射时所提供的 Comparator 进行排序,具体取决于使用的构造方法。

(3) 最底层(Level 1)的链表包含所有元素。

(4) 如果一个元素出现在Level i 的链表中,则它在Level i 之下的链表也都会出现。

(5) 每个节点包含两个指针,一个指向同一链表中的下一个元素,一个指向下面一层的元素。

ConcurrentSkipListMap 
ConcurrentSkipListMap提供了一种线程安全的并发访问的排序映射表。内部是SkipList(跳表)结构实现,在理论上能够在O(log(n))时间内完成查找、插入、删除操作。 注意,调用ConcurrentSkipListMap的size时,由于多个线程可以同时对映射表进行操作,所以映射表需要遍历整个链表才能返回元素个数,这个操作是个O(log(n))的操作。

ConcurrentSkipListMap

ConcurrentSkipListMap的数据结构,如下图所示:

说明:可以看到 ConcurrentSkipListMap 的数据结构使用的是跳表,每一个 HeadIndex、Index 结点都会包含一个对Node的引用,同一垂直方向上的Index、HeadIndex结点都包含了最底层的Node结点的引用。并且层级越高,该层级的结点(HeadIndex和Index)数越少。Node结点之间使用单链表结构。

ConcurrentSkipListMap 
ConcurrentSkipListMap主要用到了Node和Index两种节点的存储方式,通过volatile关键字实现了并发的操作

static final class Node<K,V> {

final K key;

volatile Object value;//value值

volatile Node<K,V> next;//next引用

……

}

static class Index<K,V> {

final Node<K,V> node;

final Index<K,V> down;//downy引用

volatile Index<K,V> right;//右边引用

……

}

✓  ConcurrentSkipListMap 

通过SkipList的方式进行查找操作:(下图以“查找91”进行说明:)

红色虚线,表示查找的路径,蓝色向右箭头表示right引用;黑色向下箭头表示down引用;

下面就是源码中的实现(get方法是通过doGet方法来时实现的)

public V get(Object key) {

return doGet(key);

}

//doGet的实现

private V doGet(Object okey) {

Comparable<? super K> key = comparable(okey);

Node<K,V> bound = null;

Index<K,V> q = head;//把头结点作为当前节点的前驱节点

Index<K,V> r = q.right;//前驱节点的右节点作为当前节点

Node<K,V> n;

K k;

int c;

for (;;) {//遍历

Index<K,V> d;

// 依次遍历right节点   
 if (r != null && (n = r.node) != bound && (k = n.key) != null) {   
 if ((c = key.compareTo(k)) > 0) {//由于key都是升序排列的,所有当前关键字大于所要

查找的key时继续向右遍历

q = r;

r = r.right;

continue;

} else if (c == 0) {   
 //如果找到了相等的key节点,则返回该Node的value如果value为空可能是其他并发delete

导致的,于是通过另一种

//遍历findNode的方式再查找

Object v = n.value;

return (v != null)? (V)v : getUsingFindNode(key);

} else

bound = n;

}   
 //如果一个链表中right没能找到key对应的value,则调整到其down的引用处继续查找    if ((d = q.down) != null) {

q = d;

r = d.right;

} else

break;

}   
 // 如果通过上面的遍历方式,还没能找到key对应的value,再通过Node.next的方式进行查找    for (n = q.node.next;  n != null; n = n.next) {

if ((k = n.key) != null) {

if ((c = key.compareTo(k)) == 0) {

Object v = n.value;

return (v != null)? (V)v : getUsingFindNode(key);

} else if (c < 0)

break;

}

}

return null;

}

✓  ConcurrentSkipListMap 

通过SkipList的方式进行删除操作:(下图以“删除23”进行说明:)

红色虚线,表示查找的路径,蓝色向右箭头表示right引用;黑色向下箭头表示down引用;

下面就是源码中的实现(remove方法是通过doRemove方法来时实现的)

//remove操作,通过doRemove实现,把所有level中出现关键字key的地方都delete掉 public V remove(Object key) {

return doRemove(key, null);

}

final V doRemove(Object okey, Object value) {

Comparable<? super K> key = comparable(okey);

for (;;) {   
 Node<K,V> b = findPredecessor(key);//得到key的前驱(就是比key小的最大节点)    Node<K,V> n = b.next;//前驱节点的next引用

for (;;) {//遍历

if (n == null)//如果next引用为空,直接返回

return null;

Node<K,V> f = n.next;   
 if (n != b.next)                    // 如果两次获得的b.next不是相同的Node,就跳转

到第一层循环重新获得b和n

break;

Object v = n.value;   
 if (v == null) {                    // 当 n 被其他线程 delete 的时候,其 value==null,

此时做辅助处理,并重新获取b和n

n.helpDelete(b, f);

break;

}   
 if (v == n || b.value == null)      // 当其前驱被delet的时候直接跳出,重新获取b和n     break;

int c = key.compareTo(n.key);

if (c < 0)

return null;

if (c > 0) {//当key较大时就继续遍历

b = n;

n = f;

continue;

}

if (value != null && !value.equals(v))

return null;

if (!n.casValue(v, null))

break;   
 if (!n.appendMarker(f) || !b.casNext(n, f))//casNext方法就是通过比较和设置b(前驱)

的next节点的方式来实现删除操作

findNode(key);                  // 通过尝试findNode的方式继续find    else {

findPredecessor(key);           // Clean index   
 if (head.right == null)   //如果head的right引用为空,则表示不存在该level     tryReduceLevel();

}

return (V)v;

}

}

}

✓  ConcurrentSkipListMap 

通过SkipList的方式进行插入操作:(下图以“添加55”的两种情况,进行说明:)

在level=2(该level存在)的情况下添加55的图示:只需在level<=2的合适位置插入55即可

在 level=4(该 level 不存在,图示 level4 是新建的)的情况下添加55 的情况:首先新建 level4,然后在level<=4

的合适位置插入55。

下面就是源码中的实现(put方法是通过doPut方法来时实现的)

//put操作,通过doPut实现

public V put(K key, V value) {

if (value == null)

throw new NullPointerException();

return doPut(key, value, false);

}

private V doPut(K kkey, V value, boolean onlyIfAbsent) {

Comparable<? super K> key = comparable(kkey);

for (;;) {

Node<K,V> b = findPredecessor(key);//前驱

Node<K,V> n = b.next;

//定位的过程就是和get操作相似

for (;;) {

if (n != null) {

Node<K,V> f = n.next;   
 if (n != b.next) // 前后值不一致的情况下,跳转到第一层循环重新获得b和n     break;;

Object v = n.value;

if (v == null) {               // n被delete的情况下

n.helpDelete(b, f);

break;

}   
 if (v == n || b.value == null) // b 被delete的情况,重新获取b和n     break;

int c = key.compareTo(n.key);

if (c > 0) {

b = n;

n = f;

continue;

}

if (c == 0) {

if (onlyIfAbsent || n.casValue(v, value))

return (V)v;

else

break; // restart if lost race to replace value

}

// else c < 0; fall through

}

Node<K,V> z = new Node<K,V>(kkey, value, n);

if (!b.casNext(n, z))

break;         // restart if lost race to append to b   
 int level = randomLevel();//得到一个随机的level作为该key-value插入的最高level    if (level > 0)

insertIndex(z, level);//进行插入操作

return null;

}

}

}

/**

* 获得一个随机的level值

*/

private int randomLevel() {

int x = randomSeed;

x ^= x << 13;

x ^= x >>> 17;

randomSeed = x ^= x << 5;

if ((x & 0x8001) != 0) // test highest and lowest bits

return 0;

int level = 1;

while (((x >>>= 1) & 1) != 0) ++level;

return level;

}

//执行插入操作:如上图所示,有两种可能的情况:

//1.当level存在时,对level<=n都执行insert操作 
//2.当level不存在(大于目前的最大level)时,首先添加新的level,然后在执行操作1 private void insertIndex(Node<K,V> z, int level) {

HeadIndex<K,V> h = head;

int max = h.level;

if (level <= max) {//情况1

Index<K,V> idx = null;   
 for (int i = 1; i <= level; ++i)//首先得到一个包含1~level个级别的down关系的链表,

最后的inx为最高level

idx = new Index<K,V>(z, idx, null);   
 addIndex(idx, h, level);//把最高level的idx传给addIndex方法    } else { // 情况2 增加一个新的级别

level = max + 1;

Index<K,V>[] idxs = (Index<K,V>[])new Index[level+1];

Index<K,V> idx = null;

for (int i = 1; i <= level; ++i)//该步骤和情况1类似

idxs[i] = idx = new Index<K,V>(z, idx, null);

HeadIndex<K,V> oldh;

int k;

for (;;) {

oldh = head;

int oldLevel = oldh.level;

if (level <= oldLevel) { // lost race to add level

k = level;

break;

}

HeadIndex<K,V> newh = oldh;

Node<K,V> oldbase = oldh.node;

for (int j = oldLevel+1; j <= level; ++j)   
 newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);//创建新的    if (casHead(oldh, newh)) {

k = oldLevel;

break;

}

}

addIndex(idxs[k], oldh, k);

}

}

/**

*在1~indexlevel层中插入数据

*/   
 private void addIndex(Index<K,V> idx, HeadIndex<K,V> h, int indexLevel) {     //  insertionLevel 代表要插入的level,该值会在indexLevel~1间遍历一遍     int insertionLevel = indexLevel;

Comparable<? super K> key = comparable(idx.node.key);

if (key == null) throw new NullPointerException();   
 // 和get操作类似,不同的就是查找的同时在各个level上加入了对应的key    for (;;) {

int j = h.level;

Index<K,V> q = h;

Index<K,V> r = q.right;

Index<K,V> t = idx;

for (;;) {

if (r != null) {

Node<K,V> n = r.node;

// compare before deletion check avoids needing recheck

int c = key.compareTo(n.key);

if (n.value == null) {

if (!q.unlink(r))

break;

r = q.right;

continue;

}

if (c > 0) {

q = r;

r = r.right;

continue;

}

}

if (j == insertionLevel) {//在该层level中执行插入操作

// Don't insert index if node already deleted

if (t.indexesDeletedNode()) {

findNode(key); // cleans up

return;

}

if (!q.link(r, t))//执行link操作,其实就是inset的实现部分

break; // restart

if (--insertionLevel == 0) {

// need final deletion check before return

if (t.indexesDeletedNode())

findNode(key);

return;

}

}   
 if (--j >= insertionLevel && j < indexLevel)//key移动到下一层level     t = t.down;

q = q.down;

r = q.right;

}

}

}

ConcurrentLinkedQuere 

下面我们看下面示例输出的结果

import java.util.*;import java.util.concurrent.*;

/*
 *   ConcurrentSkipListMap是“线程安全”的哈希表,而TreeMap是非线程安全的。

*

*   下面是“多个线程同时操作并且遍历map”的示例

*   (01) 当map是ConcurrentSkipListMap对象时,程序能正常运行。

*   (02) 当map是TreeMap对象时,程序会产生ConcurrentModificationException异常。

*

*/public class ConcurrentSkipListMapDemo1 {

// TODO: map是TreeMap对象时,程序会出错。

//private static Map<String, String> map = new TreeMap<String, String>(); 
 private static Map<String, String> map = new ConcurrentSkipListMap<String, String>();  public static void main(String[] args) {

// 同时启动两个线程对map进行操作!

new MyThread("a").start();

new MyThread("b").start();

}

private static void printAll() {

String key, value;

Iterator iter = map.entrySet().iterator();

while(iter.hasNext()) {

Map.Entry entry = (Map.Entry)iter.next();

key = (String)entry.getKey();

value = (String)entry.getValue();

System.out.print("("+key+", "+value+"), ");

}

System.out.println();

}

private static class MyThread extends Thread {

MyThread(String name) {

super(name);

}

@Override

public void run() {

int i = 0;

while (i++ < 6) {

// “线程名” + "序号"

String val = Thread.currentThread().getName()+i;

map.put(val, "0");

// 通过“Iterator”遍历map。

printAll();

}

}

}

}

某一次的运行结果:

(a1, 0), (a1, 0), (b1, 0), (b1, 0),

(a1, 0), (b1, 0), (b2, 0),
(a1, 0), (a1, 0), (a2, 0), (a2, 0), (b1, 0), (b1, 0), (b2, 0), (b2, 0), (b3, 0),
(b3, 0), (a1, 0),
(a2, 0), (a3, 0), (a1, 0), (b1, 0), (a2, 0), (b2, 0), (a3, 0), (b3, 0), (b1, 0), (b4, 0), (b2, 0), (a1, 0), (b3, 0), (a2, 0), (b4, 0),
(a3, 0), (a1, 0), (a4, 0), (a2, 0), (b1, 0), (a3, 0), (b2, 0), (a4, 0), (b3, 0), (b1, 0), (b4, 0), (b2, 0), (b5, 0),

(b3, 0), (a1, 0), (b4, 0), (a2, 0), (b5, 0),
(a3, 0), (a1, 0), (a4, 0), (a2, 0), (a5, 0), (a3, 0), (b1, 0), (a4, 0), (b2, 0), (a5, 0), (b3, 0), (b1, 0), (b4, 0), (b2, 0), (b5, 0), (b3, 0), (b6, 0),

(b4, 0), (a1, 0), (b5, 0), (a2, 0), (b6, 0),
(a3, 0), (a4, 0), (a5, 0), (a6, 0), (b1, 0), (b2, 0), (b3, 0), (b4, 0), (b5, 0), (b6, 0),

结果 
示例程序中,启动两个线程(线程a和线程b)分别对ConcurrentSkipListMap进行操作。以线程a而言,它会先获取“线程名”+“序号”,然后将该字符串作为key,将“0”作为value,插入到ConcurrentSkipListMap中;接着,遍历并输出ConcurrentSkipListMap中的全部元素。 线程b的操作和线程a一样,只不过线程b的名字和线程a的名字不同。

当 map 是 ConcurrentSkipListMap 对象时,程序能正常运行。如果将 map 改为 TreeMap 时,程序会产生ConcurrentModificationException异常。

2)  java.util.concurrent.atomic 

➢ AtomicBoolean原子性布尔 

AtomicBoolean是java.util.concurrent.atomic包下的原子变量,这个包里面提供了一组原子类。其基本的特性就是在多线程环境下,当有多个线程同时执行这些类的实例包含的方法时,具有排他性,即当某个线程进入方法,执行其中的指令时,不会被其他线程打断,而别的线程就像自旋锁一样,一直等到该方法执行完成,才由JVM从等待队列中选择一个另一个线程进入,这只是一种逻辑上的理解。实际上是借助硬件的相关指令来实现的,不会阻塞线程(或者说只是在硬件级别上阻塞了)。

AtomicBoolean,在这个Boolean值的变化的时候不允许在之间插入,保持操作的原子性。

下面将解释重点方法并举例: 
boolean compareAndSet(expectedValue, updateValue)这个方法主要两个作用:

1. 比较AtomicBoolean和expect的值,如果一致,执行方法内的语句。其实就是一个if语句  2. 把 AtomicBoolean 的值设成 update,比较最要的是这两件事是一气呵成的,这连个动作之间不会被 打断,任何内部或者外部的语句都不可能在两个动作之间运行。为多线程的控制提供了解决的方案 下面我们从代码上解释:

首先我们看下在不使用AtomicBoolean情况下,代码的运行情况:

package zmx.atomic.test;

import java.util.concurrent.TimeUnit;

public class BarWorker implements Runnable {

//静态变量

private static boolean exists = false;

private String name;

public BarWorker(String name) {

this.name = name;

}

@Override

public void run() {

if (!exists) {

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e1) {

// do nothing

}

exists = true;

System.out.println(name + " enter");

try {

System.out.println(name + " working");

TimeUnit.SECONDS.sleep(2);

} catch (InterruptedException e) {

// do nothing

}

System.out.println(name + " leave");

exists = false;

} else {

System.out.println(name + " give up");

}

}

public static void main(String[] args) {

BarWorker bar1 = new BarWorker("bar1");

BarWorker bar2 = new BarWorker("bar2");

new Thread(bar1).start();

new Thread(bar2).start();

}

}

运行结果:

bar1 enter

bar2 enter

bar1 working

bar2 working

bar1 leave

bar2 leave

从上面的运行结果我们可看到,两个线程运行时,都对静态变量exists同时做操作,并没有保证exists静态变量

的原子性,也就是一个线程在对静态变量 exists 进行操作到时候,其他线程必须等待或不作为。等待一个线程操作完

后,才能对其进行操作。

下面我们将静态变量使用AtomicBoolean来进行操作

package zmx.atomic.test;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.atomic.AtomicBoolean;

public class BarWorker2 implements Runnable {

//静态变量使用AtomicBoolean 进行操作 
 private static AtomicBoolean exists = new AtomicBoolean(false);

private String name;

public BarWorker2(String name) {

this.name = name;

}

@Override

public void run() {

if (exists.compareAndSet(false, true)) {

System.out.println(name + " enter");

try {

System.out.println(name + " working");

TimeUnit.SECONDS.sleep(2);

} catch (InterruptedException e) {

// do nothing

}

System.out.println(name + " leave");

exists.set(false);

} else {

System.out.println(name + " give up");

}

}

public static void main(String[] args) {

BarWorker2 bar1 = new BarWorker2("bar1");

BarWorker2 bar2 = new BarWorker2("bar2");

new Thread(bar1).start();

new Thread(bar2).start();

}

}

运行结果:

bar1 enter

bar1 working

bar2 give up

bar1 leave

可以从上面的运行结果看出仅仅一个线程进行工作,因为exists.compareAndSet(false, true)提供了原子性操作,

比较和赋值操作组成了一个原子操作, 中间不会提供可乘之机。使得一个线程操作,其他线程等待或不作为。

下面我们简单介绍下AtomicBoolean的API

创建  AtomicBoolean 

你可以这样创建一个  AtomicBoolean:

AtomicBoolean atomicBoolean = new AtomicBoolean();

以上示例新建了一个默认值为  false 的  AtomicBoolean。如果你想要为  AtomicBoolean 实例设置一个显式的

初始值,那么你可以将初始值传给  AtomicBoolean 的构造子:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);

  AtomicBoolean 
你可以通过使用  get() 方法来获取一个  AtomicBoolean 的值。示例如下:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);

boolean value = atomicBoolean.get();

AtomicBoolean 

你可以通过使用  set() 方法来设置一个  AtomicBoolean 的值。

示例如下:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);

atomicBoolean.set(false);

以上代码执行后  AtomicBoolean 的值为  false。

AtomicBoolean 
你可以通过  getAndSet()  方法来交换一个  AtomicBoolean  实例的值。getAndSet()  方法将返回 AtomicBoolean 当前的值,并将为  AtomicBoolean 设置一个新值。示例如下:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);

boolean oldValue = atomicBoolean.getAndSet(false);

以上代码执行后  oldValue  变量的值为  true,atomicBoolean  实例将持有  false  值。代码成功将 AtomicBoolean 当前值  ture 交换为  false。

比较并设AtomicBoolean的值 
compareAndSet() 方法允许你对  AtomicBoolean 的当前值与一个期望值进行比较,如果当前值等于期望值的话,将会对  AtomicBoolean 设定一个新值。compareAndSet() 方法是原子性的,因此在同一时间之内有单个线程执行它。因此  compareAndSet()  方法可被用于一些类似于锁的同步的简单实现。以下是一个 compareAndSet() 示例:

AtomicBoolean atomicBoolean = new AtomicBoolean(true);

boolean expectedValue = true;

boolean newValue      = false;

boolean wasNewValueSet = atomicBoolean.compareAndSet(

expectedValue, newValue);

本示例对  AtomicBoolean 的当前值与  true 值进行比较,如果相等,将  AtomicBoolean 的值更新为  false

➢ AtomicInteger原子性整型 

AtomicInteger,一个提供原子操作的Integer的类。在Java语言中,++i和i++操作并不是线程安全的,

在使用的时候,不可避免的会用到 synchronized关键字。而AtomicInteger 则通过一种线程安全的加减操

作接口。

我们先来看看AtomicInteger给我们提供了什么方法:

public final int get() //获取当前的值

public final int getAndSet(int newValue)//获取当前的值,并设置新的值

public final int getAndIncrement()//获取当前的值,并自增

public final int getAndDecrement() //获取当前的值,并自减

public final int getAndAdd(int delta)  //获取当前的值,并加上预期的值

下面通过两个简单的例子来看一下  AtomicInteger 的优势在哪:

普通线程同步:

class Test2 {

private volatile int count = 0;

public synchronized void increment() {

count++; //若要线程安全执行执行count++,需要加锁

}

public int getCount() {

return count;

}

}

使用AtomicInteger:

class Test2 {

private AtomicInteger count = new AtomicInteger();

public void increment() {

count.incrementAndGet();

}

//使用AtomicInteger之后,不需要加锁,也可以实现线程安全。

public int getCount() {

return count.get();

}

}

从上面的例子中我们可以看出:使用 AtomicInteger 是非常的安全的.而且因为 AtomicInteger 由硬件提供原子

操作指令实现的。在非激烈竞争的情况下,开销更小,速度更快。AtomicInteger 是使用非阻塞算法来实现并发控制

的。AtomicInteger的关键域只有一下3个:

// setup to use Unsafe.compareAndSwapInt for updates

private static final Unsafe unsafe = Unsafe.getUnsafe();

private static final long valueOffset;

static {

try {

valueOffset= 
unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));

} catch (Exception ex) {

throw new Error(ex);

}

}

private volatile int value;

这里,  unsafe是java提供的获得对对象内存地址访问的类,注释已经清楚的写出了,它的作用就是在更新操作

时提供“比较并替换”的作用。实际上就是 AtomicInteger 中的一个工具。valueOffset 是用来记录 value 本身在内

存的便宜地址的,这个记录,也主要是为了在更新操作在内存中找到value的位置,方便比较。

注意:value是用来存储整数的时间变量,这里被声明为volatile,就是为了保证在更新操作时,当前线程可以拿

到value最新的值(并发环境下,value可能已经被其他线程更新了)。

优点:最大的好处就是可以避免多线程的优先级倒置和死锁情况的发生,提升在高并发处理下的性能。

下面我们简单介绍下AtomicInteger的API

创建AtomicInteger 

创建一个  AtomicInteger 示例如下:

AtomicInteger atomicInteger = new AtomicInteger();

本示例将创建一个初始值为  0 的  AtomicInteger。如果你想要创建一个给定初始值的  AtomicInteger,你可

以这样:

AtomicInteger atomicInteger = new AtomicInteger(123);

本示例将  123 作为参数传给  AtomicInteger 的构造子,它将设置  AtomicInteger 实例的初始值为  123。

AtomicInteger 

你可以使用  get() 方法获取  AtomicInteger 实例的值。示例如下:

AtomicInteger atomicInteger = new AtomicInteger(123);

int theValue = atomicInteger.get();

AtomicInteger 

你可以通过  set() 方法对  AtomicInteger 的值进行重新设置。以下是  AtomicInteger.set() 示例: 

AtomicInteger atomicInteger = new AtomicInteger(123);

atomicInteger.set(234);

以上示例创建了一个初始值为  123 的  AtomicInteger,而在第二行将其值更新为  234。 

比较并设AtomicInteger 

AtomicInteger 类也通过了一个原子性的  compareAndSet() 方法。这一方法将  AtomicInteger 实例的当前值

与期望值进行比较,如果二者相等,为  AtomicInteger 实例设置一个新值。AtomicInteger.compareAndSet() 代码

示例:

AtomicInteger atomicInteger = new AtomicInteger(123);

int expectedValue = 123;

int newValue      = 234;

atomicInteger.compareAndSet(expectedValue, newValue);

本示例首先新建一个初始值为  123 的  AtomicInteger 实例。然后将  AtomicInteger 与期望值  123 进行比较 ,如果相等,将  AtomicInteger 的值更新为  234。

AtomicInteger 
AtomicInteger 类包含有一些方法,通过它们你可以增加  AtomicInteger 的值,并获取其值。这些方法如下:

public final int addAndGet(int addValue)//在原来的数值上增加新的值,并返回新值

public final int getAndIncrement()//获取当前的值,并自增

public final int incrementAndget() //自减,并获得自减后的值

public final int getAndAdd(int delta)  //获取当前的值,并加上预期的值

第一个  addAndGet() 方法给  AtomicInteger 增加了一个值,然后返回增加后的值。getAndAdd() 方法为 AtomicInteger 增加了一个值,但返回的是增加以前的  AtomicInteger 的值。具体使用哪一个取决于你的应用场景。

以下是这两种方法的示例:

AtomicInteger atomicInteger = new AtomicInteger();

System.out.println(atomicInteger.getAndAdd(10));

System.out.println(atomicInteger.addAndGet(10));

本示例将打印出  0 和  20。例子中,第二行拿到的是加  10 之前的  AtomicInteger 的值。加  10 之前的值是  0。第三行将  AtomicInteger 的值再加  10,并返回加操作之后的值。该值现在是为  20。你当然也可以使用这俩方法为 AtomicInteger 添加负值。结果实际是一个减法操作。getAndIncrement() 和  incrementAndGet() 方法类似于 getAndAdd() 和  addAndGet(),但每次只将  AtomicInteger 的值加  1。

AtomicInteger 
AtomicInteger 类还提供了一些减小  AtomicInteger 的值的原子性方法。这些方法是:

public final int decrementAndGet()

public final int getAndDecrement()

decrementAndGet() 将  AtomicInteger 的值减一,并返回减一后的值。getAndDecrement() 也将 AtomicInteger 的值减一,但它返回的是减一之前的值。

➢ AtomicIntegerArray原子性整型数组 

java.util.concurrent.atomic.AtomicIntegerArray 类提供了可以以原子方式读取和写入的底层 int 数组的操作,

还包含高级原子操作。 AtomicIntegerArray 支持对底层 int 数组变量的原子操作。 它具有获取和设置方法,如在变

量上的读取和写入。 也就是说,一个集合与同一变量上的任何后续get相关联。 原子compareAndSet方法也具有

这些内存一致性功能。

AtomicIntegerArray本质上是对int[]类型的封装。使用Unsafe类通过CAS的方式控制int[]在多线程下的安全

性。它提供了以下几个核心API:

//获得数组第i个下标的元素

public final int get(int i)

//获得数组的长度

public final int length()

//将数组第i个下标设置为newValue,并返回旧的值

public final int getAndSet(int i, int newValue) 
//进行CAS操作,如果第i个下标的元素等于expect,则设置为update,设置成功返回true public final boolean compareAndSet(int i, int expect, int update) //将第i个下标的元素加1

public final int getAndIncrement(int i)

//将第i个下标的元素减1

public final int getAndDecrement(int i)

//将第i个下标的元素增加delta(delta可以是负数)

public final int getAndAdd(int i, int delta)

下面给出一个简单的示例,展示AtomicIntegerArray使用:

public class AtomicIntegerArrayDemo { 
 static AtomicIntegerArray arr = new AtomicIntegerArray(10);  public static class AddThread implements Runnable{

public void run(){

for(int k=0;k<10000;k++)

arr.getAndIncrement(k%arr.length());

}


 public static void main(String[] args) throws InterruptedException {  Thread[] ts=new Thread[10];

for(int k=0;k<10;k++){

ts[k]=new Thread(new AddThread());

}

for(int k=0;k<10;k++){ts[k].start();}

for(int k=0;k<10;k++){ts[k].join();}

System.out.println(arr);

}

}

输出结果: 
[10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000, 10000]

上述代码第2行,申明了一个内含10个元素的数组。第3行定义的线程对数组内10个元素进行累加操作,每个

元素各加1000次。第11行,开启10个这样的线程。因此,可以预测,如果线程安全,数组内10个元素的值必然都

是10000。反之,如果线程不安全,则部分或者全部数值会小于10000。

➢ AtomicLongAtomicLongArray原子性整型数组 

AtomicLong、AtomicLongArray的API跟AtomicInteger、AtomicIntegerArray在使用方法都是差不多的。

区别在于用前者是使用原子方式更新的long值和long数组,后者是使用原子方式更新的Integer值和Integer数组。

两者的相同处在于它们此类确实扩展了 Number,允许那些处理基于数字类的工具和实用工具进行统一访问。在实际

开发中,它们分别用于不同的场景。这个就具体情况具体分析了,下面将举例说明 AtomicLong 的使用场景(使用

AtomicLong生成自增长ID),其他就不在过多介绍。

import java.util.ArrayList;

import java.util.Collections;

import java.util.HashSet;

import java.util.List;

import java.util.Set;

import java.util.concurrent.atomic.AtomicLong;

public class AtomicLongTest {

/**

* @param args

*/

public static void main(String[] args) {

final AtomicLong orderIdGenerator = new AtomicLong(0);

final List<Item> orders = Collections

.synchronizedList(new ArrayList<Item>());

for (int i = 0; i < 10; i++) {

Thread orderCreationThread = new Thread(new Runnable() {

public void run() {

for (int i = 0; i < 10; i++) {

long orderId = orderIdGenerator.incrementAndGet();

Item order = new Item(Thread.currentThread().getName(),

orderId);

orders.add(order);

}

}

});

orderCreationThread.setName("Order Creation Thread " + i);

orderCreationThread.start();

}

try {

Thread.sleep(1000);

} catch (InterruptedException e) {

e.printStackTrace();

}

Set<Long> orderIds = new HashSet<Long>();

for (Item order : orders) {

orderIds.add(order.getID());

System.out.println("Order name:" + order.getItemName()

+"----"+"Order id:" + order.getID());

}

}

}

class Item {

String itemName;

long id;

Item(String n, long id) {

this.itemName = n;

this.id = id;

}

public String getItemName() {

return itemName;

}

public long getID() {

return id;

}

}

输出:

Order name:Order Creation Thread 0----Order id:1

Order name:Order Creation Thread 1----Order id:2

Order name:Order Creation Thread 0----Order id:4

Order name:Order Creation Thread 1----Order id:5

Order name:Order Creation Thread 3----Order id:3

Order name:Order Creation Thread 0----Order id:7

Order name:Order Creation Thread 1----Order id:6

........

Order name:Order Creation Thread 2----Order id:100

从运行结果我们看到,不管是哪个线程。它们获得的 ID 是不会重复的,保证的 ID 生成的原子性,避免了线程安

全上的问题。

3)  java.util.concurrent.lock 

待续...

ConcurrentLinkedQueue非阻塞无界链表队列相关推荐

  1. java多线程 --ConcurrentLinkedQueue 非阻塞 线程安全队列

    ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部:当我们获取一个元素时,它会返回队列头 ...

  2. ConcurrentLinkedQueue非阻塞队列实现原理分析

    2019独角兽企业重金招聘Python工程师标准>>> 引言 在并发编程中我们有时候需要使用线程安全的队列.如果我们要实现一个线程安全的队列有两种实现方式一种是使用阻塞算法,另一种是 ...

  3. 一读多写非自旋无锁链表队列实现

    现在一说到多线程操作的队列,基本上都会标榜自己是无锁的,这样的无锁基本上都有两大特点: 1. 换了一种锁 利用原子变量自旋,本质还是自旋等锁,其实也是一种锁,只是用了更底层一点.这种无锁,其实也是消耗 ...

  4. 同步异步 阻塞 非阻塞 异步调用 线程队列 协程

    阻塞 非阻塞 阻塞:程序遇到了IO操作 导致代码无法继续执行 交出了COU执行权 非阻塞:没有IO操作 或者即使遇到IO操作 也不阻塞代码执行 阻塞 就绪 运行指的是应用程序所处的状态写程序时 尽量减 ...

  5. 阻塞队列和非阻塞队列(JAVA)

    文章目录 1.阻塞队列 1.1 代码举例 1.2 LinkedBlockingQueue 2.非阻塞队列 2.1 代码举例 2.2 ConcurrentLinkedQueue 1.阻塞队列 1.1 代 ...

  6. java 非阻塞队列_java并发之非阻塞队列

    非阻塞队列有:ArrayDeque.PriorityQueue.ConcurrentLinkedQueue 之前都说过阻塞队列了,其实差别都不大,阻塞和非阻塞的区别在于阻塞队列有put和take方法进 ...

  7. AIO,BIO,NIO:同步阻塞式IO,同步非阻塞IO,异步非阻塞IO

    BIO,同步阻塞式IO,简单理解:一个连接一个线程 NIO,同步非阻塞IO,简单理解:一个请求一个线程 AIO,异步非阻塞IO,简单理解:一个有效请求一个线程 IO:阻塞IO BIO:同步阻塞IO.服 ...

  8. linux socket write()函数阻塞卡住线程问题(线程无法结束)write()非阻塞代码

    文章目录 1.参考文章:C++网络通信中write和read的为什么会阻塞 [2.参考文章:网络编程(24)-- linux中write和read函数的阻塞试验](https://blog.csdn. ...

  9. 并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究

    并发队列-无界非阻塞队列 ConcurrentLinkedQueue 原理探究 http://www.importnew.com/25668.html 一. 前言 常用的并发队列有阻塞队列和非阻塞队列 ...

最新文章

  1. NHibernate初学体验记
  2. oracle10安装网络需求警告,安装oracle10出现的问题,求解!!!!!(在线等)
  3. wowza官方测试报告
  4. 企业中的混乱:如何对云计算具有信心
  5. 属性与内存管理(属性与内存管理都是相互关联的)
  6. Jquery插件入门之Validate插件的简单使用
  7. C++ Variadic Templates(可变参数模板)
  8. HTML(2)--- 简介
  9. python学习——matplotlib库——折线图
  10. matplotlib —— 注释及几何图形的绘制
  11. spring源码之bean加载(bean解析下篇)
  12. Java开发工程师,每个阶段需要掌握什么重点?
  13. 抖音服务器升级维护时间,抖音服务器升级要多久2021
  14. 盘点数独终盘生成算法
  15. 一文详解 Base64编码原理
  16. 几种常见的十进制代码(笔记)
  17. 11.4王者荣耀服务器维护中,英雄战迹11号更新维护公告 删档最后一更
  18. 模拟电路仿真LTspice(3):三极管共发射极放大电路
  19. MAC自带词典添加词典文件
  20. AD怎么输入坐标_实名推荐|相见恨晚的CAD坐标提取技巧

热门文章

  1. python格式化输出杨辉三角
  2. 毕业生必须知道:干部身份、三方协议、派遣证【转】
  3. android手机越用越慢的原因
  4. 史上最全!全领域网络安全拓扑图(118页)
  5. 联想平板android7.1.1,预装安卓 7.1,曝联想 Tab 4 系列平板无缘 8.0
  6. maxIdle和maxIdle和maxWait
  7. 利用Sharepoint+infopath+Sharepoint Designer课程已经录制完成,谢谢大家关注!
  8. Geomagic Touch(LAN口版本) 环境配置及驱动安装【过程记录】
  9. “看脸”看不出花,但人工智能可以“看”出性格
  10. 文献阅读之无人机防御性驾驶-提前感知快速飞行的轨迹规划