面试官问:“在项目中用过多线程吗?”你就把这个案例讲给他听!
文章目录
- 多线程开发实例
- 应用背景
- 设计要点
- 防止重复
- 失败机制
- 线程池选择
- 核心代码
- 对线面试官
- 面试官:先从最简单的开始,说说什么是线程吧
- 面试官:说说Java里怎么创建线程吧
- 面试官:说说线程的生命周期和状态
- 面试官:我看你提到了线程阻塞,那你再说说线程死锁吧
- 面试官:怎么避免死锁呢?
- 面试官:我看你的例子里用到了synchronized,说说 synchronized的用法吧
- 面试官:除了使用synchronized,还有什么办法来加锁吗?详细说一下
- 面试官:说说synchronized和Lock的区别
- 面试官:你提到了synchronized基于jvm层面,对这个有了解吗?
- synchronized的优化能说一说吗?
- 面试官:说一下CAS
- 面试官:CAS会导致什么问题?
- 面试官:能说一下说下ReentrantLock原理吗
- 面试官:能说一下AQS吗
- 面试官:能说一下Semaphore/CountDownLatch/CyclicBarrier吗
- volatile原理知道吗?
- 面试官:说说你对Java内存模型(JMM)的理解,为什么要用JMM
- 面试官:看你用到了线程池,能说说为什么吗
- 面试官:能说一下线程池的核心参数吗?
- 面试官:完整说一下线程池的工作流程
- 面试官:拒绝策略有哪些
- 面试官:说一下你的核心线程数是怎么选的
- 面试官:说一下有哪些常见阻塞队列
- 面试官:说一下有哪几种常见的线程池吧
在面试当中,有时候会问到你在项目中用过多线程么?
对于普通的应届生或者工作时间不长的初级开发 ???—— crud仔流下了没有技术的眼泪。
博主这里整理了项目中用到了多线程的一个简单的实例,希望能对你有所启发。
多线程开发实例
应用背景
应用的背景非常简单,博主做的项目是一个审核类的项目,审核的数据需要推送给第三方监管系统,这只是一个很简单的对接,但是存在一个问题。
我们需要推送的数据大概三十万条,但是第三方监管提供的接口只支持单条推送(别问为什么不支持批量,问就是没讨撕论比好过)。可以估算一下,三十万条数据,一条数据按3秒算,大概需要250(为什么恰好会是这个数)个小时。
所以就考虑到引入多线程来进行并发操作,降低数据推送的时间,提高数据推送的实时性。
设计要点
防止重复
我们推送给第三方的数据肯定是不能重复推送的,必须要有一个机制保证各个线程推送数据的隔离。
这里有两个思路:
- 将所有数据取到集合(内存)中,然后进行切割,每个线程推送不同段的数据
- 利用 数据库分页的方式,每个线程取 [start,limit] 区间的数据推送,我们需要保证start的一致性
这里采用了第二种方式,因为考虑到可能数据量后续会继续增加,把所有数据都加载到内存中,可能会有比较大的内存占用。
失败机制
我们还得考虑到线程推送数据失败的情况。
如果是自己的系统,我们可以把多线程调用的方法抽出来加一个事务,一个线程异常,整体回滚。
但是是和第三方的对接,我们都没法做事务的,所以,我们采用了直接在数据库记录失败状态的方法,可以在后面用其它方式处理失败的数据。
线程池选择
在实际使用中,我们肯定是要用到线程池来管理线程,关于线程池,我们常用 ThreadPoolExecutor提供的线程池服务,SpringBoot中同样也提供了线程池异步的方式,虽然SprignBoot异步可能更方便一点,但是使用ThreadPoolExecutor更加直观地控制线程池,所以我们直接使用ThreadPoolExecutor构造方法创建线程池。
大概的技术设计示意图:
核心代码
上面叭叭了一堆,到了show you code的环节了。我将项目里的代码抽取出来,简化出了一个示例。
核心代码如下:
/*** @Author 三分恶* @Date 2021/3/5* @Description*/
@Service
public class PushProcessServiceImpl implements PushProcessService {@Autowiredprivate PushUtil pushUtil;@Autowiredprivate PushProcessMapper pushProcessMapper;private final static Logger logger = LoggerFactory.getLogger(PushProcessServiceImpl.class);//每个线程每次查询的条数private static final Integer LIMIT = 5000;//起的线程数private static final Integer THREAD_NUM = 5;//创建线程池ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));@Overridepublic void pushData() throws ExecutionException, InterruptedException {//计数器,需要保证线程安全int count = 0;//未推送数据总数Integer total = pushProcessMapper.countPushRecordsByState(0);logger.info("未推送数据条数:{}", total);//计算需要多少轮int num = total / (LIMIT * THREAD_NUM) + 1;logger.info("要经过的轮数:{}", num);//统计总共推送成功的数据条数int totalSuccessCount = 0;for (int i = 0; i < num; i++) {//接收线程返回结果List<Future<Integer>> futureList = new ArrayList<>(32);//起THREAD_NUM个线程并行查询更新库,加锁for (int j = 0; j < THREAD_NUM; j++) {synchronized (PushProcessServiceImpl.class) {int start = count * LIMIT;count++;//提交线程,用数据起始位置标识线程Future<Integer> future = pool.submit(new PushDataTask(start, LIMIT, start));//先不取值,防止阻塞,放进集合futureList.add(future);}}//统计本轮推送成功数据for (Future f : futureList) {totalSuccessCount = totalSuccessCount + (int) f.get();}}//更新推送标志pushProcessMapper.updateAllState(1);logger.info("推送数据完成,需推送数据:{},推送成功:{}", total, totalSuccessCount);}/*** 推送数据线程类*/class PushDataTask implements Callable<Integer> {int start;int limit;int threadNo; //线程编号PushDataTask(int start, int limit, int threadNo) {this.start = start;this.limit = limit;this.threadNo = threadNo;}@Overridepublic Integer call() throws Exception {int count = 0;//推送的数据List<PushProcess> pushProcessList = pushProcessMapper.findPushRecordsByStateLimit(0, start, limit);if (CollectionUtils.isEmpty(pushProcessList)) {return count;}logger.info("线程{}开始推送数据", threadNo);for (PushProcess process : pushProcessList) {boolean isSuccess = pushUtil.sendRecord(process);if (isSuccess) { //推送成功//更新推送标识pushProcessMapper.updateFlagById(process.getId(), 1);count++;} else { //推送失败pushProcessMapper.updateFlagById(process.getId(), 2);}}logger.info("线程{}推送成功{}条", threadNo, count);return count;}}
}
代码很长,我们简单说一下关键的地方:
- 线程创建:线程内部类选择了实现Callable接口,这样方便获取线程任务执行的结果,在示例里用于统计线程推送成功的数量
class PushDataTask implements Callable<Integer> {
- 使用 ThreadPoolExecutor 创建线程池,
//创建线程池ThreadPoolExecutor pool = new ThreadPoolExecutor(THREAD_NUM, THREAD_NUM * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));
主要构造参数如下:
- corePoolSize:线程核心参数选择了5
- maximumPoolSize:最大线程数选择了核心线程数2倍数
- keepAliveTime:非核心闲置线程存活时间直接置为0
- unit:非核心线程保持存活的时间选择了 TimeUnit.SECONDS 秒
- workQueue:线程池等待队列,使用 容量初始为100的 LinkedBlockingQueue阻塞队列
这里还有没写出来的线程池拒绝策略,采用了默认AbortPolicy:直接丢弃任务,抛出异常。
- 使用 synchronized 来保证线程安全,保证计数器的增加是有序的
synchronized (PushProcessServiceImpl.class) {
- 使用集合来接收线程的运行结果,防止阻塞
List<Future<Integer>> futureList = new ArrayList<>(32);
好了,主要的代码和简单的解析就到这里了。
关于这个简单的demo,这里只是简单地做推送数据处理。考虑一下,这个实例是不是可以用在你项目的某些地方。例如监管系统的数据校验、审计系统的数据统计、电商系统的数据分析等等,只要是有大量数据处理的地方,都可以把这个例子结合到你的项目里,这样你就有了多线程开发的经验。
完整代码仓库地址在文章底部
马上年关了,很多准备拿完年终跑路的小伙伴们可以看过来啦,为年后跳槽做点准备:暂时不准备挪窝的也可以来看看,有备无患嘛 面试难点 其实面试同一个岗位的话,大家的基础知识技能都差不多,面试官一般都是通过你 ... 开课开课~ 面试官:为什么项目中用Redis? 我:当然是因为Redis好啊 面试官:emmm.....那Redis哪里好? 我:因为Redis快啊. 面试官:(这小伙子有点彪啊...)那为什么Red ... 点击上方"Java精选",选择"设为星标" 别问别人为什么,多问自己凭什么! 下方留言必回,有问必答! 每天 08:35 更新文章,每天进步一点点... 之前在 ... 点击上方"Java精选",选择"设为星标" 别问别人为什么,多问自己凭什么! 下方留言必回,有问必答! 每天 08:35 更新文章,每天进步一点点... 本文主 ... 转载自 面试官问:为什么 Java 线程没有 Running 状态?我懵了 什么是 RUNNABLE? 与传统的ready状态的区别 与传统的running状态的区别 当I/O阻塞时 如何看待RUNN ... 点击上方"朱小厮的博客",选择"设为星标" 后台回复"书",获取 后台回复"k8s",可领取k8s资料 title: 面 ... 文章首发:阿里面试官问我:到底知不知道什么是Eureka,这次,我没沉默 什么是服务注册? 首先我们来了解下,服务注册.服务发现和服务注册中心的之间的关系. 举个形象的例子,三者之间的关系就好像是供货 ... 前言 最近在面试时,碰到这样一个问题:在问到项目部分时,面试官问我:你的项目中用到的分数.金额之类的数字是用的什么数据类型? 我没有过多思考脱口而出:double!随后面试官又问:为啥不用floa ... 以下文章来源方志朋的博客,回复"666"获面试宝典 最近有个上位机获取下位机上报数据的项目,由于上报频率比较频繁且数据量大,导致数据增长过快,磁盘占用多. 为了节约成本,定期进行数 ... 点击上方蓝色"方志朋",选择"设为星标" 回复"666"获取独家整理的学习资料! Kafka存在丢消息的问题,消息丢失会发生在Broker, ...面试官问:“在项目中用过多线程吗?”你就把这个案例讲给他听!相关推荐
最新文章
热门文章