TPS00-J. 用线程池实现应用在流量暴涨时优雅降级

很多程序都要解决这样一个问题——处理一系列外来的请求。Thread- Per-Message这种设计模式是最简单的并发策略了,它为每一个请求创建一个线程。这种模式在耗时较长,受io约束,基于session或者,任务相互独立等场景下表现优于顺序处理。

但是,这种设计也有几个缺陷,包括线程创建和调用,任务处理,资源分配和释放,和频繁的上下文切换等带来的的开销。此外,攻击者可以通过一下子发起铺天盖地的请求造展开DoS攻击。系统不能优雅的降级,而是变得反应迟钝,从而导致拒绝服务。从安全的角度来看,由于一些偶现错误,一个组件可以用尽所有资源,饿死所有其他组件。

线程池允许系统在它承受力充裕的范围内处理尽可能多的请求。而不是一遇到过量的请求就挂掉。线程池通过限制初始化的工作线程数和同时运行的线程数来克服这些问题。每个支持线程池的对象都接受一个Runnable或Callable<T>的任务,并将其存储在临时队列中,直到有资源可用。因为在一个线程池的线程可以重复使用,并能快速的地加入线程池货从其中移出,管理线程的生命周期的开销被最小化。

不规范代码示例

这个示例演示了Thread-Per-Message设计模式,RequestHandler类提供了一个public static的工厂方法。调用者可以通过此方法获得Handler的实例,然后在各自的线程中调用handleRequest()处理请求。

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;class Helper {public void handle(Socket socket) {// ... }
    }
}final class RequestHandler {private final Helper helper = new Helper();private final ServerSocket server;private RequestHandler(int port) throws IOException {server = new ServerSocket(port);}public static RequestHandler newInstance() throws IOException {return new RequestHandler(0); // 自动获得端口
    }public void handleRequest() {new Thread(new Runnable() {public void run() {try {helper.handle(server.accept());} catch (IOException e) {// Forward to handler
                }}}).start();}
}

Thread-Per-Message设计模式无法让服务优雅的降级。只要稀缺资源没有耗尽,系统还是可以正常的提供服务。举例来说,虽然可以创建很多的线程,系统中打开的文件描述符是有限的,文件描述符就是稀缺资源。如果我们的稀缺资源是内存,系统会突然的停止服务。

TPS01-J. 不要在有界线程池中运行互相依赖的任务

有界线程池是指其同一时刻执行任务的线程总数有上限。需要等待其他任务完成的任务不应该放到游街线程池中执行

有一种死锁叫做线程饿死型死锁,当线程池中的所有线程都在等待一个尚未开始的任务(只是进入了线程池的内部队列),这种死锁就会发生了

这种问题很有欺骗性,因为当需要的线程较少的时候程序可以正常的运行。有时候调大线程数就可以缓解这个问题。但是,确定合适的数量通常很难

不规范代码示例(子任务有依赖关系)

下面的例子有线程饿死的风险,本例包含一个ValidationService类负责执行各种检查,比如到后台的数据库检查用户输入的字段是否存在。

fieldAggregator()接受字符串类型的可变长的参数,并且为每一个参数创建一个任务,以便执行。每个任务使用ValidateInput执行验证。

反过来,ValidateInput 类将尝试为每个请求创建一个子任务,在任务重使用 SanitizeInput 类净化输入。在同一个线程池执行所有任务。在所有任务都执行完毕前FieldAggregator() 方法一直被阻塞,当所有结果都都可用了,FieldAggregator把返回结果汇总成 StringBuilder 对象返回给给调用者。

public final class ValidationService {private final ExecutorService pool;public ValidationService(int poolSize) {pool = Executors.newFixedThreadPool(poolSize);}public void shutdown() {pool.shutdown();}public StringBuilder fieldAggregator(String... inputs)throws InterruptedException, ExecutionException {StringBuilder sb = new StringBuilder();Future<String>[] results = new Future[inputs.length]; // 保存结果
for (int i = 0; i < inputs.length; i++) { // 把任务提交到线程池
results[i] = pool.submit(new ValidateInput<String>(inputs[i], pool));}for (int i = 0; i < inputs.length; i++) { // 把结果汇总
            sb.append(results[i].get());}return sb;}
}public final class ValidateInput<V> implements Callable<V> {private final V input;private final ExecutorService pool;ValidateInput(V input, ExecutorService pool) {this.input = input;this.pool = pool;}@Overridepublic V call() throws Exception {// 如果验证失败,在此处抛出异常Future<V> future = pool.submit(new SanitizeInput<V>(input)); // Subtaskreturn (V) future.get();}
}public final class SanitizeInput<V> implements Callable<V> {private final V input;SanitizeInput(V input) {this.input = input;}@Overridepublic V call() throws Exception {// Sanitize input and returnreturn (V) input;}
}

假设池大小设置为6,调用ValidationService.fieldAggregator()方法来验证6个参数,它提交6项任务的线程池。每个任务提交相应的子任务来清理输入。只要SanitizeInput子任务执行了,验证的线程就可以返回它们结果了,这些线程可以返回他们的结果。然而,这是不可能的,因为在线程池中所有六个线程,它们都被阻塞了。此外,由于还有活跃任务, 调用shutdown()方法无法关闭线程池。

标准代码示例(互相不依赖的任务)

在标准的代码中修改了ValidateInput<V>,SanitizeInput任务和它在同一个线程中执行。这样ValidateInput和SanitizeInput就独立开来,不再需要等待另一个执行结束。对SanitizeInput也做了修改,不再实现Callable接口

public final class ValidationService {// ...public StringBuilder fieldAggregator(String... inputs)throws InterruptedException, ExecutionException {// ...for (int i = 0; i < inputs.length; i++) {// 不把线程池传进去results[i] = pool.submit(new ValidateInput<String>(inputs[i]));}// ...
    }
}//不再使用同一个线程池
public final class ValidateInput<V> implements Callable<V> {private final V input;ValidateInput(V input) {this.input = input;}@Overridepublic V call() throws Exception {// 如果验证失败,抛出异常return (V) new SanitizeInput().sanitize(input);}
}public final class SanitizeInput<V> { // 不实现Callable接口public SanitizeInput() {}public V sanitize(V input) {// 净化输入并返回return input;}
}

不规范代码(子任务)

本示例包含了一些列的子任务,运行在一个共享的线程池中。BrowserManager类调用perUser(), perUser()创建子任务来调用perProfile(), perProfile()又创建子任务来调用perTab(),最终perTab()又创建子任务调用doSomething()。BrowserManager等待所有的子任务结束。

public final class BrowserManager {private final ExecutorService pool = Executors.newFixedThreadPool(10);private final int numberOfTimes;private static AtomicInteger count = new AtomicInteger(); // count = 0public BrowserManager(int n) {numberOfTimes = n;}public void perUser() {methodInvoker(numberOfTimes, "perProfile");pool.shutdown();}public void perProfile() {methodInvoker(numberOfTimes, "perTab");}public void perTab() {methodInvoker(numberOfTimes, "doSomething");}public void doSomething() {System.out.println(count.getAndIncrement());}public void methodInvoker(int n, final String method) {final BrowserManager manager = this;Callable<Object> callable = new Callable<Object>() {public Object call() throws Exception {Method meth = manager.getClass().getMethod(method);return meth.invoke(manager);}};Collection<Callable<Object>> collection = Collections.nCopies(n, callable);try {Collection<Future<Object>> futures = pool.invokeAll(collection);} catch (InterruptedException e) {// Forward to handlerThread.currentThread().interrupt(); // Reset interrupted status
        }// ...
    }public static void main(String[] args) {BrowserManager manager = new BrowserManager(5);manager.perUser();}
}

不幸的是,这个方案很容易出现线程饿死型死锁。例如,5个perUser()任务,每个生5个perProfile()任务,每个perProfile()任务生成5个perTab()任务,线程池会被耗尽,没有剩余的线程给而perTab来调用doSomething()方法。

java.util.concurrent.ExecutorService.invokeAll(Collection<? extends Callable<Object>>) 会等待所有任务结束。

标准代码示例(CallerRunsPolicy)

在这个标准的解决方案中,对任务进行了选择和调度,,避免了线程饥饿死锁。它设置了ThreadPoolExecutor的CallerRunsPolicy,并且使用了SynchronousQueue。这个策略要求,如果线程池的线程耗尽,所有后继的任务都在其发起的线程中执行

public final class BrowserManager {private final static ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 10, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());private final int numberOfTimes;private static AtomicInteger count = new AtomicInteger(); // count = 0static {pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());}// ...
}

TPS02-J. 确保提交给一个线程池的任务是可中断的

TPS03-J. 确保线程池中的任务不会默默的失败

TPS04-J. 使用线程池时要确保ThreadLocal被重置

转载于:https://www.cnblogs.com/easyroom/p/5034678.html

线程池——JAVA并发编程指南相关推荐

  1. Day841.Executor与线程池-Java 并发编程实战

    Executor与线程池 Hi,我是阿昌,今天学习记录的是关于Executor与线程池. 虽然在 Java 语言中创建线程看上去就像创建一个对象一样简单,只需要 new Thread() 就可以了,但 ...

  2. java并发测试 线程池,Java并发编程——线程池

    1.任务与执行策略间的隐性耦合 一些任务具有这样的特征:需要或者排斥某种特定的执行策略.对其他任务具有依赖性的任务,就会要求线程池足够大,来保证它锁依赖任务不必排队或者不被拒绝:采用线程限制的任务需要 ...

  3. 通过ThreadPoolExecutor与ForkJoinPool比较,分别对比其execute ,submit 等方法提交线程池任务的区别,来深入理解线程池及并发编程

    前言 以前使用线程池,对execute . submit 等方法提交线程池任务的区别比较模糊,现在通过ThreadPoolExecutor与ForkJoinPool比较,分别对比其execute ,s ...

  4. java线程池_Java 并发编程 线程池源码实战

    作者 | 马启航 杏仁后端工程师.「我头发还多,你们呢?」 一.概述 笔者在网上看了好多的关于线程池原理.源码分析相关的文章,但是说实话,没有一篇让我觉得读完之后豁然开朗,完完全全的明白线程池,要么写 ...

  5. c++ 线程池_JAVA并发编程:线程池ThreadPoolExecutor源码分析

    前面的文章已经详细分析了线程池的工作原理及其基本应用,接下来本文将从底层源码分析一下线程池的执行过程.在看源码的时候,首先带着以下两个问题去仔细阅读.一是线程池如何保证核心线程数不会被销毁,空闲线程数 ...

  6. java 批量插入clob_SpringBoot系列(16)线程池Executors并发编程之批量查询-插入数据

    在上篇文章中Debug给大家分享介绍了"Java线程池-多线程的其中一种应用场景~广播式给所有有效用户发送邮件(通知)",本篇文章我们将继续向前迈进,继续介绍并实战"线程 ...

  7. 项目使用线程池_并发编程系列1:线程池的架构实现、大小配置、及四种线程池使用...

    △ 公众号回复关键词"架构" 即可领取<1500+BAT架构及面试专题合集> 本篇为线程池系列文章之一,不经常使用线程池的童鞋,还有对几种线程的使用不甚了解的童鞋,可以 ...

  8. java开源线程池_线程池 - Java 并发性和多线程 - UDN开源文档

    线程池 线程池(Thread Pool)对于限制应用程序中同一时刻运行的线程数很有用.因为每启动一个新线程都会有相应的性能开销,每个线程都需要给栈分配一些内存等等. 我们可以把并发执行的任务传递给一个 ...

  9. JUC_2_JAVA线程(java并发编程)

    JAVA线程 创建和运行线程 方法一:直接使用Thread 方法二:使用 Runnable 配合 Thread 原理之 Thread 与 Runnable 的关系 方法三:FutureTask 配合 ...

最新文章

  1. 微服务化之前需要先无状态化和容器化
  2. IDEA创建maven项目报错解决:Failed to create a Maven project: 'C:/Users/../IdeaProjects/../pom.xml' already e
  3. springboot基于maven多模块项目搭建(直接启动webApplication)
  4. linux磁盘管理系列一:磁盘配额管理
  5. python监听文件最后修改人_Python持续监听文件变化代码实例
  6. 合并excel文件 C语言,再见Ctrl + C!合并100个Excel表格,只需30秒!
  7. Alias Method解决随机类型概率问题(别名算法)
  8. 神经网络与深度学习——TensorFlow2.0实战(笔记)(五)(Matplotlib绘图基础<散点图>python)
  9. 深耕“工业互联网”,华为云持续开花
  10. js xmlhttp ajax 超时时间,如何为XMLHttpRequest设置超时和ontimeout?
  11. linux磁盘的命令是,linux磁盘相关的命令
  12. 使用Autodesk Vault插件向导轻松创建Vault插件
  13. qt和arcgis for qt在地图上做测距(画线和显示距离,单位km)
  14. 2018最新qq的服务器地址,腾讯QQ2018正式版新功能详细介绍
  15. 基于ace admin 的左侧菜单及tab,tab支持右键菜单及与左侧菜单联动
  16. Python使用打码平台进行识别验证码
  17. SecureCRT软件的使用
  18. 你今天Git了吗?上传资源上Github最新教程!
  19. Java正则表达式email
  20. RK3288 OTG切换为Host模式

热门文章

  1. java如何实现下载_java 如何实现下载功能
  2. Tensorflow实现简单的手写数字神经网络模型
  3. 更开放的分布式事务 | Fescar 品牌升级,更名为 Seata
  4. JavaScript—获取参数(23)
  5. 大数据小视角1:从行存储到RCFile
  6. SQL SERVER 2008数据库管理与维护总结
  7. 《OpenGL ES 3.x游戏开发(上卷)》一2.4 文件I/O
  8. scheduling.quartz.CronTriggerBean has interface org.quartz.CronTrigger as super class
  9. 关于文件上传,我要向struts提点意见
  10. Pspice 使用指南(中文)