Flume 1.7 源码分析(一)源码编译
Flume 1.7 源码分析(二)整体架构
Flume 1.7 源码分析(三)程序入口
Flume 1.7 源码分析(四)从Source写数据到Channel

4 程序入口

启动Flume的过程可以简单分为2个步骤:
1. 获取相关配置文件(一般来说就是flume-conf.properties)。
2. 启动各组件。不特别说明,本文中的组件是指实现了LifecycleAware接口的类的对象,一般就是Source、Channel、Sink这3种对象。

4.1 获取启动配置

4.1.1 Main函数

启动Flume的Main函数在flume-ng-node模块的org.apache.flume.node.Application。该函数的功能可以简单划分为以下三个步骤:
1. 使用commons.cli类获取命令行参数(就是启动时传入的参数)
2. 根据启动参数确定的读取配置的方式。读取配置的方式总共有4种,分别根据配置是保存在zookeeper上还是本地properties文件、以及是否reload(自动重载配置文件)分为4种方式。
3. 根据相应的配置启动程序,并注册关闭钩子。
接下来以properties文件、不重载的方式为例,主要的代码如下:

PropertiesFileConfigurationProvider configurationProvider =new PropertiesFileConfigurationProvider(agentName, configurationFile);
//创建Application对象,包含初始化组件列表(components),初始化LifecycleSupervisor。
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
//start方法用于检查所有组件是否是启动状态,如果不是则启动该组件。
application.start();
//监听程序关闭事件,用于当程序被kill后能够执行一些清理工作。
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {public void run() {appReference.stop();}
});

上面的代码,有两处比较关键:

  • configurationProvider.getConfiguration()会返回一个MaterializedConfiguration类型的对象,用于从文件形式的配置转为物化的配置,即包含实际的channel、sinkRunner等对象的实例,在“物化配置”一节分析。
  • handleConfigurationEvent用于停止所有components,并使用新的配置进行启动,在“使用新配置重启”一节分析。

4.1.2 物化配置

configurationProvider.getConfiguration()方法主要做了以下两件事:
1. 读取配置文件(flume-conf.properties),保存在AgentConfiguration对象中。

public static class AgentConfiguration {private final String agentName;private String sources;private String sinks;private String channels;private String sinkgroups;private final Map<String, ComponentConfiguration> sourceConfigMap;private final Map<String, ComponentConfiguration> sinkConfigMap;private final Map<String, ComponentConfiguration> channelConfigMap;private final Map<String, ComponentConfiguration> sinkgroupConfigMap;private Map<String, Context> sourceContextMap;private Map<String, Context> sinkContextMap;private Map<String, Context> channelContextMap;private Map<String, Context> sinkGroupContextMap;private Set<String> sinkSet;private Set<String> sourceSet;private Set<String> channelSet;private Set<String> sinkgroupSet;
}

到这个步骤还仅仅是做好了分类的文本形式的配置项。
2. 创建出配置中的各组件实例,并添加到MaterializedConfiguration实例中。

public interface MaterializedConfiguration {public void addSourceRunner(String name, SourceRunner sourceRunner);public void addSinkRunner(String name, SinkRunner sinkRunner);public void addChannel(String name, Channel channel);public ImmutableMap<String, SourceRunner> getSourceRunners();public ImmutableMap<String, SinkRunner> getSinkRunners();public ImmutableMap<String, Channel> getChannels();
}

在这个实例中,可以获取配置文件中配置的所有的source、channel、sink,并且是“物化”的,即可以直接取得相关组件的实例。

4.2 启动所有组件

4.2.1 使用新配置重启

有了上面的MaterializedConfiguration实例,我们就可以启动组件了。
在handleConfigurationEvent方法中,首先会停止所有组件,然后再启动所有组件。

stopAllComponents();
startAllComponents(conf); //这里的conf就是上节的MaterializedConfiguration。

在startAllComponents方法中,会遍历组件列表(SourceRunners、SinkRunners、Channels),分别调用supervise方法。以Channel为例:

for (Entry<String, Channel> entry :materializedConfiguration.getChannels().entrySet()) {try {logger.info("Starting Channel " + entry.getKey());supervisor.supervise(entry.getValue(),new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);} catch (Exception e) {logger.error("Error while starting {}", entry.getValue(), e);}
}

这个supervise方法简单来说,就是将相应组件的状态转化为期望的状态。例如上面代码中的LifecycleState.START就是期望的状态。

4.2.2 LifecycleSupervisor

上节的supervisor是一个LifecycleSupervisor对象。前面有说到,在创建Application的时候初始化了一个LifecycleSupervisor对象,就是这里的supervisor。这个对象,我理解为各组件生命周期的管理者,用于实时监控所有组件的状态,如果不是期望的状态(desiredState),则进行状态转换。

上节的代码中调用了supervisor.supervise方法,接下来分析一下supervise这个方法:

public synchronized void supervise(LifecycleAware lifecycleAware,SupervisorPolicy policy, LifecycleState desiredState) {//省略状态检查的代码
Supervisoree process = new Supervisoree();process.status = new Status();process.policy = policy;process.status.desiredState = desiredState;process.status.error = false;MonitorRunnable monitorRunnable = new MonitorRunnable();monitorRunnable.lifecycleAware = lifecycleAware;monitorRunnable.supervisoree = process;monitorRunnable.monitorService = monitorService;supervisedProcesses.put(lifecycleAware, process);ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(monitorRunnable, 0, 3, TimeUnit.SECONDS);monitorFutures.put(lifecycleAware, future);
}

由于所有的组件都实现了LifecycleAware接口,所以这里的supervise方法传入的是LifecycleAware接口的对象。

可以看到创建了一个Supervisoree对象,顾名思义,就是被监控的的对象,该对象有以下几种状态:IDLE, START, STOP, ERROR。
scheduleWithFixedDelay每隔3秒触发一次监控任务(monitorRunnable)。

4.2.3 MonitorRunnable

在MonitorRunnable中主要是检查组件的状态,并实现从lifecycleState到desiredState的转变。

switch (supervisoree.status.desiredState) {case START:try {lifecycleAware.start();} catch (Throwable e) {省略}break;case STOP:try {lifecycleAware.stop();} catch (Throwable e) {省略}break;default:logger.warn("I refuse to acknowledge {} as a desired state", supervisoree.status.desiredState);
}

到这里为止,可以看到监控的进程,调用了组件自己的start和stop方法来启动、停止。前面有提到有3种类型的组件,SourceRunner、Channel、SinkRunner,而Channel的start只做了初始化计数器,没什么实质内容,所以接下来从SourceRunner的启动(从Source写数据到Channel)和SinkRunner的启动(从Channel获取数据写入Sink)来展开说明。

Flume 1.7 源码分析(三)程序入口相关推荐

  1. MYC编译器源码分析之程序入口

    前文.NET框架源码解读之MYC编译器讲了MyC编译器的架构,整个编译器是用C#语言写的,上图列出了MyC编译器编译一个C源文件的过程,编译主路径如下: 首先是入口Main函数用来解析命令行参数,读取 ...

  2. Flume 1.7 源码分析(五)从Channel获取数据写入Sink

    Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...

  3. Flume 1.7 源码分析(四)从Source写数据到Channel

    Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...

  4. Flume 1.7 源码分析(二)整体架构

    Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...

  5. Flume 1.7 源码分析(一)源码编译

    Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 1 说明 Flume是Cloudera提供的一个高可用的,高可靠的, ...

  6. 【投屏】Scrcpy源码分析三(Client篇-投屏阶段)

    Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...

  7. Nouveau源码分析(三):NVIDIA设备初始化之nouveau_drm_probe

    Nouveau源码分析(三) 向DRM注册了Nouveau驱动之后,内核中的PCI模块就会扫描所有没有对应驱动的设备,然后和nouveau_drm_pci_table对照. 对于匹配的设备,PCI模块 ...

  8. Spring源码分析(三)

    Spring源码分析 第三章 手写Ioc和Aop 文章目录 Spring源码分析 前言 一.模拟业务场景 (一) 功能介绍 (二) 关键功能代码 (三) 问题分析 二.使用ioc和aop重构 (一) ...

  9. 【转】ABP源码分析三十五:ABP中动态WebAPI原理解析

    动态WebAPI应该算是ABP中最Magic的功能之一了吧.开发人员无须定义继承自ApiController的类,只须重用Application Service中的类就可以对外提供WebAPI的功能, ...

最新文章

  1. mysql存储引擎简书_MySQL存储引擎详解
  2. oracle数据库的select,Oracle数据库--基本的select语句
  3. spss非线性回归分析步骤_SPSS与简单线性回归分析
  4. 编译我的hello.ko
  5. 插入公式_word2016编写文档 插入公式选项为暗,不能操作
  6. 面试准备每日五题:C++(四)——typedefdefine、指针常量、队列栈、地址赋值、C和C++结构体
  7. 《Xcode实战开发》——1.1节下载
  8. 如何借助大数据进行宏观经济分析
  9. IMX8基于FFT的GPU和CPU的性能测试
  10. 汽车HUD抬头显示全产业链深度解析报告
  11. Protel DXP 使用教程 - 自定义集成库
  12. 创业之路 - 人脉关系 VS 人际关系
  13. 面试知识总结(八股、网络、LINUX)
  14. Office2016 Visio2016 Project2016零售版转换VL版
  15. SpringCloud Alibaba 从入门到精通(精选)
  16. 关于定时器setTimeout与setInterval的定时与关闭操作
  17. Snapchat和Facebook同意社交媒体的未来看起来像TikTok
  18. Kotlin学习笔记-3 --- 类与对象
  19. UserWarning: FixedFormatter should only be used together with FixedLocator|Python点点
  20. 传奇服务器文件,【教程】传奇服务端(版本)的结构以及重要文件功能的概述-A02...

热门文章

  1. linux控制编译so 位数,Linux下解决64位下Apache编译模块时/usr/lib/libexpat.so问题
  2. 【数据结构】线性表的链式存储-单链表
  3. nginx系列之四:web服务器
  4. 注册表操作(VC_Win32)
  5. CImage类显示图片
  6. UDT协议实现分析——UDT Socket的创建
  7. Flink的处理背压​原理及问题-面试必备
  8. Easy Tech:什么是MPEG-DASH协议
  9. 英特尔开源WebRTC开发套件OWT
  10. 张霖峰:AV1和VVC的格局将在2023年后明朗