强大的功能,丰富的插件,让logstash在数据处理的行列中出类拔萃

通常日志数据除了要入ES提供实时展示和简单统计外,还需要写入大数据集群来提供更为深入的逻辑处理,前边几篇ELK的文章介绍过利用logstash将kafka的数据写入到elasticsearch集群,这篇文章将会介绍如何通过logstash将数据写入HDFS

本文所有演示均基于logstash 6.6.2版本

数据收集

logstash默认不支持数据直接写入HDFS,官方推荐的output插件是webhdfs,webhdfs使用HDFS提供的API将数据写入HDFS集群

插件安装

插件安装比较简单,直接使用内置命令即可

# cd /home/opt/tools/logstash-6.6.2
# ./bin/logstash-plugin install logstash-output-webhdfs
复制代码

配置hosts

HDFS集群内通过主机名进行通信所以logstash所在的主机需要配置hadoop集群的hosts信息

# cat /etc/hosts
192.168.107.154 master01
192.168.107.155 slave01
192.168.107.156 slave02
192.168.107.157 slave03
复制代码

如果不配置host信息,可能会报下边的错

[WARN ][logstash.outputs.webhdfs ] Failed to flush outgoing items
复制代码

logstash配置

kafka里边的源日志格式可以参考这片文章:ELK日志系统之使用Rsyslog快速方便的收集Nginx日志

logstash的配置如下:

# cat config/indexer_rsyslog_nginx.conf
input {kafka {bootstrap_servers => "10.82.9.202:9092,10.82.9.203:9092,10.82.9.204:9092"topics => ["rsyslog_nginx"]codec => "json"}
}filter {date {match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"]target => "time_local"}ruby {code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))"}ruby {code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))"}
}output {webhdfs {host => "master01"port => 50070user => "hadmin"path => "/logs/nginx/%{index.date}/%{index.hour}.log"codec => "json"}stdout { codec => rubydebug }
}
复制代码

logstash配置文件分为三部分:input、filter、output

input指定源在哪里,我们是从kafka取数据,这里就写kafka集群的配置信息,配置解释:

  • bootstrap_servers:指定kafka集群的地址
  • topics:需要读取的topic名字
  • codec:指定下数据的格式,我们写入的时候直接是json格式的,这里也配置json方便后续处理

filter可以对input输入的内容进行过滤或处理,例如格式化,添加字段,删除字段等等

  • 这里我们主要是为了解决生成HDFS文件时因时区不对差8小时导致的文件名不对的问题,后边有详细解释

output指定处理过的日志输出到哪里,可以是ES或者是HDFS等等,可以同时配置多个,webhdfs主要配置解释:

  • host:为hadoop集群namenode节点名称
  • user:为启动hdfs的用户名,不然没有权限写入数据
  • path:指定存储到HDFS上的文件路径,这里我们每日创建目录,并按小时存放文件
  • stdout:打开主要是方便调试,启动logstash时会在控制台打印详细的日志信息并格式化方便查找问题,正式环境建议关闭

webhdfs还有一些其他的参数例如compression,flush_size,standby_host,standby_port等可查看官方文档了解详细用法

启动logstash

# bin/logstash -f config/indexer_rsyslog_nginx.conf
复制代码

因为logstash配置中开了stdout输出,所以能在控制台看到格式化的数据,如下:

{"server_addr" => "172.18.90.17","http_user_agent" => "Mozilla/5.0 (iPhone; CPU iPhone OS 10_2 like Mac OS X) AppleWebKit/602.3.12 (KHTML, like Gecko) Mobile/14C92 Safari/601.1 wechatdevtools/1.02.1902010 MicroMessenger/6.7.3 Language/zh_CN webview/ token/e7b92168159736c30401a55589317d8c","remote_addr" => "172.18.101.0","status" => 200,"http_referer" => "https://ops-coffee.cn/wx02935bb29080a7b4/devtools/page-frame.html","upstream_response_time" => "0.056","host" => "ops-coffee.cn","request_uri" => "/api/community/v2/news/list","request_time" => 0.059,"upstream_status" => "200","@version" => "1","http_x_forwarded_for" => "192.168.106.100","time_local" => 2019-03-18T11:03:45.000Z,"body_bytes_sent" => 12431,"@timestamp" => 2019-03-18T11:03:45.984Z,"index.date" => "20190318","index.hour" => "19","request_method" => "POST","upstream_addr" => "127.0.0.1:8181"
}
复制代码

查看hdfs发现数据已经按照定义好的路径正常写入

$ hadoop fs -ls /logs/nginx/20190318/19.log
-rw-r--r--   3 hadmin supergroup       7776 2019-03-18 19:07 /logs/nginx/20190318/19.log
复制代码

至此kafka到hdfs数据转储完成

遇到的坑

HDFS按小时生成文件名不对

logstash在处理数据时会自动生成一个字段@timestamp,默认情况下这个字段存储的是logstash收到消息的时间,使用的是UTC时区,会跟国内的时间差8小时

我们output到ES或者HDFS时通常会使用类似于rsyslog-nginx-%{+YYYY.MM.dd}这样的变量来动态的设置index或者文件名,方便后续的检索,这里的变量YYYY使用的就是@timestamp中的时间,因为时区的问题生成的index或者文件名就差8小时不是很准确,这个问题在ELK架构中因为全部都是用的UTC时间且最终kibana展示时会自动转换我们无需关心,但这里要生成文件就需要认真对待下了

这里采用的方案是解析日志中的时间字段time_local,然后根据日志中的时间字段添加两个新字段index.dateindex.hour来分别标识日期和小时,在output的时候使用这两个新加的字段做变量来生成文件

logstash filter配置如下:

filter {# 匹配原始日志中的time_local字段并设置为时间字段# time_local字段为本地时间字段,没有8小时的时间差date {match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"]target => "time_local"}# 添加一个index.date字段,值设置为time_local的日期ruby {code => "event.set('index.date', event.get('time_local').time.localtime.strftime('%Y%m%d'))"}# 添加一个index.hour字段,值设置为time_local的小时ruby {code => "event.set('index.hour', event.get('time_local').time.localtime.strftime('%H'))"}
}
复制代码

output的path中配置如下

path => "/logs/nginx/%{index.date}/%{index.hour}.log"
复制代码

HDFS记录多了时间和host字段

在没有指定codec的情况下,logstash会给每一条日志添加时间和host字段,例如:

源日志格式为

ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000
复制代码

经过logstash处理后多了时间和host字段

2019-03-19T06:28:07.510Z %{host}  ops-coffee.cn | 192.168.105.91 | 19/Mar/2019:14:28:07 +0800 | GET / HTTP/1.1 | 304 | 0 | - | 0.000
复制代码

如果不需要我们可以指定最终的format只取message,解决方法为在output中添加如下配置:

codec => line {format => "%{message}"
}
复制代码

同时output到ES和HDFS

在实际应用中我们需要同时将日志数据写入ES和HDFS,那么可以直接用下边的配置来处理

# cat config/indexer_rsyslog_nginx.conf
input {kafka {bootstrap_servers => "localhost:9092"topics => ["rsyslog_nginx"]codec => "json"}
}filter {date {  match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"]target => "@timestamp"}ruby {code => "event.set('index.date', event.get('@timestamp').time.localtime.strftime('%Y%m%d'))"}ruby {code => "event.set('index.hour', event.get('@timestamp').time.localtime.strftime('%H'))"}}output {elasticsearch {hosts => ["192.168.106.203:9200"]index => "rsyslog-nginx-%{+YYYY.MM.dd}"}webhdfs {host => "master01"port => 50070user => "hadmin"path => "/logs/nginx/%{index.date}/%{index.hour}.log"codec => "json"}
}
复制代码

这里我使用logstash的date插件将日志中的"time_local"字段直接替换为了@timestamp,这样做有什么好处呢?

logstash默认生成的@timestamp字段记录的时间是logstash接收到消息的时间,这个时间可能与日志产生的时间不同,而我们往往需要关注的时间是日志产生的时间,且在ELK架构中Kibana日志输出的默认顺序就是按照@timestamp来排序的,所以往往我们需要将默认的@timestamp替换成日志产生的时间,替换方法就用到了date插件,date插件的用法如下

date {  match => ["time_local","dd/MMM/yyyy:HH:mm:ss Z"]target => "@timestamp"
}
复制代码

match:匹配日志中的时间字段,这里为time_local

target:将match匹配到的时间戳存储到给定的字段中,默认不指定的话就存到@timestamp字段

另外还有参数可以配置:timezone,locale,tag_on_failure等,具体可查看官方文档


如果你觉得文章不错,请点右下角【在看】。如果你觉得读的不尽兴,推荐阅读以下文章:

  • ELK日志系统之使用Rsyslog快速方便的收集Nginx日志
  • ELK日志系统之通用应用程序日志接入方案

转载于:https://juejin.im/post/5c9198a1e51d454a63655e71

Logstash读取Kafka数据写入HDFS详解相关推荐

  1. kafka学习_《从0到1学习Flink》—— Flink 读取 Kafka 数据写入到 RabbitMQ

    前言 之前有文章 <从0到1学习Flink>-- Flink 写入数据到 Kafka 写过 Flink 将处理后的数据后发到 Kafka 消息队列中去,当然我们常用的消息队列可不止这一种, ...

  2. java读写德卡数据_Spark Streaming 读取Kafka数据写入ES

    简介: 目前项目中已有多个渠道到Kafka的数据处理,本文主要记录通过Spark Streaming 读取Kafka中的数据,写入到Elasticsearch,达到一个实时(严格来说,是近实时,刷新时 ...

  3. pandas读取文件数据、存储详解笔记

    本文是对 <利用Python进行数据分析>中关于数据读取的回顾性总计笔记,包含代码注释等. 目录 pd.read_csv和pd.read_table jsons数据读取 二级制数据读取 读 ...

  4. Java+大数据开发——HDFS详解

    1. HDFS 介绍  • 什么是HDFS 首先,它是一个文件系统,用于存储文件,通过统一的命名空间--目录树来定位文件. 其次,它是分布式的,由很多服务器联合起来实现其功能,集群中的服务器有各自的角 ...

  5. Flink1.11 读取kafka数据写入hive,未完待续

    昨天晚上Flink1.11出了,这次改动很多,我只关心hive这一部分. 目前尝试了几个小时用代码读取hive,安装官网的文档,没成功,先蹭个热点,记录下. 先贴一下依赖吧: 注意:反正各种报错,看社 ...

  6. 写入mysql_《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的东西当时是写死的,不能够通用,最近知识星 ...

  7. 《从0到1学习Flink》—— Flink 读取 Kafka 数据批量写入到 MySQL

    <!-- more --> 前言 之前其实在 <从0到1学习Flink>-- 如何自定义 Data Sink ? 文章中其实已经写了点将数据写入到 MySQL,但是一些配置化的 ...

  8. android json mysql_Android通过json向MySQL中读写数据的方法详解【读取篇】

    本文实例讲述了Android通过json向MySQL中读取数据的方法.分享给大家供大家参考,具体如下: 首先 要定义几个解析json的方法parseJsonMulti,代码如下: private vo ...

  9. Hadoop第五天--HDFS详解

    文章部分选自:https://blog.csdn.net/gwd1154978352/article/details/81095592 自己的话:层楼终究误少年,自由早晚乱余生 眼泪你别问,joker ...

最新文章

  1. 10 个冷门但又非常实用的 Docker 使用技巧
  2. iOS之深入解析静态库和动态库
  3. 简洁明了——STL容器库之set头文件常用函数集合
  4. 为pc编译配置安装当前最新的内核
  5. windows下最好的围棋_学围棋能使学习成绩提高吗?
  6. Modbus教程| Modbus协议,ASCII和RTU帧,Modbus工作
  7. 《数学之美》—图论和网络爬虫
  8. 大数据Hadoop基本概念介绍
  9. python常见函数抽样_Python中从列表中随机抽样函数的语法
  10. 计算机网络语音传输杂音回音,语音时有回音和杂音,怎么消除?
  11. canvas 的绘图模式 retained-mode(保存模式) 和 immediate-mode (立即模式)
  12. SAS(十二)PROC步
  13. Android4.2开发项目教程 Android4.2开发手机新闻移动客户端视频
  14. 双鱼座男适合学计算机专业,双鱼座男生适合的职业
  15. 机器学习:支持向量机
  16. 什么是tomcat?
  17. ALSA声卡驱动中的DAPM详解之一:kcontrol
  18. win11记事本出现乱码怎么恢复 windows11记事本出现乱码的解决方法
  19. 标题:2017-2018-20172309《程序设计与数据结构》课程总结
  20. 中国移动将对SIM卡进行节能减排改造

热门文章

  1. 什么是虚拟DOM(React16源码分析)
  2. requestIdleCallback函数
  3. express的基本用法
  4. tomcat连接oracle非常慢,关于myEclipse中tomcat 6.0启动慢的有关问题
  5. asc怎么用 linux zip_linux的asc文件怎么打开
  6. greys的简单使用
  7. VB 域名转换IP地址函数
  8. WordPress网站访问慢解决方案(超详细图文教程)
  9. 520礼包 | 情感分析算法从原理到PaddlePaddle实战全解
  10. 内含20万“不可描述”图片,这个数据集千万别在办公室打开