容器

物理结构:数组、链表
逻辑结构:很多

Queue主要是为高并发准备的。

Vector Hashtable

Vector Hashtable 自带锁,有很多设计上不完善的地方,现在基本上不用。
测试Hashtable的性能:用100的线程,先put进去1000000个数,再get 1000000个数

package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.Hashtable;
import java.util.UUID;public class T01_TestHashtable {static Hashtable<UUID, UUID> m = new Hashtable<>();static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count / THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for (int i = start; i < start + gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for (int i = 0; i < threads.length; i++) {threads[i] = new MyThread(i * (count / THREAD_COUNT));}for (Thread t : threads) {t.start();}for (Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println("size=" + m.size());//---------------上面是测试put,下面是测试get 10000000次的第10的元素--------------------start = System.currentTimeMillis();for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(() -> {for (int j = 0; j < 10000000; j++) {m.get(keys[10]);}});}for (Thread t : threads) {t.start();}for (Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}end = System.currentTimeMillis();System.out.println(end - start);}
}

直接使用Hashmap,多线程时会出现问题

package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.HashMap;
import java.util.UUID;public class T02_TestHashMap {static HashMap<UUID, UUID> m = new HashMap<>();static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count/THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for(int i=start; i<start+gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for(int i=0; i<threads.length; i++) {threads[i] =new MyThread(i * (count/THREAD_COUNT));}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());}
}

使用SynchronizedHashMap,效率与直接使用Hashtable区别不是很大

package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;public class T03_TestSynchronizedHashMap {static Map<UUID, UUID> m = Collections.synchronizedMap(new HashMap<UUID, UUID>());static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count/THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for(int i=start; i<start+gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for(int i=0; i<threads.length; i++) {threads[i] =new MyThread(i * (count/THREAD_COUNT));}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());//-----------------------------------start = System.currentTimeMillis();for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(()->{for (int j = 0; j < 10000000; j++) {m.get(keys[10]);}});}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}end = System.currentTimeMillis();System.out.println(end - start);}
}

使用ConcurrentHashMap,插入的效率与前面的Hashtable差不多,但是读取的效率非常高。

package com.mashibing.juc.c_023_02_FromHashtableToCHM;import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;public class T04_TestConcurrentHashMap {static Map<UUID, UUID> m = new ConcurrentHashMap<>();static int count = Constants.COUNT;static UUID[] keys = new UUID[count];static UUID[] values = new UUID[count];static final int THREAD_COUNT = Constants.THREAD_COUNT;static {for (int i = 0; i < count; i++) {keys[i] = UUID.randomUUID();values[i] = UUID.randomUUID();}}static class MyThread extends Thread {int start;int gap = count/THREAD_COUNT;public MyThread(int start) {this.start = start;}@Overridepublic void run() {for(int i=start; i<start+gap; i++) {m.put(keys[i], values[i]);}}}public static void main(String[] args) {long start = System.currentTimeMillis();Thread[] threads = new Thread[THREAD_COUNT];for(int i=0; i<threads.length; i++) {threads[i] =new MyThread(i * (count/THREAD_COUNT));}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(m.size());//-----------------------------------start = System.currentTimeMillis();for (int i = 0; i < threads.length; i++) {threads[i] = new Thread(()->{for (int j = 0; j < 10000000; j++) {m.get(keys[10]);}});}for(Thread t : threads) {t.start();}for(Thread t : threads) {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}}end = System.currentTimeMillis();System.out.println(end - start);}
}

最早的容器Vector是自带锁的,但是你整个操作调用了两个原子方法的话,整体并不是原子的。你还需要在外面加sync

/*** 有N张火车票,每张票都有一个编号* 同时有10个窗口对外售票* 请写一个模拟程序* <p>* 分析下面的程序可能会产生哪些问题?* 重复销售?超量销售?* <p>* 使用Vector或者Collections.synchronizedXXX* 分析一下,这样能解决问题吗?* <p>* 就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步* 就像这个程序,判断size和进行remove必须是一整个的原子操作** @author 马士兵*/
package com.mashibing.juc.c_024_FromVectorToQueue;import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;public class TicketSeller3 {static List<String> tickets = new LinkedList<>();static {for (int i = 0; i < 1000; i++) tickets.add("票 编号:" + i);}public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {while (true) {synchronized (tickets) {if (tickets.size() <= 0) break;try {TimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}System.out.println("销售了--" + tickets.remove(0));}}}).start();}}
}

使用Queue:ConcurrentLinkedQueue,里面很多方法是CAS实现的

/*** 有N张火车票,每张票都有一个编号* 同时有10个窗口对外售票* 请写一个模拟程序* 分析下面的程序可能会产生哪些问题?* 重复销售?超量销售?* 使用Vector或者Collections.synchronizedXXX* 分析一下,这样能解决问题吗?* 就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步* 就像这个程序,判断size和进行remove必须是一整个的原子操作* 使用ConcurrentQueue提高并发性*/
package com.mashibing.juc.c_024_FromVectorToQueue;import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;public class TicketSeller4 {static Queue<String> tickets = new ConcurrentLinkedQueue<>();static {for (int i = 0; i < 1000; i++) tickets.add("票 编号:" + i);}public static void main(String[] args) {for (int i = 0; i < 10; i++) {new Thread(() -> {while (true) {String s = tickets.poll();if (s == null) break;else System.out.println("销售了--" + s);}}).start();}}
}

多线程常用的容器

ConcurrentHashMap

里面用的是CAS操作,而CAS在Tree操作的时候太复杂了,所以不存在ConcurrentTreeMap,为了排序,换了跳表的结构代替Tree结构

跳表

  • 底层是链表
  • 拿出关键元素新开一层
  • 在查找的时候,从上往下查
  • CAS的实现难度比TreeMap容易很多
  • 查找操作的时间复杂度比链表快很多
/*** http://blog.csdn.net/sunxianghuang/article/details/52221913 * http://www.educity.cn/java/498061.html* 阅读concurrentskiplistmap*/
package com.mashibing.juc.c_025;import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;public class T01_ConcurrentMap {public static void main(String[] args) {Map<String, String> map = new ConcurrentHashMap<>();//Map<String, String> map = new ConcurrentSkipListMap<>(); //高并发并且排序//Map<String, String> map = new Hashtable<>();//Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX//TreeMapRandom r = new Random();Thread[] ths = new Thread[100];CountDownLatch latch = new CountDownLatch(ths.length);long start = System.currentTimeMillis();for(int i=0; i<ths.length; i++) {ths[i] = new Thread(()->{for(int j=0; j<10000; j++) map.put("a" + r.nextInt(100000), "a" + r.nextInt(100000));latch.countDown();});}Arrays.asList(ths).forEach(t->t.start());try {latch.await();} catch (InterruptedException e) {e.printStackTrace();}long end = System.currentTimeMillis();System.out.println(end - start);System.out.println(map.size());}
}

CopyOnWriteArrayList

本质上和ReadWrite是一个思路

写时复制,适用于读线程多,写线程少的情况。(读的时候不加锁),写的时候copy一个新的,写完之后把旧的指针指向新的。
写的效率比较低,因为是数组,每次写的时候都要复制。
add操作的源码如下:
示例

/*** 写时复制容器 copy on write* 多线程环境下,写时效率低,读时效率高* 适合写少读多的环境*/
package com.mashibing.juc.c_025;import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import java.util.concurrent.CopyOnWriteArrayList;public class T02_CopyOnWriteList {public static void main(String[] args) {List<String> lists =//new ArrayList<>(); //这个会出并发问题!//new Vector();new CopyOnWriteArrayList<>();Random r = new Random();Thread[] ths = new Thread[100];for (int i = 0; i < ths.length; i++) {Runnable task = new Runnable() {@Overridepublic void run() {for (int i = 0; i < 1000; i++) lists.add("a" + r.nextInt(10000));}};ths[i] = new Thread(task);}runAndComputeTime(ths);System.out.println(lists.size());}static void runAndComputeTime(Thread[] ths) {long s1 = System.currentTimeMillis();Arrays.asList(ths).forEach(t -> t.start());Arrays.asList(ths).forEach(t -> {try {t.join();} catch (InterruptedException e) {e.printStackTrace();}});long s2 = System.currentTimeMillis();System.out.println(s2 - s1);}
}

代码T05-T09:BlockingQueue讲解

Queue和List的区别是什么?

  • 提供了很多在多线程访问下比较友好的API
  • offer,peek,pool

BlockingQueue的特点是什么?
BlockingQueue的优势在于,增加了更多API,比如put,take
或者阻塞,或者指定时间等待
实现生产者-消费者模型,也是多线程里面最重要的一个模型,也是MQ的基础——MQ的本质,就是一个大型的生产者、消费者模型

LinkedBlockingQueue

LinkedBlockingQueue,是用链表实现的BlockingQueue
阻塞使用await实现的,底层应该是park

Queue常用接口
LinkedBlockingQueue示例

package com.mashibing.juc.c_025;import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;public class T05_LinkedBlockingQueue {static BlockingQueue<String> strs = new LinkedBlockingQueue<>();static Random r = new Random();public static void main(String[] args) {new Thread(() -> {for (int i = 0; i < 100; i++) {try {strs.put("a" + i); //如果满了,就会等待TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));} catch (InterruptedException e) {e.printStackTrace();}}}, "p1").start();for (int i = 0; i < 5; i++) {new Thread(() -> {for (; ; ) {try {System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待} catch (InterruptedException e) {e.printStackTrace();}}}, "c" + i).start();}}
}

DelayQueue

是BlockingQueue的一种,是一种阻塞的队列。
需要实现compareTo方法
需要指定等待时间
用来按时间进行任务调度

package com.mashibing.juc.c_025;import java.util.Calendar;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;public class T07_DelayQueue {static BlockingQueue<MyTask> tasks = new DelayQueue<>();static Random r = new Random();static class MyTask implements Delayed {String name;long runningTime;MyTask(String name, long rt) {this.name = name;this.runningTime = rt;}@Overridepublic int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))return -1;else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))return 1;elsereturn 0;}@Overridepublic long getDelay(TimeUnit unit) {return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Overridepublic String toString() {return name + " " + runningTime;}}public static void main(String[] args) throws InterruptedException {long now = System.currentTimeMillis();MyTask t1 = new MyTask("t1", now + 1000);MyTask t2 = new MyTask("t2", now + 2000);MyTask t3 = new MyTask("t3", now + 1500);MyTask t4 = new MyTask("t4", now + 2500);MyTask t5 = new MyTask("t5", now + 500);tasks.put(t1);tasks.put(t2);tasks.put(t3);tasks.put(t4);tasks.put(t5);System.out.println(tasks);for (int i = 0; i < 5; i++) {System.out.println(tasks.take());}}
}

PriorityQueue

内部进行了排序,底层是一个二叉树(小顶堆)的结构

package com.mashibing.juc.c_025;import java.util.PriorityQueue;public class T07_01_PriorityQueque {public static void main(String[] args) {PriorityQueue<String> q = new PriorityQueue<>();q.add("c");q.add("e");q.add("a");q.add("d");q.add("z");for (int i = 0; i < 5; i++) {System.out.println(q.poll());}}
}
输出:
a
c
d
e
z

SynchronousQueue

容量为0,不能往里装东西,只有有一个线程等着的时候,才能把东西递到这个线程手里,是用来一个线程给另外一个线程传数据的。
本质和Exchanger比较相似,也是需要两个线程同步对接,否则都会阻塞着。

在线程池里面,线程之间进行任务调度的时候,经常会用到。

package com.mashibing.juc.c_025;import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;public class T08_SynchronusQueue { //容量为0public static void main(String[] args) throws InterruptedException {BlockingQueue<String> strs = new SynchronousQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();strs.put("aaa"); //阻塞等待消费者消费//strs.put("bbb");//strs.add("aaa");System.out.println(strs.size());}
}

TransferQueue

装完,阻塞等着,有线程把它取走,再离开
要先开启消费者线程,再往里面transfer,要不然就阻塞了~

场景1:要求某件任务有一个结果(比如一个订单等付款完成之后,确认有线程去处理它了,再给客户反馈)
场景2:确认收钱完成之后,才能把商品取走,比如面对面付款

示例:

package com.mashibing.juc.c_025;import java.util.concurrent.LinkedTransferQueue;public class T09_TransferQueue {public static void main(String[] args) throws InterruptedException {LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();strs.transfer("aaa");//strs.put("aaa");/*new Thread(() -> {try {System.out.println(strs.take());} catch (InterruptedException e) {e.printStackTrace();}}).start();*/}
}

经典的交替打印面试题可以用 TransferQueue 实现

package com.mashibing.juc.c_026_00_interview.A1B2C3;import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TransferQueue;public class T13_TransferQueue {public static void main(String[] args) {char[] aI = "1234567".toCharArray();char[] aC = "ABCDEFG".toCharArray();TransferQueue<Character> queue = new LinkedTransferQueue<Character>();new Thread(() -> {try {for (char c : aI) {System.out.print(queue.take());queue.transfer(c);}} catch (InterruptedException e) {e.printStackTrace();}}, "t1").start();new Thread(() -> {try {for (char c : aC) {queue.transfer(c);System.out.print(queue.take());}} catch (InterruptedException e) {e.printStackTrace();}}, "t2").start();}
}

下节课预习

  • Callable
  • Future, Completable Future

多线程与高并发(六):线程池可用的各种高并发容器详解:CopyOnWriteList,BlockingQueue等相关推荐

  1. python线程池(threadpool)模块使用笔记详解

    这篇文章主要介绍了python线程池(threadpool)模块使用笔记详解,小编觉得挺不错的,现在分享给大家,也给大家做个参考.一起跟随小编过来看看吧 最近在做一个视频设备管理的项目,设备包括(摄像 ...

  2. 线程池的执行原则及配置参数详解

    池是一种非常优秀的设计思想,通过建立池可以有效的利用系统资源,节约系统性能.Java 中的线程池就是一种非常好的实现,从 JDK 1.5 开始 Java 提供了一个线程工厂 Executors 用来生 ...

  3. python线程池模块第三方包_python线程池(threadpool)模块使用笔记详解

    最近在做一个视频设备管理的项目,设备包括(摄像机,DVR,NVR等),包括设备信息补全,设备状态推送,设备流地址推送等,如果同时导入的设备数量较多,如果使用单线程进行设备检测,那么由于设备数量较多,会 ...

  4. 线程池原理初探以及源码分析(详解)

    1,为什么用线程池 在学java基础的时候,就学过线程的创建方式,如继承Thread类,实现Runnable接口,实现Callable接口这三种,但是在企业级开发中,由于存在多线程以及高并发等现象,如 ...

  5. Java并发基础(六) - 线程池

    Java并发基础(六) - 线程池 1. 概述 这里讲一下Java并发编程的线程池的原理及其实现 2. 线程池的基本用法 2.1 线程池的处理流程图 该图来自<Java并发编程的艺术>: ...

  6. Java多线程学习(八)线程池与Executor 框架

    历史优质文章推荐: Java并发编程指南专栏 分布式系统的经典基础理论 可能是最漂亮的Spring事务管理详解 面试中关于Java虚拟机(jvm)的问题看这篇就够了 目录: [TOC] 本节思维导图: ...

  7. Java 并发编程 -- 线程池源码实战

    一.概述 小编在网上看了好多的关于线程池原理.源码分析相关的文章,但是说实话,没有一篇让我觉得读完之后豁然开朗,完完全全的明白线程池,要么写的太简单,只写了一点皮毛,要么就是是晦涩难懂,看完之后几乎都 ...

  8. 并发编程 | 线程池从入门到成神!

    一.为什么要用多线程 使用多线程,可以把一些大任务分解成多个小任务来执行,多个小任务之间互不影响,同时进行,这样,充分利用了cpu资源. 二.java中简单的实现多线程方式 继承Thread类,实现r ...

  9. Java多线程系列(三):Java线程池的使用方式,及核心运行原理

    之前谈过多线程相关的4种常用Java线程锁的特点,性能比较.使用场景,今天主要分享线程池相关的内容,这些都是属于Java面试的必考点. 为什么需要线程池 java中为了提高并发度,可以使用多线程共同执 ...

最新文章

  1. 【翻译】SQL Server索引进阶:第八级,唯一索引
  2. 客户端脚本验证码总结一
  3. pythonexcelweb交互插件_来一次Python与Excel的完美交互
  4. java map equals_Java中EnumMap的equals()方法: Java.util.EnumMap.equals() - Break易站
  5. 三维重建7:Visual SLAM算法笔记
  6. 单位脉冲信号与单位冲激信号的区别
  7. java和xampp_XAMPP和Bugfree详细教程
  8. AES-GCM加密算法
  9. 语言代号/地区代号/国家代号
  10. java 对象转map,map转对象
  11. js禁止苹果页面底部滚动_js禁止页面滚动
  12. 有缓震功能的舒缓拖鞋能给运动后带来什么样的减压效果?
  13. clickhouse开窗函数之同比环比
  14. d3dx9_35.dll如何修复
  15. 华为交换机查光衰_华为交换机硬件信息查看命令
  16. 输入关键字生成对联_对联生成器
  17. 插入U盘弹出不了的问题解决
  18. oracle修改用户表所属空间,Oracle修改用户表所属表空间的步骤
  19. JavaSwing_2.6: JTextField(文本框)
  20. Scanners Box

热门文章

  1. 2019ICPC(银川) - Delivery Route(强连通缩点+分块最短路)
  2. 洛谷 - P4012 深海机器人问题(最大费用最大流)
  3. 牛客 - 双流机场(思维)
  4. HDU - 5692 Snacks(dfs序+线段树)
  5. angular 字符串转换成数字_Python | 一文看懂Python列表、元组和字符串操作
  6. 自学JAVA5.18
  7. 数据分析与挖掘理论-数据探索
  8. __declspec(naked)详解
  9. leetcode-260.只出现一次的数字 III 解法
  10. Python OS和shutil模块的常见方法