Run Duration

一些处理器支持配置运行持续时间(Run Duration)。此设置告诉处理器在单个任务中继续使用同一task尽可能多地来处理来自传入队列的的FlowFiles(或成批的流文件)。
对于处理单个任务本身非常快并且FlowFile数量也很大的处理器来说,这是一个理想的选择。

在上面的示例中,将完全相同的FlowFiles传递到这两个处理器,这些处理器被配置为执行相同的Attribute更新。两者在过去5分钟内处理了相同数量的FlowFiles;但是,配置为运行持续时间的处理器消耗的总体CPU时间更少。并非所有处理器都支持设置Run Duration。处理器功能的性质,使用的方法或使用的客户端库可能决定了不支持此功能。这样的话你将无法在此类处理器上设置Run Duration

工作原理叙述

  1. 处理器已为其任务分配了线程。处理器从传入连接的Active queue中获取最高优先级的FlowFile(或一批FlowFile)。如果对FlowFile的处理未超过配置的运行持续时间,则会从Active queue中拉出另一个FlowFile(或一批FlowFile)。此过程将在同一线程下继续进行所有操作,直到达到Run Duration时间或Active queue为空。届时,会话完成,所有处理过的FlowFiles都立即提交给适当的关系。

  2. 由于直到整个运行完成才提交所有的FlowFiles,因此在FlowFiles上导致了一些延迟。你配置的Run Duration决定了至少要发生多少延迟(Active queue不为空的时候)。

  3. 如果针对FlowFile执行处理器所需的时间比配置的Run Duration更长,那么调整此配置没有任何其他好处。

这对于堆使用意味着什么

  1. 由于它仅处理Active queue中的传入FlowFiles,因此此处没有增加堆压力。(Active queue中的FlowFiles已经在堆空间中,关于Active queue请看深入理解Apache NIFI Connection)。

  2. 新生成的FlowFiles(如果有的话,取决于处理器功能)全部保留在堆中,直到最终提交为止。这可能会带来一些额外的堆压力,因为所有新生成的FlowFiles都将保留在堆中,直到在运行时间结束时将它们全部提交给输出关系为止(尤其是新FlowFile的content,还没有刷到repository)。

实现

使用SupportsBatching注解标注的Processor是支持Run Duration的,如果一个处理器使用了这个注释,那么它就允许框架对ProcessSession进行批处理的提交,以及允许框架从后续对ProcessSessionFactory.createSession() 的调用中多次返回相同的ProcessSession

比如UpdateAttribute

@EventDriven
@SideEffectFree
@SupportsBatching
...
public class UpdateAttribute extends AbstractProcessor implements Searchable {

重点看在哪里处理了这个SupportsBatching注解,在(深入解析Apache NIFI的调度策略)[./9NIFI调度.md]一文中,我们在讲解Timer driven的时候有提到ConnectableTask.invoke方法,是线程执行调度具体Processor的ontrigger方法前的处理(里面有检测Processor是否有工作可做),下面我们看一下这个方法:

public InvocationResult invoke() {//任务终止if (scheduleState.isTerminated()) {logger.debug("Will not trigger {} because task is terminated", connectable);return InvocationResult.DO_NOT_YIELD;}···//查看Processor是否有工作可做if (!isWorkToDo()) {logger.debug("Yielding {} because it has no work to do", connectable);return InvocationResult.yield("No work to do");}//背压机制if (numRelationships > 0) {final int requiredNumberOfAvailableRelationships = connectable.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;if (!repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships)) {logger.debug("Yielding {} because Backpressure is Applied", connectable);return InvocationResult.yield("Backpressure Applied");}}//可以运行logger.debug("Triggering {}", connectable);//获取 Run Duration的配置final long batchNanos = connectable.getRunDuration(TimeUnit.NANOSECONDS);final ProcessSessionFactory sessionFactory;final StandardProcessSession rawSession;final boolean batch;//处理SupportsBatching注解if (connectable.isSessionBatchingSupported() && batchNanos > 0L) {rawSession = new StandardProcessSession(repositoryContext, scheduleState::isTerminated);sessionFactory = new BatchingSessionFactory(rawSession);batch = true;} else {rawSession = null;sessionFactory = new StandardProcessSessionFactory(repositoryContext, scheduleState::isTerminated);batch = false;}final ActiveProcessSessionFactory activeSessionFactory = new WeakHashMapProcessSessionFactory(sessionFactory);scheduleState.incrementActiveThreadCount(activeSessionFactory);final long startNanos = System.nanoTime();final long finishIfBackpressureEngaged = startNanos + (batchNanos / 25L);final long finishNanos = startNanos + batchNanos;int invocationCount = 0;final String originalThreadName = Thread.currentThread().getName();try {try (final AutoCloseable ncl = NarCloseable.withComponentNarLoader(flowController.getExtensionManager(), connectable.getRunnableComponent().getClass(), connectable.getIdentifier())) {boolean shouldRun = connectable.getScheduledState() == ScheduledState.RUNNING;while (shouldRun) {//循环onTrigger处理 直到Run Duration时间到了或者Processor没有工作可做或者触发背压机制了invocationCount++;connectable.onTrigger(processContext, activeSessionFactory);if (!batch) {return InvocationResult.DO_NOT_YIELD;}final long nanoTime = System.nanoTime();if (nanoTime > finishNanos) {return InvocationResult.DO_NOT_YIELD;}if (nanoTime > finishIfBackpressureEngaged && isBackPressureEngaged()) {return InvocationResult.DO_NOT_YIELD;}if (connectable.getScheduledState() != ScheduledState.RUNNING) {break;}if (!isWorkToDo()) {break;}if (isYielded()) {break;}if (numRelationships > 0) {final int requiredNumberOfAvailableRelationships = connectable.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;shouldRun = repositoryContext.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);}}} catch (final TerminatedTaskException tte) {final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());procLog.info("Failed to process session due to task being terminated", new Object[] {tte});} catch (final ProcessException pe) {final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());procLog.error("Failed to process session due to {}", new Object[] {pe});} catch (final Throwable t) {// Use ComponentLog to log the event so that a bulletin will be created for this processorfinal ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());procLog.error("{} failed to process session due to {}; Processor Administratively Yielded for {}",new Object[] {connectable.getRunnableComponent(), t, schedulingAgent.getAdministrativeYieldDuration()}, t);logger.warn("Administratively Yielding {} due to uncaught Exception: {}", connectable.getRunnableComponent(), t.toString(), t);connectable.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS);}} finally {try {//批量提交if (batch) {try {rawSession.commit();} catch (final Throwable t) {final ComponentLog procLog = new SimpleProcessLogger(connectable.getIdentifier(), connectable.getRunnableComponent());procLog.error("Failed to commit session {} due to {}; rolling back", new Object[] { rawSession, t.toString() }, t);try {rawSession.rollback(true);} catch (final Exception e1) {procLog.error("Failed to roll back session {} due to {}", new Object[] { rawSession, t.toString() }, t);}}}···return InvocationResult.DO_NOT_YIELD;}

通过这个方法我们看到

  1. 设置了SupportsBatching注解的Processor并且配置了Run Duration时,传到onTrigger方法的ProcessSessionFactory sessionFactory是不一样的。
  2. 批量对应传入的是BatchingSessionFactory,这个类的commit方法可以简单理解为并没有实际干提交事务的事儿,只是做了一些check
  3. 批量的最后对应的是rawSession.commit()

所以,如果你自定义的组件想要支持批处理并且符合批处理的特征(简单说就是任务执行快并且FlowFile数量也很大),只要加一个SupportsBatching注解就可以了。

注意

理论分析:对于一些源组件来说(source 一个流程的源),然后是需要记录状态的(比如说记录一个增量值到state,再比如是从别的地方取数据或者接受数据,拿到数据后告诉对方数据已到手),正常来说Processor的实现都是先session.commit再干记录状态那些事,但如果是批量处理配置Run Duration,通过上面的代码分析发现,processor.onTrigger里我们写的session.commit其实并没有提交,而是等到批处理结束后再提交,如果这个任务是依赖记录状态来获取数据的,其实是不保证后面的commit一定执行的(NIFI shutdown了,NIFI宕了),最终没有commit但是状态已经记录,那么这次批处理的数据是丢失的。

场景模拟描述:现有一个Rest服务,提供类似于kafka的功能,消费者可以来注册获取数据,服务端记录客户端消费的offset,然后使用InvokeHttp批处理的去到这个服务获取数据,那么就有概率发生上面说的情况。

公众号

关注公众号 得到第一手文章/文档更新推送。

深入理解Apache NIFI Run Duration相关推荐

  1. 深入解析Apache NIFI的调度策略

    简介:本文主要讲解Apache NIFI的调度策略,对象主要是针对Processor组件.本文假定读者已经对Apache NIFI有了一定的了解和使用经验,同时作者也尽可能的去讲解的更透彻,使得本文尽 ...

  2. Apache NiFi用户指南

    Apache NiFi用户指南 介绍 Apache NiFi是基于流程编程概念的数据流系统.它支持强大且可扩展的数据路由,转换和系统中介逻辑的有向图.NiFi具有基于Web的用户界面,用于设计,控制, ...

  3. Apache NiFi系统管理员指南 [ 三 ]

    基本群集设置 故障排除 State管理 配置状态提供程序 嵌入式ZooKeeper服务器 ZooKeeper访问控制 ZooKeeper安全 ZooKeeper Migrator Bootstrap属 ...

  4. Apache nifi 集群安装

    原文地址:https://pierrevillard.com/2016/08/13/apache-nifi-1-0-0-cluster-setup/ 文章写的很好了,步骤性的英文写得也比较易懂,原样搬 ...

  5. Apache NIFI入门(读完即入门)

    Apache NIFI入门(读完即入门) 编辑人(全网同名):酷酷的诚 邮箱:zhangchengk@foxmail.com 我将在本文中介绍: 什么是ApacheNIFI,应在什么情况下使用它,理解 ...

  6. Apache NiFi系统管理员指南 [ 一 ]

    如何安装和启动NiFi 端口配置 NiFi 嵌入式Zookeeper 配置最佳实践 安全配置 TLS生成工具包 用户认证 轻量级目录访问协议(LDAP) Kerberos的 OpenId Connec ...

  7. Apache NiFi系统管理员指南 [ 四 ]

    系统属性 核心属性 State管理 H2设置 FlowFile存储库 交换管理(Swap Management) 内容存储库 (Content Repository) 文件系统内容存储库属性 (Fil ...

  8. Apache NiFi远程代码执行-RCE

    目录 一. 漏洞简介 二. 影响版本 三. docker-compose进行漏洞环境搭建 四. 漏洞复现 五. 漏洞挖掘 六. 漏洞修复 一. 漏洞简介 Apache NiFi 是一个易于使用.功能强 ...

  9. Apache NIFI 安装 ● 操作 ● 文件同步 ● oracle 数据库增量同步实例讲解

    nifi简介 nifi背景 NiFi之前是在美国国家安全局(NSA)开发和使用了8年的一个可视化.可定制的数据集成产品.2014年NSA将其贡献给了Apache开源社区,2015年7月成功成为Apac ...

  10. Apache NiFi深度扩展

    介绍 该高级文档旨在深入了解NiFi的实施和设计决策.它假设读者已经阅读了足够的其他文档来了解NiFi的基础知识. FlowFiles是NiFi的核心,也是基于流程的设计.FlowFile是一种数据记 ...

最新文章

  1. 微软职位内部推荐-Principal Software Developer
  2. js之base64上传图片
  3. 模线性同余方程组求解
  4. 解决 appcompat 1.1.0 导致 webview crash 的问题
  5. 2018-2019-2 《网络对抗技术》Exp5 MSF基础应用 Week7-8 20165233
  6. 6-1 uniapp 打包 App 横屏竖屏切换出现样式混乱问题
  7. 纯CSS实现点击一个元素,背景颜色切换
  8. 小程序 Rsa加密
  9. android如何开手机,安卓手机如何打开.data文件?
  10. STM32开发基础知识——定时器
  11. 动态规划的理解(DP)
  12. STM32的串口硬件流控(RS232/RS485)
  13. 神经网络与深度学习(一)——机器学习基础
  14. 传奇外网架设全套图文教程-Hero引擎
  15. 更多有效反链推广 增加反向链接十个方法
  16. 【练习题】python列表练习题1
  17. 前端:HTML+CSS+JavaScript实现轮播图
  18. PLSql连接Oracle时提示TNS:无监听程序的解决方法
  19. WEB标准,Web前端开发工程师必备技术列表
  20. 服务器系统事件1014,如何排除每天都会在事件查看器中出现的来源:DNS Client Events 事件 ID:1014 的警告记录呢?...

热门文章

  1. 有没有测试牙齿需不需要修正的软件,三分钟教你测试自己的牙齿需不需要矫正!...
  2. Qt显示调用dll库失败
  3. 微信小程序中 三元运算符的嵌套使用
  4. 《可复制的领导力》——樊登书摘
  5. Mac 激活win10-报错:所请求的操作需要提升特权
  6. 网络信息安全 一场没有硝烟的战争
  7. 黑市最流行的黑客匿名工具
  8. Unity3D手机斗地主游戏开发实战(01)_发牌功能实现
  9. Java学习笔记(13)-构造方法
  10. EasyNLP 发布融合语言学和事实知识的中文预训练模型 CKBERT