如何用java线程池做分批次查询处理 java线程池ThreadPoolExecutor的使用
需求是在一个大数据量的表中按条件查询出数据后做相应的业务。我是使用的java线程池ThreadPoolExecutor,实现分批次去查询,查询到数据后,又分多个线程去做业务。
线程池类为 java.util.concurrent.ThreadPoolExecutor,常用构造方法为:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
corePoolSize: 线程池维护线程的最少数量
maximumPoolSize:线程池维护线程的最大数量
keepAliveTime: 线程池维护线程所允许的空闲时间
unit: 线程池维护线程所允许的空闲时间的单位
workQueue: 线程池所使用的缓冲队列
handler: 线程池对拒绝任务的处理策略
一个任务通过 execute(Runnable)方法被添加到线程池,任务就是一个 Runnable类型的对象,任务的执行方法就是Runnable类型对象的run()方法。
线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。当调用 execute() 方法添加一个任务时,线程池会做如下判断:
a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
当一个线程完成任务时,它会从队列中取下一个任务来执行。
当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。
这样的过程说明,并不是先加入任务就一定会先执行。假设队列大小为 10,corePoolSize 为 3,maximumPoolSize 为 6,那么当加入 20 个任务时,执行的顺序就是这样的:首先执行任务 1、2、3,然后任务 4~13 被放入队列。这时候队列满了,任务 14、15、16 会被马上执行,而任务 17~20 则会抛出异常。最终顺序是:1、2、3、14、15、16、4、5、6、7、8、9、10、11、12、13。
下面来看具体的代码(代码中会有部分代码以+++表示不方便各位查看的):
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(4, 10, 3, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100), new ThreadPoolExecutor.CallerRunsPolicy());
int pageSize = 1000; // 每次查询交易条数
int handleSize = 200; // 线程一次性处理的交易条数
int handleCount = 0;
int transCount = +++mapper.getTransCount(batchDate, +);//根据条件去查询需要做业务的数据条数,查询条数的sql语句快
logger.info(MessageFormat.format("[+++日期:[{0}], 待+++记录条数为:[{1}]", batchDate, transCount));//MessageFormat.format是日志的一个方法,推荐大家这么使用
List<+++> tranList = null;
while (handleCount < transCount)
{
tranList = +++mapper.getTransList(batchDate, null, handleCount, pageSize);//int offset, int limit);
if (tranList == null || tranList.size() == 0)
{
logger.info(MessageFormat.format(++++
handleCount += pageSize;
continue;
}
int splitCount = (tranList.size() / handleSize) + (tranList.size() % handleSize == 0 ? 0 : 1);
CountDownLatch latch = new CountDownLatch(splitCount);
for (int i = 0; i < splitCount; i++)
{
int toIndex = (i + 1) * handleSize;
if (i == splitCount - 1)
{
toIndex = tranList.size();
}
List<NpcTransaction> subList = tranList.subList(i * handleSize, toIndex);
threadPool.execute(new +++Thread(+++Manager, subList, batchDate, latch));//塞入到线程池,执行的方法是+++Thread类中的run方法
}
handleCount += pageSize;
try
{
latch.await(5, TimeUnit.MINUTES);
}
catch (InterruptedException e)
{
logger.error(getClass().getName() + " doTask fail.", e);
}
}
下面是threadPool.execute(new +++Thread...中的thread类
public class +++Thread implements Runnable
{
private Logger logger =
private List<NpcTransaction> trans;
private CountDownLatch latch;
private String batchDate;
private +++Manager
public +++Thread(+++Manager +++Manager, List<+++> trans, String batchDate,
CountDownLatch latch)
{
this.+++Manager = +++Manager;
this.trans = trans;
this.batchDate = batchDate;
this.latch = latch;
}
/**
* 重载方法
*/
@Override
public void run()
{
int saveCount = 0;
try
{
saveCount = +++Manager.save+++Record(trans);//
}
catch (Exception e)
{
logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++异常:[{1}]", batchDate, e.getMessage()));
e.printStackTrace();
}
if (saveCount != trans.size())
{
logger.error(MessageFormat.format("[+++job] 跑批日期:[{0}], +++异常,++成功条数:[{1}],预期条数:[{2}]", batchDate,
saveCount, trans.size()));
}
latch.countDown();
}
public List<++Transaction> getTrans()
{
return trans;
}
public void setTrans(List<++Transaction> trans)
{
this.trans = trans;
}
/**
* 获取 latch
*
* @return 返回 latch
*/
public CountDownLatch getLatch()
{
return latch;
}
/**
* 设置 latch
*
* @param 对latch进行赋值
*/
public void setLatch(CountDownLatch latch)
{
this.latch = latch;
}
/**
* 获取 batchDate
*
* @return 返回 batchDate
*/
public String getBatchDate()
{
return batchDate;
}
/**
* 设置 batchDate
*
* @param 对batchDate进行赋值
*/
public void setBatchDate(String batchDate)
{
this.batchDate = batchDate;
}
}
都要写get,set方法,latch.countDown();这个最好写在finally中
关于CountDownLatch这个,我下面简单的说一下:
CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
主要方法
public CountDownLatch(int count);
public void countDown();
public void await() throws InterruptedException
构造方法参数指定了计数的次数
countDown方法,当前线程调用此方法,则计数减一
awaint方法,调用此方法会一直阻塞当前线程,直到计时器的值为0
此处用计数器,因为一组1000条数据通过5个线程去执行,这组执行完再进行第二组以及其后组的1000条数据操作。
如何用java线程池做分批次查询处理 java线程池ThreadPoolExecutor的使用相关推荐
- Java使用多线程做批处理(查询大量数据)
Java使用多线程做批处理(查询大量数据) Java使用多线程做批处理(查询大量数据) Java使用多线程做批处理(查询大量数据) 前言背景 Java使用多线程的条件 操作流程 前言背景 什么是进程: ...
- Java中内部做监视器_监视器模式 java
广告 精选中小企业最主流配置,适用于web应用场景.小程序及简单移动App,所有机型免费分配公网IP和50G高性能云硬盘(系统盘). mutex实际上就是对象本身 } 复制代码什么是监视器模式 jav ...
- java 和mysql做Android_基于Android和Java后台的朋友圈的设计和实现
我的CSDN: ListerCi 我的简书: 东方未曦 前言 这是秋招前做的一个应用,当时是想通过一个完整的项目来向面试官展现项目设计能力和实战能力,不过直到秋招最后很多面试官都没问这个,想来是他们觉 ...
- java程序如何做调查问卷_《Java程序设计》课程准备之问卷调查
一.你对自己的未来有什么规划?做了哪些准备? 答:未来就是找个好工作,在保证自己与父母生活条件良好的基础上,进一步的提高精神上的需求.如:旅游度假,支持更多业余爱好等.准备就是:好好学习,好好运动,好 ...
- Java工程师是做什么的?学习java能干什么?
由于广泛的市场前景,较高的薪资待遇,让Java工程师成为非常有前途的职位,那么Java工程师主要是做什么的呢?让我们跟着小编JavaEE一起来简单了解下. Java工程师,直白点来说,就好比你在做家具 ...
- java配合什么做前端_作为一个java程序员 ,前端的技术需要达到什么水平?
一个后端程序员,需要掌握前端技术吗? JSP时代 8年前,刚刚进入编程这个行业,当时的Web开发使用古老的SSH框架+JSP.那个时候,几乎所有的Java程序员都要懂得如何写JavaScript.如何 ...
- Java就业方向有哪些?学习Java开发能做什么?
学习Java开发都能做什么?Java主要应用在B/S和C/S领域.由于科技的不断发展,B/S将不足以满足社会需求,C/S将会是社会发展趋势.随着Servlet技术的使用,Java向Web移动设备方向挺 ...
- Java能做什么?学完Java可以从事什么工作呢?
如果你是一个Java初学者,你可能对Java应用在什么地方感到困惑.除了"马里奥""贪吃蛇"等经典游戏,其他领域好像也找不到Java的踪迹,那么Java究竟能做 ...
- Java工程师是做什么的?
随着电子产业和互联网行业的发展,Java技术的应用也越来越广泛了,Java工程师也就变成了一个非常受欢迎的岗位.因为市场发展前景好,工作环境也很不错,让许多人都想要转行来做Java工程师.那么Java ...
- java中级程序员面试题_中级Java程序员常见面试题汇总
下面是一些中级Java程序员常见面试题汇总,你可以用它来好好准备面试. 什么是线程? 线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位.程序员可以通过它进行多处理器 ...
最新文章
- R语言R-markdown实战示例、R-markdown、R-markdown生成结果汇报的HTML文件
- java hibernate详细_Java事务管理学习之Hibernate详细介绍
- 【Linux】43.ubuntu18.04安装搜狗输入法不能正常使用
- 如何增加儿童产品中的趣味性?
- HarmonyOS之常用组件RoundProgressBar的功能和使用
- 用了这么多年的泛型,你对它到底有多了解?
- js正则表达式限制文本框只能输入数字,小数点,英文字母
- hough变换连接边缘matlab,边缘检测与Hough变换实验报告 Matlab - 图文
- Java 反射 set get
- java 时间戳验证_关于Java:在时间戳服务器上使用时间戳和身份验证对jar进行签名...
- 机器学习入门(二):工具与框架的选择
- 微软lync 持续服务器,Lync Server 2013 Front-End服务器服务不会启动很长时间
- pytorch def __init__(self, num_classes, bkg_label, top_k, conf_thresh, nms_thresh):
- PHP endif、endwhile、endfor、冒号、switch、foreach使用介绍、Heredoc 、Nowdoc
- 利用Power Design 进行数据库设计(超详细)
- 局域网内部分电脑报错“指定的网络名不再可用”疑难杂症解决
- python判断整数浮点数_Python初识2 整数与浮点数
- VMware Workstation 14中文破解版下载(附密钥)(笔记)
- 分享一款上班摸鱼神器,再也不怕领导突然出现在身后了~
- 美国Java程序员收入和疫情期间面试心得体会