目录

一、ELK日志分析集群

1、组件介绍

(1)Elasticsearch:

(2)Logstash:

(3)kibana :

(4)kafka:(kafka集群+elk)

(5)Filebeat:

2、版本说明

二、ELK部署

1、ELK常用架构

(1)基础架构:

(2)多节点部署logstash架构

2、实验环境

3、Elasticsearch部署

部署jdk+Elasticsearch

4、安装配置head监控插件(Web前端)

安装nodejs

安装git

下载安装head插件

配置elasticsearch,允许head插件访问

测试:

5、部署kibana

配置Nginx反向代理

6、部署logstash

1、安装jdk8

2、安装logstash

3、测试logstash

日志搜集

nginx日志搜集

三、 kafka部署

1、简介

2、部署kafka集群

3、部署elk+kafka

四、filebeat

1、filebeat简介

2、工作原理

3、filebeat参数说明

4、filebeat配置文件说明书

五、Packetbeat(扩展)


一、ELK日志分析集群

ELK:日志收集平台

ELK由ElasticSearch、Logstash和Kibana三个开源工具组成:

1、组件介绍

(1)Elasticsearch:

ElasticSearch是一个基于Lucene的开源分布式搜索服务。只搜索和分析日志

特点:分布式,零配置,自动发现,索引自动切片,索引副本机制,多数据源等。它提供了一个分布式多用户能力的全文搜索引擎。Elasticsearch是用java开发的,并作为Apache许可条款下的开放源码发布,是第二流行的企业搜索引擎。设计用于云计算中,能够达到实时搜索,稳定,可靠,快速,安装使用方便。在Elasticsearch集群中,所有节点的数据是均等的。

索引:

索引(库)-->类型(表)-->文档/日志(记录)

(2)Logstash:

Logstash是一个完全开源工具,可以对你的日志进行收集、过滤、分析,并将其存储供以后使用(如,搜索),logstash带有一个web界面,搜索和展示所有日志。 只收集和过滤日志,和改格式

简单来说logstash就是一根具备实时数据传输能力的管道,负责将数据信息从管道的输入端传输到管道的输出端;与此同时这根管道还可以让你根据自己的需求在中间加上滤网,Logstash提供很多功能强大的滤网以满足你的各种应用场景。

Logstash的事件(logstash将数据流中等每一条数据称之为一个事件)处理流水线有三个主要角色完成:inputs –> filters –> outputs:

logstash整个工作流程分为三个阶段:输入、过滤、输出。每个阶段都有强大的插件提供支持

Input 必须,负责产生事件(Inputs generate events),常用的插件有

  • file 从文件系统收集数据

  • syslog 从syslog日志收集数据

  • redis 从redis收集日志

  • beats 从beats family收集日志(如:Filebeats)

Filter常用的插件,负责数据处理与转换(filters modify them)

  • grok是logstash中最常用的日志解释和结构化插件。:grok是一种采用组合多个预定义的正则表达式,用来匹配分割文本并映射到关键字的工具。

  • mutate 支持事件的变换,例如重命名、移除、替换、修改等

  • drop 完全丢弃事件

  • clone 克隆事件

output 输出,必须,负责数据输出(outputs ship them elsewhere),常用的插件有

  • elasticsearch 把数据输出到elasticsearch

  • file 把数据输出为普通的

(3)kibana :

是一个优秀的前端日志展示框架,它可以非常详细的将日志转化为各种图表,为用户提供强大的数据可视化支持,它能够搜索、展示存储在 Elasticsearch 中索引数据。使用它可以很方便的用图表、表格、地图展示和分析数据。

(4)kafka:(kafka集群+elk)

是一种高吞吐量的分布式发布订阅消息系统。具有峰值处理能力,使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

• 1.发布和订阅记录流,类似于消息队列或企业消息传递系统。

• 2.以容错持久的方式存储记录流。

• 3.处理记录发生的流。

(5)Filebeat:

隶属于Beats,轻量级数据收集引擎。基于原先 Logstash-fowarder 的源码改造出来。换句话说:Filebeat就是新版的 Logstash-fowarder,也会是 ELK Stack 在 Agent 的第一选择,目前Beats包含四种工具:

• 1.Packetbeat(搜集网络流量数据)
• 2.Metricbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据。通过从操作系统和服务收集指标,帮助您监控服务器及其托管的服务。)
• 3.Filebeat(搜集文件数据
• 4.Winlogbeat(搜集 Windows 事件日志数据)

为什么会用到ELK?

普通的日志分析场景:直接在日志文件中grep、awk就可以获得自己想要的信息,但在规模较大的场景中,此方法效率底下,面临问题包括日志量太大如何归档、文本搜索太慢、如何多纬度的查询。这样我们就需要集中化的日志管理,所有的服务器上的日志收集汇总。解决方案:建立集中式日志收集系统,将所有节点上的日志统一收集,管理,访问。

大型系统是一个分布式部署的架构,不同的服务模块部署在不同的服务器上,问题出现时,大部分情况需要根据问题暴露的关键信息,定位到具体的服务器和服务模块,构建一个集中式日志系统,可以提高定位问题的效率。

完整的集中式日志系统需要包含以下几个特点:

Elasticsearch是个开源分布式搜索引擎,提供搜集、分析、存储数据三大功能它的特点有:分布式,零配置,自动发现,索引自动分片,索引副本机制,restful风格接口,多数据源,自动搜索负载等。

Logstash 主要是用来日志的搜集、分析、过滤日志的工具,支持大量的数据获取方式。一般工作方式为c/s架构,client端安装在需要收集日志的主机上,server端负责将收到的各节点日志进行过滤、修改等操作在一并发往elasticsearch上去。

Kibana: 数据分析与可视化平台,对Elasticsearch存储的数据进行可视化分析,通过表格的形式展现出来。

比如下面这张图:在需要搜集日志信息的服务器上安装logstash,搜集到我们所需要的日志信息后,server端将受到的日志进行过滤,转发到Elasticsearch上,然后通过kibana插件对Elasticsearch存储的数据进行可视化分析,通过表格的形式展现出来。

2、版本说明

Elasticsearch: 6.5.4  #https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.5.4.tar.gz
Logstash: 6.5.4  #https://artifacts.elastic.co/downloads/logstash/logstash-6.5.4.tar.gz
Kibana: 6.5.4  #https://artifacts.elastic.co/downloads/kibana/kibana-6.5.4-linux-x86_64.tar.gz
Kafka: 2.11-2.1  #https://archive.apache.org/dist/kafka/2.1.0/kafka_2.11-2.1.0.tgz
Filebeat: 6.5.4
相应的版本最好下载对应的插件

相关地址:

官网地址:Free and Open Search: The Creators of Elasticsearch, ELK & Kibana | Elastic

官网搭建:Elastic Stack and Product Documentation | Elastic

这个past releases我们可以通过快捷键ctrl+f搜索,快速找到

我们一般不会使用最新版本的,因为最新版本可能会有些坑,所以我们用稳定版的

二、ELK部署

1、ELK常用架构

(1)基础架构:

单一的架构,logstash作为日志搜集器,从数据源采集数据,并对数据进行过滤,格式化处理,然后交由Elasticsearch存储,kibana对日志进行可视化处理。

(2)多节点部署logstash架构

这种架构模式适合需要采集日志的客户端不多,且各服务端cpu,内存等资源充足的情况下。因为每个节点都安装Logstash, 非常消耗节点资源。其中,logstash作为日志搜集器,将每一台节点的数据发送到Elasticsearch上进行存储,再由kibana进行可视化分析。

以下实验我们用的是软件版本:

elasticsearch-6.5.4.tar.gz
jdk-8u191-linux-x64.tar.gz
logstash-7.3.2.tar.gz

2、实验环境

192.168.242.147:Elasticsearch+jdk+kibana        内存2g以上,我给的是3g
192.168.242.148:Elasticsearch+jdk               内存2g以上,给的3g
192.168.242.144:jdk+logstash                    内存1g以上,给的1g
注意:java的运行环境非常的吃内存,为例实验的正常进行,java堆的大小建议设置为系统内存的一半,一般默认的为1g,其两台3g的机器我们就用默认的java堆为1g,第三台1g的我们要把java堆的大小设为512M,最大堆大小与最小堆大小建议一致。

3、Elasticsearch部署

部署之前我们先简单了解一下Elasticsearch的目录以及配置文件内容

ls /usr/local/elasticsearch

配置文件在/usr/local/elasticsearch/config/中,elasticsearch.yml

 配置文件内容简介:

cluster.name        集群名称,各节点配成相同的集群名称。
node.name       节点名称,各节点配置不同。
node.master     指示某个节点是否符合成为主节点的条件。
node.data       指示节点是否为数据节点。数据节点包含并管理索引的一部分。
path.data       数据存储目录。
path.logs       日志存储目录。
bootstrap.memory_lock       内存锁定,是否禁用交换。
bootstrap.system_call_filter    系统调用过滤器。
network.host    绑定节点IP。
http.port       rest api端口。
discovery.zen.ping.unicast.hosts    提供其他 Elasticsearch 服务节点的单点广播发现功能。
discovery.zen.ping_timeout      节点在发现过程中的等待时间。
discovery.zen.fd.ping_retries        节点发现重试次数。
http.cors.enabled               是否允许跨源 REST 请求,用于允许head插件访问ES。
http.cors.allow-origin              允许的源地址。

部署jdk+Elasticsearch

因为Elasticsearch是java开发的,所以需要java环境,我们安装jdk-8u191-linux-x64

192.168.242.147:
将jdk和Elasticsearch的压缩包上传到我们的虚拟机(略)
1、安装jdk:
[root@localhost ~]# ls
jdk-8u191-linux-x64.tar.gz                 elasticsearch-6.5.4.tar.gz
kibana-6.5.4-linux-x86_64.tar.gz
因为我们做的Elasticsearch集群,所以kibana只需要安装到本台机器就行
[root@localhost ~]# tar zxf jdk-8u191-linux-x64.tar.gz -C /usr/local/
[root@localhost ~]# cd /usr/local/
[root@localhost local]# mv jdk1.8.0_191 java
[root@localhost local]# vim /etc/profile
JAVA_HOME=/usr/local/java
PATH=$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH
[root@localhost local]# source /etc/profile
[root@localhost local]# java -version
java version "1.8.0_191"
2、安装Elasticsearch:
[root@localhost ~]# tar zxf elasticsearch-6.5.4.tar.gz -C /usr/local/
[root@localhost ~]# cd /usr/local/
[root@localhost local]# mv elasticsearch-6.5.4 elasticsearch
[root@localhost local]# vim elasticsearch/config/elasticsearch.yml
打开之后把里边的内容全部删除,然后添加如下内容:注意配置文件内容的要求非常严格,冒号后面必须有空格,最后一个字母后不能有空格,否则可能会报错
cluster.name: zcg
node.name: elk01
node.master: true
node.data: true
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["192.168.242.147", "192.168.242.148"]
discovery.zen.ping_timeout: 150s
discovery.zen.fd.ping_retries: 10
client.transport.ping_timeout: 60s
http.cors.enabled: true
http.cors.allow-origin: "*"
3、#创建对应的数据和日志存储目录以及运行服务的用户
[root@localhost local]# mkdir -p /data/elasticsearch/{data,logs}
Elasticsearch不能用root运行,不然会报错,所以我们需要创建对应的运行用户
[root@localhost ~]# useradd es   #创建运行服务的用户
[root@localhost ~]# echo "123"|passwd --stdin "es"
创建用户之后我们要修改日志和数据存储目录的权限,因为此时目录的属主属组是root,所以我们要修改为es。(以及Elasticsearch的安装目录的权限)
[root@localhost ~]# chown -R es.es /data/elasticsearch/
[root@localhost ~]# chown -R es.es /usr/local/elasticsearch/
4、系统优化
接下来我们需要做系统优化!!!必做,
[root@localhost ~]# vim /etc/security/limits.conf  将以下内容添加到配置文件中
*       soft     nofile          65536
*       hard     nofile          131072
*       soft     nproc           2048
*       hard     nproc           4096
增加最大内存映射数!!!必做,Elasticsearch默认的内存映射数是65536,这不够,不增加的话运行不起来
[root@localhost ~]# echo "vm.max_map_count=262144" >> /etc/sysctl.conf
[root@localhost ~]# sysctl -p    #使刚才追加的内容生效5、设置JVM堆大小
[root@localhost ~]# sed -i 's/-Xms1g/-Xms4g/' /usr/local/elasticsearch/config/jvm.options
[root@localhost ~]# sed -i 's/-Xmx1g/-Xmx4g/' /usr/local/elasticsearch/config/jvm.options注意: 确保堆内存最小值(Xms)与最大值(Xmx)的大小相同,防止程序在运行时改变堆内存大小。 如果系统内存足够大,将堆内存最大和最小值设置为31G,因为有一个32G性能瓶颈问题。 堆内存大小不要超过系统内存的50%。6、启动Elasticsearch
[root@localhost ~]#su - es -c "cd /usr/local/elasticsearch && nohup bin/elasticsearch &"
su - :切换用户,加-c,不切换到该用户执行一条命令
nohup bin/elasticsearch &  后台运行
命令用引号引起来在192.168.242.147这台虚拟机上做一模一样的操作!!!(不用安装kibana)另外在修改Elasticsearch的配置文件的时候需要把第二行的node.name: 设置为不一样的名字,比如node.name: elk02。

浏览器访问测试

报错解决:

memory locking requested for elasticsearch process but memory is not locked
elasticsearch.yml文件
bootstrap.memory_lock : false
/etc/sysctl.conf文件
vm.swappiness=0错误:
max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]意思是elasticsearch用户拥有的客串建文件描述的权限太低,知道需要65536个解决:切换到root用户下面,vim   /etc/security/limits.conf在最后添加
* hard nofile 65536
* hard nofile 65536
重新启动elasticsearch,还是无效?
必须重新登录启动elasticsearch的账户才可以,例如我的账户名是elasticsearch,退出重新登录。
另外*也可以换为启动elasticsearch的账户也可以,* 代表所有,其实比较不合适启动还会遇到另外一个问题,就是
max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
意思是:elasticsearch用户拥有的内存权限太小了,至少需要262114。这个比较简单,也不需要重启,直接执行
# sysctl -w vm.max_map_count=262144
就可以了

4、安装配置head监控插件(Web前端)

只需要安装一台就可以了。192.168.242.147

前提: head插件是Nodejs实现的,所以需要先安装Nodejs。

安装nodejs

nodejs官方下载地址:Node.js

下载linux64位:

[root@localhost ~]# wget https://nodejs.org/dist/v14.17.6/node-v14.17.6-linux-x64.tar.xz
[root@localhost ~]# tar xf node-v14.17.6-linux-x64.tar.xz -C /usr/local/
[root@localhost nodejs]# vim /etc/profile
# 添加 如下配置
NODE_HOME=/usr/local/node-v14.17.6-linux-x64
JAVA_HOME=/usr/local/java
PATH=$NODE_HOME/bin:$JAVA_HOME/bin:$PATH
export JAVA_HOME PATH
#由于我这里,ES也装在了此台机器上,所以环境变量这样配置;不能删除jdk的配置
[root@es-3-head-kib ~]# source /etc/profile
[root@es-3-head-kib ~]# node --version
v14.17.6
[root@es-3-head-kib ~]# npm -v
6.14.15

npm 是随同NodeJS一起安装的包管理工具,能解决NodeJS代码部署上的很多问题。

安装git

需要使用git方式下载head插件,下面安装git:

[root@es-3-head-kib local]# yum install -y git
[root@es-3-head-kib local]# git --version
git version 1.8.3.1

下载安装head插件

[root@localhost ~]#  cd /usr/local/
[root@localhost local]# git clone git://github.com/mobz/elasticsearch-head.git
[root@localhost local]# cd elasticsearch-head/
[root@localhost elasticsearch-head]# npm install   #注意:这里直接安装,可能会失败,如果你的网络没问题,才能下载成功
#也可以将npm源设置为国内淘宝的,确保能下载成功
[root@localhost elasticsearch-head]# npm install -g cnpm --registry=https://registry.npm.taobao.org
[root@localhost elasticsearch-head]# npm install #报错,不用管它

修改地址:如果你的head插件和ES没在一台机器上,需要进行如下2处修改,在一台机器,不修改即可

[root@localhost elasticsearch-head]# vim Gruntfile.js

[root@localhost elasticsearch-head]# vim _site/app.js #配置连接es的ip和port

原本是http://localhost:9200 ,如果head和ES不在同一个节点,注意修改成ES的IP地址

配置elasticsearch,允许head插件访问

[root@localhost ~]# vim /usr/local/elasticsearch-6.5.4/config/elasticsearch.yml
在配置最后面,加2行

测试:

进入到head目录,执行npm run start
[root@localhost ~]# cd /usr/local/elasticsearch-head/
[root@localhost elasticsearch-head]# nohup npm  run start &

启动成功后,在浏览器访问:http://192.168.242.147:9100/ ,内部输入 http://192.168.242.147:9200/ 点击连接测试,输出黄色背景字体说明配置OK。

以上这种方法比较复杂,想简单点的话,我们可以直接在谷歌浏览器上安装elasticsearch-head插件,可以去谷歌应用店里搜索直接下载,找不到的话可以私信博主。

安装好插件之后点击这个插件

输入节点的ip+端口显示green就可以了。

5、部署kibana

192.168.242.147:
[root@localhost ~]# tar zvxf kibana-6.5.4-linux-x86_64.tar.gz -C /usr/local/
[root@localhost ~]# cd /usr/local/
[root@localhost local]# mv kibana-6.5.4-linux-x86_64 kibana
[root@localhost local]# vim kibana/config/kibana.yml
打开文件之后全都是注释,删除所有,添加以下内容
server.port: 5601
server.host: "192.168.242.147"
elasticsearch.url: "http://192.168.242.147:9200"
kibana.index: ".kibana"
[root@localhost local]# cd kibana/
[root@localhost kibana]# nohup ./bin/kibana &  #后台运行配置文件内容含义:
server.port kibana 服务端口,默认5601
server.host kibana 主机IP地址,默认localhost
elasticsearch.url  用来做查询的ES节点的URL,默认http://localhost:9200
kibana.index       kibana在Elasticsearch中使用索引来存储保存的searches, visualizations和dashboards,默认.kibana

浏览器访问192.168.242.147:5601

配置Nginx反向代理

[root@localhost ~]# yum -y install nginx
[root@localhost ~]# rm -rf /etc/nginx/nginx.conf
[root@localhost ~]# cp /etc/nginx/nginx.conf.default /etc/nginx/nginx.conf
[root@localhost ~]# vim /etc/nginx/nginx.conf
user nginx;
worker_processes auto;
error_log /var/log/nginx/error.log;
pid /run/nginx.pid;# Load dynamic modules. See /usr/share/doc/nginx/README.dynamic.
include /usr/share/nginx/modules/*.conf;events {worker_connections 1024;
}http {    将原本的日志格式注释掉
#    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
#                      '$status $body_bytes_sent "$http_referer" '
#                      '"$http_user_agent" "$http_x_forwarded_for"';
#
#    access_log  /var/log/nginx/access.log  main;log_format  json '{"@timestamp":"$time_iso8601",'    #添加json格式的日志'"@version":"1",''"client":"$remote_addr",''"url":"$uri",''"status":"$status",''"domain":"$host",''"host":"$server_addr",''"size":$body_bytes_sent,''"responsetime":$request_time,''"referer": "$http_referer",''"ua": "$http_user_agent"''}';access_log /var/log/nginx/access.log json;sendfile            on;tcp_nopush          on;tcp_nodelay         on;keepalive_timeout   65;types_hash_max_size 4096;include             /etc/nginx/mime.types;default_type        application/octet-stream;include /etc/nginx/conf.d/*.conf;}
然后在子配置文件中添加server块内容
[root@localhost ~]# vim /etc/nginx/conf.d/elasticsearch.conf
server {listen       80;server_name  192.168.242.147;#charset koi8-r;# access_log  /var/log/nginx/host.access.log  main;# access_log off;location / {proxy_pass http://192.168.242.147:5601;proxy_set_header Host $host:5601;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header Via "nginx";}location /status {stub_status on; #开启网站监控状态 access_log /var/log/nginx/kibana_status.log; #监控日志 auth_basic "NginxStatus"; }location /head/{proxy_pass http://192.168.242.147:9100;proxy_set_header Host $host:9100;proxy_set_header X-Real-IP $remote_addr;proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;proxy_set_header Via "nginx";}
}[root@localhost ~]# nginx -t
[root@localhost ~]# systemctl start nginx

直接访问ip不加端口

6、部署logstash

在192.168.242.144上安装logstash

1、安装jdk8

安装jdk,步骤跟上面一样,过程省略。

2、安装logstash

[root@localhost ~]# tar zxf logstash-7.3.2.tar.gz -C /usr/local/
[root@localhost ~]# cd /usr/local/
[root@localhost local]# mv logstash-7.3.2 logstash
[root@localhost ~]# ln -s /usr/local/logstash/bin/logstash /usr/bin/logstash

3、测试logstash

[root@localhost ~]# logstash -e 'input { stdin{} } output { stdout{} }'
-e 后面跟搜集定义输出(input [filter] output)后面跟{}zhuzhuxia      #输入这个
/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
{"host" => "localhost.localdomain","@timestamp" => 2022-04-28T16:50:38.814Z,"message" => "zhuzhuxia",        #输出这个"@version" => "1"
}使用rubydebug详细输出  版本 时间戳
[root@localhost ~]# logstash -e 'input { stdin{} } output { stdout{ codec => rubydebug} }'
codec 定义编码器 用什么格式输出*把内容写到elasticsearch中
[root@localhost ~]# logstash -e 'input { stdin{} } output {  elasticsearch { hosts => ["192.168.242.147:9200"]} }'
#输入下面的测试数据
zhuzhu
ggboy
xiaodaidai
在浏览器的head监控页面刷新,可以看到刚才输入的内容

注意:
master收集到日志后,会把一部分数据碎片到salve上(随机的一部分数据),master和slave又都会各自做副本,并把副本放到对方机器上,这样就保证了数据不会丢失。

如下,master收集到的数据放到了自己的第1,3分片上,其他的放到了slave的第0,2,4分片上。

即写到elasticsearch中又写在文件中一份
logstash -e 'input { stdin{} } output {  elasticsearch { hosts => ["192.168.242.147:9200"]} stdout{ codec =>  rubydebug}}'
alice
/usr/local/logstash/vendor/bundle/jruby/2.5.0/gems/awesome_print-1.7.0/lib/awesome_print/formatters/base_formatter.rb:31: warning: constant ::Fixnum is deprecated
{"host" => "localhost.localdomain","message" => "alice","@version" => "1","@timestamp" => 2022-04-28T17:00:39.849Z
}
在浏览器上也会有一份

测试:

日志搜集

input {file {path => "/var/log/messages"type => "system"start_position => "beginning"}
}
input {file {path => "/var/log/elasticsearch/huanqiu.log"type => "es-error"start_position => "beginning"}
}
output {if [type] == "system"{elasticsearch {hosts => ["192.168.122.119:9200"]index => "system-%{+YYYY.MM.dd}"}}if [type] == "es-error"{elasticsearch {hosts => ["192.168.122.119:9200"]index => "es-error-%{+YYYY.MM.dd}"}}
}参数说明:input 上传数据    path 指定日志文件的路径start_position 从头开始hosts 输出到那台elasticsearch serverindex  定义格式,显示到web界面上注意:
搜集单个文件不需要if
搜集多个文件需要+if
[root@localhost ~]# vim file.conf
input {file {path => "/var/log/yum.log"     #路径必须正确,日志必须存在type => "system"start_position => "beginning"}
}
output {elasticsearch {hosts => ["192.168.242.147:9200"]index => "system-%{+YYYY.MM.dd}"}
}
[root@localhost ~]# logstash -f file.conf   执行这个file.conf文件搜集yum.log日志的数据
在这台机器上用yum任意下载一个服务,浏览器查看

测试:

nginx日志搜集

nginx server 安装logstash 进行日志的搜集,并上传到elasticserach服务器
修改nginx的配置文件
[root@localhost ~]# vim /etc/nginx/nginx.conf
.....log_format json '{"@timestamp":"$time_iso8601",''"@version":"1",''"client":"$remote_addr",''"url":"$uri",''"status":"$status",''"domain":"$host",''"host":"$server_addr",''"size":$body_bytes_sent,''"responsetime":$request_time,''"referer": "$http_referer",''"ua": "$http_user_agent"''}';
....
server {listen       80;server_name  localhost;access_log /var/log/nginx/zcg.access.log json;
....
}    重新加载nginx的配置文件上传到elasticsearch server
[root@localhost ~]# vim nginx.conf
input {file {path => "/var/log/nginx/zcg.access.log"type => "nginx_access"codec => "json"start_position => "beginning"}
}
output {elasticsearch {hosts => ["192.168.242.147:9200"]index => "nginx_zcg_access-%{+YYYY.MM.dd}"}
}
[root@localhost ~]# logstash -f nginx.conf
浏览器访问一下nginx的网站页面,再去查看head监控

没刷新一下nginx的网站页面,就会多一条记录

然后可以去配置kibana添加索引

设置一下刷新时间,然后再访问nginx的页面,

然后我们可以创建可视化

根据需要选择

然后也可以创建仪表盘

三、 kafka部署

1、简介

kafka是一个分布式的消息发布 — 订阅系统(kafka其实是消息队列)  官方网址:http://kafka.apache.org/

kafka是最初由LinkedIn公司开发,是一个分布式、支持分区的(partion)、多副本的(replica),基于zookeeper协调的分布式消息系统,他的最大特性就是可以实时的处理大数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,linkedin于2010年贡献给Apache基金会并成为顶级开源项目。

Kafka的特性:

- 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consumer操作。

- 可扩展性:kafka集群支持热扩展

- 可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失

- 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)

- 高并发:支持数千个客户端同时读写

 kafka组件:

话题(Topic):是特定类型的消息流。消息是字节的有效负载(Payload),话题是消息的分类名或种子(Feed)名。(每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的)。

生产者(Producer):是能够发布消息到话题的任何对象(发布消息到 kafka 集群的终端或服务).

服务代理(Broker):已发布的消息保存在一组服务器中,它们被称为代理(Broker)或Kafka集群。

消费者(Consumer):可以订阅一个或多个话题,并从Broker拉数据,从而消费这些已发布的消息。

partition(区):partition 是物理上的概念,每个 topic 包含一个或多个 partition。每一个topic将被分为多个partition(区)。

Consumer group:high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。

replication:partition 的副本,保障 partition 的高可用。

leader:replication 中的一个角色, producer 和 consumer 只跟 leader 交互。

follower:replication 中的一个角色,从 leader 中复制数据。

controller:kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。

zookeeper:kafka 通过 zookeeper 来存储集群的 meta 信息。

2、部署kafka集群

kafka-1:192.168.242.144

kafka-2:192.168.242.145

kafka-3:192.168.242.146

实验所需要的包:

jdk-8u121-linux-x64.tar.gz
kafka_2.11-2.1.0.tgz

1、安装jdk,步骤同上个实验,过程略

2、安装ZK

Kafka运行依赖ZK,Kafka官网提供的tar包中,已经包含了ZK,这里不再额下载ZK程序。

kafka-1:
[root@localhost ~]# tar zxf kafka_2.11-2.1.0.tgz -C /usr/local/
[root@localhost ~]# cd /usr/local/
[root@localhost local]# mv kafka_2.11-2.1.0 kafka
[root@localhost local]# vim /usr/local/kafka/config/zookeeper.properties
将文件中的内容全都删掉或者注释掉,添加以下内容
dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.242.144:2888:3888
server.2=192.168.242.145:2888:3888
server.3=192.168.242.146:2888:3888
[root@localhost ~]# mkdir -p /opt/data/zookeeper/{data,logs}
[root@localhost ~]# echo 1 > /opt/data/zookeeper/data/myidkafka-2:
[root@localhost ~]# tar zxf kafka_2.11-2.1.0.tgz -C /usr/local/
[root@localhost ~]# cd /usr/local/
[root@localhost local]# mv kafka_2.11-2.1.0 kafka
[root@localhost local]# vim /usr/local/kafka/config/zookeeper.properties
将文件中的内容全都删掉或者注释掉,添加以下内容
dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.242.144:2888:3888
server.2=192.168.242.145:2888:3888
server.3=192.168.242.146:2888:3888
[root@localhost ~]# mkdir -p /opt/data/zookeeper/{data,logs}
[root@localhost ~]# echo 2 > /opt/data/zookeeper/data/myidkafka-3:
[root@localhost ~]# tar zxf kafka_2.11-2.1.0.tgz -C /usr/local/
[root@localhost ~]# cd /usr/local/
[root@localhost local]# mv kafka_2.11-2.1.0 kafka
[root@localhost local]# vim /usr/local/kafka/config/zookeeper.properties
将文件中的内容全都删掉或者注释掉,添加以下内容
dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.242.144:2888:3888
server.2=192.168.242.145:2888:3888
server.3=192.168.242.146:2888:3888
[root@localhost ~]# mkdir -p /opt/data/zookeeper/{data,logs}
[root@localhost ~]# echo 3 > /opt/data/zookeeper/data/myid

配置文件内容解释:

dataDir         ZK数据存放目录。
dataLogDir      ZK日志存放目录。
clientPort      客户端连接ZK服务的端口。
tickTime        ZK服务器之间或客户端与服务器之间维持心跳的时间间隔。
initLimit       允许follower(相对于Leaderer言的“客户端”)连接并同步到Leader的初始化连接时间,以tickTime为单位。 当初始化连接时间超过该值,则表示连接失败。
syncLimit        Leader与Follower之间发送消息时,请求和应答时间长度。如果follower在设置时间内不能与leader通信,那么此follower将会被丢弃。
server.1=172.16.244.31:2888:3888    2888是follower与leader交换信息的端口,3888是当leader挂了时用来执行选举时服务器相互通信的端口。

3、配置kafka

kafka-1:
[root@localhost ~]#  vim  /usr/local/kafka/config/server.properties
打开之后发现全部都是注释将文件内的内容全都删掉,或者在最下面添加以下内容
broker.id=1
listeners=PLAINTEXT://192.168.242.144:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.242.144:2181,192.168.242.145:2181,192.168.242.146:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@localhost ~]# mkdir -p /data/kafka/logskafka-2:
[root@localhost ~]#  vim  /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://192.168.242.145:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.242.144:2181,192.168.242.145:2181,192.168.242.146:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@localhost ~]# mkdir -p /data/kafka/logskafka-3:
[root@localhost ~]#  vim  /usr/local/kafka/config/server.properties
broker.id=3
listeners=PLAINTEXT://192.168.242.146:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.242.144:2181,192.168.242.145:2181,192.168.242.146:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@localhost ~]# mkdir -p /data/kafka/logs

配置文件内容解释:

broker.id   每个server需要单独配置broker id,如果不配置系统会自动配置。
listeners       监听地址,格式PLAINTEXT://IP:端口。
num.network.threads 接收和发送网络信息的线程数。
num.io.threads          服务器用于处理请求的线程数,其中可能包括磁盘I/O。
socket.send.buffer.bytes    套接字服务器使用的发送缓冲区(SO_SNDBUF)
socket.receive.buffer.bytes 套接字服务器使用的接收缓冲区(SO_RCVBUF)
socket.request.max.bytes        套接字服务器将接受的请求的最大大小(防止OOM)
log.dirs        日志文件目录。
num.partitions  partition数量。
num.recovery.threads.per.data.dir       在启动时恢复日志、关闭时刷盘日志每个数据目录的线程的数量,默认1。
offsets.topic.replication.factor        偏移量话题的复制因子(设置更高保证可用),为了保证有效的复制,偏移话题的复制因子是可配置的,在偏移话题的第一次请求的时候可用的broker的数量至少为复制因子的大小,否则要么话题创建失败,要么复制因子取可用broker的数量和配置复制因子的最小值。
log.retention.hours 日志文件删除之前保留的时间(单位小时),默认168
log.segment.bytes   单个日志文件的大小,默认1073741824
log.retention.check.interval.ms 检查日志段以查看是否可以根据保留策略删除它们的时间间隔。
zookeeper.connect   ZK主机地址,如果zookeeper是集群则以逗号隔开。
zookeeper.connection.timeout.ms     连接到Zookeeper的超时时间。

先启动zookeeper,再启动kafka

启动zookeeper:
三个节点依次执行:
[root@localhost ~]# cd /usr/local/
[root@localhost kafka]# nohup bin/zookeeper-server-start.sh config/zookeeper.properties &验证:
echo conf | nc 127.0.0.1 2181
clientPort=2181
dataDir=/opt/data/zookeeper/data/version-2
dataLogDir=/opt/data/zookeeper/logs/version-2
tickTime=2000
maxClientCnxns=60
minSessionTimeout=4000
maxSessionTimeout=40000
serverId=1
initLimit=20
syncLimit=10
electionAlg=3
electionPort=3888
quorumPort=2888
peerType=0echo stat | nc 127.0.0.1 2181
Zookeeper version: 3.4.13-2d71af4dbe22557fda74f9a9b4309b15a7487f03, built on 06/29/2018 00:39 GMT
Clients:/127.0.0.1:50468[0](queued=0,recved=1,sent=0)Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x500000002
Mode: follower
Node count: 171启动kafka:三个节点依次启动
[root@localhost ~]# cd /usr/local/kafka
[root@localhost kafka]# nohup bin/zookeeper-server-start.sh config/server.properties &

验证:

kafka-1创建topic
[root@localhost kafka]# bin/kafka-topics.sh --create --zookeeper 192.168.242.144:2181 --replication-factor 1 --partitions 1 --topic zitai
Created topic "zitai".模拟消息生产,发送消息到kafka-3
[root@localhost kafka]# bin/kafka-console-producer.sh --broker-list 192.168.242.146:9092 --topic zitai   回车之后先不要发送消息
>  kafka-3接受来自kafka-1的消息
[root@localhost kafka]# bin/kafka-console-consumer.sh --bootstrap-server  192.168.242.144:9092 --topic zitai --from-beginning 然后就可以在kafka-1上发送消息验证:
[root@localhost kafka]# bin/kafka-console-producer.sh --broker-list 192.168.242.146:9092 --topic zitai
>hello   #回车kafka-3就可以收到消息
[root@localhost kafka]# bin/kafka-console-consumer.sh --bootstrap-server  192.168.242.144:9092 --topic zitai --from-beginning
hello查询topic
[root@localhost kafka]# bin/kafka-topics.sh --zookeeper 192.168.242.144:2181 --list
zitai查看主题的信息:
[root@localhost kafka]# /bin/kafka-topics.sh --describe --zookeeper 192.168.242.144:2181 --topic zitai

3、部署elk+kafka

第一层,数据采集层

最左边的是业务服务器集群,上面安装了filebeat做日志采集,同时把采集的日志分别发送给两个logstash服务。

第二层,数据处理层,数据缓存层

logstash服务把接收到的日志通过格式处理,转存到本地的kafka broker+zookeeper集群中。

第三层,数据转发层

这个单独的logstash节点会实时去kafka broker集群拉取数据,转发至ES DataNode。

第四层,数据持久化存储

ES DateNode会把收到的数据,写入磁盘,并建立索引库。

第五层,数据检索,数据展示 

ES Master+Kibana主要协调ES集群,处理数据检索请求,数据展示

注意:在企业生产环境中,我们是不会在web服务器上安装logstash的,因为logstash比较消耗服务器资源,而我们web服务器是服务客户的,用logstash的话会降低处理客户请求的效率,一般会用filebeat,这个下文会讲,下面这个实验是为了说了logstash的使用方法。

由于虚拟机配置限制,我们的elk+kafka集群比较简略

kafka-1:192.168.242.144

kafka-2:192.168.242.145

kafka-3:192.168.242.146   (logstash安装在这台机器上,用来转发信息到Elasticsearch)

Elasticsearch+kibana:192.168.242.147  (这个是第一个实验Elasticsearch中的第一台机器,把第二台机器关掉,等于是单节点的Elasticsearch,把配置文件中有关内容注释掉,以免出现错误)

vim /usr/local/elasticsearch/config/elasticsearch.yml

安装logstash

kafka-3:安装logstash
[root@localhost ~]# tar zxf logstash-7.3.2.tar.gz -C /usr/local
[root@localhost ~]# cd /usr/local
[root@localhost local]# mv logstash-7.3.2 logstash
[root@localhost local]# ln -s /usr/local/logstash/bin/logstash /usr/bin/logstash
编辑logstash的配置文件,用来搜集转发信息
[root@localhost ~]# vim kafka.conf
input {kafka {topics => "mieba"decorate_events => truebootstrap_servers => "192.168.242.144:9092,192.168.242.145:9092,192.168.242.146:9092,"}
}
output {elasticsearch {hosts => ["192.168.242.147:9200"]index => 'kafka-%{+YYYY-MM-dd}'}
}

执行logstash的配置文件,然后在kafka-1上生产数据,在浏览器head监控插件上验证,是否转发到了Elasticsearch

kafka-3:
[root@localhost kafka]# logstash -f /root/kafka.conf
.....
[2022-05-01T15:41:18,178][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}    #看到这个successfully,并且没有error,说明执行成功,然后可以在kafka-1生产信息验证了kafka-1:
[root@localhost kafka]# bin/kafka-console-producer.sh --broker-list 192.168.242.144:9092 --topic mieba   #话题要保证与logstash的配置文件中的一致
>123
>64135164632
>3164613kafka-3:
会显示[2022-05-01T15:41:18,178][INFO ]................

在Elasticsearch的head监控插件上查看

然后可以用kibana创建索引,图表,仪表盘。

(然后每次执行logstash收集信息的文件,在kafka-1生产数据,都会有logstash转发到Elasticsearch,再由kibana展示)

四、filebeat

1、filebeat简介

Filebeat是本地文件的日志数据采集器。 作为服务器上的代理安装,Filebeat监视日志目录或特定日志文件,并将它们转发给Elasticsearch或Logstash进行索引、kafka 等。

Filebeat隶属于Beats,轻量级数据收集引擎。基于原先 Logstash-fowarder 的源码改造出来。换句话说:Filebeat就是新版的 Logstash-fowarder,也会是 ELK Stack 在 Agent 的第一选择,目前Beats包含四种工具:

  • 1.Packetbeat(搜集网络流量数据)
  • 2.Metricbeat(搜集系统、进程和文件系统级别的 CPU 和内存使用情况等数据。)
  • 3.Filebeat(搜集文件数据)
  • 4.Winlogbeat(搜集 Windows 日志数据)

2、工作原理

Filebeat由两个主要组件组成:prospectorharvester。这些组件一起工作来读取文件并将事件数据发送到您指定的输出。

启动filebeat时,它会启动一个或多个查找器,查看您为日志文件指定的本地路径。对于prospector所在的每个日志文件,prospector启动harvester。每个harvester都会为新内容读取单个日志文件,并将新日志数据发送到libbeat,后者将聚合时间并将聚合数据发送到您为filebeat配置的输出。

harvester:负责读取单个文件的内容。读取每个文件,并将内容发送到配置的输出位置,每个文件启动一个harvester,harvester负责打开和关闭文件,这意味着在运行是文件描述符保持打开状态,如果文件在读取时被删除,filebeat将继续读取文件。【确保至少一次交付,(filebeat意外关闭/停止),而不是说把要传输的日志文件给删了。未完成的时间保存在注册文件中,filebeat在次启动的时候,拿到这事件,继续完成未完成的操作(重复输出)】

prospector负责管理harvester并找到所有要读取的文件来源。如果输入类型为日志,则查找器将查找路径匹配的所有文件,并为每个文件启动一个harvester。每个harvester都在自己的GO协程中运行。

注意:

filebeat目前支持两种prospector类型:log和stdin

每个prospector类型可以定义多次。

日志文件prospector检查每个文件以查看harvester是否需要启动,是否已经运行,或者该文件是否可以被忽略。

filebeat prospector只读取本地文件,没有功能可以连接到远程主机来读取存储的文件或日志。

filebeat如何确保至少一次交付?

Filebeat保证事件至少会被传送到配置的输出一次,并且不会丢失数据。 Filebeat能够实现此行为,因为它将每个事件的传递状态存储在注册文件中。在输出阻塞或未确认所有事件的情况下,Filebeat将继续尝试发送事件,直到接收端确认已收到。

如果Filebeat在发送事件的过程中关闭,它不会等待输出确认所有收到事件。发送到输出但在Filebeat关闭前未确认的任何事件在重新启动Filebeat时会再次发送。这可以确保每个事件至少发送一次,但最终会将重复事件发送到输出。

也可以通过设置shutdown_timeout选项来配置Filebeat以在关闭之前等待特定时间。

为什么使用filebeat?

Filebeat是一种轻量级的日志搜集器,其不占用系统资源,自出现之后,迅速更新了原有的elk架构。Filebeats将收集到的数据发送给Logstash解析过滤,在Filebeats与Logstash传输数据的过程中,为了安全性,可以通过ssl认证来加强安全性。之后将其发送到Elasticsearch存储,并由kibana可视化分析。

3、filebeat参数说明

-c, --c FILE (常用)指定用于Filebeat的配置文件。 你在这里指定的文件是相对于path.config。 如果未指定-c标志,则使用默认配置文件filebeat.yml。-d, --d SELECTORS启用对指定选择器的调试。 对于选择器,可以指定逗号分隔的组件列表,也可以使用-d“*”为所有组件启用调试。 例如,-d “publish”显示所有“publish”相关的消息。-e, --e(常用)记录到stderr并禁用syslog /文件输出。-v, --v记录INFO级别的消息。

例:

1.测试filebeat启动后,查看相关输出信息:
./filebeat -e -c filebeat.yml -d "publish"2.后台方式启动filebeat:
nohup ./filebeat -e -c filebeat.yml >/dev/null 2>&1 &  将所有标准输出及标准错误输出到/dev/null空设备,即没有任何输出(不建议使用)
nohup ./filebeat -e -c filebeat.yml  &    3.停止filebeat:ps -ef |grep filebeat | kill -9  进程号

4、filebeat配置文件说明书

比较常用重要的部分我标注出来了

############### Filebeat 配置文件说明#############
filebeat:# List of prospectors to fetch data.prospectors:-# paths指定要监控的日志   #常用paths:- /var/log/*.log#指定被监控的文件的编码类型使用plain和utf-8都是可以处理中文日志的。# Some sample encodings:#   plain, utf-8, utf-16be-bom, utf-16be, utf-16le, big5, gb18030, gbk,#    hz-gb-2312, euc-kr, euc-jp, iso-2022-jp, shift-jis, ...#encoding: plain#指定文件的输入类型log(默认)或者stdin。 常用input_type: log# 在输入中排除符合正则表达式列表的那些行# exclude_lines: ["^DBG"]# 包含输入中符合正则表达式列表的那些行默认包含所有行include_lines执行完毕之后会执行exclude_lines。# include_lines: ["^ERR", "^WARN"]# 忽略掉符合正则表达式列表的文件默认为每一个符合paths定义的文件都创建一个harvester。# exclude_files: [".gz$"]# 向输出的每一条日志添加额外的信息比如“level:debug”方便后续对日志进行分组统计。默认情况下会在输出信息的fields子目录下以指定的新增fields建立子目录例如fields.level。#fields:#  level: debug#  review: 1# 如果该选项设置为true则新增fields成为顶级目录而不是将其放在fields目录下。自定义的field会覆盖filebeat默认的field。#fields_under_root: false# 可以指定Filebeat忽略指定时间段以外修改的日志内容比如2h两个小时或者5m(5分钟)。#ignore_older: 0# 如果一个文件在某个时间段内没有发生过更新则关闭监控的文件handle。默认1h,change只会在下一次scan才会被发现   常用#close_older: 1h# 设定Elasticsearch输出时的document的type字段也可以用来给日志进行分类。Default: log  常用#document_type: log# Filebeat以多快的频率去prospector指定的目录下面检测文件更新比如是否有新增文件如果设置为0s则Filebeat会尽可能快地感知更新占用的CPU会变高。默认是10s。#scan_frequency: 10s# 每个harvester监控文件时使用的buffer的大小。#harvester_buffer_size: 16384# 日志文件中增加一行算一个日志事件max_bytes限制在一次日志事件中最多上传的字节数多出的字节会被丢弃。The default is 10MB.#max_bytes: 10485760# 适用于日志中每一条日志占据多行的情况比如各种语言的报错信息调用栈。这个配置的下面包含如下配置#multiline:# The regexp Pattern that has to be matched. The example pattern matches all lines starting with [#pattern: ^\[# Defines if the pattern set under pattern should be negated or not. Default is false.#negate: false# Match can be set to "after" or "before". It is used to define if lines should be append to a pattern# that was (not) matched before or after or as long as a pattern is not matched based on negate.# Note: After is the equivalent to previous and before is the equivalent to to next in Logstash#match: after# The maximum number of lines that are combined to one event.# In case there are more the max_lines the additional lines are discarded.# Default is 500#max_lines: 500# After the defined timeout, an multiline event is sent even if no new pattern was found to start a new event# Default is 5s.#timeout: 5s# 如果设置为trueFilebeat从文件尾开始监控文件新增内容把新增的每一行文件作为一个事件依次发送而不是从文件开始处重新发送所有内容。#tail_files: false# Filebeat检测到某个文件到了EOF之后每次等待多久再去检测文件是否有更新默认为1s。#backoff: 1s# Filebeat检测到某个文件到了EOF之后等待检测文件更新的最大时间默认是10秒。#max_backoff: 10s# 定义到达max_backoff的速度默认因子是2到达max_backoff后变成每次等待max_backoff那么长的时间才backoff一次直到文件有更新才会重置为backoff。#backoff_factor: 2# 这个选项关闭一个文件,当文件名称的变化。#该配置选项建议只在windows。#force_close_files: false# Additional prospector#-# Configuration to use stdin input#input_type: stdin# spooler的大小spooler中的事件数量超过这个阈值的时候会清空发送出去不论是否到达超时时间。#spool_size: 2048# 是否采用异步发送模式(实验!)#publish_async: false# spooler的超时时间如果到了超时时间spooler也会清空发送出去不论是否到达容量的阈值。#idle_timeout: 5s# 记录filebeat处理日志文件的位置的文件registry_file: /var/lib/filebeat/registry# 如果要在本配置文件中引入其他位置的配置文件可以写在这里需要写完整路径但是只处理prospector的部分。#config_dir:############################# Output ############# 输出到数据配置.单个实例数据可以输出到elasticsearch或者logstash选择其中一种,注释掉另外一组输出配置。 常用
output:### 输出数据到Elasticsearchelasticsearch:# IPv6 addresses should always be defined as: https://[2001:db8::1]:9200hosts: ["localhost:9200"]# 输出认证.#protocol: "https"#username: "admin"#password: "s3cr3t"# 启动进程数.#worker: 1# 输出数据到指定index default is "filebeat"  可以使用变量[filebeat-]YYYY.MM.DD keys.  常用#index: "filebeat"index: "filebeat-testindex-%{+yyyy.MM.dd}"
setup.template.name: "filebeattest"
setup.template.pattern: "filebeattest-*"# 一个模板用于设置在Elasticsearch映射默认模板加载是禁用的,没有加载模板这些设置可以调整或者覆盖现有的加载自己的模板#template:# Template name. default is filebeat.#name: "filebeat"# Path to template file#path: "filebeat.template.json"# Overwrite existing template#overwrite: false# Optional HTTP Path#path: "/elasticsearch"# Proxy server url#proxy_url: http://proxy:3128# 发送重试的次数取决于max_retries的设置默认为3#max_retries: 3# 单个elasticsearch批量API索引请求的最大事件数。默认是50。#bulk_max_size: 50# elasticsearch请求超时事件。默认90秒.#timeout: 90# 新事件两个批量API索引请求之间需要等待的秒数。如果bulk_max_size在该值之前到达额外的批量索引请求生效。#flush_interval: 1# elasticsearch是否保持拓扑。默认false。该值只支持Packetbeat。#save_topology: false# elasticsearch保存拓扑信息的有效时间。默认15秒。#topology_expire: 15# 配置TLS参数选项如证书颁发机构等用于基于https的连接。如果tls丢失主机的CAs用于https连接elasticsearch。#tls:# List of root certificates for HTTPS server verifications#certificate_authorities: ["/etc/pki/root/ca.pem"]# Certificate for TLS client authentication#certificate: "/etc/pki/client/cert.pem"# Client Certificate Key#certificate_key: "/etc/pki/client/cert.key"# Controls whether the client verifies server certificates and host name.# If insecure is set to true, all server host names and certificates will be# accepted. In this mode TLS based connections are susceptible to# man-in-the-middle attacks. Use only for testing.#insecure: true# Configure cipher suites to be used for TLS connections#cipher_suites: []# Configure curve types for ECDHE based cipher suites#curve_types: []# Configure minimum TLS version allowed for connection to logstash#min_version: 1.0# Configure maximum TLS version allowed for connection to logstash#max_version: 1.2### 发送数据到logstash 单个实例数据可以输出到elasticsearch或者logstash选择其中一种注释掉另外一组输出配置。#logstash:# Logstash 主机地址#hosts: ["localhost:5044"]# 配置每个主机发布事件的worker数量。在负载均衡模式下最好启用。#worker: 1# #发送数据压缩级别#compression_level: 3# 如果设置为TRUE和配置了多台logstash主机输出插件将负载均衡的发布事件到所有logstash主机。#如果设置为false输出插件发送所有事件到随机的一台主机上如果选择的不可达将切换到另一台主机。默认是false。#loadbalance: true# 输出数据到指定index default is "filebeat"  可以使用变量[filebeat-]YYYY.MM.DD keys.#index: filebeat# Optional TLS. By default is off.#配置TLS参数选项如证书颁发机构等用于基于https的连接。如果tls丢失主机的CAs用于https连接elasticsearch。#tls:# List of root certificates for HTTPS server verifications#certificate_authorities: ["/etc/pki/root/ca.pem"]# Certificate for TLS client authentication#certificate: "/etc/pki/client/cert.pem"# Client Certificate Key#certificate_key: "/etc/pki/client/cert.key"# Controls whether the client verifies server certificates and host name.# If insecure is set to true, all server host names and certificates will be# accepted. In this mode TLS based connections are susceptible to# man-in-the-middle attacks. Use only for testing.#insecure: true# Configure cipher suites to be used for TLS connections#cipher_suites: []# Configure curve types for ECDHE based cipher suites#curve_types: []### 文件输出将事务转存到一个文件每个事务是一个JSON格式。主要用于测试。也可以用作logstash输入。#file:# 指定文件保存的路径。#path: "/tmp/filebeat"# 文件名。默认是 Beat 名称。上面配置将生成 packetbeat, packetbeat.1, packetbeat.2 等文件。#filename: filebeat# 定义每个文件最大大小。当大小到达该值文件将轮滚。默认值是1000 KB#rotate_every_kb: 10000# 保留文件最大数量。文件数量到达该值将删除最旧的文件。默认是7一星期。#number_of_files: 7### Console output 标准输出JSON 格式。# console:#如果设置为TRUE事件将很友好的格式化标准输出。默认false。#pretty: false############################# Shipper #############shipper:# #日志发送者信息标示# 如果没设置以hostname名自居。该名字包含在每个发布事务的shipper字段。可以以该名字对单个beat发送的所有事务分组。#name:# beat标签列表包含在每个发布事务的tags字段。标签可用很容易的按照不同的逻辑分组服务器。#例如一个web集群服务器可以对beat添加上webservers标签然后在kibana的visualisation界面以该标签过滤和查询整组服务器。#tags: ["service-X", "web-tier"]# 如果启用了ignore_outgoing选项beat将忽略从运行beat服务器上所有事务。#ignore_outgoing: true# 拓扑图刷新的间隔。也就是设置每个beat向拓扑图发布其IP地址的频率。默认是10秒。#refresh_topology_freq: 10# 拓扑的过期时间。在beat停止发布其IP地址时非常有用。当过期后IP地址将自动的从拓扑图中删除。默认是15秒。#topology_expire: 15# Internal queue size for single events in processing pipeline#queue_size: 1000# GeoIP数据库的搜索路径。beat找到GeoIP数据库后加载然后对每个事务输出client的GeoIP位置目前只有Packetbeat使用该选项。#geoip:#paths:#  - "/usr/share/GeoIP/GeoLiteCity.dat"#  - "/usr/local/var/GeoIP/GeoLiteCity.dat"############################# Logging ############## 配置beats日志。日志可以写入到syslog也可以是轮滚日志文件。默认是syslog。
logging:# 如果启用发送所有日志到系统日志。#to_syslog: true# 日志发送到轮滚文件。#to_files: false# files:# 日志文件目录。#path: /var/log/mybeat# 日志文件名称#name: mybeat# 日志文件的最大大小。默认 10485760 (10 MB)。rotateeverybytes: 10485760 # = 10MB# 保留日志周期。 默认 7。值范围为2 到 1024。#keepfiles: 7# Enable debug output for selected components. To enable all selectors use ["*"]# Other available selectors are beat, publish, service# Multiple selectors can be chained.#selectors: [ ]# 日志级别。debug, info, warning, error 或 critical。如果使用debug但没有配置selectors* selectors将被使用。默认error。#level: error

5、部署ELK+kafka+filebeat

实验环境:

elasticsearch-1+kibana:192.168.242.144

elasticsearch-2:192.168.242.145

logstash:192.168.242.146

kafka-1:192.168.242.147

kafka-2:192.168.242.148

kafka-3:192.168.242.149

filebeat+nginx:192.168.242.150

实验思路:

filebeat搜集nginx的访问日志,输出到kafka,filebeat此时相当于生产者(只是针对kafka来说的),kafka对日志消息进行队列,logstash将kafka中某个话题中(话题需要提前存在)的数据转发到elasticsearch集群(logstash此时对于kafka相当于消费者),最后由kibana展示。

实验过程:

1、部署elasticsearch集群:

1、部署java环境;(略)
2、安装elasticsearch
elasticsearch-1:
[root@localhost ~]# useradd es
[root@localhost ~]# echo "1" |passwd --stdin "es"
[root@localhost ~]# tar zxf elasticsearch-6.5.4.tar.gz -C /usr/local
[root@localhost ~]# mv /usr/local/elasticsearch-6.5.4 /usr/local/elasticsearch
[root@localhost ~]# vim /usr/local/elasticsearch/config/elasticsearch.yml
cluster.name: zcg
node.name: elk01
node.master: true
node.data: true
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["192.168.242.144","192.168.242.145"]
discovery.zen.ping_timeout: 150s
discovery.zen.fd.ping_retries: 10
client.transport.ping_timeout: 60s
http.cors.enabled: true
http.cors.allow-origin: "*"
[root@localhost ~]# mkdir -p /data/elasticsearch/{data,logs}
[root@localhost ~]# chown -R elsearch:elsearch /data/elasticsearch
[root@localhost ~]# chown -R elsearch:elsearch /usr/local/elasticsearch
[root@localhost ~]# vim /etc/security/limits.conf    将以下内容添加到配置文件中。(*表示所有用户)
*       soft     nofile          65536
*       hard     nofile          131072
*       soft     nproc           2048
*       hard     nproc           4096
[root@localhost ~]# echo "vm.max_map_count=262144" >> /etc/sysctl.conf
[root@localhost ~]# sysctl -pelasticsearch-2:
[root@localhost ~]# useradd es
[root@localhost ~]# echo "1" |passwd --stdin "es"
[root@localhost ~]# tar zxf elasticsearch-6.5.4.tar.gz -C /usr/local
[root@localhost ~]# mv /usr/local/elasticsearch-6.5.4 /usr/local/elasticsearch
[root@localhost ~]# vim /usr/local/elasticsearch/config/elasticsearch.yml
cluster.name: zcg
node.name: elk02
node.master: true
node.data: true
path.data: /data/elasticsearch/data
path.logs: /data/elasticsearch/logs
bootstrap.memory_lock: false
bootstrap.system_call_filter: false
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["192.168.242.144","192.168.242.145"]
discovery.zen.ping_timeout: 150s
discovery.zen.fd.ping_retries: 10
client.transport.ping_timeout: 60s
http.cors.enabled: true
http.cors.allow-origin: "*"
[root@localhost ~]# mkdir -p /data/elasticsearch/{data,logs}
[root@localhost ~]# chown -R elsearch:elsearch /data/elasticsearch
[root@localhost ~]# chown -R elsearch:elsearch /usr/local/elasticsearch
[root@localhost ~]# vim /etc/security/limits.conf    将以下内容添加到配置文件中。(*表示所有用户)
*       soft     nofile          65536
*       hard     nofile          131072
*       soft     nproc           2048
*       hard     nproc           4096
[root@localhost ~]# echo "vm.max_map_count=262144" >> /etc/sysctl.conf
[root@localhost ~]# sysctl -p两台都部署好之后在启动elasticsearch
[root@localhost ~]# su - es -c "cd /usr/local/elasticsearch && nohup bin/elasticsearch &"在浏览器上使用head插件,查看集群状态。

2、部署kibana:

kibana部署在elasticsearch-1上:
1、jdk部署,在部署elasticsearch的时候已经部署好
2、安装kibana
[root@localhost ~]# tar zxf kibana-6.5.4-linux-x86_64.tar.gz -C /usr/local
[root@localhost ~]# mv /usr/local/kibana-6.5.4 /usr/local/kibana
[root@localhost ~]# vim /usr/local/kibana/config/kibana.yml
server.port: 5601
server.host: "192.168.242.144"
elasticsearch.url: "http://192.168.242.144:9200"
kibana.index: ".kibana"启动kibana,浏览器访问192.168.242.144:5601,验证
[root@localhost ~]# nohup /usr/local/kibana/bin/kibana &

3、部署logstash:

1、部署jdk(略)
2、安装logstash
[root@localhost ~]# tar zxf logstash-7.3.2.tar.gz -C /usr/local
[root@localhost ~]# mv /usr/local/logstash-7.3.2 /usr/local/logstash
[root@localhost ~]# ln -s /usr/local/logstash/bin/logstash /usr/bin/logstash
3、创建logstash的配置文件,用来搜集kafka集群中的消息,并转发到elasticsearch集群
[root@localhost ~]# vim filebeat.conf
input {kafka {type => "kafka-logs"bootstrap_servers => "192.168.242.147:9092,192.168.242.148:9092,192.168.242.149:9092"group_id => "logstash"auto_offset_reset => "earliest"topics => "kafka_run_log"consumer_threads => 5decorate_events => true}
}output {elasticsearch {index => 'kafka-run-log-%{+YYYY.MM.dd}'hosts => ["192.168.242.144:9200","192.168.242.145:9200"]}
}

4、部署kafka集群:

kafka-1:
1、安装jdk(略)
2、安装kafka
[root@localhost ~]# tar zxf kafka_2.11-2.1.0.tgz -C /usr/local
[root@localhost ~]# mv /usr/local/kafka_2.11 /usr/local/kafka
3、配置zookeeper
[root@localhost ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties
[root@localhost ~]# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.242.147:2888:3888
server.2=192.168.242.148:2888:3888
server.3=192.168.242.149:2888:3888
[root@localhost ~]# mkdir -p /opt/data/zookeeper/{data,logs}
[root@localhost ~]# echo 1 > /opt/data/zookeeper/data/myid
4、配置kafka
[root@localhost ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/server.properties
[root@localhost ~]# vim  /usr/local/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.242.147:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.242.147:2181,192.168.242.148:2181,192.168.242.149:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@localhost ~]# mkdir -p /opt/data/kafka/logskafka-2:
1、安装jdk(略)
2、安装kafka
[root@localhost ~]# tar zxf kafka_2.11-2.1.0.tgz -C /usr/local
[root@localhost ~]# mv /usr/local/kafka_2.11 /usr/local/kafka
3、配置zookeeper
[root@localhost ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties
[root@localhost ~]# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.242.147:2888:3888
server.2=192.168.242.148:2888:3888
server.3=192.168.242.149:2888:3888
[root@localhost ~]# mkdir -p /opt/data/zookeeper/{data,logs}
[root@localhost ~]# echo 2 > /opt/data/zookeeper/data/myid
4、配置kafka
[root@localhost ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/server.properties
[root@localhost ~]# vim  /usr/local/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://192.168.242.148:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.242.147:2181,192.168.242.148:2181,192.168.242.149:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@localhost ~]# mkdir -p /opt/data/kafka/logskafka-3:
1、安装jdk(略)
2、安装kafka
[root@localhost ~]# tar zxf kafka_2.11-2.1.0.tgz -C /usr/local
[root@localhost ~]# mv /usr/local/kafka_2.11 /usr/local/kafka
3、配置zookeeper
[root@localhost ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/zookeeper.properties
[root@localhost ~]# vim /usr/local/kafka/config/zookeeper.properties
dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs
clientPort=2181
tickTime=2000
initLimit=20
syncLimit=10
server.1=192.168.242.147:2888:3888
server.2=192.168.242.148:2888:3888
server.3=192.168.242.149:2888:3888
[root@localhost ~]# mkdir -p /opt/data/zookeeper/{data,logs}
[root@localhost ~]# echo 3 > /opt/data/zookeeper/data/myid
4、配置kafka
[root@localhost ~]# sed -i 's/^[^#]/#&/' /usr/local/kafka/config/server.properties
[root@localhost ~]# vim  /usr/local/kafka/config/server.properties
broker.id=3
listeners=PLAINTEXT://192.168.242.149:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/opt/data/kafka/logs
num.partitions=6
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=536870912
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.242.147:2181,192.168.242.148:2181,192.168.242.149:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
[root@localhost ~]# mkdir -p /opt/data/kafka/logs三台机器一次启动zookeeper,然后验证,最后一次启动kafka(一定要先启动zookeeper再启动kafka)
[root@localhost ~]# nohup /usr/local/kafka/bin/zookeeper-server-start.sh config/zookeeper.properties & 回车后再回车[root@localhost ~]# echo conf | nc 127.0.0.1 2181
....[root@localhost ~]# echo stat | nc 127.0.0.1 2181
...
验证成功后再启动kafka
nohup /usr/local/kafka/bin/zookeeper-server-start.sh config/server.properties &然后通过jobs和查看端口号,以及创建topic验证(略)

5、部署filebeat搜集nginx日志:

1、安装jdk(略)
2、下载nginx,启动nginx
[root@localhost ~]# yum -y install nginx &&systemctl start nginx
3、安装filebeat
[root@localhost ~]# tar zxf filebeat-6.5.4-linux-x86_64.tar.gz -C /usr/local
[root@localhost ~]# mv /usr/local/filebeat-6.5.4 /usr/local/filebeat
[root@localhost ~]# vim /usr/local/filebeat/filebeat.yml
....
- type: log# Change to true to enable this input configuration.enabled: true# Paths that should be crawled and fetched. Glob based paths.paths:- /var/log/nginx/access.log
.....
output.kafka:# Array of hosts to connect to.enabled: truehosts: ["192.168.242.147:9092","192.168.242.148:9092","192.168.242.149:9092"]topic: 'kafka_run_log'    注意:话题的名字要与logstash配置文件中的话题名称相同,且话题需要已存在保存退出

6、搜集nginx的访问日志

kafka-1:
在kafka中创建topic,跟logstash保持一致
[root@localhost ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.242.147:2181 --replication-factor 1 --partitions 1 --topic kafka_run_log先启动logstash
logstash:
[root@localhost ~]# logstash -f filebeat.conf
...... 启动较慢,耐心等待,看到successfully start 就说明启动成功在启动filebeat
filebeat:
[root@localhost ~]# /usr/local/filebeat -c filebeat.yml &

7、测试

访问nginx的页面

在elasticsearch集群访问查看,在浏览器head监控插件中查看

然后在kibana中创建索引,展示nginx访问日志的访问情况

每访问一次nginx的网站发布页面,上图kibana中都会对应有消息更新。

五、Packetbeat(扩展)

1、Packetbeat 概述

Packetbeat 轻量型网络数据采集器,用于深挖网线上传输的数据,了解应用程序动态。Packetbeat 是一款轻量型网络数据包分析器,能够将数据发送至 Logstash 或 Elasticsearch等。

目前,Packetbeat支持以下协议:
• ICMP (v4 and v6)
• DNS
• HTTP
• AMQP 0.9.1
• Cassandra
• Mysql
• PostgreSQL
• Redis
• Thrift-RPC
• MongoDB
• Memcache
• TLS

packetbeat使用方法与filebeat一样,这里就不演示啦。

ELK日志中心集群,看完工资+1k相关推荐

  1. 一步一步搭建ELK日志处理集群(自己做过测试)

    下面的linux版本用的是centos 7的,用centos 6.5有可能出现问题. ELK集群搭建手册 一.   环境准备: 三台Linux服务器,ip地址分别为: 192.168.25.30 19 ...

  2. 系列学习分布式任务调度 XXL-JOB 之第 2 篇 —— 调度中心集群

    调度中心集群 调度中心支持集群部署,提升调度系统容灾和可用性. 调度中心集群部署时,几点要求和建议: DB配置保持一致: 集群机器时钟保持一致(单机集群忽视): 建议:推荐通过 nginx 为调度中心 ...

  3. 互联网晚报 | 2月25日 星期五 | 元宇宙概念,热度显著下降;​腾讯正洽谈落地“东数西算”大湾区枢纽韶关数据中心集群...

    ‍‍ 自动驾驶公司"云骥智行"获数亿元天使轮融资 2月25日消息,钛媒体App 独家获悉,国产GPU芯片公司壁仞科技的创始人张文,日前以董事长身份,领导了一家L4级自动驾驶初创企业 ...

  4. Apollo注册到自己的Eureka注册中心+配置中心集群

    ##重要提示:在任何步骤开始之前,谨记下面的东西## 在对apollo-master这个文件进行处理的时候,要找到文件夹scripts下的文件build.bat,苹果用户请找到build.sh,并且在 ...

  5. SpringCloud注册中心集群化及如何抗住大型系统的高并发访问

    一.场景引入 本人所在的项目由于直接面向消费者,迭代周期迅速,所以服务端框架一直采用Springboot+dubbo的组合模式,每个服务由service模块+web模块构成,service模块通过公司 ...

  6. 【分布式任务调度】(一)XXL-JOB调度中心集群部署配置

    文章目录 1.概述 2.代码编译 2.1.代码下载 2.2.初始化与编译 3.集群部署 3.1.服务启动 3.2.反向代理 4.总结 1.概述 XXL-JOB是一款轻量级的分布式任务调度中间件,默认支 ...

  7. 分布式解决方案-全面解密分布式任务调度平台-XXLJob调度中心集群

    调度中心集群 调度中心支持集群部署,提升调度系统容灾和可用性. 调度中心集群部署时,几点要求和建议: DB配置保持一致: 登陆账号配置保持一致: 群机器时钟保持一致(单机集群忽视): 建议:推荐通过n ...

  8. Akka 指南 之「跨多个数据中心集群」

    温馨提示:Akka 中文指南的 GitHub 地址为「akka-guide」,欢迎大家Star.Fork,纠错. 文章目录 跨多个数据中心集群 动机 定义数据中心 成员关系 故障检测 集群单例 集群分 ...

  9. springCloud - 第8篇 - 配置文件管理中心 - 集群模式(负载匀衡)使用

    前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家.点击跳转到教程. 当微服务系统中 应用服务有很多时(serviceA .serviceB ...),会都从同一个配置中 ...

最新文章

  1. socket udp
  2. Could not find codec parameters for stream 0 (Video: h264, none)
  3. format 转化时间格式不起作用
  4. 匹配行linux中grep命令的使用
  5. frameset 后台管理_易达CMS下载-易达CMS(免费开源网站管理系统)v3.0.0.1103免费版
  6. 目标检测系列(七)——CornerNet:detecting objects as paired keypoints
  7. Vue封装预约日期插件和发布到npm上
  8. java计算一个多边形的重心_2D凸多边形碰撞检测算法(二) - GJK(上)
  9. E-MapReduce 2.0.0 版本发布
  10. android 模板设计,Android的设计模式-模板方法模式
  11. pythonfor杨辉三角,python实现杨辉三角 python实现杨辉三角的几种方法代码实例
  12. AirPlay/ios浅谈
  13. python安装在什么地方_python的第三方库库安装在哪里了
  14. 简单的Windows远程连接
  15. 《基于Python的大数据分析基础及实战》第一章
  16. 联想R720使用kali安装NVIDIA显卡驱动
  17. 大数据命令,一文在手,全部都有(送纯净版文档)
  18. bmp格式图片怎么转成JPG格式?教你一键转格式的方法
  19. 制作秒表——AT24C02储存芯片与用定时器代替Delay函数
  20. 如何建立高质量团队-《克服团队协作的五种障碍》笔记与心得

热门文章

  1. 对jiq中一段程序以及prepare_to_wait的理解
  2. FPGA常用名词解释
  3. 获取WOED和EXCEL的公用方法
  4. 如何打开Windows7和Windows server 2008 R2的资源监视器
  5. Oracle错误 ora-12514 解决方法
  6. python随机生成数据库数据之一步一步教你数据造假成为道德主席-取名器-.-xswl-pydbgen || Faker
  7. 小米三季度业绩下滑,与荣耀、realme的崛起分不开
  8. 警惕这些房屋千万不可买!买了一切都完了!
  9. 安卓TextView文本不满一行由于英文,标点符号等原因换行
  10. 华师计算机学院男女比例,华中师范大学:专业录取无男女比例限制