java并发编程(1)并发程序的取消于关闭
一、任务的取消于关闭
1、中断Thread
1.每个线程都有一个boolean类型的中断状态。true则是中断状态中
interrupt:发出中断请求;isInterrupt:返回中断状态;interrupted:清除中断状态
2.JVM中的阻塞方法会检查线程中断状态,其响应方法为:清除中断状态,抛出InterruptedException异常,表示阻塞操作被中断结束 ;但JVM不保证阻塞方法何时检测到线程的中断状态
3.中断的理解:不会真正的中断一个正在运行的线程,而只是发出请求,具体的中断由任务自己处理
通过中断来取消线程通常是最好的方法
public class PrimeProducer extends Thread {private final BlockingQueue<BigInteger> queue;PrimeProducer(BlockingQueue<BigInteger> queue) {this.queue = queue;}public void run() {try {BigInteger p = BigInteger.ONE;while (!Thread.currentThread().isInterrupted())queue.put(p = p.nextProbablePrime());} catch (InterruptedException consumed) {/* Allow thread to exit *///如果捕获到中断异常,则由线程自己退出 }}public void cancel() {interrupt();} }
2、不可中断的阻塞的中断
如:Socket I/O操作,即使设置了中断请求,也不会中断,但是close 套接字,会使其抛出异常,达到中断效果;因此我们要重写中断方法
//自定义callable实现类 public abstract class SocketUsingTask <T> implements CancellableTask<T> {private Socket socket;protected synchronized void setSocket(Socket s) {socket = s;}//取消方法public synchronized void cancel() {try {if (socket != null)socket.close();} catch (IOException ignored) {}}//新建实例的方法public RunnableFuture<T> newTask() {return new FutureTask<T>(this) {public boolean cancel(boolean mayInterruptIfRunning) {try {SocketUsingTask.this.cancel();} finally {return super.cancel(mayInterruptIfRunning);}}};} }//自定义callable接口 interface CancellableTask <T> extends Callable<T> {void cancel();RunnableFuture<T> newTask(); } //自定义 执行池 class CancellingExecutor extends ThreadPoolExecutor {......//通过改写newTaskFor 返回自己的Callableprotected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {if (callable instanceof CancellableTask)return ((CancellableTask<T>) callable).newTask();elsereturn super.newTaskFor(callable);} }
3、通过自定义取消计时任务
private static final ScheduledExecutorService cancelExec = newScheduledThreadPool(1);/**** @param r 任务* @param timeout 超时时间* @param unit TimeUnit* @throws InterruptedException*/public static void timedRun(final Runnable r,long timeout, TimeUnit unit) throws InterruptedException {class RethrowableTask implements Runnable {//通过一个volatile变量,来存储线程是否异常private volatile Throwable t;public void run() {try {r.run();} catch (Throwable t) {this.t = t;}}private void rethrow() {if (t != null)throw launderThrowable(t);}}RethrowableTask task = new RethrowableTask();final Thread taskThread = new Thread(task);taskThread.start();//延时timeout个unit单位后 执行线程中断cancelExec.schedule(() -> taskThread.interrupt(), timeout, unit);//无论如何都等待;如果线程不响应中断,那么通过join等待任务线程timeout时间后 不再等待,回到调用者线程 taskThread.join(unit.toMillis(timeout));//如果 任务线程中有异常,则抛出 task.rethrow();}
注意:依赖于join,任务超时join退出 和 任务正常join推出 无法进行判断
4、通过Futrue来实现取消计时任务
private static final ExecutorService taskExec = Executors.newCachedThreadPool();public static void timedRun(Runnable r,long timeout, TimeUnit unit) throws InterruptedException {Future<?> task = taskExec.submit(r);try {//通过Futrue.get(超时时间),捕获相应的异常来处理计时运行和取消任务 task.get(timeout, unit);} catch (TimeoutException e) {// task will be cancelled below} catch (ExecutionException e) {// exception thrown in task; rethrowthrow launderThrowable(e.getCause());} finally {// Harmless if task already completedtask.cancel(true); // interrupt if running }}
二、停止基于线程的服务
1.通常,服务不能直接中断,造成服务数据丢失
2.线程池服务也不能直接中断
1、日志服务
标准的生产者,消费者模式
public class LogService {private final BlockingQueue<String> queue;private final LoggerThread loggerThread;private final PrintWriter writer;private boolean isShutdown;private int reservations;public LogService(Writer writer) {this.queue = new LinkedBlockingQueue<String>();this.loggerThread = new LoggerThread();this.writer = new PrintWriter(writer);}public void start() {loggerThread.start();}public void stop() {synchronized (this) {isShutdown = true;}loggerThread.interrupt(); //发出中断 }public void log(String msg) throws InterruptedException {synchronized (this) {if (isShutdown){throw new IllegalStateException(/*...*/);}++reservations; //保存的正确的在队列中的日志数量 }queue.put(msg); //将日志放入队列 }private class LoggerThread extends Thread {public void run() {try {while (true) {try {synchronized (LogService.this) {if (isShutdown && reservations == 0) {break;}}String msg = queue.take();synchronized (LogService.this) {--reservations;}writer.println(msg);} catch (InterruptedException e) { /* retry *///捕获了中断请求,但为了将剩余日志输出,不做处理,直到计数器 == 0时,关闭 }}} finally {writer.close();}}} }
2、ExecutorService中断
shutDown和shutDownNow
通常,将ExecetorService封装;如LogService,使其具有自己的生命周期方法
shutDownNow的局限性:不知道当前池中的线程状态,返回未开始的任务,但不能返回已开始未结束的任务
public class TrackingExecutor extends AbstractExecutorService {private final ExecutorService exec;private final Set<Runnable> tasksCancelledAtShutdown =Collections.synchronizedSet(new HashSet<Runnable>());public TrackingExecutor() {exec = Executors.newSingleThreadExecutor();}/*public TrackingExecutor(ExecutorService exec) {this.exec = exec;}*/public void shutdown() {exec.shutdown();}public List<Runnable> shutdownNow() {return exec.shutdownNow();}public boolean isShutdown() {return exec.isShutdown();}public boolean isTerminated() {return exec.isTerminated();}public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException {return exec.awaitTermination(timeout, unit);}public List<Runnable> getCancelledTasks() {if (!exec.isTerminated())throw new IllegalStateException(/*...*/);return new ArrayList<Runnable>(tasksCancelledAtShutdown);}public void execute(final Runnable runnable) {exec.execute(new Runnable() {public void run() {try {runnable.run();} finally {if (isShutdown()&& Thread.currentThread().isInterrupted())tasksCancelledAtShutdown.add(runnable);}}});}@Testpublic void test() throws InterruptedException {ExecutorService executorService = Executors.newSingleThreadExecutor();TrackingExecutor trackingExecutor = new TrackingExecutor();trackingExecutor.execute(new Runnable() {@Overridepublic void run() {try {Thread.sleep(2000);System.err.println("123123");} catch (InterruptedException e) {Thread.currentThread().interrupt(); //设置状态 或继续抛,在execute中处理 e.printStackTrace();} finally {}}});List<Runnable> runnables = trackingExecutor.shutdownNow();trackingExecutor.awaitTermination(10,TimeUnit.SECONDS);List<Runnable> cancelledTasks = trackingExecutor.getCancelledTasks();System.err.println(cancelledTasks.size());} }
三、处理非正常线程终止
1.未捕获的Exception导致的线程终止
1.手动处理未捕获的异常
2.通过Thread的API UncaughExceptionHandler,能检测出某个线程又遇见未捕获而导致异常终止
注意:默认是将异常的的堆栈信息 输出到控制台;自定义的Handler:implements Thread.UncaughExceptionHandler覆写方法
可以为每个线程设置,也可以设置一个全局的ThreadGroup
Thread.setUncaughtExceptionHandler/Thread.setDefaultUncaughtExceptionHandler
2.JVM退出、守护线程等
转载于:https://www.cnblogs.com/zhangxinly/p/6905983.html
java并发编程(1)并发程序的取消于关闭相关推荐
- java 并发编程第七章:取消和关闭
Java没有提供任何机制来安全地终止线程,虽然Thread.stop和suspend等方法提供了这样的机制,但是存在严重的缺陷,应该避免使用这些方法.但是Java提供了中断Interruption机制 ...
- 【转】Java并发编程:并发容器之ConcurrentHashMap
JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能.因为同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了 ...
- Java并发编程:并发容器之CopyOnWriteArrayList(转载)
Java并发编程:并发容器之CopyOnWriteArrayList(转载) 原文链接: http://ifeve.com/java-copy-on-write/ Copy-On-Write简称COW ...
- 【Java并发编程】并发编程大合集
转载请注明出处:http://blog.csdn.net/ns_code/article/details/17539599 为了方便各位网友学习以及方便自己复习之用,将Java并发编程系列内容系列内容 ...
- 【檀越剑指大厂--并发编程】并发编程总结
并发编程 一.并发基础 1.什么是并行和并发? 并行,表示两个线程同时(同一时间)做事情. 并发,表示一会做这个事情,一会做另一个事情,存在着调度. 单核 CPU 不可能存在并行(微观上). 2.什么 ...
- 并发编程——JUC并发编程知识脑图
摘要 并发编程在软件编程中尤为突出和重要,在当今面试或工作中也是不可缺少的.作为一名高级java开发工程师,并发编程的技能已经成为了重要的一项.本博文将详细介绍并发编程中的知识点和知识脑图,帮助大家更 ...
- 学习笔记:Java 并发编程⑥_并发工具_JUC
若文章内容或图片失效,请留言反馈. 部分素材来自网络,若不小心影响到您的利益,请联系博主删除. 视频链接:https://www.bilibili.com/video/av81461839 配套资料: ...
- 并发编程-02并发基础CPU多级缓存和Java内存模型JMM
文章目录 CPU多级缓存 CPU多级缓存概述 CPU 多级缓存-缓存一致性协议MESI CPU 多级缓存-乱序执行优化-重排序 JAVA内存模型 (JMM) 计算机硬件架构简易图示 JAVA内存模型与 ...
- Java并发编程以及并发须知的几个概念:什么是线程安全?
众所周知,在Java的知识体系中,并发编程是非常重要的一环,也是面试的必问题,一个好的Java程序员是必须对并发编程这块有所了解的.为了追求成为一个好的Java程序员,我决定从今天开始死磕Java的并 ...
- Java并发编程进阶——并发锁
1 JAVA 多线程锁介绍 1.1 悲观锁 定义:悲观锁指对数据被外界修改持保守态度,认为数据很容易就会被其他线程修改(很悲观),所以在数据被处理前先对数据进行加锁,并在整个数据处理过程中,使数据处于 ...
最新文章
- 使用字符串定界符(标准C ++)在C ++中解析(拆分)字符串
- RxJava+Retrofit+OkHttp深入浅出-终极封装四(多文件下载之断点续传)
- 使用Navicat for Oracle新建用户无法登陆(用户名大小写问题)
- 创业三年来的一些感想 - 创业篇1
- php 字符串比较的规则,PHP字符串比较函数strcmp()与strcasecmp()的用法介绍
- 线性代数拾遗(五):矩阵变换的应用
- gitlab修改管理员密码流程
- 随想录(编写简单资源管理代码)
- 华为全球最快 AI 训练集群 Atlas 900 诞生!
- oracle数据库导出和oracle导入数据的二种方法(oracle导入导出数据)
- H5 --力导向图、关系图谱
- 有车的朋友注意了!全语音识别,车载微信终于来了!
- [转载] api地理编码_通过地理编码API使您的数据更有意义
- ZOJ 1010. Area 解题报告
- ios10前台收到推送_APP在前台收到推送消息时也会弹出提醒?
- UV Mapping(UV贴图)
- Filebeat log @timestamp处理
- Git的基本使用(用户初始化配置、新建代码库、把文件提交到缓存区、把文件提交到本地仓库等)
- 压力测试-Jmeter脚本录制方案
- 科学探索奖名单揭晓:北大数学「黄金一代」袁新意上榜,首现90后获奖人