部署环境

服务器名 IP 部署服务
yhcs_1 192.168.1.200 filebeat-8.6.2、kafka_2.13-3.4.0
yhcs_2 192.168.1.210 filebeat、kafka、logstash-8.6.2
yhcs_3 192.168.1.220 filebeat、kafka、elasticsearch-8.6.2、kibana-8.6.2

备注:资源有限,仅做kafka集群,其他服务做单点测试

日志获取

filebeat ----> kafka <---- logstash ----> es -------------> kibana
写入 读取 写入 可视化展示

部署kafka集群

1、上传下载的jar至/data/soft目录

2、解压相关文件至指定目录,修改文件夹名称

[root@yhcs_1 soft]# tar -xvf jdk-19_linux-x64_bin.tar.gz -C /opt

[root@yhcs_1 soft]# tar -xvf apache-zookeeper-3.8.0-bin.tar.gz -C /data

[root@yhcs_1 soft]# tar -xvf kafka_2.13-3.4.0.tgz -C /data

3、配置JDK(zookeeper依赖java1.8+)

[root@yhcs_1 data]# cat /etc/profile
...

#JDK
export JAVA_HOME=/opt/jdk-19.0.2
export JAVA_BIN=/opt/jdk-19.0.2/bin
export PATH=$PATH:$JAVA_HOME/bin
export JAVA_HOME JAVA_BIN PATH

[root@yhcs_1 data]# source /etc/profile

4、配置zookeeper

[root@yhcs_1 conf]# cat zoo.cfg 
tickTime=2000
#通信心跳时间,Zookeeper服务器与客户端心跳时间,单位毫秒
initLimit=5
#Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量),这里表示为5*2s
syncLimit=2
#Leader和Follower之间同步通信的超时时间,这里表示如果超过2*2s,Leader认为Follwer死掉,并从服务器列表中删除Follwer
dataDir=/data/zookeeper-3.8.0/data
#修改,指定保存Zookeeper中的数据的目录,目录需要单独创建
dataLogDir=/data/zookeeper-3.8.0/logs
#添加,指定存放日志的目录,目录需要单独创建
clientPort=2181
#客户端连接端口

#添加集群信息
server.1=192.168.1.200:2888:3888
server.2=192.168.1.210:2888:3888
server.3=192.168.1.220:2888:3888

#创建配置文件涉及的目录

[root@yhcs_1 conf]# mkdir -p /data/zookeeper-3.8.0/{data,logs}

#设置myid数值与上面配置文件server.1、server.2、server.3对应

[root@yhcs_1 conf]# echo 1 >/data/zookeeper-3.8.0/data/myid

[root@yhcs_2 conf]# echo 2 >/data/zookeeper-3.8.0/data/myid

[root@yhcs_3 conf]# echo 3 >/data/zookeeper-3.8.0/data/myid

#编写zookeeper启动脚本

[root@yhcs_1 data]# cat /etc/init.d/zookeeper 
#!/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper Service Control Script
ZK_HOME='/data/zookeeper-3.8.0'
JAVA_HOME=/opt/jdk-19.0.2
JAVA_BIN=/opt/jdk-19.0.2/bin
PATH=$PATH:$JAVA_HOME/bin
case $1 in
start)
    echo "---------- zookeeper 启动 ------------"
    $ZK_HOME/bin/zkServer.sh start
;;
stop)
    echo "---------- zookeeper 停止 ------------"
    $ZK_HOME/bin/zkServer.sh stop
;;
restart)
    echo "---------- zookeeper 重启 ------------"
    $ZK_HOME/bin/zkServer.sh restart
;;
status)
    echo "---------- zookeeper 状态 ------------"
    $ZK_HOME/bin/zkServer.sh status
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac
[root@yhcs_1 data]# chmod +x /etc/init.d/zookeeper

[root@yhcs_1 data]# chkconfig --add zookeeper

#启动zookeeper

[root@yhcs_1 data]# systemctl start zookeeper

[root@yhcs_1 data]# systemctl status zookeeper

5、配置kafka

[root@yhcs_1 data]# cat /data/kafka_2.13-3.4.0/config/server.properties
#broker的全局唯一编号,每个broker不能重复
broker.id=1
delete.topic.enable=true
port=9092        
host.name=192.168.1.200
#指定监听的IP和端口,如果修改每个broker的IP需区分开来,也可保持默认配置不用修改
listeners=PLAINTEXT://:9092
advertised.host.name=192.168.1.200
#broker 处理网络请求的线程数量,一般情况下不需要去修改
num.network.threads=3
#用来处理磁盘IO的线程数量,数值应该大于硬盘数
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#日志目录需先创建,不会自动创建。  日志存放目录、数据存放目录
log.dirs=/data/kafka_2.13-3.4.0/logs
#topic在当前broker上的默认分区个数,会被topic创建时的指定参数覆盖
num.partitions=3
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=2
transaction.state.log.min.isr=1
#segment文件(数据文件)保留的最长时间,单位为小时,默认为7天,超时将被删除
log.retention.hours=168
#一个segment文件最大的大小,默认为 1G,超出将新建一个新的segment文件
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.1.200:2181,192.168.1.210:2181,192.168.1.220:2181
zookeeper.connection.timeout.ms=6000
session.timeout.ms=10s
max.poll.records=100
max.poll.interval.ms=600000
group.initial.rebalance.delay.ms=0

#创建配置文件涉及目录

[root@yhcs_1 data]# mkdir -p /data/kafka_2.13-3.4.0/logs

#编写kafka启动脚本

[root@yhcs_1 data]# cat /etc/init.d/kafka 
#配置 kafka 启动脚本
#!/bin/bash
#chkconfig:2345 60 20
#description:Kafka Service Control Script
KAFKA_HOME='/data/kafka_2.13-3.4.0'
JAVA_HOME=/opt/jdk-19.0.2
JAVA_BIN=/opt/jdk-19.0.2/bin
PATH=$PATH:$JAVA_HOME/bin
case $1 in
start)
    echo "---------- Kafka 启动 ------------"
    ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
    echo "---------- Kafka 停止 ------------"
    ${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
    $0 stop
    $0 start
;;
status)
    echo "---------- Kafka 状态 ------------"
    count=$(ps -ef | grep kafka | egrep -cv "grep|$$")
    if [ "$count" -eq 0 ];then
        echo "kafka is not running"
    else
        echo "kafka is running"
    fi
;;
*)
    echo "Usage: $0 {start|stop|restart|status}"
esac

[root@yhcs_1 data]# chmod +x /etc/init.d/kafka

[root@yhcs_1 data]# chkconfig --add kafka

#启动kafka

[root@yhcs_1 data]# systemctl start kafka

[root@yhcs_1 data]# systemctl status kafka

#kafka集群测试

#最新的kafka创建主题topic方式:
./kafka-topics.sh --create --bootstrap-server 192.168.1.200:9092 --replication-factor 2 --partitions 3 --topic test
./kafka-topics.sh --create --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --replication-factor 2 --partitions 3 --topic test1

#查看当前服务器中的所有 topic
 ./kafka-topics.sh --list --bootstrap-server 192.168.1.200:9092
 ./kafka-topics.sh --list --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092
#查看某个 topic 的详情
./kafka-topics.sh  --describe --bootstrap-server 192.168.1.200:9092
#发布消息
./kafka-console-producer.sh --broker-list 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092  --topic test
#消费消息
./kafka-console-consumer.sh --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --topic test --from-beginning

--from-beginning:会把主题中以往所有的数据都读取出来

#修改分区数
./kafka-topics.sh --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --alter --topic test --partitions 6

#删除 topic
./kafka-topics.sh --delete --bootstrap-server 192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092 --topic test

部署filebeat

#解压filebeat

[root@yhcs_1 soft]# tar -xvf filebeat-8.6.2-linux-x86_64.tar.gz -C /data

#配置filebeat

[root@yhcs_1 filebeat-8.6.2]# cat filebeat.yml
filebeat.inputs:
- type: log
  enabled: true
  paths:
    - /var/log/messages
 
output.kafka:
    enabled: true
    hosts: ["192.168.1.200:9092","192.168.1.210:9092","192.168.1.220:9092"]
    topic: "demo-topic"

#启动filebeat
[root@yhcs_1 filebeat-8.6.2]# nohup ./filebeat -e -c filebeat.yml -d "publish" 2>&1 &

部署elasticsearch

#解压es

[root@yhcs_3 soft]# tar -xvf elasticsearch-8.6.2-linux-x86_64.tar.gz -C /data

#配置es

[root@yhcs_3 config]# egrep -v "^$|#" elasticsearch.yml
node.name: node-1
path.data: /data/elasticsearch-8.6.2/data
path.logs: /data/elasticsearch-8.6.2/logs
network.host: 192.168.1.220
http.port: 9200
cluster.initial_master_nodes: ["node-1"]
ingest.geoip.downloader.enabled: false
xpack.security.enabled: false
discovery.seed_hosts: ["192.168.1.220"]

#创建涉及目录

[root@yhcs_3 config]# mkdir -p /data/elasticsearch-8.6.2/data

#创建es用户(es、kibana需要使用非root用户)

[root@yhcs_3 config]# useradd esroot
[root@yhcs_3 config]# passwd esroot_@321
[root@yhcs_3 config]# chown -R esroot:esroot elasticsearch-8.6.2/

#修改其他配置

[root@yhcs_3 config]# vi /etc/security/limits.conf

#在文件末尾中增加下面内容

esroot soft nofile 65536

esroot hard nofile 65536

[root@yhcs_3 config]# vi /etc/security/limits.d/20-nproc.conf

#在文件末尾中增加下面内容

esroot soft nofile 65536

esroot hard nofile 65536

* hard nproc 4096

#注:*代表Linux所有用户名称

[root@yhcs_3 config]# vi /etc/sysctl.conf

#在文件中增加下面内容

vm.max_map_count=655360

#重新加载,输入下面命令

sysctl -p

#启动es

[root@yhcs_3 soft]# su esroot
[esroot@yhcs_3 soft]$ cd /data/elasticsearch-8.6.2/bin/
[esroot@yhcs_3 bin]$ nohup ./elasticsearch -d 2>&1 &

#访问es

http://192.168.1.220:9200/

部署kibana

#解压kibana

[root@yhcs_3 soft]# tar -xvf kibana-8.6.2-linux-x86_64.tar.gz -C /data

#配置kibana

[root@yhcs_3 config]# egrep -v "^$|#" kibana.yml 
server.port: 5601
server.host: "192.168.1.220"
elasticsearch.hosts: ["http://192.168.1.220:9200"]
logging.appenders.default:
  type: file
  fileName: /data/kibana-8.6.2/logs/kibana.log
  layout:
    type: json
pid.file: /data/kibana-8.6.2/run/kibana.pid
telemetry.enabled: false
i18n.locale: "zh-CN"

#创建涉及目录

[root@yhcs_3 config]# mkdir -p /data/kibana-8.6.2/{logs,run}

#启动kibana

[root@yhcs_3 soft]# su esroot
[esroot@yhcs_3 soft]$ cd /data/kibana-8.6.2/bin/
[esroot@yhcs_3 bin]$ nohup ./kibana 2>&1 &

#访问kibana(这个时候还没有数据)

http://192.168.1.220:5601/

部署logstash

#解压kibana

[root@yhcs_2 soft]# tar -xvf logstash-8.6.2-linux-x86_64.tar.gz -C /data

#配置logstash

[root@yhcs_2 config]# cat /data/logstash-8.6.2/config/logstash-sample.conf
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.

input {
  kafka {
    bootstrap_servers => "192.168.1.200:9092,192.168.1.210:9092,192.168.1.220:9092"
    topics => "demo-topic"
    group_id => "test"
  }
}

output {
  elasticsearch {
    hosts => ["http://192.168.1.220:9200"]
    index => "test_index"
    #user => "elastic"
    #password => "changeme"
  }
}

#启动logstash

[root@yhcs_2 config]# cd /data/logstash-8.6.2
[root@yhcs_2 bin]# nohup ./bin/logstash -f ./config/logstash-sample.conf --config.reload.automatic --path.data=/data/logstash-8.6.2/ >/dev/null  2>&1 &

#访问kibana(整个链路走通就可以在kibana看到采集的日志信息)

http://192.168.1.220:5601/

ELK部署(filebeat+kafka+logstash+elasticsearch+kibana)相关推荐

  1. 【转】Filebeat+Kafka+Logstash+ElasticSearch+Kibana 日志采集方案

    前言 Elastic Stack 提供 Beats 和 Logstash 套件来采集任何来源.任何格式的数据.其实Beats 和 Logstash的功能差不多,都能够与 Elasticsearch 产 ...

  2. Filebeat+Kafka+Logstash+Elasticsearch+Kibana 构建日志分析系统

    文章目录 一.前言 二.背景信息 三.操作流程 四.准备工作 1.Docker 环境 2.Docker Compose 环境 3.版本准备 4.环境初始化 5.服务安装 6.服务设置 五.配置 Fil ...

  3. Filebeat+Kafka+Logstash+ElasticSearch+Kibana搭建完整版

    https://www.cnblogs.com/jiashengmei/p/8857053.html

  4. ELK学习3_使用redis+logstash+elasticsearch+kibana快速搭建日志平台

    日志的分析和监控在系统开发中占非常重要的地位,系统越复杂,日志的分析和监控就越重要,常见的需求有: 根据关键字查询日志详情 监控系统的运行状况 统计分析,比如接口的调用次数.执行时间.成功率等 异常数 ...

  5. 安装logstash,elasticsearch,kibana三件套

    原文地址:http://www.cnblogs.com/yjf512/p/4194012.html logstash,elasticsearch,kibana三件套 elk是指logstash,ela ...

  6. Logstash+Elasticsearch+kibana搭建日志平台文档(xjh20160527重庆公司亲测可用)

    Logstash+Elasticsearch+kibana搭建日志平台文档 v1.0_xjh  20160527 http://wenku.baidu.com/link?url=_HaXh4neqWm ...

  7. 基于ELK打造强大的日志收集分析系统(springboot2+logback+logstash+elasticsearch+kibana)

    前言 上一代的日志分析系统springboot+log4j+flume+kafka仿佛还是昨天,现在已经流行起了新一代的springboot2+logback+logstash+elasticsear ...

  8. filebeat+redis+logstash+elasticsearch filebeat+kafka+zookeeper+logstash+elasticsearch

    收集日志的工具 日志易(收费) splunk(国外,按流量收费) 介绍 发展史:使用java语言,在luncen的基础上做二次封装,提供restful接口 搜索的原理:倒排索引 特点:水平扩展方便.提 ...

  9. 企业日志分析ELK(Logstash+Elasticsearch+Kibana)介绍及搭建

    目录 一.ELK概述 1.ELK日志分析系统 2.ELK中日志处理步骤 二.Elasticsearch介绍 1.Elasticsearch的基础核心概念 2.分片和副本 三.LogStash概述 1. ...

最新文章

  1. SVN之版本控制系统
  2. jsTree 插件Ajax数据
  3. File类 读取文件
  4. Safari 10默认禁用Flash插件
  5. scala从集合中提取不重复的元素
  6. 服务器如何返回429状态,Instagram远程服务器返回一个错误:(429)UNKNOWN STATUS CODE...
  7. R语言本地安装包教程
  8. 8个国外在线学习网课的网站
  9. 项目管理工具二、STAR原则
  10. 提高信息技术课堂教学实效研究 课题论证报告
  11. 反正不要钱,多少看一点系列:win10环境下安装Kali Linux双系统的装机经历(个人血泪史)
  12. 微信小程序 image-cropper 还原按钮控制
  13. 学java应该学什么
  14. 如何使用Proteus进行电路设计仿真?
  15. Lingo的基本用法(更新版)
  16. 【Ant Design】下拉列表Select 、日期选择框DatePicker等跟随滚动条上下移动解决方案
  17. 为什么Mysql底层采用B+树做索引?
  18. Java微信公众号开发之微信粉丝信息一键同步
  19. 【量化】股市技术分析利器之TA-Lib(一)
  20. java游戏健值测试软件,教你使用JavaMagic来改JAVA手机游戏软件键值和改全屏.doc

热门文章

  1. 网线传输速度测试_家中重新布了网线,要测试有没有达到千兆的网速,应该怎么做?...
  2. C语言:【U-boot 和 kernel】添加【调试log宏】
  3. bootstrap datetimepicker的时间变成1899年
  4. 全栈-协议-抓包-脚本-编程开发
  5. 录屏软件FastStone Capture汉化破解版
  6. 2019年东南大学计算机553C++复试编程题
  7. matlab实现鬼波信号压制算法  代码实践--第二篇 频率-波数域鬼波压制
  8. 翻译:数据科学简历中的7个必备条件
  9. 汽车传动系统纵向动力学模型
  10. 原油稳步前行因俄罗斯反制措施