全栈工程师开发手册 (作者:栾鹏)
架构系列文章


参考:https://www.jianshu.com/p/089c56b4ec14

airflow介绍
https://lxwei.github.io/posts/airflow%E4%BB%8B%E7%BB%8D.html

python 教程:https://airflow.apache.org/tutorial.html

airflow调度

airflow的scheduler加载到dags后,会直接创建一批dags实例.

这些执行实例的Execution Date为start_time到当期时间之间的符合start_time+n*schedule_interval的时刻点(包含start_time), 有多少个符合时刻的时刻点就会保障数据库中有多少个dags实例在数据库中. (可以通过设置catchup=False将现在时刻之前的执行时间不再调度,否则注意这里是保证有,如果已经存在了就不再创建)

下一次scheduler 再次加载dags文件, 重新计算代码里面写的start_time,然后重新生成需要执行的实例, 发现部分之前时间的dags实例在数据库中已经有了,就不会再创建了,只会创建最新没有创建的运行实例.

注意: start_time要写成当前时间一个调度周期之前的时间才能有效.

新创建的dags实例会发送到celery, worder会订阅celery,执行 dags实例.

所以如果我们不想让两个dags实例同时运行, start_time的时间设置为上一次执行周期,或者设置worker只能有一个运行,并且每个里面只有一个并行.

worker订阅到dags实例后,会按照dags里面的代码检查一遍start_time 和接收到 dags实例的Execution Date

如果Execution Date 早于start_time 则会放弃执行,直接写入数据库dags实例执行错误.

dags中的多个task 如果包含依赖关系, 执行完上游task并不会立即执行下游task,会现将上游task的执行信息写入到数据库里面,清理准备执行下一个task, 中间大概有10s-15s的延迟.

多个worker之间是并行的. woker里面每个线程数也是并行的. 不过如果设置了每个worker里面的执行线程数

注意: 不要将时区转变为上海时区,因为里面会把时区转为UTC,所以还是使用UTC的时区,设定时间时,手动提前8小时

注意:

dag = DAG('tutorial', catchup=False, default_args=default_args)

catchup值为True,将忽略已经过去的执行时间。

忽略python脚本

如果自己的python文件不想被airflow搜索,可以在dags的根目录下面创建.airflowignore文件,每行一个正则,正则匹配到的文件,就会直接忽略。

注意只要包含正则项就会被忽略掉,并不是完全匹配才被忽略掉。

airflow.cfg配置文件

https://www.jianshu.com/p/69a768c84465
https://blog.csdn.net/sxf_123456/article/details/79141227

airflow源码分析-启动机制

用过airflow的人都知道,airflow webserver -p 8000 这样webserver就启动起来了
airflow scheduler 启动scheduler,具体是怎么启动起来的呢?看一下代码就知道了
setup.py

def do_setup():write_version()setup(name='apache-airflow',description='Programmatically author, schedule and monitor data pipelines',license='Apache License 2.0',version=version,packages=find_packages(exclude=['tests*']),package_data={'': ['airflow/alembic.ini', "airflow/git_version"]},include_package_data=True,zip_safe=False,scripts=['airflow/bin/airflow'],

当你执行airflow命令时,实际执行的是airflow/bin/airflow这个文件

airflow/bin/airflow

if __name__ == '__main__':if configuration.get("core", "security") == 'kerberos':os.environ['KRB5CCNAME'] = configuration.get('kerberos', 'ccache')os.environ['KRB5_KTNAME'] = configuration.get('kerberos', 'keytab')parser = CLIFactory.get_parser()args = parser.parse_args()args.func(args)

CLIFactory是一个解析类,当执行airflow webserver -p 时,CLIFactory负责把接收到的参数解析了,webserver对应的是airflow/bin/cli.py中的webserver(). scheduler同理

airflow部署和使用示例相关推荐

  1. AirFlow官方入门DAG示例

    经过前两篇文章的简单介绍之后,我们安装了自己的AirFlow以及简单了解了DAG的定义文件.现在我们要实现自己的一个DAG. 1. 启动Web服务器 使用如下命令启用: airflow webserv ...

  2. airflow部署问题集锦

    title: airflow部署问题集锦 date: 2022-10-21 22:58:30 tags: [airflow] categories: 调度系统 问题集锦. 环境: airflow: 2 ...

  3. airflow 部署

    环境 : ubuntu 14.04 LTS python 2.7 script: 设置环境变量: export AIRFLOW_HOME=~/airflow 安装相关依赖包: sudo apt-get ...

  4. DPI — nDPI — 安装部署与应用示例

    目录 文章目录 目录 nDPI 的安装 nDPI 的应用示例 通过协议文件来扩展 nDPI 解析器 处理 TLS 加密流量 nDPI 的安装 nDPI 支持在 Linux 平台运行,能够方便的将 nD ...

  5. 在tomcat新建html页面,仅将HTML,CSS网页部署到Tomcat(示例代码)

    我刚刚开始开发一个网站.我现在所拥有的只是一个由几个CSS样式表支持的HTML页面. 我可以从HTML和CSS页面创建WAR文件吗?如何将它们部署到Tomcat服务器上? 谢谢. 答案 没有必要创建一 ...

  6. Spark Operator 部署及入门示例

    关于存算分离 目前企业级的大数据应用主流还是采用Yarn或者Mesos来进行资源分配和运行调度的,例如我行目前采用Yarn来进行作业调度,并使用HDFS作为大数据的存储平台,这是典型的计算和存储紧耦合 ...

  7. centos7 redis5.0以前版本 部署集群示例 - 第二篇

    redis集群(5.0版本以前)的搭建,请参考笔者的这篇博客 https://blog.csdn.net/yzf279533105/article/details/103696990 redis集群( ...

  8. ansible-playbook 单个yml文件部署tomcat简单示例

    #单yaml配置[root@jenkins pb]# cat tomcat.yml --- - hosts: eeevars: #设置变量war_files: /var/lib/jenkins/wor ...

  9. 使用Amazon CDK部署基于Amazon Fargate的高可用、易扩展的Airflow集群

    前言 Apache Airflow(以下简称为Airflow) 是一项由Airbnb在 2014 年推出的开源项目,其目的是为了管理日益复杂的数据管理工具.脚本和分析工具,提供一个构建批处理工作流的方 ...

最新文章

  1. 万字长文的Redis五种数据结构详解(理论+实战),建议收藏。
  2. 脉冲20KV高压发生器电弧打火
  3. python工程师薪资坑吗-6年Python开发工程师精心总结学习思路,再不看看就凉了...
  4. sed 替换_生物信息之独孤九剑——sed
  5. 前端、数据库、Django简单的练习
  6. Android在第三方应用程序系统应用尽早开始,杀死自己主动的第三方应用程序,以重新启动...
  7. linux系统多网口聚合配置,Linux网卡聚合linux多网卡绑定聚合之bond模式的原理是什么...
  8. LeetCode第14题:最长公共前缀
  9. IntelliJ IDEA 运行你的第一个Java应用程序
  10. JAVA中的静态成员
  11. Android应用层 知识体系
  12. 软件基本功:以视频通话为例,交叉测试表格
  13. dom4j读取配置文件
  14. Oracle执行计划使用分析SQL执行效率
  15. SSH端口转发(port forwarding)基础知识
  16. jBox----弹出层插件
  17. 智能合约审计之DDOS概述
  18. 关于前端的CSS命名
  19. yum的配置文件yum.conf详解
  20. SQL 2014新功能介绍系列3 - 备份还原篇

热门文章

  1. python程序-调试Python程序代码的几种方法总结
  2. 语音识别技术准确率早已超过人类平均水平
  3. U-最小公倍数 递归
  4. 计算机网络实验5以太网链路帧实验,计算机网络实验-使用Wireshark分析以太网帧与ARP协议.docx...
  5. 【java笔记】缓冲流
  6. C++从文件中查找特定的字符串,并提取该字符串
  7. java8 clock_java8新的时间api
  8. 【AcWing】103. 电影(离散化)
  9. 【图论】二分图学习笔记
  10. mininet编程实现交换机规则的插入、删除与修改。_Mysql闲聊之从Redo、Undo日志到MVCC实现原理