Contents

  • Building Airflow on Docker
  • Troubleshooting techniques and the major pitfalls
  • Scheduling && failure alerts (email && WeChat)

Building Airflow on Docker

The official Docker-based installation guide is the best first-hand material for getting started. Official as it is, everyone's environment differs, so the pitfalls come in every flavor.

Troubleshooting techniques and the major pitfalls

Techniques
1. If a container shows as unhealthy, run docker inspect <container_id> to locate the error.
2. If the container is still up but throwing errors, run docker logs <container_id> to analyze the log output (a scripted version of both checks is sketched below).
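If you prefer to script these checks, here is a minimal sketch using the Docker SDK for Python (the docker package, an extra dependency that is not part of this setup). It walks every container, prints the health status that docker inspect reports, and tails the logs of anything unhealthy:

# Hypothetical helper, not part of the original setup: requires `pip install docker`.
# It mirrors `docker inspect` / `docker logs` for every running or stopped container.
import docker


def report_unhealthy_containers(tail: int = 50) -> None:
    client = docker.from_env()
    for container in client.containers.list(all=True):
        # container.attrs holds the same data `docker inspect <container_id>` prints.
        health = container.attrs["State"].get("Health", {}).get("Status", "no healthcheck")
        print(f"{container.name:<40} status={container.status:<10} health={health}")
        if health == "unhealthy":
            # Same as `docker logs --tail <n> <container_id>`.
            print(container.logs(tail=tail).decode(errors="replace"))


if __name__ == "__main__":
    report_unhealthy_containers()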

The biggest pitfall (the pit among pits)

You read that right: this is the cause — do not run the install as root (AIRFLOW_UID=0). On Ubuntu the default non-root user sails through the tutorial; on CentOS you are usually root, and then everything goes wrong: the containers stay unhealthy no matter what, the logs complain about missing dependencies such as celery, yet when you enter the container with docker exec -it <container_id> bash the dependencies are all there. Plenty of hair was lost before figuring this out.
There are two fixes:

1. In .env, set AIRFLOW_UID to a non-zero value (e.g. 50000, the documented default),
or
2. Specify that non-zero UID directly in docker-compose.yaml (the user: line under x-airflow-common).

Minor pitfalls

1. The hosts file cannot be overridden from the Dockerfile; append entries via extra_hosts in docker-compose.yaml instead.
2. Some systems complain about restricted permissions; remember to set privileged: true in docker-compose.yaml.
3. The Redis port exposed on the Docker network has been changed; see docker-compose.yaml for details.

Installation steps:
1. Extend the official image
Dockerfile

FROM apache/airflow:2.5.3-python3.8
USER root
# RUN sed -i 's#http://deb.debian.org#https://mirrors.163.com#g' /etc/apt/sources.list
RUN sed -i 's#http://deb.debian.org#https://mirrors.cloud.tencent.com#g' /etc/apt/sources.list
RUN apt-get clean
RUN apt-get update && apt-get install -y gcc libkrb5-dev krb5-user \
    && apt-get autoremove -yqq --purge \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*
USER airflow
COPY requirements.txt /
RUN pip install --no-cache-dir -r /requirements.txt -i https://mirrors.cloud.tencent.com/pypi/simple
# COPY airflow.cfg                   ${AIRFLOW_HOME}/
COPY html_content_template_file    ${AIRFLOW_HOME}/
COPY subject_template_file         ${AIRFLOW_HOME}/
COPY krb5.conf                     /etc/

Build command: docker build -t tian/airflow:2.5.3 .  (note the trailing dot)

docker-compose.yaml

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
## Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
#                                Default: apache/airflow:2.5.3
# AIRFLOW_UID                  - User ID in Airflow containers
#                                Default: 50000
# AIRFLOW_PROJ_DIR             - Base path to which all the files will be volumed.
#                                Default: .
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Use this option ONLY for quick checks. Installing requirements at container
#                                startup is done EVERY TIME the service is started.
#                                A better way is to build a custom image or extend the official image
#                                as described in https://airflow.apache.org/docs/docker-stack/build.html.
#                                Default: ''
#
# Feel free to modify this file to suit your needs.
---
version: '3.8'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-tian/airflow:2.5.3}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__DEFAULT_TIMEZONE: Asia/Shanghai  # fix the UTC timezone issue
    AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: Asia/Shanghai  # fix the UTC timezone issue
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:16379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    # AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    # E-mail
    AIRFLOW__EMAIL__EMAIL_BACKEND: airflow.utils.email.send_email_smtp
    AIRFLOW__EMAIL__FROM_EMAIL: airflow<bi_alert@yijiupi.com>
    AIRFLOW__EMAIL__SUBJECT_TEMPLATE: /opt/airflow/template/subject_template_file
    AIRFLOW__EMAIL__HTML_CONTENT_TEMPLATE: /opt/airflow/template/html_content_template_file
    AIRFLOW__SMTP__SMTP_STARTTLS: 'false'
    AIRFLOW__SMTP__SMTP_SSL: 'true'
    AIRFLOW__SMTP__SMTP_HOST: smtp.exmail.qq.com
    AIRFLOW__SMTP__SMTP_PORT: 465
    AIRFLOW__SMTP__SMTP_USER: bi_alert@yijiupi.com
    AIRFLOW__SMTP__SMTP_PASSWORD: LRBiN99ZgiGGKabM
    AIRFLOW__SMTP__SMTP_MAIL_FROM: bi_alert@yijiupi.com
    AIRFLOW__WEBSERVER__BASE_URL: http://localhost:8080
    # yamllint disable rule:line-length
    # Use simple http server on scheduler for health checks
    # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
    # yamllint enable rule:line-length
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
    # for other purpose (development, test and especially production usage) build/extend Airflow image.
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
    - ${AIRFLOW_PROJ_DIR:-.}/script:/opt/airflow/script      # larger jobs are launched via bash
    - ${AIRFLOW_PROJ_DIR:-.}/template:/opt/airflow/template  # e-mail templates
  # user: "${AIRFLOW_UID:-50000}:0"
  user: "${AIRFLOW_UID:-0}:${AIRFLOW_GID:-0}"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    privileged: true
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  redis:
    image: redis:latest
    command: redis-server --port 16379
    privileged: true
    expose:
      - 16379
    healthcheck:
      test: ["CMD", "redis-cli", "-p", "16379", "ping"]
      interval: 10s
      timeout: 30s
      retries: 50
      start_period: 30s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    privileged: true
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    privileged: true
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    privileged: true
    extra_hosts:
      - "kdc-master:172.16.11.81"
      - "kdc-slave:172.16.11.82"
      - "haproxy.cdh.yjp.com:172.16.11.46"
      - "bi-ml-redis.yjp.com:10.20.1.130"
      - "Mysql-Portrait.yjp.com:10.20.1.59"
      - "wuhu-master1.cdh.yjp.com:172.16.11.151"
      - "wuhu-master2.cdh.yjp.com:172.16.11.152"
      - "wuhu-master3.cdh.yjp.com:172.16.11.153"
      - "in-yjgj-gateway.yjp.com:10.20.4.150"  # push alerts to WeChat via yjgj
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    privileged: true
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
  # or by explicitly targeted on the command line e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - "5555:5555"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db-volume:

cd ~/airflow && mkdir -p ./dags ./logs ./plugins ./template
Start the stack: docker-compose up -d
Tear down the containers: docker-compose down --volumes --remove-orphans
Tear down the containers and remove the images: docker-compose down --volumes --rmi all
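Once the stack is up, a quick way to confirm the webserver and scheduler are really healthy (beyond docker ps) is Airflow's /health endpoint. A minimal sketch, assuming the webserver is reachable on localhost:8080 as mapped above:

# Probe the webserver's health endpoint exposed on port 8080.
import requests

resp = requests.get("http://localhost:8080/health", timeout=10)
resp.raise_for_status()
# Expected shape: {"metadatabase": {"status": "healthy"}, "scheduler": {"status": "healthy", ...}}
for component, state in resp.json().items():
    print(component, state.get("status"))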

A few files you may need
html_content_template_file

This is retry {{try_number}} of {{max_tries + 1}}<br>
<br>
<br>
<strong>Hostname:</strong><br> {{ti.hostname}}<br>
<strong><font color="red">Failure time:</font></strong><br>{{ execution_date.in_timezone("Asia/Shanghai")}}<br>
<strong><font color="red">Failure reason:</font></strong><br>{{exception_html}}<br>
<strong>Log URL:</strong><br> <a href="{{ti.log_url}}">{{ti.log_url}}</a><br>

subject_template_file

Airflow failure alert: {{dag.dag_id}}_{{task.task_id}}_{{ execution_date.in_timezone("Asia/Shanghai").strftime("%Y-%m-%d") }}
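Before relying on these templates in real DAGs, it is worth confirming the SMTP settings work at all. A small sketch using Airflow's built-in send_email helper; run it inside one of the containers (for example via docker exec -it <container_id> python) so the AIRFLOW__SMTP__* variables apply, and replace the placeholder recipient with a real address:

# SMTP smoke test using Airflow's own e-mail helper; the recipient is a placeholder.
from airflow.utils.email import send_email

send_email(
    to=["you@example.com"],
    subject="Airflow SMTP smoke test",
    html_content="<b>If you can read this, the SMTP settings work.</b>",
)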

krb5.conf

[libdefaults]
  dns_lookup_realm = false
  ticket_lifetime = 24h
  renew_lifetime = 7d
  forwardable = true
  rdns = false
  # pkinit_anchors = /etc/pki/tls/certs/ca-bundle.crt
  default_realm = CDH.COM
  # default_ccache_name = KEYRING:persistent:%{uid}

[realms]
  CDH.COM = {
    kdc = kdc-master
    kdc = kdc-slave
    admin_server = kdc-master
    default_domain = CDH.COM
  }

[domain_realm]
  .cdh.com = CDH.COM
  cdh.com = CDH.COM

requirements.txt

six
bitarray
thrift==0.16.0
thrift_sasl==0.4.3
kerberos==1.3.0
impyla==0.18.0
pandas==1.4.4
numpy==1.23.3
configparser==5.0.2
krbcontext==0.10
kafka-python==2.0.2
PyMySQL==1.0.3
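A quick way to confirm these packages were actually baked into the image is an import smoke test run inside any Airflow container (note that some distribution names differ from their module names):

# Import smoke test for the packages pinned in requirements.txt
# (impyla -> impala, kafka-python -> kafka, PyMySQL -> pymysql).
import importlib

for module in ["six", "bitarray", "thrift", "thrift_sasl", "kerberos",
               "impala", "pandas", "numpy", "configparser",
               "krbcontext", "kafka", "pymysql"]:
    importlib.import_module(module)
    print(f"{module}: OK")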

Scheduling && failure alerts (email && WeChat)

1. Shared util module:

import json
import time

import pandas as pd
import pendulum
import requests
from impala.dbapi import connect
from kafka import KafkaProducer
from krbcontext import krbcontext


def read_sql(path):
    """Read the SQL file at the given path."""
    with open(path, 'r', encoding='utf-8') as f:
        return f.read()


def get_data_from_impala(sql, host='haproxy.cdh.yjp.com', port=21050, keytab='/opt/airflow/dags/impala.keytab'):
    """Query Impala and return the result as a pandas DataFrame."""

    def parse_cols(desc):
        """Extract column names from the cursor description."""
        return [i[0] for i in desc]

    data_list = []
    cols = []
    with krbcontext(using_keytab=True, principal='impala',
                    keytab_file=keytab, ccache_file='krb5cc_0'):
        conn = connect(host, port, auth_mechanism='GSSAPI', kerberos_service_name='impala')
        cur = conn.cursor()
        print(sql)
        cur.execute(sql)
        for row in cur:
            # print(row)
            data_list.append(row)
        cols = parse_cols(cur.description)
        cur.close()
        conn.close()
    return pd.DataFrame(data_list, columns=cols)


def df2kafka_dic(df, table_name,
                 bootstrap_servers='''wuhu-master1.cdh.yjp.com:9092,wuhu-master2.cdh.yjp.com:9092,wuhu-master3.cdh.yjp.com:9092''',
                 topic='ai_wide_table_neo4j'):
    """Convert a DataFrame into messages and push them to Kafka.

    :param topic:
    :param bootstrap_servers:
    :param key_cols:
    :param df: dataframe
    :param table_name: table name
    :param need_cols: required columns; the primary key must come first
    :return:
    """
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                             # compression_type='gzip',  # caused messages to silently fail to send, with no exception
                             retries=10,
                             linger_ms=0,
                             batch_size=16384 * 10,
                             max_request_size=1024 * 1024 * 10,
                             buffer_memory=1024 * 1024 * 256)
    results = df.to_dict('records')
    i = 0
    for result in results:
        result['tbl_name'] = table_name
        js = json.dumps(result).encode('utf-8')
        # print(js)
        producer.send(topic=topic, value=js)
        i += 1
    print('------- rows written to kafka:', i)


def send_msg_2_wechat(context):
    """Send an alert message to WeChat.

    :param context:
    :return:
    """
    ti = context['task_instance']
    dag_id = ti.dag_id
    task_id = ti.task
    execution_date = context['execution_date']
    time = execution_date.in_timezone("Asia/Shanghai").strftime("%Y-%m-%d %H:%M:%S")
    url = 'http://*******'
    user_ids = [***, ***, ***, ***]
    msg = f'Airflow failure alert: {dag_id}_{task_id}_{time}'
    print(msg)
    content = {'content': msg}
    headers = {'x-tenant-id': '9', 'content-type': 'application/json'}
    params = {'userIds': user_ids, 'magType': 'text', 'content': json.dumps(content)}
    # response = requests.post(url=url, data=json.dumps(params), headers=headers)
    # print(response.text)
    print("----alert sent to wechat---")


if __name__ == '__main__':
    print('--send msg to wechat--')
    send_msg_2_wechat()  # note: expects an Airflow context dict (see the sketch below)
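send_msg_2_wechat is meant to run as an on_failure_callback, so Airflow normally supplies the context argument (the bare call in the __main__ block above would need the same). To exercise it outside Airflow, you can hand it a minimal stand-in context; a sketch, where the fake task-instance class is purely illustrative:

# Hypothetical smoke test for the callback outside Airflow: the real context dict
# is built by Airflow; here only the fields send_msg_2_wechat reads are faked.
import pendulum

from utils import send_msg_2_wechat


class FakeTaskInstance:
    dag_id = "tutorial2"
    task = "sleep"  # note: the util reads ti.task, not ti.task_id


fake_context = {
    "task_instance": FakeTaskInstance(),
    "execution_date": pendulum.now("Asia/Shanghai"),
}
send_msg_2_wechat(fake_context)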
  2. dag-tutorial scheduling test

from datetime import datetime, timedelta
from textwrap import dedent

# The DAG object; we'll need this to instantiate a DAG
import pendulum
from airflow import DAG

# Operators; we need this to operate!
from airflow.operators.bash import BashOperator

from utils import send_msg_2_wechat

with DAG(
    "tutorial2",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["tianjun@yijiupi.com"],
        "email_on_failure": True,
        "email_on_retry": False,
        "retries": 1,
        "retry_delay": timedelta(seconds=10),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        'on_failure_callback': send_msg_2_wechat,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    description="A simple tutorial DAG",
    schedule="56 23 * * *",
    start_date=pendulum.datetime(2017, 1, 1, tz=pendulum.timezone("Asia/Chongqing")),
    catchup=False,
    tags=["example"],
) as dag:
    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
        task_id="print_date",
        bash_command="date",
    )
    t2 = BashOperator(
        task_id="sleep",
        depends_on_past=False,
        bash_command="sleep 5 && exit -1",
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    # dag.doc_md = """
    # This is a documentation placed anywhere
    # """  # otherwise, type it like this
    dag.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes `doc_md` (markdown),
    `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml` which gets
    rendered in the UI's Task Instance Details page.
    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
    **Image Credit:** Randall Munroe, [XKCD](https://xkcd.com/license.html)
    """
    )

    templated_command = dedent(
        """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
    {% endfor %}
    """
    )
    t3 = BashOperator(
        task_id="templated",
        depends_on_past=False,
        bash_command=templated_command,
    )

    t1 >> [t2, t3]
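To debug a DAG like this without waiting for the scheduler, Airflow 2.5 offers DAG.test(), which runs the whole DAG in a single process. A small sketch that could be appended to the end of the DAG file (the container path below is illustrative):

# Optional local-debug entry point (Airflow >= 2.5): run the DAG in-process, e.g.
#   docker exec -it <scheduler_container> python /opt/airflow/dags/tutorial2.py
if __name__ == "__main__":
    dag.test()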
  3. cdh-kerberos scheduling test
import datetime
from textwrap import dedent

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

from utils import read_sql, get_data_from_impala, df2kafka_dic, send_msg_2_wechat


def get_data_impala_2_kafka(path):
    date_key = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y%m%d')
    for sql in read_sql(path).split(';')[:-1]:
        sql = sql.format(date_key=date_key)
        print(sql)
        df = get_data_from_impala(sql)
        df2kafka_dic(df, table_name='yjp_dm_ai.dm_ai_trd_features_user_bak')


with DAG(
    "ai-feature-user",
    # These args will get passed on to each operator
    # You can override them on a per-task basis during operator initialization
    default_args={
        "depends_on_past": False,
        "email": ["tianjun@yijiupi.com"],
        "email_on_failure": True,
        "email_on_retry": False,
        "retries": 2,
        "retry_delay": datetime.timedelta(minutes=1),
        # 'queue': 'bash_queue',
        # 'pool': 'backfill',
        # 'priority_weight': 10,
        # 'end_date': datetime(2016, 1, 1),
        # 'wait_for_downstream': False,
        # 'sla': timedelta(hours=2),
        # 'execution_timeout': timedelta(seconds=300),
        'on_failure_callback': send_msg_2_wechat,
        # 'on_success_callback': some_other_function,
        # 'on_retry_callback': another_function,
        # 'sla_miss_callback': yet_another_function,
        # 'trigger_rule': 'all_success'
    },
    description="user features",
    schedule="56 01 * * *",
    start_date=pendulum.datetime(2023, 4, 19, tz=pendulum.timezone("Asia/Shanghai")),
    catchup=False,
    tags=["ai-user-features"],
) as dag:
    t1 = PythonOperator(
        task_id='user-features',
        provide_context=True,
        python_callable=get_data_impala_2_kafka,
        op_kwargs={'path': '/opt/airflow/dags/sql/user_features/user_features.sql'},
    )

    t1.doc_md = dedent(
        """
    #### User-side feature details
    ...
    """
    )

    dag.doc_md = __doc__  # providing that you have a docstring at the beginning of the DAG; OR
    dag.doc_md = """User-side feature scheduling"""  # otherwise, type it like this

    t1
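Note the contract get_data_impala_2_kafka expects from the SQL file: statements separated by semicolons, an optional {date_key} placeholder in each, and whatever follows the last semicolon is dropped. A small illustration with made-up file content:

# Illustration of how the SQL file is consumed: split on ';', drop the empty tail,
# then substitute {date_key} before each statement is sent to Impala.
sql_text = """
SELECT * FROM yjp_dm_ai.dm_ai_trd_features_user_bak WHERE date_key = {date_key};
"""

for stmt in sql_text.split(";")[:-1]:
    print(stmt.format(date_key="20230418"))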

Finally, a screenshot of the finished result:
