一.TbSchedule相关配置

pom.xml

<!-- TBSchedule core library dependency. -->
<dependency>
    <groupId>com.taobao.pamirs.schedule</groupId>
    <artifactId>tbschedule-core</artifactId>
    <version>3.4.1</version>
</dependency>

application.yml

# ZooKeeper settings read by TbScheduleConfig through @Value("${zkConfig.*}").
zkConfig:
  # ZooKeeper connect string (host:port)
  zkConnectString: 127.0.0.1:2181
  # Root path in ZooKeeper (placeholder value, replace with the real path)
  rootPath: /zk根目录/次目录
  # Session timeout passed to the schedule factory
  zkSessionTimeout: 60000
  # Credentials (placeholder values, replace with real ones)
  userName: 用户名
  password: 密码
  isCheckParentPath: true

TbScheduleConfig.java

/**
 * TbSchedule configuration.
 *
 * <p>Reads the {@code zkConfig.*} properties, exposes a {@link TBScheduleManagerFactory}
 * bean initialised with them, and registers the worker task bean.
 */
@Component
public class TbScheduleConfig {

    @Value("${zkConfig.zkConnectString}")
    private String zkConnectString;

    @Value("${zkConfig.rootPath}")
    private String rootPath;

    @Value("${zkConfig.zkSessionTimeout}")
    private String zkSessionTimeout;

    @Value("${zkConfig.userName}")
    private String userName;

    @Value("${zkConfig.password}")
    private String password;

    @Value("${zkConfig.isCheckParentPath}")
    private String isCheckParentPath;

    /**
     * Builds the schedule manager factory from the injected ZooKeeper settings.
     * Spring calls the factory's {@code init} method after construction.
     *
     * @return the configured factory
     */
    @Bean(initMethod = "init")
    public TBScheduleManagerFactory tbScheduleManagerFactory() {
        TBScheduleManagerFactory tbScheduleManagerFactory = new TBScheduleManagerFactory();
        Map<String, String> zkConfig = new HashMap<>(16);
        zkConfig.put("zkConnectString", getZkConnectString());
        zkConfig.put("rootPath", getRootPath());
        zkConfig.put("zkSessionTimeout", getZkSessionTimeout());
        zkConfig.put("userName", getUserName());
        zkConfig.put("password", getPassword());
        zkConfig.put("isCheckParentPath", getIsCheckParentPath());
        tbScheduleManagerFactory.setZkConfig(zkConfig);
        return tbScheduleManagerFactory;
    }

    public String getZkConnectString() {
        return zkConnectString;
    }

    public void setZkConnectString(String zkConnectString) {
        this.zkConnectString = zkConnectString;
    }

    public String getRootPath() {
        return rootPath;
    }

    public void setRootPath(String rootPath) {
        this.rootPath = rootPath;
    }

    public String getZkSessionTimeout() {
        return zkSessionTimeout;
    }

    public void setZkSessionTimeout(String zkSessionTimeout) {
        this.zkSessionTimeout = zkSessionTimeout;
    }

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public String getIsCheckParentPath() {
        return isCheckParentPath;
    }

    public void setIsCheckParentPath(String isCheckParentPath) {
        this.isCheckParentPath = isCheckParentPath;
    }

    /**
     * Registers the worker task bean. It is created eagerly and only after the
     * schedule manager factory bean exists.
     *
     * <p>The declared return type is {@code DoTask}, matching the object that is
     * actually built here (the previous declaration named an unrelated class).
     *
     * @return the worker task with its task type and own-sign set
     */
    @Bean(name = "任务Bean名称")
    @Lazy(false)
    @DependsOn({"tbScheduleManagerFactory"})
    public DoTask doTask() {
        DoTask task = new DoTask();
        task.setTaskType("任务Bean名称");
        task.setOwnSign("BASE");
        return task;
    }
}

StringUtil.java


/**
 * String utilities; extends org.apache.commons.lang.StringUtils so callers get
 * the commons-lang helpers through this class as well.
 */
public class StringUtil extends StringUtils {

    /** Chunk size used by {@link #splitList(List)}. */
    private static final int SPLIT_SIZE = 500;

    /**
     * Returns the local host IP address.
     *
     * @return the address, or an empty string when it cannot be resolved
     */
    public static String getLocalIP() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (Exception e) {
            // Best effort: callers only use the value as part of an instance label.
            return "";
        }
    }

    /**
     * Returns a random 32-character identifier (a UUID without dashes, upper-cased).
     *
     * @return the identifier
     */
    public static String getUUID() {
        return UUID.randomUUID().toString().replace("-", "").toUpperCase();
    }

    /**
     * Joins the numeric ids of the given task items with commas, for use as the
     * value list of a modulo condition.
     *
     * @param queueList task items whose ids are parsed as integers
     * @return the comma-separated ids (empty when the list is empty)
     * @throws NumberFormatException if a task item id is not an integer
     */
    public static StringBuilder getCondition(List<TaskItemDefine> queueList) {
        StringBuilder condition = new StringBuilder();
        for (int i = 0; i < queueList.size(); i++) {
            if (i > 0) {
                condition.append(",");
            }
            TaskItemDefine s = queueList.get(i);
            condition.append(Integer.parseInt(s.getTaskItemId()));
        }
        return condition;
    }

    /**
     * Tells whether at least one of the given values is null or empty.
     * This is the exact complement of {@link #isNotNull(Object...)}.
     *
     * <p>The previous guard returned {@code false} as soon as any argument was
     * supplied, so the per-value checks never ran, and a null array caused a
     * NullPointerException.
     *
     * @param values values to check; arrays and collections count as empty when they have no elements
     * @return {@code true} if the argument array is null or empty, or any value is null or empty
     */
    public static boolean isNull(Object... values) {
        if (!isNotNullAndNotEmpty(values)) {
            return true;
        }
        for (Object value : values) {
            if (isEmptyValue(value)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Tells whether a single object is null.
     *
     * @param obj the object to check
     * @return {@code true} if {@code obj} is null
     */
    public static boolean isObjectNull(Object obj) {
        return obj == null;
    }

    /**
     * Tells whether all of the given values are non-null and non-empty.
     *
     * @param values values to check; arrays and collections must contain at least one element
     * @return {@code true} only if at least one value is given and none is null or empty
     */
    public static boolean isNotNull(Object... values) {
        if (Boolean.FALSE.equals(isNotNullAndNotEmpty(values))) {
            return false;
        }
        for (Object value : values) {
            if (isEmptyValue(value)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Tells whether an array is non-null and has at least one element.
     *
     * @param value the array to check
     * @return {@code true} if the array is non-null and non-empty
     */
    public static Boolean isNotNullAndNotEmpty(Object[] value) {
        return null != value && 0 < value.length;
    }

    /**
     * Tells whether a collection is non-null and has at least one element.
     *
     * @param value the collection to check
     * @return {@code true} if the collection is non-null and non-empty
     */
    public static Boolean isNotNullAndNotEmpty(Collection<?> value) {
        return null != value && !value.isEmpty();
    }

    /**
     * Splits a list into consecutive chunks of at most 500 elements.
     *
     * <p>A list shorter than 500 elements is returned as the single chunk itself
     * (the same instance); otherwise the chunks are {@code subList} views.
     *
     * @param list the list to split
     * @return the chunks, in order
     */
    public static List<List<String>> splitList(List<String> list) {
        List<List<String>> lists = new ArrayList<>();
        int size = list.size();
        if (size < SPLIT_SIZE) {
            lists.add(list);
            return lists;
        }
        for (int from = 0; from < size; from += SPLIT_SIZE) {
            lists.add(list.subList(from, Math.min(from + SPLIT_SIZE, size)));
        }
        return lists;
    }

    /** Shared per-value check: null, an empty array, or an empty collection. */
    private static boolean isEmptyValue(Object value) {
        if (value instanceof Object[]) {
            return !isNotNullAndNotEmpty((Object[]) value);
        }
        if (value instanceof Collection<?>) {
            return !isNotNullAndNotEmpty((Collection<?>) value);
        }
        return null == value;
    }
}

二.TbSchedule相关代码

AbstractBaseWorkerTask.java

@Component
public abstract class AbstractBaseWorkerTask<T> implementsIScheduleTaskDealMulti<T>, ApplicationListener<ContextRefreshedEvent> {protected String taskName;protected String taskType;protected String ownSign;protected Log logger = LogFactory.getLog(this.getClass());protected abstract boolean processWorkerTask(Object[] tasks);protected abstract List<T> fetchWorkerTasks(String ownSign, int queueNum,List<TaskItemDefine> queueList, int fetchNum);@Overridepublic Comparator<T> getComparator() {return (t1, t2) -> 1;}/*** 加载worker任务*/@Overridepublic List<T> selectTasks(String taskParameter, String ownSign, int queueNum, List<TaskItemDefine> queueList, int fetchNum) throws Exception {logger.info(this.getClass().getSimpleName() + "扫描worker任务列表开始...");long start = System.currentTimeMillis();List<T> taskLst = fetchWorkerTasks(ownSign, queueNum, queueList, fetchNum); // 获取worker任务int taskNo = taskLst != null ? taskLst.size() : 0;long end = System.currentTimeMillis();logger.info("扫描worker任务列表结束共计加载[" + taskNo + "]个任务" + "  耗时:" + (end - start) + "毫秒");return taskLst;}/*** 执行worker任务*/@Overridepublic boolean execute(Object[] tasks, String arg1) throws Exception {boolean rst = true;logger.info("开始执行...");long startTimestamp = System.currentTimeMillis();rst = processWorkerTask(tasks);long endTimestamp = System.currentTimeMillis();logger.info("执行完毕:耗时:" + (endTimestamp - startTimestamp));return rst;}// 在spring启动完成后会执行此方法@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {logger.info("任务调度初始化");}public String getTaskName() {return taskName;}public void setTaskName(String taskName) {this.taskName = taskName;}public String getTaskType() {return taskType;}public void setTaskType(String taskType) {this.taskType = taskType;}public String getOwnSign() {return ownSign;}public void setOwnSign(String ownSign) {this.ownSign = ownSign;}/*** @param queueList* @return*/protected String getCondition(List<TaskItemDefine> queueList) {StringBuilder condition = new 
StringBuilder("");for (int i = 0; i < queueList.size(); i++) {if (i > 0) {condition.append(",");}TaskItemDefine s = queueList.get(i);condition.append(Integer.parseInt(s.getTaskItemId()));}return condition.toString();}
}

Task.java


public class Task  extends BaseModel  {/*** 主键*/private Long id;/*** 关联单号*/private String refId;/*** 任务类型*/private String taskType;/*** 任务类型*/private String carrierCode;/*** 任务数据*/private String taskData;/*** 任务的状态*/private Integer status;/*** 任务的执行次数*/private Integer taskExeCount;/*** 创建时间*/private Date createTime;/*** 更新时间*/private Date updateTime;/*** 执行实例服务器IP*/private String exeInstanceIp;/*** 推送状态*/private Integer statusPush;/*** 推送次数*/private Integer pushExeCount;private Integer yn;/*** 时间戳*/private Date ts;public Long getId() {return id;}public void setId(Long id) {this.id = id;}public String getRefId() {return refId;}public void setRefId(String refId) {this.refId = refId == null ? null : refId.trim();}public String getTaskType() {return taskType;}public void setTaskType(String taskType) {this.taskType = taskType == null ? null : taskType.trim();}public Integer getTaskExeCount() {return taskExeCount;}public void setTaskExeCount(Integer taskExeCount) {this.taskExeCount = taskExeCount;}public Integer getStatus() {return status;}public void setStatus(Integer status) {this.status = status;}public String getExeInstanceIp() {return exeInstanceIp;}public void setExeInstanceIp(String exeInstanceIp) {this.exeInstanceIp = exeInstanceIp == null ? null : exeInstanceIp.trim();}public Integer getStatusPush() {return statusPush;}public void setStatusPush(Integer statusPush) {this.statusPush = statusPush;}public Integer getPushExeCount() {return pushExeCount;}public void setPushExeCount(Integer pushExeCount) {this.pushExeCount = pushExeCount;}public Date getTs() {return ts;}public void setTs(Date ts) {this.ts = ts;}public String getTaskData() {return taskData;}public void setTaskData(String taskData) {this.taskData = taskData == null ? 
null : taskData.trim();}public String getCarrierCode() {return carrierCode;}public void setCarrierCode(String carrierCode) {this.carrierCode = carrierCode;}@Overridepublic String toString() {return "StarsSalesWhereTask{" +"id=" + id +", refId='" + refId + '\'' +", taskType='" + taskType + '\'' +", carrierCode='" + carrierCode + '\'' +", taskData='" + taskData + '\'' +", status=" + status +", taskExeCount=" + taskExeCount +", createTime=" + createTime +", updateTime=" + updateTime +", exeInstanceIp='" + exeInstanceIp + '\'' +", statusPush=" + statusPush +", pushExeCount=" + pushExeCount +", yn=" + yn +", ts=" + ts +'}';}
}

BaseModel.java


/**
 * Common base for persistent models: paging bounds, scheduler query parameters
 * and audit columns.
 *
 * <p>Each field comment sits on its own line so that a trailing line comment can
 * never swallow the declarations that follow it.
 */
public class BaseModel implements Serializable {

    private static final long serialVersionUID = -610797345091216847L;

    /** First row number of the page. */
    protected int startRownum;

    /** Last row number of the page. */
    protected int endRownum;

    /** Number of rows to fetch per query. */
    protected int fetchNum;

    /** Number of queues (task items). */
    protected int queueNum;

    /** Creation time. */
    protected Date createTime;

    /** Last update time. */
    protected Date updateTime;

    /**
     * Validity flag, documented here as 0 = invalid, 1 = valid.
     * NOTE(review): the task table DDL documents YN as 0 = enabled, 1 = void; confirm which is right.
     */
    protected Integer yn;

    /** Modulo condition (comma-separated queue ids). */
    protected String condition;

    /** NOTE(review): meaning not evident from this class; the original comment duplicated the one on condition. */
    protected String saveYn;

    public int getStartRownum() {
        return startRownum;
    }

    public void setStartRownum(int startRownum) {
        this.startRownum = startRownum;
    }

    public int getEndRownum() {
        return endRownum;
    }

    public void setEndRownum(int endRownum) {
        this.endRownum = endRownum;
    }

    public int getFetchNum() {
        return fetchNum;
    }

    public void setFetchNum(int fetchNum) {
        this.fetchNum = fetchNum;
    }

    public int getQueueNum() {
        return queueNum;
    }

    public void setQueueNum(int queueNum) {
        this.queueNum = queueNum;
    }

    public String getCondition() {
        return condition;
    }

    public void setCondition(String condition) {
        this.condition = condition;
    }

    public Date getCreateTime() {
        return createTime;
    }

    public void setCreateTime(Date createTime) {
        this.createTime = createTime;
    }

    public Date getUpdateTime() {
        return updateTime;
    }

    public void setUpdateTime(Date updateTime) {
        this.updateTime = updateTime;
    }

    public Integer getYn() {
        return yn;
    }

    public void setYn(Integer yn) {
        this.yn = yn;
    }

    public String getSaveYn() {
        return saveYn;
    }

    public void setSaveYn(String saveYn) {
        this.saveYn = saveYn;
    }
}

DoTaskService.java

/**
 * Transactional service in front of {@link TaskMapper} for the worker task.
 *
 * <p>All signatures use {@link Task}; the stale {@code StarsSalesWhereTask} type
 * that did not match the mapper's {@code List<Task>} has been removed.
 */
@Service
@Transactional(rollbackFor = Exception.class)
public class DoTaskService {

    @Resource
    private TaskMapper taskMapper;

    /**
     * Stamps every task with this instance's label and the current time, then
     * batch-updates execution count, instance label and status.
     *
     * @param taskList tasks to update
     * @return the mapper's update count, or 0 when the list is null or empty
     */
    public int batchUpdateSyncTaskData(List<Task> taskList) {
        if (taskList == null || taskList.isEmpty()) {
            // Nothing to update; an empty list would produce invalid SQL in the mapper.
            return 0;
        }
        stampExecutor(taskList);
        return taskMapper.batchUpdateSyncTaskData(taskList);
    }

    /**
     * Selects tasks that are waiting for execution.
     *
     * @param task query parameters (task type, queue condition, fetch size)
     * @return the matching tasks
     */
    public List<Task> selectSyncByTask(Task task) {
        return taskMapper.selectSyncByTask(task);
    }

    /**
     * Stamps every task with this instance's label and the current time, then
     * batch-updates the push columns.
     *
     * @param taskList tasks to update
     * @return the mapper's update count, or 0 when the list is null or empty
     */
    public int batchUpdatePushTaskData(List<Task> taskList) {
        if (taskList == null || taskList.isEmpty()) {
            return 0;
        }
        stampExecutor(taskList);
        return this.taskMapper.batchUpdatePushTaskData(taskList);
    }

    /**
     * Selects tasks that are waiting to be pushed.
     *
     * @param task query parameters (task type, queue condition, fetch size)
     * @return the matching tasks
     */
    public List<Task> selectPushByTask(Task task) {
        return taskMapper.selectPushByTask(task);
    }

    /** Sets "localIp$uuid" as executing instance and refreshes the update time on each task. */
    private void stampExecutor(List<Task> taskList) {
        String exeInstanceIp = StringUtil.getLocalIP() + "$" + StringUtil.getUUID();
        for (Task task : taskList) {
            task.setExeInstanceIp(exeInstanceIp);
            task.setUpdateTime(new Date());
        }
    }
}

TaskMapper.java

/**
 * MyBatis mapper for the {@code task} table; statements live in TaskMapper.xml.
 */
public interface TaskMapper {

    // ---- single-row operations ----

    /** Inserts one task row. */
    int insert(Task record);

    /** Loads one task by primary key. */
    Task selectByPrimaryKey(Long id);

    /** Updates one task by primary key. */
    int updateByPrimaryKey(Task record);

    /** Deletes one task by primary key. */
    int deleteByPrimaryKey(Long id);

    // ---- execution stage ----

    /** Selects tasks waiting for execution. */
    List<Task> selectSyncByTask(Task example);

    /** Batch-updates the execution columns of the given tasks; the list is bound as "taskList". */
    int batchUpdateSyncTaskData(@Param("taskList") List<Task> taskList);

    // ---- push stage ----

    /** Selects tasks waiting to be pushed. */
    List<Task> selectPushByTask(Task task);

    /** Batch-updates the push columns of the given tasks. */
    int batchUpdatePushTaskData(List<Task> taskList);
}

TaskMapper.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="TaskMapper">

    <!-- Column mapping without the large TASK_DATA column. -->
    <resultMap id="BaseResultMap" type="Task">
        <id column="ID" jdbcType="BIGINT" property="id" />
        <result column="REF_ID" jdbcType="VARCHAR" property="refId" />
        <result column="TASK_TYPE" jdbcType="VARCHAR" property="taskType" />
        <result column="STATUS" jdbcType="INTEGER" property="status" />
        <result column="TASK_EXE_COUNT" jdbcType="INTEGER" property="taskExeCount" />
        <result column="CREATE_TIME" jdbcType="TIMESTAMP" property="createTime" />
        <result column="UPDATE_TIME" jdbcType="TIMESTAMP" property="updateTime" />
        <result column="EXE_INSTANCE_IP" jdbcType="VARCHAR" property="exeInstanceIp" />
        <result column="STATUS_PUSH" jdbcType="INTEGER" property="statusPush" />
        <result column="PUSH_EXE_COUNT" jdbcType="INTEGER" property="pushExeCount" />
        <result column="YN" jdbcType="INTEGER" property="yn" />
        <result column="ts" jdbcType="TIMESTAMP" property="ts" />
    </resultMap>

    <!-- Base mapping plus TASK_DATA. -->
    <resultMap extends="BaseResultMap" id="ResultMapWithBLOBs" type="Task">
        <result column="TASK_DATA" jdbcType="LONGVARCHAR" property="taskData" />
    </resultMap>

    <!-- NOTE(review): this list already contains TASK_DATA, so selectByPrimaryKey selects that column twice. -->
    <sql id="Base_Column_List">
        ID, REF_ID, TASK_TYPE, TASK_DATA, `STATUS`, TASK_EXE_COUNT, CREATE_TIME, UPDATE_TIME, EXE_INSTANCE_IP,
        STATUS_PUSH, PUSH_EXE_COUNT, YN, ts
    </sql>

    <sql id="Blob_Column_List">
        TASK_DATA
    </sql>

    <select id="selectByPrimaryKey" parameterType="java.lang.Long" resultMap="ResultMapWithBLOBs">
        select
        <include refid="Base_Column_List" />
        ,
        <include refid="Blob_Column_List" />
        from task
        where ID = #{id,jdbcType=BIGINT}
    </select>

    <delete id="deleteByPrimaryKey" parameterType="java.lang.Long">
        delete from task
        where ID = #{id,jdbcType=BIGINT}
    </delete>

    <!-- This insert does not write REF_ID, UPDATE_TIME or EXE_INSTANCE_IP; use insertSelective when those are needed. -->
    <insert id="insert" keyColumn="ID" keyProperty="id" parameterType="Task" useGeneratedKeys="true">
        insert into task (TASK_TYPE, `STATUS`,
            TASK_EXE_COUNT, CREATE_TIME,
            STATUS_PUSH, PUSH_EXE_COUNT,
            YN, ts, TASK_DATA)
        values (#{taskType,jdbcType=VARCHAR}, #{status,jdbcType=INTEGER},
            #{taskExeCount,jdbcType=INTEGER}, #{createTime,jdbcType=TIMESTAMP},
            #{statusPush,jdbcType=INTEGER}, #{pushExeCount,jdbcType=INTEGER},
            #{yn,jdbcType=INTEGER}, #{ts,jdbcType=TIMESTAMP}, #{taskData,jdbcType=LONGVARCHAR})
    </insert>

    <insert id="insertSelective" keyColumn="ID" keyProperty="id" parameterType="Task" useGeneratedKeys="true">
        insert into task
        <trim prefix="(" suffix=")" suffixOverrides=",">
            <if test="refId != null">REF_ID,</if>
            <if test="taskType != null">TASK_TYPE,</if>
            <if test="status != null">`STATUS`,</if>
            <if test="taskExeCount != null">TASK_EXE_COUNT,</if>
            <if test="createTime != null">CREATE_TIME,</if>
            <if test="updateTime != null">UPDATE_TIME,</if>
            <if test="exeInstanceIp != null">EXE_INSTANCE_IP,</if>
            <if test="statusPush != null">STATUS_PUSH,</if>
            <if test="pushExeCount != null">PUSH_EXE_COUNT,</if>
            <if test="yn != null">YN,</if>
            <if test="ts != null">ts,</if>
            <if test="taskData != null">TASK_DATA,</if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides=",">
            <if test="refId != null">#{refId,jdbcType=VARCHAR},</if>
            <if test="taskType != null">#{taskType,jdbcType=VARCHAR},</if>
            <if test="status != null">#{status,jdbcType=INTEGER},</if>
            <if test="taskExeCount != null">#{taskExeCount,jdbcType=INTEGER},</if>
            <if test="createTime != null">#{createTime,jdbcType=TIMESTAMP},</if>
            <if test="updateTime != null">#{updateTime,jdbcType=TIMESTAMP},</if>
            <if test="exeInstanceIp != null">#{exeInstanceIp,jdbcType=VARCHAR},</if>
            <if test="statusPush != null">#{statusPush,jdbcType=INTEGER},</if>
            <if test="pushExeCount != null">#{pushExeCount,jdbcType=INTEGER},</if>
            <if test="yn != null">#{yn,jdbcType=INTEGER},</if>
            <if test="ts != null">#{ts,jdbcType=TIMESTAMP},</if>
            <if test="taskData != null">#{taskData,jdbcType=LONGVARCHAR},</if>
        </trim>
    </insert>

    <update id="updateByPrimaryKeySelective" parameterType="Task">
        update task
        <set>
            <if test="refId != null">REF_ID = #{refId,jdbcType=VARCHAR},</if>
            <if test="taskType != null">TASK_TYPE = #{taskType,jdbcType=VARCHAR},</if>
            <if test="status != null">`STATUS` = #{status,jdbcType=INTEGER},</if>
            <if test="taskExeCount != null">TASK_EXE_COUNT = #{taskExeCount,jdbcType=INTEGER},</if>
            <if test="createTime != null">CREATE_TIME = #{createTime,jdbcType=TIMESTAMP},</if>
            <if test="updateTime != null">UPDATE_TIME = #{updateTime,jdbcType=TIMESTAMP},</if>
            <if test="exeInstanceIp != null">EXE_INSTANCE_IP = #{exeInstanceIp,jdbcType=VARCHAR},</if>
            <if test="statusPush != null">STATUS_PUSH = #{statusPush,jdbcType=INTEGER},</if>
            <if test="pushExeCount != null">PUSH_EXE_COUNT = #{pushExeCount,jdbcType=INTEGER},</if>
            <if test="yn != null">YN = #{yn,jdbcType=INTEGER},</if>
            <if test="ts != null">ts = #{ts,jdbcType=TIMESTAMP},</if>
            <if test="taskData != null">TASK_DATA = #{taskData,jdbcType=LONGVARCHAR},</if>
        </set>
        where ID = #{id,jdbcType=BIGINT}
    </update>

    <update id="updateByPrimaryKeyWithBLOBs" parameterType="Task">
        update task
        set REF_ID = #{refId,jdbcType=VARCHAR},
            TASK_TYPE = #{taskType,jdbcType=VARCHAR},
            `STATUS` = #{status,jdbcType=INTEGER},
            TASK_EXE_COUNT = #{taskExeCount,jdbcType=INTEGER},
            CREATE_TIME = #{createTime,jdbcType=TIMESTAMP},
            UPDATE_TIME = #{updateTime,jdbcType=TIMESTAMP},
            EXE_INSTANCE_IP = #{exeInstanceIp,jdbcType=VARCHAR},
            STATUS_PUSH = #{statusPush,jdbcType=INTEGER},
            PUSH_EXE_COUNT = #{pushExeCount,jdbcType=INTEGER},
            YN = #{yn,jdbcType=INTEGER},
            ts = #{ts,jdbcType=TIMESTAMP},
            TASK_DATA = #{taskData,jdbcType=LONGVARCHAR}
        where ID = #{id,jdbcType=BIGINT}
    </update>

    <!-- Same as updateByPrimaryKeyWithBLOBs but leaves TASK_DATA untouched. -->
    <update id="updateByPrimaryKey" parameterType="Task">
        update task
        set REF_ID = #{refId,jdbcType=VARCHAR},
            TASK_TYPE = #{taskType,jdbcType=VARCHAR},
            `STATUS` = #{status,jdbcType=INTEGER},
            TASK_EXE_COUNT = #{taskExeCount,jdbcType=INTEGER},
            CREATE_TIME = #{createTime,jdbcType=TIMESTAMP},
            UPDATE_TIME = #{updateTime,jdbcType=TIMESTAMP},
            EXE_INSTANCE_IP = #{exeInstanceIp,jdbcType=VARCHAR},
            STATUS_PUSH = #{statusPush,jdbcType=INTEGER},
            PUSH_EXE_COUNT = #{pushExeCount,jdbcType=INTEGER},
            YN = #{yn,jdbcType=INTEGER},
            ts = #{ts,jdbcType=TIMESTAMP}
        where ID = #{id,jdbcType=BIGINT}
    </update>

    <select id="selectByByRefid" parameterType="java.lang.String" resultMap="BaseResultMap">
        select
        <include refid="Base_Column_List"/>
        from task
        where REF_ID = #{id,jdbcType=VARCHAR}
    </select>

    <!--
        Task status values:
          1 = waiting to be processed
          2 = being processed
          3 = processed successfully
          4 = processing failed
          5 = skipped
    -->

    <!--
        2.1 Select tasks to execute: waiting tasks, plus tasks stuck in "processing"
        for more than 10 minutes. The age check uses a MySQL interval; the earlier
        "now() - 10 / 60 / 24" form is Oracle date arithmetic and does not subtract
        10 minutes in MySQL.
        ${condition} is substituted as raw text, so it must only ever contain the
        comma-separated integers built by the worker.
        ResultMapWithBLOBs is used so the TASK_DATA column that is already selected
        is mapped to taskData.
    -->
    <select id="selectSyncByTask" parameterType="Task" resultMap="ResultMapWithBLOBs">
        select
        <include refid="Base_Column_List"/>
        from task
        where (status = 1 or (status = 2 and
        <![CDATA[ update_time < (now() - interval 10 minute) ]]>
        )) and
        <if test="taskType != null">
            TASK_TYPE = #{taskType,jdbcType=VARCHAR} and
        </if>
        <if test="condition != null">
            mod(id,#{queueNum,jdbcType=NUMERIC}) in (${condition}) and
        </if>
        YN = 0
        <if test="fetchNum != null">
            <![CDATA[ limit 0,#{fetchNum,jdbcType=NUMERIC} ]]>
        </if>
    </select>

    <!-- 2.2 Batch-update execution count, instance label and status per task id. The list must not be empty. -->
    <update id="batchUpdateSyncTaskData" parameterType="java.util.List">
        update task
        <trim prefix="set" suffix="UPDATE_TIME = now()">
            <trim prefix="TASK_EXE_COUNT = case" suffix="end,">
                <foreach collection="taskList" item="task">
                    when id = #{task.id,jdbcType=NUMERIC} then #{task.taskExeCount}
                </foreach>
            </trim>
            <trim prefix="EXE_INSTANCE_IP = case" suffix="end,">
                <foreach collection="taskList" item="task">
                    when id = #{task.id,jdbcType=NUMERIC} then #{task.exeInstanceIp}
                </foreach>
            </trim>
            <trim prefix="STATUS = case" suffix="end,">
                <foreach collection="taskList" item="task">
                    when id = #{task.id,jdbcType=NUMERIC} then #{task.status}
                </foreach>
            </trim>
        </trim>
        <foreach collection="taskList" item="task" open="where id in(" close=") and YN = 0" separator=",">
            #{task.id,jdbcType=NUMERIC}
        </foreach>
    </update>

    <!--
        3.1 Select successfully processed tasks that are waiting to be pushed, plus
        tasks stuck in "pushing" for more than 10 minutes. A stray "and" in front of
        the closing parenthesis has been removed.
        NOTE(review): this statement filters on YN = 1 while the execution-stage
        statements filter on YN = 0 and the table DDL documents 0 as enabled;
        confirm which value is intended.
    -->
    <select id="selectPushByTask" parameterType="Task" resultMap="ResultMapWithBLOBs">
        select
        <include refid="Base_Column_List"/>
        from task
        where status = 3 and (status_push = 1 or (status_push = 2 and
        <![CDATA[ update_time < (now() - interval 10 minute) ]]>
        )) and
        <if test="taskType != null">
            TASK_TYPE = #{taskType,jdbcType=VARCHAR} and
        </if>
        <if test="condition != null">
            mod(id,#{queueNum,jdbcType=NUMERIC}) in (${condition}) and
        </if>
        YN = 1
        <if test="fetchNum != null">
            <![CDATA[ limit 0,#{fetchNum,jdbcType=NUMERIC} ]]>
        </if>
    </select>

    <!--
        3.2 Batch-update the push columns per task id. The list must not be empty.
        The parameter is a plain List (bound as "list"), so every value is read from
        the list items; the previous version read pushExeCount, exeInstanceIp,
        statusPush and updateTime from the List itself, which has no such properties.
        A row whose new push status is neither 1 nor 2 is only updated when its
        current status_push is not 1, as before.
    -->
    <update id="batchUpdatePushTaskData" parameterType="java.util.List">
        update task
        <trim prefix="set">
            <trim prefix="push_exe_count = case" suffix="end,">
                <foreach collection="list" item="item">
                    when id = #{item.id,jdbcType=NUMERIC} then #{item.pushExeCount,jdbcType=NUMERIC}
                </foreach>
            </trim>
            <trim prefix="exe_instance_ip = case" suffix="end,">
                <foreach collection="list" item="item">
                    when id = #{item.id,jdbcType=NUMERIC} then #{item.exeInstanceIp,jdbcType=VARCHAR}
                </foreach>
            </trim>
            <trim prefix="status_push = case" suffix="end,">
                <foreach collection="list" item="item">
                    when id = #{item.id,jdbcType=NUMERIC} then #{item.statusPush,jdbcType=NUMERIC}
                </foreach>
            </trim>
            <trim prefix="update_time = case" suffix="end">
                <foreach collection="list" item="item">
                    when id = #{item.id,jdbcType=NUMERIC} then #{item.updateTime,jdbcType=TIMESTAMP}
                </foreach>
            </trim>
        </trim>
        where
        <foreach collection="list" item="item" open="(" close=")" separator=" or ">
            (id = #{item.id,jdbcType=NUMERIC}
            <if test="item.statusPush != 1 and item.statusPush != 2">
                and status_push != 1
            </if>
            )
        </foreach>
        and YN = 1
    </update>

</mapper>

三.TbSchedule相关SQL

task.sql


-- Schema for the task table used by the TbSchedule worker.
-- WARNING: drops and recreates the table.
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

DROP TABLE IF EXISTS `task`;
CREATE TABLE `task`  (
  `ID` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `REF_ID` varchar(300) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '关联单号',
  `TASK_TYPE` varchar(30) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT '1' COMMENT '任务类型',
  `TASK_DATA` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '任务数据(必须是json字符串格式)',
  `STATUS` int(11) NULL DEFAULT 1 COMMENT '任务的状态(1等待处理,2正在处理,3处理成功,4处理异常,5同步忽略状态)',
  `TASK_EXE_COUNT` int(11) NULL DEFAULT 0 COMMENT '任务的执行次数',
  `CREATE_TIME` datetime(0) NULL DEFAULT NULL COMMENT '创建时间',
  `UPDATE_TIME` datetime(0) NULL DEFAULT NULL COMMENT '更新时间',
  `EXE_INSTANCE_IP` varchar(300) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '执行实例服务器IP',
  `STATUS_PUSH` int(11) NOT NULL DEFAULT 1 COMMENT '推送状态',
  `PUSH_EXE_COUNT` int(11) NOT NULL DEFAULT 0 COMMENT '推送次数',
  `YN` int(6) NOT NULL DEFAULT 0 COMMENT '0:启用,1:作废',
  `ts` timestamp(0) NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP(0) COMMENT '时间戳',
  PRIMARY KEY (`ID`) USING BTREE,
  INDEX `idx_status`(`STATUS`) USING BTREE,
  INDEX `idx_status_status_push`(`STATUS`, `STATUS_PUSH`) USING BTREE,
  INDEX `idx_status_task_type`(`STATUS`, `TASK_TYPE`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 8594 CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = '描述' ROW_FORMAT = Dynamic;

SET FOREIGN_KEY_CHECKS = 1;

四.执行任务

DoTask.java

public class DoTask extends AbstractBaseWorkerTask<Task> {@Autowiredprivate DoTaskService workerTaskService;/***  第一步 获取数据根据task_type* @param ownSign* @param queueNum* @param queueList* @param fetchNum  每次获取的数据量* @return*/@Overrideprotected List<Task> fetchWorkerTasks(String ownSign, int queueNum, List<TaskItemDefine> queueList, int fetchNum) {logger.info(" 获取开始 ... ");List<Task> rstList = new ArrayList<>();StringBuilder condition = StringUtil.getCondition(queueList);logger.info("condition = " + condition + " ; queueNum = " + queueNum);try {Task param = new Task();param.setQueueNum(queueNum);param.setFetchNum(fetchNum);String queuelistStr = StringUtil.getCondition(queueList).toString();if (StringUtils.isBlank(queuelistStr)) {return rstList;}// 查询条件param.setCondition(queuelistStr);// 设置任务执行中param.setStatus(1);param.setCondition(queuelistStr);param.setTaskType("任务类型");long startTime = System.currentTimeMillis();// 具体查询方法rstList = this.workerTaskService.selectSyncByTask(param);long endTime = System.currentTimeMillis();int cnt = rstList == null ? 
0 : rstList.size();logger.info(" 获取总数 [ " + cnt + " ] ,耗时[" + (endTime - startTime)+ " ]毫秒");} catch (Exception e) {logger.error(" 获取失败: " + e, e);}return rstList;}/*** 第二步 根据获取数据做处理* @param tasks* @return*/@Overrideprotected boolean processWorkerTask(Object[] tasks) {logger.info(" 执行开始 ...");// 记录执行的任务列表信息List<Task> taskList = new ArrayList<>();try {// 赋值状态和执行次数for (Object object : tasks) {if (object instanceof Task) {Task workerTask = (Task) object;workerTask.setTaskData(workerTask.getTaskData());workerTask.setStatus(2);workerTask.setTaskExeCount(workerTask.getTaskExeCount() + 1);workerTask.setUpdateTime(new Date());taskList.add(workerTask);}}// 在操作之前现将所有带操作的数据更新为待执行状态int updateRes = this.workerTaskService.batchUpdateSyncTaskData(taskList);if (updateRes > 0) {// 执行成功的集合List<Task> successTaskList = new ArrayList<>();// 执行失败的集合List<Task> errorTaskList = new ArrayList<>();for (Task task : taskList) {Boolean result = 业务逻辑;if(result){successTaskList.add(task);} else{errorTaskList.add(task);}}// 更新任务表执行状态if (!successTaskList.isEmpty()) {handerTask(successTaskList,3);}if (!errorTaskList.isEmpty()) {handerTask(errorTaskList, 4);}}} catch (Exception e) {logger.error(" 执行失败:" + e, e);}logger.info(" 执行成功");return false;}/*** <hr color = green>* <b>方法名称 : </b> handerTask<br>* <b>方法注释 : </b> 异常处理类** @param taskList* @param operateStatus* @return*/private int handerTask(List<Task> taskList, int operateStatus) {int res = -1;if (taskList.isEmpty()) {logger.info( " 状态为 [" + operateStatus + "] 数据为空");} else {for (Task task : taskList) {int exeCount = task.getTaskExeCount();// 判断如果任务执行失败并且此条任务的执行次数小于等于3次的将任务状态重新更新为待执行if (exeCount <= 3&& operateStatus == 4) {operateStatus = 1;}task.setStatus(operateStatus);task.setUpdateTime(new Date());}try {// 批量更新任务状态long startTime = System.currentTimeMillis();res = workerTaskService.batchUpdateSyncTaskData(taskList);long endTime = System.currentTimeMillis();logger.info(" 状态为  [" + operateStatus + "] 处理完成, 耗时 ["+ (endTime - startTime) + "] 毫秒");} 
catch (Exception e) {logger.error( " 状态为  [" + operateStatus + "] 处理失败:" + e, e);return res;}}return res;}}

TbSchedule的简单使用及其配置相关推荐

  1. JSTL 及 tablibs 的简单介绍和配置方法

    JSTL 及 taglibs 的简单介绍和配置方法 jstl 简介 jstl 的全称就是jsp standard tag libraries, 就是jsp里的标准标签库. 引用jstl技术能在jsp中 ...

  2. 阿里微服务专家自己手写Spring Boot 实现一个简单的自动配置模块

    为了更好的理解 Spring Boot 的 自动配置和工作原理,我们自己来实现一个简单的自动配置模块. 假设,现在项目需要一个功能,需要自动记录项目发布者的相关信息,我们如何通过 Spring Boo ...

  3. Tornado 使用手册(一)---------- 简单的tornado配置

    2019独角兽企业重金招聘Python工程师标准>>> #Tornado 使用手册(一)---------- 简单的tornado配置 1. 简单的web.py import tor ...

  4. saltstack的简单安装和配置

    saltstack的简单安装和配置 什么是saltstack? SaltStack是一个服务器基础架构集中化管理平台,具备配置管理.远程执行.监控等功能,一般可以理解为简化版的puppet和加强版的f ...

  5. Spring简单的文件配置

    Spring简单的文件配置 "计应134(实验班) 凌豪" 一.Spring文件配置 spring至关重要的一环就是装配,即配置文件的编写,接下来我按刚才实际过程中一步步简单讲解. ...

  6. 简单的三层交换配置路由实验 (思科)

    实验名称:简单的三层交换配置路由 (思科) 实验拓扑: 1.配置终端设备:pc1-pc5按照拓扑图中所示配置各个pc的ip地址.网关配置为 192.168.x.254 pc6配置为 192.168.7 ...

  7. 简单教学 apache 配置 Expire/Cache-Control 头

    简单教学 apache 配置 Expire/Cache-Control 头 这里我使用的是Apache2.4.17 打开apache安装目录,找到conf目录,用记事本打开httpd.conf 文件. ...

  8. 简单安装与配置mysql数据库(绿色版)

    简单安装与配置mysql数据库(绿色版) 目录 绿色版下载 mysql绿色版(5.7版本的安装与配置) 绿色版下载: mysql官网下载地址:https://www.oracle.com/index. ...

  9. 【RHCE】NFS服务器简介及简单共享目录配置

    目录 NFS服务器简介 NFS的使用 [手工挂载] 客户端使用autofs自动挂载 NFS服务器简介 配置文件置顶: 主配置文件:vim /etc/exports[文件可能不存在.正常的] NFS(N ...

  10. linux网桥的简单理解和配置

    linux网桥的简单理解和配置 Linux网桥是linux虚拟网络设备之一.网上很多分析linux网桥的文章,例如代码层面的分析.这里不牵扯复杂的分析和配置,主要是面向虚拟机链接一个用途,小白我的备忘 ...

最新文章

  1. iShow UI for React 最佳实践
  2. php常用操作数组函数,PHP自带的几个实用的数组函数
  3. .net core json 为null输出_SpringBoot实战(九):标准化json返回值
  4. 【小程序】当前“页面B”动态更改title,点击返回按钮,更改的标题会显示在“来源页面A”...
  5. 内核同步 (来自chinaunix总结)
  6. Java Map 自定义排序
  7. mysql 书名_深入理解MySQL
  8. Mycat跨库join实现方式总结
  9. Google 推出的编程学习应用 Grasshopper
  10. ​ACL 2022 | 普林斯顿陈丹琦组:模型剪枝的加速方法
  11. mysql sniffer 官网下载_MySQL抓包工具:MySQL Sniffer 和性能优化
  12. python qq群发消息_python qq发消息
  13. 洛谷 P4238 【模板】多项式乘法逆
  14. Python入门(二)-编程环境
  15. bandizip没有右键菜单解决办法
  16. 关于银河麒麟系统配置本地yum源配置流程说明
  17. Android 调用so库全过程
  18. Python安装Numpy模块
  19. xcode打包报错Command CodeSign failed with a nonzero exit code的解决方案
  20. tidymodels绘制校准曲线

热门文章

  1. 公转对讲融合项目如何实现对接?
  2. 关于 u-nas 报警声音
  3. 怎么完全卸载赛门铁克_symantec卸载方法
  4. 九、Kali Linux 2 社会工程学工具
  5. 一刀工具箱 - 古诗文查询
  6. 国防科技大学|信息化保障和支援能力训练虚拟仿真实验
  7. 《生产实习》实习报告——JAVA大数据工程师
  8. 职称计算机题库 云盘,职称计算机考试题库「附答案」
  9. Google Chrome 49.0.2623.112 XP系统最终版离线安装包官方下载地址
  10. Go 开发关键技术指南 | 带着服务器编程金刚经走进 2020 年