kafka 可视化工具_两小时带你轻松实战SpringBoot+kafka+ELK分布式日志收集
一、背景
随着业务复杂度的提升以及微服务的兴起,传统单一项目会被按照业务规则进行垂直拆分,另外为了防止单点故障我们也会将重要的服务模块进行集群部署,通过负载均衡进行服务的调用。那么随着节点的增多,各个服务的日志也会散落在各个服务器上。这对于我们进行日志分析带来了巨大的挑战,总不能一台一台的登录去下载日志吧。那么我们需要一种收集日志的工具将散落在各个服务器节点上的日志收集起来,进行统一的查询及管理统计。那么ELK就可以做到这一点。
ELK是ElasticSearch+Logstash+Kibana的简称,在这里我分别对如上几个组件做个简单的介绍:
1.1、ElasticSearch(简称ES)
Elasticsearch
是一个高度可扩展的开源全文搜索和分析引擎。它允许您快速、实时地存储、搜索和分析大量数据。它通常用作底层引擎/技术,为具有复杂搜索特性和需求的应用程序提供动力。我们可以借助如ElasticSearch
完成诸如搜索,日志收集,反向搜索,智能分析等功能。ES设计的目标:
- 快速实时搜索
Elasticsearch
是一个实时搜索平台。这意味着,从索引文档到可搜索文档,存在轻微的延迟(通常为一秒)。
- 集群
集群是一个或多个节点(服务器)的集合,这些节点(服务器)一起保存整个数据,并提供跨所有节点的联合索引和搜索功能。集群由一个惟一的名称来标识,默认情况下该名称为“elasticsearch”。这个名称很重要,因为节点只能是集群的一部分,如果节点被设置为通过其名称加入集群的话。确保不要在不同的环境中重用相同的集群名称,否则可能会导致节点加入错误的集群。例如,您可以使用logging-dev、logging-test和logging-prod开发、测试和生产集群。
- 节点
节点是单个服务器,它是集群的一部分,它用来存储数据,并参与集群的索引和搜索功能。与集群一样,节点的名称默认为在启动时分配给节点的随机惟一标识符(UUID)。如果不需要默认值,可以定义任何节点名称。这个名称对于管理非常重要,因为您想要确定网络中的哪些服务器对应于Elasticsearch
集群中的哪些节点。
- 索引
索引是具有类似特征的文档的集合。例如,您可以有一个客户数据索引、另一个产品目录索引和另一个订单数据索引。索引由一个名称标识(必须是小写的),该名称用于在对其中的文档执行索引、搜索、更新和删除操作时引用索引。在单个集群中,可以定义任意数量的索引。
- 文档
文档是可以建立索引的基本信息单元。例如,可以为单个客户提供一个文档,为单个产品提供一个文档,为单个订单提供另一个文档。这个文档用JSON (JavaScript对象符号)表示。在索引中,可以存储任意数量的文档。请注意,尽管文档在物理上驻留在索引中,但实际上文档必须被索引/分配到索引中的类型中。
1.2、Logstash
Logstash
是一个开源数据收集引擎,具有实时流水线功能。Logstash
可以动态地将来自不同数据源的数据统一起来,并将数据规范化后(通过Filter过滤)传输到您选择的目标。
在这里inputs代表数据的输入通道,大家可以简单理解为来源。常见的可以从kafka,FileBeat, DB等获取日志数据,这些数据经过fliter过滤后(比如说:日志过滤,json格式解析等)通过outputs传输到指定的位置进行存储(Elasticsearch,Mogodb,Redis等)
简单的实例:
cd logstash-6.4.1bin/logstash -e 'input { stdin { } } output { stdout {} }'
1.3、Kibana
kibana是用于Elasticsearch检索数据的开源分析和可视化平台。我们可以使用Kibana搜索、查看或者与存储在Elasticsearch索引中的数据交互。同时也可以轻松地执行高级数据分析并在各种图表、表和映射中可视化数据。基于浏览器的Kibana界面使您能够快速创建和共享动态仪表板,实时显示对Elasticsearch查询的更改。
1.4、处理方案
用户通过java应用程序的Slf4j写入日志,SpringBoot默认使用的是logback。我们通过实现自定义的Appender将日志写入kafka,同时logstash通过input插件操作kafka订阅其对应的主题。当有日志输出后被kafka的客户端logstash所收集,经过相关过滤操作后将日志写入Elasticsearch,此时用户可以通过kibana获取elasticsearch中的日志信息
二、SpringBoot中的配置
在SpringBoot当中,我们可以通过logback-srping.xml来扩展logback的配置。不过我们在此之前应当先添加logback对kafka的依赖,代码如下:
compile group: 'com.github.danielwegener', name: 'logback-kafka-appender', version: '0.2.0-RC1'
添加好依赖之后我们需要在类路径下创建logback-spring.xml
的配置文件并做如下配置(添加kafka的Appender):
<configuration><!-- springProfile用于指定当前激活的环境,如果spring.profile.active的值是哪个,就会激活对应节点下的配置 --><springProfile name="default"><!-- configuration to be enabled when the "staging" profile is active --><springProperty scope="context" name="module" source="spring.application.name"defaultValue="undefinded"/><!-- 该节点会读取Environment中配置的值,在这里我们读取application.yml中的值 --><springProperty scope="context" name="bootstrapServers" source="spring.kafka.bootstrap-servers"defaultValue="localhost:9092"/><appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"><!-- encoders are assigned the typech.qos.logback.classic.encoder.PatternLayoutEncoder by default --><encoder><pattern>%boldYellow(${module}) | %d | %highlight(%-5level)| %cyan(%logger{15}) - %msg %n</pattern></encoder></appender><!-- kafka的appender配置 --><appender name="kafka" class="com.github.danielwegener.logback.kafka.KafkaAppender"><encoder><pattern>${module} | %d | %-5level| %logger{15} - %msg</pattern></encoder><topic>logger-channel</topic><keyingStrategy class="com.github.danielwegener.logback.kafka.keying.NoKeyKeyingStrategy"/><deliveryStrategy class="com.github.danielwegener.logback.kafka.delivery.AsynchronousDeliveryStrategy"/><!-- Optional parameter to use a fixed partition --><!-- <partition>0</partition> --><!-- Optional parameter to include log timestamps into the kafka message --><!-- <appendTimestamp>true</appendTimestamp> --><!-- each <producerConfig> translates to regular kafka-client config (format: key=value) --><!-- producer configs are documented here: https://kafka.apache.org/documentation.html#newproducerconfigs --><!-- bootstrap.servers is the only mandatory producerConfig --><producerConfig>bootstrap.servers=${bootstrapServers}</producerConfig><!-- 如果kafka不可用则输出到控制台 --><appender-ref ref="STDOUT"/></appender><!-- 指定项目中的logger --><logger name="org.springframework.test" level="INFO" ><appender-ref ref="kafka" /></logger><root level="info"><appender-ref ref="STDOUT" /></root></springProfile></configuration>
在这里面我们主要注意以下几点:
- 日志输出的格式是为模块名 | 时间 | 日志级别 | 类的全名 | 日志内容
- SpringProfile节点用于指定当前激活的环境,如果spring.profile.active的值是哪个,就会激活对应节点下的配置
- springProperty可以读取Environment中的值
三、ELK搭建过程
3.1、检查环境
ElasticSearch
需要jdk8,官方建议我们使用JDK的版本为1.8.0_131,原文如下:
Elasticsearch requires at least Java 8. Specifically as of this writing, it is recommended that you use the Oracle JDK version 1.8.0_131
检查完毕后,我们可以分别在官网下载对应的组件
- ElasticSearch
- Kibana
- Logstash
- kafka
- zookeeper
3.2、启动zookeeper
首先进入启动zookeeper的根目录下,将conf目录下的zoo_sample.cfg文件拷贝一份重新命名为zoo.cfg
mv zoo_sample.cfg zoo.cfg
配置文件如下:
# The number of milliseconds of each ticktickTime=2000# The number of ticks that the initial # synchronization phase can takeinitLimit=10# The number of ticks that can pass between # sending a request and getting an acknowledgementsyncLimit=5# the directory where the snapshot is stored.# do not use /tmp for storage, /tmp here is just # example sakes.dataDir=../zookeeper-data# the port at which the clients will connectclientPort=2181# the maximum number of client connections.# increase this if you need to handle more clients#maxClientCnxns=60## Be sure to read the maintenance section of the # administrator guide before turning on autopurge.## http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance## The number of snapshots to retain in dataDir#autopurge.snapRetainCount=3# Purge task interval in hours# Set to "0" to disable auto purge feature#autopurge.purgeInterval=1
紧接着我们进入bin目录启动zookeeper:
./zkServer.sh start
3.3、启动kafka
在kafka根目录下运行如下命令启动kafka:
./bin/kafka-server-start.sh config/server.properties
启动完毕后我们需要创建一个logger-channel主题:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic logger-channel
3.4、配置并启动logstash
进入logstash跟目录下的config目录,我们将logstash-sample.conf
的配置文件拷贝到根目录下重新命名为core.conf
,然后我们打开配置文件进行编辑:
```ruby
# Sample Logstash configuration for creating a simple
# Beats -> Logstash -> Elasticsearch pipeline.
input {kafka {id => "my_plugin_id"bootstrap_servers => "localhost:9092"topics => ["logger-channel"]auto_offset_reset => "latest" }
}
filter {grok {patterns_dir => ["./patterns"]match => { "message" => "%{WORD:module} | %{LOGBACKTIME:timestamp} | %{LOGLEVEL:level} | %{JAVACLASS:class} - %{JAVALOGMESSAGE:logmessage}" }}}
output {stdout { codec => rubydebug }elasticsearch {hosts =>["localhost:9200"]}
}
```
我们分别配置logstash的input,filter和output(懂ruby的童鞋们肯定对语法结构不陌生吧):
- 在input当中我们指定日志来源为kafka,具体含义可以参考官网:kafka-input-plugin
- 在filter中我们配置grok插件,该插件可以利用正则分析日志内容,其中patterns_dir属性用于指定自定义的分析规则,我们可以在该文件下建立文件配置验证的正则规则。举例子说明:55.3.244.1 GET /index.html 15824 0.043的 日志内容经过如下配置解析:
grok {match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }}
解析过后会变成:
client: 55.3.244.1method: GETrequest: /index.htmlbytes: 15824duration: 0.043
这些属性都会在elasticsearch
中存为对应的属性字段。更详细的介绍请参考官网:grok ,当然该插件已经帮我们定义好了好多种核心规则,我们可以在这里查看所有的规则。
- 在output当中我们将过滤过后的日志内容打印到控制台并传输到elasticsearch中,我们可以参考官网上关于该插件的属性说明:[地址]((https://www.elastic.co/guide/en/logstash/current/plugins-outputs-elasticsearch.html)
- 另外我们在patterns文件夹中创建好自定义的规则文件logback,内容如下:
# yyyy-MM-dd HH:mm:ss,SSS ZZZ eg: 2014-01-09 17:32:25,527 LOGBACKTIME 20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND})
编辑好配置后我们运行如下命令启动logstash:
bin/logstash -f first-pipeline.conf --config.reload.automatic
该命令会实时更新配置文件而不需启动
3.5、启动ElasticSearch
启动ElasticSearch很简单,我们可以运行如下命令:
./bin/elasticsearch
我们可以发送get请求来判断启动成功:
GET http://localhost:9200
我们可以得到类似于如下的结果:
{"name" : "Cp8oag6","cluster_name" : "elasticsearch","cluster_uuid" : "AT69_T_DTp-1qgIJlatQqA","version" : {"number" : "6.4.0","build_flavor" : "default","build_type" : "zip","build_hash" : "f27399d","build_date" : "2016-03-30T09:51:41.449Z","build_snapshot" : false,"lucene_version" : "7.4.0","minimum_wire_compatibility_version" : "1.2.3","minimum_index_compatibility_version" : "1.2.3"},"tagline" : "You Know, for Search"}
3.5.1 配置IK分词器(可选)
我们可以在github上下载elasticsearch的IK分词器,地址如下:ik分词器,然后把它解压至your-es-root/plugins/ik的目录下,我们可以在{conf}/analysis-ik/config/IKAnalyzer.cfg.xml
or {plugins}/elasticsearch-analysis-ik-*/config/IKAnalyzer.cfg.xml
里配置自定义分词器:
<?xml version="1.0" encoding="UTF-8"?><!DOCTYPE properties SYSTEM "http://java.sun.com/dtd/properties.dtd"><properties><comment>IK Analyzer 扩展配置</comment><!--用户可以在这里配置自己的扩展字典 --><entry key="ext_dict">custom/mydict.dic;custom/single_word_low_freq.dic</entry><!--用户可以在这里配置自己的扩展停止词字典--><entry key="ext_stopwords">custom/ext_stopword.dic</entry><!--用户可以在这里配置远程扩展字典 --><entry key="remote_ext_dict">location</entry><!--用户可以在这里配置远程扩展停止词字典--><entry key="remote_ext_stopwords">http://xxx.com/xxx.dic</entry></properties>
首先我们添加索引:
curl -XPUT http://localhost:9200/my_index
我们可以把通过put请求来添加索引映射:
PUT my_index {"mappings": {"doc": { "properties": { "title": { "type": "text" }, "name": { "type": "text" }, "age": { "type": "integer" }, "created": {"type": "date", "format": "strict_date_optional_time||epoch_millis"}"content": {"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"}}}}}
其中doc是映射名 my_index是索引名称
3.5.2 logstash与ElasticSearch
logstash
默认情况下会在ES中建立logstash-*
的索引,*代表了yyyy-MM-dd
的时间格式,根据上述logstash
配置filter的示例,其会在ES中建立module ,logmessage,class,level等索引。(具体我们可以根据grok插件进行配置)
3.6 启动Kibana
在kibana的bin目录下运行./kibana即可启动。启动之后我们可以通过浏览器访问http://localhost:5601 来访问kibanaUI。我们可以看到如下界面:
kafka 可视化工具_两小时带你轻松实战SpringBoot+kafka+ELK分布式日志收集相关推荐
- 分布式日志收集工具分析比较
目录 写在最前:为什么做日志收集系统❓ 一.多种日志收集工具比较 1.背景介绍 2.Facebook 的 Scribe 3.Apache 的 Chukwa 4.LinkedIn 的 Kafka 5.C ...
- 数据可视化工具_数据可视化
数据可视化工具 Visualizations are a great way to show the story that data wants to tell. However, not all v ...
- cassandra可视化工具_耗时1个月整理!160种Python标准库、第三方库和外部工具都有了...
耗时1个月整理!160种Python标准库.第三方库和外部工具都有了 北京尚学堂 2019-12-09 14:59:15 Python数据工具箱涵盖从数据源到数据可视化的完整流程中涉及到的常用库.函数 ...
- 两小时带你进入软件测试行业风口(附全套软件测试学习路线)
随着信息技术的发展和普及,人们对软件的使用越来越普及.但是在软件的使用过程中,软件的效果却不尽如人意.为了确保软件的质量,整个软件业界已经逐渐意识到测试的重要性,软件测试已经成为IT 领域的黄金行业. ...
- 【kafka可视化工具】kafka-eagle在windows环境的下载、安装、启动与访问
本文目录 一.Kafka eagle的下载 步骤一:访问官方网站:Download - EFAK 步骤二:点击 Direct File Download 二.kafka-eagle的安装 步骤一:解压 ...
- git 可视化工具_版本控制可视化神器Gource:简单易上手,效果恰似烟花秀
鱼羊 发自 凹非寺 量子位 报道 | 公众号 QbitAI 如此华丽的绽放,莫非是一场动画烟火秀? 非也,这其实是GitLab社区版的进化史,7年间82000次commit,尽皆在2.5分钟的视频内展 ...
- kafka可视化工具_Kafka值得一用的监控系统
文章作者:哥不是小萝莉 内容出处:作者本人 适用人群:大数据 注:欢迎转载,转载请注明出处 什么是KAFKA? Apache Kafk ...
- bi可视化工具_适用于您的BI解决方案的最佳数据可视化和Web报告工具
bi可视化工具 通过智能数据分析使复杂变得简单 (Making the complex simple with smart data analysis) It is hard to overestim ...
- 【kafka】Kafka 可视化工具Kafka Eagle安装和使用
一.背景 Kafka产线环境需要管理的Topic和Consumser越来越多,使用命令行工具进行管理会非常繁杂.因此,大数据平台上需要一套Kafka的管理监控系统,Kafka-Eagle. Kafka ...
最新文章
- golang 生成 指定大小 空白文件
- 图像处理前沿技术_深入浅出人工智能前沿技术—机器视觉检测,看清人类智慧工业...
- 在NetBeans,Eclipse,IntelliJ,OpenShift和Maven上使用WildFly 9
- shiro的集群动态权限更新
- php date( ymd_PHP DATE()
- java华容道swing_一道java的界面初级题目,已有代码,加几行即可,华容道游戏。...
- catch里面不想做任何处理_处理异常的三种健壮方式
- mysql order优化2019_mysql 增加排序 性能差很多 怎么优化
- python 新手常见问题
- 极简嵌入式C语言教程——从入门到入土(1)
- 网络安全实验室 脚本关 解析
- [项目分享]JSP+Servlet+JDBC+DBCP2实现在线购书系统
- java递归生成无限层级的树--分类管理
- 破解Prezi桌面版30天限制的方法
- Aho_Corasick_Automaton
- linux下sd分区扩容,实用技巧:Linux系统分区容量扩充的方法
- Latex 1: 解决latex中遇到一个常见错误:Improper alphabetic constant.
- 域名信息备案管理系统php,如何查询域名备案号
- 【Ubuntu】查询显卡型号
- React中过渡动画的编写方式
热门文章
- 高效使用PC需要记住的快捷键
- ASP.NET Core实现类库项目读取配置文件
- 基于多进程和基于多线程服务器的优缺点及nginx服务器的启动过程
- IIS7.0 部署wcf 404或者配置MIME(转)
- 【数据结构】数组和广义表
- linux下find查找带有指定权限的文件(windows下编译的源代码文件)
- 在Asp.net应用程序中构建基于WCF Web.Api的服务
- Ext.DomHelper类的使用示例(内容操作)
- IT服务台的进化(2)--企业外部服务台的优缺点
- 世纪佳缘,玫瑰和面包开始PK