ELK + Kafka + Filebeat for system log collection and alerting

The overall flow is: application logs (Log4j2) -> Filebeat -> Kafka -> Logstash -> Elasticsearch -> Kibana / X-Pack Watcher -> alert webhook.

1. Project setup

Add the required dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <!-- Exclude spring-boot-starter-logging -->
        <exclusions>
            <exclusion>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-logging</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
    <!-- log4j2 -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-log4j2</artifactId>
    </dependency>
    <dependency>
        <groupId>com.lmax</groupId>
        <artifactId>disruptor</artifactId>
        <version>3.3.4</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.58</version>
    </dependency>
</dependencies>

<build>
    <finalName>collector</finalName>
    <!-- Include properties and xml files when packaging -->
    <resources>
        <resource>
            <directory>src/main/java</directory>
            <includes>
                <include>**/*.properties</include>
                <include>**/*.xml</include>
            </includes>
            <!-- Whether to filter (replace placeholders in) these resources -->
            <filtering>true</filtering>
        </resource>
        <resource>
            <directory>src/main/resources</directory>
        </resource>
    </resources>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
            <configuration>
                <mainClass>com.xp.logcollector.Application</mainClass>
            </configuration>
        </plugin>
    </plugins>
</build>

Configure log4j2.xml:

<?xml version="1.0" encoding="UTF-8"?>
<Configuration status="INFO" schema="Log4J-V2.0.xsd" monitorInterval="600">
    <Properties>
        <Property name="LOG_HOME">logs</Property>
        <Property name="FILE_NAME">logcollector</Property>
        <Property name="patternLayout">[%d{yyyy-MM-dd'T'HH:mm:ss.SSSZZ}] [%level{length=5}] [%thread-%tid] [%logger] [%X{hostName}] [%X{ip}] [%X{applicationName}] [%F,%L,%C,%M] [%m] ## '%ex'%n</Property>
    </Properties>
    <Appenders>
        <Console name="CONSOLE" target="SYSTEM_OUT">
            <PatternLayout pattern="${patternLayout}"/>
        </Console>
        <RollingRandomAccessFile name="appAppender" fileName="${LOG_HOME}/app-${FILE_NAME}.log" filePattern="${LOG_HOME}/app-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
        <RollingRandomAccessFile name="errorAppender" fileName="${LOG_HOME}/error-${FILE_NAME}.log" filePattern="${LOG_HOME}/error-${FILE_NAME}-%d{yyyy-MM-dd}-%i.log">
            <PatternLayout pattern="${patternLayout}"/>
            <Filters>
                <ThresholdFilter level="warn" onMatch="ACCEPT" onMismatch="DENY"/>
            </Filters>
            <Policies>
                <TimeBasedTriggeringPolicy interval="1"/>
                <SizeBasedTriggeringPolicy size="500MB"/>
            </Policies>
            <DefaultRolloverStrategy max="20"/>
        </RollingRandomAccessFile>
    </Appenders>
    <Loggers>
        <!-- Async loggers for the business packages -->
        <AsyncLogger name="com.xp.*" level="info" includeLocation="true">
            <AppenderRef ref="appAppender"/>
        </AsyncLogger>
        <AsyncLogger name="com.xp.*" level="info" includeLocation="true">
            <AppenderRef ref="errorAppender"/>
        </AsyncLogger>
        <Root level="info">
            <AppenderRef ref="CONSOLE"/>
            <AppenderRef ref="appAppender"/>
            <AppenderRef ref="errorAppender"/>
        </Root>
    </Loggers>
</Configuration>

Logging is used in the project like this:

package com.xp.logcollector.web;

import com.xp.logcollector.utils.InputMDC;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author xp
 * @Description
 */
@Slf4j
@RestController
public class IndexController {

    @RequestMapping("/index")
    public String index() {
        InputMDC.putMDC();
        log.info("info-level log from /index");
        log.error("error-level log from /index");
        log.warn("warn-level log from /index");
        return "index";
    }

    @RequestMapping(value = "/err")
    public String err() {
        InputMDC.putMDC();
        try {
            int a = 1 / 0;
        } catch (Exception e) {
            log.error("arithmetic exception", e);
        }
        return "err";
    }
}
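
Once the application is running, the endpoints can be hit to generate log entries. This is only a quick smoke test; port 8001 is an assumption taken from the Watcher webhook URL used later, adjust it to your server.port:

curl http://localhost:8001/index
curl http://localhost:8001/err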

Utility classes
FastJsonConvertUtil

package com.xp.logcollector.utils;

import java.util.ArrayList;
import java.util.List;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.serializer.SerializerFeature;

import lombok.extern.slf4j.Slf4j;

/**
 * $FastJsonConvertUtil: helpers for converting between JSON and Java objects.
 */
@Slf4j
public class FastJsonConvertUtil {

    private static final SerializerFeature[] featuresWithNullValue = {
            SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullBooleanAsFalse,
            SerializerFeature.WriteNullListAsEmpty, SerializerFeature.WriteNullNumberAsZero,
            SerializerFeature.WriteNullStringAsEmpty };

    /**
     * Convert a JSON string into an entity object.
     * @param data  JSON string
     * @param clzss target class
     * @return T
     */
    public static <T> T convertJSONToObject(String data, Class<T> clzss) {
        try {
            return JSON.parseObject(data, clzss);
        } catch (Exception e) {
            log.error("convertJSONToObject Exception", e);
            return null;
        }
    }

    /**
     * Convert a JSONObject into an entity object.
     */
    public static <T> T convertJSONToObject(JSONObject data, Class<T> clzss) {
        try {
            return JSONObject.toJavaObject(data, clzss);
        } catch (Exception e) {
            log.error("convertJSONToObject Exception", e);
            return null;
        }
    }

    /**
     * Convert a JSON array string into a List.
     */
    public static <T> List<T> convertJSONToArray(String data, Class<T> clzss) {
        try {
            return JSON.parseArray(data, clzss);
        } catch (Exception e) {
            log.error("convertJSONToArray Exception", e);
            return null;
        }
    }

    /**
     * Convert a List<JSONObject> into a List of entity objects.
     */
    public static <T> List<T> convertJSONToArray(List<JSONObject> data, Class<T> clzss) {
        try {
            List<T> t = new ArrayList<T>();
            for (JSONObject jsonObject : data) {
                t.add(convertJSONToObject(jsonObject, clzss));
            }
            return t;
        } catch (Exception e) {
            log.error("convertJSONToArray Exception", e);
            return null;
        }
    }

    /**
     * Convert any object into a JSON string.
     */
    public static String convertObjectToJSON(Object obj) {
        try {
            return JSON.toJSONString(obj);
        } catch (Exception e) {
            log.error("convertObjectToJSON Exception", e);
            return null;
        }
    }

    /**
     * Convert any object into a JSONObject.
     */
    public static JSONObject convertObjectToJSONObject(Object obj) {
        try {
            return (JSONObject) JSONObject.toJSON(obj);
        } catch (Exception e) {
            log.error("convertObjectToJSONObject Exception", e);
            return null;
        }
    }

    /**
     * Convert any object into a JSON string, keeping null values.
     */
    public static String convertObjectToJSONWithNullValue(Object obj) {
        try {
            return JSON.toJSONString(obj, featuresWithNullValue);
        } catch (Exception e) {
            log.error("convertObjectToJSONWithNullValue Exception", e);
            return null;
        }
    }
}

InputMDC

package com.xp.logcollector.utils;

import org.jboss.logging.MDC;
import org.springframework.context.EnvironmentAware;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

/**
 * Puts the host name, local IP and application name into the MDC so they can
 * be appended to every log line.
 */
@Component
public class InputMDC implements EnvironmentAware {

    private static Environment environment;

    @Override
    public void setEnvironment(Environment environment) {
        InputMDC.environment = environment;
    }

    public static void putMDC() {
        MDC.put("hostName", NetUtil.getLocalHostName());
        MDC.put("ip", NetUtil.getLocalIp());
        MDC.put("applicationName", environment.getProperty("spring.application.name"));
    }
}

NetUtil

package com.xp.logcollector.utils;

import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.SocketChannel;
import java.util.Enumeration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * $NetUtil: helpers for resolving the local host name, local IP and process id.
 */
public class NetUtil {

    public static String normalizeAddress(String address) {
        String[] blocks = address.split("[:]");
        if (blocks.length > 2) {
            throw new IllegalArgumentException(address + " is invalid");
        }
        String host = blocks[0];
        int port = 80;
        if (blocks.length > 1) {
            port = Integer.valueOf(blocks[1]);
        } else {
            address += ":" + port; // use default 80
        }
        return String.format("%s:%d", host, port);
    }

    public static String getLocalAddress(String address) {
        String[] blocks = address.split("[:]");
        if (blocks.length != 2) {
            throw new IllegalArgumentException(address + " is invalid address");
        }
        String host = blocks[0];
        int port = Integer.valueOf(blocks[1]);
        if ("0.0.0.0".equals(host)) {
            return String.format("%s:%d", NetUtil.getLocalIp(), port);
        }
        return address;
    }

    private static int matchedIndex(String ip, String[] prefix) {
        for (int i = 0; i < prefix.length; i++) {
            String p = prefix[i];
            if ("*".equals(p)) { // *, assumed to be a public IP
                if (ip.startsWith("127.") || ip.startsWith("10.")
                        || ip.startsWith("172.") || ip.startsWith("192.")) {
                    continue;
                }
                return i;
            } else {
                if (ip.startsWith(p)) {
                    return i;
                }
            }
        }
        return -1;
    }

    public static String getLocalIp(String ipPreference) {
        if (ipPreference == null) {
            ipPreference = "*>10>172>192>127";
        }
        String[] prefix = ipPreference.split("[> ]+");
        try {
            Pattern pattern = Pattern.compile("[0-9]+\\.[0-9]+\\.[0-9]+\\.[0-9]+");
            Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
            String matchedIp = null;
            int matchedIdx = -1;
            while (interfaces.hasMoreElements()) {
                NetworkInterface ni = interfaces.nextElement();
                Enumeration<InetAddress> en = ni.getInetAddresses();
                while (en.hasMoreElements()) {
                    InetAddress addr = en.nextElement();
                    String ip = addr.getHostAddress();
                    Matcher matcher = pattern.matcher(ip);
                    if (matcher.matches()) {
                        int idx = matchedIndex(ip, prefix);
                        if (idx == -1) continue;
                        // keep the address whose prefix appears earliest in the preference list
                        if (matchedIdx == -1 || matchedIdx > idx) {
                            matchedIdx = idx;
                            matchedIp = ip;
                        }
                    }
                }
            }
            if (matchedIp != null) return matchedIp;
            return "127.0.0.1";
        } catch (Exception e) {
            return "127.0.0.1";
        }
    }

    public static String getLocalIp() {
        return getLocalIp("*>10>172>192>127");
    }

    public static String remoteAddress(SocketChannel channel) {
        SocketAddress addr = channel.socket().getRemoteSocketAddress();
        return String.format("%s", addr);
    }

    public static String localAddress(SocketChannel channel) {
        SocketAddress addr = channel.socket().getLocalSocketAddress();
        String res = String.format("%s", addr);
        return addr == null ? res : res.substring(1);
    }

    public static String getPid() {
        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean();
        String name = runtime.getName();
        int index = name.indexOf("@");
        if (index != -1) {
            return name.substring(0, index);
        }
        return null;
    }

    public static String getLocalHostName() {
        try {
            return (InetAddress.getLocalHost()).getHostName();
        } catch (UnknownHostException uhe) {
            String host = uhe.getMessage();
            if (host != null) {
                int colon = host.indexOf(':');
                if (colon > 0) {
                    return host.substring(0, colon);
                }
            }
            return "UnknownHost";
        }
    }
}

Preparing the alert callback
entity

package com.xp.logcollector.entity;

import lombok.Data;

@Data
public class AccurateWatcherMessage {

    private String title;
    private String executionTime;
    private String applicationName;
    private String level;
    private String body;
}

controller

package com.xp.logcollector.web;

import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.alibaba.fastjson.JSON;
import com.xp.logcollector.entity.AccurateWatcherMessage;

@RestController
public class WatcherController {

    @RequestMapping(value = "/accurateWatch")
    public String watch(@RequestBody AccurateWatcherMessage accurateWatcherMessage) {
        String ret = JSON.toJSONString(accurateWatcherMessage);
        System.err.println("----alert content----:" + ret);
        return "is watched" + ret;
    }
}
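
The callback endpoint can be tested in isolation before the Watcher is set up. This is only a sketch; the payload values and port 8001 are placeholders:

curl -H 'Content-Type: application/json' -X POST http://localhost:8001/accurateWatch \
  -d '{"title":"test alert","applicationName":"collector","level":"P1","body":"test body","executionTime":"2019-06-01T12:00:00"}'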

2. Configure Filebeat

We use Filebeat to collect the log files.
Configure filebeat.yml:

###################### Filebeat Configuration Example #########################
filebeat.prospectors:

- input_type: log
  paths:
    ## app-<service name>.log; the path is hard-coded so rotated history files are not picked up again
    - /usr/local/software/logs/app-logcollector.log       # log file path
  # value written to the _type field in ES
  document_type: "app-log"
  multiline:
    #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'   # e.g. match lines starting with a 2017-11-15 08:04:23:889 style timestamp
    pattern: '^\['                              # match lines starting with "["; choose the pattern that fits your actual log format
    negate: true                                # lines NOT matching the pattern belong to the previous line
    match: after                                # append such lines to the end of the previous line
    max_lines: 2000                             # maximum number of lines merged into one event
    timeout: 2s                                 # flush the event if no new line arrives within this time
  fields:
    logbiz: collector
    logtopic: app-log-collector   ## per-service value, used as the Kafka topic
    evn: dev

- input_type: log
  paths:
    - /usr/local/software/logs/error-logcollector.log
  document_type: "error-log"
  multiline:
    #pattern: '^\s*(\d{4}|\d{2})\-(\d{2}|[a-zA-Z]{3})\-(\d{2}|\d{4})'
    pattern: '^\['                              # match lines starting with "["
    negate: true                                # lines NOT matching the pattern belong to the previous line
    match: after                                # append such lines to the end of the previous line
    max_lines: 2000                             # maximum number of lines merged into one event
    timeout: 2s                                 # flush the event if no new line arrives within this time
  fields:
    logbiz: collector
    logtopic: error-log-collector   ## per-service value, used as the Kafka topic
    evn: dev

output.kafka:
  enabled: true
  hosts: ["ip:9092"]
  topic: '%{[fields.logtopic]}'
  partition.hash:
    reachable_only: true
  compression: gzip
  max_message_bytes: 1000000
  required_acks: 1
logging.to_files: true
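
With the configuration in place, Filebeat can be validated and started. The install path below is an assumption; adjust it to wherever Filebeat was unpacked:

cd /usr/local/software/filebeat-6.6.0    ## assumed install path
## validate the configuration file
./filebeat test config -c filebeat.yml
## run in the background
nohup ./filebeat -c filebeat.yml > /dev/null 2>&1 &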

3. Configure Kafka

Kafka is used to buffer the log messages.
Start Kafka:

/usr/local/software/kafka_2.12-2.1.1/bin/kafka-server-start.sh /usr/local/software/kafka_2.12-2.1.1/config/server.properties &
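
Note that Kafka requires a running ZooKeeper. If one is not already up, the ZooKeeper bundled with the Kafka distribution can be started first (a minimal sketch using the bundled default config):

/usr/local/software/kafka_2.12-2.1.1/bin/zookeeper-server-start.sh /usr/local/software/kafka_2.12-2.1.1/config/zookeeper.properties &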

Create the two topics:

/usr/local/software/kafka_2.12-2.1.1/bin/kafka-topics.sh --zookeeper ip:2181 --create --topic app-log-collector --partitions 1  --replication-factor 1
/usr/local/software/kafka_2.12-2.1.1/bin/kafka-topics.sh --zookeeper ip:2181 --create --topic error-log-collector --partitions 1  --replication-factor 1

Check a topic (consume from the beginning to verify messages are arriving):

bin/kafka-console-consumer.sh --bootstrap-server ip:9092 --topic error-log-collector  --from-beginning
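
To confirm that both topics were actually created, they can also be listed:

/usr/local/software/kafka_2.12-2.1.1/bin/kafka-topics.sh --zookeeper ip:2181 --list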

4. Configure Elasticsearch

For the detailed installation steps, see the separate Elasticsearch configuration guide.
Add the IK analyzer:

## Download https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.6.0/elasticsearch-analysis-ik-6.6.0.zip
mkdir -p /usr/local/elasticsearch-6.6.0/plugins/ik/
## Upload elasticsearch-analysis-ik-6.6.0.zip to /usr/local/software
## Unzip it into the /usr/local/elasticsearch-6.6.0/plugins/ik/ directory just created:
unzip -d /usr/local/elasticsearch-6.6.0/plugins/ik/ elasticsearch-analysis-ik-6.6.0.zip
## Check the result
cd /usr/local/elasticsearch-6.6.0/plugins/ik/
## Fix ownership again
chown -R elk:elk /usr/local/elasticsearch-6.6.0/
## Restart the ES node; the following log line means the IK analyzer was loaded successfully
[es-node01] loaded plugin [analysis-ik]
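
As an optional check, the _analyze API can be used to confirm the IK analyzer responds (the elastic/123456 credentials match the ones configured for Logstash and Kibana below):

curl -u elastic:123456 -H 'Content-Type: application/json' \
  -X POST 'http://ip:9200/_analyze?pretty' \
  -d '{"analyzer": "ik_max_word", "text": "系统日志收集与预警"}'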

5. Configure Logstash

Logstash acts as the consumer: it reads the log topics from Kafka and writes them into Elasticsearch.
Create a script directory under the Logstash installation directory
and add the following logstash-script.conf to it:

input {
  kafka {
    ## topics: app-log-<service name>
    topics_pattern => "app-log-.*"
    bootstrap_servers => "ip:9092"
    codec => json
    consumer_threads => 1    ## only one partition was created, so one consumer thread is enough
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "app-log-group"
  }
  kafka {
    ## topics: error-log-<service name>
    topics_pattern => "error-log-.*"
    bootstrap_servers => "ip:9092"
    codec => json
    consumer_threads => 1
    decorate_events => true
    #auto_offset_reset => "latest"
    group_id => "error-log-group"
  }
}

filter {
  ## time zone conversion, used later for the index name suffix
  ruby {
    code => "event.set('index_time',event.timestamp.time.localtime.strftime('%Y.%m.%d'))"
  }
  if "app-log" in [fields][logtopic] {
    grok {
      ## pattern matching the log4j2 patternLayout defined above
      match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }
  if "error-log" in [fields][logtopic] {
    grok {
      ## same pattern for the error log
      match => ["message", "\[%{NOTSPACE:currentDateTime}\] \[%{NOTSPACE:level}\] \[%{NOTSPACE:thread-id}\] \[%{NOTSPACE:class}\] \[%{DATA:hostName}\] \[%{DATA:ip}\] \[%{DATA:applicationName}\] \[%{DATA:location}\] \[%{DATA:messageInfo}\] ## (\'\'|%{QUOTEDSTRING:throwable})"]
    }
  }
}

## Test output to the console:
output {
  stdout { codec => rubydebug }
}

## Output to Elasticsearch:
output {
  if "app-log" in [fields][logtopic] {
    ## elasticsearch output plugin
    elasticsearch {
      # ES address
      hosts => ["ip:9200"]
      # user name and password
      user => "elastic"
      password => "123456"
      ## index name; a leading + in the suffix is treated as a date format,
      ## e.g. javalog-app-service-2019.01.23
      index => "app-log-%{[fields][logbiz]}-%{index_time}"
      # whether to sniff the cluster node IPs (usually true); see http://ip:9200/_nodes/http?pretty
      # sniffing lets logstash load-balance log messages across the ES cluster
      sniffing => true
      # logstash ships with a default mapping template; allow it to be overwritten
      template_overwrite => true
    }
  }
  if "error-log" in [fields][logtopic] {
    elasticsearch {
      hosts => ["ip:9200"]
      user => "elastic"
      password => "123456"
      index => "error-log-%{[fields][logbiz]}-%{index_time}"
      sniffing => true
      template_overwrite => true
    }
  }
}

Note that the private IPs of your Logstash and Elasticsearch hosts must be reachable from each other. Logstash first connects to Elasticsearch over the public IP, and then, because of sniffing, pushes data via the private node IPs advertised by the cluster; if those private IPs are not reachable, the connection keeps failing with errors like:

Elasticsearch pool URLs updated {:changes=>{:removed=>[http://elastic:xxxxxx
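
Once the networking is sorted out, Logstash can be validated and started with the script. The install path below is an assumption; adjust it to your actual Logstash directory:

## validate the pipeline configuration
/usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf --config.test_and_exit
## start in the background
nohup /usr/local/logstash-6.6.0/bin/logstash -f /usr/local/logstash-6.6.0/script/logstash-script.conf > /dev/null 2>&1 &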

6. Configure Kibana


## Download and unpack Kibana
wget https://artifacts.elastic.co/downloads/kibana/kibana-6.6.0-linux-x86_64.tar.gz
tar -zxvf kibana-6.6.0-linux-x86_64.tar.gz -C /usr/local/
mv /usr/local/kibana-6.6.0-linux-x86_64/ /usr/local/kibana-6.6.0
## Enter the kibana directory and edit the configuration file
vim /usr/local/kibana-6.6.0/config/kibana.yml
## Change the configuration as follows:
server.host: "0.0.0.0"
server.name: "ip"
elasticsearch.hosts: ["http://ip:9200"]
elasticsearch.username: "elastic"
elasticsearch.password: "123456"
## Start:
/usr/local/kibana-6.6.0/bin/kibana &
## Or start in the background with an explicit configuration file:
nohup /usr/local/kibana-6.6.0/bin/kibana -c /usr/local/kibana-6.6.0/config/kibana.yml > /dev/null 2>&1 &
## Access:
http://192.168.0.236:5601/app/kibana (5601 is Kibana's default port)

## Request a license:
https://license.elastic.co/registration
## Edit the requested license; the license.json file name must not be changed, otherwise activation fails
1. Replace "type":"basic" with "type":"platinum"                                               # upgrade basic to platinum
2. Replace "expiry_date_in_millis":1561420799999 with "expiry_date_in_millis":3107746000000    # extend 1 year to 50 years
## Start the elasticsearch service and the kibana service
## In the Kibana UI, go to Management -> License Management and upload the modified license

7. Start everything in order and configure the Watcher

zookeeper---->filebeat---->kafka---->application jar---->elasticsearch---->logstash---->kibana

Create the index pattern in Kibana. If entering the index name does not enable the Next step button, your data has not reached Elasticsearch; check that the ports are open and that the other services are actually running.
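
A quick way to check from the command line whether the log indices have actually been created:

curl -u elastic:123456 'http://ip:9200/_cat/indices?v'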

Create the index template for the error logs:

PUT _template/error-log-
{"template": "error-log-*","order": 0,"settings": {"index": {"refresh_interval": "5s"}},"mappings": {"_default_": {"dynamic_templates": [{"message_field": {"match_mapping_type": "string","path_match": "message","mapping": {"norms": false,"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"}}},{"throwable_field": {"match_mapping_type": "string","path_match": "throwable","mapping": {"norms": false,"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word"}}},{"string_fields": {"match_mapping_type": "string","match": "*","mapping": {"norms": false,"type": "text","analyzer": "ik_max_word","search_analyzer": "ik_max_word","fields": {"keyword": {"type": "keyword"}}}}}],"_all": {"enabled": false},"properties": {         "hostName": {"type": "keyword"},"ip": {"type": "ip"},"level": {"type": "keyword"},"currentDateTime": {"type": "date"}}}}
}
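
The template can be verified afterwards from the command line:

curl -u elastic:123456 'http://ip:9200/_template/error-log-?pretty'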

Create the corresponding Watcher:

PUT _xpack/watcher/watch/error_log_collector_watcher
{"trigger": {"schedule": {"interval": "5s"}},"input": {"search": {"request": {"indices": ["<error-log-collector-{now+8h/d}>"],"body": {"size": 0,"query": {"bool": {"must": [{"term": {"level": "ERROR"}}],"filter": {"range": {"currentDateTime": {"gt": "now-30s" , "lt": "now"}}}}}}}}},"condition": {"compare": {"ctx.payload.hits.total": {"gt": 0}}},"transform": {"search": {"request": {"indices": ["<error-log-collector-{now+8h/d}>"],"body": {"size": 1,"query": {"bool": {"must": [{"term": {"level": "ERROR"}}],"filter": {"range": {"currentDateTime": {"gt": "now-30s" , "lt": "now"}}}}},"sort": [{"currentDateTime": {"order": "desc"}}]}}}},"actions": {"test_error": {"webhook" : {"method" : "POST","url" : "http://ip:8001/accurateWatch","body" : "{\"title\": \"异常错误告警\", \"applicationName\": \"{{#ctx.payload.hits.hits}}{{_source.applicationName}}{{/ctx.payload.hits.hits}}\", \"level\":\"告警级别P1\", \"body\": \"{{#ctx.payload.hits.hits}}{{_source.messageInfo}}{{/ctx.payload.hits.hits}}\", \"executionTime\": \"{{#ctx.payload.hits.hits}}{{_source.currentDateTime}}{{/ctx.payload.hits.hits}}\"}"}}}
}
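
To test the watcher without waiting for a real error, it can be triggered by hand via the execute watch API (6.x X-Pack path shown; ignore_condition forces the actions to run regardless of recent errors):

curl -u elastic:123456 -H 'Content-Type: application/json' \
  -X POST 'http://ip:9200/_xpack/watcher/watch/error_log_collector_watcher/_execute?pretty' \
  -d '{"ignore_condition": true, "action_modes": {"test_error": "force_execute"}}'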

View the corresponding logs in Kibana



View the Watcher in Kibana
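
The watcher definition and status can also be queried over the API instead of the Kibana UI:

curl -u elastic:123456 'http://ip:9200/_xpack/watcher/watch/error_log_collector_watcher?pretty'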

The alert callback is received by the project

Configuration complete. Sweet dreams.
