执行 datax 作业,创建执行文件,在 crontab 中每天1点(下面有关系)执行:

其中 job_start 及 job_finish 这两行记录是自己添加的,为了方便识别出哪张表。 #!/bin/bash

source /etc/profile

user1="root"

pass1="pwd"

user2="root"

pass2="pwd"

job_path="/opt/datax/job/"

jobfile=(

job_table_a.json

job_table_b.json

)

for filename in ${jobfile[@]}

do

echo "job_start: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"

python /opt/datax/bin/datax.py -p "-Duser1=${user1} -Dpass1=${pass1} -Duser2=${user2} -Dpass2=${pass2}" ${job_path}${filename}

echo "job_finish: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"

done

# 0 1 * * * /opt/datax/job/dc_to_ods_incr.sh >> /opt/datax/job/log/dc_to_ods_incr_$(date +\%Y\%m\%d_\%H\%M\%S).log 2>&1

# egrep '任务|速度|总数|job_start|job_finish' /opt/datax/job/log/

datax 执行日志: job_start: 2018-08-08 01:13:28 job_table_a.json

任务启动时刻 : 2018-08-08 01:13:28

任务结束时刻 : 2018-08-08 01:14:49

任务总计耗时 : 81s

任务平均流量 : 192.82KB/s

记录写入速度 : 1998rec/s

读出记录总数 : 159916

读写失败总数 : 0

job_finish: 2018-08-08 01:14:49 job_table_a.json

job_start: 2018-08-08 01:14:49 job_table_b.json

任务启动时刻 : 2018-08-08 01:14:50

任务结束时刻 : 2018-08-08 01:15:01

任务总计耗时 : 11s

任务平均流量 : 0B/s

记录写入速度 : 0rec/s

读出记录总数 : 0

读写失败总数 : 0

job_finish: 2018-08-08 01:15:01 job_table_b.json

接下来读取这些信息保存到数据库,在数据库中创建表: CREATE TABLE `datax_job_result` (

`log_file` varchar(200) DEFAULT NULL,

`job_file` varchar(200) DEFAULT NULL,

`start_time` datetime DEFAULT NULL,

`end_time` datetime DEFAULT NULL,

`seconds` int(11) DEFAULT NULL,

`traffic` varchar(50) DEFAULT NULL,

`write_speed` varchar(50) DEFAULT NULL,

`read_record` int(11) DEFAULT NULL,

`failed_record` int(11) DEFAULT NULL,

`job_start` varchar(200) DEFAULT NULL,

`job_finish` varchar(200) DEFAULT NULL,

`insert_time` datetime DEFAULT CURRENT_TIMESTAMP

) ENGINE=InnoDB DEFAULT CHARSET=utf8;

定时执行以下文件,因为 datax 作业 1 点执行,为了获取一天内最新生产的日志,脚本中取 82800内生产的日志文件,及23 小时内生产的那个最新日志。所以一天内任何时间执行都可以。此文件也是定时每天执行(判断 datax 作业完成后执行) #!/usr/bin/python

# -*- coding: UTF-8 -*-

# 0 5 * * * source /etc/profile && /usr/bin/python2.7 /opt/datax/job/save_log_to_db.py > /dev/null 2>&1

import re

import os

import sqlalchemy

import pandas as pd

import datetime as dt

def save_to_db(df):

engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test", encoding="utf-8")

df.to_sql("datax_job_result", engine, index=False, if_exists='append')

def get_the_latest_file(path):

t0 = dt.datetime.utcfromtimestamp(0)

d2 = (dt.datetime.now() - t0).total_seconds()

d1 = d2 - 82800

for (dirpath, dirnames, filenames) in os.walk(path):

for filename in sorted(filenames, reverse = True):

if filename.endswith(".log"):

f = os.path.join(dirpath,filename)

ctime = os.stat(f)[-1]

if ctime>=d1 and ctime <=d2:

return f

def get_job_result_from_logfile(path):

result = pd.DataFrame(columns=['log_file','job_file','start_time','end_time','seconds','traffic','write_speed','read_record','failed_record','job_start','job_finish'])

log_file = get_the_latest_file(path)

index = 0

content = open(log_file, "r")

for line in content:

result.loc[index, 'log_file'] = log_file

if re.compile(r'job_start').match(line):

result.loc[index, 'job_file'] = line.split(' ')[4].strip()

result.loc[index, 'job_start'] = line,

elif re.compile(r'任务启动时刻').match(line):

result.loc[index, 'start_time'] = line.split('刻')[1].strip().split(' ')[1].strip() + ' ' + line.split('刻')[1].strip().split(' ')[2].strip()

elif re.compile(r'任务结束时刻').match(line):

result.loc[index, 'end_time'] = line.split('刻')[1].strip().split(' ')[1].strip() + ' ' + line.split('刻')[1].strip().split(' ')[2].strip()

elif re.compile(r'任务总计耗时').match(line):

result.loc[index, 'seconds'] = line.split(':')[1].strip().replace('s','')

elif re.compile(r'任务平均流量').match(line):

result.loc[index, 'traffic'] = line.split(':')[1].strip()

elif re.compile(r'记录写入速度').match(line):

result.loc[index, 'write_speed'] = line.split(':')[1].strip()

elif re.compile(r'读出记录总数').match(line):

result.loc[index, 'read_record'] = line.split(':')[1].strip()

elif re.compile(r'读写失败总数').match(line):

result.loc[index, 'failed_record'] = line.split(':')[1].strip()

elif re.compile(r'job_finish').match(line):

result.loc[index, 'job_finish'] = line,

index = index + 1

else:

pass

save_to_db(result)

get_job_result_from_logfile("/opt/datax/job/log")

以上这篇Python 获取 datax 执行结果保存到数据库的方法就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持聚米学院。

python读取数据库导出文件_Python 获取 datax 执行结果保存到数据库的方法相关推荐

  1. python读取mac地址_python - 获取MAC地址

    python - 获取MAC地址 我需要一种在运行时确定计算机MAC地址的跨平台方法. 对于Windows,可以使用'wmi'模块,我可以找到Linux下唯一的方法是运行ifconfig并在其输出中运 ...

  2. python切换ip群发邮件_python获取外网IP并发邮件的实现方法

    第一步:通过ip138来爬取外网ip 第二步:通过python的smtplib模块和email来发送邮件,具体用法去网上搜索, 下面是代码示例: #!/usr/bin/env python #codi ...

  3. python读取git日志_Python获取gitlab提交历史!

    工作中的场景,记录下来分享给大家. 需求 公司私有部署了gitlab保存公司代码,希望在发布系统中可以展示项目在gitlab的提交历史,供发布人员选择提交commit记录并构建对应的docker镜像. ...

  4. python读取图片分辨率_python获取网页中所有图片并筛选指定分辨率的方法

    压测时,图片太少,想着下载网页中的图片,然后过滤指定分辨率,但网页中指定分辨率的图片太少了(见下) 后使用格式工厂转换图片 import urllib.request # 导入urllib模块 imp ...

  5. python 读取邮件内容_python获取邮件内容(邮件内容为html)

    用python获取邮件内容比较简单,直接用现成的imap和pop3包即可,但是有时候邮件的内容不是plainText而是html甚至是一个url链接,原本的操作流程是点击url获取内容(比如csv等等 ...

  6. python读取mssql文件_python 读取mssql数据库中文的搜索结果-阿里云开发者社区

    sphinx 配置文件全解析 sphinx的配置文件是在配置的时候最容易出错的了: 我们先要明白几个概念: source:数据源,数据是从什么地方来的. index:索引,当有数据源之后,从数据源处构 ...

  7. python读取手机文件_python 读取 网络 文件

    Python之pandas数据加载.存储 Python之pandas数据加载.存储 0. 输入与输出大致可分为三类: 0.1 读取文本文件和其他更好效的磁盘存储格式 2.2 使用数据库中的数据 0.3 ...

  8. python读取pdf表格_Python使用Tabula提取PDF表格数据

    今天遇到一个批量读取pdf文件中表格数据的需求,样式大体是以下这样: python读取PDF无非就是三种方式(我所了解的),pdfminer.pdf2htmlEX 和 Tabula.综合考虑后,选择了 ...

  9. python 读取 word 表格_python读取word表格

    python调用com,如何完成word表格操作 word中doc这个格式的文件是微软特有格式,微软没有向外公开任何的api接口文档,只能通过微软提供的OLE组件来提其COM接口,只要你的机器上安装了 ...

最新文章

  1. 在pcduino上实现图像识别的程序
  2. 完成css的切图 图片任意,css切图是什么意思
  3. sscanf 连续相同编辑符_【第1995期】钉钉文档编辑器的前世今生
  4. 【MySQL】数据库基本操作、表的操作
  5. android动态调试防止,Android应用防止so注入防止动态调试参考代码
  6. Android 10正式版发布,支持5G和折叠屏设备
  7. 001:InVEST学习——产水/水源涵养
  8. CSS3之颜色渐变效果
  9. 虚拟机xfs文件系统因根分区爆满损坏修复
  10. 谷歌浏览器(chrome)无法正常打开网页的解决办法
  11. 基于STM32的高精度温度测控系统-原理图设计
  12. 初识pytest框架及其应用原理
  13. 面试必问的8个CSS响应式单位,你知道几个?
  14. 用层次分析法建模解决交通问题论文
  15. 学编程语言C/C++、Java、Python的入门教程都在这里
  16. css实现渐变色字体
  17. 拿到JS异步函数返回值的几种方式
  18. springboot集合MySQL删除_SpringBoot集成Spring JdbcTemplate并完成增删改查操作
  19. 全球及中国地理信息产业应用建设发展及产值规模预测报告2021-2027
  20. 使用HTML 5/CSS3五步快速制作便签贴特效

热门文章

  1. 给gridview动态生成radiobutton添加OnCheckedChanged监听函数
  2. python处理xml中非法字符的一种思路
  3. NUnit2.0详细使用方法
  4. grub error:unknown filesystem的解决方案
  5. PostgreSQL切换用户,提示对等认证失败的解决方案
  6. PHP使用文件锁解决高并发问题示例
  7. js浮点数精度丢失问题及如何解决js中浮点数计算不精准
  8. Java 常见异常种类
  9. 如何通过Geth、Node.js和UNIX/PHP访问以太坊节点
  10. CentOS6.5下Redis安装与配置