Airflow实战–获取REST参数并通过Xcom传递给Bash算子

有时候我们需要通过REST API给调度任务来传递参数。参数可能有多个,也可能只有一个。比如,可能为Spark任务传递一个SQL语句等。

本文分析如何通过REST API为BashOperator的任务来传递参数。

前面已经分析过,如何在PythonOperator任务中来获取REST API的参数,这里分析如何把参数传递给BashOperator任务。

这里我们使用xcom方式来进行任务间的参数传递。也可以使用共享变量的方式来完成该任务,这个后面的文章再介绍。

把参数传递DAG中给不同类型的算子

这里演示:通过Python算子来接收参数,并把参数传递给DAG的Bash算子。

以下代码实现了两个Task,其中一个是PythonOperator,一个是BashOperator。其中的PythonOperator主要是用来接收REST API传递过来的参数值,并把该参值通过xcom保存起来。

BashOperator可以是执行任意任务的shell脚本,它可以通过xcom来获取PythonOperator传递过来的参数。通过这样的方式来实现从REST API给任务传递参数的目的。

#!/usr/bin/python3
# -*- coding: utf-8 -*-
#
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from datetime import timedeltadefault_args = {'owner': 'user1','depends_on_past': False,'start_date': airflow.utils.dates.days_ago(1),'email': ['user1@qq.com'],'email_on_failure': False,'email_on_retry': False,'retries': 1,'retry_delay': timedelta(seconds=3),
}dag = DAG('bash_get_param_2',default_args=default_args,description='bash get param job',schedule_interval=timedelta(days=1))# 通过模板的方式来获取xcom传递的参数值。
# 注意:return_value这个变量名是固定的,表示从任务:first_task,过来的变量名
bash_prit_param='echo "I get: {{ ti.xcom_pull(task_ids="first_task", key="return_value") }}"'print_param_task = BashOperator(task_id='print_param_task',do_xcom_push=True,bash_command=bash_prit_param,dag=dag)# 接收REST API的参数,并通过返回参数值来使用xcom保存参数
# 注意:可能有多个参数,可以把这些参数合并到一个变量中,然后返回。若以元组方式返回,则是一个列表。
def receive_param(**context):# 此句用来接收rest接口的参数,参数名为:push_keystock_list = context["dag_run"].conf.get("push_key")print("returned tickers: %s" % str(stock_list))return stock_list# 接收来自rest api的参数和值
first_operator = PythonOperator(task_id='first_task',python_callable=receive_param,dag=dag)print_param_task.set_upstream(first_operator)

代码说明

以上代码需要注意的是Bash算子接收参数的命令,也就是下面的语句:

bash_prit_param='echo "{{ ti.xcom_pull(task_ids="hello_task", key="return_value") }}"'

该语句表示从任务实例(taskinstance)也就是ti中获取返回的参数。这里的ti名称是固定的,表示任务实例;task_ids是返回参数的任务名;key参数的"return_value"也是固定的,表示任务对象的返回参数。

也就是说,通过bash获取参数时,{{ ti.xcom_pull(task_ids="first_task", key="return_value") }}就是bash获取上一个任务的返回值。其中的key的值是固定的。

触发DAG的执行(带参数)

通过以下的请求就可以把参数实时传递给执行的任务。

EXE_DATE=$(TZ=America/Curacao date '+%Y-%m-%dT%H:%M:%SZ')curl -X POST 'http://localhost:20001/api/v1/dags/bash_get_param_2/dagRuns' \-d "{\"execution_date\": \"${EXE_DATE}\", \"conf\": {\"push_key\":\"use db1; show tables;\"}}" \-H 'content-type: application/json' \--user "user1:user1"

命令行说明:以上命令中的conf是传递给远端DAG的参数值。该参数可以是多个,其格式可以不同。我这的参数名是push_key。

小结

本文分析了如何通过REST API给BASH任务传递参数。总结起来,就是:通过python的任务来获取REST API传递过来的参数(可以进行参数检查),并把需要的参数返回(return),这样就是把参数放到了xcom中。然后,其他类型的任务就可以通过模板格式,比如:"{{ ti.xcom_pull(task_ids=“hello_task”, key=“return_value”) }}"来获取参数的值。

参数可以是多个,但若python任务返回的参数是多个,可以放到一个变量中,然后返回。

Airflow实战--获取REST参数并通过Xcom传递给Bash算子相关推荐

  1. 将参数传递给Bash函数

    我试图搜索如何在Bash函数中传递参数,但是出现的是如何从命令行传递参数. 我想在我的脚本中传递参数. 我试过了: myBackupFunction("..", "... ...

  2. 检查传递给Bash脚本的参数数量

    本文翻译自:Check number of arguments passed to a Bash script I would like my Bash script to print an erro ...

  3. 爬取知乎壁纸:selenium模拟登陆获取cookies,再将cookies传递给requests

    selenium很好用,但是爬取大量数据时速度较慢. 通过selenium模拟登陆,获取cookies,再将cookies传递给requests,通过requests爬取加快速度. 以为知乎网爬取壁纸 ...

  4. Flask框架(flask中的request对象,获取请求参数,保存上传的文件)

    1.request中包含了前端发送过来的所有数据 ,请求的 request 对象中保存了一次HTTP请求的一切信息. 通过request.from可以直接发送提取请求体中的表单格式数据,是一个类字典的 ...

  5. python中multiple函数_关于多处理:在Python中将多个参数传递给pool.map()函数

    本问题已经有最佳答案,请猛点这里访问. 我需要一些方法来使用pool.map()中接受多个参数的函数.根据我的理解,pool.map()的目标函数只能有一个iterable作为参数,但是有没有其他参数 ...

  6. nginx 获取body参数_分布式实战:Nginx缓存之流量分发层

    本文首发于Ressmix个人站点:https://www.tpvlog.com 本章,我将进行Nginx流量分发层的lua代码编写.流量分发层的Nginx服务器,会基于商品id执行流量分发策略: 获取 ...

  7. gin框架之获取URL参数

    gin学习之获取URL参数 获取URL参数 获取querystring参数 获取form参数 获取json参数 获取path参数 参数的绑定 获取URL参数 在前面的学习中,我们已经学会了使用gin框 ...

  8. html的子页面获取自己url,如何从html页面获取url参数并将其显示在textarea中?

    我用下面的在我的HTML页面的JavaScript功能,我想没有textarea的任何连接操作正常显示参数....如何从html页面获取url参数并将其显示在textarea中? function g ...

  9. postman发送post请求,用request.getParameter()获取请求参数

    1.接口代码 @RequestMapping("notifyUrl")   public void notifyUrl(HttpServletRequest request, Ht ...

  10. 通过BeanShell获取UUID并将参数传递给Jmeter

    有些HTTPS请求报文的报文体中包含由客户端生成的UUID,在用Jmeter做接口自动化测试的时候,因为越过了客户端,直接向服务器端发送报文,所以,需要在Jmeter中通过beanshell获取UUI ...

最新文章

  1. Spring和shiro整合 logout 配置方式
  2. Mysql清理binlog日志
  3. 《Linux内核设计与实现》读书笔记(一)-内核简介
  4. MapReduce自定义Partitioner
  5. 把对象转换成JSON字符串 第二稿支持移动设备
  6. 需求分析挑战之旅(疯狂的订餐系统)(8)——最后的疯狂
  7. 阿里腾讯面试梳理个人成长经历分享
  8. 噪声产生原因_空调噪声大?啄木鸟家庭维修,看看属于哪一个问题
  9. 如何加快 Node.js 应用的启动速度
  10. 在字符串中找出第一个只出现一次的字符
  11. 华为P20云文档空间满了怎么清理_原来华为手机能这样清理垃圾,怪不得别人的手机再用两年不卡顿...
  12. 单精度浮点数(float)加法计算出错
  13. android finish 判断当前_Android开发,源码分析finish()和onBackPressed()的区别
  14. 计算机在环境工程专业中的应用,计算机在环境科学与工程方向的应用.pdf
  15. ASP.NET设置背景图案
  16. 2013-9百度技术沙龙:Clouda与nodejs
  17. 用计算机唱你笑起来真好看,你笑起来真好看简谱-李昕融/樊桐舟/李凯稠演唱-孙世彦制谱...
  18. 微信公众号如何上传html5,微信平台公众号怎么上传视频? 视频支持哪些格式
  19. 关于2010年部分节假日安排
  20. Linux系统增加新硬盘

热门文章

  1. Android 音乐播放器SD卡本地播放器实现
  2. FormData 对象上传二进制文件
  3. 检验样本是否服从泊松分布
  4. C语言——三位数排序
  5. MYSQL根据经纬度查询最近距离
  6. mysql 根据经纬度查询范围点
  7. no javac in (/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin)
  8. 小学计算机应用能力培训的计划,小学老师信息技术应用能力提升培训个人计划...
  9. Q3手机银行运营报告:直销银行江湖再起波澜,数字员工助力手机银行活跃度提升
  10. 舒淇出席活动兴致低 否认收张震结婚请柬