1:BlockingQueue继承关系

java.util.concurrent 包里的 BlockingQueue是一个接口, 继承Queue接口,Queue接口继承 Collection

  BlockingQueue----->Queue-->Collection

图:

队列的特点是:先进先出(FIFO)

2:BlockingQueue的方法

BlockingQueue 具有 4 组不同的方法用于插入、移除以及对队列中的元素进行检查。如果请求的操作不能得到立即执行的话,每个方法的表现也不同。这些方法如下:

  抛出异常 特殊值 阻塞 超时
插入 add(e) offer(e) put(e) offer(e, time, unit)
移除 remove() poll() take() poll(time, unit)
检查 element() peek() 不可用 不可用
 

四组不同的行为方式解释:

1(异常)

如果试图的操作无法立即执行,抛一个异常。

2(特定值)

如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。

3(阻塞)

如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。

4(超时)

如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

1.首先是springBoot的项目框架如下:

2.业务测试流程涉及的类,如下

BusinessThread 类

package com.springboot.demo.Threads;

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
 * Created by Administrator on 2018/5/9.
 */
@Component
@Scope("prototype")//spring 多例
public class BusinessThread implements Runnable{

private String acceptStr;

public BusinessThread(String acceptStr) {
        this.acceptStr = acceptStr;
    }

public String getAcceptStr() {
        return acceptStr;
    }

public void setAcceptStr(String acceptStr) {
        this.acceptStr = acceptStr;
    }

@Override
    public void run() {
        //业务操作
        System.out.println("多线程已经处理订单插入系统,订单号:"+acceptStr);

//线程阻塞
        /*try {
            Thread.sleep(1000);
            System.out.println("多线程已经处理订单插入系统,订单号:"+acceptStr);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }*/
    }
}

TestThreadPoolManager 类

package com.springboot.demo.Threads;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Queue;
import java.util.concurrent.*;

/**
 * Created by Administrator on 2018/5/10.
 */
@Component
public class TestThreadPoolManager implements BeanFactoryAware {

//用于从IOC里取对象
    private BeanFactory factory; //如果实现Runnable的类是通过spring的application.xml文件进行注入,可通过 factory.getBean()获取,这里只是提一下

// 线程池维护线程的最少数量
    private final static int CORE_POOL_SIZE = 2;
    // 线程池维护线程的最大数量
    private final static int MAX_POOL_SIZE = 10;
    // 线程池维护线程所允许的空闲时间
    private final static int KEEP_ALIVE_TIME = 0;
    // 线程池所使用的缓冲队列大小
    private final static int WORK_QUEUE_SIZE = 50;

@Override
    public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
        factory = beanFactory;
    }

/**
     * 用于储存在队列中的订单,防止重复提交,在真实场景中,可用redis代替 验证重复
     */
    Map<String, Object> cacheMap = new ConcurrentHashMap<>();

/**
     * 订单的缓冲队列,当线程池满了,则将订单存入到此缓冲队列
     */
    Queue<Object> msgQueue = new LinkedBlockingQueue<Object>();

/**
     * 当线程池的容量满了,执行下面代码,将订单存入到缓冲队列
     */
    final RejectedExecutionHandler handler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            //订单加入到缓冲队列
            msgQueue.offer(((BusinessThread) r).getAcceptStr());
            System.out.println("系统任务太忙了,把此订单交给(调度线程池)逐一处理,订单号:" + ((BusinessThread) r).getAcceptStr());
        }
    };

/**创建线程池*/
   final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new ArrayBlockingQueue(WORK_QUEUE_SIZE), this.handler);

/**将任务加入订单线程池*/
    public void addOrders(String orderId){
        System.out.println("此订单准备添加到线程池,订单号:" + orderId);
        //验证当前进入的订单是否已经存在
        if (cacheMap.get(orderId) == null) {
            cacheMap.put(orderId, new Object());
            BusinessThread businessThread = new BusinessThread(orderId);
            threadPool.execute(businessThread);
        }
    }

/**
     * 线程池的定时任务----> 称为(调度线程池)。此线程池支持 定时以及周期性执行任务的需求。
     */
    final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

/**
     * 检查(调度线程池),每秒执行一次,查看订单的缓冲队列是否有 订单记录,则重新加入到线程池
     */
    final ScheduledFuture scheduledFuture = scheduler.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
            //判断缓冲队列是否存在记录
            if(!msgQueue.isEmpty()){
                //当线程池的队列容量少于WORK_QUEUE_SIZE,则开始把缓冲队列的订单 加入到 线程池
                if (threadPool.getQueue().size() < WORK_QUEUE_SIZE) {
                    String orderId = (String) msgQueue.poll();
                    BusinessThread businessThread = new BusinessThread(orderId);
                    threadPool.execute(businessThread);
                    System.out.println("(调度线程池)缓冲队列出现订单业务,重新添加到线程池,订单号:"+orderId);
                }
            }
        }
    }, 0, 1, TimeUnit.SECONDS);

/**获取消息缓冲队列*/
    public Queue<Object> getMsgQueue() {
        return msgQueue;
    }

/**终止订单线程池+调度线程池*/
    public void shutdown() {
        //true表示如果定时任务在执行,立即中止,false则等待任务结束后再停止
        System.out.println("终止订单线程池+调度线程池:"+scheduledFuture.cancel(false));
        scheduler.shutdown();
        threadPool.shutdown();

}
}
TestController 类

package com.springboot.demo;

import com.springboot.demo.Threads.TestThreadPoolManager;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

import java.util.Queue;
import java.util.UUID;

/**
 * Created by Administrator on 2018/5/9.
 */
@RestController
public class TestController {

@Autowired
    TestThreadPoolManager testThreadPoolManager;

/**
     * 测试模拟下单请求 入口
     * @param id
     * @return
     */
    @GetMapping("/start/{id}")
    public String start(@PathVariable Long id) {
        //模拟的随机数
        String orderNo = System.currentTimeMillis() + UUID.randomUUID().toString();

testThreadPoolManager.addOrders(orderNo);

return "Test ThreadPoolExecutor start";
    }

/**
     * 停止服务
     * @param id
     * @return
     */
    @GetMapping("/end/{id}")
    public String end(@PathVariable Long id) {

testThreadPoolManager.shutdown();

Queue q = testThreadPoolManager.getMsgQueue();
        System.out.println("关闭了线程服务,还有未处理的信息条数:" + q.size());
        return "Test ThreadPoolExecutor start";
    }
}

ThreadPoolExecutor线程池 + Queue队列相关推荐

  1. SpringBoot 引入线程池+Queue缓冲队列实现高并发下单业务

    点击关注公众号,利用碎片时间学习 主要是自己在项目中(中小型项目) 有支付下单业务(只是办理VIP,没有涉及到商品库存),目前用户量还没有上来,目前没有出现问题,但是想到如果用户量变大,下单并发量变大 ...

  2. 【Java 并发编程】线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )

    文章目录 一.线程池阻塞队列 二.拒绝策略 三.使用 ThreadPoolExecutor 自定义线程池参数 一.线程池阻塞队列 线程池阻塞队列是线程池创建的第 555 个参数 : BlockingQ ...

  3. Java Executor源码解析(3)—ThreadPoolExecutor线程池execute核心方法源码【一万字】

    基于JDK1.8详细介绍了ThreadPoolExecutor线程池的execute方法源码! 上一篇文章中,我们介绍了:Java Executor源码解析(2)-ThreadPoolExecutor ...

  4. ThreadPoolExecutor线程池核心参数详解

    理解ThreadPoolExecutor线程池的corePoolSize.maximumPoolSize和poolSize 我们知道,受限于硬件.内存和性能,我们不可能无限制的创建任意数量的线程,因为 ...

  5. 手写一个线程池,带你学习ThreadPoolExecutor线程池实现原理

    摘要:从手写线程池开始,逐步的分析这些代码在Java的线程池中是如何实现的. 本文分享自华为云社区<手写线程池,对照学习ThreadPoolExecutor线程池实现原理!>,作者:小傅哥 ...

  6. Java多线程学习总结(4)——ThreadPoolExecutor 线程池的拒绝策略学习总结

    前言 谈到java的线程池最熟悉的莫过于ExecutorService接口了,jdk1.5新增的java.util.concurrent包下的这个api,大大的简化了多线程代码的开发.而不论你用Fix ...

  7. ThreadPoolExecutor线程池原理

    ThreadPoolExecutor线程池原理 线程池原理 1. 线程池的简单介绍 1.1 线程池是什么 1.2 线程池解决的核心问题是什么 2. 线程池的实现原理 2.1 线程池的执行流程 2.2 ...

  8. ThreadPoolExecutor线程池,shutdown和shutdownNow关闭线程池方式对比,以及确保线程池能够彻底关闭的一种方式

    1. ThreadPoolExecutor线程池 1.1 创建线程池,构造方法的几个参数说明及创建如下. 1.2 shutdown方式关闭线程池 a. 空闲且能interrupt表示该线程处于阻塞等待 ...

  9. 13.ThreadPoolExecutor线程池之submit方法

    jdk1.7.0_79  在上一篇<ThreadPoolExecutor线程池原理及其execute方法>中提到了线程池ThreadPoolExecutor的原理以及它的execute方法 ...

最新文章

  1. ICRA 2021 | UPSLAM:联合全景SLAM
  2. 马斯克遭“天劫”:40颗星链卫星葬身地磁风暴,数千万美元打了水漂
  3. 马拉车(manacher)算法——最长回文(hdu3068)
  4. HTTP 遭 Google 抛弃,开发者该如何应对?
  5. 基于机器视觉的Data Matrix二维码识别
  6. 恶搞c语言小程序,用C语言做的 一个整人的小程序
  7. 2020手机cpu天梯图
  8. Boost:shared_memory_object --- 共享内存
  9. android 自定view 网状结构图
  10. bim综合建模插件 进行碰撞检查只需六步!
  11. 大数据面试题以及答案整理(面试必备)
  12. R语言:从 csv 文件中读取数据,然后将数据写入 csv 文件
  13. CyclicBarrier让多线程齐步走
  14. [ 前端开发 ] label标签的使用
  15. Uni-app的Dcloud市场插件总结使用(一)下拉模糊查询
  16. mysql slow设置_mysql slow log设置
  17. 用c语言计算正四棱锥的体积,《计算机图形学》习题与解答.doc
  18. 批处理判断文件夹是否为空
  19. 主机屋 linux,如何主机屋中发布网站?
  20. computed 的get 和set

热门文章

  1. C学习杂记(六)%2.0f打印输出宽度
  2. leetcode 131. 分割回文串 思考分析
  3. c语言 函数的参数传递示例_nexttoward()函数以及C ++中的示例
  4. linux下怎么查kill某个进程,Linux下查询进程PS或者杀死进程kill的小技巧
  5. 循环链表(代码、分析、汇编)
  6. A+B Problem III -- ACM解决方法
  7. LeetCode单链表题目测试代码(只需添加对应题目,本地即可debug)
  8. 关于argc和argv的输出
  9. linux操作系统之条件变量
  10. linux gcc 制作动态库