Airflow实战--获取REST参数并通过Xcom传递给Bash算子
Airflow实战–获取REST参数并通过Xcom传递给Bash算子
有时候我们需要通过REST API给调度任务来传递参数。参数可能有多个,也可能只有一个。比如,可能为Spark任务传递一个SQL语句等。
本文分析如何通过REST API为BashOperator的任务来传递参数。
前面已经分析过,如何在PythonOperator任务中来获取REST API的参数,这里分析如何把参数传递给BashOperator任务。
这里我们使用xcom方式来进行任务间的参数传递。也可以使用共享变量的方式来完成该任务,这个后面的文章再介绍。
把参数传递DAG中给不同类型的算子
这里演示:通过Python算子来接收参数,并把参数传递给DAG的Bash算子。
以下代码实现了两个Task,其中一个是PythonOperator,一个是BashOperator。其中的PythonOperator主要是用来接收REST API传递过来的参数值,并把该参值通过xcom保存起来。
BashOperator可以是执行任意任务的shell脚本,它可以通过xcom来获取PythonOperator传递过来的参数。通过这样的方式来实现从REST API给任务传递参数的目的。
#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedeltadefault_args = {'owner': 'user1','depends_on_past': False,'start_date': airflow.utils.dates.days_ago(1),'email': ['user1@qq.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(seconds=3),
}dag = DAG('bash_get_param_2',default_args=default_args,description='bash get param job',schedule_interval=timedelta(days=1))# 通过模板的方式来获取xcom传递的参数值。
# 注意:return_value这个变量名是固定的,表示从任务:first_task,过来的变量名
bash_prit_param='echo "I get: {{ ti.xcom_pull(task_ids="first_task", key="return_value") }}"'print_param_task = BashOperator(task_id='print_param_task',do_xcom_push=True,bash_command=bash_prit_param,dag=dag)# 接收REST API的参数,并通过返回参数值来使用xcom保存参数
# 注意:可能有多个参数,可以把这些参数合并到一个变量中,然后返回。若以元组方式返回,则是一个列表。
def receive_param(**context):# 此句用来接收rest接口的参数,参数名为:push_keystock_list = context["dag_run"].conf.get("push_key")print("returned tickers: %s" % str(stock_list))return stock_list# 接收来自rest api的参数和值
first_operator = PythonOperator(task_id='first_task',python_callable=receive_param,dag=dag)print_param_task.set_upstream(first_operator)
代码说明
以上代码需要注意的是Bash算子接收参数的命令,也就是下面的语句:
bash_prit_param='echo "{{ ti.xcom_pull(task_ids="hello_task", key="return_value") }}"'
该语句表示从任务实例(taskinstance)也就是ti中获取返回的参数。这里的ti名称是固定的,表示任务实例;task_ids是返回参数的任务名;key参数的"return_value"也是固定的,表示任务对象的返回参数。
也就是说,通过bash获取参数时,{{ ti.xcom_pull(task_ids="first_task", key="return_value") }}
就是bash获取上一个任务的返回值。其中的key的值是固定的。
触发DAG的执行(带参数)
通过以下的请求就可以把参数实时传递给执行的任务。
EXE_DATE=$(TZ=America/Curacao date '+%Y-%m-%dT%H:%M:%SZ')curl -X POST 'http://localhost:20001/api/v1/dags/bash_get_param_2/dagRuns' \-d "{\"execution_date\": \"${EXE_DATE}\", \"conf\": {\"push_key\":\"use db1; show tables;\"}}" \-H 'content-type: application/json' \--user "user1:user1"
命令行说明:以上命令中的conf是传递给远端DAG的参数值。该参数可以是多个,其格式可以不同。我这的参数名是push_key。
小结
本文分析了如何通过REST API给BASH任务传递参数。总结起来,就是:通过python的任务来获取REST API传递过来的参数(可以进行参数检查),并把需要的参数返回(return),这样就是把参数放到了xcom中。然后,其他类型的任务就可以通过模板格式,比如:"{{ ti.xcom_pull(task_ids=“hello_task”, key=“return_value”) }}"来获取参数的值。
参数可以是多个,但若python任务返回的参数是多个,可以放到一个变量中,然后返回。
Airflow实战--获取REST参数并通过Xcom传递给Bash算子相关推荐
- 将参数传递给Bash函数
我试图搜索如何在Bash函数中传递参数,但是出现的是如何从命令行传递参数. 我想在我的脚本中传递参数. 我试过了: myBackupFunction("..", "... ...
- 检查传递给Bash脚本的参数数量
本文翻译自:Check number of arguments passed to a Bash script I would like my Bash script to print an erro ...
- 爬取知乎壁纸:selenium模拟登陆获取cookies,再将cookies传递给requests
selenium很好用,但是爬取大量数据时速度较慢. 通过selenium模拟登陆,获取cookies,再将cookies传递给requests,通过requests爬取加快速度. 以为知乎网爬取壁纸 ...
- Flask框架(flask中的request对象,获取请求参数,保存上传的文件)
1.request中包含了前端发送过来的所有数据 ,请求的 request 对象中保存了一次HTTP请求的一切信息. 通过request.from可以直接发送提取请求体中的表单格式数据,是一个类字典的 ...
- python中multiple函数_关于多处理:在Python中将多个参数传递给pool.map()函数
本问题已经有最佳答案,请猛点这里访问. 我需要一些方法来使用pool.map()中接受多个参数的函数.根据我的理解,pool.map()的目标函数只能有一个iterable作为参数,但是有没有其他参数 ...
- nginx 获取body参数_分布式实战:Nginx缓存之流量分发层
本文首发于Ressmix个人站点:https://www.tpvlog.com 本章,我将进行Nginx流量分发层的lua代码编写.流量分发层的Nginx服务器,会基于商品id执行流量分发策略: 获取 ...
- gin框架之获取URL参数
gin学习之获取URL参数 获取URL参数 获取querystring参数 获取form参数 获取json参数 获取path参数 参数的绑定 获取URL参数 在前面的学习中,我们已经学会了使用gin框 ...
- html的子页面获取自己url,如何从html页面获取url参数并将其显示在textarea中?
我用下面的在我的HTML页面的JavaScript功能,我想没有textarea的任何连接操作正常显示参数....如何从html页面获取url参数并将其显示在textarea中? function g ...
- postman发送post请求,用request.getParameter()获取请求参数
1.接口代码 @RequestMapping("notifyUrl") public void notifyUrl(HttpServletRequest request, Ht ...
- 通过BeanShell获取UUID并将参数传递给Jmeter
有些HTTPS请求报文的报文体中包含由客户端生成的UUID,在用Jmeter做接口自动化测试的时候,因为越过了客户端,直接向服务器端发送报文,所以,需要在Jmeter中通过beanshell获取UUI ...
最新文章
- Spring和shiro整合 logout 配置方式
- Mysql清理binlog日志
- 《Linux内核设计与实现》读书笔记(一)-内核简介
- MapReduce自定义Partitioner
- 把对象转换成JSON字符串 第二稿支持移动设备
- 需求分析挑战之旅(疯狂的订餐系统)(8)——最后的疯狂
- 阿里腾讯面试梳理个人成长经历分享
- 噪声产生原因_空调噪声大?啄木鸟家庭维修,看看属于哪一个问题
- 如何加快 Node.js 应用的启动速度
- 在字符串中找出第一个只出现一次的字符
- 华为P20云文档空间满了怎么清理_原来华为手机能这样清理垃圾,怪不得别人的手机再用两年不卡顿...
- 单精度浮点数(float)加法计算出错
- android finish 判断当前_Android开发,源码分析finish()和onBackPressed()的区别
- 计算机在环境工程专业中的应用,计算机在环境科学与工程方向的应用.pdf
- ASP.NET设置背景图案
- 2013-9百度技术沙龙:Clouda与nodejs
- 用计算机唱你笑起来真好看,你笑起来真好看简谱-李昕融/樊桐舟/李凯稠演唱-孙世彦制谱...
- 微信公众号如何上传html5,微信平台公众号怎么上传视频? 视频支持哪些格式
- 关于2010年部分节假日安排
- Linux系统增加新硬盘
热门文章
- Android 音乐播放器SD卡本地播放器实现
- FormData 对象上传二进制文件
- 检验样本是否服从泊松分布
- C语言——三位数排序
- MYSQL根据经纬度查询最近距离
- mysql 根据经纬度查询范围点
- no javac in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin)
- 小学计算机应用能力培训的计划,小学老师信息技术应用能力提升培训个人计划...
- Q3手机银行运营报告:直销银行江湖再起波澜,数字员工助力手机银行活跃度提升
- 舒淇出席活动兴致低 否认收张震结婚请柬