版权声明:本文出自汪磊的博客,转载请务必注明出处。

一、ThreadFactory概述以及源码分析

ThreadFactory很简单,就是一个线程工厂也就是负责生产线程的,我们看下ThreadFactory源码;

1 public interfaceThreadFactory {2

3 /**

4 * Constructs a new {@codeThread}. Implementations may also initialize5 * priority, name, daemon status, {@codeThreadGroup}, etc.6 *7 *@paramr a runnable to be executed by new thread instance8 *@returnconstructed thread, or {@codenull} if the request to9 * create a thread is rejected10 */

11 Thread newThread(Runnable r);12 }

很简单吧,就是一个接口,newThread方法就是用来生产线程的,子类需要实现这个方法来根据自己规则生产相应的线程。

那安卓中什么地方用到了ThreadFactory呢?稍有经验的就会知道线程池中用到了,我们看下平常使用线程池是怎么创建的,以下是我一个项目中用到的:

1 //创建线程池对象

2 public static final Executor poolExecutor = newThreadPoolExecutor(3 CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE, TimeUnit.SECONDS,4 new LinkedBlockingQueue());

咦?没有用到线程池啊,别急,我们看下ThreadPoolExecutor创建的源码:

1 public ThreadPoolExecutor(intcorePoolSize,2 intmaximumPoolSize,3 longkeepAliveTime,4 TimeUnit unit,5 BlockingQueueworkQueue) {6 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,7 Executors.defaultThreadFactory(), defaultHandler);8 }

看到了吧,最终调用的如下构造函数来创建线程池:

1 public ThreadPoolExecutor(intcorePoolSize,2 intmaximumPoolSize,3 longkeepAliveTime,4 TimeUnit unit,5 BlockingQueueworkQueue,6 ThreadFactory threadFactory,7 RejectedExecutionHandler handler) {8 if (corePoolSize < 0 ||

9 maximumPoolSize <= 0 ||

10 maximumPoolSize < corePoolSize ||

11 keepAliveTime < 0)12 throw newIllegalArgumentException();13 if (workQueue == null || threadFactory == null || handler == null)14 throw newNullPointerException();15 this.corePoolSize =corePoolSize;16 this.maximumPoolSize =maximumPoolSize;17 this.workQueue =workQueue;18 this.keepAliveTime =unit.toNanos(keepAliveTime);19 this.threadFactory =threadFactory;20 this.handler =handler;21 }

ThreadFactory传入的参数是Executors.defaultThreadFactory(),那我们继续看下Executors中defaultThreadFactory吧:

1 /**

2 * The default thread factory3 */

4 static class DefaultThreadFactory implementsThreadFactory {5 private static final AtomicInteger poolNumber = new AtomicInteger(1);6 private finalThreadGroup group;7 private final AtomicInteger threadNumber = new AtomicInteger(1);8 private finalString namePrefix;9

10 DefaultThreadFactory() {11 SecurityManager s =System.getSecurityManager();12 group = (s != null) ?s.getThreadGroup() :13 Thread.currentThread().getThreadGroup();14 namePrefix = "pool-" +

15 poolNumber.getAndIncrement() +

16 "-thread-";17 }18

19 publicThread newThread(Runnable r) {20 Thread t = newThread(group, r,21 namePrefix +threadNumber.getAndIncrement(),22 0);23 if(t.isDaemon())24 t.setDaemon(false);25 if (t.getPriority() !=Thread.NORM_PRIORITY)26 t.setPriority(Thread.NORM_PRIORITY);27 returnt;28 }29 }

DefaultThreadFactory实现了ThreadFactory接口,newThread中生产了一个个线程并且设置为不是守护线程,线程优先级均为Thread.NORM_PRIORITY。

在我们使用线程池的时候如果不自己创建线程工厂类,那么系统会给我们创建一个默认的线程工厂来生产线程(Executors中defaultThreadFactory())

好了,关于线程工厂就见到这里了,知道有这么个玩意用来生产线程的就可以了。

二、BlockingQueue概述以及源码分析

BlockingQueue顾名思义:阻塞队列,简单说就是放入,取出数据都会发生阻塞。比如取出数据时发现容器没有数据,就会等待产生阻塞一直等到有数据

为止,同样放入数据时如果容器已经满了,那么就回等待一直到容器有空间可以放入数据。

BlockingQueue就是一个接口,定义了插入,取出数据的接口,如下:

1 public interface BlockingQueue extends Queue{2

3 //添加元素到队列里,添加成功返回true,由于容量满了添加失败会抛出IllegalStateException异常

4 booleanadd(E e);5

6 //添加元素到队列里,成功返回true,失败返回false

7 booleanoffer(E e);8

9 //添加元素到队列里,如果容量满了会阻塞直到容量不满

10 void put(E e) throwsInterruptedException;11

12

13 //添加元素到队列里,如果容器已满会阻塞列队,但是不会一直阻塞,只会阻塞timeout时间,在这期间内添加成功则返回true,否则返回false

14 boolean offer(E e, longtimeout, TimeUnit unit)15 throwsInterruptedException;16

17

18 //从队列中获取元素,如果队列为空,则一直阻塞

19 E take() throwsInterruptedException;20

21 //从队列中获取元素,如果容器为空会阻塞列队,但是不会一直阻塞,只会阻塞timeout时间,在这期间内获取成功则返回对应元素,否则返回null

22 E poll(longtimeout, TimeUnit unit)23 throwsInterruptedException;24

25 //存储数据的队列剩余空间大小

26 intremainingCapacity();27

28 //删除指定的元素,成功返回true,失败返回false

29 booleanremove(Object o);30

31 public booleancontains(Object o);32

33 int drainTo(Collection super E>c);34

35 int drainTo(Collection super E> c, intmaxElements);36 }

主要方法已经给出注释。

BlockingQueue的具体子类如下图所示:

其中最最常用的就是ArrayBlockingQueue以及LinkedBlockingQueue,我们以ArrayBlockingQueue为例详细分析一下实现过程。

三、ArrayBlockingQueue源码分析

ArrayBlockingQueue内部是以数组为容器盛放元素,并且放入和取出元素的时候使用同一个锁,也就是放入的时候不能同时取出元素。

ArrayBlockingQueue中主要属性:

1 //存储元素的数组,是个循环数组,至于为什么是循环数组下面会讲到

2 finalObject[] items;3

4 //下一次拿数据的时候的索引

5 inttakeIndex;6

7 //下一次放数据的时候的索引

8 intputIndex;9

10 //队列中已经存储元素的个数

11 intcount;12

13 //锁,只有这一把锁

14 finalReentrantLock lock;15

16 //等待拿数据的的条件对象

17 private finalCondition notEmpty;18

19 //等待放数据的的条件对象

20 private final Condition notFull;

已经给出详细注释就不一一详细解释了。

接下来我们看下构造函数;

1 public ArrayBlockingQueue(intcapacity) {2 this(capacity, false);3 }4

5 public ArrayBlockingQueue(int capacity, booleanfair) {6 if (capacity <= 0)7 throw newIllegalArgumentException();8 this.items = newObject[capacity];9 lock = newReentrantLock(fair);10 notEmpty =lock.newCondition();11 notFull =lock.newCondition();12 }

初始化的时候我们需要指定盛放元素容器的大小,并且初始化一些属性。

接下来我们分析下ArrayBlockingQueue中添加的方法:add,offer以及put方法。

先看add方法:

1 public booleanadd(E e) {2 return super.add(e);3 }

直接调用的父类的方法,我们只好去看看父类中的add方法了:

1 public booleanadd(E e) {2 if(offer(e))3 return true;4 else

5 throw new IllegalStateException("Queue full");6 }

是不是逻辑很简单,add方法内部调用了offer方法如果offer方法返回true则add直接返回true,否则抛出IllegalStateException异常。

接下来我们看下offer方法:

1 public booleanoffer(E e) {2 if (e == null) throw newNullPointerException();3 final ReentrantLock lock = this.lock;4 lock.lock();5 try{6 if (count ==items.length)7 return false;8 else{9 enqueue(e);10 return true;11 }12 } finally{13 lock.unlock();14 }15 }

第2行,检查是否为null,为null则抛出空指针异常。

3,4行加锁,保证只有一个线程操作。

6,7行检查当前容器是否已将满了,如果满了则不能再放入元素,直接返回false。

9,10行如果容器没满则执行enqueue方法放入元素,然后返回true,enqueue方法后面会分析。

13行解除锁。以上就是offer方法的逻辑,比较简单,该说的都说了。

接下来分析put方法:

1 public void put(E e) throwsInterruptedException {2 if (e == null) throw newNullPointerException();3 final ReentrantLock lock = this.lock;4 lock.lockInterruptibly();5 try{6 while (count ==items.length)7 notFull.await();8 enqueue(e);9 } finally{10 lock.unlock();11 }12 }

第4行这里调用的是lockInterruptibly方法,与lock方法相比,lockInterruptibly可以被中断,中断的时候产生InterruptedException异常。

6,7行同样判断容器是否已经满了,如果已经满了则执行wait逻辑等待,这里就是阻塞队列的核心,是不是觉得原来如此啊。

8行,如果容器有空间执行enqueue方法,向容器中加入元素。

offer与put都用到了enqueue方法向容器中加入元素,接下来我们看下enqueue方法:

1 private voidenqueue(E x) {2 //assert lock.getHoldCount() == 1;3 //assert items[putIndex] == null;

4 final Object[] items = this.items;5 items[putIndex] =x;6 if (++putIndex == items.length) putIndex = 0;7 count++;8 notEmpty.signal();9 }

4,5行就是向数组items中放入元素x。

6行,放元素的索引putIndex加1后与数组长度比较,如果达到数组长度则将putIndex置为0,相当于下一次放入元素位置为数组的第一次位置,从头开始放入元素,上面说过items是个循环数组,就是在这里体现出来的,如果我们初始化的时候设定容器items大小为10,然而我们不停放入数据也是没问题的,只不过后面加入的数据会覆盖前面的数据。

8行,唤醒取数据的线程,告诉其哥们我放入了一个数据你可以取数据了。

到这放入数据的核心部分就分析完了,是不是很简单???放入数据还有个offer(E e, long timeout, TimeUnit unit)方法,同样很简单,可以自行分析。

接下来分析取出数据的方法,取出数据主要是poll与take方法。

先来看下poll方法;

1 publicE poll() {2 final ReentrantLock lock = this.lock;3 lock.lock();4 try{5 return (count == 0) ? null: dequeue();6 } finally{7 lock.unlock();8 }9 }

2,3行同样是先锁住,保证单线程操作。

5行,判断当前线程中元素数量是否为0,如果为0则没有元素返回null,否则执行dequeue方法取出数据并返回,后续会分析dequeue方法。

7行解除锁。

poll方法是不是很简单,同样poll(long timeout, TimeUnit unit)也不难分析可自行分析。

接下来看下take方法:

1 public E take() throwsInterruptedException {2 final ReentrantLock lock = this.lock;3 lock.lockInterruptibly();4 try{5 while (count == 0)6 notEmpty.await();7 returndequeue();8 } finally{9 lock.unlock();10 }11 }

take方法也不难理解,核心就是5,6行逻辑,如果count为0也就是容器内没有数据则执行wait方法,线程一直处于等待状态,如果不为0则执行dequeue

方法取出数据。

我们再看下dequeue方法:

1 privateE dequeue() {2 //assert lock.getHoldCount() == 1;3 //assert items[takeIndex] != null;

4 final Object[] items = this.items;5 @SuppressWarnings("unchecked")6 E x =(E) items[takeIndex];7 items[takeIndex] = null;8 if (++takeIndex == items.length) takeIndex = 0;9 count--;10 if (itrs != null)11 itrs.elementDequeued();12 notFull.signal();13 returnx;14 }

6,7,13行从数组items中取出数据,并将原数组位置处置为null,最后13行处返回取出的数据。

8行,取数据索引takeIndex加1后与数组总长度比较如果达到数组长度则将takeIndex置为0,下一次从数组开始处取数据。

10,11行通知迭代器有数据取出。

12行通知放入数据的线程有数据取出了,你可以放入数据了。

好了,以上就是ArrayBlockingQueue中放入取出数据的操作源码分析,多线程中总会提到生产者消费者模式,其实用ArrayBlockingQueue实现是很简单的,ArrayBlockingQueue只是将wait,notify操作进行了封装而已。

LinkedBlockingQueue源码就不一一分析了,主要区别的LinkedBlockingQueue内部是用链表来存储数据的,并且有两把锁,放数据锁与取数据锁,也就是放入数据的线程和取出数据的线程可以同时操作LinkedBlockingQueue,而ArrayBlockingQueue中放数据线程与取数据线程是互斥的,不能同时操作,LinkedBlockingQueue初始化的时候可以不指定容器大小,如果不指定则容器大小为Integer.MAX_VALUE,而ArrayBlockingQueue则必须指定容器大小。

好了以上就是本篇全部内容了,希望对你有用。

声明:文章将会陆续搬迁到个人公众号,以后文章也会第一时间发布到个人公众号,及时获取文章内容请关注公众号

java thread queue_java线程池技术(一):ThreadFactory与BlockingQueue相关推荐

  1. java多线程编程之线程池技术全面解读

    在多线程编程时,创建线程是十分消耗资源的,当线程创建过多时,便会引发内存溢出,因此引入了线程池技术. 目录 线程池的优势 线程池的创建&使用 线程池的工作原理 线程池的参数 功能线程池 线程池 ...

  2. java 线程执行完就会回收吗_Java线程池技术Executors的这个坑你踩过吗?

    线程池技术是Java的一大特性,如果我们想要编写高并发.高吞吐的程序,线程池的技术使用是必须的.对于很多程序员来说,多线程和线程池技术都了然于胸,基本原理和使用都数量掌握,分分钟可以写出一个生产消费者 ...

  3. [转]new Thread的弊端及Java四种线程池的使用

    介绍new Thread的弊端及Java四种线程池的使用,对Android同样适用.本文是基础篇,后面会分享下线程池一些高级功能. 1.new Thread的弊端 执行一个异步任务你还只是如下new ...

  4. java 线程池技术_JAVA36计之---线程池技术

    各位兄弟姐妹,大家肯定对于java线程池比较熟悉了吧,不过这篇肯定还是会让你收获满满 654法则大家要记者---6大核心参数,5大线程池,4种拒绝策略 在Java中使用线程池,可以用ThreadPoo ...

  5. java workerdone_【架构】Java并发编程——线程池的使用

    前言 如果我们要使用线程的时候就去创建一个,这样虽然非常简便,但是就会有一个问题: 如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为 ...

  6. java并发编程——线程池的工作原理与源码解读

    2019独角兽企业重金招聘Python工程师标准>>> 线程池的简单介绍 基于多核CPU的发展,使得多线程开发日趋流行.然而线程的创建和销毁,都涉及到系统调用,比较消耗系统资源,所以 ...

  7. 万字图文 | 学会Java中的线程池,这一篇也许就够了!

    来源:一枝花算不算浪漫 线程池原理思维导图.png 前言 Java中的线程池已经不是什么神秘的技术了,相信在看的读者在项目中也都有使用过.关于线程池的文章也是数不胜数,我们站在巨人的肩膀上来再次梳理一 ...

  8. Java常见的线程池有哪些?

    1.什么是线程池 java.util.concurrent.Executors提供了一个 java.util.concurrent.Executor接口的实现用于创建线程池 多线程技术主要解决处理器单 ...

  9. Java多线程及线程池

    1.volatile 内存模型的相关概念 Java并发编程:volatile关键字解析 - Matrix海子 - 博客园 (cnblogs.com) 在JVM底层volatile是采用"内存 ...

  10. Java常用四大线程池用法以及ThreadPoolExecutor详解

    2019独角兽企业重金招聘Python工程师标准>>> 为什么用线程池? 1.创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处-理效率 2.线程并发数量过多 ...

最新文章

  1. 任务间通信的基本知识
  2. python常用的包_Python3之常用包汇总
  3. jzoj3682-Points and Segments【模型转化,欧拉回路】
  4. ubuntu下安装php redis
  5. 2018黑马39期WEB前端视频教程
  6. 网站性能并发测试工具
  7. Express中Router的使用
  8. 新手程序员之初生牛犊不怕虎
  9. Android图片之svg
  10. 《文明之光》吴军 著,读书笔记
  11. OpenCV 透射变换
  12. 微信小程序的一些新手示例(¥62)
  13. 配置文件加密 HikariDataSource
  14. IDM UltraEdit编辑器V26.00.0.48 烈火汉化64位版
  15. Bugku web(1—35)
  16. java jitter buffer_android webrtc jitter buffer大小设置
  17. 高一英语计算机课文翻译,高一必修2英语课文翻译之《WHO AM I? 》
  18. itext html转换为pdf排版错乱,使用iText库将html转换为pdf时不适用hr的Inline CSS
  19. selenium源码通读·5 |webdriver/common/action_chains.py-ActionChains类
  20. 联通家庭宽带开启ipv6

热门文章

  1. 使用ARKit编写测量应用程序代码:交互和测量
  2. 中国最伟大的现实主义诗人:杜甫的一生
  3. 【洋桃电子】STM32入门100步-01
  4. 【量化课堂】风险模型
  5. 手把手教你使用stata做竞争风险模型
  6. springboot文件上传大小限制:The field file exceeds its maximum permitted size of 1048576 bytes
  7. android 模拟器 pubg,雷电安卓模拟器怎么玩绝地求生刺激战场 PC端带你愉快吃鸡...
  8. 【前端面试之缓存】js本地缓存、浏览器缓存、服务器缓存
  9. 软件程序开发步骤有哪些?如何简单制作手机App?
  10. 程序员必备:那些实用的Chrome扩展程序