Background

1. Kafka is used because, once the log volume grows, a single Logstash instance does not cope well with the load on its own, so Kafka sits in front of it as a buffer.

2. To make the whole call chain queryable, a request that passes through multiple Feign layers needs to carry one ID across all of those layers. Logback's MDC is therefore used to store this unique identifier, and it is propagated along the Feign call chain in a request header, named TID here.
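For reference, the client side of this can be done with a Feign RequestInterceptor that copies the TID from MDC into the outgoing request header. The sketch below is only an assumed illustration of that idea (the class name is hypothetical), not code from the original project:

import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.slf4j.MDC;
import org.springframework.stereotype.Component;

@Component
public class FeignTidInterceptor implements RequestInterceptor {

    @Override
    public void apply(RequestTemplate template) {
        // Copy the trace id from the logback MDC into the outgoing Feign request header
        String tid = MDC.get("TID");
        if (tid != null) {
            template.header("TID", tid);
        }
    }
}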

Download links:

ZK+Kafka

https://mirrors.bfsu.edu.cn/apache/kafka/2.7.0/kafka_2.13-2.7.0.tgz

https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz

ELK

https://artifacts.elastic.co/downloads/kibana/kibana-7.12.0-windows-x86_64.zip

https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.12.0-windows-x86_64.zip

https://artifacts.elastic.co/downloads/logstash/logstash-7.12.0-windows-x86_64.zip

Add the corresponding code in a handler interceptor:

@Component
@Slf4j
public class ContextInterceptor implements HandlerInterceptor {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        RequestContext context = RequestContext.getCurrentContext();
        context.reset();
        log.debug("traceId:" + MDC.get("traceId"));
        // Resolve the trace id: MDC first, then the request header, then a request parameter,
        // and finally generate a new one if nothing was passed in.
        String requestId = MDC.get("traceId");
        requestId = StringUtils.isEmpty(requestId) ? request.getHeader(RequestContext.REQUEST_ID) : requestId;
        requestId = StringUtils.isEmpty(requestId) ? request.getParameter(RequestContext.REQUEST_ID) : requestId;
        requestId = StringUtils.isEmpty(requestId) ? UUIDUtil.uuid() : requestId;
        // Store it in MDC under TID so the logback pattern (%X{TID}) can print it
        MDC.put("TID", requestId);
        return true;
    }
}
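For the interceptor to actually run, it also has to be registered with Spring MVC. A minimal sketch of such a registration is shown below; the configuration class name is an assumption rather than code from the original project:

import javax.annotation.Resource;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

@Configuration
public class WebMvcConfig implements WebMvcConfigurer {

    @Resource
    private ContextInterceptor contextInterceptor;

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        // Run the TID interceptor for every incoming request
        registry.addInterceptor(contextInterceptor).addPathPatterns("/**");
    }
}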

Configure the log configuration file logback-spring.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- springProfile selects the configuration block for the active environment:
         whichever value spring.profiles.active has activates the matching block -->
    <springProfile name="local">
        <!-- configuration to be enabled when the "local" profile is active -->
        <springProperty scope="context" name="module" source="spring.application.name"
                        defaultValue="undefinded"/>
        <!-- This element reads values from the Spring Environment, here from application.yml -->
        <springProperty scope="context" name="bootstrapServers" source="spring.kafka.bootstrap-servers"
                        defaultValue="127.0.0.1:9092"/>
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <!-- encoders are assigned the type
                 ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
            <encoder>
                <pattern>%boldYellow(${module})|%d|%highlight(%-5level)|%X{TID}|%cyan(%logger{15}) - %msg %n</pattern>
            </encoder>
        </appender>
        <!-- Kafka appender configuration -->
        <appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender">
            <encoder>
                <pattern>${module}|%d|%-5level|%X{TID}|%logger{15} - %msg</pattern>
            </encoder>
            <topic>test</topic>
            <keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/>
            <deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/>
            <!-- Optional parameter to use a fixed partition -->
            <!-- <partition>0</partition> -->
            <!-- Optional parameter to include log timestamps into the kafka message -->
            <!-- <appendTimestamp>true</appendTimestamp> -->
            <!-- each <producerConfig> translates to regular kafka-client config (format: key=value) -->
            <!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs -->
            <!-- bootstrap.servers is the only mandatory producerConfig -->
            <producerConfig>bootstrap.servers=${bootstrapServers}</producerConfig>
            <!-- If Kafka is unavailable, fall back to the console -->
            <appender-ref ref="STDOUT"/>
        </appender>
        <!-- Loggers for the project's packages -->
        <!--<logger name="org.springframework.test" level="INFO" >
            <appender-ref ref="kafka" />
        </logger>-->
        <logger name="com.springcloudsite" level="INFO" >
            <appender-ref ref="kafka" />
        </logger>
        <root level="info">
            <appender-ref ref="STDOUT" />
        </root>
    </springProfile>
</configuration>
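Once the interceptor and this logback configuration are in place, any ordinary logging call carries the TID automatically, because %X{TID} reads it from MDC. A hypothetical controller (not from the original project) is enough to see it:

import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RestController
public class DemoController {

    @GetMapping("/demo")
    public String demo() {
        // This line goes to both the console appender and the Kafka topic "test",
        // prefixed with the module name and the TID set by ContextInterceptor.
        log.info("demo endpoint called");
        return "ok";
    }
}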

Pattern configuration explained

pattern: the log output layout (a logback pattern, not a regular expression)

%boldYellow(${module}): the module name, in bold yellow

%d: date and time

%highlight(%-5level): the log level (e.g. INFO, ERROR, TRACE), highlighted and padded to a width of 5

%X{TID}: the TID used for tracing, read from MDC

%cyan(%logger{15}): the abbreviated logger (class) name, in cyan

%msg %n: the log message itself, followed by a newline
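Putting these pieces together, one line produced by this pattern has the following shape (service name, timestamp, TID and logger below are made-up illustrative values):

order-service|2021-04-10 10:15:30,123|INFO |f3a9c2e48b7d4c21|c.s.o.OrderController - create order success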

The printed output looks like this:

Configure ZK + Kafka

1. Install the JDK

1.1 Download the JDK from http://www.oracle.com/technetwork/java/javase/downloads/index.html
1.2 After installation, add the following environment variables (right-click "This PC" -> "Advanced system settings" -> "Environment Variables"):

  • JAVA_HOME: C:\Program Files\Java\jdk1.8.0_171 (the JDK installation path)
  • Path: append ";%JAVA_HOME%\bin" to the existing value

1.3 Open cmd and run "java -version" to check the Java version installed on the system:

2. Install ZooKeeper

Kafka depends on ZooKeeper, so ZooKeeper has to be installed and running before Kafka is started.

2.1 Download it from http://zookeeper.apache.org/releases.html

2.2 Extract the archive

2.3 Open zookeeper-3.4.13\conf and rename zoo_sample.cfg to zoo.cfg

2.4 Open zoo.cfg in a text editor

2.5 Change the value of dataDir to "./zookeeper-3.4.13/data"

2.6 Add the following system variables:

  • ZOOKEEPER_HOME: C:\Users\localadmin\CODE\zookeeper-3.4.13 (the ZooKeeper directory)
  • Path: append ";%ZOOKEEPER_HOME%\bin;" to the existing value

2.7 Run ZooKeeper: open cmd and execute zkserver

Keep this cmd window open.

3. Install and run Kafka

3.1 Download it from http://kafka.apache.org/downloads.html

3.2 Extract the archive

3.3 Open kafka_2.11-2.0.0\config

3.4 Open server.properties in a text editor

3.5 Change the value of log.dirs to "./logs"

3.6 Open cmd

3.7 Change into the Kafka directory: cd C:\Users\localadmin\CODE\kafka_2.11-2.0.0 (the Kafka directory)

3.8 Run: .\bin\windows\kafka-server-start.bat .\config\server.properties

Keep this cmd window open.

4. Create a topic

4.1 Open cmd and cd into C:\Users\localadmin\CODE\kafka_2.11-2.0.0\bin\windows

4.2 Create a topic: kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

5. Start a PRODUCER:

cd C:\Users\localadmin\CODE\kafka_2.11-2.0.0\bin\windows
kafka-console-producer.bat --broker-list localhost:9092 --topic test

6. Start a CONSUMER:

cd C:\Users\localadmin\CODE\kafka_2.11-2.0.0\bin\windows
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning

7. Test: type a few messages into the producer window and verify that they show up in the consumer window.
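If you prefer to verify the broker from code instead of the console tools, a minimal producer along these lines can be used (it assumes the kafka-clients library is on the classpath; class name and message are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaSmokeTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Send one message to the "test" topic; it should appear in the console consumer window
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("test", "hello from java"));
        }
    }
}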

Configure ELK

kibana.yml

# Kibana is served by a back end server. This setting specifies the port to use.
server.port: 5601

# Specifies the address to which the Kibana server will bind. IP addresses and host names are both valid values.
# The default is 'localhost', which usually means remote machines will not be able to connect.
# To allow connections from remote users, set this parameter to a non-loopback address.
server.host: "localhost"

# Enables you to specify a path to mount Kibana at if you are running behind a proxy.
# Use the `server.rewriteBasePath` setting to tell Kibana if it should remove the basePath
# from requests it receives, and to prevent a deprecation warning at startup.
# This setting cannot end in a slash.
#server.basePath: ""

# Specifies whether Kibana should rewrite requests that are prefixed with
# `server.basePath` or require that they are rewritten by your reverse proxy.
# This setting was effectively always `false` before Kibana 6.3 and will
# default to `true` starting in Kibana 7.0.
#server.rewriteBasePath: false

# Specifies the public URL at which Kibana is available for end users. If
# `server.basePath` is configured this URL should end with the same basePath.
#server.publicBaseUrl: ""

# The maximum payload size in bytes for incoming server requests.
#server.maxPayloadBytes: 1048576

# The Kibana server's name.  This is used for display purposes.
#server.name: "your-hostname"

# The URLs of the Elasticsearch instances to use for all your queries.
elasticsearch.hosts: ["http://localhost:9200"]

# Kibana uses an index in Elasticsearch to store saved searches, visualizations and
# dashboards. Kibana creates a new index if the index doesn't already exist.
#kibana.index: ".kibana"

# The default application to load.
#kibana.defaultAppId: "home"

# If your Elasticsearch is protected with basic authentication, these settings provide
# the username and password that the Kibana server uses to perform maintenance on the Kibana
# index at startup. Your Kibana users still need to authenticate with Elasticsearch, which
# is proxied through the Kibana server.
#elasticsearch.username: "kibana_system"
#elasticsearch.password: "pass"

# Enables SSL and paths to the PEM-format SSL certificate and SSL key files, respectively.
# These settings enable SSL for outgoing requests from the Kibana server to the browser.
#server.ssl.enabled: false
#server.ssl.certificate: /path/to/your/server.crt
#server.ssl.key: /path/to/your/server.key

# Optional settings that provide the paths to the PEM-format SSL certificate and key files.
# These files are used to verify the identity of Kibana to Elasticsearch and are required when
# xpack.security.http.ssl.client_authentication in Elasticsearch is set to required.
#elasticsearch.ssl.certificate: /path/to/your/client.crt
#elasticsearch.ssl.key: /path/to/your/client.key

# Optional setting that enables you to specify a path to the PEM file for the certificate
# authority for your Elasticsearch instance.
#elasticsearch.ssl.certificateAuthorities: [ "/path/to/your/CA.pem" ]

# To disregard the validity of SSL certificates, change this setting's value to 'none'.
#elasticsearch.ssl.verificationMode: full

# Time in milliseconds to wait for Elasticsearch to respond to pings. Defaults to the value of
# the elasticsearch.requestTimeout setting.
#elasticsearch.pingTimeout: 1500

# Time in milliseconds to wait for responses from the back end or Elasticsearch. This value
# must be a positive integer.
elasticsearch.requestTimeout: 30000

# List of Kibana client-side headers to send to Elasticsearch. To send *no* client-side
# headers, set this value to [] (an empty list).
#elasticsearch.requestHeadersWhitelist: [ authorization ]

# Header names and values that are sent to Elasticsearch. Any custom headers cannot be overwritten
# by client-side headers, regardless of the elasticsearch.requestHeadersWhitelist configuration.
#elasticsearch.customHeaders: {}

# Time in milliseconds for Elasticsearch to wait for responses from shards. Set to 0 to disable.
#elasticsearch.shardTimeout: 30000

# Logs queries sent to Elasticsearch. Requires logging.verbose set to true.
#elasticsearch.logQueries: false

# Specifies the path where Kibana creates the process ID file.
#pid.file: /run/kibana/kibana.pid

# Enables you to specify a file where Kibana stores log output.
#logging.dest: stdout

# Set the value of this setting to true to suppress all logging output.
#logging.silent: false

# Set the value of this setting to true to suppress all logging output other than error messages.
#logging.quiet: false

# Set the value of this setting to true to log all events, including system usage information
# and all requests.
#logging.verbose: false

# Set the interval in milliseconds to sample system and process performance
# metrics. Minimum is 100ms. Defaults to 5000.
#ops.interval: 5000

# Specifies locale to be used for all localizable strings, dates and number formats.
# Supported languages are the following: English - en , by default , Chinese - zh-CN .
#i18n.locale: "en"

Then start Kibana from the corresponding bin directory, either by double-clicking kibana.bat or by launching it from cmd.

The startup output is shown below.

Configure elasticsearch.yml

# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
#       Before you set out to tweak and tune the configuration, make sure you
#       understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
#cluster.name: my-application
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
#node.name: node-1
#
# Add custom attributes to the node:
#
#node.attr.rack: r1
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
#path.data: /path/to/data
#
# Path to log files:
#
#path.logs: /path/to/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# By default Elasticsearch is only accessible on localhost. Set a different
# address here to expose this node on the network:
#
cluster.name: "docker-cluster"
node.name: "node-1"
node.master: true
network.host: 0.0.0.0

#xpack.license.self_generated.type: trial
#xpack.security.enabled: true
#xpack.monitoring.collection.enabled: true
#
# By default Elasticsearch listens for HTTP traffic on the first free port it
# finds starting at 9200. Set a specific HTTP port here:
#
http.port: 9200
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
#discovery.seed_hosts: ["host1", "host2"]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
cluster.initial_master_nodes: ["node-1"]
#
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true

Start elasticsearch.bat under the bin directory.

The startup output is shown below.

Configure logstash.conf

# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.input {kafka {bootstrap_servers => "localhost:9092"topics => ["test"]group_id => "test"}
}filter { mutate {split => { "message" => "|" }}if [message][0] {mutate {                add_field =>   {"apiname" => "%{[message][0]}"}}}if [message][1] {mutate {                add_field =>   {"current_time" => "%{[message][1]}"}}} if [message][2] {mutate {                add_field =>   {"current_level" => "%{[message][2]}"}}}      if [message][3] {mutate {                add_field =>   {"traceid" => "%{[message][3]}"}}}}output {elasticsearch {hosts => ["http://localhost:9200"]#index => "local-purchase-order | %{+YYYY-MM-dd}"index => "logstash-%{+YYYY-MM-dd}"#template_name => "logstash"#template_overwrite => true#index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"#user => "elastic"#password => "changeme"}stdout{codec => rubydebug}
}
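The split filter above depends on the pipe-delimited layout that the Kafka appender pattern (${module}|%d|%-5level|%X{TID}|%logger{15} - %msg) produces. Taking a hypothetical log line in that format, the fields added to the event would be:

message       => order-service|2021-04-10 10:15:30,123|INFO |f3a9c2e48b7d4c21|c.s.o.OrderController - create order success

apiname       => order-service
current_time  => 2021-04-10 10:15:30,123
current_level => INFO
traceid       => f3a9c2e48b7d4c21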

Configure logstash.yml

#/usr/share/logstash/config/logstash.yml
#jvm.options  log4j2.properties  logstash-sample.conf  logstash.yml  pipelines.yml  startup.options
http.host: "0.0.0.0"
# [ "http://elasticsearch:9200" ]
xpack.monitoring.elasticsearch.hosts: ${ELASTICSEARCH_URL}

Start Logstash from the command line:

Change into the bin directory:

D:\app\elk\logstash\bin

Run: logstash -f D:\app\elk\logstash\config\logstash.conf

Finally, open the following addresses

http://localhost:9600/

http://localhost:9200/

http://localhost:5601/

to verify each component (Logstash on 9600, Elasticsearch on 9200, Kibana on 5601).
