Log-Pilot 源码简析
Log-Pilot 源码简析
- 简单介绍
- 源码简析
- Pilot结构体
- Piloter接口
- main函数
- Pilot.Run
- Pilot.New
- Pilot.watch
- Pilot.processEvent
- Pilot.newContainer
- Pilot.delContainer
- LogConfig
- pilot.getLogConfigs
- pilot.parseLogConfig
- pilot.render
- 参考资料:
简单介绍
Log-Pilot是阿里开源的一款容器日志收集项目,他的基本流程是动态监听容器的事件变化,然后依据容器的标签来进行解析,然后生成filebeat/fluentd
的配置文件交由filebeat/fluentd
采集插件来进行日志采集,来达到日志收集随容器的动态调度而自动伸缩的效果。
源码简析
Pilot结构体
- 主要监听
docker
容器事件并获取容器日志挂载目录、标签、环境变量等信息,动态生成filebeat/fluentd
配置文件
// Pilot entry point
type Pilot struct {piloter Piloter // Piloter 相关mutex sync.Mutex // 并发锁,多个容器事件触发,谁先抢到锁,谁先处理templ *template.Template // 日志采集客户端配置文件模板,log-pilot 使用 golang 的 text/template 模块渲染配置文件client *k8s.Client // docker 容器客户端,通过docker 事件接口 api 获取相关容器信息lastReload time.Time // 最后一次配置文件重载时间reloadChan chan bool // 重载通知 chanstopChan chan bool // 停止通知 chanbaseDir string // docker 在宿主机上面的数据存储位置logPrefix []string // 定义环境变量以什么字符开头来表示应用日志所在目录,log-pilot 以配置环境变量的方式配置定每个容器中应用程序的日志路径位置,默认为aliyuncreateSymlink bool // 是否创建硬连接的方式关联要搜集的日志文件
}
Piloter接口
log-pilot
支持filebeat
和fluentd
两种日志收集器。 Piloter定义了收集器需要操作的一些方法,主要负责收集工具的启用、停止、重载的具体操作。
type Piloter interface {Name() string // "filebeat" 与 "fluentd",分别表示不同的搜集工具Start() error // 启动搜集工具Reload() error // 重载配置文件Stop() error // 停止搜集工具GetBaseConf() string // 日志采集客户端配置文件位置,如 filebeat 的 /etc/filebeatGetConfHome() string // 日志采集客户端统一配置文件目录,如 filebeat 的 prospectors.d 位置GetConfPath(container string) string // 具体配置文件路径OnDestroyEvent(container string) error // 监听容器停止事件
}
main函数
- 程序的入口、命令行处理:指定日志收集配置模板、收集日志等级、Docker日志路径等。
- [
pilot.Run
]( # Pilot.Run) 启动程序
func main() {// 指定配置模板template := flag.String("template", "", "Template filepath for fluentd or filebeat.")// 指定docker日志所在目录base := flag.String("base", "", "Directory which mount host root.")// 指定收集日志等级level := flag.String("log-level", "INFO", "Log level")// ....// 读取模板文件b, err := ioutil.ReadFile(*template)// 启动程序入口log.Fatal(pilot.Run(string(b), baseDir))
}
Pilot.Run
- [
pilot.New
](# pilot.New) 初始化 Pilot 数据,Polit 中包含了对应filebeat/fluentd
配置模版、dokcer client
、并发锁、piloter
对象等 - [
pilot.watch
](# pilot.watch) 开启容器事件监控
func Run(templ string, baseDir string) error {// 初始化Pilot数据p, err := New(templ, baseDir)if err != nil {panic(err)}// 开启监听return p.watch()
}
Pilot.New
- 初始化Pilot数据
func New(tplStr string, baseDir string) (*Pilot, error) {// 解析模板文件templ, err := template.New("pilot").Parse(tplStr)// ...// 创建Docker容器客户端 client, err := k8s.NewEnvClient()// 创建Piloter接口piloter, err := NewPiloter(baseDir)// ...// 从环境变量中获取日志路径位置,默认aliyunlogPrefix := []string{"aliyun"}if os.Getenv(ENV_PILOT_LOG_PREFIX) != "" {envLogPrefix := os.Getenv(ENV_PILOT_LOG_PREFIX)logPrefix = strings.Split(envLogPrefix, ",")}// 从环境变量中获取是否创建硬连接的方式关联要搜集的日志文件createSymlink := os.Getenv(ENV_PILOT_CREATE_SYMLINK) == "true"return &Pilot{client: client,templ: templ,baseDir: baseDir,reloadChan: make(chan bool),stopChan: make(chan bool),piloter: piloter,logPrefix: logPrefix,createSymlink: createSymlink,}, nil
}
Pilot.watch
- 使用 docker api 连接 docker
pilot.processEvent
监听 docker 事件
func (p *Pilot) watch() error {// ....// 启动收集工具err := p.piloter.Start() // ....// 接受 docker 事件,返回 chanmsgs, errs := p.client.Events(ctx, options) go func() {// ....// 无限循环获取事件for { select {case msg := <-msgs:// 处理 docker 事件if err := p.processEvent(msg); err != nil { log.Errorf("fail to process event: %v, %v", msg, err)}// ....}}()// ....
}
Pilot.processEvent
- docker event 的handler函数
pilot.newContainer
生成配置文件pilot.delContainer
删除配置文件
func (p *Pilot) processEvent(msg events.Message) error {// 获取容器idcontainerId := msg.Actor.ID// 上下文ctx := context.Background()// ....// 判断事件switch msg.Action {case "start", "restart":// ....// 返回容器信息containerJSON, err := p.client.ContainerInspect(ctx, containerId)return p.newContainer(&containerJSON)case "destroy", "die":// ....err := p.delContainer(containerId)return nil
}
Pilot.newContainer
- 处理环境变量/tag标签/mount
- 渲染配置文键模板,生成新的配置文件并reload生效
pilot.getLogConfigs
pilot.render
func (p *Pilot) newContainer(containerJSON *types.ContainerJSON) error {// .... // 创建容器对象container := container(containerJSON)for _, e := range env { // 处理环境变量, env由containerJSON 得到// .....}// 获取配置文件模板数据,属性由ContainerJSON提供logConfigs, err := p.getLogConfigs(jsonLogPath, mounts, labels)if err != nil {return err}// ....// 关联 docker 容器中应用日志文件或目录p.createVolumeSymlink(containerJSON)// 渲染配置文件模板数据,生成具体的配置文件logConfig, err := p.render(id, container, logConfigs)if err != nil {return err}// 保存配置文件if err = ioutil.WriteFile(p.piloter.GetConfPath(id), []byte(logConfig), os.FileMode(0644)); err != nil {return err}// 重载配置文件p.tryReload()return nil
}
Pilot.delContainer
- 删除配置文件
- reload 配置文件
func (p *Pilot) delContainer(id string) error {// 移除关联docker 容器中应用日志文件或目录p.removeVolumeSymlink(id)if p.piloter.Name() == PILOT_FLUENTD {clean := func() {// ....// 删除if err := os.Remove(p.piloter.GetConfPath(id)); err != nil {// ...return}// 重载配置文件p.tryReload()}// 15分钟后执行cleantime.AfterFunc(15*time.Minute, clean)return nil}return p.piloter.OnDestroyEvent(id)
}
LogConfig
- 动态渲染配置文件模板数据集
type LogConfig struct {Name string // 日志名HostDir string // 日志文件在宿主机上的目录ContainerDir string // 容器应用日志目录Format string // 日志采集应用的格式(none, json, csv等)FormatConfig map[string]string File string // 具体的日志文件名Tags map[string]string // 标签数据Target string // 自定义输出目标,可以是索引或kafka主题等EstimateTime boolStdout boolCustomFields map[string]string // 自定义添加日志字段CustomConfigs map[string]string // 自定义配置文件项
}
pilot.getLogConfigs
- 获取配置文件模板渲染数据
pilot.parseLogConfig
func (p *Pilot) getLogConfigs(jsonLogPath string, mounts []types.MountPoint, labels map[string]string) ([]*LogConfig, error) {var ret []*LogConfig// 获取容器的挂载目录mountsMap := make(map[string]types.MountPoint)for _, mount := range mounts {mountsMap[mount.Destination] = mount}var labelNames []string// 取出label并排序for k := range labels {labelNames = append(labelNames, k)}sort.Strings(labelNames)// 通过label获取配置的日志所在目录root := newLogInfoNode("")for _, k := range labelNames {for _, prefix := range p.logPrefix {// ....}}// 接续容器数据获得配置文件模板渲染数据for name, node := range root.children {logConfig, err := p.parseLogConfig(name, node, jsonLogPath, mountsMap)// ....ret = append(ret, logConfig)}return ret, nil
}
pilot.parseLogConfig
- 接续容器数据获得配置文件模板渲染数据
func (p *Pilot) parseLogConfig(name string, info *LogInfoNode, jsonLogPath string, mounts map[string]types.MountPoint) (*LogConfig, error) {// 获取日志目录path := strings.TrimSpace(info.value)// 获取标签数据tags := info.get("tags")tagMap, err := p.parseTags(tags)// 获取索引或者kafka主题target := info.get("target")// 添加默认索引或主题if _, ok := tagMap["index"]; !ok {// ....}if _, ok := tagMap["topic"]; !ok {// ....}// 检查是否有效if err := p.tryCheckKafkaTopic(tagMap["topic"]); err != nil {return nil, err}// 日志格式化format := info.children["format"]if format == nil || format.value == "none" {format = newLogInfoNode("nonex")}formatConfig, err := Convert(format)//特殊处理路径中通配符 regexif format.value == "regexp" {format.value = fmt.Sprintf("/%s/", formatConfig["pattern"])delete(formatConfig, "pattern")}// 如果是stdout, 采集容器的标准输出日志if path == "stdout" {logFile := filepath.Base(jsonLogPath)if p.piloter.Name() == PILOT_FILEBEAT {logFile = logFile + "*"}return &LogConfig{Name: name,HostDir: filepath.Join(p.baseDir, filepath.Dir(jsonLogPath)),File: logFile,Format: format.value,Tags: tagMap,FormatConfig: map[string]string{"time_format": "%Y-%m-%dT%H:%M:%S.%NZ"},Target: target,EstimateTime: false,Stdout: true,}, nil}// 输出到容器内部的具体文件日志路径containerDir := filepath.Dir(path)file := filepath.Base(path)hostDir := p.hostDirOf(containerDir, mounts)cfg := &LogConfig{Name: name,ContainerDir: containerDir,Format: format.value,File: file,Tags: tagMap,HostDir: filepath.Join(p.baseDir, hostDir),FormatConfig: formatConfig,Target: target,}return cfg, nil
}
pilot.render
- 渲染配置文件模板数据,生成具体的配置文件
参考资料:
log-pilot源码简析
容器日志采集利器Log-Pilot
Log-Pilot 源码简析相关推荐
- 【Golang源码分析】Go Web常用程序包gorilla/mux的使用与源码简析
目录[阅读时间:约10分钟] 一.概述 二.对比: gorilla/mux与net/http DefaultServeMux 三.简单使用 四.源码简析 1.NewRouter函数 2.HandleF ...
- django源码简析——后台程序入口
django源码简析--后台程序入口 这一年一直在用云笔记,平时记录一些tips或者问题很方便,所以也就不再用博客进行记录,还是想把最近学习到的一些东西和大家作以分享,也能够对自己做一个总结.工作中主 ...
- (Ajax)axios源码简析(三)——请求与取消请求
传送门: axios源码简析(一)--axios入口文件 axios源码简析(二)--Axios类与拦截器 axios源码简析(三)--请求与取消请求 请求过程 在Axios.prototype.re ...
- Spring Boot源码简析 @EnableTransactionManagement
相关阅读 Spring Boot源码简析 事务管理 Spring Boot源码简析 @EnableAspectJAutoProxy Spring Boot源码简析 @EnableAsync Sprin ...
- java ArrayList 概述 与源码简析
ArrayList 概述 与源码简析 1 ArrayList 创建 ArrayList<String> list = new ArrayList<>(); //构造一个初始容量 ...
- ffmpeg实战教程(十三)iJKPlayer源码简析
要使用封装优化ijk就必须先了解ffmpeg,然后看ijk对ffmpeg的C层封装! 这是我看ijk源码时候的笔记,比较散乱.不喜勿喷~ ijk源码简析: 1.ijkplayer_jni.c 封装的播 ...
- 【Android项目】本地FM收音机开发及源码简析
[Android项目]本地FM收音机开发及源码简析 目录 1.概述 2.收音机的基本原理 3.收音机其他信息 RDS功能 4.Android开发FM收音机源码解析 5.App层如何设计本地FM应用 6 ...
- Lottie动画框架入门及源码简析
现在越来越多的APP中添加动画来提升用户体验,下面简单介绍下Airbnb开源的动画框架Lottie的使用 一.基本使用 首先添加依赖 compile 'com.airbnb.android:lotti ...
- Spring Boot源码简析 @Qualifier
源码 @Target({ElementType.FIELD, ElementType.METHOD, ElementType.PARAMETER, ElementType.TYPE, ElementT ...
最新文章
- Sublime Text 3中文乱码解决方法以及安装包管理器方法
- c++初学者使用文件流需要了解的一些坑(持续更新)
- OpenCASCADE:Modeling Algorithms模块之Sweeping: Prism, Revolution and Pipe
- python Demo 01 爬取大学名称
- mysql 中有什么命令_常用mysql命令大全
- 【转】Office365完整离线安装包下载及自定义安装教程
- Linux下如何手动搭建论坛?
- python基础知识汇总01
- Vivado 2019.1下载与安装
- mac通过u盘启动linux系统,在mac下制作linux启动U盘
- PHP工程改成微擎的步骤_微擎系统搭建
- centos中设置邮件发送
- JUC —— 常用辅助类
- Excel:INDEX与MATCH函数
- 【占星学】天蝎座女生性格特点
- linux 1000权限不够,LINUX常见问题1000个详细解答
- 西门子1200PLC的MODBUS_RTU轮询程序
- Python基础——数据类型—集合
- 神经网络模型的基本原理,如何建立神经网络模型
- 车载高速CAN(HighSpeed CAN)通信之CAN Bus Off
热门文章
- springboot结合druid使用多数据源,动态切换
- Android 强大软件,4款强大的安卓黑科技APP,各个都是压箱底,请大家要低调使用...
- Swoole Framework 入门教程(2)-默认路由方式以及GSF扩展路由方式
- 模拟退火(SA, Simulated Annealing)算法解决旅行商TSP问题
- Kubernetes笔记(九) Kubernetes 应用封装与扩展
- windows下编译darknet
- 我的java学习笔记之hibernate进阶…
- 中国青年报:“区块链+供应链金融”为小微企业融资推开一扇窗
- 一个免费的专利检索网站
- CNCF里程碑:超过375家会员