TBSchedule基本概念及原理

概念介绍

TBSchedule是一个支持分布式的调度框架,能让一种批量任务或者不断变化的任务,被动态的分配到多个主机的JVM中,不同的线程组中并行执行。基于ZooKeeper的纯Java实现,由Alibaba开源。

代码实现

起步配置

/* (non-Javadoc)* @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet()*/@Overridepublic void afterPropertiesSet() throws Exception {InputStream in = TaskCenter.class.getClassLoader().getResourceAsStream("schedule-conf.properties");Properties p = new Properties();p.load(in);TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();scheduleManagerFactory.setApplicationContext(applicationContext);try {scheduleManagerFactory.init(p);} catch (Exception e) {e.printStackTrace();}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

TBScheduleManagerFactory(com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory)是个什么?代码里对他的描述是“调度服务器构造器”,只关心他的启动过程

public void init(Properties p) throws Exception {//这里initialThread是个什么,作用和意义?if(this.initialThread != null){this.initialThread.stopThread();}//lock是一个锁(ReentrantLock)使用默认的非公平锁this.lock.lock();try{this.scheduleDataManager = null;this.scheduleStrategyManager = null;ConsoleManager.setScheduleManagerFactory(this);if(this.zkManager != null){this.zkManager.close();}this.zkManager = new ZKManager(p);this.errorMessage = "Zookeeper connecting ......" + this.zkManager.getConnectStr();initialThread = new InitialThread(this);initialThread.setName("TBScheduleManagerFactory-initialThread");initialThread.start();}finally{this.lock.unlock();}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

到这里需要知道,initalThread是个什么线程,该线程的作用?点开看一下run方法的实现

@Overridepublic void run() {//factory就是这个调度服务器facotry.lock.lock();try {int count =0;while(facotry.zkManager.checkZookeeperState() == false){count = count + 1;if(count % 50 == 0){facotry.errorMessage = "Zookeeper connecting ......" + facotry.zkManager.getConnectStr() + " spendTime:" + count * 20 +"(ms)";log.error(facotry.errorMessage);}Thread.sleep(20);//貌似每20ms 检测一次zk的连接状态if(this.isStop ==true){return;}}//如果连接成功则初始化数据facotry.initialData();} catch (Throwable e) {log.error(e.getMessage(),e);}finally{facotry.lock.unlock();}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

这个initalThread内部会竞争factory的锁,所以如果zk一直检查失败,锁得不到释放,有可能一直阻塞线程,导致启动项目启动问题?while出口的isStop是什么?isStop莫非是一个取消状态?代表停止这个线程,这个线程可能更多的做一个zk状态的启动前自检。
多虑了,后边他的主线程会释放锁…..

scheduleDataManager是什么?具体作用是`com.taobao.pamirs.schedule.taskmanager.IScheduleDataManager
他是一个调度中心客户端,启动过程几乎没怎么参与。

scheduleStrategyManager是什么?类名`com.taobao.pamirs.schedule.zk.ScheduleStrategyDataManager4ZK
官方没有给出他的命名,暂且取名为调度策略管理器,提供了一些策略查询和更新的功能。

ConsoleManager.setScheduleManagerFactory(this);,貌似与管理控制台有关?
实际使用中,控制台和调度项目是不相关的,那么这个ConsoleManager是什么?

然后剩下的就是初始化一个zk的管理器,最后释放锁

问题是集成tbschedule只需这样就行,剩下的都是一些zk配置操作。如果和zk相关应该包含两个点:
1.在哪里添加监视点的?即tbschedule如何响应 rootPath 配置的节点路径下的相关变化的?
2.监视点的动作是怎样的

目前为止值得看下去的就两行,分别是com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory类下的init(Properties p)方法。


public void init(Properties p) throws Exception {//.......//ConsoleManager.setScheduleManagerFactory(this);//.......//   
  • 1
  • 2
  • 3
  • 4
  • 5

还有com.taobao.pamirs.schedule.strategy.InitialThread 类下的

public void run() {//......//facotry.initialData();//......//}
  • 1
  • 2
  • 3
  • 4
  • 5

其中TBScheduleManagerFactory类的init方法里的代码行 更像一个setter方法,正常编码时会避免在setter中做赋值以外操作的。暂且不管
然后facotry.initialData(); 方法名告诉我该方法会初始化数据。

/*** 在Zk状态正常后回调数据初始化* @throws Exception*/public void initialData() throws Exception{//初始化zk管理器this.zkManager.initial();//初始化调度中心客户端this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager);//初始化调度策略管理器this.scheduleStrategyManager  = new ScheduleStrategyDataManager4ZK(this.zkManager);if (this.start == true) {// 注册调度管理器this.scheduleStrategyManager.registerManagerFactory(this);if(timer == null){timer = new Timer("TBScheduleManagerFactory-Timer");}if(timerTask == null){timerTask = new ManagerFactoryTimerTask(this);timer.schedule(timerTask, 2000,this.timerInterval);}}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

其中timer和timerTask定义如下:其中ManagerFactoryTimerTask继承自java.util.TimerTask,所以会定时执行任务。这里是2秒后开始执行,之后每2秒执行一次。

private java.util.Timer timer;
private com.taobao.pamirs.schedule.strategy.ManagerFactoryTimerTask timerTask;
  • 1
  • 2

调度任务ManagerFactoryTimerTask 内部做了些啥?看样子是做了一些ZK连接状态监测的操作,不断重试连不上就重连,连上了就刷新信息。等等……刷新?刷的是啥????

public void run() {try {Thread.currentThread().setPriority(Thread.MAX_PRIORITY);if(this.factory.zkManager.checkZookeeperState() == false){if(count > 5){log.error("Zookeeper连接失败,关闭所有的任务后,重新连接Zookeeper服务器......");this.factory.reStart();}else{count = count + 1;}}else{count = 0;//重点是这里、this.factory.refresh();}}  catch (Throwable ex) {log.error(ex.getMessage(), ex);} finally {//这个后续有什么作用???factory.timerTaskHeartBeatTS = System.currentTimeMillis();}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

那么initialData() 的执行流程就是这样了?

1.初始化调度中心客户端
2.初始化调度策略管理器
3.注册调度管理器
4.启动一个定时任务,这个定时任务会以2秒为一个周期对当前的zk状态做检查,如果zk连接失败(重试次数5)会停止当前服务,否则刷新调度服务器

那么问题来了,这个定时器里所做的”刷新”操作又在做什么操作,哪些东西被刷了??

    public void refresh() throws Exception {this.lock.lock();try {// 判断状态是否终止ManagerFactoryInfo stsInfo = null;boolean isException = false;try {//貌似是从策略管理器中拿到策略信息stsInfo = this.getScheduleStrategyManager().loadManagerFactoryInfo(this.getUuid());} catch (Exception e) {isException = true;logger.error("获取服务器信息有误:uuid="+this.getUuid(), e);}if (isException == true) {try {stopServer(null); // 停止所有的调度任务this.getScheduleStrategyManager().unRregisterManagerFactory(this);} finally {reRegisterManagerFactory();}} else if (stsInfo.isStart() == false) {stopServer(null); // 停止所有的调度任务this.getScheduleStrategyManager().unRregisterManagerFactory(this);} else {reRegisterManagerFactory();}} finally {this.lock.unlock();}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

这个方法里有几个疑点:
1.getUuid() 拿到的是个啥?貌似是这个调度中心的唯一ID?没有在目前的代码逻辑看到对他的初始化?初始化在哪做的,对应官方的哪个名词?
2.stsInfo = this.getScheduleStrategyManager().loadManagerFactoryInfo(this.getUuid()); 取到的信息有哪些作用,数据来源是哪里,源数据的格式是什么?什么情况下会抛出异常??
3.ManagerFactoryInfo对象的isStart()方法,声明的是什么开始状态??
4.除去失败的情况需要终止调度任务,那么这个正确的情况reRegisterManagerFactory(); 做了哪些事情??

问题1:
关于getUuid(),uuid哪里初始化来的?
一般有getter就有setter ,通过setUuid(String uuid) 方法,找到注入值的位置,发现在com.taobao.pamirs.schedule.zk.ScheduleStrategyDataManager4ZK类的public List registerManagerFactory(TBScheduleManagerFactory managerFactory) throws Exception方法中有调用。
巧了正好这个registerManagerFactory(factory) 反复看见好几遍了,他到底是个啥,干点啥好呢,现在有了问题5

5.ScheduleStrategyDataManager4ZK 所谓调度策略管理器,起到了啥作用,registerManagerFactory方法在干些什么??

其他操作都省略,就看uuid他的生成策略,到底是个啥,能标识个什么的唯一性??

/*** 注册ManagerFactory* @param managerFactory* @return 需要全部注销的调度,例如当IP不在列表中* @throws Exception*/public List<String> registerManagerFactory(TBScheduleManagerFactory managerFactory) throws Exception{//第一波进来肯定无值啊if(managerFactory.getUuid() == null){String uuid = managerFactory.getIp() +"$" + managerFactory.getHostName() +"$"+ UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();//...省略...//managerFactory.setUuid(zkPath.substring(zkPath.lastIndexOf("/") + 1));}//...省略...//return result;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

生成策略是当不存在uuid时才会生成uuid,这个uuid很容易发现是跟当前机器的ip还有hostname有关的,为了避免重复还追加了一个uuid,我想这里可能是考虑到了一个机器下有多个项目集成tbschedule的情况吧
所以这里uuid代表了

当前应用(注意是应用)在整个调度服务器下的唯一ID,当前这个uuid在console(管理页面)上也有所体现。

问题2:
知道了uuid是个什么,那么这个代码stsInfo = this.getScheduleStrategyManager().loadManagerFactoryInfo(this.getUuid()); 字面意思可能就好理解了一些,貌似是拿应用的uuid去调度策略管理器中拿自己的调度策略?

public ManagerFactoryInfo loadManagerFactoryInfo(String uuid) throws Exception {//这里会拼装一个zk的路径,带有uuid哦String zkPath = this.PATH_ManagerFactory + "/" + uuid;//判断是否存在这个节点,没有相关节点就抛出异常?if(this.getZooKeeper().exists(zkPath, false)==null){throw new Exception("任务管理器不存在:" + uuid);}//拿到节点的存储值byte[] value = this.getZooKeeper().getData(zkPath,false,null);ManagerFactoryInfo result = new ManagerFactoryInfo();result.setUuid(uuid);//如果值为null代表已启动的??if(value== null){result.setStart(true);}else{//根据节点值来判断是否startresult.setStart(Boolean.parseBoolean(new String(value)));}return result;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

那么可见会已uuid拼装一个zk路径,去判断对应节点是否存在,如果不存在就抛出异常!如果存在就去判断节点值(true/false),标识了当前节点的启动状态。那么问题又来了。

6.应用在调度策略管理器下的节点是什么时候变更状态的?

问题3:
貌似和问题6有关?

问题4:
reRegisterManagerFactory()

public void reRegisterManagerFactory() throws Exception{//重新分配调度器List<String> stopList = this.getScheduleStrategyManager().registerManagerFactory(this);for (String strategyName : stopList) {this.stopServer(strategyName);}//根据策略重新分配调度任务的机器this.assignScheduleServer();//this.reRunScheduleServer();}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

重新注册了一次调度策略管理器,what这个方法有返回值??之前为什么不对这个返回值做处理呢????!所以这个registerManagerFactory(this) 方法变得愈发的混乱了,有必要打开详细看看了。

/*** 注册ManagerFactory* @param managerFactory* @return 需要全部注销的调度,例如当IP不在列表中* @throws Exception*/public List<String> registerManagerFactory(TBScheduleManagerFactory managerFactory) throws Exception{if(managerFactory.getUuid() == null){String uuid = managerFactory.getIp() +"$" + managerFactory.getHostName() +"$"+ UUID.randomUUID().toString().replaceAll("-", "").toUpperCase();//拼装了一个zk节点的路径,等等这个路径的拼装格式貌似见过,所以loadManagerFactoryInfo方法中数据节点是在这一步时候建立的了?String zkPath = this.PATH_ManagerFactory + "/" + uuid +"$";zkPath = this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.EPHEMERAL_SEQUENTIAL);managerFactory.setUuid(zkPath.substring(zkPath.lastIndexOf("/") + 1));}else{String zkPath = this.PATH_ManagerFactory + "/" + managerFactory.getUuid();if(this.getZooKeeper().exists(zkPath, false)==null){zkPath = this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.EPHEMERAL);           }}// 方法的返回值是这个listList<String> result = new ArrayList<String>();//貌似会加载所有策略,这个loadAllScheduleStrategy() 方法,会把 this.zkManager.getRootPath() +  "/strategy" 这个节点下的所有子节点全都返回//然后将每个节点的内容取出,再调用这个方法ScheduleStrategy result = (ScheduleStrategy)this.gson.fromJson(valueString, ScheduleStrategy.class); 可以看作对json反序列化操作。//所以控制台操作创建的策略,是存储在this.zkManager.getRootPath() +  "/strategy" 节点的子节点上的for(ScheduleStrategy scheduleStrategy:loadAllScheduleStrategy()){boolean isFind = false;//暂停或者不在IP范围//猜测是要判断策略在当前机器上是否可用,如果不可用则加入列表返回,如果可用则在对应的策略节点下创建一个临时节点if(ScheduleStrategy.STS_PAUSE.equalsIgnoreCase(scheduleStrategy.getSts()) == false &&  scheduleStrategy.getIPList() != null){for(String ip:scheduleStrategy.getIPList()){//这里就是为什么控制太界面说“127.0.0.1或者localhost会在所有机器上运行” 的原因了if(ip.equals("127.0.0.1") || ip.equalsIgnoreCase("localhost") || ip.equals(managerFactory.getIp())|| ip.equalsIgnoreCase(managerFactory.getHostName())){//添加可管理TaskType//创建一个临时节点,这个临时节点与之前的调度服务器中生成的uuid有关。//这一步创建的临时节点如下文。String zkPath = this.PATH_Strategy+"/"+ scheduleStrategy.getStrategyName()+ "/"+ managerFactory.getUuid();if(this.getZooKeeper().exists(zkPath, false)==null){zkPath = this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.EPHEMERAL);           }isFind = true;break;}}}if(isFind == false){//清除原来注册的FactoryString zkPath = this.PATH_Strategy+"/"+ scheduleStrategy.getStrategyName()+ "/"+ managerFactory.getUuid();if(this.getZooKeeper().exists(zkPath, false)!=null){ZKTools.deleteTree(this.getZooKeeper(), zkPath);result.add(scheduleStrategy.getStrategyName());}}}return result;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55

针对我的环境,此处rootPath配置为”/dsp_official_0928_wd/schedule”,其zk的策略节点为,可见其与通过控制台录入的策略是对应的

[zk: localhost:2181(CONNECTED) 7] ls /dsp_official_0928_wd/schedule/strategy
[commonSyncAdvertiserTask, commonSyncCreativeTask]

按照loadAllScheduleStrategy() 方法的逻辑来说,会把这两个节点上的内容都拿出来,反序列化成com.taobao.pamirs.schedule.strategy.ScheduleStrategy 对象
以/dsp_official_0928_wd/schedule/strategy/commonSyncAdvertiserTask 节点内容为例

{
“strategyName”: “commonSyncAdvertiserTask”,
“IPList”: [
“127.0.0.1”
],
“numOfSingleServer”: 0,
“assignNum”: 2,
“kind”: “Schedule”,
“taskName”: “commonSyncAdvertiserTask”,
“taskParameter”: “”,
“sts”: “resume”
}

某策略可用的应用节点:

[zk: localhost:2181(CONNECTED) 8] ls /dsp_official_0928_wd/schedule/strategy/commonSyncAdvertiserTask
[127.0.0.1admadc01admadc010000000081]

通常情况下,策略应用节点的内容如下,但在registerManagerFactory此时还没有初始化该节点的值,默认为null:

{
“strategyName”: “commonSyncAdvertiserTask”,
“uuid”: “127.0.0.1admadc01admadc01FC8BBDAC658C4B00B0F47377E50A570D$0000000080”,
“requestNum”: 0,
“currentNum”: 0,
“message”: “”
}

先不管如上节点内容的意义,其实理解上也简单。

问题5:
所以registerManagerFactory() 方法就是在当前机器支持的策略配置上注册当前机器节点用于同步,并返回当前机器上所有不支持的策略

那么回到 reRegisterManagerFactory() 方法上来,

  public void reRegisterManagerFactory() throws Exception{//重新分配调度器//因为当前应用不支持这些策略,所以要在当前机器上停止这些策略的服务。List<String> stopList = this.getScheduleStrategyManager().registerManagerFactory(this);for (String strategyName : stopList) {this.stopServer(strategyName);}//根据策略重新分配调度任务的机器this.assignScheduleServer();this.reRunScheduleServer();}```这里迫切的需要知道以下两行的含义。```java
this.assignScheduleServer();this.reRunScheduleServer();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

assignScheduleServer() 方法的官方解释是 根据策略重新分配调度任务的机器 那么分配策略是什么呢?
1.加载所有本应用能够运行的策略(注意上文说道的策略应用节点的数据格式,返回默认格式)。
2.遍历1返回策略列表,按策略taskType获取到所有的可运行策略应用项列表
3.遍历跟当前factory实例相关的strategy,选举出每个strategy的leader factory实例,由leader重新计算每个factory实例能够分到的reqNum,即根据strategy身上的assignNum“numOfSingleServer,将assignNum平分给每个factory实例。
4.根据2的返回结果(总的服务器数量),和3的策略配置信息,为2返回的策略服务项分配值(修改zk策略应用节点的requestNum)

reRunScheduleServer() 方法没有给出官方的解释,这里暂且认为其是为了重新运行调度服务,怎么做的????

public void reRunScheduleServer() throws Exception{//拿到所有本机可运行的策略for (ScheduleStrategyRunntime run : this.scheduleStrategyManager.loadAllScheduleStrategyRunntimeByUUID(this.uuid)) {//在缓存中取List<IStrategyTask> list = this.managerMap.get(run.getStrategyName());if(list == null){list = new ArrayList<IStrategyTask>();this.managerMap.put(run.getStrategyName(),list);}//如果已经有在运行的任务组,且当前任务组数大于已分配的最大任务组数量,那么就停止最后添加的任务组while(list.size() > run.getRequestNum() && list.size() >0){//what??先从列表里删了????如果停止失败呢????????IStrategyTask task  =  list.remove(list.size() - 1);try {task.stop(run.getStrategyName());} catch (Throwable e) {logger.error("注销任务错误:strategyName=" + run.getStrategyName(), e);}}//不足,增加调度器ScheduleStrategy strategy = this.scheduleStrategyManager.loadStrategy(run.getStrategyName());//注意这个循环,创建线程组直到运行的线程组满了while(list.size() < run.getRequestNum()){//厉害了这里做了什么????IStrategyTask result = this.createStrategyTask(strategy);if(null==result){logger.error("strategy 对应的配置有问题。strategy name="+strategy.getStrategyName());}list.add(result);}}}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

看起来该方法会遍历所有当前应用支持的策略,并针对策略创建一个叫做IStrategyTask的东西,并维护在一个map中,这个this.createStrategyTask(strategy); 好吧,官方有说明“创建调度服务器”

/*** 创建调度服务器* @param baseTaskType* @param ownSign* @return* @throws Exception*/public IStrategyTask createStrategyTask(ScheduleStrategy strategy)throws Exception {IStrategyTask result = null;try{if(ScheduleStrategy.Kind.Schedule == strategy.getKind()){//就是这里。。。。。。。。。String baseTaskType = ScheduleUtil.splitBaseTaskTypeFromTaskType(strategy.getTaskName());String ownSign =ScheduleUtil.splitOwnsignFromTaskType(strategy.getTaskName());result = new TBScheduleManagerStatic(this,baseTaskType,ownSign,scheduleDataManager);}else if(ScheduleStrategy.Kind.Java == strategy.getKind()){result=(IStrategyTask)Class.forName(strategy.getTaskName()).newInstance();result.initialTaskParameter(strategy.getStrategyName(),strategy.getTaskParameter());}else if(ScheduleStrategy.Kind.Bean == strategy.getKind()){result=(IStrategyTask)this.getBean(strategy.getTaskName());result.initialTaskParameter(strategy.getStrategyName(),strategy.getTaskParameter());}}catch(Exception e ){logger.error("strategy 获取对应的java or bean 出错,schedule并没有加载该任务,请确认" +strategy.getStrategyName(),e);}return result;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

根据上文和日常配置,也知道来自于会走第一个条件分支。最后通过一句result = new TBScheduleManagerStatic(this,baseTaskType,ownSign,scheduleDataManager);返回一个com.taobao.pamirs.schedule.strategy.IStrategyTask 接口的实现。
继续看com.taobao.pamirs.schedule.taskmanager.TBScheduleManagerStatic类。

public TBScheduleManagerStatic(TBScheduleManagerFactory aFactory,String baseTaskType, String ownSign,IScheduleDataManager aScheduleCenter) throws Exception {super(aFactory, baseTaskType, ownSign, aScheduleCenter);}
  • 1
  • 2
  • 3
  • 4
  • 5

调用了父类com.taobao.pamirs.schedule.taskmanager.TBScheduleManager的构造函数,源码里面少有的对这个类写了一大堆文档注释,先拿出来学习下。

/*** 1、任务调度分配器的目标:    让所有的任务不重复,不遗漏的被快速处理。* 2、一个Manager只管理一种任务类型的一组工作线程。* 3、在一个JVM里面可能存在多个处理相同任务类型的Manager,也可能存在处理不同任务类型的Manager。* 4、在不同的JVM里面可以存在处理相同任务的Manager * 5、调度的Manager可以动态的随意增加和停止* * 主要的职责:* 1、定时向集中的数据配置中心更新当前调度服务器的心跳状态* 2、向数据配置中心获取所有服务器的状态来重新计算任务的分配。这么做的目标是避免集中任务调度中心的单点问题。* 3、在每个批次数据处理完毕后,检查是否有其它处理服务器申请自己把持的任务队列,如果有,则释放给相关处理服务器。*  * 其它:*   如果当前服务器在处理当前任务的时候超时,需要清除当前队列,并释放已经把持的任务。并向控制主动中心报警。* * @author xuannan**/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

第2点:一个Manager只管理一种任务类型的一组工作线程。
通过分析启动过程已经能够确认。发现针对每一个策略和线程组都创建了一个com.taobao.pamirs.schedule.strategy.IStrategyTask实例。

第3点:在一个JVM里面可能存在多个处理相同任务类型的Manager,也可能存在处理不同任务类型的Manager。
如果启动过程初始化多个com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory对象时可能会有可能有”在一个JVM里面可能存在多个处理相同任务类型的Manager”这种情况,还有会有requsetNum个相同任务类型的Manager在同一个JVM

第4点:在不同的JVM里面可以存在处理相同任务的Manager
这种情况是肯定存在的,取决于策略的”IP地址(逗号分隔)”配置

第5点:调度的Manager可以动态的随意增加和停止
这个要自己去验证了。

疑问:
1. List list = this.managerMap.get(run.getStrategyName()); 到底起到什么作用?是为了统计当前JVM已经运行的线程组数量么???
目前tbschedule启动过程到这就结束了,2秒一次的zk健康检查(心跳),并刷新任务组(及时响应控制台设置),貌似到目前为止,为什么这里并没有使用监视点来响应控制台的设置,而是自己主动查询,出发点是什么???

【个人学习TBSchedule】转自weiythi的博文

TBSchedule源码学习笔记-启动过程相关推荐

  1. Vuex 4源码学习笔记 - 通过Vuex源码学习E2E测试(十一)

    在上一篇笔记中:Vuex 4源码学习笔记 - 做好changelog更新日志很重要(十) 我们学到了通过conventional-changelog来生成项目的Changelog更新日志,通过更新日志 ...

  2. Vuex 4源码学习笔记 - Vuex是怎么与Vue结合?(三)

    在上一篇笔记中:Vuex源码学习笔记 - Vuex开发运行流程(二) 我们通过运行npm run dev命令来启动webpack,来开发Vuex,并在Vuex的createStore函数中添加了第一个 ...

  3. 雷神FFMpeg源码学习笔记

    雷神FFMpeg源码学习笔记 文章目录 雷神FFMpeg源码学习笔记 读取编码并依据编码初始化内容结构 每一帧的视频解码处理 读取编码并依据编码初始化内容结构 在开始编解码视频的时候首先第一步需要注册 ...

  4. Apache log4j-1.2.17源码学习笔记

    (1)Apache log4j-1.2.17源码学习笔记 http://blog.csdn.net/zilong_zilong/article/details/78715500 (2)Apache l ...

  5. Java多线程之JUC包:Semaphore源码学习笔记

    若有不正之处请多多谅解,并欢迎批评指正. 请尊重作者劳动成果,转载请标明原文链接: http://www.cnblogs.com/go2sea/p/5625536.html Semaphore是JUC ...

  6. RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的?

    RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 文章目录 RocketMQ 源码学习笔记 Producer 是怎么将消息发送至 Broker 的? 前言 项目 ...

  7. jquery源码学习笔记三:jQuery工厂剖析

    jquery源码学习笔记二:jQuery工厂 jquery源码学习笔记一:总体结构 上两篇说过,query的核心是一个jQuery工厂.其代码如下 function( window, noGlobal ...

  8. PHP Yac cache 源码学习笔记

    YAC 源码学习笔记 本文地址 http://blog.csdn.net/fanhengguang_php/article/details/54863955 config.m4 检测系统共享内存支持情 ...

  9. Vuex 4源码学习笔记 - 通过dispatch一步步来掌握Vuex整个数据流(五)

    在上一篇笔记中:Vuex 4源码学习笔记 - Store 构造函数都干了什么(四) 我们通过查看Store 构造函数的源代码可以看到主要做了三件事情: 初始化一些内部变量以外 执行installMod ...

  10. php连接tidb,TiDB源码学习笔记:启动TiDB

    作者:院长,神州数码云基地开发工程师,目前专注于TiDB源码研究. TiDB源码研究系列第一篇,简述TiDB的核心架构,从tidb-server/mian.go开始,探索启动TiDB的方法. 最近因为 ...

最新文章

  1. ## *将以下学生成绩数据,存放在Hdfs上,使用Spark读取完成下面分析**
  2. 用户系统-开放平台的一些思考
  3. 大厂提供什么样的软硬件来吸引人才?
  4. 使用cgroup对指定用户使用的memory进行限制的一个具体例子
  5. [SIR数据集实验][2]Java类数据集相应工具使用的小经验
  6. c 语言 abs 库函数,absread,abswirte - C 语言库函数手册
  7. 学习:java原理—反射机制
  8. JavaScript实现 页面滚动图片加载
  9. markdown方式测试图片2
  10. 终于在五一之前打了SP3
  11. GB28181-2016过检通过
  12. 41.Linux/Unix 系统编程手册(下) -- 共享库基础
  13. ExtJS4.2学习 php版(五)
  14. c语言字段宽度,2.6.3 控制输出的字段宽度
  15. 手绘风海报怎么做?手绘素材在哪里找?
  16. 每日英语阅读(五十四)
  17. 谷歌地球Google Earth打不开的解决办法
  18. mysql服务器cpu使用率过高100%
  19. c语言怎样写积分程序,C语言实现定积分求解方法
  20. 零基础学java——面向对象(三)

热门文章

  1. 组合数学—什么是组合数学(1)
  2. 网页设计中常用的20个Web安全字体
  3. ha rose server安装 sql_Rose HA for SQL2008的安装之一
  4. 提醒:使用过期Win10预览版后果很严重
  5. Autocad ET扩展工具汉化
  6. nanoCAD Pro 10.0.4447.1969 Build 4520中文免费版
  7. 腾讯加入QQ群 连接代码
  8. 永远感谢雷神-雷霄骅!
  9. 您电脑的网络管家 -NetSetMan
  10. 【cuda】——npp/cuda图像预处理resize+norm对比