上一篇:深夜看了张一鸣的微博,让我越想越后怕

整体流程大概如下:

服务器准备

在这先列出各服务器节点,方便同学们在下文中对照节点查看相应内容

SpringBoot项目准备

引入log4j2替换SpringBoot默认log,demo项目结构如下:

pom

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId><!--  排除spring-boot-starter-logging --><exclusions><exclusion><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-logging</artifactId></exclusion></exclusions></dependency> <!-- log4j2 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-log4j2</artifactId></dependency> <dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.4</version></dependency>
</dependencies>

log4j2.xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600" ><Properties><Property name="LOG_HOME">logs</Property><property name="FILE_NAME">collector</property><property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</property></Properties><Appenders><Console name="CONSOLE" target="SYSTEM_OUT"><PatternLayout pattern="${patternLayout}"/></Console>  <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" ><PatternLayout pattern="${patternLayout}" /><Policies><TimeBasedTriggeringPolicy interval="1"/><SizeBasedTriggeringPolicy size="500MB"/></Policies><DefaultRolloverStrategy max="20"/>         </RollingRandomAccessFile><RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log" ><PatternLayout pattern="${patternLayout}" /><Filters><ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/></Filters>              <Policies><TimeBasedTriggeringPolicy interval="1"/><SizeBasedTriggeringPolicy size="500MB"/></Policies><DefaultRolloverStrategy max="20"/>         </RollingRandomAccessFile>            </Appenders><Loggers><!-- 业务相关 异步logger --><AsyncLogger name="com.bfxy.*" level="info" includeLocation="true"><AppenderRef ref="appAppender"/></AsyncLogger><AsyncLogger name="com.bfxy.*" level="info" includeLocation="true"><AppenderRef ref="errorAppender"/></AsyncLogger>       <Root level="info"><Appender-Ref ref="CONSOLE"/><Appender-Ref ref="appAppender"/><AppenderRef ref="errorAppender"/></Root>         </Loggers>
</Configuration>

IndexController

测试Controller,用以打印日志进行调试

@Slf4j
@RestController
public class IndexController {@RequestMapping(value = "/index")public String index() {InputMDC.putMDC();log.info("我是一条info日志");log.warn("我是一条warn日志");log.error("我是一条error日志");return "idx";}@RequestMapping(value = "/err")public String err() {InputMDC.putMDC();try {int a = 1/0;} catch (Exception e) {log.error("算术异常", e);}return "err";}}

InputMDC

用以获取log中的[%X{hostName}][%X{ip}][%X{applicationName}]三个字段值

@Component
public class InputMDC implements EnvironmentAware {private static Environment environment;@Overridepublic void setEnvironment(Environment environment) {InputMDC.environment = environment;}public static void putMDC() {MDC.put("hostName", NetUtil.getLocalHostName());MDC.put("ip", NetUtil.getLocalIp());MDC.put("applicationName", environment.getProperty("spring.application.name"));}}

NetUtil

public class NetUtil {   public static String normalizeAddress(String address){String[] blocks = address.split("[:]");if(blocks.length > 2){throw new IllegalArgumentException(address + " is invalid");}String host = blocks[0];int port = 80;if(blocks.length > 1){port = Integer.valueOf(blocks[1]);} else {address += ":"+port; //use default 80} String serverAddr = String.format("%s:%d", host, port);return serverAddr;}public static String getLocalAddress(String address){String[] blocks = address.split("[:]");if(blocks.length != 2){throw new IllegalArgumentException(address + " is invalid address");} String host = blocks[0];int port = Integer.valueOf(blocks[1]);if("0.0.0.0".equals(host)){return String.format("%s:%d",NetUtil.getLocalIp(), port);}return address;}private static int matchedIndex(String ip, String[] prefix){for(int i=0; i<prefix.length; i++){String p = prefix[i];if("*".equals(p)){ //*, assumed to be IPif(ip.startsWith("127.") ||ip.startsWith("10.") || ip.startsWith("172.") ||ip.startsWith("192.")){continue;}return i;} else {if(ip.startsWith(p)){return i;}} }return -1;}public static String getLocalIp(String ipPreference) {if(ipPreference == null){ipPreference = "*>10>172>192>127";}String[] prefix = ipPreference.split("[> ]+");try {Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();String matchedIp = null;int matchedIdx = -1;while (interfaces.hasMoreElements()) {NetworkInterface ni = interfaces.nextElement();Enumeration<InetAddress> en = ni.getInetAddresses(); while (en.hasMoreElements()) {InetAddress addr = en.nextElement();String ip = addr.getHostAddress();  Matcher matcher = pattern.matcher(ip);if (matcher.matches()) {  int idx = matchedIndex(ip, prefix);if(idx == -1) continue;if(matchedIdx == -1){matchedIdx = idx;matchedIp = ip;} else {if(matchedIdx>idx){matchedIdx = idx;matchedIp = ip;}}} } } if(matchedIp != null) return matchedIp;return "127.0.0.1";} catch (Exception e) { return "127.0.0.1";}}public static String getLocalIp() {return getLocalIp("*>10>172>192>127");}public static String remoteAddress(SocketChannel channel){SocketAddress addr = channel.socket().getRemoteSocketAddress();String res = String.format("%s", addr);return res;}public static String localAddress(SocketChannel channel){SocketAddress addr = channel.socket().getLocalSocketAddress();String res = String.format("%s", addr);return addr==null? res: res.substring(1);}public static String getPid(){RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();String name = runtime.getName();int index = name.indexOf("@");if (index != -1) {return name.substring(0, index);}return null;}public static String getLocalHostName() {try {return (InetAddress.getLocalHost()).getHostName();} catch (UnknownHostException uhe) {String host = uhe.getMessage();if (host != null) {int colon = host.indexOf(':');if (colon > 0) {return host.substring(0, colon);}}return "UnknownHost";}}
}

启动项目,访问/index/ero接口,可以看到项目中生成了app-collector.logerror-collector.log两个日志文件:

我们将Springboot服务部署在192.168.11.31这台机器上。

Kafka安装和启用

kafka下载地址:

http://kafka.apache.org/downloads.html

kafka安装步骤:首先kafka安装需要依赖与zookeeper,所以小伙伴们先准备好zookeeper环境(三个节点即可),然后我们来一起构建kafka broker。

另外,Kafka 系列面试题和答案全部整理好了,微信搜索互联网架构师,在后台发送:面试,可以在线阅读。

## 解压命令:
tar -zxvf kafka_2.12-2.1.0.tgz -C /usr/local/
## 改名命令:
mv kafka_2.12-2.1.0/ kafka_2.12
## 进入解压后的目录,修改server.properties文件:
vim /usr/local/kafka_2.12/config/server.properties
## 修改配置:
broker.id=0
port=9092
host.name=192.168.11.51
advertised.host.name=192.168.11.51
log.dirs=/usr/local/kafka_2.12/kafka-logs
num.partitions=2
zookeeper.connect=192.168.11.111:2181,192.168.11.112:2181,192.168.11.113:2181## 建立日志文件夹:
mkdir /usr/local/kafka_2.12/kafka-logs##启动kafka:
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

创建两个topic

## 创建topic
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic app-log-collector --partitions 1  --replication-factor 1
kafka-topics.sh --zookeeper 192.168.11.111:2181 --create --topic error-log-collector --partitions 1  --replication-factor 1

我们可以查看一下topic情况

kafka-topics.sh --zookeeper 192.168.11.111:2181 --topic app-log-test --describe

可以看到已经成功启用了app-log-collectorerror-log-collector两个topic

filebeat安装和启用

filebeat下载

cd /usr/local/software
tar -zxvf filebeat-6.6.0-linux-x86_64.tar.gz -C /usr/local/
cd /usr/local
mv filebeat-6.6.0-linux-x86_64/ filebeat-6.6.0

配置filebeat,可以参考下方yml配置文件

vim /usr/local/filebeat-5.6.2/filebeat.yml
###################### Filebeat Configuration Example #########################
filebeat.prospectors:- input_type: logpaths:## app-服务名称.log, 为什么写死,防止发生轮转抓取历史数据- /usr/local/logs/app-collector.log#定义写入 ES 时的 _type 值document_type: "app-log"multiline:#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)pattern: '^\['                              # 指定匹配的表达式(匹配以 "{ 开头的字符串)negate: true                                # 是否匹配到match: after                                # 合并到上一行的末尾max_lines: 2000                             # 最大的行数timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志fields:logbiz: collectorlogtopic: app-log-collector   ## 按服务划分用作kafka topicevn: dev- input_type: logpaths:- /usr/local/logs/error-collector.logdocument_type: "error-log"multiline:#pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # 指定匹配的表达式(匹配以 2017-11-15 08:04:23:889 时间格式开头的字符串)pattern: '^\['                              # 指定匹配的表达式(匹配以 "{ 开头的字符串)negate: true                                # 是否匹配到match: after                                # 合并到上一行的末尾max_lines: 2000                             # 最大的行数timeout: 2s                                 # 如果在规定时间没有新的日志事件就不等待后面的日志fields:logbiz: collectorlogtopic: error-log-collector   ## 按服务划分用作kafka topicevn: devoutput.kafka:enabled: truehosts: ["192.168.11.51:9092"]topic: '%{[fields.logtopic]}'partition.hash:reachable_only: truecompression: gzipmax_message_bytes: 1000000required_acks: 1
logging.to_files: true

filebeat启动:

检查配置是否正确

cd /usr/local/filebeat-6.6.0
./filebeat -c filebeat.yml -configtest
## Config OK

启动filebeat

/usr/local/filebeat-6.6.0/filebeat &

检查是否启动成功

ps -ef | grep filebeat

可以看到filebeat已经启动成功

然后我们访问192.168.11.31:8001/index和192.168.11.31:8001/err,再查看kafka的logs文件,可以看到已经生成了app-log-collector-0和error-log-collector-0文件,说明filebeat已经帮我们把数据收集好放到了kafka上。

logstash安装

我们在logstash的安装目录下新建一个文件夹

mkdir scrpit

然后cd进该文件,创建一个logstash-script.conf文件

cd scrpit
vim logstash-script.conf
## multiline 插件也可以用于其他类似的堆栈式信息,比如 linux 的内核日志。
input {kafka {## app-log-服务名称topics_pattern => "app-log-.*"bootstrap_servers => "192.168.11.51:9092"codec => jsonconsumer_threads => 1 ## 增加consumer的并行消费线程数decorate_events => true#auto_offset_rest => "latest"group_id => "app-log-group"}kafka {## error-log-服务名称topics_pattern => "error-log-.*"bootstrap_servers => "192.168.11.51:9092"codec => jsonconsumer_threads => 1decorate_events => true#auto_offset_rest => "latest"group_id => "error-log-group"}}filter {## 时区转换ruby {code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"}if "app-log" in [fields][logtopic]{grok {## 表达式,这里对应的是Springboot输出的日志格式match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]}}if "error-log" in [fields][logtopic]{grok {## 表达式match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]}}}## 测试输出到控制台:
output {stdout { codec => rubydebug }
}## elasticsearch:
output {if "app-log" in [fields][logtopic]{## es插件elasticsearch {# es服务地址hosts => ["192.168.11.35:9200"]# 用户名密码      user => "elastic"password => "123456"## 索引名,+ 号开头的,就会自动认为后面是时间格式:## javalog-app-service-2019.01.23 index => "app-log-%{[fields][logbiz]}-%{index_time}"# 是否嗅探集群ip:一般设置true;http://192.168.11.35:9200/_nodes/http?pretty# 通过嗅探机制进行es集群负载均衡发日志消息sniffing => true# logstash默认自带一个mapping模板,进行模板覆盖template_overwrite => true} }if "error-log" in [fields][logtopic]{elasticsearch {hosts => ["192.168.11.35:9200"]    user => "elastic"password => "123456"index => "error-log-%{[fields][logbiz]}-%{index_time}"sniffing => truetemplate_overwrite => true} }}

启动logstash

/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf &

等待启动成功,我们再次访问192.168.11.31:8001/err

可以看到控制台开始打印日志

ElasticSearch与Kibana

ES和Kibana的搭建之前没写过博客,网上资料也比较多,大家可以自行搜索。

搭建完成后,访问Kibana的管理页面192.168.11.35:5601,选择Management -> Kinaba - Index Patterns

然后Create index pattern

  • index pattern 输入 app-log-*

  • Time Filter field name 选择 currentDateTime

这样我们就成功创建了索引。

我们再次访问192.168.11.31:8001/err,这个时候就可以看到我们已经命中了一条log信息

里面展示了日志的全量信息

到这里,我们完整的日志收集及可视化就搭建完成了!

原文链接:https://blog.csdn.net/lt326030434/article/details/107361190

感谢您的阅读,也欢迎您发表关于这篇文章的任何建议,关注我,技术不迷茫!小编到你上高速。

· END ·

最后,关注公众号互联网架构师,在后台回复:2T,可以获取我整理的 Java 系列面试题和答案,非常齐全。

正文结束

推荐阅读 ↓↓↓

1.不认命,从10年流水线工人,到谷歌上班的程序媛,一位湖南妹子的励志故事

2.如何才能成为优秀的架构师?

3.从零开始搭建创业公司后台技术栈

4.程序员一般可以从什么平台接私活?

5.37岁程序员被裁,120天没找到工作,无奈去小公司,结果懵了...

6.IntelliJ IDEA 2019.3 首个最新访问版本发布,新特性抢先看

7.这封“领导痛批95后下属”的邮件,句句扎心!

8.15张图看懂瞎忙和高效的区别!

一个人学习、工作很迷茫?

点击「阅读原文」加入我们的小圈子!

SpringBoot + Kafka + ELK 完成海量日志收集(超详细)相关推荐

  1. SpringBoot+Kafka+ELK 完成海量日志收集(超详细)

    点击关注公众号,实用技术文章及时了解 来源:jiandansuifeng.blog.csdn.net/ article/details/107361190 整体流程大概如下: 服务器准备 在这先列出各 ...

  2. 基于Kafka+ELK搭建海量日志平台

    早在传统的单体应用时代,查看日志大都通过SSH客户端登服务器去看,使用较多的命令就是 less 或者 tail.如果服务部署了好几台,就要分别登录到这几台机器上看,等到了分布式和微服务架构流行时代,一 ...

  3. ELK+Filebeat+Kafka+ZooKeeper 构建海量日志分析平台(elk5.2+filebeat2.11)

    ELK+Filebeat+Kafka+ZooKeeper 构建海量日志分析平台 参考:http://www.tuicool.com/articles/R77fieA 我在做ELK日志平台开始之初选择为 ...

  4. printf 重新实现put_Go 实现海量日志收集系统(四)

    2020.1.16 51Reboot 将在 2020.1.16日(今天) 21:00 为您带来分享主题<大佬教你如何从 ES 初学者到 ES专家> 直播链接(提前报名):https://k ...

  5. Flume 海量日志收集利器

    Flume 海量日志收集利器 关于日志收集 服务器日志收集 服务器日志是大数据系统中最主要的数据来源之一 服务器日志可能包含的信息 访问信息 系统信息 其他业务信息 基于服务器日志的应用 业务仪表盘: ...

  6. kafka 可视化工具_两小时带你轻松实战SpringBoot+kafka+ELK分布式日志收集

    一.背景 随着业务复杂度的提升以及微服务的兴起,传统单一项目会被按照业务规则进行垂直拆分,另外为了防止单点故障我们也会将重要的服务模块进行集群部署,通过负载均衡进行服务的调用.那么随着节点的增多,各个 ...

  7. python分布式日志收集系统_Go实现海量日志收集系统(一)

    项目背景 每个系统都有日志,当系统出现问题时,需要通过日志解决问题 当系统机器比较少时,登陆到服务器上查看即可满足 当系统机器规模巨大,登陆到机器上查看几乎不现实 当然即使是机器规模不大,一个系统通常 ...

  8. 转: 基于elk 实现nginx日志收集与数据分析

    原文链接:https://www.cnblogs.com/wenchengxiaopenyou/p/9034213.html 一.背景 前端web服务器为nginx,采用filebeat + logs ...

  9. 海量日志收集利器 —— Flume

    Flume 是什么? Flume是一个分布式.可靠.和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,用于收集数据:同时,Flume提供对数据进行简单处理,并写到各种数据接受方(可定制)的 ...

  10. 从0开始CentOS7上安装ELK,实现日志收集

    从0开始CentOS7上安装ELK实现日志收集 1. ELK Stack 简介 2. 组件下载 2.1 安装环境及版本 2.2 下载安装包 3.安装 3.1 ElasticSearch安装 3.1.1 ...

最新文章

  1. autosar中com模块_详细介绍AUTOSAR各个模块作用PART1(OS,SYS)
  2. 数据中设计中的范式与反范式
  3. 币知识——比特币现金
  4. 第7集 构造函数中抛出的异常
  5. pythonrecord 51 net_python与c#的交互模块pythonnet
  6. PostgreSQL 常见操作
  7. poj1062昂贵的聘礼(Dijkstra**)
  8. 用C#读取XML文档
  9. java uuid fasterxml_可笑!可悲!可叹!你竟然还不知道Java如何生成UUID?
  10. 孙子和外孙就是不一样吗?
  11. java线程间通信 实例_JAVA-初步认识-第十四章-线程间通信-示例
  12. java web 部署_一步一步将java web项目部署到云服务器
  13. RS485电路原理以及设计
  14. vbs整人代码蓝屏_vbs恶作剧(整人代码)-英文报数 蓝屏 重启电脑等
  15. IC前端设计使用的EDA软件
  16. 微信小程序独立服务器的好处,微信小程序的优势和缺点
  17. 多伦多大学计算机专音乐专业,多伦多大学音乐理论专业介绍
  18. DateTime转为特定时区时间
  19. OpenCV彩色图像读取
  20. SWFObject2

热门文章

  1. vue2.0 实现导航守卫(路由守卫)
  2. 轻松使用EasyRecovery恢复丢失照片
  3. JDK下Bin目录的工具介绍
  4. MySQL下xtrabackup与MTS造成的死锁
  5. Flask框架-模板
  6. javascript对象的浅拷贝、深拷贝和Object.assign方法浅析
  7. [转载] 2012年上半年信管网论文复习建议
  8. js赋值时特殊字符完美处理方案
  9. 3D LUT Creator Pro for Mac(专业调色软件)中文版
  10. X Lossless Decoder for mac(XLD音频无损解码器)