系统任务和普通任务都是通过任务管理器调度的。它们的区别是:系统任务在程序运行后即不会被修改,而普通任务则会被修改。(转载请指明出于breaksoftware的csdn博客)

为什么要有这样的设计?因为我希望它是一个可以不用停止服务就可以更新相关配置的系统。比如我们现在要加一个普通任务,我们只要修改下普通任务配置文件即可。再比如我们需要修改数据库中表结构,我们也不用停止服务修改代码来保证数据格式的一致性。

我们程序需要知道配置文件是否被修改。如何去做?一种方法是借用一些系统方法监听相应配置文件的修改,一旦文件有变化,马上通知我们的主程序去处理。另一种则是采用轮询检查机制,即定期去生成差异结果。为了不让这个系统更加复杂,我选择后者。而它就是我所谓的系统任务。

不管是系统任务还是普通任务,实现的类都要继承于job_base

from abc import ABCMeta,abstractmethod
class job_base:__metaclass__ = ABCMeta@abstractmethoddef run(self):pass

有这个限制主要是为了保证每个任务都是run方法。调度框架将执行该方法以完成任务执行。

在《码农技术炒股之路——架构和设计》一文中,介绍了我们将基于APScheduler实现任务调度功能。首先我们需要启动BackgroundScheduler对象

from apscheduler.schedulers.background import BackgroundScheduler@singleton
class job_center():def __init__(self):self._sched = Noneself._job_conf_path = ""self._job_id_handle = {}self._static_job_id_handle = {}def start(self):self._sched = BackgroundScheduler()self._sched.start()

当我们需要加入任务时,则调用下面这个方法

    def add_jobs(self, jobs_info, is_static = False):if None == self._sched:LOG_WARNING("job center must call start() first")returnfor (job_name,job_info) in jobs_info.items():if is_static and job_name in self._static_job_id_handle.keys():continuejob_type = job_info["type"]class_name = job_info["class"]job_handle = self._get_obj(class_name)if is_static:self._static_job_id_handle[job_name] = job_handleelse:self._job_id_handle[job_name] = job_handlecmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"params = self._join_params(job_info)if 0 != len(params):cmd += " , "cmd += paramscmd += ")"#print cmdeval(cmd)

jobs_info保存的是任务配置文件中任务信息,我们看个样例

[update_share_base_info]
type=cron
class=update_stock_base_info
day_of_week=1-5
hour=9
minute=30
second=10
timezone = Asia/Shanghai

这个配置中除了type和class,其他都是APScheduler框架中add_job方法中的参数。上面配置意思是:以上海时间,从周一到周五,早上9点30分10秒执行一次。

add_jobs中通过class字段,获取该class对应的一个对象

    def _get_obj(self, _cls_name):  _packet_name = _cls_name  _module_home = __import__(_packet_name,globals(),locals(),[_cls_name])obj =  getattr(_module_home,_cls_name)  class_obj = obj()return class_obj

这儿又要提到我之前特别强调过的单例使用方式。经过测试,《码农技术炒股之路——配置管理器、日志管理器》中单例的实现可以保证上面这个方法获取的是同一个对象,而网上其他单例模式则不行。
        获取对象后,我们要组装出要执行的命令。cmd = "self._sched.add_job(job_handle.run, job_type, id = job_name"中job_handle就是上面获取的对象,而run则是每个job都要有的方法。这也是为什么要求每个任务类都要继承于job_base的原因。

之后调用_join_params将配置文件中其他信息组装成参数拼接出完整命令

    def _join_params(self, job_info):params = ""param = ""job_type = job_info["type"]for key in job_info.keys():if key in conf_keys.job_conf_info_dict[job_type]:if  0 != len(params):params += ' , 'value = job_info[key]if value.isdigit():param = key + " = " + valueelse:param = key + " = '" + value + "'"if 0 != len(param):params += paramreturn params

如此我们配置中的任务就会被加入到APScheduler调度队列中。

我们再看下如何删除一个任务

    def remove_jobs(self, jobs_info):if None == self._sched:LOG_WARNING("job center must call start() first")returnfor job_name in jobs_info.keys():self._sched.remove_job(job_name)self._job_id_handle.pop(job_name)

第7行通过任务名称在APScheduler中把任务删除。第8行将任务对应的对象从列表中删除。为什么要使用_job_id_handle去保存这些任务对象呢?因为如果不在一个更大的生命周期内保存它,它就会被认为是一个局部变量,从而被释放,导致之后APScheduler再也调用不了它。
        我们看个管理普通任务的系统任务代码

@singleton
class j_load_job_conf(job_base):def __init__(self):self._pre_jobs_info = {}self._frame_conf_inst = scheduler_frame_conf_inst()self._job_center = job_center()def run(self):section_name = "strategy_job"option_name = "conf_path"if False == self._frame_conf_inst.has_option(section_name, option_name):LOG_WARNING("no %s %s" % (section_name, option_name))returnconf_path = self._frame_conf_inst.get(section_name, option_name)LOG_DEBUG("Load %s %s %s" % (section_name, option_name, conf_path))job_conf_parser_obj = job_conf_parser()jobs_info = job_conf_parser_obj.parse(conf_path)self._execute_jobs(jobs_info)def _execute_jobs(self, jobs_info):add_dict = {}remove_dict = {}modify_dict = {}frame_tools.dict_diff(jobs_info, self._pre_jobs_info, add_dict, remove_dict, modify_dict)add_jobs_info = dict(add_dict, **modify_dict)remove_jobs_info = {}for item in modify_dict.keys():remove_jobs_info[item] = self._pre_jobs_info[item]LOG_INFO("add jobs %s" % (json.dumps(add_jobs_info)))LOG_INFO("remove jobs %s" % (json.dumps(remove_jobs_info)))if 0 == len(add_jobs_info) and 0 == len(remove_jobs_info):returnself._pre_jobs_info = jobs_infoself._job_center.remove_jobs(remove_jobs_info)self._job_center.add_jobs(add_jobs_info)

run方法将会定期执行。它会从固定目录读取普通任务配置文件信息。然后在_execute_jobs方法中,通过和上一次读取的任务信息对比,生成三个字典:需要删除的任务、需要新增的任务和需要修改的任务。需要修改的任务将变成先删除后新增的方式实现修改。所以最后操作的将是两个字段信息。
        普通任务本文就不介绍了,之后介绍的每个抓取和离线计算业务都是普通任务。

码农技术炒股之路——任务管理器相关推荐

  1. 码农技术炒股之路——配置管理器、日志管理器

    配置管理器和日志管理器是项目中最为独立的模块.我们可以很方便将其剥离出来供其他Python工程使用.文件的重点将是介绍Python单例和logging模块的使用.(转载请指明出于breaksoftwa ...

  2. 码农技术炒股之路——抓取股票基本信息、实时交易信息、主力动向信息

    从本节开始,我们开始介绍各个抓取和备份业务.(转载请指明出于breaksoftware的csdn博客) 因为我们数据库很多,数据库中表也很多,所以我们需要一个自动检测并创建数据库和表的功能.在< ...

  3. 码农技术炒股之路——数据库管理器、正则表达式管理器

    我选用的数据库是Mysql.选用它是因为其可以满足我的需求,而且资料多.因为作为第三方工具,难免有一些配置问题.所以本文也会讲解一些和Mysql配置及开发相关的问题.(转载请指明出于breaksoft ...

  4. 码农技术炒股之路——抓取日线数据、计算均线和除权数据

    日线数据是股票每日收盘后的信息.这块数据不用实时抓取,所以并不占用宝贵的交易时间的资源.于是我们抓取完数据后直接往切片后的数据库中保存.(转载请指明出于breaksoftware的csdn博客) 抓取 ...

  5. 码农技术炒股之路——实时交易信息、主力动向信息分库备份

    一般来说,一个股票信息应该保存在一张表中.但是由于我机器资源限制,且我希望尽快频率的抓取数据.所以每天我将所有股票的实时交易信息放在daily_temp库中的一个以日期命名的表中.主力动向信息也是如此 ...

  6. 一个30岁男人转型码农的平凡之路

    今天给大家带来的是一个转行的故事,一个30岁才开始学习编程的小白,资质平平,真正的零基础. 他的故事和那些大众喜欢的.夸张的.甚至虚假的华丽转身不同,一点也不精彩.一点也不鸡汤,平淡如水,但是能反映出 ...

  7. php-fpm哪里下载_如何在centos系统下找到php-fpm的位置 - 翟码农技术博客

    reboot重启了服务器后,所有的服务都需要重新启动. 启动php-fpm时,使用如下命令systemctl start php-fpm.service 提示:Failed to restart ph ...

  8. 一个普通码农的Linux之路

    1. Hi,大家好,我是奔跑的码仔,是一名长期混迹于Linux江湖,靠Linux吃饭的程序员.生活在一个IT大环境不好的二线城市,大家也知道,这里的程序员本来就很稀少,况且是Linux程序员呢,就更是 ...

  9. php redis管道,php redis pipeline怎么用 - 翟码农技术博客

    网上一大堆文章都在说pipeline怎么提升性能,我只是想知道安装好phpredis库之后,代码上如何写来开启管道模式,也就是下面这一小撮代码而已.$pipe = $redis->multi(R ...

最新文章

  1. 姚期智云栖大会首日演讲:为什么我说现在是金融科技的“新”黄金时代
  2. codeforces 712 Memory and De-Evolution
  3. 超越竞争(2) 价值创新
  4. 《线性代数习题集》 Chapter 1_Determinants_Sec.1.Second-and Third-Order Determinants
  5. 波形信号发生器设计 Proteus仿真--输出频率可调的正弦波、三角波、方波
  6. 《集体智慧编程》第12章 算法总结 个人笔记
  7. LD_PRELOAD实现API劫持
  8. 计算机基础客户端v7,ComwareV7
  9. RGMII通信接口详述
  10. 每月一书(202104):《浪潮之巅》
  11. android ram rom测试工具,ROM与RAM的那点事,超详细解说
  12. android图片显示组件,Android可循环显示图像的Android Gallery组件用法实例
  13. 项目开发团队分配管理软件
  14. jeremy的路径规划学习:蚁群算法
  15. 6、七段数码管显示译码器设计与应用
  16. 从电脑的角度介绍什么叫做单片机
  17. 有两个以上的USB设备,他们的Vendor ID和Product ID都一样,如何指定对应的usb插口和/dev/ttyUSB的序号?
  18. 龙芯OpenHarmony课程 【1】编译烧录
  19. 计算机三级网络技术知识点大纲,全国计算机等级三级网络技术考试大纲
  20. Sending signal. PID: 8561 SIG: 9

热门文章

  1. endnote能自动翻译吗_人工智能能翻译古文吗?跟小编点评专业翻译PK人工智能翻译...
  2. python基础知识整理 第三节 :函数
  3. java list 拼音排序_java中实现List集合中对象元素按其属性的中文拼音排序
  4. uniapp 分享缩略图过大怎么办_女性胸外扩怎么办|3步带你完成改变
  5. java c 解决方案_Java jdk安装及javac命令无效解决方案
  6. vector容器 begin()与end()函数、front()与back()的用法
  7. 在Ubuntu 14.04 64bit上进行md5加密编程
  8. 字符串匹配算法 -- BM(Boyer-Moore) 和 KMP(Knuth-Morris-Pratt)详细设计及实现
  9. C++ algorithm的sort函数总结
  10. Nginx+Apache Yii2.0 配置方案