ELK部署(filebeat+kafka+logstash+elasticsearch+kibana)
部署环境
服务器名 | IP | 部署服务 |
---|---|---|
yhcs_1 | 192.168.1.200 | filebeat-8.6.2、kafka_2.13-3.4.0 |
yhcs_2 | 192.168.1.210 | filebeat、kafka、logstash-8.6.2 |
yhcs_3 | 192.168.1.220 | filebeat、kafka、elasticsearch-8.6.2、kibana-8.6.2 |
备注:资源有限,仅做kafka集群,其他服务做单点测试
日志获取
filebeat | ----> | kafka | <---- | logstash | ----> | es | -------------> | kibana |
写入 | 读取 | 写入 | 可视化展示 |
部署kafka集群
1、上传下载的jar至/data/soft目录
2、解压相关文件至指定目录,修改文件夹名称
[root@yhcs_1 soft]# tar -xvf jdk-19_linux-x64_bin.tar.gz -C /opt
[root@yhcs_1 soft]# tar -xvf apache-zookeeper-3.8.0-bin.tar.gz -C /data
[root@yhcs_1 soft]# tar -xvf kafka_2.13-3.4.0.tgz -C /data
3、配置JDK(zookeeper依赖java1.8+)
[root@yhcs_1 data]# cat /etc/profile
...
#JDK
export JAVA_HOME=/opt/jdk-19.0.2
export JAVA_BIN=/opt/jdk-19.0.2/bin
export PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME JAVA_BIN PATH
[root@yhcs_1 data]# source /etc/profile
4、配置zookeeper
[root@yhcs_1 conf]# cat zoo.cfg
tickTime=2000
#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
initLimit=5
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为5*2s
syncLimit=2
#Leader和Follower之间同步通信的超时时间,这里表示如果超过2*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
dataDir=/data/zookeeper-3.8.0/data
#修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataLogDir=/data/zookeeper-3.8.0/logs
#添加,指定存放日志的目录,目录需要单独创建
clientPort=2181
#客户端连接端口
#添加集群信息
server.1=192.168.1.200:2888:3888
server.2=192.168.1.210:2888:3888
server.3=192.168.1.220:2888:3888
#创建配置文件涉及的目录
[root@yhcs_1 conf]# mkdir -p /data/zookeeper-3.8.0/{data,logs}
#设置myid数值与上面配置文件server.1、server.2、server.3对应
[root@yhcs_1 conf]# echo 1 >/data/zookeeper-3.8.0/data/myid
[root@yhcs_2 conf]# echo 2 >/data/zookeeper-3.8.0/data/myid
[root@yhcs_3 conf]# echo 3 >/data/zookeeper-3.8.0/data/myid
#编写zookeeper启动脚本
[root@yhcs_1 data]# cat /etc/init.d/zookeeper
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/data/zookeeper-3.8.0'
JAVA_HOME=/opt/jdk-19.0.2
JAVA_BIN=/opt/jdk-19.0.2/bin
PATH=$PATH:$JAVA_HOME/bin
case $1 in
start)
echo "---------- zookeeper 启动 ------------"
$ZK_HOME/bin/zkServer.sh start
;;
stop)
echo "---------- zookeeper 停止 ------------"
$ZK_HOME/bin/zkServer.sh stop
;;
restart)
echo "---------- zookeeper 重启 ------------"
$ZK_HOME/bin/zkServer.sh restart
;;
status)
echo "---------- zookeeper 状态 ------------"
$ZK_HOME/bin/zkServer.sh status
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
[root@yhcs_1 data]# chmod +x /etc/init.d/zookeeper
[root@yhcs_1 data]# chkconfig --add zookeeper
#启动zookeeper
[root@yhcs_1 data]# systemctl start zookeeper
[root@yhcs_1 data]# systemctl status zookeeper
5、配置kafka
[root@yhcs_1 data]# cat /data/kafka_2.13-3.4.0/config/server.properties
#broker的全局唯一编号,每个broker不能重复
broker.id=1
delete.topic.enable=true
port=9092
host.name=192.168.1.200
#指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
listeners=PLAINTEXT://:9092
advertised.host.name=192.168.1.200
#broker 处理网络请求的线程数量,一般情况下不需要去修改
num.network.threads=3
#用来处理磁盘IO的线程数量,数值应该大于硬盘数
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#日志目录需先创建,不会自动创建。 日志存放目录、数据存放目录
log.dirs=/data/kafka_2.13-3.4.0/logs
#topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.partitions=3
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
#segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
log.retention.hours=168
#一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.200:2181,192.168.1.210:2181,192.168.1.220:2181
zookeeper.connection.timeout.ms=6000
session.timeout.ms=10s
max.poll.records=100
max.poll.interval.ms=600000
group.initial.rebalance.delay.ms=0
#创建配置文件涉及目录
[root@yhcs_1 data]# mkdir -p /data/kafka_2.13-3.4.0/logs
#编写kafka启动脚本
[root@yhcs_1 data]# cat /etc/init.d/kafka
#配置 kafka 启动脚本
#!/bin/bash
#chkconfig:2345 60 20
#description:Kafka Service Control Script
KAFKA_HOME='/data/kafka_2.13-3.4.0'
JAVA_HOME=/opt/jdk-19.0.2
JAVA_BIN=/opt/jdk-19.0.2/bin
PATH=$PATH:$JAVA_HOME/bin
case $1 in
start)
echo "---------- Kafka 启动 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo "---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo "---------- Kafka 状态 ------------"
count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
if [ "$count" -eq 0 ];then
echo "kafka is not running"
else
echo "kafka is running"
fi
;;
*)
echo "Usage: $0 {start|stop|restart|status}"
esac
[root@yhcs_1 data]# chmod +x /etc/init.d/kafka
[root@yhcs_1 data]# chkconfig --add kafka
#启动kafka
[root@yhcs_1 data]# systemctl start kafka
[root@yhcs_1 data]# systemctl status kafka
#kafka集群测试
#最新的kafka创建主题topic方式:
./kafka-topics.sh --create --bootstrap-server 192.168.1.200:9092 --replication-factor 2 --partitions 3 --topic test
./kafka-topics.sh --create --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --replication-factor 2 --partitions 3 --topic test1
#查看当前服务器中的所有 topic
./kafka-topics.sh --list --bootstrap-server 192.168.1.200:9092
./kafka-topics.sh --list --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092
#查看某个 topic 的详情
./kafka-topics.sh --describe --bootstrap-server 192.168.1.200:9092
#发布消息
./kafka-console-producer.sh --broker-list 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --topic test
#消费消息
./kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --topic test --from-beginning
--from-beginning:会把主题中以往所有的数据都读取出来
#修改分区数
./kafka-topics.sh --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --alter --topic test --partitions 6
#删除 topic
./kafka-topics.sh --delete --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --topic test
部署filebeat
#解压filebeat
[root@yhcs_1 soft]# tar -xvf filebeat-8.6.2-linux-x86_64.tar.gz -C /data
#配置filebeat
[root@yhcs_1 filebeat-8.6.2]# cat filebeat.yml
filebeat.inputs:
- type: log
enabled: true
paths:
- /var/log/messages
output.kafka:
enabled: true
hosts: ["192.168.1.200:9092","192.168.1.210:9092","192.168.1.220:9092"]
topic: "demo-topic"
#启动filebeat
[root@yhcs_1 filebeat-8.6.2]# nohup ./filebeat -e -c filebeat.yml -d "publish" 2>&1 &
部署elasticsearch
#解压es
[root@yhcs_3 soft]# tar -xvf elasticsearch-8.6.2-linux-x86_64.tar.gz -C /data
#配置es
[root@yhcs_3 config]# egrep -v "^$|#" elasticsearch.yml
node.name: node-1
path.data: /data/elasticsearch-8.6.2/data
path.logs: /data/elasticsearch-8.6.2/logs
network.host: 192.168.1.220
http.port: 9200
cluster.initial_master_nodes: ["node-1"]
ingest.geoip.downloader.enabled: false
xpack.security.enabled: false
discovery.seed_hosts: ["192.168.1.220"]
#创建涉及目录
[root@yhcs_3 config]# mkdir -p /data/elasticsearch-8.6.2/data
#创建es用户(es、kibana需要使用非root用户)
[root@yhcs_3 config]# useradd esroot
[root@yhcs_3 config]# passwd esroot_@321
[root@yhcs_3 config]# chown -R esroot:esroot elasticsearch-8.6.2/
#修改其他配置
[root@yhcs_3 config]# vi /etc/security/limits.conf
#在文件末尾中增加下面内容
esroot soft nofile 65536
esroot hard nofile 65536
[root@yhcs_3 config]# vi /etc/security/limits.d/20-nproc.conf
#在文件末尾中增加下面内容
esroot soft nofile 65536
esroot hard nofile 65536
* hard nproc 4096
#注:*代表Linux所有用户名称
[root@yhcs_3 config]# vi /etc/sysctl.conf
#在文件中增加下面内容
vm.max_map_count=655360
#重新加载,输入下面命令
sysctl -p
#启动es
[root@yhcs_3 soft]# su esroot
[esroot@yhcs_3 soft]$ cd /data/elasticsearch-8.6.2/bin/
[esroot@yhcs_3 bin]$ nohup ./elasticsearch -d 2>&1 &
#访问es
http://192.168.1.220:9200/
部署kibana
#解压kibana
[root@yhcs_3 soft]# tar -xvf kibana-8.6.2-linux-x86_64.tar.gz -C /data
#配置kibana
[root@yhcs_3 config]# egrep -v "^$|#" kibana.yml
server.port: 5601
server.host: "192.168.1.220"
elasticsearch.hosts: ["http://192.168.1.220:9200"]
logging.appenders.default:
type: file
fileName: /data/kibana-8.6.2/logs/kibana.log
layout:
type: json
pid.file: /data/kibana-8.6.2/run/kibana.pid
telemetry.enabled: false
i18n.locale: "zh-CN"
#创建涉及目录
[root@yhcs_3 config]# mkdir -p /data/kibana-8.6.2/{logs,run}
#启动kibana
[root@yhcs_3 soft]# su esroot
[esroot@yhcs_3 soft]$ cd /data/kibana-8.6.2/bin/
[esroot@yhcs_3 bin]$ nohup ./kibana 2>&1 &
#访问kibana(这个时候还没有数据)
http://192.168.1.220:5601/
部署logstash
#解压kibana
[root@yhcs_2 soft]# tar -xvf logstash-8.6.2-linux-x86_64.tar.gz -C /data
#配置logstash
[root@yhcs_2 config]# cat /data/logstash-8.6.2/config/logstash-sample.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {
kafka {
bootstrap_servers => "192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092"
topics => "demo-topic"
group_id => "test"
}
}
output {
elasticsearch {
hosts => ["http://192.168.1.220:9200"]
index => "test_index"
#user => "elastic"
#password => "changeme"
}
}
#启动logstash
[root@yhcs_2 config]# cd /data/logstash-8.6.2
[root@yhcs_2 bin]# nohup ./bin/logstash -f ./config/logstash-sample.conf --config.reload.automatic --path.data=/data/logstash-8.6.2/ >/dev/null 2>&1 &
#访问kibana(整个链路走通就可以在kibana看到采集的日志信息)
http://192.168.1.220:5601/
ELK部署(filebeat+kafka+logstash+elasticsearch+kibana)相关推荐
- 【转】Filebeat+Kafka+Logstash+ElasticSearch+Kibana 日志采集方案
前言 Elastic Stack 提供 Beats 和 Logstash 套件来采集任何来源.任何格式的数据.其实Beats 和 Logstash的功能差不多,都能够与 Elasticsearch 产 ...
- Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统
文章目录 一.前言 二.背景信息 三.操作流程 四.准备工作 1.Docker 环境 2.Docker Compose 环境 3.版本准备 4.环境初始化 5.服务安装 6.服务设置 五.配置 Fil ...
- Filebeat+Kafka+Logstash+ElasticSearch+Kibana搭建完整版
https://www.cnblogs.com/jiashengmei/p/8857053.html
- ELK学习3_使用redis+logstash+elasticsearch+kibana快速搭建日志平台
日志的分析和监控在系统开发中占非常重要的地位,系统越复杂,日志的分析和监控就越重要,常见的需求有: 根据关键字查询日志详情 监控系统的运行状况 统计分析,比如接口的调用次数.执行时间.成功率等 异常数 ...
- 安装logstash,elasticsearch,kibana三件套
原文地址:http://www.cnblogs.com/yjf512/p/4194012.html logstash,elasticsearch,kibana三件套 elk是指logstash,ela ...
- Logstash+Elasticsearch+kibana搭建日志平台文档(xjh20160527重庆公司亲测可用)
Logstash+Elasticsearch+kibana搭建日志平台文档 v1.0_xjh 20160527 http://wenku.baidu.com/link?url=_HaXh4neqWm ...
- 基于ELK打造强大的日志收集分析系统(springboot2+logback+logstash+elasticsearch+kibana)
前言 上一代的日志分析系统springboot+log4j+flume+kafka仿佛还是昨天,现在已经流行起了新一代的springboot2+logback+logstash+elasticsear ...
- filebeat+redis+logstash+elasticsearch filebeat+kafka+zookeeper+logstash+elasticsearch
收集日志的工具 日志易(收费) splunk(国外,按流量收费) 介绍 发展史:使用java语言,在luncen的基础上做二次封装,提供restful接口 搜索的原理:倒排索引 特点:水平扩展方便.提 ...
- 企业日志分析ELK(Logstash+Elasticsearch+Kibana)介绍及搭建
目录 一.ELK概述 1.ELK日志分析系统 2.ELK中日志处理步骤 二.Elasticsearch介绍 1.Elasticsearch的基础核心概念 2.分片和副本 三.LogStash概述 1. ...
最新文章
- SVN之版本控制系统
- jsTree 插件Ajax数据
- File类 读取文件
- Safari 10默认禁用Flash插件
- scala从集合中提取不重复的元素
- 服务器如何返回429状态,Instagram远程服务器返回一个错误:(429)UNKNOWN STATUS CODE...
- R语言本地安装包教程
- 8个国外在线学习网课的网站
- 项目管理工具二、STAR原则
- 提高信息技术课堂教学实效研究 课题论证报告
- 反正不要钱,多少看一点系列:win10环境下安装Kali Linux双系统的装机经历(个人血泪史)
- 微信小程序 image-cropper 还原按钮控制
- 学java应该学什么
- 如何使用Proteus进行电路设计仿真?
- Lingo的基本用法(更新版)
- 【Ant Design】下拉列表Select 、日期选择框DatePicker等跟随滚动条上下移动解决方案
- 为什么Mysql底层采用B+树做索引?
- Java微信公众号开发之微信粉丝信息一键同步
- 【量化】股市技术分析利器之TA-Lib(一)
- java游戏健值测试软件,教你使用JavaMagic来改JAVA手机游戏软件键值和改全屏.doc