Log-Pilot 源码简析

  • 简单介绍
  • 源码简析
    • Pilot结构体
    • Piloter接口
    • main函数
    • Pilot.Run
    • Pilot.New
    • Pilot.watch
    • Pilot.processEvent
    • Pilot.newContainer
    • Pilot.delContainer
    • LogConfig
    • pilot.getLogConfigs
    • pilot.parseLogConfig
    • pilot.render
  • 参考资料:

简单介绍

Log-Pilot是阿里开源的一款容器日志收集项目,他的基本流程是动态监听容器的事件变化,然后依据容器的标签来进行解析,然后生成filebeat/fluentd的配置文件交由filebeat/fluentd采集插件来进行日志采集,来达到日志收集随容器的动态调度而自动伸缩的效果。

源码简析

Pilot结构体

  • 主要监听 docker 容器事件并获取容器日志挂载目录、标签、环境变量等信息,动态生成filebeat/fluentd配置文件
// Pilot entry point
type Pilot struct {piloter       Piloter               // Piloter 相关mutex         sync.Mutex            // 并发锁,多个容器事件触发,谁先抢到锁,谁先处理templ         *template.Template    // 日志采集客户端配置文件模板,log-pilot 使用 golang 的 text/template 模块渲染配置文件client        *k8s.Client           // docker 容器客户端,通过docker 事件接口 api 获取相关容器信息lastReload    time.Time             // 最后一次配置文件重载时间reloadChan    chan bool             // 重载通知 chanstopChan      chan bool             // 停止通知 chanbaseDir       string                // docker 在宿主机上面的数据存储位置logPrefix     []string              // 定义环境变量以什么字符开头来表示应用日志所在目录,log-pilot 以配置环境变量的方式配置定每个容器中应用程序的日志路径位置,默认为aliyuncreateSymlink bool                  // 是否创建硬连接的方式关联要搜集的日志文件
}

Piloter接口

  • log-pilot支持filebeatfluentd两种日志收集器。 Piloter定义了收集器需要操作的一些方法,主要负责收集工具的启用停止重载的具体操作。
type Piloter interface {Name() string       // "filebeat" 与 "fluentd",分别表示不同的搜集工具Start() error       // 启动搜集工具Reload() error      // 重载配置文件Stop() error        // 停止搜集工具GetBaseConf() string    // 日志采集客户端配置文件位置,如 filebeat 的 /etc/filebeatGetConfHome() string    // 日志采集客户端统一配置文件目录,如 filebeat 的 prospectors.d 位置GetConfPath(container string) string    // 具体配置文件路径OnDestroyEvent(container string) error  // 监听容器停止事件
}

main函数

  • 程序的入口、命令行处理:指定日志收集配置模板、收集日志等级、Docker日志路径等。
  • [pilot.Run]( # Pilot.Run) 启动程序
func main() {// 指定配置模板template := flag.String("template", "", "Template filepath for fluentd or filebeat.")// 指定docker日志所在目录base := flag.String("base", "", "Directory which mount host root.")// 指定收集日志等级level := flag.String("log-level", "INFO", "Log level")// ....// 读取模板文件b, err := ioutil.ReadFile(*template)// 启动程序入口log.Fatal(pilot.Run(string(b), baseDir))
}

Pilot.Run

  • [pilot.New](# pilot.New) 初始化 Pilot 数据,Polit 中包含了对应 filebeat/fluentd 配置模版、dokcer client、并发锁、piloter 对象等
  • [pilot.watch](# pilot.watch) 开启容器事件监控
func Run(templ string, baseDir string) error {// 初始化Pilot数据p, err := New(templ, baseDir)if err != nil {panic(err)}// 开启监听return p.watch()
}

Pilot.New

  • 初始化Pilot数据
func New(tplStr string, baseDir string) (*Pilot, error) {// 解析模板文件templ, err := template.New("pilot").Parse(tplStr)// ...// 创建Docker容器客户端 client, err := k8s.NewEnvClient()// 创建Piloter接口piloter, err := NewPiloter(baseDir)// ...// 从环境变量中获取日志路径位置,默认aliyunlogPrefix := []string{"aliyun"}if os.Getenv(ENV_PILOT_LOG_PREFIX) != "" {envLogPrefix := os.Getenv(ENV_PILOT_LOG_PREFIX)logPrefix = strings.Split(envLogPrefix, ",")}// 从环境变量中获取是否创建硬连接的方式关联要搜集的日志文件createSymlink := os.Getenv(ENV_PILOT_CREATE_SYMLINK) == "true"return &Pilot{client:        client,templ:         templ,baseDir:       baseDir,reloadChan:    make(chan bool),stopChan:      make(chan bool),piloter:       piloter,logPrefix:     logPrefix,createSymlink: createSymlink,}, nil
}

Pilot.watch

  • 使用 docker api 连接 docker
  • pilot.processEvent 监听 docker 事件
func (p *Pilot) watch() error {// ....// 启动收集工具err := p.piloter.Start()           // ....// 接受 docker 事件,返回 chanmsgs, errs := p.client.Events(ctx, options)  go func() {// ....// 无限循环获取事件for {           select {case msg := <-msgs:// 处理 docker 事件if err := p.processEvent(msg); err != nil {     log.Errorf("fail to process event: %v,  %v", msg, err)}// ....}}()// ....
}

Pilot.processEvent

  • docker event 的handler函数
  • pilot.newContainer 生成配置文件
  • pilot.delContainer 删除配置文件
func (p *Pilot) processEvent(msg events.Message) error {// 获取容器idcontainerId := msg.Actor.ID// 上下文ctx := context.Background()// ....// 判断事件switch msg.Action {case "start", "restart":// ....// 返回容器信息containerJSON, err := p.client.ContainerInspect(ctx, containerId)return p.newContainer(&containerJSON)case "destroy", "die":// ....err := p.delContainer(containerId)return nil
}

Pilot.newContainer

  • 处理环境变量/tag标签/mount
  • 渲染配置文键模板,生成新的配置文件并reload生效
  • pilot.getLogConfigs
  • pilot.render
func (p *Pilot) newContainer(containerJSON *types.ContainerJSON) error {// .... // 创建容器对象container := container(containerJSON)for _, e := range env {         // 处理环境变量, env由containerJSON 得到// .....}// 获取配置文件模板数据,属性由ContainerJSON提供logConfigs, err := p.getLogConfigs(jsonLogPath, mounts, labels)if err != nil {return err}// ....// 关联 docker 容器中应用日志文件或目录p.createVolumeSymlink(containerJSON)// 渲染配置文件模板数据,生成具体的配置文件logConfig, err := p.render(id, container, logConfigs)if err != nil {return err}// 保存配置文件if err = ioutil.WriteFile(p.piloter.GetConfPath(id), []byte(logConfig), os.FileMode(0644)); err != nil {return err}// 重载配置文件p.tryReload()return nil
}

Pilot.delContainer

  • 删除配置文件
  • reload 配置文件
func (p *Pilot) delContainer(id string) error {// 移除关联docker 容器中应用日志文件或目录p.removeVolumeSymlink(id)if p.piloter.Name() == PILOT_FLUENTD {clean := func() {// ....// 删除if err := os.Remove(p.piloter.GetConfPath(id)); err != nil {// ...return}// 重载配置文件p.tryReload()}// 15分钟后执行cleantime.AfterFunc(15*time.Minute, clean)return nil}return p.piloter.OnDestroyEvent(id)
}

LogConfig

  • 动态渲染配置文件模板数据集
type LogConfig struct {Name         string                 // 日志名HostDir      string                 // 日志文件在宿主机上的目录ContainerDir string                 // 容器应用日志目录Format       string                 // 日志采集应用的格式(none, json, csv等)FormatConfig map[string]string     File         string                 // 具体的日志文件名Tags         map[string]string      // 标签数据Target       string                 // 自定义输出目标,可以是索引或kafka主题等EstimateTime boolStdout       boolCustomFields  map[string]string     // 自定义添加日志字段CustomConfigs map[string]string     // 自定义配置文件项
}

pilot.getLogConfigs

  • 获取配置文件模板渲染数据
  • pilot.parseLogConfig
func (p *Pilot) getLogConfigs(jsonLogPath string, mounts []types.MountPoint, labels map[string]string) ([]*LogConfig, error) {var ret []*LogConfig// 获取容器的挂载目录mountsMap := make(map[string]types.MountPoint)for _, mount := range mounts {mountsMap[mount.Destination] = mount}var labelNames []string// 取出label并排序for k := range labels {labelNames = append(labelNames, k)}sort.Strings(labelNames)// 通过label获取配置的日志所在目录root := newLogInfoNode("")for _, k := range labelNames {for _, prefix := range p.logPrefix {// ....}}// 接续容器数据获得配置文件模板渲染数据for name, node := range root.children {logConfig, err := p.parseLogConfig(name, node, jsonLogPath, mountsMap)// ....ret = append(ret, logConfig)}return ret, nil
}

pilot.parseLogConfig

  • 接续容器数据获得配置文件模板渲染数据
func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath string, mounts map[string]types.MountPoint) (*LogConfig, error) {// 获取日志目录path := strings.TrimSpace(info.value)// 获取标签数据tags := info.get("tags")tagMap, err := p.parseTags(tags)// 获取索引或者kafka主题target := info.get("target")// 添加默认索引或主题if _, ok := tagMap["index"]; !ok {// ....}if _, ok := tagMap["topic"]; !ok {// ....}// 检查是否有效if err := p.tryCheckKafkaTopic(tagMap["topic"]); err != nil {return nil, err}// 日志格式化format := info.children["format"]if format == nil || format.value == "none" {format = newLogInfoNode("nonex")}formatConfig, err := Convert(format)//特殊处理路径中通配符 regexif format.value == "regexp" {format.value = fmt.Sprintf("/%s/", formatConfig["pattern"])delete(formatConfig, "pattern")}// 如果是stdout, 采集容器的标准输出日志if path == "stdout" {logFile := filepath.Base(jsonLogPath)if p.piloter.Name() == PILOT_FILEBEAT {logFile = logFile + "*"}return &LogConfig{Name:         name,HostDir:      filepath.Join(p.baseDir, filepath.Dir(jsonLogPath)),File:         logFile,Format:       format.value,Tags:         tagMap,FormatConfig: map[string]string{"time_format": "%Y-%m-%dT%H:%M:%S.%NZ"},Target:       target,EstimateTime: false,Stdout:       true,}, nil}// 输出到容器内部的具体文件日志路径containerDir := filepath.Dir(path)file := filepath.Base(path)hostDir := p.hostDirOf(containerDir, mounts)cfg := &LogConfig{Name:         name,ContainerDir: containerDir,Format:       format.value,File:         file,Tags:         tagMap,HostDir:      filepath.Join(p.baseDir, hostDir),FormatConfig: formatConfig,Target:       target,}return cfg, nil
}

pilot.render

  • 渲染配置文件模板数据,生成具体的配置文件

参考资料:

log-pilot源码简析

容器日志采集利器Log-Pilot

Log-Pilot 源码简析相关推荐

  1. 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析

    目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...

  2. django源码简析——后台程序入口

    django源码简析--后台程序入口 这一年一直在用云笔记,平时记录一些tips或者问题很方便,所以也就不再用博客进行记录,还是想把最近学习到的一些东西和大家作以分享,也能够对自己做一个总结.工作中主 ...

  3. (Ajax)axios源码简析(三)——请求与取消请求

    传送门: axios源码简析(一)--axios入口文件 axios源码简析(二)--Axios类与拦截器 axios源码简析(三)--请求与取消请求 请求过程 在Axios.prototype.re ...

  4. Spring Boot源码简析 @EnableTransactionManagement

    相关阅读 Spring Boot源码简析 事务管理 Spring Boot源码简析 @EnableAspectJAutoProxy Spring Boot源码简析 @EnableAsync Sprin ...

  5. java ArrayList 概述 与源码简析

    ArrayList 概述 与源码简析 1 ArrayList 创建 ArrayList<String> list = new ArrayList<>(); //构造一个初始容量 ...

  6. ffmpeg实战教程(十三)iJKPlayer源码简析

    要使用封装优化ijk就必须先了解ffmpeg,然后看ijk对ffmpeg的C层封装! 这是我看ijk源码时候的笔记,比较散乱.不喜勿喷~ ijk源码简析: 1.ijkplayer_jni.c 封装的播 ...

  7. 【Android项目】本地FM收音机开发及源码简析

    [Android项目]本地FM收音机开发及源码简析 目录 1.概述 2.收音机的基本原理 3.收音机其他信息 RDS功能 4.Android开发FM收音机源码解析 5.App层如何设计本地FM应用 6 ...

  8. Lottie动画框架入门及源码简析

    现在越来越多的APP中添加动画来提升用户体验,下面简单介绍下Airbnb开源的动画框架Lottie的使用 一.基本使用 首先添加依赖 compile 'com.airbnb.android:lotti ...

  9. Spring Boot源码简析 @Qualifier

    源码 @Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE, ElementT ...

最新文章

  1. Sublime Text 3中文乱码解决方法以及安装包管理器方法
  2. c++初学者使用文件流需要了解的一些坑(持续更新)
  3. OpenCASCADE:Modeling Algorithms模块之Sweeping: Prism, Revolution and Pipe
  4. python Demo 01 爬取大学名称
  5. mysql 中有什么命令_常用mysql命令大全
  6. 【转】Office365完整离线安装包下载及自定义安装教程
  7. Linux下如何手动搭建论坛?
  8. python基础知识汇总01
  9. Vivado 2019.1下载与安装
  10. mac通过u盘启动linux系统,在mac下制作linux启动U盘
  11. PHP工程改成微擎的步骤_微擎系统搭建
  12. centos中设置邮件发送
  13. JUC —— 常用辅助类
  14. Excel:INDEX与MATCH函数
  15. 【占星学】天蝎座女生性格特点
  16. linux 1000权限不够,LINUX常见问题1000个详细解答
  17. 西门子1200PLC的MODBUS_RTU轮询程序
  18. Python基础——数据类型—集合
  19. 神经网络模型的基本原理,如何建立神经网络模型
  20. 车载高速CAN(HighSpeed CAN)通信之CAN Bus Off

热门文章

  1. springboot结合druid使用多数据源,动态切换
  2. Android 强大软件,4款强大的安卓黑科技APP,各个都是压箱底,请大家要低调使用...
  3. Swoole Framework 入门教程(2)-默认路由方式以及GSF扩展路由方式
  4. 模拟退火(SA, Simulated Annealing)算法解决旅行商TSP问题
  5. Kubernetes笔记(九) Kubernetes 应用封装与扩展
  6. windows下编译darknet
  7. 我的java学习笔记之hibernate进阶…
  8. 中国青年报:“区块链+供应链金融”为小微企业融资推开一扇窗
  9. 一个免费的专利检索网站
  10. CNCF里程碑:超过375家会员