tbschedule 前后置处理器、定期执行任务
#TBSchedule 源码改造
前段时间由于工作需要,简单研究了下tbschedule。发现其功能不错,但是真正用起来功能还是有点欠缺:
- 日志无法与现有项目相结合
- 持续需job数据时,不支持定期执行任务(quatz),只能在某个时间段内执行job
- job执行完毕之后,没有回调方法
- 运行时发生异常之后,没有提供异常处理接口
改造代码:
- com.taobao.pamirs.schedule.IScheduleTaskDeal,添加前后置处理器、异常回调
/*** 调度器对外的基础接口* * @author xuannan** @param <T>* 任务类型*/
public interface IScheduleTaskDeal<T> {/*** 根据条件,查询当前调度服务器可处理的任务* * @param taskParameter* 任务的自定义参数* @param ownSign* 当前环境名称* @param taskItemNum* 当前任务类型的任务队列数量* @param taskItemList* 当前调度服务器,分配到的可处理队列* @param eachFetchDataNum* 每次获取数据的数量* @return* @throws Exception*/public List<T> selectTasks(String taskParameter, String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList, int eachFetchDataNum) throws Exception;/*** 获取任务的比较器,主要在NotSleep模式下需要用到* * @return*/public Comparator<T> getComparator();/*** @Description: 执行任务前置处理器* @param jobName* 任务名称* @param taskParameter* 任务参数* @param ownSign* 所有者* @param taskItemNum* 任务项个数* @param taskItemList* 任务项集合* @param eachFetchDataNum* 每次获取量* @return 任务id* @throws*/public Long beforeTask(String jobName, String taskParameter, String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList, int eachFetchDataNum);/*** @Description: 任务后置处理器* @author wangxiaohu wsmalltiger@163.com* @param id* 任务id* @param getListSize* 获取到集合大小* @throws*/public void afterTask(Long id, Integer getListSize);/*** @Description: 发生异常回调* @param id* 任务id* @param exception* 异常对象* @throws*/public void onException(Long id, Throwable exception);}
添加了 beforeTask、afterTask、onException三个方法。可以在对应的方法中实现job开始时间、结束时间、操作记录数、异常信息等日志功能。
- com.taobao.pamirs.schedule.taskmanager.TBScheduleProcessorSleep改造,使用上面新加的三个接口
/*** 任务调度器,在TBScheduleManager的管理下实现多线程数据处理* * @author xuannan** @param <T>*/
class TBScheduleProcessorSleep<T> implements IScheduleProcessor, Runnable {private static transient Logger logger = LoggerFactory.getLogger(TBScheduleProcessorSleep.class);final LockObject m_lockObject = new LockObject();List<Thread> threadList = Collections.synchronizedList(new ArrayList<Thread>());/*** 任务管理器*/protected TBScheduleManager scheduleManager;/*** 任务类型*/ScheduleTaskType taskTypeInfo;/*** 任务处理的接口类*/protected IScheduleTaskDeal<T> taskDealBean;/*** 当前任务队列的版本号*/protected long taskListVersion = 0;final Object lockVersionObject = new Object();final Object lockRunningList = new Object();protected List<T> taskList = Collections.synchronizedList(new ArrayList<T>());/*** 是否可以批处理*/boolean isMutilTask = false;/*** 是否已经获得终止调度信号*/boolean isStopSchedule = false;// 用户停止队列调度boolean isSleeping = false;StatisticsInfo statisticsInfo;//wangxiaohu add job日志idprivate Long taskDomainId = null;//获取到记录数量private Integer taskGetListSize = 0;//执行完毕线程list集合private List<String> overThreadList = new ArrayList<String>();/*** 创建一个调度处理器* * @param aManager* @param aTaskDealBean* @param aStatisticsInfo* @throws Exception*/public TBScheduleProcessorSleep(TBScheduleManager aManager,IScheduleTaskDeal<T> aTaskDealBean, StatisticsInfo aStatisticsInfo)throws Exception {this.scheduleManager = aManager;this.statisticsInfo = aStatisticsInfo;this.taskTypeInfo = this.scheduleManager.getTaskTypeInfo();this.taskDealBean = aTaskDealBean;if (this.taskDealBean instanceof IScheduleTaskDealSingle<?>) {if (taskTypeInfo.getExecuteNumber() > 1) {taskTypeInfo.setExecuteNumber(1);}isMutilTask = false;} else {isMutilTask = true;}if (taskTypeInfo.getFetchDataNumber() < taskTypeInfo.getThreadNumber() * 10) {logger.warn("参数设置不合理,系统性能不佳。【每次从数据库获取的数量fetchnum】 >= 【线程数量threadnum】 *【最少循环次数10】 ");}for (int i = 0; i < taskTypeInfo.getThreadNumber(); i++) {this.startThread(i);}}/*** 需要注意的是,调度服务器从配置中心注销的工作,必须在所有线程退出的情况下才能做* * @throws Exception*/public void stopSchedule() throws Exception {// 设置停止调度的标志,调度线程发现这个标志,执行完当前任务后,就退出调度this.isStopSchedule = true;//清除所有未处理任务,但已经进入处理队列的,需要处理完毕this.taskList.clear();}private void startThread(int index) {Thread thread = new Thread(this);threadList.add(thread);String threadName = this.scheduleManager.getScheduleServer().getTaskType()+ "-"+ this.scheduleManager.getCurrentSerialNumber()+ "-exe"+ index;thread.setName(threadName);thread.start();}public synchronized Object getScheduleTaskId() {if (this.taskList.size() > 0)return this.taskList.remove(0); // 按正序处理return null;}public synchronized Object[] getScheduleTaskIdMulti() {if (this.taskList.size() == 0) {return null;}int size = taskList.size() > taskTypeInfo.getExecuteNumber() ? taskTypeInfo.getExecuteNumber() : taskList.size();Object[] result = null;if (size > 0) {result = (Object[]) Array.newInstance(this.taskList.get(0).getClass(), size);}for (int i = 0; i < size; i++) {result[i] = this.taskList.remove(0); // 按正序处理}return result;}public void clearAllHasFetchData() {this.taskList.clear();}public boolean isDealFinishAllData() {return this.taskList.size() == 0;}public boolean isSleeping() {return this.isSleeping;}protected int loadScheduleData() {try {//在每次数据处理完毕后休眠固定的时间if (this.taskTypeInfo.getSleepTimeInterval() > 0) {if (logger.isTraceEnabled()) {logger.trace("处理完一批数据后休眠:"+ this.taskTypeInfo.getSleepTimeInterval());}this.isSleeping = true;Thread.sleep(taskTypeInfo.getSleepTimeInterval());this.isSleeping = false;if (logger.isTraceEnabled()) {logger.trace("处理完一批数据后休眠后恢复");}}List<TaskItemDefine> taskItems = this.scheduleManager.getCurrentScheduleTaskItemList();// 根据队列信息查询需要调度的数据,然后增加到任务列表中if (taskItems.size() > 0) {if (taskList.size() == 0) {this.overThreadList.clear();Long taskDomainId = taskDealBean.beforeTask(taskDealBean.getClass().getName(), taskTypeInfo.getTaskParameter(), scheduleManager.getScheduleServer().getOwnSign(),this.scheduleManager.getTaskItemCount(), taskItems,taskTypeInfo.getFetchDataNumber());setTaskDomainId(taskDomainId);}List<T> tmpList = this.taskDealBean.selectTasks(taskTypeInfo.getTaskParameter(), scheduleManager.getScheduleServer().getOwnSign(), this.scheduleManager.getTaskItemCount(), taskItems, taskTypeInfo.getFetchDataNumber());taskGetListSize = tmpList == null ? 0 : tmpList.size();scheduleManager.getScheduleServer().setLastFetchDataTime(new Timestamp(scheduleManager.scheduleCenter.getSystemTime()));if (tmpList != null) {this.taskList.addAll(tmpList);}} else {if (logger.isTraceEnabled()) {logger.trace("没有获取到需要处理的数据队列");}}addFetchNum(taskList.size(), "TBScheduleProcessor.loadScheduleData");return this.taskList.size();} catch (Throwable ex) {logger.error("Get tasks error.", ex);}return 0;}@SuppressWarnings({ "unchecked", "static-access", "rawtypes" })public void run() {try {long startTime = 0;while (true) {this.m_lockObject.addThread();Object executeTask;//wangxiaohu 添加quatz运行方式int listSize = this.taskList.size();//wangxiaohu endwhile (true) {if (this.isStopSchedule == true) {//停止队列调度this.m_lockObject.realseThread();this.m_lockObject.notifyOtherThread();//通知所有的休眠线程synchronized (this.threadList) {try {this.threadList.remove(Thread.currentThread());if (this.threadList.size() == 0) {this.scheduleManager.unRegisterScheduleServer();}} catch (Exception e) {afterTask();throw new Exception("清理job发生异常,剩余 "+ this.taskList.size() + " 条数据没有被执行。",e);}}return;}//加载调度任务if (this.isMutilTask == false) {executeTask = this.getScheduleTaskId();} else {executeTask = this.getScheduleTaskIdMulti();}if (executeTask == null) {break;}try {//运行相关的程序startTime = scheduleManager.scheduleCenter.getSystemTime();if (this.isMutilTask == false) {if (((IScheduleTaskDealSingle) this.taskDealBean).execute(executeTask, scheduleManager.getScheduleServer().getOwnSign()) == true) {addSuccessNum(1,scheduleManager.scheduleCenter.getSystemTime() - startTime,"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");} else {addFailNum(1,scheduleManager.scheduleCenter.getSystemTime() - startTime,"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");}} else {if (((IScheduleTaskDealMulti) this.taskDealBean).execute((Object[]) executeTask,scheduleManager.getScheduleServer().getOwnSign()) == true) {addSuccessNum(((Object[]) executeTask).length,scheduleManager.scheduleCenter.getSystemTime() - startTime,"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");} else {addFailNum(((Object[]) executeTask).length,scheduleManager.scheduleCenter.getSystemTime() - startTime,"com.taobao.pamirs.schedule.TBScheduleProcessorSleep.run");}}} catch (Throwable ex) {if (this.isMutilTask == false) {addFailNum(1,scheduleManager.scheduleCenter.getSystemTime() - startTime,"TBScheduleProcessor.run");} else {addFailNum(((Object[]) executeTask).length,scheduleManager.scheduleCenter.getSystemTime() - startTime,"TBScheduleProcessor.run");}logger.warn("Task :" + executeTask + " 处理失败", ex);if (getTaskDomainId() != null) {taskDealBean.onException(getTaskDomainId(), ex);}}}//当前队列中所有的任务都已经完成了。if (logger.isTraceEnabled()) {logger.trace(Thread.currentThread().getName()+ ":当前运行线程数量:" + this.m_lockObject.count());}if (this.m_lockObject.realseThreadButNotLast() == false) {afterTask();//wangxiaohu 添加quatz运行方式if (listSize != 0) {if (this.taskTypeInfo.getSleepTimeInterval() < 0) {this.scheduleManager.pause("没数据休眠时间过短,使用quatz运行方式!");}}//wangxiaohu endint size = 0;Thread.currentThread().sleep(100);startTime = scheduleManager.scheduleCenter.getSystemTime();// 装载数据size = this.loadScheduleData();if (size > 0) {this.m_lockObject.notifyOtherThread();} else {//判断当没有数据的是否,是否需要退出调度if (this.isStopSchedule == false&& this.scheduleManager.isContinueWhenData() == true) {if (logger.isTraceEnabled()) {logger.trace("没有装载到数据,start sleep");}this.isSleeping = true;Thread.currentThread().sleep(this.scheduleManager.getTaskTypeInfo().getSleepTimeNoData());this.isSleeping = false;if (logger.isTraceEnabled()) {logger.trace("Sleep end");}} else {//没有数据,退出调度,唤醒所有沉睡线程this.m_lockObject.notifyOtherThread();}afterTask();}this.m_lockObject.realseThread();} else {// 将当前线程放置到等待队列中。直到有线程装载到了新的任务数据if (logger.isTraceEnabled()) {logger.trace("不是最后一个线程,sleep");}this.m_lockObject.waitCurrentThread();}}} catch (Throwable e) {logger.error(e.getMessage(), e);}}private void afterTask() {if (getTaskDomainId() != null&& !overThreadList.contains(Thread.currentThread().getName())) {overThreadList.add(Thread.currentThread().getName());taskDealBean.afterTask(getTaskDomainId(), taskGetListSize);}}private void setTaskDomainId(Long taskDomainId) {this.taskDomainId = taskDomainId;}private Long getTaskDomainId() {return this.taskDomainId;}public void addFetchNum(long num, String addr) {this.statisticsInfo.addFetchDataCount(1);this.statisticsInfo.addFetchDataNum(num);}public void addSuccessNum(long num, long spendTime, String addr) {this.statisticsInfo.addDealDataSucess(num);this.statisticsInfo.addDealSpendTime(spendTime);}public void addFailNum(long num, long spendTime, String addr) {this.statisticsInfo.addDealDataFail(num);this.statisticsInfo.addDealSpendTime(spendTime);}
}
代码较多,可以从中搜索beforeTask、afterTask、onException去研究;另外上述代码中还包含有如何添加定期(quatz)执行任务功能,有对应的注释。此处只有SLEEP模式的改造代码,NOTSLEEP模式未进行改造。
代码进行过简单测试,由于自身业务需求,对多线程组、多线程模式下测试过少。欢迎大家指出任何疑问或建议!
github地址(内有demo):https://github.com/smatiger/DISchedule
tbschedule 前后置处理器、定期执行任务相关推荐
- Jmeter测试计划元件+后置处理器元件+执行顺序
测试计划元件 测试计划对象有一个叫做"功能测试"复选框.如果被选择,它将导致 JMeter 记录来自服务器返回的每个取样的数据.如果你在你的测试监听器中选择一个文件,这个数据将被写 ...
- 【Spring注解系列11】Spring后置处理器BeanPostProcessor用法与原理
1.BeanPostProcessor原理 先说,bean的后置处理器BeanPostProcessor接口中两个方法: postProcessBeforeInitialization:在初始化之前工 ...
- Spring中Bean的后置处理器
以下内容引用自http://wiki.jikexueyuan.com/project/spring/bean-post-processors.html: Bean后置处理器 BeanPostProce ...
- Spring中BeanPostProcessors后置处理器到底在哪里拦截
研究spring源码的时候,发现注入bean到spring对象中有很多种,有一种是@bean注解,并且括号里可以写一些初始化时要执行的方法,还有销毁时执行的方法,spring中后置处理器可以将某些be ...
- 通过Spring的BeanPostProcessor的 bean的后置处理器会拦截所有bean创建过程
postProcessBeforeInitialization 在init方法之前调用 postProcessAfterInitialization 在init方法之后调用 package com.C ...
- Spring Bean 后置处理器PostProcessor
BeanPostProcessor 接口定义回调方法,可以实现该方法来提供自己的实例化逻辑,依赖解析逻辑等.可以在 Spring 容器通过插入一个或多个 BeanPostProcessor 的实现来完 ...
- Spring Bean 后置处理器
转载自 Spring Bean 后置处理器 Spring--Bean 后置处理器 BeanPostProcessor 接口定义回调方法,你可以实现该方法来提供自己的实例化逻辑,依赖解析逻辑等.你也可 ...
- Spring(二)--FactoryBean、bean的后置处理器、数据库连接池、引用外部文件、使用注解配置bean等...
实验1:配置通过静态工厂方法创建的bean [通过静态方法提供实例对象,工厂类本身不需要实例化!] 1.创建静态工厂类 public class StaticFactory {private sta ...
- Spring容器创建流程(4)调用beanFactory后置处理器
postProcessBeanFactory() 方法留给子类去实现. invokeBeanFactoryPostProcessors() 调用bean工厂的后置处理器(以前的执行流程可在系列文章中查 ...
- spring--bean后置处理器(BeanPostProcessor)原理解析
文章目录 功能描述: 如何使用: 定义要处理的接口类型 添加实际需要处理的类 定义后置处理器 编写测试类 执行日志 后置处理器加载解析 registerBeanPostProcessors注册拦截be ...
最新文章
- 自定义ORM系列(三)工具雏形及基本用法
- 基于UNet和camvid数据集的道路分割
- docker搭建Redis的主从集群
- 2010-09-11
- C 语言实例 - 删除字符串中的特殊字符
- 原来编译通过,现在编译不通过,怎么回事?
- Git-第二篇廖雪峰Git教程学习笔记(1)基本命令,版本回退
- ue4下载安装(学习笔记)
- 硬件原理图 一键开关机电路
- docker容器中配置文件修改错误,导致容器无法启动
- 【测试】linux tc命令|Linux模拟网络延迟、丢包等|traffic control(流量控制)
- 实现Web端指纹登录
- 自定义dialog 可以读秒自动关闭
- 二、	常见传感器的检测
- netty 简单demo
- ffmpeg视频上传功能常用的俩个工具类【1.视频转码=2.视频抓图】
- matlab转差频率控制,异步电动机转差频率间接矢量控制matlab仿真(毕业设计).doc
- 乐视三合一摄像头和kinect_乐视三合一体感摄像头快评测,看看到底怎么玩?
- DALAL创建的行人检测INRIA数据库
- vue从后端获取数据绑定复选框