目录

一、简单使用

二、apscheduler

triggers

job stores

schedulers

使用上下文

日志设置


pip install Flask-APScheduler

Flask-APScheduler: Tips - Flask APScheduler

Flask-APScheduler 是基于 python 第三方库 apscheduler 做的集成, 所以官网上只有一些简单的使用案例,详细的配置还是要看 apscheduler的文档。

apscheduler: 

Advanced Python Scheduler — APScheduler 3.9.1 documentation

引用:

Flask-APScheduler使用教程 | Mr. Lee's blog

一、简单使用

__init__.py:

from flask import Flaskapp = Flask(__name__)
app.config.from_object('config.config')# 初始化db
db = SQLAlchemy()
db.init_app(app)# 初始化定时任务
from models.taskSchedule import scheduler
scheduler.init_app(app)
scheduler.start()
# 修改调度器执行组件冗余日志级别
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)

taskSchedule.py:

from flask_apscheduler import APSchedulerscheduler = APScheduler()# interval example, 间隔执行, 每30秒执行一次
@scheduler.task('interval', id='do_job_1', seconds=30, misfire_grace_time=900)
def job1():print('Job 1 executed')# cron examples, 每分钟执行一次
@scheduler.task('cron', id='do_job_2', minute='*')
def job2():print('Job 2 executed')# 每周执行一次
@scheduler.task('cron', id='do_job_3', week='*', day_of_week='sun')
def job3():print('Job 3 executed')

也可以在程勋运行后添加定时任务:

scheduler.start()
scheduler.add_job(**args)

二、apscheduler 

apscheduler 四个组件:

  • triggers: 任务触发器组件,提供任务触发方式
  • job stores: 任务商店组件,提供任务保存方式
  • executors: 任务调度组件,提供任务调度方式
  • schedulers: 任务调度组件,提供任务工作方式

triggers

支持三种任务触发方式

  • date:

固定日期触发器,任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建

| 参数 | 说明 |
| :——————————– | :——————- |
| run_date (datetime 或 str) | 作业的运行日期或时间 |
| timezone (datetime.tzinfo 或 str) | 指定时区 |

例如# 在 2019-4-24 00:00:01 时刻运行一次 start_system 方法
scheduler .add_job(start_system, 'date', run_date='2019-4-24 00:00:01', args=['text'])

  • interval:

时间间隔触发器,每个一定时间间隔执行一次。

| 参数 | 说明 |
| —————————- | ———- |
| weeks (int) | 间隔几周 |
| days (int) | 间隔几天 |
| hours (int) | 间隔几小时 |
| minutes (int) | 间隔几分钟 |
| seconds (int) | 间隔多少秒 |
| start_date (datetime 或 str) | 开始日期 |
| end_date (datetime 或 str) | 结束日期 |

# 在 2019-4-24 00:00:00 - 2019-4-24 08:00:00 之间, 每隔两小时执行一次 alarm_job 方法
scheduler .add_job(alarm_job, 'interval', hours=2, start_date='2019-4-24 00:00:00' , end_date='2019-4-24 08:00:00')
  • cron:

cron风格的任务触发

参数 说明
year (int 或 str) 表示四位数的年份 (2019)
month(int\ str) 月 (范围1-12)
day(int\ str) 日 (范围1-31)
week(int\ str) 周 (范围1-53)
day_of_week (int\ str) 表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示
hour (int\ str) 表示取值范围为0-23时
minute (int\ str) 表示取值范围为0-59分
second (int\ str) 表示取值范围为0-59秒
start_date (datetime\ str) 表示开始时间
end_date (datetime\ str) 表示结束时间
timezone (datetime.tzinfo\ str) 表示时区取值

(int|str) 表示参数既可以是int类型,也可以是str类型
(datetime | str) 表示参数既可以是datetime类型,也可以是str类型

例如:表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5

sched.add_job(my_job, 'cron',second = '*/5')

job stores

支持四种任务存储方式

  • memory:默认配置任务存在内存中
  • mongdb:支持文档数据库存储
  • sqlalchemy:支持关系数据库存储
  • redis:支持键值对数据库存储

schedulers

调度器主要分三种,一种独立运行的,一种是后台运行的,最后一种是配合其它程序使用

  • BlockingScheduler: 当这个调度器是你应用中 唯一要运行 的东西时使用
  • BackgroundScheduler: 当 不运行其它框架 的时候使用,并使你的任务在 后台运行
  • AsyncIOScheduler: 当你的程序是 异步IO模型 的时候使用
  • GeventScheduler: 和 gevent 框架配套使用
  • TornadoScheduler: 和 tornado 框架配套使用
  • TwistedScheduler: 和 Twisted 框架配套使用
  • QtScheduler: 开发 qt 应用的时候使用

Flask-APScheduler 中默认使用的就是 BackgroundScheduler:

scheduler.py:

#.....class APScheduler(object):"""Provides a scheduler integrated to Flask."""def __init__(self, scheduler=None, app=None):self._scheduler = scheduler or BackgroundScheduler()self._host_name = socket.gethostname().lower()self._authentication_callback = None# .....

使用上下文

如果正在使用 Flask-SQLAlchemy 并在定时任务中执行数据库操作,需要提供 Flask 应用程序上下文:

from flask_apscheduler import APScheduler
scheduler = APScheduler()@scheduler.task("interval",id="update_news_graph_job",minutes = 16)
def update_news_graph():# 提供flask上下文对象with scheduler.app.app_context():result = db.session.execute(query_sql)
当然, scheduler 需要在flask中注册:
scheduler.init_app(app)
scheduler.start()

官网关于上下文的解释:

Basic Application - Flask APScheduler

Introduction into Contexts — Flask-SQLAlchemy Documentation (2.x)

日志设置

如果定时任务执行间隔几秒钟, 调度程序的日志会很多,可以设置调度程序日志级别或完全禁用:

#设置调度程序的日志级别, 原本级别为infoscheduler.start()
scheduler.add_job(every_minute, trigger='cron', second=0, id='every_minute')
logging.getLogger('apscheduler.executors.default').setLevel(logging.WARNING)#或者禁用调度程序日志
logging.getLogger('apscheduler.executors.default').propagate = False

Flask-定时任务相关推荐

  1. flask 定时任务 flask-apscheduler

    Flask-APScheduler介绍 Flask-APScheduler是基于APScheduler库开发的Flask拓展库.APScheduler的全称是Advanced Python Sched ...

  2. Flask从0到1快速后台服务开发

    Flask从0到1快速后台服务开发 版本说明: Python:3.7 Flask:1.0.2 前言 Flask是一个使用 Python 编写的轻量级 Web 应用框架.其 WSGI 工具箱采用 Wer ...

  3. flask、celery+redis 实现定时任务和异步——(一)

    1. Celery简介 Celery是一个异步任务的调度工具. Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其 ...

  4. python flask+apscheduler定时任务导致数据重复和错误

    python flask+apscheduler 定时任务导致数据重复和错误的解决办法 我们先看一下未定时前的代码,每一次执行数据都是准确的,是我们想要的结果 import datetime clas ...

  5. flask + celery实现定时任务和异步

    参考资料: Celery 官网:http://www.celeryproject.org/ Celery 官方文档英文版:http://docs.celeryproject.org/en/latest ...

  6. python- 进阶 与flask的搭配使用---定时任务框架APScheduler学习详解

    APScheduler简介 在平常的工作中几乎有一半的功能模块都需要定时任务来推动,例如项目中有一个定时统计程序,定时爬出网站的URL程序,定时检测钓鱼网站的程序等等,都涉及到了关于定时任务的问题,第 ...

  7. Flask中使用定时任务

    文章目录 1. 安装所需包 2. 定义定时任务配置类 3. 启动时加载定时任务 4. 定时任务中配置 1. cron定时调度 2. interval间隔调度 3. date定时调度 5. 注意事项 6 ...

  8. Celery - 一个懂得 异步任务 , 定时任务 , 周期任务 的芹菜

    1.什么是Celery? Celery 是芹菜 Celery 是基于Python实现的模块, 用于执行异步定时周期任务的 其结构的组成是由     1.用户任务 app     2.管道 broker ...

  9. celery 可视化_在Flask中使用Celery进行多任务分布执行

    关键字:Flask, Redis, RabbitMQ, Celery, Broker, Backend 前言 在后端服务器有时候需要处理耗时较长的任务,例如发送电子邮件,在处理这些任务时,这个线程就处 ...

  10. 5分钟快速掌握 Python 定时任务框架

    APScheduler 简介 在实际开发中我们经常会碰上一些重复性或周期性的任务,比如像每天定时爬取某个网站的数据.一定周期定时运行代码训练模型等,类似这类的任务通常需要我们手动来进行设定或调度,以便 ...

最新文章

  1. javac,使用-d .与省略-d的区别
  2. boost::mpl模块实现upper_bound相关的测试程序
  3. 如何找到SAP CRM One Order节点ID对应的描述信息(名称)
  4. ceph常用命令-pool相关命令
  5. ARM体系结构简介 —— 迅为
  6. 7-21 求前缀表达式的值 (25 分)(思路详解)
  7. 用数据库修改服务器的时间格式,如何查询数据库服务器的时间格式
  8. concurrenthashmap_ConcurrentHashMap原理浅析
  9. 中去掉外键_【Java笔记】035天,MySQL中的增删改查
  10. MySQL-JDBC
  11. 微信小程序开发工具下载及AppID查找
  12. “用户体验不是把每个环节做到极致,而是在关键环节打动用户“这句话对吗?...
  13. 3000个最常用的英语单词
  14. 空间两直线间最短距离计算公式
  15. 实时语音视频通话SDK如何实现立体声(一)
  16. 数电逻辑门方框中各符号所含意义(全)
  17. 主动积极:卓尔不群的个人管理策略
  18. 头像制作抖音微信壁纸小程序搭建一个基于uniCloud阿里OSS对象存储的免费图床源码
  19. dataguard日常管理
  20. 矢量求导之位移与速度及加速度

热门文章

  1. OPCClient远程连接OPC服务器配置手册
  2. 80c51流水灯汇编语言,利用80c51单片机制作流水灯(汇编语言)
  3. 小蜜蜂无纸化考试系统 官网
  4. 攻防世界Let_god_knows
  5. zabbix部署及监控测试
  6. HTML超链接实现页面内跳转
  7. 如何简单可靠地装系统-软碟通
  8. js实现漂亮的雪花飘落效果
  9. 搭建个人博客【搭建Hexo+Fluid博客并部署到GitHub/云服务器(阿里云/腾讯云)】
  10. python携程怎么做数据同步_python协程中同步如何使用?