NiFi ListSFTP精讲
序
since: 2021年5月20日 22:29
auth :Hadi
前言
从去年年末开始接触使用到NiFi,到现在为止已经将近半年,这里将一下关于ListSFTP类相关组件的使用。NiFi可以当做Flink进行使用,但不是很推荐进行复杂计算的使用,对于我的使用场景来说主要是做数据采集和预处理相关的工作,负责数据流程的第一步,同时也做数据的转换操作比如流式转文件,文件转流式等等。
那么获取数据是整个数据预处理的第一步,一般我们都是采用List & Fetch的操作进行数据预处理,比如:
预先通过List进行数据列表的扫描,然后通过Fetch进行数据的拉取形成带真正带有数据的FlowFile(FlowFile为NiFi的最小处理单位,代表一个文件,数据集,message等)。List只会进行列表的输出,比如XXX服务器上的一个文件清单,交给Fetch,根据FlowFile上的Attributes进行文件的拉取。
这篇Blog主要讲解ListSFTP为模板,进行各类List的讲解。
ListSFTP配置
直接上图:
列表策略
在进行扫描的时候,List的任务都是想要扫描出最新的文件,或者是被修改过的文件,不然就没有任何扫描的意义。那么在这个前提下,诞生了四种策略:
Tracking Timestamps
根据时间戳对文件的筛选过滤,简单的说就是通过上一次扫描出来的文件的最后修改时间T作为下一次扫描的标准,判断是否是最新的文件的标准为:只要文件的最后修改时间大于等于上个时间T,并且大于上一次生成的最大文件的时间,那么我就认为是新的文件。
一个为扫描出来的最大时间listing.timestamp,另一个为输出的文件最大时间processed.timestamp。
那么如果我们配置好List后,第一次扫描,是无法将扫描对象中的所有符合条件的数据都扫描出来;在没有新的文件出来的时候,必须在第二次才会将最后一个时间戳(并不准确,最小单位受到 Target System TimeStamp Precision的影响)的文件列出来。
这里需要注意的是,一般我们常用的Listing Strategy为Tracking Timestamps的话,可能导致以下一些问题:
当生成文件被更改了最后修改时间,那么List很可能是无法进行数据的拉取。1. 比如被刻意进行的 touch -t 202105205200 文件,那么这个办法完全无法进行数据的拉取。这种情况可能并不多见,但是在生产环境下会有极大的概率导致文件缺失。2. 在所有List相关的组件里面,如果某个目录无法进行递归,那么就会报错并跳过该目录;遇到这种情况时,可能会导致其他文件夹将时间限定往前推了,导致这个文件夹的一段时间数据丢失。
所以在生产环境下极力不推荐使用Tracjing Timestamps,除非你有神人帮助,或者数据种类少,方便维护,偷个懒。
No Tracking
更简单了,就直接不进行跟踪,直接进行全量列表的输出,那么就不用管到底数据是新是旧,直接拿走。
Tracking Entities
根据每个实体进行跟踪。这个配置起来就比较麻烦,需要配置一个缓存器,然后配置连接缓存器的线程池。
在Listing Strategy中选择Tracking Entities,然后在Entity Tacking State Cache中选择使用的缓存方法包含:
CassandraDistributed、CouchBase、DistributedMapCacheClientService(NiFi自带)、HBase、Hazelcast、Redis,共计六种。
使用上来说NiFi自带的当然最简单,但是也越不可靠,推荐使用Cassandra和Hbase和Redis。由于大数据系统一般至少有Hbase,那么就以Hbase为列进行讲解:
创建了Hbase_XX_ClientMapCacheService后,还需要进行这个Service的配置,点击这里进行下一步的配置操作:
跳转后,点击后面的小齿轮进行配置:
继续配置HbaseClientService:
上面HBase所需的文件为CoreSite.Xml,HbaseSite.Xml,HdfsSite.Xml三个文件,需要放在NiFi集群的所有服务器节点上同一个路径下,三个文件的路径使用“,”来进行拼接即可。
配置成功后,然后可以看见:
可以看到有两个Service,一个是HBaseCache服务,一个是连接HBase集群的服务,前者依赖于后者。
这个时候我们运行List,就会将缓存存储到HBase表中,然后每次List都会进行前后两次List的对比,来获取文件。这个好处在于在时间窗口中的数据也会被比较,如果这部分数据被漏采,那么就会被List出来,而不是直接被抛弃;如果文件时间戳被更改,还在时间窗口内,那么数据文件也不会被更改。坏处在于:配置麻烦;性能降低;而且在HBase中存储时是一个List Component为一条数据,这Component List出来的数据为Value保存,所以这个value可能特别特别大。
Time Window
抓取最近一段时间的文件,这个意义也不太大,一般用于更新数据的时候使用。
List BUG和改造
List任务只能单个线程进行,为了保证这个任务的单线程进行,直到1.13.2版本的NiFi只能配置这些任务在主节点进行,那么过多的List任务会占用主节点过多的CPU和内存。并且由于NiFi的机制,在这个节点上的数据,后续如果没有balance那么就只会在这个节点进行运行,所以要注意的是:1.不能list过多的数据,防止主节点GG。2.必须在List后续进行balance操作然后Fetch,将主节点的数据分配到各个节点上。
NiFi是开源的,所以很容易就能找到NiFi的源码,在ListSFTP/FTP这块,NiFi貌似并没有准备将其打造成一个可以扫描大数据场景的组件,而是假设了NiFi不会扫描上50w,100w,1000w的文件系统。当扫描这么多数据的时候,JVM堆直接撑爆GC,导致Stop The Word,更会导致与ZooKeeper断开连接,导致主节点切换,导致所有主节点任务脑裂的问题出现。
这个问题主要是这个原因导致的:使用ArrayList进行全量的数据存储。
在 org.apache.nifi.processor.util.list.AbstractListProcessor 中的代码,大家可以好好读一下,基本逻辑就搞清楚了。
主要问题可以直接聚焦org.apache.nifi.processors.standard.util.SFTPTransfer#getListing()中,我们主要看SFTP的实现:
红线中主listing主要用于 SFTPClient进行扫描时,扫出来的全量数据,再根据这个listing进行后续的时间戳、实体之类的进行判断,但这个是个ArrayList,当数据量大的时候,需要进行扩容,导致内存中必须一直开连续的空间给他。第二,为什么要进行全量的数据扫描呢?本来就会根据文件的时间戳进行筛选过滤的。这个逻辑也不太对,导致多塞入了很多数据。
然后聚焦于下面的getListing递归方法:
在后续定义了一个filter,用于递归筛选后续的文件和文件夹;文件放入listing中,文件夹放入subDirs中,但可以看到如果是文件夹那么就直接放入递归队列中,这种也是匪夷所思的,因为我们配置路径的时候已经有正则进行筛选了。(Path Filter Regex)。所以,如果需要进行优化的同学可以自己优化,我这边优化过了,但是是公司代码,所以现在还不能公布。
优化:对于我们大多数来说,一般来说都是扫相同等级的目录,必须/XXX数据大类/XXX数据小类/XXX标识/20210520/数据,这样进行累加,那么必然我每次都会扫描时间那一层及的所有数据,但是根据时间戳的时候,我们可以扫描到时间文件夹这层的时候,对这个时间文件夹进行判断,看里面时候有新的数据。如果在一段时间内有我们才进入进行扫描等等。
同类List
对于SFTP是这样的,同比FTP、HDFS、File等文件系统的List,NiFi组件都有这个毛病,没有将时间戳用到极致,浪费了挺多的资源进行扫描。各位在进行组件优化的时候,还是根据公司的业务情况进行实际组件的开发时最合理的。
比如对Hive离线表的扫描,如果使用原生的ListHDFS那么对于SQL任务,很可能是无法进行扫描的。(SQL是先生成.Hive-staging文件,然后将整个文件夹迁移过来;并不是直接生成在表的分区中)很可能导致数据的缺失。
后记
在大数据场景下,我也一直在怀疑NiFi是否能支撑主我的想象,从目前发展来看,有好有坏。
我想表达一点,NiFi这样的可视化界面操作,让整个开发流程变得很简单,同样的会导致整个门槛的降低。良莠不齐,不思进取,贪图享受,在有了工具之后,我们需要想象的是将工具用的更好,多多去社区上提issue,改善代码环境才是真。
NiFi ListSFTP精讲相关推荐
- 【C++自我精讲】基础系列二 const
[C++自我精讲]基础系列二 const 0 前言 分三部分:const用法.const和#define比较.const作用. 1 const用法 const常量:const可以用来定义常量,不可改变 ...
- 第三百三十八节,Python分布式爬虫打造搜索引擎Scrapy精讲—深度优先与广度优先原理...
第三百三十八节,Python分布式爬虫打造搜索引擎Scrapy精讲-深度优先与广度优先原理 网站树形结构 深度优先 是从左到右深度进行爬取的,以深度为准则从左到右的执行(递归方式实现)Scrapy默认 ...
- 【牛客每日一题】tokitsukaze and Soldier 题目精讲 贪心、优先队列、堆
链接:https://ac.nowcoder.com/acm/problem/50439 来源:牛客网 ACM在线模板 今天才发现牛客推出了一个每日一题的版块,3月25号就开始了,今天才发现,赶紧补救 ...
- PyTorch深度学习入门与实战(案例视频精讲)
作者:孙玉林,余本国 著 出版社:中国水利水电出版社 品牌:智博尚书 出版时间:2020-07-01 PyTorch深度学习入门与实战(案例视频精讲)
- 水稻微生物组时间序列分析精讲1-模式图与主坐标轴分析
写在前面 上周五我们分享了3月底发表的的 <水稻微生物组时间序列分析>的文章,大家对其中图绘制过程比较感兴趣.一上午收到了超30条留言,累计收到41个小伙伴的留言求精讲. 我们也争取花时间 ...
- python flask高级编程之restful_('Python Flask高级编程之RESTFul API前后端分离精讲',),全套视频教程学习资料通过百度云网盘下载...
资源详情 r n t某课网好评度100%的Python Flask高级编程之RESTFul API前后端分离精讲 r n t t t第1章 随便聊聊 r n t t t聊聊Flask与Django,聊 ...
- python相关函数_python常用函数精讲
原标题:python常用函数精讲 返回值为bool类型的函数 bool是Boolean的缩写,只有真(True)和假(False)两种取值 bool函数只有一个参数,并根据这个参数的值返回真或者假. ...
- 精讲深度学习RNN三大核心点,三分钟掌握循环神经网络
本文将剖析循环神经网络(RNN)的工作原理,精讲循环神经网络的特点和实现方式.野蛮智能,小白也能看懂的人工智能. 循环神经网络从何而来? 我在我的这篇文章介绍了卷积神经网络(CNN)卷积神经网络(CN ...
- Python精讲Numpy基础,大牛笔记详细解释
https://www.toutiao.com/a6664936105076326920/ 总认为Numpy是渣渣,直到深入接触以后才知道功能这么强大.堪比Matlab啊.果然是人生苦短,我用Pyth ...
- JavaScript异步精讲,让你更加明白Js的执行流程!
JavaScript异步精讲,让你更加明白Js的执行流程! 问题点 什么是单线程,和异步有什么关系 什么是 event-loop jQuery的Deferred Promise 的基本使用和原理 as ...
最新文章
- 滑动轮播图实现最后一张图片无缝衔接第一张图片
- 转 互联网推送服务原理:长连接+心跳机制(MQTT协议)
- Spring 的 BeanUtils 踩坑记,你是不是遇到过这些问题?
- go sync.WaitGroup源码分析
- C语言 ,嵌入式 ,数据结构 面试题目(3)
- 我理解设计模式C++实现观察者模式Observer Pattern
- 智能门禁考勤机:刷脸同时开门和打卡
- 3D游戏中的画质与效率适配(转)
- Spring的Bean有哪些作用域?
- 现代男人养生的10点忠告
- MATLAB表示非线性系统,matlab非线性控制系统分析.ppt
- 计算机财务管理模型的建立步骤,计算机财务管理系统的建立.ppt
- 拼音分词器_自动补全
- Citrix提供免费阳春版XenServer
- 软件测试mysql数据库相关操作
- 【绿盟】检测到目标Strict-Transport-Security响应头缺失
- 《麦肯锡精英的48个工作习惯》书评
- 用php照片艺术化,Photoshop把人像照片转化为艺术格子效果图
- Bash shell语言学习
- 考研英语作文笔记(刘晓燕强化班)
热门文章
- 身份证扫描件用手机怎么弄?手把手教你生成电子身份证
- STM32F0xx系列 基于LL库的Flash模拟EEPROM
- socks v5 协议解析
- 修复漏洞显示连接服务器失败,服务器安全狗漏洞补丁失败怎么办
- android 办公桌面壁纸,android系统励志的壁纸欣赏
- 戴尔微型计算机7050配置,戴尔 Dell OptiPlex 7060 微型机体验分享 - 1升的迷你小钢炮...
- 漫画:什么是二分查找?(修订版)
- [方便WAP网站开发]在线手机WAP模拟器或软件
- 东芝300D粉盒清零
- c#程序设计实训报告心得体会_C#.NET程序设计实验一实验报告