#TBSchedule 源码改造

前段时间由于工作需要,简单研究了下tbschedule。发现其功能不错,但是真正用起来功能还是有点欠缺:

  • 日志无法与现有项目相结合
  • 持续需job数据时,不支持定期执行任务(quatz),只能在某个时间段内执行job
  • job执行完毕之后,没有回调方法
  • 运行时发生异常之后,没有提供异常处理接口

改造代码:

  • com.taobao.pamirs.schedule.IScheduleTaskDeal,添加前后置处理器、异常回调
/*** 调度器对外的基础接口* * @author xuannan** @param <T>*            任务类型*/
public interface IScheduleTaskDeal<T> {/*** 根据条件,查询当前调度服务器可处理的任务* * @param taskParameter*            任务的自定义参数* @param ownSign*            当前环境名称* @param taskItemNum*            当前任务类型的任务队列数量* @param taskItemList*            当前调度服务器,分配到的可处理队列* @param eachFetchDataNum*            每次获取数据的数量* @return* @throws Exception*/public List<T> selectTasks(String taskParameter, String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList, int eachFetchDataNum) throws Exception;/*** 获取任务的比较器,主要在NotSleep模式下需要用到* * @return*/public Comparator<T> getComparator();/*** @Description: 执行任务前置处理器* @param jobName*            任务名称* @param taskParameter*            任务参数* @param ownSign*            所有者* @param taskItemNum*            任务项个数* @param taskItemList*            任务项集合* @param eachFetchDataNum*            每次获取量* @return 任务id* @throws*/public Long beforeTask(String jobName, String taskParameter, String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList, int eachFetchDataNum);/*** @Description: 任务后置处理器* @author wangxiaohu wsmalltiger@163.com* @param id*            任务id* @param getListSize*            获取到集合大小* @throws*/public void afterTask(Long id, Integer getListSize);/*** @Description: 发生异常回调* @param id*            任务id* @param exception*            异常对象* @throws*/public void onException(Long id, Throwable exception);}

添加了 beforeTask、afterTask、onException三个方法。可以在对应的方法中实现job开始时间、结束时间、操作记录数、异常信息等日志功能。


  • com.taobao.pamirs.schedule.taskmanager.TBScheduleProcessorSleep改造,使用上面新加的三个接口
/*** 任务调度器,在TBScheduleManager的管理下实现多线程数据处理* * @author xuannan** @param <T>*/
class TBScheduleProcessorSleep<T> implements IScheduleProcessor, Runnable {private static transient Logger logger = LoggerFactory.getLogger(TBScheduleProcessorSleep.class);final LockObject m_lockObject = new LockObject();List<Thread> threadList = Collections.synchronizedList(new ArrayList<Thread>());/*** 任务管理器*/protected TBScheduleManager scheduleManager;/*** 任务类型*/ScheduleTaskType taskTypeInfo;/*** 任务处理的接口类*/protected IScheduleTaskDeal<T> taskDealBean;/*** 当前任务队列的版本号*/protected long taskListVersion = 0;final Object lockVersionObject = new Object();final Object lockRunningList = new Object();protected List<T> taskList = Collections.synchronizedList(new ArrayList<T>());/*** 是否可以批处理*/boolean isMutilTask = false;/*** 是否已经获得终止调度信号*/boolean isStopSchedule = false;// 用户停止队列调度boolean isSleeping = false;StatisticsInfo statisticsInfo;//wangxiaohu add job日志idprivate Long taskDomainId = null;//获取到记录数量private Integer taskGetListSize = 0;//执行完毕线程list集合private List<String> overThreadList = new ArrayList<String>();/*** 创建一个调度处理器* * @param aManager* @param aTaskDealBean* @param aStatisticsInfo* @throws Exception*/public TBScheduleProcessorSleep(TBScheduleManager aManager,IScheduleTaskDeal<T> aTaskDealBean, StatisticsInfo aStatisticsInfo)throws Exception {this.scheduleManager = aManager;this.statisticsInfo = aStatisticsInfo;this.taskTypeInfo = this.scheduleManager.getTaskTypeInfo();this.taskDealBean = aTaskDealBean;if (this.taskDealBean instanceof IScheduleTaskDealSingle<?>) {if (taskTypeInfo.getExecuteNumber() > 1) {taskTypeInfo.setExecuteNumber(1);}isMutilTask = false;} else {isMutilTask = true;}if (taskTypeInfo.getFetchDataNumber() < taskTypeInfo.getThreadNumber() * 10) {logger.warn("参数设置不合理,系统性能不佳。【每次从数据库获取的数量fetchnum】 >= 【线程数量threadnum】 *【最少循环次数10】 ");}for (int i = 0; i < taskTypeInfo.getThreadNumber(); i++) {this.startThread(i);}}/*** 需要注意的是,调度服务器从配置中心注销的工作,必须在所有线程退出的情况下才能做* * @throws Exception*/public void stopSchedule() throws Exception {// 设置停止调度的标志,调度线程发现这个标志,执行完当前任务后,就退出调度this.isStopSchedule = true;//清除所有未处理任务,但已经进入处理队列的,需要处理完毕this.taskList.clear();}private void startThread(int index) {Thread thread = new Thread(this);threadList.add(thread);String threadName = this.scheduleManager.getScheduleServer().getTaskType()+ "-"+ this.scheduleManager.getCurrentSerialNumber()+ "-exe"+ index;thread.setName(threadName);thread.start();}public synchronized Object getScheduleTaskId() {if (this.taskList.size() > 0)return this.taskList.remove(0); // 按正序处理return null;}public synchronized Object[] getScheduleTaskIdMulti() {if (this.taskList.size() == 0) {return null;}int size = taskList.size() > taskTypeInfo.getExecuteNumber() ? taskTypeInfo.getExecuteNumber() : taskList.size();Object[] result = null;if (size > 0) {result = (Object[]) Array.newInstance(this.taskList.get(0).getClass(), size);}for (int i = 0; i < size; i++) {result[i] = this.taskList.remove(0); // 按正序处理}return result;}public void clearAllHasFetchData() {this.taskList.clear();}public boolean isDealFinishAllData() {return this.taskList.size() == 0;}public boolean isSleeping() {return this.isSleeping;}protected int loadScheduleData() {try {//在每次数据处理完毕后休眠固定的时间if (this.taskTypeInfo.getSleepTimeInterval() > 0) {if (logger.isTraceEnabled()) {logger.trace("处理完一批数据后休眠:"+ this.taskTypeInfo.getSleepTimeInterval());}this.isSleeping = true;Thread.sleep(taskTypeInfo.getSleepTimeInterval());this.isSleeping = false;if (logger.isTraceEnabled()) {logger.trace("处理完一批数据后休眠后恢复");}}List<TaskItemDefine> taskItems = this.scheduleManager.getCurrentScheduleTaskItemList();// 根据队列信息查询需要调度的数据,然后增加到任务列表中if (taskItems.size() > 0) {if (taskList.size() == 0) {this.overThreadList.clear();Long taskDomainId = taskDealBean.beforeTask(taskDealBean.getClass().getName(), taskTypeInfo.getTaskParameter(), scheduleManager.getScheduleServer().getOwnSign(),this.scheduleManager.getTaskItemCount(), taskItems,taskTypeInfo.getFetchDataNumber());setTaskDomainId(taskDomainId);}List<T> tmpList = this.taskDealBean.selectTasks(taskTypeInfo.getTaskParameter(), scheduleManager.getScheduleServer().getOwnSign(), this.scheduleManager.getTaskItemCount(), taskItems, taskTypeInfo.getFetchDataNumber());taskGetListSize = tmpList == null ? 0 : tmpList.size();scheduleManager.getScheduleServer().setLastFetchDataTime(new Timestamp(scheduleManager.scheduleCenter.getSystemTime()));if (tmpList != null) {this.taskList.addAll(tmpList);}} else {if (logger.isTraceEnabled()) {logger.trace("没有获取到需要处理的数据队列");}}addFetchNum(taskList.size(), "TBScheduleProcessor.loadScheduleData");return this.taskList.size();} catch (Throwable ex) {logger.error("Get tasks error.", ex);}return 0;}@SuppressWarnings({ "unchecked", "static-access", "rawtypes" })public void run() {try {long startTime = 0;while (true) {this.m_lockObject.addThread();Object executeTask;//wangxiaohu  添加quatz运行方式int listSize = this.taskList.size();//wangxiaohu endwhile (true) {if (this.isStopSchedule == true) {//停止队列调度this.m_lockObject.realseThread();this.m_lockObject.notifyOtherThread();//通知所有的休眠线程synchronized (this.threadList) {try {this.threadList.remove(Thread.currentThread());if (this.threadList.size() == 0) {this.scheduleManager.unRegisterScheduleServer();}} catch (Exception e) {afterTask();throw new Exception("清理job发生异常,剩余 "+ this.taskList.size() + " 条数据没有被执行。",e);}}return;}//加载调度任务if (this.isMutilTask == false) {executeTask = this.getScheduleTaskId();} else {executeTask = this.getScheduleTaskIdMulti();}if (executeTask == null) {break;}try {//运行相关的程序startTime = scheduleManager.scheduleCenter.getSystemTime();if (this.isMutilTask == false) {if (((IScheduleTaskDealSingle) this.taskDealBean).execute(executeTask, scheduleManager.getScheduleServer().getOwnSign()) == true) {addSuccessNum(1,scheduleManager.scheduleCenter.getSystemTime() - startTime,"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");} else {addFailNum(1,scheduleManager.scheduleCenter.getSystemTime() - startTime,"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");}} else {if (((IScheduleTaskDealMulti) this.taskDealBean).execute((Object[]) executeTask,scheduleManager.getScheduleServer().getOwnSign()) == true) {addSuccessNum(((Object[]) executeTask).length,scheduleManager.scheduleCenter.getSystemTime() - startTime,"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");} else {addFailNum(((Object[]) executeTask).length,scheduleManager.scheduleCenter.getSystemTime() - startTime,"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");}}} catch (Throwable ex) {if (this.isMutilTask == false) {addFailNum(1,scheduleManager.scheduleCenter.getSystemTime() - startTime,"TBScheduleProcessor.run");} else {addFailNum(((Object[]) executeTask).length,scheduleManager.scheduleCenter.getSystemTime() - startTime,"TBScheduleProcessor.run");}logger.warn("Task :" + executeTask + " 处理失败", ex);if (getTaskDomainId() != null) {taskDealBean.onException(getTaskDomainId(), ex);}}}//当前队列中所有的任务都已经完成了。if (logger.isTraceEnabled()) {logger.trace(Thread.currentThread().getName()+ ":当前运行线程数量:" + this.m_lockObject.count());}if (this.m_lockObject.realseThreadButNotLast() == false) {afterTask();//wangxiaohu  添加quatz运行方式if (listSize != 0) {if (this.taskTypeInfo.getSleepTimeInterval() < 0) {this.scheduleManager.pause("没数据休眠时间过短,使用quatz运行方式!");}}//wangxiaohu endint size = 0;Thread.currentThread().sleep(100);startTime = scheduleManager.scheduleCenter.getSystemTime();// 装载数据size = this.loadScheduleData();if (size > 0) {this.m_lockObject.notifyOtherThread();} else {//判断当没有数据的是否,是否需要退出调度if (this.isStopSchedule == false&& this.scheduleManager.isContinueWhenData() == true) {if (logger.isTraceEnabled()) {logger.trace("没有装载到数据,start sleep");}this.isSleeping = true;Thread.currentThread().sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData());this.isSleeping = false;if (logger.isTraceEnabled()) {logger.trace("Sleep end");}} else {//没有数据,退出调度,唤醒所有沉睡线程this.m_lockObject.notifyOtherThread();}afterTask();}this.m_lockObject.realseThread();} else {// 将当前线程放置到等待队列中。直到有线程装载到了新的任务数据if (logger.isTraceEnabled()) {logger.trace("不是最后一个线程,sleep");}this.m_lockObject.waitCurrentThread();}}} catch (Throwable e) {logger.error(e.getMessage(), e);}}private void afterTask() {if (getTaskDomainId() != null&& !overThreadList.contains(Thread.currentThread().getName())) {overThreadList.add(Thread.currentThread().getName());taskDealBean.afterTask(getTaskDomainId(), taskGetListSize);}}private void setTaskDomainId(Long taskDomainId) {this.taskDomainId = taskDomainId;}private Long getTaskDomainId() {return this.taskDomainId;}public void addFetchNum(long num, String addr) {this.statisticsInfo.addFetchDataCount(1);this.statisticsInfo.addFetchDataNum(num);}public void addSuccessNum(long num, long spendTime, String addr) {this.statisticsInfo.addDealDataSucess(num);this.statisticsInfo.addDealSpendTime(spendTime);}public void addFailNum(long num, long spendTime, String addr) {this.statisticsInfo.addDealDataFail(num);this.statisticsInfo.addDealSpendTime(spendTime);}
}

代码较多,可以从中搜索beforeTask、afterTask、onException去研究;另外上述代码中还包含有如何添加定期(quatz)执行任务功能,有对应的注释。此处只有SLEEP模式的改造代码,NOTSLEEP模式未进行改造。

代码进行过简单测试,由于自身业务需求,对多线程组、多线程模式下测试过少。欢迎大家指出任何疑问或建议!

github地址(内有demo):https://github.com/smatiger/DISchedule

tbschedule 前后置处理器、定期执行任务相关推荐

  1. Jmeter测试计划元件+后置处理器元件+执行顺序

    测试计划元件 测试计划对象有一个叫做"功能测试"复选框.如果被选择,它将导致 JMeter 记录来自服务器返回的每个取样的数据.如果你在你的测试监听器中选择一个文件,这个数据将被写 ...

  2. 【Spring注解系列11】Spring后置处理器BeanPostProcessor用法与原理

    1.BeanPostProcessor原理 先说,bean的后置处理器BeanPostProcessor接口中两个方法: postProcessBeforeInitialization:在初始化之前工 ...

  3. Spring中Bean的后置处理器

    以下内容引用自http://wiki.jikexueyuan.com/project/spring/bean-post-processors.html: Bean后置处理器 BeanPostProce ...

  4. Spring中BeanPostProcessors后置处理器到底在哪里拦截

    研究spring源码的时候,发现注入bean到spring对象中有很多种,有一种是@bean注解,并且括号里可以写一些初始化时要执行的方法,还有销毁时执行的方法,spring中后置处理器可以将某些be ...

  5. 通过Spring的BeanPostProcessor的 bean的后置处理器会拦截所有bean创建过程

    postProcessBeforeInitialization 在init方法之前调用 postProcessAfterInitialization 在init方法之后调用 package com.C ...

  6. Spring Bean 后置处理器PostProcessor

    BeanPostProcessor 接口定义回调方法,可以实现该方法来提供自己的实例化逻辑,依赖解析逻辑等.可以在 Spring 容器通过插入一个或多个 BeanPostProcessor 的实现来完 ...

  7. Spring Bean 后置处理器

    转载自  Spring Bean 后置处理器 Spring--Bean 后置处理器 BeanPostProcessor 接口定义回调方法,你可以实现该方法来提供自己的实例化逻辑,依赖解析逻辑等.你也可 ...

  8. Spring(二)--FactoryBean、bean的后置处理器、数据库连接池、引用外部文件、使用注解配置bean等...

    实验1:配置通过静态工厂方法创建的bean  [通过静态方法提供实例对象,工厂类本身不需要实例化!] 1.创建静态工厂类 public class StaticFactory {private sta ...

  9. Spring容器创建流程(4)调用beanFactory后置处理器

    postProcessBeanFactory() 方法留给子类去实现. invokeBeanFactoryPostProcessors() 调用bean工厂的后置处理器(以前的执行流程可在系列文章中查 ...

  10. spring--bean后置处理器(BeanPostProcessor)原理解析

    文章目录 功能描述: 如何使用: 定义要处理的接口类型 添加实际需要处理的类 定义后置处理器 编写测试类 执行日志 后置处理器加载解析 registerBeanPostProcessors注册拦截be ...

最新文章

  1. 自定义ORM系列(三)工具雏形及基本用法
  2. 基于UNet和camvid数据集的道路分割
  3. docker搭建Redis的主从集群
  4. 2010-09-11
  5. C 语言实例 - 删除字符串中的特殊字符
  6. 原来编译通过,现在编译不通过,怎么回事?
  7. Git-第二篇廖雪峰Git教程学习笔记(1)基本命令,版本回退
  8. ue4下载安装(学习笔记)
  9. 硬件原理图 一键开关机电路
  10. docker容器中配置文件修改错误,导致容器无法启动
  11. 【测试】linux tc命令|Linux模拟网络延迟、丢包等|traffic control(流量控制)
  12. 实现Web端指纹登录
  13. 自定义dialog 可以读秒自动关闭
  14. 二、 常见传感器的检测
  15. netty 简单demo
  16. ffmpeg视频上传功能常用的俩个工具类【1.视频转码=2.视频抓图】
  17. matlab转差频率控制,异步电动机转差频率间接矢量控制matlab仿真(毕业设计).doc
  18. 乐视三合一摄像头和kinect_乐视三合一体感摄像头快评测,看看到底怎么玩?
  19. DALAL创建的行人检测INRIA数据库
  20. vue从后端获取数据绑定复选框

热门文章

  1. 二次量子化与量子计算化学
  2. 2000元以内办公用计算机,2000元以内买什么笔记本 便宜实用笔记本【推荐】
  3. 扁平化管理:硅谷高效工作法
  4. MATLAB随机数的应用ppt,Matlab生成随机数.ppt
  5. PDF Converter 注册码
  6. 分布式光伏站远程监控组网方案
  7. viper12a电源电路图_基于VIPER12A芯片设计的开关电源
  8. [Unity2D]实现人物动画帧的导入以及左右移动
  9. 软件安全测试是为了什么,一航软件测评有这些见解
  10. leetcode第12题Python版整数转罗马字符串