kettle实战-巧用合并记录组件轻松完成业务对账

应用场景:

对于互联网公司而言,无论是支付还是其他产品的交易操作都难免会涉及到对账的环节,这是保证公司与支付平台或者公司与渠道之间交易对等的关键。kettle作为开源的ETL工具为我们提供了强大的组件支持,在实际的开发中数据处理应用到的地方也是非常的广泛,之前在实际的开发中碰巧遇到了这么一个对账的场景,便借此机会运用kettle的合并记录组件完成了这一功能的开发,相当的方便快捷,极大地降低了开发的工作量。

技术

kettle:pdi-ce-8.3.0.0-371

组件简介-合并记录

1.功能介绍
该步骤用于将两个不同来源的数据合并,这两个来源的数据分别为旧数据和新数据,该步骤将旧数据和新数据按照指定的关键字匹配、比较、合并。
2.设置参数
旧数据来源:旧数据来源的步骤
新数据来源:新数据来源的步骤
关键字段:用于定位两个数据源中的同一条记录
比较字段:对于两个数据源中的同一条记录中,指定需要比较的字段
3.运行结果分析
合并后的数据将包括旧数据来源和新数据来源里的所有数据,对于变化的数据,使用新数据代替旧数据,同时在结果里用一个标示字段,来指定新旧数据的比较结果:

第一列 第二列
identical 旧数据和新数据一样
changed 数据发生了变化
new 新数据中有而旧数据中没有的记录
deleted 旧数据中有而新数据中没有的记录

4.关键点
旧数据和新数据需要事先按照关键字段排序
旧数据和新数据要有相同的字段名称

业务场景

每天下午五点从SFTP上获取渠道交易数据(交易时间:昨日下午三点至今日下午三点,交易类型:转入、转出、退保等),文件名为 交易类型_当前日期.zip(例如:5_20201124.zip),加压缩后包含:交易类型_当前日期.csv(例如:5_20201124.csv)和交易类型_当前日期.csv.md5(例如:5_20201124.csv.md5)文件,解析时对csv文件+key进行MD5获取签名进行核对,然后进行业务对账。

简要流程

SFTP下载渠道侧对账文件——合并记录——结果汇总(落库)——异常数据报表(告警)

整体流程

流程拆解

第一步:设置变量
设置文件地址,SFTP配置信息,当然常用的配置也可选择配到kettle.properties文件中

第二步:设置系统时间变量
该步骤主要将系统时间格式化为自己需要使用的格式,例如:文件目录(2020/11/24,或者业务开始时间:2020-11-23 15:00:00,供sql查询使用)

第三步:下载SFTP上的文件到本地

第四步:解压缩文件

第五步:MD5验签(验签代码见下方)


第六步:留存渠道交易数据

第七步:核对数据(关键步骤)
该步骤主要利用合并记录组件来进行数据的核对,需要注意:sql查询时字段的顺序、命名、个数保持、格式保持一致,并且按照相同的排序规则进行排序,然后设置关键字段进行对比将对比结果输出,将验证结果进行处理入库



第七步:汇总对账结果
该步骤可以将当前日期对账的结果进行汇总形成报表,或者是直接进行告警(比如企业微信机器人告警等)通知运维进行相关问题的核查及时处理。示例中打印日志的地方往往会被设置为异常告警便于及时发现。

运行结果分析

模拟数据

渠道端交易数据:

我方交易数据:

运行日志:

2020/11/23 23:42:09 - verify - 开始项[检查文件存放目录]
2020/11/23 23:42:09 - verify - 开始项[SFTP 下载]
2020/11/23 23:42:09 - verify - 开始项[解压缩文件]
2020/11/23 23:42:09 - verify - 开始项[验签]
2020/11/23 23:42:09 - 验签 - Using run configuration [Pentaho local]
2020/11/23 23:42:09 - 验签 - Using legacy execution engine
2020/11/23 23:42:09 - Md5 - 为了转换解除补丁开始  [Md5]
2020/11/23 23:42:09 - 获取全局变量.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2020/11/23 23:42:09 - Java 代码.0 - >>>>>MD5:3a9715f0fa2081119665d82ccf8c72b7
2020/11/23 23:42:10 - Java 代码.0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2020/11/23 23:42:10 - 结束文件输出Y/N .0 - 完成处理 (I=0, O=0, R=1, W=1, U=0, E=0)
2020/11/23 23:42:10 - verify - 开始项[判断验签是否正确]
2020/11/23 23:42:10 - verify - 开始项[同步交易数据(入库)]
2020/11/23 23:42:10 - 同步交易数据(入库) - Using run configuration [Pentaho local]
2020/11/23 23:42:10 - 同步交易数据(入库) - Using legacy execution engine
2020/11/23 23:42:10 - synchRefundData - 为了转换解除补丁开始  [synchRefundData]
2020/11/23 23:42:10 - 读取csv文件.0 - Header row skipped in file 'D:/kettle/verify/download/2020/11/23/5_20201123.csv'
2020/11/23 23:42:10 - 读取csv文件.0 - 完成处理 (I=3, O=0, R=0, W=2, U=0, E=0)
2020/11/23 23:42:10 - 获取变量.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 插入 / 更新.0 - 完成处理 (I=2, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - verify - 开始项[核对数据一致性]
2020/11/23 23:42:10 - 核对数据一致性 - Using run configuration [Pentaho local]
2020/11/23 23:42:10 - 核对数据一致性 - Using legacy execution engine
2020/11/23 23:42:10 - verify - 为了转换解除补丁开始  [verify]
2020/11/23 23:42:10 - 京东退保数据.0 - Finished reading query, closing connection.
2020/11/23 23:42:10 - 公司交易数据.0 - Finished reading query, closing connection.
2020/11/23 23:42:10 - 处理单位-渠道.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 排序-渠道侧数据.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 处理单位-公司.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 排序-公司数据.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 京东退保数据.0 - 完成处理 (I=2, O=0, R=0, W=2, U=0, E=0)
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 1------------------------------
2020/11/23 23:42:10 - 写日志.0 - 不一致记录
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - policyNo = 86000020201600385734
2020/11/23 23:42:10 - 写日志.0 - refundNo = 2005112940872742984
2020/11/23 23:42:10 - 写日志.0 - amount = 101399.0
2020/11/23 23:42:10 - 写日志.0 - refundTime = 2020/11/23 09:21:12.000000000
2020/11/23 23:42:10 - 写日志.0 - flagfield = changed
2020/11/23 23:42:10 - 写日志.0 - flag = 1
2020/11/23 23:42:10 - 写日志.0 - channelCode = dd
2020/11/23 23:42:10 - 写日志.0 - tradeType = 5
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ====================
2020/11/23 23:42:10 - 公司交易数据.0 - 完成处理 (I=3, O=0, R=0, W=3, U=0, E=0)
2020/11/23 23:42:10 - 正向对比.0 - 完成处理 (I=0, O=0, R=5, W=3, U=0, E=0)
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 2------------------------------
2020/11/23 23:42:10 - 写日志.0 - 不一致记录
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - policyNo = 86000020201600385736
2020/11/23 23:42:10 - 写日志.0 - refundNo = 2005112940872742985
2020/11/23 23:42:10 - 写日志.0 - amount = 101399.0
2020/11/23 23:42:10 - 写日志.0 - refundTime = 2020/11/23 09:21:12.000000000
2020/11/23 23:42:10 - 写日志.0 - flagfield = new
2020/11/23 23:42:10 - 写日志.0 - flag = 3
2020/11/23 23:42:10 - 写日志.0 - channelCode = dd
2020/11/23 23:42:10 - 写日志.0 - tradeType = 5
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ====================
2020/11/23 23:42:10 - 字段选择.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 对账状态映射.0 - 完成处理 (I=0, O=0, R=3, W=3, U=0, E=0)
2020/11/23 23:42:10 - 过滤对账一致记录.0 - 完成处理 (I=0, O=0, R=3, W=2, U=0, E=0)
2020/11/23 23:42:10 - 写日志.0 - 完成处理 (I=0, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - 插入对账明细.0 - 完成处理 (I=2, O=0, R=2, W=2, U=0, E=0)
2020/11/23 23:42:10 - verify - 开始项[汇总对账结果]
2020/11/23 23:42:10 - 汇总对账结果 - Using run configuration [Pentaho local]
2020/11/23 23:42:10 - 汇总对账结果 - Using legacy execution engine
2020/11/23 23:42:10 - gatherVerifyData - 为了转换解除补丁开始  [gatherVerifyData]
2020/11/23 23:42:10 - 表输入.0 - Finished reading query, closing connection.
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 1------------------------------
2020/11/23 23:42:10 - 写日志.0 - 汇总对账信息
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - result = 不一致的交易数量为:1条
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ====================
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ------------> 行号 2------------------------------
2020/11/23 23:42:10 - 写日志.0 - 汇总对账信息
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - result = 保险公司多出的交易数量为:1条
2020/11/23 23:42:10 - 写日志.0 -
2020/11/23 23:42:10 - 写日志.0 - ====================

代码解析

MD5验签代码片.

import com.aliyun.openservices.shade.org.apache.commons.codec.digest.DigestUtils;
import com.aliyun.openservices.shade.org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.compress.utils.IOUtils;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.math.BigInteger;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.security.MessageDigest;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Calendar;
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException{//获取到上一个步骤的输入行Object[] r = getRow();if (r == null) {setOutputDone();return false;}r = createOutputRow(r, data.outputRowMeta.size());//读取出參数变量值String key = getVariable("key", "");String fileDir = getVariable("fileDir", "");String todayF = get(Fields.In, "todayF").getString(r);String today = get(Fields.In, "today").getString(r);String tradeType = getVariable("tradeType", "");//获取MD5摘要String Md5Dir = fileDir  + "/" +todayF + "/" + tradeType +"_"+ today + ".csv.md5";//拼接文件地址String csvFileDir = fileDir  + "/" + todayF  + "/"+ tradeType +"_"+ today + ".csv";File file = new File(csvFileDir);FileInputStream fis = null;File channelfile = new File(Md5Dir);FileInputStream channelfis = null;String md5code = null;String channelmd5code = null;boolean flag = true;try {fis = new FileInputStream(file);byte[] csvFileNameBytes  = IOUtils.toByteArray(fis);md5code = DigestUtils.md5Hex(ArrayUtils.addAll(key.getBytes(), csvFileNameBytes));//logBasic(">>>>>MD5:"+md5code);//读取渠道验签channelfis = new FileInputStream(channelfile);byte[]bytes=new byte[1024];int len =-1;while ((len=channelfis.read(bytes))!=-1){channelmd5code=new String(bytes);}if(channelmd5code!=null){channelmd5code=channelmd5code.trim();}if(!md5code.equals(channelmd5code)){flag=false;}} catch (Exception e) {e.printStackTrace();} finally {try {fis.close();} catch (IOException e) {e.printStackTrace();}try {channelfis.close();} catch (IOException e) {e.printStackTrace();}}//把计算好的值放入到输出记录中get(Fields.Out, "flag").setValue(r, flag);//输出到下一个节点做处理putRow(data.outputRowMeta, r);return true;}

demo

demo示例

kettle实战-巧用合并记录组件完成业务对账相关推荐

  1. Kettle8.2连接组件之合并记录

    Kettle8.2连接组件之合并记录 一.相关说明 二.设计转换 三.转换配置 四.运行转换 五.结果分析 一.相关说明 连接组件说明: 连接是结果集通过关键字进行连接. 合并记录组件说明: 合并记录 ...

  2. kettle 分列、合并记录

    项目当中遇到一个问题 一个班级会和其他班级合班.合班的数量不一定. 目前人数= 合班班级目前人数之和. 处理思路 分列,获取班级目前人数,过滤,排序记录.分组.输出结果 实践 1.目前数据的原始格式: ...

  3. Kettle 合并记录报错!

    在Kettle的合并记录过程的时候,在"为了转换解除补丁开始 "这一步的时候报错.具体错误如图所示: Kettle的转换如图所示: 问题原因:可能是你的数据库链接驱动和Kettle ...

  4. Git巧用贮藏避免解决冲突时的合并记录

    背景 使用git进行协同开发的时候,本地的改动很经常会和同事提交的代码产生冲突,需要先在本地将改动提交后才能拉取同事的代码,而后在本地解决冲突.这种做法虽然解决了冲突问题,但是会产生自己的提交和他人的 ...

  5. Kettle: 合并记录

    [合并记录]控件能对两个输入流中的数据进行合并.合并之后的数据会比原始的数据在结构多一个标志字段(默认名为:flagfield,通常[合并记录]之后都会有[字段选择]操作,原因就在此).使用该转换步骤 ...

  6. Kettle 实战教程

    Kettle 实战教程 1.引言.................................................................................... ...

  7. 安卓 Native+Flutter 应用开发入门资料、亲身实战及踩坑记录

    安卓 Native+Flutter 应用开发实战及踩坑记录,练手入门项目:FluLearn 入门资料 第三方共享包检索(国内).第三方共享包检索(国外) Flutter开发环境搭建(中文版).Flut ...

  8. kettle实战教程-纯实战开发

    kettle实战教程-纯实战开发 欢迎关注笔者的公众号: java大师, 每日推送java.kettle运维等领域干货文章,关注即免费无套路附送 100G 海量学习.面试资源哟!!个人网站: http ...

  9. 小巧的日志记录组件 - 开源研究系列文章

    今天给大家带来一个小巧的日志记录组件LogHelper.这个组件是由Log4Net这个组件的由来而来的,不过只是写入.txt文本文件而已.如果能够对大家的项目有帮助那就更好了. 首先,打开.SLN解决 ...

最新文章

  1. nfs文件服务器读取文件夹,NFS文件服务器.ppt
  2. 《Tensorflow实战》之6.3VGGnet学习
  3. matlab根据结构体数组,用邻接矩阵和序遍历创建树形结构:
  4. 前端:40 个 CSS 布局技巧
  5. Activity之间传递bitmap,Observer观察者模式
  6. 微软披露了Spartan中所使用的渲染引擎的细节
  7. ActivityGroup简单介绍
  8. Keil4与keil5共存问题
  9. 单片机控制两个步进电机画圆_单片机控制的步进电机程序框图
  10. 汇编程序的有符号数与无符号数的加减乘除(8086)
  11. 计算机cpu属于什么症状,电脑CPU超频过度有什么表现
  12. 【ICPC 2018 Malaysia】UPC-9302 ELI'S CURIOUS MIND(递推)
  13. 银湖联手博通欲收购东芝芯片业务 出价180亿美元
  14. 安装Win10之后如何进入BIOS
  15. 使用 fitter 拟合数据分布
  16. 美学心得(第二百二十四集)罗国正
  17. 学生用计算机怎么没音效,电脑突然没声音了
  18. java解析与生成json数据的四种方式,比如将json字符串转为json对象或json对象转为json字符串
  19. Windows安装ADB驱动
  20. (附源码)计算机毕业设计SSM驾考服务系统

热门文章

  1. 下一座“金矿”:移动医疗的契机和风险
  2. Linux LVM在线扩容xfs文件系统(创建大于2T的磁盘分区)
  3. 【EEG】关于SEED-VIG数据集介绍
  4. java 仿qq mysql_基于Java远程通信(仿QQ)及应用研究设计(MySQL)(含录像)
  5. 传智高校平台python答案_传智播客高校教辅平台学生端下载-传智播客高校教辅平台app学生版v4.13.0官网最新版_新绿资源网...
  6. Jenkins集成Gitlab配置提交流水线
  7. 分成两栏后文字顺序混乱的问题解决【写期刊论文时】
  8. 我们不该是这样的结局
  9. 含二硫键多功能/促血管神经发生/PVA-HA/释放硫化氢的透明质酸复合水凝胶的制备
  10. 关于乙肝携带者的歧视维权问题