下载安装(Redhat/Centos7)

  • rpm --import http://packages.elasticsearch.org/GPG-KEY-elasticsearch cat > /etc/yum.repos.d/logstash.repo <<EOF [logstash-5.0] name=logstash repository for 5.0.x packages baseurl=http://packages.elasticsearch.org/logstash/5.0/centos gpgcheck=1 gpgkey=http://packages.elasticsearch.org/GPG-KEY-elasticsearch enabled=1 EOF yum clean all yum install logstash

hello world

  • # bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'

logstash用不同的线程来实现对输入信息处理之后显示成我们希望的样子,logstash给每个线程都取了名字,输入的叫xx,过滤的叫\xx

数据在线程之间以事件的形式流传,不要叫行,因为logstash可以处理多行事件。

logstash会给事件添加一些额外的信息,最重要的就是@timestamp,用来标记事件的发生时间。因为这个字段涉及到logstash的内部流转,所以必须是一个joda对象,如果尝试自己给一个字符串字段重命名为@timestamp的话,logstash会直接报错。所以请使用filters/data插件来管理这个特殊字段。

另外的几个字段:

  • host标记发生位置
  • type标记事件的唯一类型
  • tags标记事件的某方面属性。这是一个数组,一个事件可以有多个标签

你可以随意给事件添加字段或者从事件里删除字段。事实上事件就是一个Ruby对象,或者更简单的理解为就是一个哈希。

注:每个logstash过滤插件都有四个方法:

add_tag、remove_tag、add_field、remove_field

配置语法

logstash习惯用shipper、broker、indexer来描述数据流中不同进程的角色。

语法

在很多的运维场景中并没有用logstash作为shipper,或者说 没有用elasticsearch作为是数据存储,也就是没有indexer。

logstash设计了自己的DSL--有点像Puppet的DSL,两者均是通过Ruby语言写的--包括区域、注释、数据类型(布尔值、字符串、数值、数组、哈希)。条件判断,字段引用等。

区段

logstash用{}来定义区域。区域内可以包括插件区域定义,可以在一个区域中定义多个插件。插件区域内则可以定义键值对设置。示例:

input{

stdin{}

syslog{}

}

数据类型

logstash支持少量的数据值类型

  • bool

debug => true

  • string

host => "hostname"

  • number

port => 514

  • array

match => ["datetime","UNIX","ISO8601"]

  • hash

option => {

key1 => "value1",

key2 => "value2"

}

注:如果版本低于1.2.0,哈希的语法跟数组是一样的,示例:

match => ["field1","pattern1","field2","pattern2"]

字段引用

字段是logstash::Event对象的属性。可以想象字段就像一个键值对。如果你想在Logstash中使用字段的值,只需要把字段的名字写在中括号中就行了,这就叫字段引用。

对于嵌套字段(也就是多维哈希表,或者叫哈希的哈希),每层字段名都写在[]中。比如你可以从geoip获取到longitude值(这是个笨办法,实际上有单独的字段专门存这个数据的):

[geoip][location][0]

注:logstash的数组也支持倒序下标,及[geoip][location][-1]可以获取数组最后一个元素的值。

Logstash还支持变量内插,在字符串里使用字段引用的方法是这样:

“the longitude is %{[geoip][location][0]}”

条件判断(condition)

Logstash从1.3.0版本开始支持条件判断和表达式。

表达式支持下面这些操作符:

  • ==,!=,<,>,<=,>=
  • =~(匹配正则),!~(不匹配正则)
  • in(包含),not in(不包含)
  • and,or,nand(非与),xor(非或)
  • ()(复合表达式),!()(对复合表达式取反)

示例:

if "_grokparsefailure" not in [tags] {
} else if [status] !~ /^2\d\d/ or ( [url] == "/noc.gif" nand [geoip][city] != "beijing" ) {
} else {
}

命令行参数

logstash提供了一个shell脚本叫logstash,方便快速运行。

  • -e

即执行。事实上你可以不写任何具体配置,直接运行bin/logstash -e ‘ ’达到相同的效果,这个参数的默认值是下面这样的:

input{

stdin{  }

}

output{

stdout{  }

}

  • --config或-f

即文件。bin/logstash -f agent.conf,或者bin/logstash -f /etc/logstash.d/

注:logstash列出目录下所有文件时是字母排序的。而logstash配置段的filter和output都是顺序执行的,所以顺序非常重要。采用多文件管理的用户,推荐采用数字编号方式命名配置文件,同时在配置中,严谨采用if判断限定不同日志的动作。

  • --configtest或-t

即测试。用来测试logstash读取的配置文件是否存在语法错误。logstash配置语法是用grammar.treetop定义的,尤其是使用了上一条提到的读取目录方式的读者,尤其要提前测试。

  • --log或-l

即日志。logstash默认输出日志到标准错误。生产环境下你可以通过bin/logstash -l log/logstash.log命令来统一存储日志。

  • --pipelinne-workers或-w

运行filter和output的pipeline线程的数量。默认是CPU核数。

  • --pipeline-batch-delay或-u

每个logstash pipeline线程,在打包批量日志的时候,最多等待毫秒数。默认是5ms。

  • --pluginpath或-P

可以写自己的插件,然后用bin/logstash --pluginpath /path/to/own/plugins进行加载。

  • -verbose

输出一定的调试日志。

设置文件

从logstash 5.0开始,新增了$LS_HOME/config/logstash.yml文件,可以将所有的命令行参数都通过YAML文件方式设置。同时为了反映命令行配置参数的层级关系,参数也都改成用.而不是-了。

pipeline:

workers:24

   bath:

    size:125

    delay:5

plugin的安装

从logstash1.5.0版本开始,logstash将所有插件都独立拆分成gem包。这样每个插件都可以独立更新,不用等待logstash自身做整体更新的时候才能使用。

为了达到这个目标,logstash配置了专门的plugin管理命令

plugin用法说明

Usage:bin/logstash-plugin [OPTIONS] SUBCOMMAND [ARG] ...Parameters:SUBCOMMAND                    subcommand[ARG] ...                     subcommand argumentsSubcommands:install                       Install a pluginuninstall                     Uninstall a pluginupdate                        Install a pluginlist                          List all installed pluginsOptions:-h, --help                    print help

示例

bin/logstash-plugin list查看本机现在有多少插件可用(起始就是在vender/bundle/jruby/1.9/gems的目录下)

若发布新模块logstash-output-webhdfs。只需要运行:bin/logstash-plugin install logstash-output-webhdfs

假如只是升级的话:bin/logstash-plugin update logstash-input-tcp

本地插件安装

bin/logstash-plugin install /path/to/logstash-filter-crash.gem

执行成功后在logstash-5.0.0目录下的Gemfile文件最后一行多出一段内容:

gem "logstash-filter-crash", "1.1.0", :path => "vendor/local_gems/d354312c/logstash-filter-mweibocrash-1.1.0"

同时Gemfile.jruby-1.9.lock文件开头也会多出一段内容:

PATHremote: vendor/local_gems/d354312c/logstash-filter-crash-1.1.0specs:logstash-filter-crash (1.1.0)logstash-core (>= 1.4.0, < 2.0.0)

长期运行

标准service方式

rpm发行包安装的用户:/etc/init.d/logstash脚本中,会加载/etc/init.d/functions库文件,利用其中的daemon函数,将logstash进程作为后台程序运行。

所以只要确保配置文件存在于/etc/logstash/conf.d/目录下所有的conf结尾的文件且无其他文件,即可通过service logstash start启动

最基础的nohup方式

command
command > /dev/null
command > /dev/null 2>&1
command &
command > /dev/null &
command > /dev/null 2>&1 &
command &> /dev/null
nohup command &> /dev/null

想要维持一个长期后台运行的logstash,你需要同时在命令前面加nohup,后面加&。

更优雅的SCREEN方式

screen算是linux运维一个中高级技巧。通过screen命令创建的环境下运行的终端命令,其父进程不是sshd的登录会话,而是screen。这样就可以避免用户退出进程消失的问题,又随时能重新接管回终端继续操作。

创建独立的screen命令:

screen -dmS elkscreen_1

接管连入创建的elkscreen_1

screen -r elkscreen_1

运行logstash之后,不要按Ctrl+C,而是按Ctrl+A+D,断开环境。想要重新接管,依然screen -r elkscreen_1即可。

若创建了多个screen,可通过:

screen -list

最推荐的daemontools方式

daemontools工具:daemontools、python实现的supervisord、perl实现的ubic、ruby实现的god等。

以supervisord为例,因为出现比较早,可通过epel仓库直接安装:

yum -y install supervisord --enablerepo=epel

在/etc/supervisord.conf配置文件中添加内容,定义你要启动的程序:

[program:elkpro_1]
environment=LS_HEAP_SIZE=5000m
directory=/opt/logstash
command=/opt/logstash/bin/logstash -f /etc/logstash/pro1.conf -w 10 -l /var/log/logstash/pro1.log
[program:elkpro_2]
environment=LS_HEAP_SIZE=5000m
directory=/opt/logstash
command=/opt/logstash/bin/logstash -f /etc/logstash/pro2.conf -w 10 -l /var/log/logstash/pro2.log

然后service supervisord start 即可。

logstash会以supervisord子进程的身份运行,你还可以使用supervisordctl命令单独控制一系列logstash子进程中某一个进程的启停操作:

supervisorctl stop elkpro_2

  • 输入插件

  • 读取文件(File)

logstash使用一个名叫FileWatch的Ruby Gem库来监听文件变化。这个库支持glob展开文件路径,而且会记录一个叫.sincedb的数据库文件来跟踪被监听的日志文件的当前读取位置。所以,不要担心logstash会漏过你的数据。

sincedb文件中记录了每个被监听文件的inide、major number、minor number和pos。

inode:在其他博文中已经阐述;

major device number:可以看做是设备驱动程序,被同一设备驱动程序管理的设备有相同的major device number。这个数字实际是Kernel中device driver table的索引。这个表保存着不同的设备驱动程序;

minor number:代表被访问的具体设备,也就是说Kernel根据major device number找到的设备驱动程序,然后再从minor device number获得设备位置属性。

pos:包含发生错误的位置的绝对文件位置;

  • 配置示例

input {file {path => ["/var/log/*.log", "/var/log/message"]type => "system"start_position => "beginning"}
}
  • 解释

有一些比较有用的配置项,可以用来指定File Watch库的行为:

  •   discover_interval

logstash每隔多久去检查一次被监听的path下是否有新文件,默认15s。

  •   exclude

不想被监听的文件可以排除出去,这里跟path一样支持glob展开。

glob:所谓的 glob 模式是指 shell 所使用的简化了的正则表达式。星号(*)匹配零个或多个任意字符;[abc]匹配任何一个列在方括号中的字符(这个例子要么匹配一个 a,要么匹配一个 b,要么匹配一个 c);问号(?)只匹配一个任意字符;如果在方括号中使用短划线分隔两个字符,表示所有在这两个字符范围内的都可以匹配(比如 [0-9] 表示匹配所有 0 到 9 的数字)。

  •   close_older

一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3600s。

  •   ignore_older

在每次检查文件列表的时候,如果一个文件的最后修改时间超过这个值,就忽略这个文件。默认是86400s。

  •   sincedb_path

如果你不想用默认的$HOME/.sincedb(windows平台上在C:\windows\System32\config\systemprofile\.sincedb),可以通过这个配置定义sincedb文件到其他位置。

  •   sincedb_write_interval

logstash每隔多久写一次sincedb文件,默认是15s。

  •   stat_interval

logstash每隔多久检查一次被监听文件状态(是否有更新,默认是1s)

  •   start_position

logstash从什么位置开始读取文件数据,默认是结束位置,也就是说logstash进程会以类似tail -f的形式运行。如果你是要导入原有数据,把这个设定成“beginning”,logstash进程就从头开始读取,类似less +F的形式运行。

  • 注意

1、通常你要导入原有数据进Elasticsearch的话,你还需要filter/date插件来修改默认的@timestamp字段值。

2、FileWatch只支持绝对路径,而且会不自动递归目录。所以有需要的话,请用数组方式都写明具体哪些文件。

3、LogStash::Input::File只是在进程运行的注册阶段初始化一个FileWatch对象,所以它不能支持类似fluentd那样的path=> “/path/to/%{+yyyy/mm/dd/hh}.log”写法。达到相同目的,你只能写成path=>"/path/to/*/*/*/*.log"。FileWatch模块提供了一个稍微简单一点的写法:/path/to/**/*.log,用**来缩写表示递归全部子目录。

4、在单个input/file中监听的文件数量太多的话,每次启动扫描构建监听队列会消耗较多的时间。给使用者的感觉好像读取不到一样,这是正常现象。

5、start_position仅在该文件从未被监听过的时候起作用,如果sincedb文件中已经有这个文件的inode记录了,那么logstash依然会从记录的pos开始读取数据。

6、因为windows平台上没有inode概念,logstash某些版本在windows平台上监听文件不是很靠谱,windows平台上,推荐考虑使用nxlog作为收集端。

  • 标准输入(Stdin)

input {stdin {add_field => {"key" => "value"}codec => "plain"tags => ["add"]type => "std"}
}
  • 运行结果

用上面的新stdin设置重新运行一次最开始的hello world示例。建议大家把整段配置都写入一个文本文件,然后运行命令:bin/logstash -f stdin.conf。输入“hello world”并回车,你会在终端看到如下的输出:

{"message" => "hello world","@version" => "1","@timestamp" => "2014-08-08T06:48:47.789Z", "type" => "std", "tags" => [ [0] "add" ], "key" => "value", "host" => "raochenlindeMacBook-Air.local" }
  • 解释

type和tags是logstash事件中两个特殊的字段。通常说我们会在输入区段中通过type来标记事件类型----我们肯定是提前能知道这个事件属于什么类型的。而tags这是在数据处理过程中,由具体的插件来添加或者删除的。

最常见的用法像下面这样:

input {stdin {type => "web"}
}
filter {if [type] == "web" {grok {match => ["message", %{COMBINEDAPACHELOG}]}}
}
output {if "_grokparsefailure" in [tags] {nagios_nsca {nagios_status => "1"}} else {elasticsearch {}}
}
  • 读取Syslog数据

syslog可能是运维领域最流行的数据传输协议了。当你想从设备上收集系统日志的时候,syslog应该会是你的第一选择。尤其是网络设备,比如思科----syslog几乎是唯一可行的办法。

这里只讲如何把logstash配置成一个syslog服务器来接收数据。有关rsyslog的用法,稍后的类型项目中,会有更详细的介绍。

  • 配置示例

input {syslog {port => "514"}
}
  • 运行结果

作为简单的测试,先暂停本机的syslog或rsyslog进程,然后启动logstash进程(这样就不会有端口冲突的问题)。现在,本机的syslog就会默认发送到logstash里了。我们可以用自带的logger命令行工具发送一条“hello world”信息到syslog里,即logstash里面。看到logstash输出像下面这样。

{"message" => "Hello World","@version" => "1","@timestamp" => "2014-08-08T09:01:15.911Z", "host" => "127.0.0.1", "priority" => 31, "timestamp" => "Aug 8 17:01:15", "logsource" => "raochenlindeMacBook-Air.local", "program" => "com.apple.metadata.mdflagwriter", "pid" => "381", "severity" => 7, "facility" => 3, "facility_label" => "system", "severity_label" => "Debug" }
  • 解释

Logstash是用UDPSocket,TCPServer和Logstash::Filter::Grok来实现Logstash::Inputs::Syslog的。所以你起始可以直接用logstash配置实现一样的效果:

input {tcp {port => "8514"}
}
filter {grok {match => ["message", "%{SYSLOGLINE}" ]}syslog_pri { }
}
  • 最佳实践

建议在使用Logstash::Inputs::Syslog的时候走TCP协议来传输数据;

因为具体实现中,UDP监听器只用了一个线程,而TCP监听器会在接收每个连接的时候都启动新的线程来处理后续步骤。

如果你已经使用UDP监听收集日志,用下行命令检查你的UDP接收队列的大小:

netstat -plnu | awk 'NR==1 || $4~/:514$/{print $2}'
Recv-Q
228096

228096是UDP接收队列的默认最大大小,这时候linux内核开始丢弃数据包了。

强 烈建议使用Logstash::Input::Tcp和Logstash::filters::Grok配合实现同样的syslog功能。

虽然Logstash::Input::Syslog在使用TCPServer的时候可以采用多线程处理的接收,但是在同一个客户端数据处理中,其grok和date是一直在该线程中完成的,这会导致总体上的处理性能几何级的下降,经过测试,TCPServer每秒可以接收50000条数据,而在同一线程中启用grok后每秒只能处理5000条,再加上date只能到500条。

才将这两步拆分到filters阶段后,logstash支持对该阶段插件单独设置多线程运行,大大提高了总体处理性能。在相同环境下,logstash -f tcp.conf -w 20的测试中,总体处理性能可以达到30000条数据。

注:测试采用 logstash 作者提供的 yes "<44>May 19 18:30:17 snack jls: foo bar 32" | nc localhost 3000 命令。出处见:https://github.com/jordansissel/experiments/blob/master/ruby/jruby-netty/syslog-server/Makefile

  • 贴士

如果实在没办法切换到TCP协议,你可以自己写程序或者使用其他基于异步IO框架(比如libev)的项目。下面是一个简单的异步IO实现UDP监听数据输入Eleasticsearch的示例:

https://gist.github.com/chenryn/7c922ac424324ee0d695

  • 读取网络数据(TCP)

未来你可能会用到redis服务器或者其他消息队列系统作为logstash broker的角色。不过logstash其实也有自己的TCP/UDP插件,在临时任务的时候,也算能用,尤其是测试环境。

小贴士:

虽然Logstash::Input::TCP用Ruby的Socket和Openssl库实现了高级的SSL功能,但Logstash本身只能在SizedQueue中缓存20个事件。这就是建议在生产环境中换用其他消息队列的原因。

  • 配置示例

input {tcp {port => 8888mode => "server"ssl_enable => false}
}
  • 常见场景

目前看来,Logstash::Input::TCP最常见的用法就是配合nc命令导入旧数据。在启动logstash进程后,在另一个终端运行如下的命令即可导入数据:

# nc 127.0.0.1 8888 < olddata 

这种做法比用Logstash::Input::File好,因为当nc命令结束,我们就知道数据导入完毕了。而用input/file方式,logstash进程还会一直等待新数据输入被监听的文件,不能直接看出是否任务完成了。

未完待续……

转载于:https://www.cnblogs.com/cf532088799/p/7732973.html

ELK之logstash相关推荐

  1. 使用ELK(Elasticsearch + Logstash + Kibana) 搭建日志集中分析平台实践--转载

    原文地址:https://wsgzao.github.io/post/elk/ 另外可以参考:https://www.digitalocean.com/community/tutorials/how- ...

  2. ELK(ElasticSearch, Logstash, Kibana)搭建实时日志分析平台

    ELK平台介绍 在搜索ELK资料的时候,发现这篇文章比较好,于是摘抄一小段: 以下内容来自: http://baidu.blog.51cto.com/71938/1676798 日志主要包括系统日志. ...

  3. ELK(ElasticSearch+Logstash+ Kibana)搭建实时日志分析平台

    来源:http://www.cnblogs.com/zclzhao/p/5749736.html 一.简介 ELK 由三部分组成elasticsearch.logstash.kibana,elasti ...

  4. Centos6.5使用ELK(Elasticsearch + Logstash + Kibana) 搭建日志集中分析平台实践

    Centos6.5安装Logstash ELK stack 日志管理系统 概述: 日志主要包括系统日志.应用程序日志和安全日志.系统运维和开发人员可以通过日志了解服务器软硬件信息.检查配置过程中的错误 ...

  5. ELK(ElasticSearch, Logstash, Kibana)搭建实时日志分析平

    ELK平台介绍 在搜索ELK资料的时候,发现这篇文章比较好,于是摘抄一小段: 以下内容来自:http://baidu.blog.51cto.com/71938/1676798 日志主要包括系统日志.应 ...

  6. ELK(ElasticSearch, Logstash, Kibana)实时日志分析平台部署

    开源实时日志分析ELK平台能够完美的解决我们上述的问题,ELK由ElasticSearch.Logstash和Kiabana三个开源工具组成.官方网站:https://www.elastic.co/p ...

  7. ELK:LogStash写入ES索引建立差8个时区问题

    一.版本信息 ElasticSearch版本:elasticsearch-5.4.2.rpm Kibana版本:kibana-5.4.2-linux-x86_64.tar.gz logstash版本: ...

  8. 【分布式应用】ELFK集群部署(Filebeat+ELK)Logstash的过滤模块

    一.ELFK集群部署(Filebeat+ELK) ELFK= ES + logstash+filebeat+kibana 实验环境 服务器类型 系统和IP地址 需要安装的组件 硬件方面 node1节点 ...

  9. CENTOS6.5安装日志分析ELK elasticsearch + logstash + redis + kibana

    1.日志平台的工作流程 多个独立的agent(Shipper)负责收集不同来源的数据,一个中心agent(Indexer)负责汇总和分析数据,在中心agent前的Broker(使用redis实现)作为 ...

  10. centos7安装配置ELK(Elasticsearch+Logstash+Kibana)

    1.server端安装 #安装elasticsearch yum install java-1.8.0-openjdk ruby yum install elasticsearch-2.1.0.rpm ...

最新文章

  1. 如何理解Nyquist采样定理?
  2. c++ 虚函数_到底什么情况下会合成默认构造函数?
  3. 推特千赞Demo袭来!简笔画变照片的GauGAN,编故事的GPT-2,浏览器皆可玩
  4. 数据库中什么是主键,什么是外键?
  5. ASP.NET网站实现多语言版本 【转】
  6. [C++11]弱引用智能指针weak_ptr初始化和相关的操作函数
  7. AZURE kinect 深度相机配置ubuntu16.04
  8. iMeta教程 | 使用PMS分析微生物组(图文+视频)
  9. 深度解读:GAN模型及其在2016年度的进展
  10. DataGuard常用命令及DG主备库开关顺序
  11. C++与Java语法上的不同,互联网 面试官 如何面试
  12. 基于JAVA+SpringBoot+Mybatis+MYSQL的请假与审批系统
  13. 明确职责分工的重要性_发挥属地作用明确责任分工
  14. mount: block device /dev/cdrom is write-protected, mounting read-only 解决方法
  15. 机器学习与神经网络的学习
  16. mysql5.4升级5.6_Laravel5.4 升级到 5.6
  17. 接口测试工具-Jmeter使用笔记(五:正则表达式提取器)
  18. VS2017 winform 打包(使用 Microsoft Visual Studio 2017 Installer Project)
  19. Atitit 互联网技术公司防爆指南技术规范标准流程 30个危险物品
  20. yuv422,yuv420,yuv444的区别

热门文章

  1. Django开发个人博客网站——19、通过Django Haystack实现搜索功能(上)
  2. php课设源代码网站,php精品课程教学网站在线发布系统
  3. 邮储银行您在我行留存的身份信息不准确.. 解决
  4. formidable词根词缀_词根词缀+语境:和UP一起逻辑背单词(三十六)
  5. 有沃更精彩,沃课堂理想的移动学习平台
  6. sv基础-数据类型(一)
  7. CKEditor5记
  8. FlyBanner+RecyclerView显示+MVP
  9. 大数据技术之Kafka(一)Kafka概述、Kafka快速入门、Kafka架构深入
  10. 还想抄底?小心爆炸!