2019独角兽企业重金招聘Python工程师标准>>>

算法原理

分配服务器算法

算法原理是注册中心有一个队列表PAMIRS_S
,它包含如下关键信息:QUEUE_ID是队列标识   CUR_SERVER是当前分配服务器标识,REQ_SERVER是申请分配服务器标识。

假如有1,2,3,4,5个队列,有A,B,C三个服务器依次启动。则算法的规则是这样的:

A启动的时候:

由于没有其它的主机,则将所有的队列分配给A。

QUEUE_ID  CUR_SERVER REQ_SERVER
1 A     
2 A
3 A
4 A
5 A

B启动的时候:

QUEUE_ID  CUR_SERVER REQ_SERVER
1 A     
2 A B
3 A
4 A B
5 A

C启动的时候:

QUEUE_ID  CUR_SERVER REQ_SERVER
1 A     
2 A B
3 A C
4 A
5 A B

D启动的时候:

QUEUE_ID  CUR_SERVER REQ_SERVER
1 A     
2 A B
3 A C
4 A D
5 A

服务器释放算法

上述算法中实现了预分配,那什么时候实现正式分配呢?当在获取任务队列的时候(必须控制在当前服务器中的所有任务都执行完毕的情况下,否则会重复执行任务的可能性)会先释放自己已经持有,但是别人要申请的队列,将这些队列让给申请人。

比如当前队列是A,在执行释放队列前的数据状态是:

QUEUE_ID  CUR_SERVER REQ_SERVER
1 A     
2 A B
3 A C
4 A D
5 A

释放自己持有,别人申请的队列之后的数据状态为:

QUEUE_ID  CUR_SERVER REQ_SERVER
1 A     
2 B
3 C
4 D
5 A

这个时候A持有的队列只有1和5了,队列就实现了均匀的分配给所有机器。

算法实现

分配队列代码实现

最开始的代码是在TBScheduleManager的方法assignScheduleTask方法。

public void assignScheduleTask() throws Exception {int clearServerCount = scheduleCenter.clearExpireScheduleServer(this.taskTypeInfo,this.taskTypeRunningInfo);List<ScheduleServer> serverList = scheduleCenter.selectAllValidScheduleServer(this.getTaskTypeRunningInfo().getTaskType());int clearTaskQueueInfoCount = scheduleCenter.clearTaskQueueInfo(this.getTaskTypeRunningInfo().getTaskType(), serverList);boolean isNeedReAssign = false;if (clearServerCount > 0 || clearTaskQueueInfoCount > 0) {isNeedReAssign = true;} else  {for (ScheduleServer item : serverList) {//注意,比较时间一定要用数据库时间if (item.getCenterServerTime().getTime() - item.getRegisterTime().getTime()< taskTypeInfo.getJudgeDeadInterval() * 3 ) {isNeedReAssign = true;break;}} }if (isNeedReAssign == true) {scheduleCenter.assignQueue(this.getTaskTypeRunningInfo().getTaskType(),this.currenScheduleServer.getUuid(), serverList);}if (log.isDebugEnabled()) {//log.debug(message);}}

它会先查询一下是否需要重新分配队列,当已经清理过过期的服务器,或者已经清理过非法服务器持有的队列,或者有新的服务器(注册时间距离现在时间小于3个时间周期)注册的时候,则需要重新预分配队列。比较时间一定要以注册中心的时间为准。

需要重新预分配队列则进入方法scheduleCenter.assignQueue。

private Connection getConnection() throws SQLException{Connection result = this.dataSource.getConnection();if(result.getAutoCommit() == true){result.setAutoCommit(false);}return result;} public void assignQueue(String taskType, String currentUuid,List<ScheduleServer> serverList) throws Exception {Connection conn = null;try{conn = this.getConnection();clientInner.assignQueue(conn, taskType,currentUuid,serverList);conn.commit();}catch(Throwable e){if(conn != null){conn.rollback();}if(e instanceof Exception){throw (Exception)e;}else{throw new Exception(e);}             }finally{if(conn!= null){conn.close();}}      }

这个方法说明连接关闭了自动提交,方法内的多个SQL执行是在一个事务里的。这个非常关键。

/*** 重新分配任务处理队列* * @param taskType* @param serverList* @throws Exception*/public void assignQueue(Connection conn,String taskType, String currentUuid,List<ScheduleServer> serverList) throws Exception {this.lockTaskTypeRunningInfo(conn,taskType, currentUuid);String sqlQueue = " SELECT TASK_TYPE,QUEUE_ID,CUR_SERVER,REQ_SERVER FROM "+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")+ " WHERE TASK_TYPE = ? ORDER BY QUEUE_ID";PreparedStatement stmtQueue = conn.prepareStatement(sqlQueue);stmtQueue.setString(1, taskType);ResultSet setQueue = stmtQueue.executeQuery();int point = 0;int taskCount = 0;while (setQueue.next()) {PreparedStatement stmtUpdateQueue = null;String sqlModifyQueue = "";if (setQueue.getString("CUR_SERVER") == null) {sqlModifyQueue = " UPDATE "+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")+ " SET CUR_SERVER = ?,REQ_SERVER = null,GMT_MODIFIED = "+ getDataBaseSysdateString(conn)+ " WHERE TASK_TYPE = ? and QUEUE_ID = ? ";stmtUpdateQueue = conn.prepareStatement(sqlModifyQueue);stmtUpdateQueue.setString(1, serverList.get(point).getUuid());stmtUpdateQueue.setString(2, taskType);stmtUpdateQueue.setString(3, setQueue.getString("QUEUE_ID"));stmtUpdateQueue.executeUpdate();stmtUpdateQueue.close();} else if (!(serverList.get(point).getUuid().equalsIgnoreCase(setQueue.getString("CUR_SERVER")) == true && setQueue.getString("REQ_SERVER") == null)) {sqlModifyQueue = " UPDATE "+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")+ " SET REQ_SERVER = ? ,GMT_MODIFIED = "+ getDataBaseSysdateString(conn)+ " WHERE TASK_TYPE = ? and QUEUE_ID = ? ";stmtUpdateQueue = conn.prepareStatement(sqlModifyQueue);stmtUpdateQueue.setString(1, serverList.get(point).getUuid());stmtUpdateQueue.setString(2, taskType);stmtUpdateQueue.setString(3, setQueue.getString("QUEUE_ID"));stmtUpdateQueue.executeUpdate();stmtUpdateQueue.close();} else {// 不需要修改当前记录的信息}taskCount = taskCount + 1;if (point >= serverList.size() - 1) {point = 0;} else {point = point + 1;}}setQueue.close();stmtQueue.close();if (taskCount == 0) {throw new Exception("没有对任务类型配置数据处理队列,TASK_TYPE = " + taskType);}}
public void lockTaskTypeRunningInfo(Connection conn,String taskType, String lockServerUuid)throws Exception {String sql = " UPDATE "+ transferTableName(conn, "PAMIRS_SCHEDULE_TASKTRUN")+ " set LAST_ASSIGN_TIME = "+ getDataBaseSysdateString(conn)+ ",LAST_ASSIGN_UUID = ? , GMT_MODIFIED = "+ getDataBaseSysdateString(conn) + " where TASK_TYPE = ? ";PreparedStatement statement = conn.prepareStatement(sql);statement.setString(1, lockServerUuid);statement.setString(2, taskType);statement.executeUpdate();statement.close();}

分配队列之前,会先调用方法lockTaskTypeRunningInfo对这个运行期类型进行加锁,看它使用的SQL语句可以看出来,它是使用了数据库实现的行锁(或者范围锁)来实现加锁,避免多个进程同时分配队列时的冲突,其它进程若要更新该行需要等待释放锁。这就要求我们在建表的时候一定要对字段TASK_TYPE建立索引,并且如果是mysql的话,要选择支持行锁的表引擎,避免锁粒度过大导致的系统性能问题。

分配队列的实现是先查询出该任务所有的队列列表,然后循环这个列表,依次给这个队列列表分配服务器,参数输入的是有效服务器列表。

这个代码就实现了上述算法。它依次对队列列表进行循环,有下面这些情况:

如果当前队列未分配服务器(即 CUR_SERVER=null)则将当前服务器分配给该队列(即赋值给CUR_SERVER字段);

如果当前队列已经分配服务器(即 CUR_SERVER!=null),并且分配的服务器不是当前服务器,则将当前服务器设置为待分配服务器(即赋值给REQ_SERVER字段);如果是当前服务器则表示应分配,就没有必要再放入待分配服务器。

其中服务器的选择是循环的,因为服务器的数量可能小于队列数。选择到最后一个服务器则下一个又回到第一个服务器。

这样就实现了服务器可以均匀的分配给多个队列,当服务器数大于队列数的时候就有可能会出现有的服务器无法分配给对应的任务队列的问题,会报警。

服务器代码实现

在调度管理器中有一个获取当前服务器某个任务队列列表的方法,查看该方法源码可以看到检查处理器中的数据是否已经处理完,若没有处理完则会循环等待阻塞程序直到处理完成才能继续获取任务队列。它最终调用了私有方法getCurrentScheduleQueueNow。

/*** 重新加载当前服务器的任务队列* 1、释放当前服务器持有,但有其它服务器进行申请的任务队列* 2、重新获取当前服务器的处理队列* * 为了避免此操作的过度,阻塞真正的数据处理能力。系统设置一个重新装载的频率。例如1分钟* * 特别注意:*   此方法的调用必须是在当前所有任务都处理完毕后才能调用,否则是否任务队列后可能数据被重复处理*/@SuppressWarnings("static-access")public List<String> getCurrentScheduleQueue() {try{if (this.isNeedReloadQueue == true) {            //特别注意:需要判断数据队列是否已经空了,否则可能在队列切换的时候导致数据重复处理//主要是在线程不休眠就加载数据的时候一定需要这个判断if (this.processor != null) {while (this.processor.isDealFinishAllData() == false) {Thread.currentThread().sleep(50);}}//真正开始处理数据this.getCurrentScheduleQueueNow();}this.lastReloadTaskQueueTime = ScheduleUtil.getCurrentTimeMillis();      return this.currentTaskQueue;       }catch(Exception e){throw new RuntimeException(e);}}

getCurrentScheduleQueueNow方法才真正实现了获取队列的逻辑,我们进去看一下。

private List<String> getCurrentScheduleQueueNow() throws Exception {//是否被人申请的队列this.scheduleCenter.releaseDealQueue(this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid());//重新查询当前服务器能够处理的队列this.currentTaskQueue = this.scheduleCenter.reloadDealQueue(this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid());//如果超过10个心跳周期还没有获取到调度队列,则报警if(this.currentTaskQueue.size() ==0 && ScheduleUtil.getCurrentTimeMillis() - this.lastReloadTaskQueueTime> this.taskTypeInfo.getHeartBeatRate() * 10){            String message ="调度服务器" + this.currenScheduleServer.getUuid() +"[TASK_TYPE=" + this.getTaskTypeRunningInfo().getTaskType() + "]自启动以来,超过10个心跳周期,还 没有获取到分配的任务队列";log.warn(message);if(this.scheduleAlert != null){this.scheduleAlert.noTaskQueue(this.getTaskTypeRunningInfo().getTaskType(), this.currenScheduleServer.getUuid(),message);}}if(this.currentTaskQueue.size() >0){//更新时间戳this.lastReloadTaskQueueTime = ScheduleUtil.getCurrentTimeMillis();}return this.currentTaskQueue;}

它先调用了scheduleCenter.releaseDealQueue方法释放自己的队列,即下列代码。然后重新加载自己的队列,当10个周期获取到的队列数为0则会报警。

/*** 释放自己把持,别人申请的队列* * @param taskType* @param uuid* @return* @throws Exception*/public void releaseDealQueue(Connection conn,String taskType, String uuid) throws Exception {String querySql = "select QUEUE_ID from "+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")+ " WHERE TASK_TYPE = ? and CUR_SERVER = ?  AND  REQ_SERVER IS NOT NULL ";PreparedStatement stmtQueue = conn.prepareStatement(querySql);stmtQueue.setString(1, taskType);stmtQueue.setString(2, uuid);ResultSet set = stmtQueue.executeQuery();List<String> queueIds = new ArrayList<String>();while(set.next()){queueIds.add(set.getString("QUEUE_ID"));}set.close();stmtQueue.close();String sqlQueue = " update "+ transferTableName(conn, "PAMIRS_SCHEDULE_QUEUE")+ " set CUR_SERVER = REQ_SERVER,REQ_SERVER = NULL, GMT_MODIFIED = "+ getDataBaseSysdateString(conn)+ " WHERE TASK_TYPE = ? and CUR_SERVER = ? AND QUEUE_ID = ?  AND  REQ_SERVER IS NOT NULL ";for(String queueId:queueIds){stmtQueue = conn.prepareStatement(sqlQueue);stmtQueue.setString(1, taskType);stmtQueue.setString(2, uuid);stmtQueue.setString(3, queueId);stmtQueue.executeUpdate();stmtQueue.close();conn.commit();}}

该方法的实现是查询当前任务分给当前服务器的所有队列列表,然后会依次循环将字段REQ_SERVER的值赋给字段CUR_SERVER,也就是表示将待分配服务器正式设置为已分配服务器,并且将REQ_SERVER设置为空,这也就实现了服务器释放算法。

转载于:https://my.oschina.net/ywbrj042/blog/634097

taobao-pamirs-schedule-2.0源码分析——任务队列分配源码分析相关推荐

  1. 鸿蒙OS内核分析|解读鸿蒙源码

    操作系统(Operating System): 操作系统的功能是负责管理各种硬件设备,同时对底层硬件设备进行抽象,为上层软件提供高效的系统接口.操作系统设计和实现的优劣直接决定了系统栈的各个方面,比如 ...

  2. 触摸屏驱动分析(eeti源码为例)

    module_init(egalax_i2c_ts_init)–>表示驱动加载时首先执行的函数是egalax_i2c_ts_init,下面看egalax_i2c_ts_init函数源码: sta ...

  3. Django源码分析3:处理请求wsgi分析与视图View

    django源码分析 本文环境python3.5.2,django1.10.x系列 根据前上一篇runserver的博文,已经分析了本地调试服务器的大致流程,现在我们来分析一下当runserver运行 ...

  4. 【Android 插件化】Hook 插件化框架 ( 从 Hook 应用角度分析 Activity 启动流程 二 | AMS 进程相关源码 | 主进程相关源码 )

    Android 插件化系列文章目录 [Android 插件化]插件化简介 ( 组件化与插件化 ) [Android 插件化]插件化原理 ( JVM 内存数据 | 类加载流程 ) [Android 插件 ...

  5. php从内存中获取源码_【PHP7源码分析】PHP内存管理

    作者: 顺风车运营研发团队 李乐 第一章 从操作系统内存管理说起 程序是代码和数据的集合,进程是运行着的程序:操作系统需要为进程分配内存:进程运行完毕需要释放内存:内存管理就是内存的分配和释放: 1. ...

  6. python关键词提取源码_Python 结巴分词 关键词抽取分析

    关键词抽取就是从文本里面把跟这篇文档意义最相关的一些词抽取出来.这个可以追溯到文献检索初期,当时还不支持全文搜索的时候,关键词就可以作为搜索这篇论文的词语.因此,目前依然可以在论文中看到关键词这一项. ...

  7. android 源码分析notification,# Notification 源码分析

    引言 notification.jpg Notification 在v7版本下从4.0后增加了Media Style. 今天我们分析下Notification在v7版本的源码.有助于我们针对不同版本的 ...

  8. spring源码分析第一天------源码分析知识储备

    spring源码分析第一天------源码分析知识储备 Spring源码分析怎么学? 1.环境准备: 2.思路    看:是什么? 能干啥    想:为什么?     实践:怎么做?         ...

  9. c++ 退出函数_UCOSIII源码分析之——bsp_os.c文件分析

    点击上方公众号名称关注,获得更多内容 ✎ 编 者 悟 语 对于坚持做的人来说,每一次的"如期而至",其实并不需要什么"期待",也没有什么"悬念&quo ...

最新文章

  1. 编写c语言程序 斐波那契,C语言程序实现斐波那契数列的解题思路???
  2. nodeType 节点简介
  3. Hyper-v Server在线调整虚拟硬盘大小
  4. 开关电源反馈环路设计matlab,开关电源控制环路设计(初级篇).pdf
  5. oracle经常开关好吗,oracle启动和关闭
  6. Python数据结构与算法(第七天)
  7. 【网络安全】如何使用QueenSono从ICMP提取数据
  8. Blockchain Patent Players and domain
  9. 嵩天-Python语言程序设计程序题--第四周:程序的控制结构
  10. shellcraft新姿势
  11. 菜鸟裹裹升级退换货:全程可见让商家物流纠纷率降半
  12. LVDS原理及设计指南
  13. CentOS 7 Linux实时内核下的epoll性能分析
  14. 算法设计 分治, 归并排序, 快速排序
  15. char 与 unsigned char的本质区别(转)
  16. sonar mysql 配置_Sonar配置与使用
  17. 超级简单理解工厂模式
  18. vue项目中实现汉字转拼音缩写
  19. 预测分子的化学性质和化学反应
  20. 【新学期、新Flag】例文:我的新学期Flag

热门文章

  1. asp.net core系列 37 WebAPI 使用OpenAPI (swagger)中间件
  2. JSOI 2008 【魔兽地图】
  3. Sandcastle Help File Builder(.NET帮助文档工具)的版本选择心得——支持VS2010至VS2015,高版本项目文件问题...
  4. QBlog V2.5 源码开放下载(ASP.NET 番外系列之开端)
  5. 3D脚本 maxscript入门教程(7)
  6. C++添加一个头文件和extern以及全局变量和局部变量问题(清C++蓝书16.3.19上机的一小题)...
  7. Windows phone8 基础篇(二) xaml介绍 一
  8. 李航《统计学习方法》第一章课后答案链接
  9. LeetCode 144. 树的前序遍历迭代写法
  10. 一道非齐次方程组解的判定习题--行向量