最近搭建一套日志监控平台,结合系统本身的特性总结一句话也就是:需要将Kafka中的数据导入到elasticsearch中。那么如何将Kafka中的数据导入到elasticsearch中去呢,总结起来大概有如下几种方式:

Kafka->logstash->elasticsearch->kibana(简单,只需启动一个代理程序)
Kafka->kafka-connect-elasticsearch->elasticsearch->kibana(与confluent绑定紧,有些复杂)
Kafka->elasticsearch-river-kafka-1.2.1-plugin->elasticsearch->kibana(代码很久没更新,后续支持比较差)

最终选择了第一种方案:
Logstash将topic数据写入es前可以做过滤ETL处理(总的来说还是比较好用的)
一、拓扑图

项目拓扑图如下所示:

此时消息的整体流向为:日志/消息整体流向Flume => kafka => logstash => elasticsearch => kibana(数据展示)
第一步:A .日志采集 Flume

Flume的指导工具书:
**Flume用户手册:============》**
https://flume.liyifeng.org/?spm=a2c4e.10696291.0.0.1c6e19a4oNUrUj#

agent.sources = r1
agent.channels = c1
agent.sinks = s1agent.sources.r1.type = exec
agent.sources.r1.command = tail -F -n 0 /data01/monitorRequst.log
agent.sources.r1.restart = true  //解决tail -F进程被杀死问题agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100agent.sinks.s1.type = avro
agent.sinks.s1.port = 50001
agent.sinks.s1.hostname = IP
agent.sources.r1.channels = c1
agent.sinks.s1.channel = c1

Flume日志收集过程中踩过的坑可以参考:http://www.digitalsdp.com/Experiencebbs/maintenance/506.jhtml

B.Kafka Sink

agent.sources = r1
agent.channels = c2
agent.sinks = s2agent.sources.r1.type = avro
agent.sources.r1.bind = IP
agent.sources.r1.port = 50001agent.channels.c2.type = memory
agent.channels.c2.capacity = 1000
agent.channels.c2.transactionCapacity = 100agent.sinks.s2.type = org.apace.flume.sink.kafka.KafkaSink
agent.sinks.s2.topic = XXX
agent.sinks.s2.brokerList = IP:9092,IP:9092
agent.sinks.s2.batchSize = 20agent.sources.r1.channels = c2
agent.sinks.s2.channel = c2

1.7以后 Flume支持tailDir source

自己另外的一种配置:

a1.sources=r1
a1.channels=c1 c2
a1.sinks=k1 k2# configure source
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /root/module/flume/flume-data/taildir_positiontest.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /applog/gmall2019/log/gateway.log
a1.sources.r1.fileHeader = true
a1.sources.r1.channels = c1 c2# configure channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.ch1.type = file
a1.channels.c1.checkpointDir =/root/module/flume/flume-data/gla2020/app_ch1_tmp1
a1.channels.c1.dataDirs =/root/module/flume/flume-data/gla2020/app_ch1_tmpdataa1.channels.c2.type = memory
a1.channels.c2.capacity=10000
a1.channels.c2.byteCapacityBufferPercentage=20# configure sink
# gla_api-sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = gateway_api
a1.sinks.k1.kafka.bootstrap.servers = node101:9092,node102:9092,node103:9092
a1.sinks.k1.kafka.flumeBatchSize = 2000
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.channel = c1

a1.sources.r1.positionFile = /root/module/flume/flume-data/taildir_positiontest.json

*这里记录着数据采集到哪里,另外Flume是具有事务的,数据可做到不丢失,即使环境宕机,重新启动后依然可顺次进行数据采集工作。
*
二、环境搭建

关于Kafka及Flume的搭建在这里不再详细论述,如有需要请参见本文其它说明。在这里重点说明logstash的安装及配置。

A.下载logstash的安装包;

B.新建kafka-logstash-es.conf置于logstash/conf目录下;

C.配置kafka-logstash-es.conf如下:

logstash的配置语法如下:

input {...#读取数据,logstash已提供非常多的插件,可以从file、redis、syslog等读取数据
}filter{...#想要从不规则的日志中提取关注的数据,就需要在这里处理。常用的有grok、mutate等
}output{...#输出数据,将上面处理后的数据输出到file、elasticsearch等
}

示例:
1:

input {kafka {zk_connect => "c1:2181,c2:2181,c3:2181"group_id => "elasticconsumer"   ---随意取topic_id => "xxxlog"  ---与flume中的Channel保持一致reset_beginning => falseconsumer_threads => 5 decorate_events => truecodec => "json"}}
output {elasticsearch {hosts => ["c4:9200","c5:9200"]index => "traceid"--与Kafka中json字段无任何关联关系,注意:index必须小写index => "log-%{+YYYY-MM-dd}"workers => 5codec => "json"}}

2:

input {kafka {bootstrap_servers => ["node101:9092"]client_id => "kafka_esend"group_id => "kafka_esend"#auto_offset_reset => "latest"auto_offset_reset => "earliest"consumer_threads => 3#decorate_events => truetopics => ["gla_api"]#type => "bhy"codec => "json"}
}filter {}
output {elasticsearch {hosts => ["node101:9200"]index => "gla_api"#index => "log-%{+YYYY-MM-dd}"#workers => 24#codec => "json"}#stdout {# JSON格式输出 控制台打印以json格式一行一行输出# codec => json_lines# }
}

示例2中,特别提示,如果不是为了测试看数据的写入情况,切记不要打印出数据,非常影响性能:
比如下面这个打印:(生产不要打印)
#stdout {
# JSON格式输出 控制台打印以json格式一行一行输出
# codec => json_lines

运行logstash命令为:nohup bin/logstash -f /XXX/logstash/conf/kafka-logstash-es.conf &

三、调测过程中遇到的一些坑

A.在集成ELK过程中总以为head插件是必须的,其实head插件为非必需品。elasticsearch仅提供了一个数据存储的煤介,head为了让大家更方便的去查看数据;

B.采用以上方案进行布署时,当系统正常运行时,可以在elasticsearch服务器上http://IP:9200/*中搜索index是否创建成功

参考:https://www.slahser.com/2016/04/21/日志监控平台搭建-关于Flume-Kafka-ELK/

      http://www.jayveehe.com/2017/02/01/elk-stack/http://wdxtub.com/2016/11/19/babel-log-analysis-platform-1/

**以下在举例几个Logstash的一些复杂配置============》**
供大家参考:
1 对数据进行过滤类型
conf1:

input {kafka {type => "ad"bootstrap_servers => "114.118.13.66:9092,114.118.13.66:9093,114.118.13.66:9094"client_id => "es_ad"group_id => "es_ad"auto_offset_reset => "latest" # 从最新的偏移量开始消费consumer_threads => 5decorate_events => true # 此属性会将当前topic、offset、group、partition等信息也带到message中topics => ["ad_nginx_access_log"] # 数组类型,可配置多个topictags => ["nginx", "ad_access"]}
}
filter {if [type] == "ad" {grok {match => { "message" => "%{IPORHOST:clientip} - (%{USERNAME:user}|-) \[%{HTTPDATE:log_timestamp}\] \"%{WORD:http_method} %{NOTSPACE:request_uri} (?:HTTP\/\d\.\d)\" %{NUMBER:http_status} %{NUMBER:body_bytes_send} \"%{DATA:http_refer}\" \"%{DATA:user_agent}\" \"%{DATA:x_forword_for}\" %{NUMBER:action_length_time}" }remove_field => ["message"]}date {match => [ "log_timestamp" , "YYYY-MM-dd:HH:mm:ss Z" ]}geoip {source => "clientip"}kv {source => "request_uri"field_split => "&?"value_split => "="include_keys => ["city_id","model","ad_position","osmodel","systemVersion","province_id","versionCode","mac","channel_number" ]}urldecode {all_fields => true}}
}
output {if [type] == "ad" {elasticsearch {hosts => ["114.118.10.253:9200"]index => "adlog-%{+YYYY-MM-dd}"document_type => "access_log"timeout => 300}}
}

conf2:

input {kafka {codec => "plain"group_id => "es2"bootstrap_servers => ["kafka1:9092,kafka2:9092,kafka3:9092"] # 注意这里配置的kafka的broker地址不是zk的地址auto_offset_reset => "earliest"topics => ["gamekafka"]}
}filter {mutate {split => { "message" => "    " }add_field => {"event_type" => "%{message[3]}""current_map" => "%{message[4]}""current_X" => "%{message[5]}""current_y" => "%{message[6]}""user" => "%{message[7]}""item" => "%{message[8]}""item_id" => "%{message[9]}""current_time" => "%{message[12]}"}remove_field => [ "message" ]}
}output {elasticsearch {index => "gamelogs"codec => plain {charset => "UTF-8"}hosts => ["elk1:9200", "elk2:9200", "elk3:9200"]}stdout {codec => rubydebug}
}
②启动写入到es的logstash

2 多个topic时Logstash的配置:
使用logstash-input-kafka消费信息并根据topic写入不同的es索引

分析:利用 decorate_events 参数,将 kafka的元信息(如:topic,group,offset等)全部输出,根据不同的topic即可输出到不同的es索引。

示例如下:

input{kafka{bootstrap_servers => ["55.0.10.19:9092,55.0.10.20:9092,55.0.10.21:9092"]group_id => "kafkaToes5" auto_offset_reset => "earliest"consumer_threads => 5 decorate_events => truetopics => ["a1","a2"]//security_protocol => "SASL_PLAINTEXT"     #sasl认证使用//sasl_mechanism => "PLAIN"                 #sasl认证使用//jaas_path => "/var/vcap/jobs/logstash/bin/kafka.conf" #sasl认证使用-认证填写的密码文件}
}
filter{if [@metadata][kafka][topic] == "a1" {json{source => "message"skip_on_invalid_json => true}mutate { remove_field => ["@version","@timestamp"]}}if [@metadata][kafka][topic] == "a2" {json{source => "message"skip_on_invalid_json => true}mutate { remove_field => ["@version","@timestamp"]}}
}
output {stdout{codec => json_lines}if [@metadata][kafka][topic] == "a1" {elasticsearch { index => "test123"hosts => ["55.0.10.10:9280","55.0.10.11:9280"]user => "admin"password => "123456"document_type=> "doc"}}if [@metadata][kafka][topic] == "a2" {elasticsearch { index => "test456"hosts => ["55.0.10.10:9280","55.0.10.11:9280"]user => "admin"password => "123456"document_type=> "doc"}}
}补充:
//认证填写的内容例子
// cat /var/vcap/jobs/logstash/bin/kafka.conf
KafkaClient {org.apache.kafka.common.security.plain.PlainLoginModule requiredusername="kafkaUser"password="kafkaPwd";
};

就先写到这里了 大家一起加油!!!

记以ELK结合的Web日志数据采集心得整理相关推荐

  1. ELK 处理 Spring Boot 日志,妙!

    以下文章来源方志朋的博客,回复"666"获面试宝典 在排查线上异常的过程中,查询日志总是必不可缺的一部分.现今大多采用的微服务架构,日志被分散在不同的机器上,使得日志的查询变得异常 ...

  2. ELK 处理 Spring Boot 日志,有点强悍!

    在排查线上异常的过程中,查询日志总是必不可缺的一部分.现今大多采用的微服务架构,日志被分散在不同的机器上,使得日志的查询变得异常困难.工欲善其事,必先利其器.如果此时有一个统一的实时日志分析平台,那可 ...

  3. Spring Boot整合ELK 处理为服务日志,妙!

    你知道的越多,不知道的就越多,业余的像一棵小草! 成功路上并不拥挤,因为坚持的人不多. 编辑:业余草 developer.ibm.com 推荐:https://www.xttblog.com/?p=5 ...

  4. 使用elk+redis搭建nginx日志分析平台(引)

    http://www.cnblogs.com/yjf512/p/4199105.html elk+redis 搭建nginx日志分析平台 logstash,elasticsearch,kibana 怎 ...

  5. ELK+redis搭建nginx日志分析平台

    ELK+redis搭建nginx日志分析平台 发表于 2015-08-19   |   分类于 Linux/Unix   |   ELK简介 ELKStack即Elasticsearch + Logs ...

  6. 几点基于Web日志的Webshell检测思路

    摘要: Web日志记录了网站被访问的情况,在Web安全的应用中,Web日志常被用来进行攻击事件的回溯和取证.Webshell大多由网页脚本语言编写,常被入侵者用作对网站服务器操作的后门程序,网站被植入 ...

  7. 10个好用的Web日志安全分析工具

    首先,我们应该清楚,日志文件不但可以帮助我们溯源,找到入侵者攻击路径,而且在平常的运维中,日志也可以反应出很多的安全攻击行为. 一款简单好用的Web日志分析工具,可以大大提升效率,目前业内日志分析工具 ...

  8. Web日志安全分析浅谈

    一.为什么需要对日志进行分析? 随着Web技术不断发展,Web被应用得越来越广泛,所谓有价值的地方就有江湖,网站被恶意黑客攻击的频率和网站的价值一般成正比趋势,即使网站价值相对较小,也会面对" ...

  9. ELK采集MySQL慢日志实现

    文章目录 一.ELK采集MySQL慢日志架构 二.filebeat 三.logstash 四.es+kibana 一.ELK采集MySQL慢日志架构 MySQL 服务器安装 Filebeat 作为 a ...

  10. 基于python的数据分析毕业设计-基于python的Web大数据采集和数据分析

    肖乐 丛天伟 严卫 摘要:该设计使用python语言作为开发语言,主要采用了两个框架:Scrapy和Django,用Scrapy来实现数据的采集技术,让数据采集效率更高,错误率低等:用Django来实 ...

最新文章

  1. js检测、控制表单输入必须为中文
  2. DataGridView新特色、常用操作
  3. 微软正式发布Windows 10 2020年10月更新
  4. Python Django设置中文语言及时区
  5. json、pickle
  6. 使用.pk8 和.pem签名生成.keystore 签名
  7. jsp页面中文乱码解决方法
  8. java运行环境简称_java程序的运行环境简称为什么?
  9. 关于腾讯云学生服务器搭建个人网站——配置web开发环境详细步骤
  10. qq空间不能访问解决方法
  11. Windows 10蓝牙只能发送文件到手机而无法从手机接收文件
  12. 一、DC DC电源转换电路设计
  13. 【SEED Labs 2.0】Packet Sniffing and Spoofing Lab
  14. 还在问视频音频转文字软件哪个好吗?快码住这两款
  15. 分类计数原理与分步计数原理_《分类加法计数原理与分步乘法计数原理》教学设计...
  16. spring-boot整合druid配置
  17. linux ps swn,Linux操作的基本概念与命令(转)
  18. C语言里文字颜色色和背景颜色设置
  19. STM32自学笔记15-步进电机驱动项目-磁编码器MT6816驱动
  20. kafka(三):kafka broker

热门文章

  1. 微信小程序wx.getUserInfo获取用户所在地区将拼音转换为中文的方法
  2. ABE或IBE中属性撤销的寻找最小覆盖集的基本算法
  3. 点餐APP 冲刺二总结
  4. 哈尔滨计算机毛校长国二,【实验视角】静待紫冰花开 知行合一 且行且知 ——记哈尔滨市实验学校校长王媛参加第二届中国阳明心学高峰论坛...
  5. 关于VM虚拟机一启动就会使电脑重启的问题
  6. pip安装pandas失败的问题
  7. 八股文-ArrayList
  8. sun.misc.BASE64Encoder是内部专用 API, 可能会在未来发行版中删除
  9. 软件工程第2次作业 | 结对项目-最长单词链
  10. 永久短网址生成 可以永久使用的短链接推荐