Contents

  • FileBeat + Pipeline: parse logs and save them to ElasticSearch (hands-on)
    • Download address
    • Purpose
    • Log data
    • Simulate the Pipeline
    • Create the pipeline
      • Check that the Pipeline was created successfully
    • Create the FileBeat configuration file filebeat.yml
    • Create custom fields: FileBeat fields.yml
    • Run FileBeat
    • filebeat startup command notes
    • Test
  • Pipeline configuration details
    • 1. Set the index _id from the log data
  • FileBeat configuration details
    • 1. Set the replica and shard counts of the index FileBeat writes to ElasticSearch
  • Exception handling
    • Error message: ERROR instance/beat.go:802 Exiting: error initializing processors:

FileBeat + Pipeline: parse logs and save them to ElasticSearch (hands-on)

Download address

https://www.elastic.co/cn/downloads/past-releases#filebeat

Purpose

Use FileBeat to collect the logs, an ingest Pipeline to parse them, and finally write them into ES.

Log data

2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X

Simulate the Pipeline

Note: if the same field is set both with a set processor and with a script, the script's value wins.

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {"dissect": {"field": "message", "pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}"}},
      {"split": {"field": "logdata", "separator": "\\|", "target_field": "logdata"}},
      {"set": {"field": "actionOrFunction", "value": "{{logdata.0}}"}},
      {"set": {"field": "businessType", "value": "{{logdata.1}}"}},
      {"set": {"field": "callMethod", "value": "{{logdata.2}}"}},
      {"set": {"field": "requestMethod", "value": "{{logdata.3}}"}},
      {"set": {"field": "callLink", "value": "{{logdata.4}}"}},
      {"set": {"field": "loginUserIp", "value": "{{logdata.5}}"}},
      {"set": {"field": "userName", "value": "{{logdata.6}}"}},
      {"set": {"field": "userId", "value": "{{logdata.7}}"}},
      {"set": {"field": "paramOrInputData", "value": "{{logdata.8}}"}},
      {"set": {"field": "resultOrOutputData", "value": "{{logdata.9}}"}},
      {"set": {"field": "exceptionInfo", "value": "{{logdata.10}}"}},
      {"set": {"field": "systemEnv", "value": "{{logdata.11}}"}},
      {"set": {"field": "status", "value": "{{logdata.12}}"}},
      {"set": {"field": "fullLinkId", "value": "{{logdata.13}}"}},
      {"set": {"field": "subFullLinkId", "value": "{{logdata.14}}"}},
      {"set": {"field": "currentTimeMillisecond", "value": "{{logdata.15}}"}},
      {"convert": {"field": "currentTimeMillisecond", "type": "long"}},
      {"set": {"field": "detail", "value": "{{logdata.16}}"}},
      {"set": {"field": "other", "value": "{{logdata.17}}"}},
      {"set": {"field": "errorData", "value": "{{logdata.18}}"}},
      {"set": {"field": "errorDataSource", "value": "{{logdata.19}}"}},
      {"set": {"field": "errorDataDetail", "value": "{{logdata.20}}"}},
      {"set": {"field": "logTime", "value": "{{logdata.21}}"}},
      {"set": {"field": "processTime", "value": "{{logdata.22}}"}},
      {"convert": {"field": "processTime", "type": "long"}},
      {"set": {"field": "orgCode", "value": "{{logdata.23}}"}},
      {"set": {"field": "orgName", "value": "{{logdata.24}}"}},
      {"set": {"field": "exceptionDetailInfo", "value": "{{logdata.25}}"}},
      {"set": {"field": "message", "value": ""}},
      {"set": {"field": "logdata", "value": ""}},
      {"script": {"lang": "painless", "source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8);  """}}
    ]
  },
  "docs": [
    {"_source": {"message": "2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询运营商宽带用户|4|com.bjga.internet.operator.controller.OperatorBroadbandController.list()|GET|http://127.0.0.1:8080/operator2/broadband/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAccount=%E5%8C%97%E4%BA%AC1%E5%B8%8256&installedPhone=639857&accountHolderName=%E4%B8%9C%E7%A5%A5%E6%9E%97&operatorCreditCode=91110108101909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{\"code\":200,\"msg\":\"查询成功\",\"rows\":[],\"took\":2,\"total\":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X"}}
  ]
}
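To check the field mapping without an Elasticsearch node, here is an added example (not from the original post) that replays the pipeline's steps in Python on the log line from the page: a minimal dissect, the split processor's default of dropping trailing empty elements (its preserve_trailing option keeps them), the 26 set processors, the two convert processors and the insertTime script. Two things it makes visible: the log line has two spaces before fileBeatLogData while the pattern has one, and dissect matches the delimiter literally, so loglevel comes out as "INFO " with a trailing space (a trim processor on that field would clean it); and a missing or non-numeric value in position 15 or 22 makes convert fail, which drops the document unless the pipeline has on_failure handling. run_pipeline, now_ms and matches_pattern are names chosen for this example.

```python
# Added example (not from the original post): an offline replica of the
# pipeline's dissect -> split -> set/convert -> script steps.
import re
from datetime import datetime, timezone

# Constructed sample: the log line shown on the page.
SAMPLE_LINE = (
    '2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - '
    '查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list'
    '|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405'
    '|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1'
    '|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}'
    '|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X'
)

DISSECT_PATTERN = '%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}'

# logdata index -> target field, in the order of the pipeline's set processors.
FIELD_ORDER = [
    'actionOrFunction', 'businessType', 'callMethod', 'requestMethod', 'callLink',
    'loginUserIp', 'userName', 'userId', 'paramOrInputData', 'resultOrOutputData',
    'exceptionInfo', 'systemEnv', 'status', 'fullLinkId', 'subFullLinkId',
    'currentTimeMillisecond', 'detail', 'other', 'errorData', 'errorDataSource',
    'errorDataDetail', 'logTime', 'processTime', 'orgCode', 'orgName', 'exceptionDetailInfo',
]
LONG_FIELDS = {'currentTimeMillisecond', 'processTime'}  # the pipeline's convert processors


def dissect(pattern, text):
    """Minimal dissect: each %{key} takes the text up to the next literal delimiter,
    matched left to right; the last key takes the remainder. Raises when a delimiter
    is missing, as the dissect processor would fail the document."""
    parts = re.split(r'%\{([^}]*)\}', pattern)  # [lead, key, delim, key, ..., key, trail]
    lead, keys, delims = parts[0], parts[1::2], parts[2::2]
    if not text.startswith(lead):
        raise ValueError('leading literal does not match')
    out, pos = {}, len(lead)
    for key, delim in zip(keys, delims):
        if delim == '':                      # last key: everything that is left
            out[key], pos = text[pos:], len(text)
            continue
        end = text.find(delim, pos)
        if end < 0:
            raise ValueError(f'delimiter {delim!r} not found for key {key!r}')
        out[key], pos = text[pos:end], end + len(delim)
    return out


def split_field(value, separator='|', preserve_trailing=False):
    """split processor semantics: trailing empty elements are dropped unless
    preserve_trailing is set; the page's pipeline uses the default."""
    items = value.split(separator)
    if not preserve_trailing:
        while items and items[-1] == '':
            items.pop()
    return items


def run_pipeline(line, now_ms=None):
    """Mirror the pipeline on one line; now_ms stands in for System.currentTimeMillis()."""
    ctx = {'message': line}
    ctx.update(dissect(DISSECT_PATTERN, ctx['message']))
    ctx['logdata'] = split_field(ctx['logdata'])
    for i, name in enumerate(FIELD_ORDER):
        # Mustache renders a missing index as ''; int('') raises, as convert would.
        raw = ctx['logdata'][i] if i < len(ctx['logdata']) else ''
        ctx[name] = int(raw) if name in LONG_FIELDS else raw
    ctx['message'] = ''   # the pipeline blanks message and logdata after extraction
    ctx['logdata'] = ''
    if now_ms is None:
        now_ms = int(datetime.now(timezone.utc).timestamp() * 1000)
    # The script adds 8 h to the current instant and stores it as a Date; rendered
    # here in the JSON form Elasticsearch returns (Date values are treated as UTC).
    shifted = datetime.fromtimestamp((now_ms + 8 * 60 * 60 * 1000) / 1000, tz=timezone.utc)
    ctx['insertTime'] = shifted.isoformat(timespec='milliseconds').replace('+00:00', 'Z')
    return ctx


def matches_pattern(line):
    """True if the dissect pattern accepts the line (the document would not be dropped)."""
    try:
        dissect(DISSECT_PATTERN, line)
        return True
    except ValueError:
        return False


if __name__ == '__main__':
    for key, value in run_pipeline(SAMPLE_LINE).items():
        print(f'{key:24} {value!r}')
```

One caveat the replay makes concrete: the script stores the UTC+8 wall-clock time in a field Elasticsearch treats as UTC, so Kibana, which converts to the browser's time zone, shows insertTime eight hours later than the real ingest instant. For a timeline you want to rely on during an investigation, store the unshifted instant and let the UI do the conversion.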

Create the pipeline

PUT _ingest/pipeline/logDataPipeline
{
  "description": "outer pipeline",
  "processors": [
    {"dissect": {"field": "message", "pattern": "%{@logTimestamp} [%{logTthread}] %{loglevel} fileBeatLogData - %{logdata}"}},
    {"split": {"field": "logdata", "separator": "\\|", "target_field": "logdata"}},
    {"set": {"field": "actionOrFunction", "value": "{{logdata.0}}"}},
    {"set": {"field": "businessType", "value": "{{logdata.1}}"}},
    {"set": {"field": "callMethod", "value": "{{logdata.2}}"}},
    {"set": {"field": "requestMethod", "value": "{{logdata.3}}"}},
    {"set": {"field": "callLink", "value": "{{logdata.4}}"}},
    {"set": {"field": "loginUserIp", "value": "{{logdata.5}}"}},
    {"set": {"field": "userName", "value": "{{logdata.6}}"}},
    {"set": {"field": "userId", "value": "{{logdata.7}}"}},
    {"set": {"field": "paramOrInputData", "value": "{{logdata.8}}"}},
    {"set": {"field": "resultOrOutputData", "value": "{{logdata.9}}"}},
    {"set": {"field": "exceptionInfo", "value": "{{logdata.10}}"}},
    {"set": {"field": "systemEnv", "value": "{{logdata.11}}"}},
    {"set": {"field": "status", "value": "{{logdata.12}}"}},
    {"set": {"field": "fullLinkId", "value": "{{logdata.13}}"}},
    {"set": {"field": "subFullLinkId", "value": "{{logdata.14}}"}},
    {"set": {"field": "currentTimeMillisecond", "value": "{{logdata.15}}"}},
    {"convert": {"field": "currentTimeMillisecond", "type": "long"}},
    {"set": {"field": "detail", "value": "{{logdata.16}}"}},
    {"set": {"field": "other", "value": "{{logdata.17}}"}},
    {"set": {"field": "errorData", "value": "{{logdata.18}}"}},
    {"set": {"field": "errorDataSource", "value": "{{logdata.19}}"}},
    {"set": {"field": "errorDataDetail", "value": "{{logdata.20}}"}},
    {"set": {"field": "logTime", "value": "{{logdata.21}}"}},
    {"set": {"field": "processTime", "value": "{{logdata.22}}"}},
    {"convert": {"field": "processTime", "type": "long"}},
    {"set": {"field": "orgCode", "value": "{{logdata.23}}"}},
    {"set": {"field": "orgName", "value": "{{logdata.24}}"}},
    {"set": {"field": "exceptionDetailInfo", "value": "{{logdata.25}}"}},
    {"set": {"field": "message", "value": ""}},
    {"set": {"field": "logdata", "value": ""}},
    {"script": {"lang": "painless", "source": """ ctx.insertTime = new Date(System.currentTimeMillis()+1000l*60*60*8);  """}}
  ]
}

Check that the Pipeline was created successfully (pipeline ids are case-sensitive, so the id here must match the one used in the PUT above and in filebeat.yml)

GET _ingest/pipeline/logDataPipeline?pretty

Create the FileBeat configuration file filebeat.yml

Read the /var/log2/*.log files and write them into ES

filebeat.inputs:
- type: log
  enabled: true
  # files to read
  paths:
    - /var/log2/*.log
  # markers, used later to decide which index to write to
  fields:
    type: logDataPipeline
    source: common
- type: log
  enabled: true
  paths:
    - /var/log/1.log
    - /var/log/2.log
  fields:
    source: exception
- type: log
  enabled: true
  paths:
    - /var/log/3.log

filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false

# ======================= Elasticsearch template setting =======================
setup.template.settings:
  # default number of shards per index
  index.number_of_shards: 1
  # default number of replicas per index
  index.number_of_replicas: 1
  #index.codec: best_compression
  #_source.enabled: false
# allow the index template to be generated automatically
setup.template.enabled: true
# overwrite the template if it already exists
setup.template.overwrite: true
# field definition file used when generating the index template
setup.template.fields: fields.yml
# name of the generated index template
setup.template.name: "logdata"
# index name pattern the generated template matches
setup.template.pattern: "logdata-*"
# (duplicate key; the effective value is the setup.ilm.enabled: false below)
#setup.ilm.enabled: auto
# note: filebeat automatically appends -* to the alias
setup.ilm.rollover_alias: "park-ssm"
setup.ilm.pattern: "{now/d}"
# index pattern in kibana, for searching the logs
#setup.dashboards.index: myfilebeat-7.0.0-*
# filebeat's default is auto, which gives the created elasticsearch indices a
# 50GB + 30 day lifecycle; if you don't want to change that, this can be left out
setup.ilm.enabled: false

# =================================== Kibana ===================================
setup.kibana:

# ---------------------------- Elasticsearch Output ----------------------------
output.elasticsearch:
  # Array of hosts to connect to.
  hosts: ["10.8.10.12:9200"]
  index: "logdata-%{+yyyy.MM.dd}"
  indices:
    - index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
      when.equals:
        fields:
          source: "common"
    - index: "logdata-%{[fields.source]}-%{+yyyy.MM.dd}"
      when.equals:
        fields:
          source: "exception"
  pipelines:
    - pipeline: logDataPipeline
      when.equals:
        fields.type: logDataPipeline

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
  - add_docker_metadata: ~
  - add_kubernetes_metadata: ~
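Added example (not part of the original post): a small Python evaluator for the routing rules in the filebeat.yml above, run against constructed events for each of the three inputs. It mirrors Filebeat's behaviour that the indices and pipelines rules are tried in order with the first match winning, falling back to the plain index setting and to no pipeline, and that %{+yyyy.MM.dd} is taken from the event's @timestamp in UTC. It shows that only the /var/log2 input, which sets fields.type, goes through logDataPipeline: lines from 1.log, 2.log and 3.log are stored with the raw message and none of the parsed fields, which matters for any search or alert keyed on those fields. route, render, get_path and when_equals are names chosen here.

```python
# Added example (not from the original post): evaluate the page's
# output.elasticsearch routing rules against constructed Filebeat events.
import re
from datetime import datetime

# The page's rules, copied from filebeat.yml: (template, when.equals conditions).
INDICES = [
    ('logdata-%{[fields.source]}-%{+yyyy.MM.dd}', {'fields.source': 'common'}),
    ('logdata-%{[fields.source]}-%{+yyyy.MM.dd}', {'fields.source': 'exception'}),
]
DEFAULT_INDEX = 'logdata-%{+yyyy.MM.dd}'
PIPELINES = [('logDataPipeline', {'fields.type': 'logDataPipeline'})]

# Constructed events, one per input of the page's config (fields are not under root,
# so they sit below the "fields" key, as Filebeat emits them).
EVENTS = {
    'log2': {'@timestamp': '2021-07-01T12:07:25.000Z',
             'message': 'line from /var/log2/test.log',
             'fields': {'type': 'logDataPipeline', 'source': 'common'}},
    'log1': {'@timestamp': '2021-07-01T12:07:25.000Z',
             'message': 'line from /var/log/1.log',
             'fields': {'source': 'exception'}},
    'log3': {'@timestamp': '2021-07-01T12:07:25.000Z',
             'message': 'line from /var/log/3.log'},   # third input declares no fields
}


def get_path(event, dotted):
    """Resolve a dotted field reference such as fields.source; None if absent."""
    node = event
    for part in dotted.split('.'):
        if not isinstance(node, dict) or part not in node:
            return None
        node = node[part]
    return node


def when_equals(event, conditions):
    return all(get_path(event, key) == value for key, value in conditions.items())


def render(template, event):
    """Expand %{[a.b]} field references and %{+yyyy.MM.dd} from @timestamp (UTC)."""
    ts = datetime.fromisoformat(event['@timestamp'].replace('Z', '+00:00'))
    out = template.replace('%{+yyyy.MM.dd}', ts.strftime('%Y.%m.%d'))
    return re.sub(r'%\{\[([^\]]+)\]\}', lambda m: str(get_path(event, m.group(1))), out)


def route(event):
    """Return (index name, pipeline id or None) the way the output section would."""
    index = next((t for t, cond in INDICES if when_equals(event, cond)), DEFAULT_INDEX)
    pipeline = next((p for p, cond in PIPELINES if when_equals(event, cond)), None)
    return render(index, event), pipeline


if __name__ == '__main__':
    for name, event in EVENTS.items():
        print(name, '->', route(event))
```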

Create custom fields: FileBeat fields.yml

# our custom fields
- key: rbt
  title: rbt
  description: rbt log data fields
  fields:
    - name: logdata
      type: keyword
    - name: actionOrFunction
      type: keyword
    - name: businessType
      type: keyword
    - name: callMethod
      type: keyword
    - name: requestMethod
      type: keyword
    - name: callLink
      type: keyword
    - name: loginUserIp
      type: keyword
    - name: userName
      type: keyword
    - name: userId
      type: keyword
    - name: paramOrInputData
      type: keyword
    - name: resultOrOutputData
      type: keyword
    - name: exceptionInfo
      type: keyword
    - name: systemEnv
      type: keyword
    - name: status
      type: long
    - name: fullLinkId
      type: keyword
    - name: subFullLinkId
      type: keyword
    - name: currentTimeMillisecond
      type: long
    - name: detail
      type: keyword
    - name: other
      type: keyword
    - name: errorData
      type: keyword
    - name: errorDataSource
      type: keyword
    - name: errorDataDetail
      type: keyword
    - name: logTime
      type: keyword
    - name: processTime
      type: long
    - name: orgCode
      type: keyword
    - name: orgName
      type: keyword
    - name: exceptionDetailInfo
      type: keyword
    - name: insertTime
      type: date
# shipped with FileBeat
- key: ecs
  title: ECS
  description: ECS Fields.
  fields:
    - name: '@timestamp'
      level: core
      required: true
      type: date
      description: 'Date/time when the event originated.

        This is the date/time extracted from the event, typically representing when
        the event was generated by the source.

        If the event source has no original timestamp, this value is typically populated
        by the first time the event was received by the pipeline.

        Required field for all events.'
      example: '2016-05-23T08:05:34.853Z'

Run FileBeat

[root@test13 filebeat-7.9.3-linux-x86_64]# ls
data        fields.yml.bak  filebeat.reference.yml  filebeat.yml.bak  LICENSE.txt  modules.d   README.md
fields.yml  filebeat        filebeat.yml            kibana            module       NOTICE.txt  s.log
[root@test13 filebeat-7.9.3-linux-x86_64]# ./filebeat -e

filebeat startup command notes

-c      specify the configuration file
-d "*"  when an error occurs, show the specific cause

Test

Append a line to /var/log2/test.log with vim

2021-07-01 20:07:25 [XNIO-1 task-2] INFO  fileBeatLogData - 查询用户|4|com.internet.operator.controller..list()|GET|http://127.0.0.1:8080/list|127.0.0.1|jast110|9a2e232170744efda8c526d67f4f5405|userAcco909571P&installedLocation=&pageNum=10&pageSize=10&superQuery=1|{"code":200,"msg":"查询成功","rows":[],"took":2,"total":1}|||0|||1625141245843||||||2021-07-01 20:07:25|142|91110108769392234H|测试111|X

Querying shows that the log entry has arrived in ES.

Personal WeChat official account (big data learning and exchange): hadoopwiki

Pipeline configuration details

1. Set the index _id from the log data

Every document carries metadata fields such as _id, _index and _type, and processors can access these directly, as in the following example:

{"set": {"field": "_id", "value": "{{logdata.6}}"}}
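Added example (not from the original post) of what this choice means for an audit log. With _id taken from logdata.6, which is userName in the pipeline above, every later event from the same user is indexed with the same _id and replaces the earlier document, so only the last action per user survives; for an access log that is silent evidence loss. The Python below simulates that index semantics on constructed events and contrasts it with a content-derived id, which keeps distinct events apart while making a re-shipped duplicate line idempotent (Filebeat's fingerprint processor can compute such a hash on the shipper side). index_documents, id_from_username, id_from_message and lost_events are names chosen here.

```python
# Added example (not from the original post): effect of the _id choice on an
# audit log, simulated with Elasticsearch index-operation semantics.
import hashlib

# Constructed events, already parsed into the pipeline's fields.
EVENTS = [
    {'userName': 'jast110', 'callLink': '/list',
     'message': 'line-1 jast110 GET /list 1625141245843'},
    {'userName': 'jast110', 'callLink': '/delete',
     'message': 'line-2 jast110 POST /delete 1625141250000'},
    {'userName': 'alice', 'callLink': '/list',
     'message': 'line-3 alice GET /list 1625141260000'},
]


def index_documents(events, id_of):
    """A write with an existing _id replaces that document instead of adding one."""
    index = {}
    for event in events:
        index[id_of(event)] = event
    return index


def id_from_username(event):
    """The page's choice: {{logdata.6}}, i.e. userName."""
    return event['userName']


def id_from_message(event):
    """Content-derived id: the same line re-shipped after a Filebeat restart maps to
    the same document, while two different events never share an id."""
    return hashlib.sha256(event['message'].encode('utf-8')).hexdigest()


def lost_events(events, id_of):
    """Number of events that are overwritten, hence unretrievable, under id_of."""
    return len(events) - len(index_documents(events, id_of))


if __name__ == '__main__':
    print('by userName:', lost_events(EVENTS, id_from_username), 'event(s) lost')
    print('by message hash:', lost_events(EVENTS, id_from_message), 'event(s) lost')
```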

FileBeat configuration details

Note: on its first start FileBeat writes the index template (_template) configured in FileBeat into ElasticSearch; on later restarts that template is not updated even if the configuration has changed. For example, the shard and replica counts below are written into the index template on the first start, and later changes have no effect. To apply a changed configuration, delete the data directory under the filebeat directory.

1. Set the replica and shard counts of the index FileBeat writes to ElasticSearch

Modify the following parameters in filebeat.yml

setup.template.settings:
  # default number of shards per index
  index.number_of_shards: 1
  # default number of replicas per index
  index.number_of_replicas: 1

Exception handling

Error message: ERROR instance/beat.go:802 Exiting: error initializing processors:

The exception reads as follows

2022-01-20T14:39:22.441+0800    ERROR   instance/beat.go:802    Exiting: error initializing processors: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?
Exiting: error initializing processors: Cannot connect to the Docker daemon at unix:///var/run/docker.sock. Is the docker daemon running?

Solution
Comment out add_docker_metadata and add_kubernetes_metadata in filebeat.yml

# ================================= Processors =================================
processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - add_cloud_metadata: ~
#  - add_docker_metadata: ~
#  - add_kubernetes_metadata: ~
