• 前言

  • 组件介绍

1.Input2.Filter3.Output

  • 部署
  • 优缺点
  • 优化 1.基于Logstash自身优化 2.基于插件的优化

前言

注意:文章中如果涉及到代码的部分,使用PC端阅读更佳哦

编写这篇文章的目的是为了后面的日志工程做铺垫,如果是老手可直接跳过这篇文章

Logstash已经是很成熟的日志采集技术了,简单来说就是一个pipeline形式的采集。Logstash大体可以分为三个部分:Inpu(输入端),Filter(过滤端)、Output(输出端);每个部分都可以使用各种组件进行功能处理.接下来将分别对这三个部分组件进行详细讲解,并分析Logstash的优缺点以及如何优化Logstash?

组件介绍

一条消息在Logstash中是以事件event为单元的.

本文只简单介绍几个入门的插件,来了解Logstash的运行,高级的部分请自行阅读官网 https://www.elastic.co/guide/en/logstash/1.5/index.html

1 Input

1.1 标准输入Stdin
# 配置示例input{ stdin{     add_field => {"key" => "value"}     codec => "plain"     tags => ["add"]     type => "std"     } }#运行结果# type和tags是logstash事件中两个特殊字段。通过会在输入区域通过type来标记事件类型,而tags则是在数据处理过程中,由具体的插件来添加或删除{    "message" => "hello world",   "@version" => "1", "@timestamp" => "2014-08-08T06:48:47.789Z",       "type" => "std",       "tags" => [     [0] "add" ],        "key" => "value",       "host" => "raochenlindeMacBook-Air.local"}
1.2 读取文件File
# 配置示例input { file{        path => ["/var/log/*.log","/var/log/message"]    type => "system"    start_position => "beginning"  }}

logstash使用一个名叫filewatch的Ruby Gem库来监听文件的变化。这个库支持glob展开文件路径,而且会记录一个叫.slincedb的数据库文件来跟踪被监听的日志文件的当前读取位置、

注意:

  1. FileWatch只支持文件的绝对路径,而且不会自动递归目录。所以有需要的话,用数组的方式写明具体哪些文件

  2. Logstash::Inputs::file只是在进程运行的注册阶段初始化一个FileWatch对象。所以不能支持类似fluentd那样的Path => "/path/to/%{+yyyy/MM/dd/hh}.log" 写法。达到相同目的,你只能写成 path=>"/path/to/*/*/*/*.log"

  3. start_position仅在该文件从未被监听过的时候起作用。如果slincedb文件中已经有这个文件的inode记录了,那么logstash依然会从记录过的Pos开始读取数据。所以重复测试的时候需要删除slincedb文件

1.3 生成测试数据

# 配置文件input{ generator{   count => 100000   message => '{"key1":"value1","key2":[1,2],"key3":{"subkey1":"subvalue1"}}'  }}

2.Filter

2.1 Grok

Grok可以说是Logstash中最常用到的一个插件了,该插件的功能是通过自定义正则表达式来匹配符合条件的事件消息,更多的正则表达式请见: https://github.com/elastic/logstash/tree/v1.4.2/patterns

Grok测试工具可以使用GrokDebug,但是这个工具经常连不上,个人感觉不好用。

个人平时使用Kinbana自带的Grok Debug来进行grok语法的调试

filter{    grok{     match => {        "message" => "%{WORD} %{NUMBER:request_time:float} %{WORD}"     }    }}

2.2 GeoIp

geoip 插件的 "source" 字段可以是任一处理后的字段,比如 "client_ip",但是字段内容却需要小心!geoip 库内只存有公共网络上的 IP 信息,查询不到结果的,会直接返回 null,而 logstash 的 geoip 插件对 null 结果的处理是:不生成对应的 geoip.字段 . 如果使用了诸如 127.0.0.1, 172.16.0.1, 182.168.0.1, 10.0.0.1 等内网地址,会发现没有对应输出!

# 配置示例filter{ geoip{   source => "message" }}#运行结果{    "message" => "183.60.92.253",   "@version" => "1", "@timestamp" => "2014-08-07T10:32:55.610Z",       "host" => "raochenlindeMacBook-Air.local",      "geoip" => {                   "ip" => "183.60.92.253",        "country_code2" => "CN",        "country_code3" => "CHN",         "country_name" => "China",       "continent_code" => "AS",          "region_name" => "30",            "city_name" => "Guangzhou",             "latitude" => 23.11670000000001,            "longitude" => 113.25,             "timezone" => "Asia/Chongqing",     "real_region_name" => "Guangdong",             "location" => [         [0] 113.25,         [1] 23.11670000000001     ] }}

2.3 ruby

在Logstash中使用ruby代码,可以说是非常强大的一个功能。

短短一行 Ruby 代码,可以减少 50% 以上的 CPU 使用率。

filter{ date {  match => ["datetime","UNIX"]  ruby{      code => "event.cancel if 5*24*3600 < (event['@timestamp']-::Time.now").abs"    }  }}

2.4 metrics

metrics{    meter => "cnt"  #统计所有的input事件    add_tag => "metric"    add_field => {"metrics_flag" => "total"}  # 增加标识字段,统计所有的input事件  }  if "_grokparsefailure" in [tags]{     metrics{       meter => "cnt"       add_tag => "metric"      add_field => {"metrics_flag" => "fail"} #统计所有解析失败的事件   }  }

3.Output

3.1标准输出(Stdout)
output{  stdout {     codec => rubydebug # rubydebug通常是用来debug用的     workers => 2  } }
3.2 保存File
output { file {    path => "/path.log.gz"   message_format => "%{message}"   gzip => true}
3.3 保存es
output{ elasticsearch{   host => "localhost"     protocol => "http"     index => "logstash-%{type}-%{+YYYY.MM.dd}"     index_type => "%{type}"     workers => 5     template_overwrite => true }}

部署

# 1.下载并解压缩logstashwget https://download.elastic.co/logstash/logstash/logstash-all-plugins-2.4.1.tar.gztar -zvxf logstash-all-plugins-2.4.1.tar.gz# 2.运行最基本的logstash pipeline# -e:直接在命令行对logstash进行配置,从命令行接受输入,将输出写入命令行# 输入:hello world,可以看到输出,logstash会补充timestamp和ip地址# 用ctrl-d可以结束这个pieplinebin/logstash -e 'input { stdin { } } output { stdout {} }'

优缺点

优点

1.扩展性

Logstash提供了超过200个插件,同时可自定义插件

2.灵活性

Logstash插件不受限,可输入多个数据源或输出多个数据源

3.多样性

数据源多样性,不限于日志文件,比如kafka,es,redis都可以作为数据源

4.可过滤

Logstash对事件可执行常规转换

缺点

Logstash底层是基于JVM的,而且Logstash的设计原则就是尽可能多的处理事件,所以Logstash对资源的消费特别大,如果你遇到了CPU超过100%的时候请不要惊讶,接下来就要开始优化了~

优化

Logstash的优化可分为两大部分,第一部分的优化是基于Logstash本身的,第二部分的优化是基于Logstash插件的

1.基于Logstash程序

1.1 提高线程数

在启动logstash程序的时候,指定-w参数,可以指定线程数

1.2 jvm调整

在logstash的bin目录下logstash.bin.sh脚本里修改LSHEAPSIZE="${LSHEAPSIZE:=4g}"这个值就ok了

2.基于插件

基于插件的优化,无非就是对三个部分(input、filter、output)的优化

2.1 input优化

根据input端使用的插件结合官网提供的参数进行调整,比如input端使用kafka作为数据源,那么则可以调整consumer_thread参数来调整消费者线程数,或者开启压缩等其他手段

2.2 Filter优化

filter端的优化涉及部分比较多,这里只给出两种优化方案

2.2.1 如果使用了grok语法,则需要对grok进行优化

grok的优化无非就是对正则表达式的优化,Logstash在进行grok匹配的时候,如果有事件不符合匹配,那么就会打上_grokparsefailure的tag标签,同时grok语法使用不当的话,cpu消耗会非常大。具体优化如下:

1.增加锚定

^%{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "%{WORD:verb} %{DATA:request} HTTP/%{NUMBER:httpversion}" %{NUMBER:response:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}$

如果增加锚定的话,可以确保只会匹配整个字符串从开始到结束,而不包含其他的。

2.去重重复的正则表达式

# 原始日志Application 1: '8.8.8.8 process-name[666]: a b 1 2 a lot of text at the end'Application 2: '8.8.8.8 process-name[667]: a 1 2 3 a lot of text near the end;4'Application 3: '8.8.8.8 process-name[421]: a completely different format | 1111'# 一个常见的grok设置匹配三种格式grok {  "match" => { "message => [    '%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{WORD:word_1} %{WORD:word_2} %{NUMBER:number_1} %{NUMBER:number_2} %{DATA:data}',    '%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{WORD:word_1} %{NUMBER:number_1} %{NUMBER:number_2} %{NUMBER:number_3} %{DATA:data};%{NUMBER:number_4}',    '%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{DATA:data} | %{NUMBER:number}'    ] }}#注意,grok会依次尝试将传入日志行与三个表达式进行匹配,从而在第一次匹配时中断;这意味着确保我们尽可能快地跳到正确的位置,因为应用程序2总是有一个失败的匹配,应用程序3有两个失败的匹配。# 进行优化filter {  grok {    "match" => { "message" => '%{IPORHOST:clientip} %{DATA:process_name}\[%{NUMBER:process_id}\]: %{GREEDYDATA:message}' },    "overwrite" => "message"  }  grok {    "match" => { "message" => [      '%{WORD:word_1} %{WORD:word_2} %{NUMBER:number_1} %{NUMBER:number_2} %{GREEDYDATA:data}',      '%{WORD:word_1} %{NUMBER:number_1} %{NUMBER:number_2} %{NUMBER:number_3} %{DATA:data};%{NUMBER:number_4}',      '%{DATA:data} | %{NUMBER:number}'    ] }  })
2.2.2 如果使用了ruby,排查定位是否有异常抛出,进行优化ruby代码

logstash 获取多个kafka_日志工程Logstash日志采集入门篇相关推荐

  1. 如何使用Elasticsearch,Logstash和Kibana实时可视化Python中的日志

    by Ritvik Khanna Ritvik Khanna着 如何使用Elasticsearch,Logstash和Kibana实时可视化Python中的日志 (How to use Elastic ...

  2. 【十三】景区人流量统计:python日志生成+logstash+kafka+storm+mysql+springBoot+高德地图

    storm+kafka+logstash+springBoot+高德地图 项目概述: 作用:交通信息化,智慧城市 需求:实时统计人流量并通过热力图展示. 类似于腾讯热力图的景区人流量统计 如何采集某个 ...

  3. logstash清除sincedb_path上传记录,重传日志数据

    logstash清除sincedb_path上传记录,重传日志数据 logstash通过一个名为 sincedb_path 下的记录文件记录当前logstash已经上传的日志文件的位置,如果指定为 n ...

  4. 使用Filebeat采集日志结合logstash过滤出特定格式的日志至Elasticsearch

    使用Filebeat采集日志结合logstash过滤出特定格式的日志 文章目录 使用Filebeat采集日志结合logstash过滤出特定格式的日志 ELK搭建 什么是Filebeat 什么是Logs ...

  5. logstash丢弃没有精准匹配到文件名的日志数据

    logstash丢弃没有精准匹配到文件名的日志数据 一种场景,如果确定需要精准采集这一文件 f:\x\data\2022-05-27-231259\my.txt 里面的数据,不是这样形式的 data\ ...

  6. logstash读取Elasticsearch数据保存为json,logstash接收log数据写入kafka生产者

    [提前声明] 文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章 写作不易,转载请注明,谢谢! 代码案例地址: ?https://github.com/Mydreamandrea ...

  7. 计算机网络研学日志,教师信息技术应用能力提升工程研修日志.doc

    教师信息技术应用能力提升工程研修日志 教师信息技术应用能力提升工程研修日志 为提高教学技术水平,促进专业能力发展,并贯彻执行教师继续教育的相关政策,我在本学期参加了<信息技术应用能力专项培训&g ...

  8. oracle的asmcmd获取归档日志,分析oracle的联机日志和归档日志

    logminer和配置 安装logminer 以sqlplus / as sysdba登录系统数据库系统,ORACLE默认安装logminer,如果没有安装,执行SQL脚本安装 --安装logmine ...

  9. c++ 如何获取移动硬盘型号信息_工程销售,如何高效快速获取项目信息

    怎么获取工程信息,对于做建筑工程的销售来说,基本都知道,无非就是通过扫街.网上 搜索.购买第三方工程信息服务.或者他人介绍(亲朋好友.客户.同事.设计师等).在怎 么找项目信息方面?每个人都有自己的方 ...

  10. android 获取monkey日志_APP压力测试定位问题_monkey篇

    1.执行以下monkey命令: adb shell monkey -p com.android.settings -v 1000 > E:apkmonkey.log 2.monkey跑完后,会出 ...

最新文章

  1. 基于OpenCV实战:对象跟踪
  2. git远程仓库回退版本
  3. das,nas,san区别——大型数据中心会用NAS+SAN软硬结合思路
  4. 移动开发的罗曼蒂克消亡史
  5. getsockname与getpeername用法与区别
  6. 编译安装PHP出现configure: error: MySQL configure failed. Please check config.log的解决方法
  7. 基于java Springboot+Vue+shiro前后端分离疫情防疫管理系统设计和实现2.0
  8. 计算机应用基础理论,计算机应用基础理论A卷.doc
  9. 初识EMCASCRIPT 模块化
  10. python,PyQt5编程将qrc文件转为py文件
  11. ios14描述文件无法与服务器连接,iOS14屏蔽更新描述文件已损坏,无法安装的解决办法...
  12. Spread 14.2.0 for WinForms
  13. fly.io ruby on rails
  14. 派森python_派森(Python)
  15. ​当英雄链逞不了英雄 比特吴等代投们也身陷囹圄
  16. python 节点关系图_python 可视化节点关系(一):networkx
  17. 试用用友致远最新版协同产品A8之二
  18. 徐徐落地 2009年云计算五大趋势盘点
  19. sh_setup -yr 2019建立table报错command not found
  20. Perl正则表达式详解

热门文章

  1. 6.深入分布式缓存:从原理到实践 --- Memcached 周边技术
  2. 59. Event 例子
  3. php安全拦截,php类中的各种拦截器用法分析
  4. 如何遍历JTree的每一个节点
  5. vscode 中 eslint 的配置
  6. 4 个拥有绝佳命令行界面的终端程序
  7. Win7+Ubuntu双系统时间不一致
  8. VMware开启虚拟化实现CentOS创建KVM
  9. 使用Maven下载jar包
  10. 校验插入指定结点是否导致编码循环的示例函数