In the traditional ELFK architecture, Filebeat only collects and forwards logs, which is relatively lightweight work, while Logstash has to parse and process them. When the log volume becomes very large, Logstash may not keep up, and logs or data can be lost. Inserting Kafka between Filebeat and Logstash as a buffer means that when Logstash falls behind, the logs or data are held in Kafka rather than being lost.

Installing and configuring Kafka

Just download the release archive and extract it.

wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
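A minimal sketch of the extract step, assuming the archive was downloaded to the current directory and keeping the directory name from the tarball:

[root@VM-20-10-centos ~]# tar -xzf kafka_2.12-3.3.1.tgz
[root@VM-20-10-centos ~]# cd kafka_2.12-3.3.1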

Newer Kafka releases bundle ZooKeeper, so for this experiment there is no need to download ZooKeeper separately.

The Kafka configuration is as follows:

[root@VM-20-10-centos config]# cat server.properties |egrep -v "^#|^$"
broker.id=0
listeners=PLAINTEXT://10.0.20.10:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.retention.check.interval.ms=300000
zookeeper.connect=10.0.20.10:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

The main settings to adjust here are the Kafka listener IP and port (listeners) and the ZooKeeper IP and port (zookeeper.connect).

The data directory is left at its default since this is only an experiment. A production-style example is shown below.
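
In a real deployment you would normally point log.dirs at a persistent disk instead of /tmp; for example (the path below is only an illustration, not part of the original setup):

log.dirs=/data/kafka-logs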

The ZooKeeper configuration is as follows, essentially the defaults:

[root@VM-20-10-centos config]# cat zookeeper.properties  |egrep -v "^#|^$"
dataDir=/tmp/zookeeper
clientPort=2181
maxClientCnxns=0
admin.enableServer=false

Once the configuration is done, start the services. Note the order: start ZooKeeper first, then Kafka.

The startup commands are as follows:

[root@VM-20-10-centos bin]# nohup ./zookeeper-server-start.sh ../config/zookeeper.properties > zknohup.out 2>&1 &
[root@VM-20-10-centos bin]# nohup ./kafka-server-start.sh ../config/server.properties > kafkanohup.out 2>&1 &
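
To confirm that both services came up, a quick sanity check (added here for illustration, not part of the original write-up) is to verify that the ZooKeeper and Kafka ports are listening:

[root@VM-20-10-centos bin]# ss -lntp | egrep "2181|9092"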

Creating a topic

Create the topic:
[root@VM-20-10-centos bin]# ./kafka-topics.sh --create --topic testxj --bootstrap-server 10.0.20.10:9092
Created topic testxj.
Change the topic's partition count to 3:
[root@VM-20-10-centos bin]#  ./kafka-topics.sh --bootstrap-server 10.0.20.10:9092 --topic testxj --alter --partitions 3
View the topic details:
[root@VM-20-10-centos bin]# ./kafka-topics.sh --describe --bootstrap-server 10.0.20.10:9092
Topic: testxj   TopicId: 18qGviHQQ0WO1FZt0Z8qNg PartitionCount: 3       ReplicationFactor: 1    Configs:
        Topic: testxj   Partition: 0    Leader: 0       Replicas: 0     Isr: 0
        Topic: testxj   Partition: 1    Leader: 0       Replicas: 0     Isr: 0
        Topic: testxj   Partition: 2    Leader: 0       Replicas: 0     Isr: 0
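
Before wiring in Filebeat and Logstash, the topic itself can be smoke-tested with the console producer and consumer shipped in Kafka's bin directory (an extra verification step assumed from the standard distribution, not part of the original notes):

Produce a test message in one terminal
[root@VM-20-10-centos bin]# ./kafka-console-producer.sh --bootstrap-server 10.0.20.10:9092 --topic testxj
>hello
Consume it back in another terminal
[root@VM-20-10-centos bin]# ./kafka-console-consumer.sh --bootstrap-server 10.0.20.10:9092 --topic testxj --from-beginning
hello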

Modifying the Filebeat and Logstash configurations for a test

Rather than dropping Kafka straight into the ELFK pipeline, first test that the Kafka leg works on its own.

Filebeat reads from standard input and Logstash writes to standard output.

Filebeat configuration:

[root@VM-20-10-centos filebeat]# vim filebeat_2_kafka.yml
filebeat.inputs:
- type: stdin
output.kafka:
  hosts:
    - 10.0.20.10:9092
  topic: "testxj"

Logstash configuration:

[root@VM-20-10-centos conf.d]# vim logstash_from_kafka.conf
input {
  kafka {
    bootstrap_servers => "10.0.20.10:9092"
    topics => ["testxj"]
    group_id => "testxj-logstash"
  }
}
output {
  stdout {}
}
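
Before running it, the Logstash configuration syntax can optionally be validated with the test-and-exit flag (shown as an assumed extra step, not part of the original procedure):

[root@VM-20-10-centos bin]# ./logstash -t -f ../conf.d/logstash_from_kafka.conf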

Once the configuration is in place, start the processes in the foreground to test.

Start Filebeat and write aaaaaaa and bbbbbbb to Kafka:

[root@VM-20-10-centos filebeat]# filebeat -e -c ./filebeat_2_kafka.yml
....
2023-01-20T00:24:13.736+0800    INFO    [monitoring]    log/log.go:144  Non-zero metrics in the last 30s     {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":10,"time":{"ms":4}},"total":{"ticks":20,"time":{"ms":8},"value":20},"user":{"ticks":10,"time":{"ms":4}}},"handles":{"limit":{"hard":100002,"soft":100001},"open":9},"info":{"ephemeral_id":"b783c9f8-6a5a-444c-b5a1-f73ef61f8cd7","uptime":{"ms":90024}},"memstats":{"gc_next":6031808,"memory_alloc":3031768,"memory_total":6498752,"rss":3366912}},"filebeat":{"events":{"added":1,"done":1},"harvester":{"open_files":0,"running":1}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":1,"batches":1,"total":1}},"outputs":{"kafka":{"bytes_read":1512,"bytes_write":376}},"pipeline":{"clients":1,"events":{"active":0,"published":1,"retry":1,"total":1},"queue":{"acked":1}}},"registrar":{"states":{"current":0}},"system":{"load":{"1":1.22,"15":0.63,"5":0.49,"norm":{"1":0.61,"15":0.315,"5":0.245}}}}}}
aaaaaaa
2023-01-20T00:24:35.872+0800    INFO    kafka/log.go:53 producer/broker/0 state change to [open] on testxj/2
2023-01-20T00:24:43.736+0800    INFO    [monitoring]    log/log.go:144  Non-zero metrics in the last 30s     {"monitoring": {"metrics": {"beat":{"cpu":{"system":{"ticks":10,"time":{"ms":1}},"total":{"ticks":20,"time":{"ms":4},"value":20},"user":{"ticks":10,"time":{"ms":3}}},"handles":{"limit":{"hard":100002,"soft":100001},"open":9},"info":{"ephemeral_id":"b783c9f8-6a5a-444c-b5a1-f73ef61f8cd7","uptime":{"ms":120024}},"memstats":{"gc_next":6031808,"memory_alloc":4337120,"memory_total":7804104,"rss":544768}},"filebeat":{"events":{"added":1,"done":1},"harvester":{"open_files":0,"running":1}},"libbeat":{"config":{"module":{"running":0}},"output":{"events":{"acked":1,"batches":1,"total":1}},"outputs":{"kafka":{"bytes_read":50,"bytes_write":355}},"pipeline":{"clients":1,"events":{"active":0,"published":1,"total":1},"queue":{"acked":1}}},"registrar":{"states":{"current":0}},"system":{"load":{"1":0.74,"15":0.61,"5":0.44,"norm":{"1":0.37,"15":0.305,"5":0.22}}}}}}
bbbbbbb

Start Logstash to read the messages from Kafka and print them to the screen; you can see that the messages are read successfully:

[root@VM-20-10-centos bin]# ./logstash -r -f ../conf.d/logstash_from_kafka.conf
Sending Logstash logs to /usr/local/logstash/logs which is now configured via log4j2.properties
[2023-01-20T00:24:00,706][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
[2023-01-20T00:24:01,268][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.4.2"}
[2023-01-20T00:24:03,696][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2023-01-20T00:24:03,759][INFO ][logstash.pipeline        ] Pipeline started successfully {:pipeline_id=>"main", :thread=>"#<Thread:0x4b7db664 run>"}
[2023-01-20T00:24:03,805][INFO ][logstash.agent           ] Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[2023-01-20T00:24:03,983][INFO ][org.apache.kafka.clients.consumer.ConsumerConfig] ConsumerConfig values:
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [10.0.20.10:9092]
        check.crcs = true
        client.id = logstash-0
        connections.max.idle.ms = 540000
        enable.auto.commit = true
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = testxj-logstash
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 305000
        retry.backoff.ms = 100
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
[2023-01-20T00:24:04,043][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 1.1.0
[2023-01-20T00:24:04,043][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka commitId : fdcf75ea326b8e07
[2023-01-20T00:24:04,205][INFO ][org.apache.kafka.clients.Metadata] Cluster ID: phZ-gSpnRyiDNlzsg7DbUw
[2023-01-20T00:24:04,211][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=testxj-logstash] Discovered group coordinator 10.0.20.10:9092 (id: 2147483647 rack: null)
[2023-01-20T00:24:04,214][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=testxj-logstash] Revoking previously assigned partitions []
[2023-01-20T00:24:04,214][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=testxj-logstash] (Re-)joining group
[2023-01-20T00:24:04,241][INFO ][org.apache.kafka.clients.consumer.internals.AbstractCoordinator] [Consumer clientId=logstash-0, groupId=testxj-logstash] Successfully joined group with generation 1
[2023-01-20T00:24:04,242][INFO ][org.apache.kafka.clients.consumer.internals.ConsumerCoordinator] [Consumer clientId=logstash-0, groupId=testxj-logstash] Setting newly assigned partitions [testxj-0, testxj-1, testxj-2]
[2023-01-20T00:24:04,265][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=testxj-logstash] Resetting offset for partition testxj-0 to offset 20.
[2023-01-20T00:24:04,265][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=testxj-logstash] Resetting offset for partition testxj-1 to offset 13.
[2023-01-20T00:24:04,265][INFO ][org.apache.kafka.clients.consumer.internals.Fetcher] [Consumer clientId=logstash-0, groupId=testxj-logstash] Resetting offset for partition testxj-2 to offset 0.
[2023-01-20T00:24:04,314][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
{"@timestamp" => 2023-01-19T16:24:35.918Z,"@version" => "1","message" => "{\"@timestamp\":\"2023-01-19T16:24:34.871Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"doc\",\"version\":\"6.8.23\",\"topic\":\"testxj\"},\"prospector\":{\"type\":\"stdin\"},\"input\":{\"type\":\"stdin\"},\"beat\":{\"name\":\"VM-20-10-centos\",\"hostname\":\"VM-20-10-centos\",\"version\":\"6.8.23\"},\"host\":{\"name\":\"VM-20-10-centos\"},\"message\":\"aaaaaaa\",\"source\":\"\",\"offset\":0,\"log\":{\"file\":{\"path\":\"\"}}}"
}
{"@timestamp" => 2023-01-19T16:24:49.772Z,"@version" => "1","message" => "{\"@timestamp\":\"2023-01-19T16:24:48.766Z\",\"@metadata\":{\"beat\":\"filebeat\",\"type\":\"doc\",\"version\":\"6.8.23\",\"topic\":\"testxj\"},\"input\":{\"type\":\"stdin\"},\"beat\":{\"name\":\"VM-20-10-centos\",\"hostname\":\"VM-20-10-centos\",\"version\":\"6.8.23\"},\"host\":{\"name\":\"VM-20-10-centos\"},\"offset\":0,\"log\":{\"file\":{\"path\":\"\"}},\"message\":\"bbbbbbb\",\"source\":\"\",\"prospector\":{\"type\":\"stdin\"}}"
}
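
Note that the whole Filebeat event arrives as a JSON string inside the message field. If you want Logstash to expand it into individual fields, the kafka input can be given a json codec; this is an optional tweak, not something used in the steps above:

input {
  kafka {
    bootstrap_servers => "10.0.20.10:9092"
    topics => ["testxj"]
    group_id => "testxj-logstash"
    codec => "json"
  }
}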

Using Kafka in the ELFK stack

The Filebeat configuration is changed to:

[root@VM-12-8-centos filebeat]# egrep -v "#|^$" filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /data/shell/access.log
filebeat.config.modules:
  path: ${path.config}/modules.d/*.yml
  reload.enabled: false
setup.template.settings:
  index.number_of_shards: 3
setup.kibana:
output.kafka:
  hosts: ["10.0.20.10:9092"]
  topic: "testxj"
processors:
  - add_host_metadata: ~
  - add_cloud_metadata: ~

The Logstash configuration is changed to:

[root@VM-20-10-centos conf.d]# cat logstash_to_elasticsearch.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
  kafka {
    bootstrap_servers => "10.0.20.10:9092"
    topics => ["testxj"]
    group_id => "testxj-logstash"
  }
}
filter {
  grok {
    match => { "message" => "%{IP:clientip} - (%{USERNAME:user}|-) \[%{HTTPDATE:timestamp}\] \"(%{WORD:request_verb} %{NOTSPACE:request} HTTP/%{NUMBER:httpversion}\s*|%{GREEDYDATA:request})\" %{NUMBER:status:int} %{NUMBER:body_sent:int} \"%{GREEDYDATA:http_referer}\" \"%{GREEDYDATA:http_user_agent}\" \"(%{IPV4:http_x_forwarded_for}|-)\"" }
    remove_field => "message"
  }
  date {
    match => [ "time_local", "dd/MMM/yyyy:HH:mm:ss Z" ]
  }
  geoip {
    source => "clientip"
    fields => ["city_name","country_name","ip"]
  }
  useragent {
    source => "http_user_agent"
    target => "acesss_useragent"
  }
}
output {
  elasticsearch {
    hosts => ["http://10.0.20.10:9200"]
    index => "nginx-accesslog-%{+YYYY.MM.dd}"
    document_type => "nginx-accesslog"
    template_overwrite => true
    #user => "elastic"
    #password => "changeme"
  }
  stdout {
    codec => rubydebug
  }
}

Finally, just start the processes.
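
Once both are running, two quick checks (added here for illustration, not from the original notes) are to see whether the Logstash consumer group is keeping up with the topic and whether the daily index has appeared in Elasticsearch:

Check consumer lag for the Logstash group
[root@VM-20-10-centos bin]# ./kafka-consumer-groups.sh --bootstrap-server 10.0.20.10:9092 --describe --group testxj-logstash
List the nginx access-log indices in Elasticsearch
[root@VM-20-10-centos ~]# curl -s "http://10.0.20.10:9200/_cat/indices?v" | grep nginx-accesslog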
