heka主程序的启动源码为cmd/heka/main.go,首先来分析一下main函数源码如下:

主程序代码分析

func main() {exitCode := 0// `os.Exit` will skip any registered deferred functions, so to support// exit codes we put it in the first registerred deferred (i.e. the last to// run), we can set the exitCode and then call `return` to exit with an// error code.// 退出运行的函数defer func() {os.Exit(exitCode)}()// 从命令行中获取config配置文件,默认/etc/heka.tomlconfigPath := flag.String("config", filepath.FromSlash("/etc/hekad.toml"),"Config file or directory. If directory is specified then all files "+"in the directory will be loaded.")version := flag.Bool("version", false, "Output version and exit")// 解析命令行参数flag.Parse()// heka公共配置类config := &HekadConfig{}var err error// cpu 分析文件名称var cpuProfName string// 内存分析文件名称var memProfName stringif *version {fmt.Println(VERSION)return}// 加载heka公共参数,调用pipeline.ReplaceEnvsFile替换环境变量,并调用toml.Decode解析config, err = LoadHekadConfig(*configPath)if err != nil {pipeline.LogError.Println("Error reading config: ", err)exitCode = 1return}// 设置日志级别pipeline.LogInfo.SetFlags(config.LogFlags)pipeline.LogError.SetFlags(config.LogFlags)if config.SampleDenominator <= 0 {pipeline.LogError.Println("'sample_denominator' value must be greater than 0.")exitCode = 1return}// 解析最大空闲时间if _, err = time.ParseDuration(config.MaxPackIdle); err != nil {pipeline.LogError.Printf("Can't parse `max_pack_idle` time duration: %s\n",config.MaxPackIdle)exitCode = 1return}// 解析公共配置globals, cpuProfName, memProfName := setGlobalConfigs(config)// 生成跟目录if err = os.MkdirAll(globals.BaseDir, 0755); err != nil {pipeline.LogError.Printf("Error creating 'base_dir' %s: %s", config.BaseDir, err)exitCode = 1return}// 设置最大消息大小if config.MaxMessageSize > 1024 {message.SetMaxMessageSize(config.MaxMessageSize)} else if config.MaxMessageSize > 0 {pipeline.LogError.Println("Error: 'max_message_size' setting must be greater than 1024.")exitCode = 1return}// 进程文件if config.PidFile != "" {// 读取进程号contents, err := ioutil.ReadFile(config.PidFile)if err == nil {// 转换进程号 pid, err := strconv.Atoi(strings.TrimSpace(string(contents)))if err != nil {pipeline.LogError.Printf("Error reading proccess id from pidfile '%s': %s",config.PidFile, err)exitCode = 1return}// 根据进程号查找进程process, err := os.FindProcess(pid)// on Windows, err != nil if the process cannot be foundif runtime.GOOS == "windows" {if err == nil {pipeline.LogError.Printf("Process %d is already running.", pid)exitCode = 1return}} else if process != nil {// err is always nil on POSIX, so we have to send the process// a signal to check whether it existsif err = process.Signal(syscall.Signal(0)); err == nil {pipeline.LogError.Printf("Process %d is already running.", pid)exitCode = 1return}}}// 把进程号写入到进程文件if err = ioutil.WriteFile(config.PidFile, []byte(strconv.Itoa(os.Getpid())),0644); err != nil {pipeline.LogError.Printf("Unable to write pidfile '%s': %s", config.PidFile, err)exitCode = 1}pipeline.LogInfo.Printf("Wrote pid to pidfile '%s'", config.PidFile)defer func() {// 进程退出时,移除进程号记录文件if err = os.Remove(config.PidFile); err != nil {pipeline.LogError.Printf("Unable to remove pidfile '%s': %s", config.PidFile, err)}}()}if cpuProfName != "" {// 创建cpu 分析文件profFile, err := os.Create(cpuProfName)if err != nil {pipeline.LogError.Println(err)exitCode = 1return}// 开启CPU分析,默认30s得到一个cpu分析文件pprof.StartCPUProfile(profFile)defer func() {pprof.StopCPUProfile()profFile.Close()}()}if memProfName != "" {defer func() {// 创建内存分析文件profFile, err := os.Create(memProfName)if err != nil {pipeline.LogError.Fatalln(err)}// 退出时把堆信息写入到配置文件中pprof.WriteHeapProfile(profFile)profFile.Close()}()}// Set up and load the pipeline configuration and start the daemon.// 根据全局配置生成pipeline配置 NewPipelineConfig代码实现位于pipeline/config.go中pipeconf := pipeline.NewPipelineConfig(globals)// 加载配置文件或目录下所有配置if err = loadFullConfig(pipeconf, configPath); err != nil {pipeline.LogError.Println("Error reading config: ", err)exitCode = 1return}// 启动pipeline,代码位于pipeline/pipeline_runner.go(263行)exitCode = pipeline.Run(pipeconf)
}

PipelineConfig struct

在分析pipeline启动之前,首先要熟悉一下主目录/pipeline/config.go中PipelineConfig类

type PipelineConfig struct {// Heka 全局配置.Globals *GlobalConfigStruct// 按类别存放的New插件的工厂方法,主要类别有Input,Decoder,Filter,Encoder,Output,Splitter,用于生成对象用的makers map[string]map[string]PluginMaker// Direct access to makers["Decoder"] since it's needed by MultiDecoder// outside of the pipeline package.// 除非需要多解码器,一般使用makers["Decoder"]中的解码器DecoderMakers map[string]PluginMaker// Mutex protecting the makers map.// 读写锁,操作makers的map时使用makersLock sync.RWMutex// All running InputRunners, by name.// 输入input插件运行RunnersInputRunners map[string]InputRunner// All running FilterRunners, by name.// Filter插件运行器RunnerFilterRunners map[string]FilterRunner// All running OutputRunners, by name.// 输出Output插件运行器RunnerOutputRunners map[string]OutputRunner// Heka message router instance.// router *messageRouter// PipelinePack supply for Input plugins.inputRecycleChan chan *PipelinePack// PipelinePack supply for Filter plugins (separate pool prevents// deadlocks).injectRecycleChan chan *PipelinePack// Stores log messages generated by plugin config errors.LogMsgs []string// Lock protecting access to the set of running filters so dynamic filters// can be safely added and removed while Heka is running.// filters的读写锁filtersLock sync.RWMutex// Is freed when all FilterRunners have stopped.// 等待所有的filterRunners退出filtersWg sync.WaitGroup// Is freed when all DecoderRunners have stopped.// 等待所有的decoderRunners退出decodersWg sync.WaitGroup// Slice providing access to all running DecoderRunners.// 所有解码器allDecoders []DecoderRunner// Mutex protecting allDecoders.// 所有解码器读写锁allDecodersLock sync.RWMutex// Slice providing access to all Decoders called synchronously by InputRunner// 所有同步解码器allSyncDecoders []ReportingDecoder// Mutex protecting allSyncDecoders// 所有同步解码器读写锁allSyncDecodersLock sync.RWMutex// Slice providing access to all Splitters// 所有Spliter的切割或者说分割运行器allSplitters []SplitterRunner// Mutex protecting AllSplittersallSplittersLock sync.RWMutex// Slice providing access to all instantiated Encoders.// 所有的加密allEncoders map[string]Encoder// Mutex protecting allEncoders.allEncodersLock sync.RWMutex// Name of host on which Heka is running.// 当前主机IPhostname string// Heka process id.// 进程IDpid int32// Lock protecting access to the set of running inputs so they// can be safely added while Heka is running.// input的读写锁inputsLock sync.RWMutex// Is freed when all Input runners have stopped.// 等待所有input runners退出inputsWg sync.WaitGroup// Lock protecting access to running outputs so they can be removed// safely.// output读写锁outputsLock sync.RWMutex// Internal reporting channel.reportRecycleChan chan *PipelinePack// The next few values are used only during the initial configuration// loading process.// Track default plugin registration.defaultConfigs map[string]bool// Loaded PluginMakers sorted by category.makersByCategory map[string][]PluginMaker// Number of config loading errors.errcnt uint
}

PipelineConfig 方法

type PipelineConfig struct {// Pipeline message 添加时间戳,主机,进程号,传递次数PipelinePack(msgLoopCount uint) (*PipelinePack, error) // 返回Message路由Router() MessageRouter// 返回当前inputRecycleChan管道InputRecycleChan() chan *PipelinePack// 返回当前injectRecycleChan管道InjectRecycleChan() chan *PipelinePack// 返回主机名Hostname() string// 返回Output插件的RunnerOutput(name string) (oRunner OutputRunner, ok bool)PipelineConfig() *PipelineConfig// 返回Decoder插件实例Decoder(name string) (decoder Decoder, ok bool)// 创建Decoder Runner实例,并starts启动DecoderRunner(baseName, fullName string)// 停止和unregisters注销DecoderRunnerStopDecoderRunner(dRunner DecoderRunner) (ok bool)// 创建Encoder实例Encoder(baseName, fullName string) (Encoder, bool) // 从Map中获取FilterFilter(name string) (fRunner FilterRunner, ok bool)// 根据InputRunner的名字获取StatAccumulator input pluginStatAccumulator(name string) (statAccum StatAccumulator,err error)// 添加FilterRunner到PipelineConfigAddFilterRunner(fRunner FilterRunner) error// 移除FilterRunnerRemoveFilterRunner(name string) bool// 增加InputRunnerAddInputRunner(iRunner InputRunner) error// 移除InputRunnerRemoveInputRunner(iRunner InputRunner)// 移除OutputRunnerRemoveOutputRunner(oRunner OutputRunner)// 注册默认的插件makersRegisterDefault(name string) error// 预先加载配置文件中的所有插件的配置PreloadFromConfigFile(filename string) error// 加载所有的没有预先加载默认插件LoadConfig() error
}

pipeline启动

主程序最后调用了pipeline的Run调用方法,此方法主要启动OutputRunner,FilterRunner,InputTracker,router,InputRunner等
主要启动步骤:
1. 启动OutputRunners
2. 启动FilterRunners
3. 启动inputTracker和injectTracker(跟踪有关)
4. 启动router(路由匹配器)
5. 启动InputRunners

代码位于pipeline/pipeline_runner.go(263行)

// Main function driving Heka execution. Loads config, initializes PipelinePack
// pools, and starts all the runners. Then it listens for signals and drives
// the shutdown process when that is triggered.
func Run(config *PipelineConfig) (exitCode int) {LogInfo.Println("Starting hekad...")// 用于等待所有的output退出var outputsWg sync.WaitGroupvar err errorglobals := config.Globals// 遍历配置文件中所有的OutputRunnersfor name, output := range config.OutputRunners {// 信号量+1,退出的时候-1,直到为0时 此类型所有插件退出outputsWg.Add(1)// 启动Output Runner的运行,调用Output Runner的Start方法 PipelineConfig实现了PluginHelper接口 if err = output.Start(config, &outputsWg); err != nil {LogError.Printf("Output '%s' failed to start: %s", name, err)outputsWg.Done()if !output.IsStoppable() {globals.ShutDown(1)}continue}LogInfo.Println("Output started:", name)}// 开启所有的Filter Runnerfor name, filter := range config.FilterRunners {// 信号量+1,退出的时候-1,直到为0时 此类型所有插件退出config.filtersWg.Add(1)// 启动Filter  PipelineConfig实现了PluginHelper接口 if err = filter.Start(config, &config.filtersWg); err != nil {LogError.Printf("Filter '%s' failed to start: %s", name, err)config.filtersWg.Done()if !filter.IsStoppable() {globals.ShutDown(1)}continue}LogInfo.Println("Filter started:", name)}// Finish initializing the router's matchers.// 初始化路由的匹配器matchersconfig.router.initMatchSlices()// Setup the diagnostic trackers// 启动跟踪相关inputTracker := NewDiagnosticTracker("input", globals)injectTracker := NewDiagnosticTracker("inject", globals)// Create the report pipeline pack// 创建循环report的管道config.reportRecycleChan <- NewPipelinePack(config.reportRecycleChan)// Initialize all of the PipelinePacks that we'll need// 初始化所有的PipelinePackfor i := 0; i < globals.PoolSize; i++ {inputPack := NewPipelinePack(config.inputRecycleChan)inputTracker.AddPack(inputPack)config.inputRecycleChan <- inputPackinjectPack := NewPipelinePack(config.injectRecycleChan)injectTracker.AddPack(injectPack)config.injectRecycleChan <- injectPack}// 启动go程(子进程)go inputTracker.Run()go injectTracker.Run()// 启动路由config.router.Start()// 启动所有的InputRunner运行器,调用Input的Start方法启动for name, input := range config.InputRunners {// 信号量+1,退出的时候-1,直到为0时 此类型所有插件退出config.inputsWg.Add(1)// 启动if err = input.Start(config, &config.inputsWg); err != nil {LogError.Printf("Input '%s' failed to start: %s", name, err)config.inputsWg.Done()if !input.IsStoppable() {globals.ShutDown(1)}continue}LogInfo.Println("Input started:", name)}// wait for sigint// 注册需要的信号signal.Notify(globals.sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP,SIGUSR1, SIGUSR2)// for !globals.IsShuttingDown() {select {case sig := <-globals.sigChan:switch sig {case syscall.SIGHUP:LogInfo.Println("Reload initiated.")if err := notify.Post(RELOAD, nil); err != nil {LogError.Println("Error sending reload event: ", err)}case syscall.SIGINT, syscall.SIGTERM:// 中断退出LogInfo.Println("Shutdown initiated.")globals.stop()case SIGUSR1:// 查询report报表输出到stdoutLogInfo.Println("Queue report initiated.")go config.allReportsStdout()case SIGUSR2:LogInfo.Println("Sandbox abort initiated.")go sandboxAbort(config)}}}// 退出流程和启动流程刚好相反config.inputsLock.Lock()// 退出所有的InputRunners 调用Input的Stop方法for _, input := range config.InputRunners {input.Input().Stop()LogInfo.Printf("Stop message sent to input '%s'", input.Name())}config.inputsLock.Unlock()config.inputsWg.Wait()config.allDecodersLock.Lock()LogInfo.Println("Waiting for decoders shutdown")// 退出所有的decoders解码器for _, decoder := range config.allDecoders {close(decoder.InChan())LogInfo.Printf("Stop message sent to decoder '%s'", decoder.Name())}// 移除所有的解码器config.allDecoders = config.allDecoders[:0]config.allDecodersLock.Unlock()config.decodersWg.Wait()LogInfo.Println("Decoders shutdown complete")config.filtersLock.Lock()// 退出所有的Filter过滤器for _, filter := range config.FilterRunners {// needed for a clean shutdown without deadlocking or orphaning messages// 1. removes the matcher from the router// 2. closes the matcher input channel and lets it drain// 3. closes the filter input channel and lets it drain// 4. exits the filterconfig.router.RemoveFilterMatcher() <- filter.MatchRunner()LogInfo.Printf("Stop message sent to filter '%s'", filter.Name())}config.filtersLock.Unlock()config.filtersWg.Wait()// 退出所有的output运行器,调用MatchRunner停止并返回 移除相关的Output Matcherfor _, output := range config.OutputRunners {config.router.RemoveOutputMatcher() <- output.MatchRunner()LogInfo.Printf("Stop message sent to output '%s'", output.Name())}outputsWg.Wait()// 停止所有的序列化Encoderfor name, encoder := range config.allEncoders {if stopper, ok := encoder.(NeedsStopping); ok {LogInfo.Printf("Stopping encoder '%s'", name)stopper.Stop()}}LogInfo.Println("Shutdown complete.")return globals.exitCode
}

OutputRunner, FilterRunner ,InputRunner等接口实现

type iRunner struct 实现了InputRunner接口,type foRunner struct同时实现了OutputRunner和FilterRunner接口,主要代码在pipeline/plugin_runners.go文件中。此节写到这里,后续编写具体Input,Filter,Output插件的加载。

heka 0.11.0源码分析--主要启动流程分析相关推荐

  1. Android 源码 Camera2 预览流程分析四

    <Android 源码 Camera2 预览流程分析二>中进行了流启动,这是调用 QCamera3Channel start() 方法实现的,对应于 HAL_PIXEL_FORMAT_YC ...

  2. bluetoothd源码剖析(一)启动流程

    蓝牙系列: bluez调试笔记_weixin_41069709的博客-CSDN博客_bluezbluez移植https://blog.csdn.net/weixin_41069709/article/ ...

  3. Kubelet源码分析(一):启动流程分析

    源码版本 kubernetes version: v1.3.0 简介 在Kubernetes急群众,在每个Node节点上都会启动一个kubelet服务进程.该进程用于处理Master节点下发到本节点的 ...

  4. 【安卓 R 源码】Activity 启动流程及其生命周期源码分析

    1. Activty 的生命周期 activity的生命周期 oncreate()->onstart()->onResume()->onPause()->onStop()-&g ...

  5. netty 关闭chnnal_Netty 源码学习——服务端流程分析

    在上一篇我们已经介绍了客户端的流程分析,我们已经对启动已经大体上有了一定的认识,现在我们继续看对服务端的流程来看一看到底有什么区别. 服务端代码 public class NioServer { pr ...

  6. android源码学习- APP启动流程(android12源码)

    前言: 百度一搜能找到很多讲APP启动流程的,但是往往要么就是太老旧(还是基于android6去分析的),要么就是不全(往往只讲了整个流程的一小部分).所以我结合网上现有的文章,以及源码的阅读和调试, ...

  7. zygoteinit.java_源码跟踪之启动流程:从ZygoteInit到onCreate

    Instrumentation SDK版本名称: Pie API Level: 28 一.源码调用时序图 1. Activity的启动流程 说明:其中ActivityThread中执行的schedul ...

  8. CC00055.hadoop——|HadoopMapReduce.V27|——|Hadoop.v27|源码剖析|DataNode启动流程|

    一.[源码剖析之DataNode启动流程] :DataNode 启动流程 ### --- datanode的Main Class是DataNode,先找到DataNode.main()public c ...

  9. 【Flink源码】JobManager启动流程

    写在前面 在 [Flink源码]再谈 Flink 程序提交流程(中) 一文中,笔者后来发现谬误颇多,且随着 Flink 版本的更迭,部分方法实现方式已发生较大改变.因此,思虑再三决定针对 JobMan ...

  10. skynet源码解析(三)——启动流程

    对于你不了解的框架或者引擎,介绍再多的逻辑结构都好像有点茫然的感觉.所以小编认为,最有效的方式就是搞清楚框架启动流程的步骤,让自己心中有一条线可以牵引着. 当你在终端输入./skeynet examp ...

最新文章

  1. python 异常分类_python的异常处理
  2. php联系人表单,PHP联系人表单布局不允许在电子邮件中添加新行
  3. CVPR 2020 《Transform and Tell: Entity-Aware News Image Captioning》论文笔记(数据集)
  4. 20170804 - 今日技能封装 - Q
  5. c语言如何获取按键,c语言获得键盘的按键
  6. IE8采用IE7模式
  7. ubuntu 16.04 挂载新硬盘
  8. 腾讯视频下载官方_腾讯视频评论在哪
  9. VI 修改^M为unix换行符
  10. 颠覆传统-面向对象的设计思想(序章续)
  11. Java socket编程详解,TCPUDP实现
  12. 移动端下拉刷新,兼容ios,Android及微信浏览器
  13. 分布式系统数据层设计模式
  14. 2014522420145238 《信息安全系统设计基础》 第四次实验
  15. R |数据分析中缺失值处理
  16. Mac电脑用预览功能调整图像大小?Mac调整图片大小方法
  17. DW怎么把两个html放在一起,用Dreamweaver怎么制作网页
  18. 解决数值输入框可以输入字母E的问题
  19. ESRI技术认证考试大纲
  20. MAC电脑使用教程大全

热门文章

  1. effect和watch 的区别详解
  2. C#Form窗体模仿PhotoShop软件,高仿真原PS界面,实现PS对图片基本操作、拍照等,计算机图形学相关
  3. 【你问我答】不包装简历是不是面试机会都没有?
  4. HTML期末学生大作业-视频影视网页html+css+javascript(带报告册)
  5. IOS神器-fastlane工具实战-IOS自动化接入应用
  6. ASP.NET实现将word文档转换成pdf的方法
  7. 随手写的QT程序:文件大小转化可读字符串,整数转 B,KB,MB......
  8. RIFT Multi-Modal Image Matching Based on Radiation-Variation Insensitive Feature Transform
  9. d3.js之中国地图
  10. 解决Edge及Chrome等浏览器主页被篡改2345导航页