最近梳理其他同事以前写的 job 后有点想法,记录下。

一、业务场景

在大多数的系统都有类似这样的逻辑,比如下单了给用户赠送积分,用户在论坛上发表了帖子,给用户增加积分等等。

下单赠送积分,那么一个订单肯定不能重复赠送积分,所以需要一些状态来比较来哪些是已赠送的,哪些是没有赠送的。或许可以在订单表里加个字段来标记是否赠送了积分。

有时候,业务人员出于营销的需要,可能要搞个某某时间段内下单返券的活动。难道又在订单表里加个字段?肯定不能,谁知道还要搞多少活动呢。

二、实现

为了使核心的业务流程尽可能简单高效,赠送积分、返券(后面简称为task)之类的逻辑应该通过异步的job来处理。

因为 task 的处理状态不能放在核心的业务表里,所以,可以另外建一个表示异步任务的 async_task 表,结构如下:

-- 业务job处理 任务

create table async_task (

id number(11) primary key,

key_work varchar2(32), -- 不同业务逻辑的task用不同的keyword

biz_id char(32), -- 业务数据 ID,比如订单号

biz_data varchar2(256), -- 核心的业务数据,用于避免关联业务表;具体结构取决于keyword

status number, -- 任务的处理状态; -2:未处理, -1:处理中, 0:已处理, 大于 0 的数字表示失败次数

create_tm date, -- 任务的创建时间

modify_tm date -- 任务的修改时间

);

处于性能考虑,可以在 key_work 字段上建立分区,在 biz_id 上建立索引。

当业务表有需要处理的数据时,就往 async_task 插入一条相应的记录(可以异步插入),异步 job 再从 async_task 表里取数据来处理。

注意:处理 task 时,要保证数据的一致性。所在的项目组曾出现过,下单返券的活动里,送券与更新状态的操作没有放在同一个事务里,出现券送了,状态没更新,重复送券的问题。一定要注意事务的正确处理。

三、单线程、多线程处理 task

不管是用单线程还是多线程,都要考虑有大量 task 的情况,所以不能一次把所有符合条件的 task 都读取到内存里,一定要分页。

单机单线程

不用考虑数据被其他线程重复处理的情况,顺序处理即可:取一批数据处理,处理完了再取下一批,直到所有的都处理完了。

单机多线程

数据量大了,就不能用单线程慢慢地处理了。可以采用一个线程去读取未处理的 task,然后提交到线程池去处理,等这批 task 处理完后再去读取下一批,主流程如下:

// 直接使用 ThreadPoolExecutor 是为了使线程池的线程有特定的名字,任务队列有边界。

ExecutorService executorService = new ThreadPoolExecutor(0, 10, 5,

TimeUnit.SECONDS, new ArrayBlockingQueue(1000), // 有界队列

new ThreadFactory() { // 使用定制的

private AtomicInteger counter = new AtomicInteger(0);

@Override

public Thread newThread(Runnable r) {

Thread thread = new Thread(r);

thread.setName("task-handler-"

+ counter.incrementAndGet());

return null;

}

}, new CallerRunsPolicy());

do {

List tasks = getUnhandleTask();

if (tasks.isEmpty()) {

break;

}

List> callables = convert2callables(tasks);

executorService.invokeAll(callables);

} while (true);

executorService.shutdown();

线程池采用 CallerRunsPolicy 策略是为了在线程池处理不完任务,线程池的任务队列满的时候,读取 task 的线程可以直接处理 task,这样既减缓了 task 的读取速度,又可以加快 task 的处理速度。

多机处理

数据量实在太多,一台机器处理不完,可以用多台。

在多机处理的时候,上面的代码就有问题了,task 可能在不同的机器上被重复处理。

任务被 getUnhandleTask() 方法读取处理后、处理完成前,另一台机器上的线程也读取到了这个任务,发现是未处理的,它也会进行处理,这样就出现重复处理了。正确的主流程如下:

public class AsyncTask {

private long id;

private int status;

public static enum STATUS {

UNHANDLE, HANDLING

}

public long getId() {

return id;

}

public void setId(long id) {

this.id = id;

}

public int getStatus() {

return status;

}

public void setStatus(int status) {

this.status = status;

}

}

public class TestTaskHandle {

private Callable convert(final AsyncTask task) {

return new Callable() {

@Override

public Object call() throws Exception {

return doWithTask(task);

}

};

}

private Object doWithTask(AsyncTask task) {

return null;

}

private List getUnhandleTask() {

return null;

}

public void multiMachine() {

ExecutorService executorService = new ThreadPoolExecutor(0, 10, 5,

TimeUnit.SECONDS, new ArrayBlockingQueue(1000),

new ThreadFactory() {

private AtomicInteger counter = new AtomicInteger(0);

@Override

public Thread newThread(Runnable r) {

Thread thread = new Thread(r);

thread.setName("task-handler-"

+ counter.incrementAndGet());

return null;

}

}, new CallerRunsPolicy());

do {

List tasks = getUnhandleTask();

if (tasks.isEmpty()) {

break;

}

for (AsyncTask asyncTask : tasks) {

// 把 RDBMS 的 update 操作当作一个 CAS 命令

boolean isSuccess = updateStatus(asyncTask.getId(),

AsyncTask.STATUS.UNHANDLE, AsyncTask.STATUS.HANDLING);

if (isSuccess) {

// 把 task 更新为处理中,成功表示抢占到了这个任务,可以继续处理

executorService.submit(convert(asyncTask));

} // else 被其他线程处理了

}

} while (true);

executorService.shutdown();

}

public boolean updateStatus(long id, AsyncTask.STATUS oldStatus,

AsyncTask.STATUS newStatus) {

return true;

}

}

在上面的实现中,每一条记录都需要通过一个数据库的 update 操作来判断是否可以继续处理,开销不小。一个改进的做法是:在 async_task 表增加一个 owner 字段,每个线程使用一个唯一的标识 tid(比如 UUID)。当 task 读取线程要读取任务时,先对 async_task 表里的未处理 task 执行 update,把状态更新为处理中, owner 更新为自己的 tid。如果这个 update 的影响行数大于 0,表示抢占到了任务,然后根据 tid 去读取任务,再分发给线程池去处理。

在存在并发竞争的情景下,很重要的就是借助数据库事务的ACID来达到一种 CAS 的效果;正确处理并发问题总是需要基础的 CAS 操作或锁。

欢迎关注我的微信公众号: coderbee笔记,可以更及时回复你的讨论。

java任务_Java 任务处理相关推荐

  1. 【源码+教程】Java课设项目_12款最热最新Java游戏项目_Java游戏开发_Java小游戏_飞翔的小鸟_王者荣耀_超级玛丽_推箱子_黄金矿工_贪吃蛇

    马上就要期末了,同学们课设做的如何了呢?本篇为大家带来了12款热门Java小游戏项目的源码和教程,助力大家顺利迎接暑假![源码+教程]Java课设项目_12款最热最新Java游戏项目_Java游戏开发 ...

  2. 黑马程序员全套Java教程_Java基础教程_异常(含扩展)(二十三)

    黑马程序员全套Java教程_Java基础教程_异常(含扩展)(二十三) 1.1 异常概述与异常体系结构 1.2 JVM遇到异常时的默认处理方案 1.3 异常处理 1.4 异常处理之try--catch ...

  3. 普罗米修斯监控java项目_java学到什么程度可以出去实习?

    把基础的知识学完,然后再学个框架,比如常见的SSH,SSM之类的,自己能用这个框架做个简单的项目,就可以了 简单的来说就是把下方的视频教程学完就可以找工作了(需要完整的资料可以找up) Java零基础 ...

  4. Ajax接收Java异常_java – 处理来自Servlet的Jquery AJAX响应中的异常

    我的servlet代码是 try{ //something response.setStatus(201); out.print("Data successfully saved" ...

  5. java 原子量_Java原子量 - Rickxue的个人空间 - OSCHINA - 中文开源技术交流社区

    所谓的原子量即操作变量的操作是"原子的",该操作不可再分,因此是线程安全的. 为何要使用原子变量呢,原因是多个线程对单个变量操作也会引起一些问题.在Java5之前,可以通过vola ...

  6. cmd怎么实现Java你好_java环境配置以及如何在cmd窗口运行java代码

    对于初学java的人来说,电脑的环境配置也许会让你头疼,但只要你认真一些学习,相信对你来说都是OK的啦~ 首先回到桌面,选择我的电脑,单击右键属性,进入高级系统设置,点击环境变量设置.用户变量选择Te ...

  7. java创建对象_java 创建对象的五种方式

    通过 Class 对象的 getConstructor 可以获取 java.lang.reflect.Constructor 对象 Constructor 对象用来描述类的构造方法,通过给 getCo ...

  8. java程序设计_Java程序设计:学习笔记(4-5)(未完工)

    声明: 本文内容基于"吉首大学软件学院-Java程序设计(Java面向对象程序设计)"网课与个人实践经验修改编写而成.本文属于Arcadia项目组成部分.若有错误或不足之处存在请联 ...

  9. java包装_Java基础之神奇的包装类(一)

    1. 导读 JAVA中针对八种基本数据类型提供了相对应的包装类, 今天主要基于几个问题来分享下个人对于包装类的理解, 本期先分享下面两个问题: .1 什么是包装类? 有了基本类型, 为什么还需要有包装 ...

最新文章

  1. 并查集c++代码_[Leetcode 每日精选](本周主题-并查集) 547. 朋友圈
  2. 10使用CSS美化页面
  3. [题解] 2019牛客暑期多校第三场H题 Magic Line
  4. 信息系统项目管理师-招投标法、政府采购法核心知识点思维脑图
  5. 安装mysql 环境变量_win10系统安装mysql数据库后配置环境变量的图文教程
  6. 好好的活,简简单单过!
  7. TensorFlow的基本介绍及Hello,world
  8. html5代码书写规范
  9. 基本功 | Java即时编译器原理解析及实践
  10. nodejs 使用的一些http网络请求模块
  11. 分布式及架构设计理论
  12. 科技软文营销标题写作的5个常用技巧
  13. 新加坡政府将与加美两国就网络安全问题展开合作
  14. 阿里云叔度:一场技术人的自我修行
  15. 以太网链路捆绑原理实验
  16. 「广州SEO优化」Google优化SEO关键词排名工具
  17. Html 实现amr文件播放
  18. 目标跟踪数据集GOT-10k的配置
  19. FleaPHP 开发指南 - 5. 应用程序设置
  20. opencv今犹在,不见当年引路人

热门文章

  1. 2013年35真棒恭贺新禧 - 壁纸
  2. jQuery :nth-child前有无空格的区别
  3. 区块链组织-超级账本(Hyperledger)的简介
  4. Powershell 自定义输出列,两个例子
  5. Ubuntu修改DNS服务器
  6. SSM框架下实现导入功能
  7. Visual Studio 2015价格大幅下调
  8. Report_报表中Ref Cursor数据源的概念和用法(案例)
  9. Throwable、Error、Exception、RuntimeException 区别 联系
  10. 我的计算机书籍创作心得