原文作者:归来朝歌

原文地址:logstash之Input插件

1、stdin标准输入和stdout标准输出

Logsrtash含有两个非常重要的基础插件,input与output;首先执行命令:

bin/logstash -e 'input { stdin { } } output { stdout { codec   => rubydebug } }'   

程序启动之后输入:hello logstash

2、监控日志文件变化

Logstash 使用一个名叫 FileWatch 的 Ruby Gem 库来监听文件变化。这个库支持 glob 展开文件路径,而且会记录一个叫 .sincedb的数据库文件来跟踪被监听的日志文件的当前读取位置。所以,不要担心 logstash 会漏过你的数据(数据量并不是特别大的前提下)

创建logstash_conf文件(名字自定义):monitor_file.conf

input{file{path => "/home/angel/servers/logstash-5.5.2/monitor_log/ "type => "log"start_position => "beginning"}
}
output{stdout{codec=>rubydebug}
}

启动之后,logstash就会监控 /home/angel/servers/logstash-5.5.2/monitor_log/logs.txt文件,如果发现文件中有内容,则以标准输出的形式输出到控制台;

检测logstash 配置文件

bin/logstash -f /home/angel/servers/logstash-5.5.2/logstash_conf/monitor_file.conf -t

启动logstash:

bin/logstash -f /home/angel/servers/logstash-5.5.2/logstash_conf/monitor_file.conf

测试:

echo "hello logstash" >> /home/angel/servers/logstash-5.5.2/monitor_log/logs.txt  

配置参数说明:

  • Path=>表示监控的文件路径
  • Type=>给类型打标记,用来区分不同的文件类型。
  • Start_postion=>从哪里开始记录文件,默认是从结尾开始标记,要是你从头导入一个文件就把改成”beginning”.
  • discover_interval=>多久去监听path下是否有文件,默认是15s
  • exclude=>排除什么文件
  • close_older=>一个已经监听中的文件,如果超过这个值的时间内没有更新内容,就关闭监听它的文件句柄。默认是3600秒,即一个小时。
  • sincedb_path=>监控库存放位置(默认的读取文件信息记录在哪个文件中)。默认在:/data/plugins/inputs/file。
  • sincedb_write_interval=> logstash 每隔多久写一次 sincedb 文件,默认是 15 秒。
  • stat_interval=>logstash 每隔多久检查一次被监听文件状态(是否有更新),默认是 1 秒。

3、tcp插件

TCP是一种网络传输控制协议,很多公司的数据不一定是在本地的,而是在传输网络的;这个时候使用TCP建立连接后,通信双方就可以进行数据传输了;Logstash提供了TCP插件,TCP插件可以监控某个端口,当数据打入logstash监听的端口队列的时候,logstash就可以进行数据的采集;

add_field=> 可选项(添加自定义字段)
codec=> 可选项 (编码解码)
data_timeout=> 可选项 (超时时间,秒为单位。如果设置-1,则永不超时,默认是5
host=> 可选项 (主机地址,字符串类型,如"localhost"或者"192.168.0.1 )
mode=> 可选项 (值是["server","client"]其中之一,默认是server)
port=> 必填项 (远程监听的端口)
ssl_cacert=> 可选项,ssl认证相关
ssl_cert=>  
ssk_key=>  
ssl_enable=> 是否开启ssl认证
tags=> 可选项 用于增加一些标签,这个标签可能在后续的处理中起到标志的作用
type=> 可选项 标记事件类型,通过type判断

vim monitor_tcp.conf

input{tcp {port => 9876mode => "server"  //值是["server","client"]其中之一,默认是server  ssl_enable => false}
}
output{stdout{}
}

启动并且监听到9876端口;

检测logstash 配置文件

bin/logstash -f /home/angel/servers/logstash-5.5.2/logstash_conf/monitor_tcp.conf -t

启动logstash:

bin/logstash -f /home/angel/servers/logstash-5.5.2/logstash_conf/monitor_tcp.conf

写一段java的socket代码向9876发送数据:

public static void main(String[] args) throws Exception{// 向服务器端发送请求,服务器IP地址和服务器监听的端口号Socket client = new Socket("hadoop01", 9876);
​// 通过printWriter 来向服务器发送消息PrintWriter printWriter = new PrintWriter(client.getOutputStream());System.out.println("连接已建立...");for(int i=0;i<10;i++){// 发送消息printWriter.println("hello logstash , 这是第"+i+" 条消息");
​printWriter.flush();}
​
}

4、syslog插件

syslog**机制负责记录内核和应用程序产生的日志信息,管理员可以通过查看日志记录,来掌握系统状况**。默认系统已经安装了rsyslog.直接启动即可。

vim /etc/rsyslog.conf,增加如下代码,将日志信息发送至6789端口:

*.* @@hadoop01:6789

重启日志:

/etc/init.d/rsyslog restart

vim monitor_syslog.conf

input{tcp{port=> 6789type=> syslog}udp{port=> 6789type=> syslog}
}
​
filter{if [type] == "syslog" {grok {match => { "message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{SYSLOGHOST:syslog_hostname} %{DATA:syslog_program}(?:\[%{POSINT:syslog_pid}\])?: %{GREEDYDATA:syslog_message}" }add_field => [ "received_at", "%{@timestamp}" ]add_field => [ "received_from", "%{host}" ]}date {match => [ "syslog_timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]}}
}
​
output{stdout{codec=> rubydebug}
}

启动:

bin/logstash -f /home/angel/servers/logstash-5.5.2/logstash_conf/monitor_syslog.conf

通过telnet hadoop01 6789传输数据,【注意,在logstash中的grok是正则表达式,用来解析当前数据】

Dec 23 12:11:43 louis postfix/smtpd[31499]: connect from unknown[95.75.93.154] Jun 05 08:00:00 louis named[16000]: client 199.48.164.7#64817: query (cache) 'amsterdamboothuren.com/MX/IN' denied Jun 05 08:10:00 louis CRON[620]: (www-data) CMD (php /usr/share/cacti/site/poller.php >/dev/null 2>/var/log/cacti/poller-error.log) Jun 05 08:05:06 louis rsyslogd: [origin software="rsyslogd" swVersion="4.2.0" x-pid="2253" x-info="http://www.rsyslog.com"] rsyslogd was HUPed, type 'lightweight'. ​

日志截图

5、logstash的codec监控nginx日志

Codec 来自 Coder/decoder两个单词的首字母缩写;Logstash 不只是一个input | filter | output 的数据流,而是一个input | decode | filter | encode | output 的数据流,codec 就是用来decode、encode 事件的。简单说,就是在logstash读入的时候,通过codec编码解析日志为相应格式,从logstash输出的时候,通过codec解码成相应格式。如果当前的日志是codec的,可以避免使用grok,从而提高开发效率和性能

编辑nginx文件,设置log格式:

log_format json '{"@timestamp":"$time_iso8601",''"@version":"1",''"host":"$server_addr",''"client":"$remote_addr",''"size":$body_bytes_sent,''"responsetime":$request_time,''"domain":"$host",''"url":"$uri",''"status":"$status"}';
access_log  logs/access_codec.log  json;

截图:

Nginx生成kibana浏览器的日志:

server {listen 80;server_name hadoop01;location / {proxy_pass http://hadoop01:5601;proxy_set_header Upgrade $http_upgrade;proxy_set_header Connection 'upgrade';proxy_set_header Host $host;proxy_cache_bypass $http_upgrade;}}



vim nginx_codec.conf

input {file {path => "/usr/local/nginx/logs/access_codec.log"codec => "json"}
}
output{stdout{codec =>rubydebug}
}

启动nginx:

/usr/local/nginx/sbin

启动logstash:

bin/logstash -f /home/angel/logstash-5.5.2/logstash_conf/nginx_codec.conf

6、合并多行数据(Multiline)

在处理日志时,除了访问日志外,还要处理运行时日志,该日志大都用程序写的,比如 log4j。运行时日志跟访问日志最大的不同是,运行时日志是多行,也就是说,连续的多行才能表达一个意思。multiline 插件来说,有三个设置比较重要:negate、pattern 和 what

negate   类型是boolean   默认false 如果没有匹配,那么否定正则表达式
Pattern    类型是string    没有默认值 要匹配的正则表达式
What previous或者next 没有默认值 如果正则表达式匹配了,那么该事件是属于(属于下一次触发的事件)还是上一个(触发的事件)

【注意】使用mutiline之前,需要安装插件:

bin/logstash-plugin install logstash-filter-multiline
//如果报错,请使用:
bin/logstash-plugin install --version 3.0.4 logstash-filter-multiline//如果还是报错,请下载logstash-filter-multiline-3.0.4.gem,然后安装
bin/logstash-plugin install /home/angel/logstash-5.5.2/vendor/bundle/jruby/1.9/cache/logstash-filter-multiline-3.0.4.gem
  • 百度搜索:logstash-filter-multiline-3.0.4.gem
  • 将gem文件放入这个理:/home/angel/logstash-5.5.2/vendor/bundle/jruby/1.9/cache
  • bin/logstash-plugin install /home/angel/logstash-5.5.2/vendor/bundle/jruby/1.9/cache/logstash-filter-multiline-3.0.4.gem

vim multiline.conf

input {file{path=>"/home/angel/logstash-5.5.2/logs/mutil/mu_log.out"type=>"runtimelog"codec=> multiline {pattern => "^\["negate => truewhat => "previous"}start_position=>"beginning"sincedb_path=>"/home/angel/logstash-5.5.2/logs/mutil/sincedb-access"ignore_older=>0}
}
filter{multiline{pattern => "^\["negate => truewhat => "previous"}date {match => [ "timestamp", "yyyy-MM-dd-HH:mm:ss" ]locale => "cn"}grok {match=>["message","\[%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level}\] %{GREEDYDATA:msg}"]}
}
output{stdout{codec => rubydebug}
}

启动

bin/logstash -f /home/angel/logstash-5.5.2/logstash_conf/multiline.conf

向/home/angel/logstash-5.5.2/logs/mutil/mu_log.out文件中输入测试日志:

echo "[16-04-12 03:40:01 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.
[16-04-12 03:40:02 DEBUG] impl.JdbcEntityInserter:- from product_category product_category where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null order by product_category.ORDERS asc
[16-04-12 03:40:03 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.
[16-04-12 03:40:04 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over.
[16-04-12 03:40:05 DEBUG] impl.JdbcEntityInserter:- from product_category product_category where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is nullorder by product_category.ORDERS desc
[16-04-12 03:40:06 DEBUG] impl.JdbcEntityInserter:- from product_category product_category where product_category.PARENT_ID is null and product_category.STATUS = ? and product_category.DEALER_ID is null order by product_category.ORDERS asc
[16-04-12 03:40:07 DEBUG] model.MappingNode:- ['/store/shopclass'] matched over" >> /home/angel/logstash-5.5.2/logs/mutil/mu_log.out

ELK技术栈—Logstash—Input插件相关推荐

  1. ELK技术栈-Logstash的详细使用

    本文作者:罗海鹏,叩丁狼高级讲师.原创文章,转载请注明出处. 前言 在第九章节中,我们已经安装好Logstash组件了,并且启动实例测试它的数据输入和输出,但是用的是最简单的控制台标准输入和标准输出, ...

  2. ELK技术栈—Logstash—基础介绍

    原文作者: 原文地址: 1.概述 官网介绍:Logstash is an open source data collection engine with real-time pipelining ca ...

  3. ELK技术栈—Kibana

    原文作者:少年阿峣_从零单排 原文地址:Kibana介绍.安装和使用 目录 1.介绍 2.优势 3.安装 4.使用 5. X-pack插件 6.Kibana+X-Pack介绍使用 1.介绍 Kiban ...

  4. 云计算大数据:ELK技术栈介绍

    一. ELK工作栈简介 1. 简介 ELK Stack 是 Elasticsearch.Logstash.Kibana 三个开源软件的组合.在实时数据检索和分析场合,三者通常是配合共用,而且又都先后归 ...

  5. 利用ELK技术栈收集nginx日志

    之前的一篇文章已经介绍如何使用nginx写入post的数据入日志,详细见链接: nginx的post命令记录body到日志中 接下来使用filebeat.logstash.elasticsearch把 ...

  6. [基础服务-windows] [ELK] ElasticSearch + Kibana + Logstash 以及插件安装和配置

    步骤/详情 一:下载 注意的是下载版本为免安装版.下载地址: https://www.elastic.co/cn/downloads/elasticsearch 笔者由于当前用的是JDK8选择的是 7 ...

  7. 2018/2/11 ELK技术栈之ElasticSearch学习笔记二

    终于有时间记录一下最近学习的知识了,其实除了写下的这些还有很多很多,但懒得一一写下了: ElasticSearch添加修改删除原理: ElasticSearch的倒排索引和文档一旦生成就不允许修改(其 ...

  8. ELK技术栈(四) elasticsearch 数据聚合 数据同步

    目录 一.数据聚合 1.Bucket聚合 2.Metries聚合 3.自动补全 二.数据同步 1.同步通知 2.异步通知 3.binlog监听 4.小结 5.案例:基于MQ实现数据同步 一.数据聚合 ...

  9. 学习笔记:SpringCloud 微服务技术栈_实用篇①_基础知识

    若文章内容或图片失效,请留言反馈.部分素材来自网络,若不小心影响到您的利益,请联系博主删除. 前言 学习视频链接 SpringCloud + RabbitMQ + Docker + Redis + 搜 ...

最新文章

  1. golang 项目的目录结构
  2. Linux局域网搭建
  3. [转] c++的多态(一个接口,多种实现)
  4. Sharepoint学习笔记—ECM系列--4 根据位置设置的默认元数据值(Location-Based Metadata Defaults)
  5. define定义的是什么类型_为什么Django 3后建议使用Field.choices枚举类型定义choices选项...
  6. 99%程序员不知道的编程必备工具,人工智能助你编程更轻松
  7. POJ 2590 Steps (ZOJ 1871)
  8. 【TODO】HTML label
  9. 【干货】企业如何进行数字化转型及如何称为数据驱动型企业?
  10. 百度文库 复制文本 下载文档
  11. 2022美国大学生数学建模竞赛
  12. Ace Admin中表格按钮的使用方法——表格导出为xls、pdf,表格打印预览等
  13. RK987A键盘蓝牙连接电脑
  14. 35岁的程序员:第14章,前奏
  15. 开心下单助手v1.0免费版
  16. 以太网芯片MAC和PHY
  17. HTML+CSS个人笔记
  18. 【图解数据结构与算法】视频教程正式上线B站,持续更新中......
  19. 害怕,刷人超过70%?3招应对校招笔试|大厂笔试自救指南|应届生必看
  20. 论文阅读5 | Recent Advances in Data-Driven Wireless Communication Using Gaussian Processes: A Comprehens

热门文章

  1. Bootstrap3免费单页面模板-Shuffle
  2. 什么是Intent? 转
  3. CommandBehavior.CloseConnection的使用
  4. AS3 RPG游戏引擎开发日志3:地图坐标转换
  5. MySQL 优化之 index_merge (索引合并)
  6. Redis 常用监控信息命令总结
  7. CentOS 7下安装Python3.6.4
  8. 软件测试与评估:Keep/悦跑圈
  9. leecode-数组-27Remove Element-java
  10. Visual Studio 2010 将网站直接发布到远程站点