Flume 1.7 源码分析(三)程序入口
Flume 1.7 源码分析(一)源码编译
Flume 1.7 源码分析(二)整体架构
Flume 1.7 源码分析(三)程序入口
Flume 1.7 源码分析(四)从Source写数据到Channel
4 程序入口
启动Flume的过程可以简单分为2个步骤:
1. 获取相关配置文件(一般来说就是flume-conf.properties)。
2. 启动各组件。不特别说明,本文中的组件是指实现了LifecycleAware接口的类的对象,一般就是Source、Channel、Sink这3种对象。
4.1 获取启动配置
4.1.1 Main函数
启动Flume的Main函数在flume-ng-node模块的org.apache.flume.node.Application。该函数的功能可以简单划分为以下三个步骤:
1. 使用commons.cli类获取命令行参数(就是启动时传入的参数)
2. 根据启动参数确定的读取配置的方式。读取配置的方式总共有4种,分别根据配置是保存在zookeeper上还是本地properties文件、以及是否reload(自动重载配置文件)分为4种方式。
3. 根据相应的配置启动程序,并注册关闭钩子。
接下来以properties文件、不重载的方式为例,主要的代码如下:
PropertiesFileConfigurationProvider configurationProvider =new PropertiesFileConfigurationProvider(agentName, configurationFile);
//创建Application对象,包含初始化组件列表(components),初始化LifecycleSupervisor。
application = new Application();
application.handleConfigurationEvent(configurationProvider.getConfiguration());
//start方法用于检查所有组件是否是启动状态,如果不是则启动该组件。
application.start();
//监听程序关闭事件,用于当程序被kill后能够执行一些清理工作。
final Application appReference = application;
Runtime.getRuntime().addShutdownHook(new Thread("agent-shutdown-hook") {public void run() {appReference.stop();}
});
上面的代码,有两处比较关键:
- configurationProvider.getConfiguration()会返回一个MaterializedConfiguration类型的对象,用于从文件形式的配置转为物化的配置,即包含实际的channel、sinkRunner等对象的实例,在“物化配置”一节分析。
- handleConfigurationEvent用于停止所有components,并使用新的配置进行启动,在“使用新配置重启”一节分析。
4.1.2 物化配置
configurationProvider.getConfiguration()方法主要做了以下两件事:
1. 读取配置文件(flume-conf.properties),保存在AgentConfiguration对象中。
public static class AgentConfiguration {private final String agentName;private String sources;private String sinks;private String channels;private String sinkgroups;private final Map<String, ComponentConfiguration> sourceConfigMap;private final Map<String, ComponentConfiguration> sinkConfigMap;private final Map<String, ComponentConfiguration> channelConfigMap;private final Map<String, ComponentConfiguration> sinkgroupConfigMap;private Map<String, Context> sourceContextMap;private Map<String, Context> sinkContextMap;private Map<String, Context> channelContextMap;private Map<String, Context> sinkGroupContextMap;private Set<String> sinkSet;private Set<String> sourceSet;private Set<String> channelSet;private Set<String> sinkgroupSet;
}
到这个步骤还仅仅是做好了分类的文本形式的配置项。
2. 创建出配置中的各组件实例,并添加到MaterializedConfiguration实例中。
public interface MaterializedConfiguration {public void addSourceRunner(String name, SourceRunner sourceRunner);public void addSinkRunner(String name, SinkRunner sinkRunner);public void addChannel(String name, Channel channel);public ImmutableMap<String, SourceRunner> getSourceRunners();public ImmutableMap<String, SinkRunner> getSinkRunners();public ImmutableMap<String, Channel> getChannels();
}
在这个实例中,可以获取配置文件中配置的所有的source、channel、sink,并且是“物化”的,即可以直接取得相关组件的实例。
4.2 启动所有组件
4.2.1 使用新配置重启
有了上面的MaterializedConfiguration实例,我们就可以启动组件了。
在handleConfigurationEvent方法中,首先会停止所有组件,然后再启动所有组件。
stopAllComponents();
startAllComponents(conf); //这里的conf就是上节的MaterializedConfiguration。
在startAllComponents方法中,会遍历组件列表(SourceRunners、SinkRunners、Channels),分别调用supervise方法。以Channel为例:
for (Entry<String, Channel> entry :materializedConfiguration.getChannels().entrySet()) {try {logger.info("Starting Channel " + entry.getKey());supervisor.supervise(entry.getValue(),new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);} catch (Exception e) {logger.error("Error while starting {}", entry.getValue(), e);}
}
这个supervise方法简单来说,就是将相应组件的状态转化为期望的状态。例如上面代码中的LifecycleState.START就是期望的状态。
4.2.2 LifecycleSupervisor
上节的supervisor是一个LifecycleSupervisor对象。前面有说到,在创建Application的时候初始化了一个LifecycleSupervisor对象,就是这里的supervisor。这个对象,我理解为各组件生命周期的管理者,用于实时监控所有组件的状态,如果不是期望的状态(desiredState),则进行状态转换。
上节的代码中调用了supervisor.supervise方法,接下来分析一下supervise这个方法:
public synchronized void supervise(LifecycleAware lifecycleAware,SupervisorPolicy policy, LifecycleState desiredState) {//省略状态检查的代码
Supervisoree process = new Supervisoree();process.status = new Status();process.policy = policy;process.status.desiredState = desiredState;process.status.error = false;MonitorRunnable monitorRunnable = new MonitorRunnable();monitorRunnable.lifecycleAware = lifecycleAware;monitorRunnable.supervisoree = process;monitorRunnable.monitorService = monitorService;supervisedProcesses.put(lifecycleAware, process);ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(monitorRunnable, 0, 3, TimeUnit.SECONDS);monitorFutures.put(lifecycleAware, future);
}
由于所有的组件都实现了LifecycleAware接口,所以这里的supervise方法传入的是LifecycleAware接口的对象。
可以看到创建了一个Supervisoree对象,顾名思义,就是被监控的的对象,该对象有以下几种状态:IDLE, START, STOP, ERROR。
scheduleWithFixedDelay每隔3秒触发一次监控任务(monitorRunnable)。
4.2.3 MonitorRunnable
在MonitorRunnable中主要是检查组件的状态,并实现从lifecycleState到desiredState的转变。
switch (supervisoree.status.desiredState) {case START:try {lifecycleAware.start();} catch (Throwable e) {省略}break;case STOP:try {lifecycleAware.stop();} catch (Throwable e) {省略}break;default:logger.warn("I refuse to acknowledge {} as a desired state", supervisoree.status.desiredState);
}
到这里为止,可以看到监控的进程,调用了组件自己的start和stop方法来启动、停止。前面有提到有3种类型的组件,SourceRunner、Channel、SinkRunner,而Channel的start只做了初始化计数器,没什么实质内容,所以接下来从SourceRunner的启动(从Source写数据到Channel)和SinkRunner的启动(从Channel获取数据写入Sink)来展开说明。
Flume 1.7 源码分析(三)程序入口相关推荐
- MYC编译器源码分析之程序入口
前文.NET框架源码解读之MYC编译器讲了MyC编译器的架构,整个编译器是用C#语言写的,上图列出了MyC编译器编译一个C源文件的过程,编译主路径如下: 首先是入口Main函数用来解析命令行参数,读取 ...
- Flume 1.7 源码分析(五)从Channel获取数据写入Sink
Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...
- Flume 1.7 源码分析(四)从Source写数据到Channel
Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...
- Flume 1.7 源码分析(二)整体架构
Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...
- Flume 1.7 源码分析(一)源码编译
Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 1 说明 Flume是Cloudera提供的一个高可用的,高可靠的, ...
- 【投屏】Scrcpy源码分析三(Client篇-投屏阶段)
Scrcpy源码分析系列 [投屏]Scrcpy源码分析一(编译篇) [投屏]Scrcpy源码分析二(Client篇-连接阶段) [投屏]Scrcpy源码分析三(Client篇-投屏阶段) [投屏]Sc ...
- Nouveau源码分析(三):NVIDIA设备初始化之nouveau_drm_probe
Nouveau源码分析(三) 向DRM注册了Nouveau驱动之后,内核中的PCI模块就会扫描所有没有对应驱动的设备,然后和nouveau_drm_pci_table对照. 对于匹配的设备,PCI模块 ...
- Spring源码分析(三)
Spring源码分析 第三章 手写Ioc和Aop 文章目录 Spring源码分析 前言 一.模拟业务场景 (一) 功能介绍 (二) 关键功能代码 (三) 问题分析 二.使用ioc和aop重构 (一) ...
- 【转】ABP源码分析三十五:ABP中动态WebAPI原理解析
动态WebAPI应该算是ABP中最Magic的功能之一了吧.开发人员无须定义继承自ApiController的类,只须重用Application Service中的类就可以对外提供WebAPI的功能, ...
最新文章
- mysql存储引擎简书_MySQL存储引擎详解
- oracle数据库的select,Oracle数据库--基本的select语句
- spss非线性回归分析步骤_SPSS与简单线性回归分析
- 编译我的hello.ko
- 插入公式_word2016编写文档 插入公式选项为暗,不能操作
- 面试准备每日五题:C++(四)——typedefdefine、指针常量、队列栈、地址赋值、C和C++结构体
- 《Xcode实战开发》——1.1节下载
- 如何借助大数据进行宏观经济分析
- IMX8基于FFT的GPU和CPU的性能测试
- 汽车HUD抬头显示全产业链深度解析报告
- Protel DXP 使用教程 - 自定义集成库
- 创业之路 - 人脉关系 VS 人际关系
- 面试知识总结(八股、网络、LINUX)
- Office2016 Visio2016 Project2016零售版转换VL版
- SpringCloud Alibaba 从入门到精通(精选)
- 关于定时器setTimeout与setInterval的定时与关闭操作
- Snapchat和Facebook同意社交媒体的未来看起来像TikTok
- Kotlin学习笔记-3 --- 类与对象
- UserWarning: FixedFormatter should only be used together with FixedLocator|Python点点
- 传奇服务器文件,【教程】传奇服务端(版本)的结构以及重要文件功能的概述-A02...