FlinkX脏值处理

在大量数据的传输过程中,必定会由于各种原因导致很多数据传输报错(比如类型转换错误),这种数据DataX认为就是脏数据。

​ – by DataX

配置实例

"dirty": {"path": "/tmp","hadoopConfig": {"fs.default.name": "hdfs://flinkhadoop:8020","dfs.nameservices": "ns1","dfs.ha.namenodes.ns1": "flinkhadoop","dfs.namenode.rpc-address.ns1.nn1": "hdfs://flinkhadoop:8020","dfs.ha.automatic-failover.enabled": "true","dfs.client.failover.proxy.provider.ns1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider","fs.hdfs.impl.disable.cache": "true"}}

我这里的hdfs是一个单机。

实现逻辑

FlinkX的脏值处理逻辑是放在写入数据过程中的。按照脏值的定义,在读取过程中,能读取进来的都是正常的值,但是在写入过程中,以目标源的标准,可能读取的值是存在瑕疵的,所以是脏值。

在脏值处理的过程中,脏值处理器的作用肯定是首当其冲,我们先看一下DirtyManager的定义和初始化过程:

DirtyDataManager

全局视角:

创建

public DirtyDataManager(String path, Map<String, Object> configMap, String[] fieldNames, String jobId) {this.fieldNames = fieldNames;location = path + "/" + UUID.randomUUID() + ".txt";this.config = configMap;this.jobId = jobId;}
  1. 可以看出来 location 是根据我们配置的路径+一个uuid生成的txt,其实这样在查看起来的时候不是很方便。

初始化

public void open() {try {FileSystem fs = FileSystemUtil.getFileSystem(config, null);Path path = new Path(location);stream = fs.create(path, true);} catch (Exception e) {throw new RuntimeException("Open dirty manager error", e);}}// -------------------- FileSystem ---------------------------public static FileSystem getFileSystem(Map<String, Object> hadoopConfigMap, String defaultFs) throws Exception {if(isOpenKerberos(hadoopConfigMap)){return getFsWithKerberos(hadoopConfigMap, defaultFs);}Configuration conf = getConfiguration(hadoopConfigMap, defaultFs);setHadoopUserName(conf);return FileSystem.get(getConfiguration(hadoopConfigMap, defaultFs));}

flinkX的脏值是存放在hadoop上面的。

脏值写入

public String writeData(Row row, WriteRecordException ex) {String content = RowUtil.rowToJson(row, fieldNames);String errorType = retrieveCategory(ex);String line = StringUtils.join(new String[]{content,errorType, gson.toJson(ex.toString()), DateUtil.timestampToString(new Date()) }, FIELD_DELIMITER);try {// stream.write(line.getBytes(StandardCharsets.UTF_8));stream.write(LINE_DELIMITER.getBytes(StandardCharsets.UTF_8));DFSOutputStream dfsOutputStream = (DFSOutputStream) stream.getWrappedStream();dfsOutputStream.hsync(syncFlags);return errorType;} catch (IOException e) {throw new RuntimeException(e);}}private String retrieveCategory(WriteRecordException ex) {Throwable cause = ex.getCause();if(cause instanceof NullPointerException) {return ERR_NULL_POINTER;}for(String keyword : PRIMARY_CONFLICT_KEYWORDS) {if(cause.toString().toLowerCase().contains(keyword)) {return ERR_PRIMARY_CONFLICT;}}return ERR_FORMAT_TRANSFORM;}
  1. 获取脏值数据内容
  2. 获取脏值类型: NPE、主键重复、其它错误(统称为转换错误)
  3. 将数据内容和错误原因进行拼接,分割符为 \u0001
  4. 将拼接后的数据以utf-8编码以及换行符\n写入到hdfs中
  5. 通过hsync刷入,根据UPDATE_LENGTH策略刷入

hsync的语义是:client端所有的数据都发送到副本的每个datanode上,并且datanode上的每个副本都完成了posix中fsync的调用,也就是说操作系统已经把数据刷到磁盘上(当然磁盘也可能缓冲数据);需要注意的是当调用fsync时只有当前的block会刷到磁盘中,要想每个block都刷到磁盘,必须在创建流时传入Sync标示。

UPDATE_LENGTH: 同步到DataNodes时,还更新NameNode中的元数据(块长度)。

脏值写入时机

在写入每一行数据writeSingleRecord的时候,进行脏值的捕获

protected void writeSingleRecord(Row row) {if(errorLimiter != null) {errorLimiter.acquire();}try {writeSingleRecordInternal(row);if(!restoreConfig.isRestore() || isStreamButNoWriteCheckpoint()){numWriteCounter.add(1);snapshotWriteCounter.add(1);}} catch(WriteRecordException e) {// 写入错误限流器saveErrorData(row, e);// 更新指标以及持久化存储脏值updateStatisticsOfDirtyData(row, e);// 总记录数加1numWriteCounter.add(1);snapshotWriteCounter.add(1);if(dirtyDataManager == null && errCounter.getLocalValue() % LOG_PRINT_INTERNAL == 0){LOG.error(e.getMessage());}if(DtLogger.isEnableTrace()){LOG.trace("write error row, row = {}, e = {}", row.toString(), ExceptionUtil.getErrorMessage(e));}}}
private void updateStatisticsOfDirtyData(Row row, WriteRecordException e){if(dirtyDataManager != null) {String errorType = dirtyDataManager.writeData(row, e);if (ERR_NULL_POINTER.equals(errorType)){nullErrCounter.add(1);} else if(ERR_FORMAT_TRANSFORM.equals(errorType)){conversionErrCounter.add(1);} else if(ERR_PRIMARY_CONFLICT.equals(errorType)){duplicateErrCounter.add(1);} else {otherErrCounter.add(1);}}}

这代码逻辑确实和我的逻辑稍微有些区别,为什么会在这里进行存储。。。。

应该逻辑应该分离的。将dirtyDataManager.writeData(row, e)放在上一个saveErrorData方法中可能更合适。

参考 https://github.com/DTStack/flinkx/issues/220

脏值实例测试

脏值文件实例

根据dirty配置,初始化hadoop的连接,并创建对应文件,如我们这里配置的path是:/tmp/flinkx/bond_info_mongodb_to_mysql,如我们配置的是4个处理器。在对应的hdfs上面,有四个文件:

感觉官方需要对这个作业存储位置进行一些处理:

脏值模拟

我们模拟将mysql对应的表的string=>bigint,这样肯定会在转换中发生错误。

{"bond_name":"xxxx","bond_stop_time":"xxx","bond_time_limit":"xx","bond_type":"xxx","plan_issued_quantity":"xx","publish_expire_time":"xxx","publish_time":"xx","publisher_name":"xxx","real_issued_quantity":"14","start_cal_interest_time":"xx","inst_code":"x":"x","city_code":"x","area_code":"x","input_date":x,"input_time":x}conversion"com.dtstack.flinkx.exception.WriteRecordException: Incorrect integer value: 'xxxx' for column 'bond_type' at row 1\njava.sql.SQLException: Incorrect integer value: 'xxxx' for column 'bond_type' at row 1"2020-05-24 17:33:15

可以看出来是类型转换错误,它会把错误数据和错误原因都进行存储,并且根据u0001进行分割。

总结

本文对脏值的定义,以及FlinkX的处理进行详细的分解,并进行了相关的测试,与实例展示。从本文中可以了解到hdfs的hsynchdfs的基本配置。

FlinkX脏值处理相关推荐

  1. angular4的脏值检测

    背景 年前对实验室研一的同学做了一个angular专题学习的培训,培训的效果不太理想,想了想原因还是对angular中的原理不理解所致.以后再做angular项目的时候多了解一下其中的原理.本次学习a ...

  2. vue3子组件修改父组件值,vue3 子组件修改属性

    如何在TypeScript中应用像Jquery之类的第三方JavaScript框架 要在TypeScript引用第三方JavaScript库和框架,首先要了解TypeScript的类型定义文件. Ty ...

  3. Angular2:从AngularJS 1.x 中学到的经验

    小编说:Angular 2 的最终版正式发布,Angular 1 的全平台继任者从此诞生.在上一篇文章中我们讨论了Web 的进化和前端开发的变革对Angular 2诞生的推动,不过不只如此, 1.x中 ...

  4. 剖析Vue原理实现双向绑定MVVM

    本文能帮你做什么? 1.了解vue的双向数据绑定原理以及核心代码模块 2.缓解好奇心的同时了解如何实现双向绑定 为了便于说明原理与实现,本文相关代码主要摘自vue源码, 并进行了简化改造,相对较简陋, ...

  5. Angular 中得 scope 作用域梳理

    2019独角兽企业重金招聘Python工程师标准>>> $scope 的使用贯穿整个 Angular App 应用,它与数据模型相关联,同时也是表达式执行的上下文.有了 $scope ...

  6. vue 原理简单实现

    实现数据绑定的做法有大致如下几种: 发布者-订阅者模式(backbone.js) 脏值检查(angular.js) 数据劫持(vue.js) 发布者-订阅者模式: 一般通过sub, pub的方式实现数 ...

  7. synchronized不能锁静态变量_多线程编程不可错过——彻底理解synchronized

    持续分享互联网研发技术,欢迎关注我.本人是一线架构师,有问题可以沟通. 1. synchronized简介 在学习知识前,我们先来看一个现象: public class SynchronizedDem ...

  8. 4 angular 重构 项目_vuejs angularjs 框架的一些比较(vue项目重构四)

    使用Angularjs和Vue.js对比 首先需要说明的是:现在默认angularjs指angular1.0+版本,angular默认指2.0以上版本.本文的名词也默认指定angular的1.0+版本 ...

  9. 自己动手实现一个MVVM库

    我们知道的,常见的数据绑定的实现方法 1.数据劫持(vue):通过Object.defineProperty() 去劫持数据每个属性对应的getter和setter 2.脏值检测(angular):通 ...

  10. AngularJS 深入理解 $scope 转载▼

    AngularJS 深入理解 $scope 转载▼ (2015-04-07 14:09:50) $scope 的使用贯穿整个 AngularJS App 应用,它与数据模型相关联,同时也是表达式执行的 ...

最新文章

  1. python实现将文件内容按照某一列内容的大小值重新排序_Python数据分析入门教程(四):数值操作...
  2. python怎么将png转为tif_png转tif
  3. mybatis转义符(亲测)
  4. Java Spring注解实现分析之@requestMapping工作原理
  5. centos7限制普通用户访问单一目录下的单一文件
  6. Entity Framework 4 in Action 读书笔记——开篇
  7. SSH连接时出现Host key verification failed的原因及解决方法
  8. JavaWeb应用项目部署到云ubuntu
  9. 1040. Airline Company
  10. java oracle数据备份_Java实现Oracle数据库备份
  11. unity3D实现小游戏案例--弹开小球
  12. Payment支付平台API接口文档
  13. 华为员工吐槽加班太多,晒出7天上班打卡记录网友:不怕猝死吗?
  14. 计算机共享文件输入网络密码是什么,Win7共享文件时需要输入网络密码怎么办?...
  15. 【办公】关于←(Backspace)退格键和Delete键盘的区别
  16. Android如何定制主题
  17. 操作系统-动态内存分配算法
  18. 如何通过腾讯SOSO问问获得巨大IP流量?
  19. 奇偶数分离c语言,如何把一个整型数组中,奇数偶数进行分离 怎么使数组进行奇偶奇偶相间排序...
  20. 网心科技打造全球首个百万量级节点的边缘云计算网络“星域云”

热门文章

  1. Cuba 设置debug模式
  2. rds对mysql优化_RDS MySQL参数调优最佳实践
  3. linux无线网卡驱动编写,博通无线网卡驱动linux版
  4. 同源策略——CORS和JSONP劫持漏洞
  5. 1068 万绿丛中一点红 (20 分)测试点3、5
  6. linux网卡team0,team
  7. C++出错_Heap corruption detected
  8. CSDN博客编写快捷键
  9. 企业如何架设代理服务器联接互联网
  10. Aspnet Mvc 前后端分离项目手记(二)关于token认证