2021年大数据ELK(十九):使用FileBeat采集Kafka日志到Elasticsearch
全网最详细的大数据ELK文章系列,强烈建议收藏加关注!
新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点。
目录
使用FileBeat采集Kafka日志到Elasticsearch
一、需求分析
二、配置FileBeats
1、input配置
2、output配置
三、配置文件
1、创建配置文件
2、复制一下到配置文件中
四、运行FileBeat
1、运行FileBeat
2、将日志数据上传到/var/kafka/log,并解压
五、查询数据
1、查看索引信息
六、解决一个日志涉及到多行问题
1、导入错误日志
2、问题分析
3、FileBeat多行配置选项
4、重新配置FileBeat
使用FileBeat采集Kafka日志到Elasticsearch
一、需求分析
在资料中有一个kafka_server.log.tar.gz压缩包,里面包含了很多的Kafka服务器日志,现在我们为了通过在Elasticsearch中快速查询这些日志,定位问题。我们需要用FileBeats将日志数据上传到Elasticsearch中。
问题:
- 首先,我们要指定FileBeat采集哪些Kafka日志,因为FileBeats中必须知道采集存放在哪儿的日志,才能进行采集。
- 其次,采集到这些数据后,还需要指定FileBeats将采集到的日志输出到Elasticsearch,那么Elasticsearch的地址也必须指定。
二、配置FileBeats
FileBeats配置文件主要分为两个部分。
- inputs
- output
从名字就能看出来,一个是用来输入数据的,一个是用来输出数据的。
1、input配置
filebeat.inputs:
- type: logenabled: truepaths:- /var/log/*.log#- c:\programdata\elasticsearch\logs\*
在FileBeats中,可以读取一个或多个数据源。
2、output配置
默认FileBeat会将日志数据放入到名称为:filebeat-%filebeat版本号%-yyyy.MM.dd 的索引中。
PS:
FileBeats中的filebeat.reference.yml包含了FileBeats所有支持的配置选项。
三、配置文件
1、创建配置文件
cd /export/server/es/filebeat-7.6.1-linux-x86_64
vim filebeat_kafka_log.yml
2、复制一下到配置文件中
filebeat.inputs:
- type: logenabled: truepaths:- /export/server/es/data/kafka/server.log.*output.elasticsearch:hosts: ["node1:9200", "node2:9200", "node3:9200"]
四、运行FileBeat
1、运行FileBeat
./filebeat -c filebeat_kafka_log.yml -e
2、将日志数据上传到/var/kafka/log,并解压
mkdir -p /export/server/es/data/kafka/tar -xvzf kafka_server.log.tar.gz
注意: 文件权限的报错
如果在启动fileBeat的时候, 报了一个配置文件权限的错误, 请修改其权限为 -rw-r--r--
五、查询数据
1、查看索引信息
GET /_cat/indices?v
{"health": "green","status": "open","index": "filebeat-7.6.1-2021.12.05-000001","uuid": "dplqB_hTQq2XeSk6S4tccQ","pri": "1","rep": "1","docs.count": "213780","docs.deleted": "0","store.size": "71.9mb","pri.store.size": "35.8mb"}
GET /filebeat-7.6.1-2021.12.05-000001/_search
{"_index": "filebeat-7.6.1-2021.12.05-000001","_type": "_doc","_id": "-72pX3IBjTeClvZff0CB","_score": 1,"_source": {"@timestamp": "2021-12-05T09:00:40.041Z","log": {"offset": 55433,"file": {"path": "/var/kafka/log/server.log.2021-12-05-16"}},"message": "[2021-12-05 09:01:30,682] INFO Socket connection established, initiating session, client: /192.168.88.100:46762, server: node1.cn/192.168.88.100:2181 (org.apache.zookeeper.ClientCnxn)","input": {"type": "log"},"ecs": {"version": "1.4.0"},"host": {"name": "node1"},"agent": {"id": "b4c5c4dc-03c3-4ba4-9400-dc6afcb36d64","version": "7.6.1","type": "filebeat","ephemeral_id": "b8fbf7ab-bc37-46dd-86c7-fa7d74d36f63","hostname": "node1"}}}
FileBeat自动给我们添加了一些关于日志、采集类型、Host各种字段。
六、解决一个日志涉及到多行问题
我们在日常日志的处理中,经常会碰到日志中出现异常的情况。类似下面的情况:
[2021-12-05 14:00:05,725] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error when sending leader epoch request for Map(test_10m-2 -> (currentLeaderEpoch=Optional[161], leaderEpoch=158)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to node2:9092 (id: 1 rack: null) failed.at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:71)at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:102)at kafka.server.ReplicaFetcherThread.fetchEpochEndOffsets(ReplicaFetcherThread.scala:310)at kafka.server.AbstractFetcherThread.truncateToEpochEndOffsets(AbstractFetcherThread.scala:208)at kafka.server.AbstractFetcherThread.maybeTruncate(AbstractFetcherThread.scala:173)at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:96)
[2021-12-05 14:00:05,725] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Retrying leaderEpoch request for partition test_10m-2 as the leader reported an error: UNKNOWN_SERVER_ERROR (kafka.server.ReplicaFetcherThread)
[2021-12-05 14:00:08,731] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Connection to node 1 (node2/192.168.88.101:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
在FileBeat中,Harvest是逐行读取日志文件的。但上述的日志会出现一条日志,跨多行的情况。有异常信息时,肯定会出现多行。我们先来看一下,如果默认不处理这种情况会出现什么问题。
1、导入错误日志
1)在/export/server/es/data/kafka/中创建名为server.log.2021-12-05的日志文件
2)将资料中的err.txt日志文本贴入到该文件中
观察FileBeat,发现FileBeat已经针对该日志文件启动了Harvester,并读取到数据数据。
2021-12-05T19:11:01.236+0800 INFO log/harvester.go:297 Harvester started for file: /var/kafka/log/server.log.2021-12-05
3)在Elasticsearch检索该文件
我们发现,原本是一条日志中的异常信息,都被作为一条单独的消息来处理了~
"message":"java.io.IOException:Connection to node2:9092 (id;
这明显是不符合我们的预期的,我们想要的是将所有的异常消息合并到一条日志中。那针对这种情况该如何处理呢?
2、问题分析
每条日志都是有统一格式的开头的,就拿Kafka的日志消息来说,[2021-12-05 14:00:05,725]这是一个统一的格式,如果不是以这样的形式开头,说明这一行肯定是属于某一条日志,而不是独立的一条日志。所以,我们可以通过日志的开头来判断某一行是否为新的一条日志。
3、FileBeat多行配置选项
在FileBeat的配置中,专门有一个解决一条日志跨多行问题的配置。主要为以下三个配置:
multiline.pattern: ^\[
multiline.negate: false
multiline.match: after
multiline.pattern表示能够匹配一条日志的模式,默认配置的是以[开头的才认为是一条新的日志。
multiline.negate:配置该模式是否生效,默认为false。
multiline.match:表示是否将未匹配到的行追加到上一日志,还是追加到下一个日志。
4、重新配置FileBeat
1)修改filebeat.yml,并添加以下内容
filebeat.inputs:
- type: logenabled: truepaths:- /var/kafka/log/server.log.*multiline.pattern: '^\['multiline.negate: truemultiline.match: afteroutput.elasticsearch:hosts: ["node1:9200", "node2:9200", "node3:9200"]
2)修改「注册表」/data.json,将server.log.2021-12-05对应的offset设置为0
cd /export/server/es/filebeat-7.6.1-linux-x86_64/data/registry/filebeatvim data.json
3)删除之前创建的文档
// 删除指定文件的文档
POST /filebeat-7.6.1-2021.12.05-000001/_delete_by_query
{"query": {"match": {"log.file.path": "/var/kafka/log/server.log.2021-12-05"}}
}
4)重新启动FileBeat
./filebeat -e
-
2021年大数据ELK(十九):使用FileBeat采集Kafka日志到Elasticsearch相关推荐
- 2021年大数据ELK(一):集中式日志协议栈Elastic Stack简介
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 一.简介 二.ELK 协议栈介绍及体系结构 三.集中式日志协议栈 ...
- 2021年大数据ELK(九):使用VSCode测试分词器
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 使用VSCode测试分词器 一.准备VSCode开发环境 1.打开VSCode ...
- 2021年大数据ELK(五):Elasticsearch中的核心概念
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 Elasticsearch中的核心概念 一.索引 index 二 ...
- 2021年大数据ELK(二):Elasticsearch简单介绍
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 一.Elasticsearch简介 1.介绍 2.创始人 二.E ...
- 2021年大数据ELK(八):Elasticsearch安装IK分词器插件
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 安装IK分词器 一.下载Elasticsearch IK分词器 ...
- 2021年大数据ELK(六):安装Elasticsearch
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 安装Elasticsearch 一.创建普通用户 二.为普通用户 ...
- 2021年大数据ELK(四):Lucene的美文搜索案例
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 美文搜索案例 一.需求 二.准备工作 1.创建IDEA项目 2. ...
- 2021年大数据ELK(三):Lucene全文检索库介绍
全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 一.什么是全文检索 1.结构化数据与非结构化数据 2.搜索结构化 ...
- 2021年大数据HBase(九):Apache Phoenix的安装
全网最详细的大数据HBase文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 前言 系列历史文章 安装Phoenix 一.下载 二.安装 1.上传安装包 ...
最新文章
- (转)C#操作XML的完整例子——XmlDocument篇
- 详解阿里开源分布式事务框架Seata
- 幂等问题 vs 如何判断是否是4的幂
- 海德薇格:我很期待看到 数字货币将如何改变人民币支付市场
- [CF452E]Three strings
- android 动态更改包名,Gradle多渠道打包(动态设定App名称,应用图标,替换常量,更改包名,变更渠道)...
- webpack多环境(dev stg prd qa)打包问题
- 【C/C++】C++函数
- SQL Server2005完全版与精简版的一个差别(抄录)
- UE4C++ Http下载文件
- BitviseSSH绕过4A内网直连服务器
- 阿里云大学生领取免费ECS服务器——测试题答案
- 算法与程序的区别与联系
- 凯利讯分享ECL电路与TTL电路的使用注意事项
- SEO当下的力量,你应该关注的4个属性
- ASP.NET MVC Liu_Cabbage 个人博客
- pytorch之models
- 高通5G平台(SDX55\SDX62\SDX65):ping包异常问题排查指南
- 优于 ViT 和 MLP-Mixer 的全局滤波器:Global Filter Networks for Image Classification [NeurIPS 2021]
- GoogleChrome禁止访问端口解决
热门文章
- php批量导出pdf文件大小,php完美导出pdf,pdf合并批量导出
- C++ 笔记(36)— 接收输入字符串的几种方法
- 前端Vue学习之路(二)-Vue-router路由
- 【VB】学生信息管理系统1——系统设计怎样开始?
- 反向词典_根据描述查找词语
- 通俗理解tf.nn.conv2d() tf.nn.conv3d( )参数的含义 pytorhc 卷积
- ONNX MLIR方法
- Paddle预训练模型应用工具PaddleHub
- AI解决方案:边缘计算和GPU加速平台
- 分层条件关系网络在视频问答VideoQA中的应用:CVPR2020论文解析
- 2021年大数据ELK(一):集中式日志协议栈Elastic Stack简介