Table of Contents

  • 1. Preparation
  • 2. Compilation
  • 3. Configuration
  • 4. Alerts
  • 5. Detailed parameter reference

1. Preparation

re_data official documentation
Complete the dbt installation and test first.
See the article: Installing and Testing DBT (Basics)
Configuration file changes:
1. dbt_project.yml in the profile_test project directory
The name profile_test must match the profile name configured in the /root/.dbt/profiles.yml file.

name: 'profile_test'
version: '1.0.0'
config-version: 2

profile: 'profile_test'

model-paths: ["models"]
analysis-paths: ["analyses"]
test-paths: ["tests"]
docs-paths: ["docs"]
seed-paths: ["seeds"]
macro-paths: ["macros"]
snapshot-paths: ["snapshots"]

target-path: "target"  # directory which will store compiled SQL files
clean-targets:         # directories to be removed by `dbt clean`
  - "target"
  - "dbt_packages"

on-run-end:
  - "{{ re_data.save_test_history(results) }}"
  - "{{ re_data.save_run_history(results) }}"

# Adjust the severity of alerts for the whole project
#tests:
#  +severity: warn

vars:
  # shared variables
  #dt: '{{ (modules.datetime.date.today() + modules.datetime.timedelta(hours=-8)).strftime("%Y-%m-%d") }}'
  dt: '{{ (modules.datetime.date.today()).strftime("%Y-%m-%d") }}'
  monitor_table: 'monitor.monitor_source_table_update_mark_info'
  # (optional) if not passed, stats for last day will be computed
  re_data:time_window_start: '{{ (run_started_at - modules.datetime.timedelta(1)).strftime("%Y-%m-%d 00:00:00") }}'
  re_data:time_window_end: '{{ run_started_at.strftime("%Y-%m-%d 00:00:00") }}'
  re_data:anomaly_detector:
    name: modified_z_score
    threshold: 3  # threshold
  re_data:schemas:
    - ods
    - dwd
    - dwm
    - dws
    - ads
    - monitor
  re_data:metrics_base:
    table:
      - row_count
      - freshness
    column:
      numeric:
        - min
        - max
        - avg
        - stddev
        - variance
        - nulls_count
        - nulls_percent
      text:
        - min_length
        - max_length
        - avg_length
        - nulls_count
        - missing_count
        - nulls_percent
        - missing_percent
  # (optional) tells how much history you want to consider when looking for anomalies
  re_data:anomaly_detection_look_back_days: 30

# In this example config, we tell dbt to build all models in the example/ directory
# as tables. These settings can be overridden in the individual model files
# using the `{{ config(...) }}` macro.
models:
  +persist_docs:
    relation: true
    columns: true
  re_data:
    enabled: true
    +schema: monitor
    internal:
      +schema: monitor
  profile_test:
    # Config indicated by + and applies to all files under models/example/
    ods:
      schema: ods
    dwd:
      schema: dwd
    dwm:
      schema: dwm
    dws:
      schema: dws
    ads:
      schema: ads
    monitor:
      schema: monitor

seeds:
  profile_test:
    +schema: monitor
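The colon-containing keys in the vars section above (such as re_data:schemas) are ordinary YAML mapping keys, so the file can be sanity-checked with PyYAML. A minimal sketch; the snippet below is a trimmed, hypothetical excerpt of the config, not the full file:

```python
import yaml

# Trimmed excerpt of the vars section above; keys like "re_data:schemas"
# are plain YAML keys that happen to contain a colon.
snippet = """
vars:
  re_data:anomaly_detector:
    name: modified_z_score
    threshold: 3
  re_data:schemas:
    - ods
    - dwd
    - monitor
  re_data:anomaly_detection_look_back_days: 30
"""

config = yaml.safe_load(snippet)
print(config["vars"]["re_data:schemas"])           # list of monitored schemas
print(config["vars"]["re_data:anomaly_detector"])  # nested detector settings
```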

2. /root/.dbt/profiles.yml
Use environment variables so you can switch environments easily, or hard-code the values directly.

profile_test:
  outputs:
    dev:
      type: redshift
      threads: 1
      host: "{{ env_var('REDSHIFT_HOST') }}"
      cluster_id: "{{ env_var('REDSHIFT_CLUSTER_ID') }}"
      port: 5439
      user: "{{ env_var('REDSHIFT_USER') }}"
      pass: "{{ env_var('REDSHIFT_PASSWD') }}"
      dbname: "{{ env_var('REDSHIFT_DBNAME') }}"
      schema: "{{ env_var('REDSHIFT_SCHEMA') }}"
    prod:
      type: redshift
      threads: 1
      host: "{{ env_var('REDSHIFT_HOST') }}"
      cluster_id: "{{ env_var('REDSHIFT_CLUSTER_ID') }}"
      port: 5439
      user: "{{ env_var('REDSHIFT_USER') }}"
      pass: "{{ env_var('REDSHIFT_PASSWD') }}"
      dbname: "{{ env_var('REDSHIFT_DBNAME') }}"
      schema: "{{ env_var('REDSHIFT_SCHEMA') }}"
  target: dev

# Disable anonymous usage statistics collection
config:
  send_anonymous_usage_stats: False

3. packages.yml
Here re_data is installed from a local path, since we made secondary-development changes to it.

packages:
  - package: dbt-labs/dbt_utils
    version: [">=0.8.0", "<0.9.0"]
  - local: ../re_data

2. Compilation

Requires Python 3.7 or later.
1. Download the source:
https://github.com/re-data/re-data
To build, all of the following methods must be overridden.
Parameter additions:
@notify.command()
Method overrides:
@add_options(dbt_flags)
@anonymous_tracking
2. Adding alert methods:
You can create a new Python file with whatever alert methods you need
and place it in the notifications directory.
Example: DingTalk alerts.
Edit the slack.py file
and add the following method:

import json     # needed by this method if slack.py does not already import it
import requests


def dingTalk(webhook_url, message):
    # Post a plain-text message to a DingTalk custom-robot webhook.
    headers = {"Content-Type": "application/json"}
    data = {"msgtype": "text", "text": {"content": message}}
    json_data = json.dumps(data)
    requests.post(url=webhook_url, data=json_data, headers=headers)
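As a quick local check (no webhook call), the payload that dingTalk builds can be constructed and inspected on its own; the DingTalk custom-robot API expects a JSON body with a msgtype field and a nested text.content field:

```python
import json

def build_dingtalk_payload(message):
    # Same body shape the dingTalk() helper above posts to the webhook.
    data = {"msgtype": "text", "text": {"content": message}}
    return json.dumps(data)

payload = build_dingtalk_payload("3 alerts found between 2022-03-01 and 2022-03-29")
parsed = json.loads(payload)
print(parsed["msgtype"], "->", parsed["text"]["content"])
```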

Modify the following re_data file.

The full code is as follows:

import click
import subprocess
import json
from datetime import date, timedelta
import shutil
import os
from re_data.templating import render
from re_data.include import OVERVIEW_INDEX_FILE_PATH
from http.server import SimpleHTTPRequestHandler
import webbrowser
from socketserver import TCPServer
from yachalk import chalk
import yaml
from re_data.notifications.slack import slack_notify,dingTalk
from re_data.utils import format_alerts_to_table, parse_dbt_vars
from dbt.config.project import Project
from re_data.tracking import anonymous_tracking
from re_data.notifications.push_monitor import alter_run


def add_options(options):
    def _add_options(func):
        for option in reversed(options):
            func = option(func)
        return func
    return _add_options


def add_dbt_flags(command_list, flags):
    for key, value in flags.items():
        # exclude the --dbt-vars flag, as it's not a valid dbt flag
        if value and key != 'dbt_vars':
            key = key.replace('_', '-')
            command_list.extend([f'--{key}', value])
    print(' '.join(command_list))


def get_target_paths(kwargs, re_data_target_dir=None):
    project_root = os.getcwd() if not kwargs.get('project_dir') else os.path.abspath(kwargs['project_dir'])
    partial = Project.partial_load(project_root)
    dbt_target_path = os.path.abspath(partial.project_dict['target-path'])
    if re_data_target_dir:
        re_data_target_path = os.path.abspath(re_data_target_dir)
    else:
        re_data_target_path = os.path.join(dbt_target_path, 're_data')
    return dbt_target_path, re_data_target_path


dbt_profile_option = click.option(
    '--profile',
    type=click.STRING,
    help="""Which profile to load. Overrides setting in dbt_project.yml"""
)
dbt_target_option = click.option(
    '--target',
    type=click.STRING,
    help="""Which target to load for the given profile."""
)
dbt_profiles_dir_option = click.option(
    '--profiles-dir',
    type=click.STRING,
    help="""Which directory to look in for the profiles.yml file. Default = ~/.dbt"""
)
dbt_project_dir_option = click.option(
    '--project-dir',
    type=click.STRING,
    help="""Which directory to look in for the dbt_project.yml file. Default is the current working directory and its parents"""
)
dbt_vars_option = click.option(
    '--dbt-vars',
    type=click.STRING,
    help="""Supply variables to the project. This argument overrides variables defined in your dbt_project.yml file. This argument should be a YAML string, eg. '{my_var: my_val}'"""
)

dbt_flags = [
    dbt_profile_option,
    dbt_target_option,
    dbt_project_dir_option,
    dbt_profiles_dir_option,
    dbt_vars_option
]


@click.group(help=f"re_data CLI")
def main():
    pass


@main.command()
@click.argument('project_name')
@anonymous_tracking
def init(project_name):
    print(f"Creating {project_name} template project")
    dir_path = os.path.dirname(os.path.realpath(__file__))
    shutil.copytree(os.path.join(dir_path, 'dbt_template'), project_name)
    with open(f"{project_name}/dbt_project.yml", "w") as f:
        f.write(render.render_dbt_project(project_name))
    bash_command = f'cd {project_name} && dbt deps'
    response = os.system(bash_command)
    if not response:
        info = chalk.green("SUCCESS")
    else:
        info = chalk.red("FAILURE")
    print(f"Creating {project_name} template project", info)
    if not response:
        print(f"Setup profile & re_data:schemas var in dbt_project.yml", "INFO")


@main.command()
@add_options(dbt_flags)
@anonymous_tracking
def detect(**kwargs):
    print(f"Detecting tables", "RUN")
    dbt_vars = parse_dbt_vars(kwargs.get('dbt_vars'))
    run_list = ['dbt', 'run', '--models', 're_data_columns', 're_data_monitored']
    if dbt_vars:
        run_list.extend(['--vars', yaml.dump(dbt_vars)])
    add_dbt_flags(run_list, kwargs)
    completed_process = subprocess.run(run_list)
    completed_process.check_returncode()
    print(f"Detecting tables", "SUCCESS")


@main.command()
@click.option(
    '--start-date',
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=str(date.today() - timedelta(days=1)),
    help="Specify starting date to compute monitoring data, by default re_data will use yesterday for that value"
)
@click.option(
    '--end-date',
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=str(date.today()),
    help="""Specify end date to compute monitoring data, by default re_data will use today for that. And compute stats for last full data for that"""
)
@click.option(
    '--interval',
    type=click.STRING,
    default='days:1',
    help="""Specify interval format. e.g. `days:1` translates to a time interval of 1 day"""
)
@click.option(
    '--full-refresh',
    is_flag=True,
    help='Warning! If specified re_data runs first dbt run with --full-refresh option cleaning all previously gathered profiling information'
)
@add_options(dbt_flags)
@anonymous_tracking
def run(start_date, end_date, interval, full_refresh, **kwargs):
    for_date = start_date
    time_grain, num_str = interval.split(':')
    num = int(num_str)
    dbt_vars = parse_dbt_vars(kwargs.get('dbt_vars'))
    if time_grain == 'days':
        delta = timedelta(days=num)
    elif time_grain == 'hours':
        delta = timedelta(hours=num)
    else:
        raise Exception(f"Unsupported time grain {time_grain}")
    while for_date < end_date:
        start_str = for_date.strftime("%Y-%m-%d %H:%M")
        end_str = (for_date + delta).strftime("%Y-%m-%d %H:%M")
        print(f"Running for time interval: {start_str} - {end_str}", "RUN")
        re_data_dbt_vars = {
            're_data:time_window_start': str(for_date),
            're_data:time_window_end': str(for_date + delta),
            're_data:anomaly_detection_window_start': str(for_date - timedelta(days=30))
        }
        dbt_vars.update(re_data_dbt_vars)
        run_list = ['dbt'] + ['run'] + ['--models'] + ['package:re_data'] + ['--vars'] + [json.dumps(dbt_vars)]
        if for_date == start_date and full_refresh:
            run_list.append('--full-refresh')
        add_dbt_flags(run_list, kwargs)
        completed_process = subprocess.run(run_list)
        completed_process.check_returncode()
        for_date += delta
        print(f"Running for date: {for_date.date()}", chalk.green("SUCCESS"))


@click.group(help=f"Generate overview page for your re_data project")
def overview():
    pass


@click.group(help=f"Notification for various channels (email, slack, etc)")
def notify():
    pass


@overview.command()
@click.option(
    '--start-date',
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=str((date.today() - timedelta(days=7)).strftime("%Y-%m-%d")),
    help="Specify starting date to generate overview data, by default re_data will use 7 days ago for that value"
)
@click.option(
    '--end-date',
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=str(date.today().strftime("%Y-%m-%d")),
    help="""Specify end date to compute monitoring data, by default re_data will use today for that. And compute stats for last full data for that"""
)
@click.option(
    '--interval',
    type=click.STRING,
    default='days:1',
    help="""Specify interval format. e.g. `days:1` for a time interval of 1 day or `hours:1` for a time interval of 1 hour"""
)
@click.option(
    '--re-data-target-dir',
    type=click.STRING,
    help="""Which directory to store artefacts generated by re_data. Defaults to the 'target-path' used in dbt_project.yml"""
)
@add_options(dbt_flags)
@anonymous_tracking
def generate(start_date, end_date, interval, re_data_target_dir, **kwargs):
    start_date = str(start_date.date())
    end_date = str(end_date.date())
    dbt_target_path, re_data_target_path = get_target_paths(kwargs=kwargs, re_data_target_dir=re_data_target_dir)
    overview_path = os.path.join(re_data_target_path, 'overview.json')
    dbt_vars = parse_dbt_vars(kwargs.get('dbt_vars'))
    args = {
        'start_date': start_date,
        'end_date': end_date,
        'interval': interval,
        'overview_path': overview_path
    }
    command_list = ['dbt', 'run-operation', 'generate_overview', '--args', yaml.dump(args)]
    if dbt_vars:
        command_list.extend(['--vars', yaml.dump(dbt_vars)])
    add_dbt_flags(command_list, kwargs)
    completed_process = subprocess.run(command_list)
    completed_process.check_returncode()
    dbt_manifest_path = os.path.join(dbt_target_path, 'manifest.json')
    re_data_manifest = os.path.join(re_data_target_path, 'dbt_manifest.json')
    shutil.copyfile(dbt_manifest_path, re_data_manifest)
    target_file_path = os.path.join(re_data_target_path, 'index.html')
    shutil.copyfile(OVERVIEW_INDEX_FILE_PATH, target_file_path)
    print(f"Generating overview page", chalk.green("SUCCESS"))


@click.option(
    '--port',
    type=click.INT,
    default=8085,
    help="Specify the port number for the UI server. Default is 8085"
)
@click.option(
    '--re-data-target-dir',
    type=click.STRING,
    help="""Which directory to store artefacts generated by re_data. Defaults to the 'target-path' used in dbt_project.yml"""
)
@overview.command()
@anonymous_tracking
@add_options([dbt_project_dir_option])
def serve(port, re_data_target_dir, **kwargs):
    _, serve_dir = get_target_paths(kwargs=kwargs, re_data_target_dir=re_data_target_dir)
    os.chdir(serve_dir)
    address = '0.0.0.0'
    httpd = TCPServer((address, port), SimpleHTTPRequestHandler)
    try:
        webbrowser.open_new_tab(f'http://127.0.0.1:{port}/#/alerts')
    except webbrowser.Error:
        pass
    try:
        httpd.serve_forever()  # blocks
    finally:
        httpd.shutdown()
        httpd.server_close()


@notify.command()
@click.option(
    '--start-date',
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=str((date.today() - timedelta(days=7)).strftime("%Y-%m-%d")),
    help="Specify starting date to generate alert data, by default re_data will use 7 days ago for that value"
)
@click.option(
    '--end-date',
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=str(date.today().strftime("%Y-%m-%d")),
    help="""Specify end date used in generating alert data, by default re_data will use current date for that."""
)
@click.option(
    '--webhook-url',
    type=click.STRING,
    required=True,
    help="Incoming webhook url to post messages from external sources into Slack."
         " e.g. https://hooks.slack.com/services/T0JKJQKQS/B0JKJQKQS/XXXXXXXXXXXXXXXXXXXXXXXXXXXX"
)
@click.option(
    '--subtitle',
    type=click.STRING,
    default='',
    help="Extra markdown text to be added to the alert message"
)
@click.option(
    '--type',
    type=click.STRING,
    default='1',
    help="The default (1) exports the alerts.json file; any other value sends the --subtitle text as a custom message, e.g. 2"
)
@click.option(
    '--re-data-target-dir',
    type=click.STRING,
    help="""Which directory to store artefacts generated by re_data. Defaults to the 'target-path' used in dbt_project.yml"""
)
@add_options(dbt_flags)
@anonymous_tracking
def dingding(start_date, end_date, webhook_url, type, subtitle, re_data_target_dir, **kwargs):
    start_date = str(start_date.date())
    end_date = str(end_date.date())
    _, re_data_target_path = get_target_paths(kwargs=kwargs, re_data_target_dir=re_data_target_dir)
    if type == '1':
        alerts_path = os.path.join(re_data_target_path, 'alerts.json')
        dbt_vars = parse_dbt_vars(kwargs.get('dbt_vars'))
        args = {
            'start_date': start_date,
            'end_date': end_date,
            'alerts_path': alerts_path,
        }
        command_list = ['dbt', 'run-operation', 'export_alerts', '--args', yaml.dump(args)]
        if dbt_vars:
            command_list.extend(['--vars', yaml.dump(dbt_vars)])
        add_dbt_flags(command_list, kwargs)
        completed_process = subprocess.run(command_list)
        completed_process.check_returncode()
        with open(alerts_path) as f:
            alerts = json.load(f)
        if len(alerts) > 0:
            tabulated_alerts = format_alerts_to_table(alerts[:20])
            message = f"""{len(alerts)} alerts found between {start_date} and {end_date}.
{subtitle}
_Showing most recent 20 alerts._
<https://docs.getre.io/latest/docs/reference/cli/overview|Generate Observability UI> to show more details.
```{tabulated_alerts}```"""
        else:
            message = f""" No alerts found between {start_date} and {end_date}.
{subtitle}"""
        if webhook_url.find("dingtalk") >= 0:
            dingTalk(webhook_url, message)
        else:
            slack_notify(webhook_url, message)
        print(f"Notification sent", chalk.green("SUCCESS"))
    else:
        message = subtitle
        if webhook_url.find("dingtalk") >= 0:
            dingTalk(webhook_url, message)
        else:
            slack_notify(webhook_url, message)
        print(f"Notification sent", chalk.green("SUCCESS"))


@notify.command()
@click.option(
    '--start-date',
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=str((date.today() - timedelta(days=7)).strftime("%Y-%m-%d")),
    help="Specify starting date to generate alert data, by default re_data will use 7 days ago for that value"
)
@click.option(
    '--end-date',
    type=click.DateTime(formats=["%Y-%m-%d"]),
    default=str(date.today().strftime("%Y-%m-%d")),
    help="""Specify end date used in generating alert data, by default re_data will use current date for that."""
)
@click.option(
    '--user_id',
    type=click.STRING,
    default=''
)
@click.option(
    '--subtitle',
    type=click.STRING,
    default='',
    help="Extra markdown text to be added to the alert message"
)
@click.option(
    '--re-data-target-dir',
    type=click.STRING,
    help="""Which directory to store artefacts generated by re_data. Defaults to the 'target-path' used in dbt_project.yml"""
)
@click.option(
    '--type',
    type=click.STRING,
    default='1',
    help="The default (1) exports the alerts.json file; any other value sends the --subtitle text as a custom message, e.g. 2"
)
@add_options(dbt_flags)
@anonymous_tracking
def push(start_date, end_date, user_id, type, subtitle, re_data_target_dir, **kwargs):
    start_date = str(start_date.date())
    end_date = str(end_date.date())
    _, re_data_target_path = get_target_paths(kwargs=kwargs, re_data_target_dir=re_data_target_dir)
    project_root = os.getcwd() if not kwargs.get('project_dir') else os.path.abspath(kwargs['project_dir'])
    dbt_push_monitor_path = os.path.abspath(project_root)
    yaml_file = os.path.join(dbt_push_monitor_path, "push_monitor.yml")
    f = open(yaml_file, "r", encoding="utf-8")
    dist_str = yaml.safe_load(f.read())
    target = dist_str['target']
    app_key = dist_str[target]['app_key']
    app_secret = dist_str[target]['app_secret']
    app_id = dist_str[target]['app_id']
    HOST = dist_str[target]['host']
    portal = dist_str[target]['portal']
    token_url = dist_str[target]['token_url']
    if len(user_id) < 3:
        user_id = dist_str[target]['user_id']
    if type == '1':
        alerts_path = os.path.join(re_data_target_path, 'alerts.json')
        dbt_vars = parse_dbt_vars(kwargs.get('dbt_vars'))
        args = {
            'start_date': start_date,
            'end_date': end_date,
            'alerts_path': alerts_path,
        }
        command_list = ['dbt', 'run-operation', 'export_alerts', '--args', yaml.dump(args)]
        if dbt_vars:
            command_list.extend(['--vars', yaml.dump(dbt_vars)])
        add_dbt_flags(command_list, kwargs)
        completed_process = subprocess.run(command_list)
        completed_process.check_returncode()
        with open(alerts_path) as f:
            alerts = json.load(f)
        if len(alerts) > 0:
            tabulated_alerts = format_alerts_to_table(alerts[:20])
            message = f"""{len(alerts)} alerts found between {start_date} and {end_date}.
{subtitle}
_Showing most recent 20 alerts._
<https://docs.getre.io/latest/docs/reference/cli/overview|Generate Observability UI> to show more details.
```{tabulated_alerts}```"""
        else:
            message = f""" No alerts found between {start_date} and {end_date}.
{subtitle}"""
        alter_run(app_key, app_secret, app_id, HOST, portal, token_url, user_id, 'rtf', 're_data alerts', message)
        print(f"Notification sent", chalk.green("SUCCESS"))
    else:
        message = subtitle
        alter_run(app_key, app_secret, app_id, HOST, portal, token_url, user_id, 'rtf', 're_data alerts', message)
        print(f"Notification sent", chalk.green("SUCCESS"))


main.add_command(overview)
main.add_command(notify)


def slack2(start_date, end_date, webhook_url, subtitle, re_data_target_dir, **kwargs):
    start_date = str(start_date.date())
    end_date = str(end_date.date())
    _, re_data_target_path = get_target_paths(kwargs=kwargs, re_data_target_dir=re_data_target_dir)
    alerts_path = os.path.join(re_data_target_path, 'alerts.json')
    dbt_vars = parse_dbt_vars(kwargs.get('dbt_vars'))
    args = {
        'start_date': start_date,
        'end_date': end_date,
        'alerts_path': alerts_path,
    }
    command_list = ['dbt', 'run-operation', 'export_alerts', '--args', yaml.dump(args)]
    if dbt_vars:
        command_list.extend(['--vars', yaml.dump(dbt_vars)])
    add_dbt_flags(command_list, kwargs)
    completed_process = subprocess.run(command_list)
    completed_process.check_returncode()
    with open(alerts_path) as f:
        alerts = json.load(f)
    if len(alerts) > 0:
        tabulated_alerts = format_alerts_to_table(alerts[:20])
        message = f"""
:red_circle: {len(alerts)} alerts found between {start_date} and {end_date}.
{subtitle}
_Showing most recent 20 alerts._
<https://docs.getre.io/latest/docs/reference/cli/overview|Generate Observability UI> to show more details.
```{tabulated_alerts}```
"""
    else:
        message = f""":white_check_mark: No alerts found between {start_date} and {end_date}.
{subtitle}"""
    if webhook_url.find("dingtalk") >= 0:
        dingTalk(webhook_url, message)
    elif webhook_url.find("slack.com") >= 0:
        slack_notify(webhook_url, message)
    else:
        alter_run(webhook_url, 'rtf', 're_data alerts', message)
    print(f"Notification sent", chalk.green("SUCCESS"))
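The flag handling in add_dbt_flags above is easy to verify in isolation: it skips dbt_vars (which is not a real dbt flag), converts underscores back to dashes, and drops unset options. A standalone sketch of the same logic, with hypothetical flag values:

```python
def add_dbt_flags(command_list, flags):
    # Mirrors the helper above: --dbt-vars is handled separately via --vars,
    # so it is excluded here; click stores "--project-dir" as "project_dir".
    for key, value in flags.items():
        if value and key != 'dbt_vars':
            key = key.replace('_', '-')
            command_list.extend([f'--{key}', value])
    return command_list

cmd = add_dbt_flags(
    ['dbt', 'run', '--models', 'package:re_data'],
    {'profile': None, 'target': 'prod', 'project_dir': '/srv/profile_test', 'dbt_vars': '{a: 1}'},
)
print(' '.join(cmd))
```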

3. Build and package

python setup.py sdist bdist_wheel || true

3. Configuration

1. Install the built package with pip
(1) Vanilla install

pip install re_data

(2) Installing the rebuilt package

pip uninstall re_data
pip install re_data-0.7.1-py3-none-any.whl

2. Add the package to the DBT project's packages.yml

packages:
  - package: re-data/re_data
    version: [">=0.7.0", "<0.8.0"]
  # Option 2: install from git
  - git: "https://git-codecommit.ap-southeast-1.amazonaws.com/v1/repos/re_data_packages"
    warn-unpinned: false
    # download only this subdirectory of the repo
    subdirectory: re_data

3. Run the dependency install command

dbt deps

4. Add the hook to dbt_project.yml

on-run-end:
  - "{{ re_data.save_test_history(results) }}"

You can also change the time zone setting:

dbt_date:time_zone: Asia/Shanghai

5. Run

dbt test --select package:<dbt project name>

Initialize re_data. This creates the related tables; any that already exist are dropped and recreated.

dbt run --models package:re_data --vars '{"re_data:time_window_start": "2022-03-21 00:00:00", "re_data:time_window_end": "2022-03-22 00:00:00"}'

Backfill historical data:

re_data run --start-date 2022-03-21 --end-date 2022-04-01
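Based on the run command shown earlier, the backfill above walks fixed-size windows from --start-date (inclusive) up to --end-date (exclusive), issuing one dbt run per window. A sketch of the same window arithmetic:

```python
from datetime import datetime, timedelta

def time_windows(start_date, end_date, interval='days:1'):
    # Same splitting logic as the run() command: "days:1" -> a 1-day delta.
    time_grain, num_str = interval.split(':')
    num = int(num_str)
    if time_grain == 'days':
        delta = timedelta(days=num)
    elif time_grain == 'hours':
        delta = timedelta(hours=num)
    else:
        raise ValueError(f"Unsupported time grain {time_grain}")
    windows = []
    for_date = start_date
    while for_date < end_date:
        windows.append((for_date, for_date + delta))
        for_date += delta
    return windows

windows = time_windows(datetime(2022, 3, 21), datetime(2022, 4, 1))
print(len(windows))  # one dbt run per day in the backfill range
```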

6. Generate the visualization JSON files
dbt run creates tables in the {default_schema}_re database schema.
The documentation is served at: http://IP:8085/
# Basic generation

re_data overview generate
re_data overview serve

# Generate monitoring docs for a specific time range from the data warehouse

re_data overview generate --start-date 2022-03-21 --interval days:1
re_data overview serve

7. Official usage statistics collection
If you would like to opt out of anonymous usage collection, set RE_DATA_SEND_ANONYMOUS_USAGE_STATS=0 in your environment. You can do it beforehand or together with the re_data command call, like this:

RE_DATA_SEND_ANONYMOUS_USAGE_STATS=1 re_data overview generate

4. Alerts

1. Parameters
--type: optional; the default exports the alerts.json file, and any value other than 1 sends the custom message instead
--subtitle: optional
DingTalk alert:

re_data notify dingding \
--start-date 2022-03-01 \
--end-date 2022-03-29 \
--webhook-url https://oapi.dingtalk.com/robot/send?access_token=dd65dc3990c24db56e844ccdb5a78f4967f2c54532f6197723e47b9be1eb6a22 \
--type 2 \
--subtitle="alter test"

5. Detailed parameter reference

1. Basic parameters
name: the project name (may differ from the profile name below)
profile: the connection name defined in the default profiles.yml file
model-paths: SQL model files
analysis-paths:
test-paths:
seed-paths: holds CSV and similar files; run dbt seed to load the data into the database
macro-paths: holds operation command files and custom SQL methods; run dbt run-operation <macro name> to apply data changes
snapshot-paths:
target-path: files produced by commands such as dbt run
clean-targets: ["target", "dbt_modules", "dbt_packages"] specifies the files cleaned by the dbt clean command
2. Custom on-run-end configuration

on-run-end:
  - "{{ re_data.save_test_history(results) }}"  # used for re_data monitoring

1. Modify /dbt_packages/re_data/macros/run_end/save_results_history.sql


{% macro save_test_history(results) %}
    -- depends_on: {{ ref('re_data_test_history') }}
    {% if execute and results %}
        {% set to_insert = [] %}
        {% set run_started_at_str = run_started_at.strftime('%Y-%m-%d %H:%M:%S') %}
        {% for el in results %}
            {% if el.node.resource_type.name == 'Test' %}
                {% set any_refs = modules.re.findall("ref\(\'(?P<name>.*)\'\)", el.node.test_metadata.kwargs['model']) %}
                {% set any_source = modules.re.findall("source\(\'(?P<one>.*)\'\,\s+\'(?P<two>.*)\'\)", el.node.test_metadata.kwargs['model']) %}
                {% if any_refs %}
                    {% set name = any_refs[0] %}
                    {% set node_name = re_data.priv_full_name_from_depends(el.node, name) %}
                    {% set schema = graph.nodes.get(node_name)['schema'] %}
                    {% set database = graph.nodes.get(node_name)['database'] %}
                    {% set name = database + '.' + schema + '.' + name %}
                {% elif any_source %}
                    {% set package_name = any_source[0][0] %}
                    {% set name = any_source[0][1] %}
                    {% set node_name = re_data.priv_full_name_from_depends(el.node, name) %}
                    {% set schema = graph.sources.get(node_name)['schema'] %}
                    {% set database = graph.sources.get(node_name)['database'] %}
                    {% set name = database + '.' + schema + '.' + name %}
                {% else %}
                    {% set name = none %}
                {% endif %}
                {% do to_insert.append({
                    'table_name': name,
                    'column_name': el.node.column_name or none,
                    'test_name': el.node.name,
                    'message': el.message,
                    'execution_time': el.execution_time,
                    'original_file_path': el.node.original_file_path,
                    'status': el.status.name,
                    'run_at': run_started_at_str
                }) %}
            {% endif %}
        {% endfor %}
        {% do re_data.insert_list_to_table(
            ref('re_data_test_history'),
            to_insert,
            ['table_name', 'column_name', 'test_name', 'message', 'execution_time', 'original_file_path', 'status', 'run_at']
        ) %}
    {% endif %}
    {{ return ('') }}
{% endmacro %}

{% macro priv_full_name_from_depends(node, name) %}
    {% for full_name in node.depends_on.nodes %}
        {% set node_name = full_name.split('.')[-1] %}
        {% if node_name == name %}
            {{ return(full_name) }}
        {% endif %}
    {% endfor %}
    {{ return(none) }}
{% endmacro %}
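The two modules.re.findall patterns in the macro above can be exercised directly in Python to see what they extract from a test's model kwarg:

```python
import re

# Same patterns the save_test_history macro applies to
# el.node.test_metadata.kwargs['model'].
ref_pattern = r"ref\(\'(?P<name>.*)\'\)"
source_pattern = r"source\(\'(?P<one>.*)\'\,\s+\'(?P<two>.*)\'\)"

refs = re.findall(ref_pattern, "ref('orders')")
sources = re.findall(source_pattern, "source('raw', 'orders')")
print(refs)     # model name captured from a ref()
print(sources)  # (source name, table name) pairs captured from a source()
```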

2. Modify /dbt_packages/re_data/macros/utils/mock/empty_tables.sql
Modify the empty_test_history method:

{% macro empty_test_history() %}
    {{ re_data.dummy_to_select() }}
    select
        cast (some_time as {{ string_type() }} ) as table_name,
        cast (some_string as {{ string_type() }} ) as column_name,
        cast (some_string as {{ string_type() }} ) as test_name,
        cast (some_string as {{ string_type() }} ) as message,
        cast (some_num as {{ numeric_type() }} ) as execution_time,
        cast (some_string as {{ string_type() }} ) as original_file_path,
        cast (some_string as {{ string_type() }} ) as status,
        cast (some_time as {{ timestamp_type() }} ) as run_at
    from dummy_table
    where some_num = 2
{% endmacro %}

3. Add the hook configuration

on-run-end:
  - "{{ re_data.save_run_history(results) }}"

Modify the re_data package files after running dbt deps.
Note:
re_data does not collect run-result data by default; this requires secondary development.
1. Create save_run_history.sql in the /dbt_packages/re_data/macros/run_end directory
Command to print the results to the console log: {% do log(results, info=True) %}

{% macro save_run_history(results) %}
    -- depends_on: {{ ref('re_data_run_history') }}
    {% if execute and results %}
        {% set to_insert = [] %}
        {% set run_started_at_str = run_started_at.strftime('%Y-%m-%d %H:%M:%S') %}
        {% for el in results %}
            {% if el.node.resource_type.name == 'Model' %}
                {% set schema_name = el.node.schema %}
                {% set database_name = el.node.database %}
                {% set table_name = el.node.name %}
                {% set unique_id = el.node.unique_id %}
                {% set path = el.node.path %}
                {% set original_file_path = el.node.original_file_path %}
                {% set compiled_sql = el.node.compiled_sql %}
                {% set message = el.message %}
                {% set failures = el.failures %}
                {% set execution_time = el.execution_time %}
                {% set status = el.status %}
                {% do to_insert.append({
                    'schema_name': schema_name,
                    'database_name': database_name,
                    'table_name': table_name,
                    'unique_id': unique_id,
                    'status': status,
                    'run_at': run_started_at_str,
                    'model_path': path,
                    'original_file_path': original_file_path,
                    'compiled_sql': '',
                    'message': message,
                    'failures': failures,
                    'execution_time': execution_time
                }) %}
            {% endif %}
        {% endfor %}
        {% set run_end_at_str = run_started_at.strftime('%Y-%m-%d %H:%M:%S') %}
        {% do re_data.insert_list_to_table(
            ref('re_data_run_history'),
            to_insert,
            ['schema_name', 'database_name', 'table_name', 'status', 'unique_id', 'original_file_path', 'model_path', 'compiled_sql', 'message', 'execution_time', 'failures', 'run_at']
        ) %}
    {% endif %}
    {{ return ('') }}
{% endmacro %}

2. Create exter_empty_tables.sql in the /dbt_packages/re_data/macros/utils/mock directory

{# /* Quite hacky macros to create empty tables in case nothing is yet to be monitored */ #}
{% macro empty_run_history() %}
    {{ re_data.dummy_to_select() }}
    select
        cast (some_string as {{ string_type() }} ) as schema_name,
        cast (some_string as {{ string_type() }} ) as database_name,
        cast (some_string as {{ string_type() }} ) as table_name,
        cast (some_string as {{ string_type() }} ) as status,
        cast (some_string as {{ string_type() }} ) as unique_id,
        cast (some_string as {{ string_type() }} ) as original_file_path,
        cast (some_string as {{ string_type() }} ) as model_path,
        cast (some_string as {{ string_type() }} ) as compiled_sql,
        cast (some_string as {{ string_type() }} ) as message,
        cast (some_string as {{ numeric_type() }} ) as execution_time,
        cast (some_string as {{ string_type() }} ) as failures,
        cast (some_time as {{ timestamp_type() }} ) as run_at
    from dummy_table
    where some_num = 2
{% endmacro %}

3. Create re_data_run_history.sql in the /dbt_packages/re_data/models/logs directory

{{
    config(
        materialized='incremental',
        on_schema_change='sync_all_columns',
    )
}}
{{ re_data.empty_run_history() }}

4. Replace the contents of /dbt_packages/re_data/models/anomalies/re_data_alerts.sql with the following
This changes the data output to the monitoring alert table.

select
    'anomaly' as type,
    {{ re_data.clean_blacklist('table_name', ['"', '`'], '') }} as model,
    message,
    last_value_text as value,
    time_window_end
from {{ ref('re_data_anomalies') }}

union all

select
    'schema_change' as type,
    {{ re_data.clean_blacklist('table_name', ['"', '`'], '') }} as model,
    {{ generate_schema_change_message('operation', 'column_name', 'prev_column_name', 'prev_data_type', 'data_type', 'detected_time') }} as message,
    '' as value,
    detected_time as time_window_end
from {{ ref('re_data_schema_changes') }}

union all

select
    'run_error' as "type",
    model_path as "model",
    message as message,
    cast(execution_time as varchar) as value,
    run_at as time_window_end
from {{ ref('re_data_run_history') }}
where status = 'error'

union all

select
    'test_Fail' as "type",
    original_file_path as "model",
    column_name || ':' || test_name || '--' || message as message,
    cast(execution_time as varchar) as value,
    run_at as time_window_end
from {{ ref('re_data_test_history') }}
where status = 'Fail'

3. vars parameters

# Anomaly detection
re_data:anomaly_detector:
  name: modified_z_score
  threshold: 3  # threshold
# Monitored schemas
re_data:schemas:
  - ods_glue_pms
  - public # this schemas will be monitored
# Global monitoring time window
re_data:time_window_start: '{{ (run_started_at - modules.datetime.timedelta(1)).strftime("%Y-%m-%d 00:00:00") }}'
re_data:time_window_end: '{{ run_started_at.strftime("%Y-%m-%d 00:00:00") }}'
# Look-back days for anomaly detection
re_data:anomaly_detection_look_back_days: 30
# Global base monitoring metrics
re_data:metrics_base:
  table:
    - row_count
    - freshness
  column:
    numeric:
      - min
      - max
      - avg
      - stddev
      - variance
      - nulls_count
      - nulls_percent
    text:
      - min_length
      - max_length
      - avg_length
      - nulls_count
      - missing_count
      - nulls_percent
      - missing_percent

4. models parameters
Schema parameters controlling where the alert data is stored:

re_data:
  +schema: re
  internal:
    +schema: re_internal
dbt_dags:
  # Config indicated by + and applies to all files under models/example/
  ods:
    schema: ods
  dwd:
    schema: dwd
  dws:
    schema: dws
  ads:
    schema: ads

5. Usage examples for the macros directory
1. Changing a column type
1.sql

{% macro schema_change_add_orders_column() %}
    {% set alter_table %}
        alter table {{ ref('orders') }} add column completed boolean
    {% endset %}
    {% do run_query(alter_table) %}
{% endmacro %}

{% macro schema_change_drop_orders_column() %}
    {% set alter_table %}
        alter table {{ ref('orders') }} drop column completed
    {% endset %}
    {% do run_query(alter_table) %}
{% endmacro %}
# this operation adds a column to the orders table
dbt run-operation schema_change_add_orders_column
# run re_data models to detect the column added schema change
re_data run
# this operation drops the column added earlier
dbt run-operation schema_change_drop_orders_column
# re-run re_data models to detect the removed column
re_data run

2. Fix the database schema name

{% macro generate_schema_name(custom_schema_name, node) -%}
    {%- set default_schema = target.schema -%}
    {%- if custom_schema_name is none -%}
        {{ default_schema }}
    {%- else -%}
        {{ custom_schema_name | trim }}
    {%- endif -%}
{%- endmacro %}
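The override above makes dbt use the custom schema name verbatim instead of dbt's default behavior of concatenating it onto the target schema. The same decision logic, sketched in Python:

```python
def generate_schema_name(custom_schema_name, target_schema):
    # Mirrors the macro: fall back to the target schema when no custom
    # schema is set, otherwise use the custom name as-is (trimmed), with
    # no target-schema prefix.
    if custom_schema_name is None:
        return target_schema
    return custom_schema_name.strip()

print(generate_schema_name(None, 'public'))     # models with no custom schema
print(generate_schema_name(' ods ', 'public'))  # custom schema used verbatim
```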

Results:
