logstash简介

Logstash 是一个开源的数据收集引擎,它具有备实时数据传输能力。它可以统一过滤来自不同源的数据,并按照开发者的制定的规范输出到目的地。

logstash-2.2.2的配置:

从logstash-forward        到kafka的配置

ubuntu@sp1:~/logstashBeforeChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf

input {

lumberjack {

port => "5044"

ssl_certificate => "/home/ubuntu/logstash-2.2.2/config/lumberjack.crt"

ssl_key =>  "/home/ubuntu/logstash-2.2.2/config/lumberjack.key"

type => "fc_access"

}

}

output {

if "_grokparsefailure" not in [tags] {

#       stdout { codec => rubydebug }

kafka {

topic_id => "kafka_es"

bootstrap_servers => "sp1:9092,sp2:9092,sp3:9092,sp4:9092,sp5:9092,sp6:9092,sp7:9092"

compression_type => "snappy"

acks => ["1"]

value_serializer => "org.apache.kafka.common.serialization.StringSerializer"

timeout_ms => 10000

retries => 5

retry_backoff_ms => 100

send_buffer_bytes => 102400

workers => 2

}

}

}

从kafka到es配置

其中包括了对日志各个字段的解析,以及对异常日志过滤(同时注意其中过滤了 不属于当前时间前后5天的时间的日志,为了防止异常日志创建索引过多导致es报红)

ubuntu@sp1:~/logstashAfterChangeConf$ cat /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf

input {

kafka {

topic_id => "kafka_es"

group_id => "kafka_es"

zk_connect => "sp1:2181,sp2:2181,sp3:2181,sp4:2181,sp5:2181,sp6:2181,sp7:2181"

consumer_threads => 1

consumer_restart_on_error => true

consumer_restart_sleep_ms => 5000

decorate_events => true

consumer_timeout_ms => 1000

queue_size => 100

auto_offset_reset => "smallest"

rebalance_max_retries => 50

}

}

filter {

mutate {

add_field => [ "messageClone", "%{message}" ]

}

mutate {

split => { "messageClone" => '"' }

add_field => {"agent" => "%{[messageClone][3]}"}

}

useragent {

source => "agent"

}

mutate {

split => { "message" => " " }

add_field => {"timestamp" => "%{[message][0]}"}

add_field => {"reqtime" => "%{[message][1]}"}

add_field => {"clientIP" => "%{[message][2]}"}

add_field => {"squidCache" => "%{[message][3]}"}

add_field => {"repsize" => "%{[message][4]}"}

add_field => {"reqMethod" => "%{[message][5]}"}

add_field => {"requestURL" => "%{[message][6]}"}

add_field => {"username" => "%{[message][7]}"}

add_field => {"requestOriginSite" => "%{[message][8]}"}

add_field => {"mime" => "%{[message][9]}"}

add_field => {"referer" => "%{[message][10]}"}

add_field => {"agentCheck" => "%{[message][11]}"}

add_field => {"dnsGroup" => "%{[message][-1]}"}

remove_field => ["offset", "kafka", "@version", "file", "message", "messageClone"]

}

if [agentCheck] =~ "ChinaCache" {

grok { match => { "agentCheck" => "OOPS" } }

}

mutate {

convert => {

"timestamp" => "float"

"reqtime" => "integer"

"repsize" => "integer"

}

remove_field => ["agentCheck"]

}

ruby {

code => "event['timestamp_str'] = Time.at(event['timestamp']).strftime('%Y-%m-%dT%H:%M:%S.%LZ')"

}

date { match => [ "timestamp_str", "ISO8601" ]

}

mutate {

split => { "requestURL" => '/' }

add_field => {"uriHost" => "%{[requestURL][2]}"}

remove_field => ["timestamp_str"]

}

mutate {

join => { "requestURL" => '/' }

}

ruby {

code => "event.cancel if 5 * 24 * 3600 < (event['@timestamp']-::Time.now).abs"

}

}

output {

if "ChinaCache" not in [agent] {

#                   stdout { codec => "rubydebug" }

elasticsearch {

index => "logstash-%{+YYYY.MM.dd.HH}"

workers => 1

flush_size => 5000

idle_flush_time => 1

hosts => ["es-ip-1:9200","es-ip-2:9200","es-ip-3:9200","es-ip-4:9200","es-ip-5:9200","es-ip-6:9200","es-ip-7:9200"]

}

}

}

启动命令:

nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/after-kafa-access.conf 2>&1 > /home/ubuntu/logstash-2.2.2/logs/logstash-after-kafka-access.log &

nohup /home/ubuntu/logstash-2.2.2/bin/logstash -f /home/ubuntu/logstash-2.2.2/config/before-kafka-access.conf 2>&1 > /home/ubuntu/logstash-2.2.2/logs/logstash-before-kafka.log &

logstash-6.1.1配置

从filbeat到kafka的配置

ubuntu@sp26:~/apps/logstash-6.1.1$ cat filebeat5055-kafkasp26-3.conf

input {

beats {

port => "5055"

type => "log"

}

}

output {

#   stdout { codec => rubydebug }

kafka {

codec => "json"

bootstrap_servers => "37:9092,38:9092,39:9092,40:9092,41:9092"

topic_id => "test"

compression_type => "snappy"

value_serializer => "org.apache.kafka.common.serialization.StringSerializer"

}

}

检测
/home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf  --config.test_and_exit
启动

nohup /home/ubuntu/apps/logstash-6.1.1/bin/logstash -f /home/ubuntu/apps/logstash-6.1.1/filebeat5055-kafkasp26-3.conf --config.reload.automatic   2>&1 >  /home/ubuntu/apps/logstash-6.1.1/logs/filebeat5055-kafkasp26-3.log  &

大数据日志分析系统-logstash相关推荐

  1. 小白玩大数据日志分析系统经典入门实操篇FileBeat+ElasticSearch+Kibana 实时日志系统搭建从入门到放弃

    大数据实时日志系统搭建 距离全链路跟踪分析系统第二个迭代已经有一小阵子了,由于在项目中主要在写ES查询\Storm Bolt逻辑,都没有去搭建实时日志分析系统,全链路跟踪分析系统采用的开源产品组合为F ...

  2. 大数据日志分析系统-hdfs日志存储

    先补充spark的博客链接,没在目录显示 hdfs简介: Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件(commodity hardware)上的分布式文件系统. 项目需求: 使用 ...

  3. 【计算机专业毕设之基于python猫咪网爬虫大数据可视化分析系统-哔哩哔哩】 https://b23.tv/jRN6MVh

    [计算机专业毕设之基于python猫咪网爬虫大数据可视化分析系统-哔哩哔哩] https://b23.tv/jRN6MVh https://b23.tv/jRN6MVh

  4. 大数据系统开发综合实践(淘宝双11大数据批处理分析系统、NBA 、淘宝购物大数据实时展示、Spark GraphX)

    cqupt || xmu--大数据系统开发综合实践 代码放在了GitHub上 链接 task01 大数据批处理系统 淘宝双11大数据批处理分析系统 task02 大数据查询分析计算系统 NBA 统计大 ...

  5. 基于Neo4j中医方剂药材知识图谱大数据可视化分析系统的设计与开发

    基于Neo4j中医方剂药材知识图谱大数据可视化分析系统的设计与开发 设计背景 这个系统的开发初衷是笔者希望通过这个系统来学习一下Neo4j的相关技术,包括与python.java的对接.可视化等方面, ...

  6. 毕业设计 - 地铁大数据客流分析系统 设计与实现

    文章目录 1 前言 1.1 实现目的 2 数据集 2.2 数据集概况 2.3 数据字段 3 实现效果 3.1 地铁数据整体概况 3.2 平均指标 3.3 地铁2018年9月开通运营的线路 3.4 客流 ...

  7. ByteV智能电网大数据可视化分析系统

    ByteV智能电网大数据可视化分析系统是一个面向电力行业管理部门的综合辅助决策平台,旨在通过大数据技术和系统全方位时空动态感知能力,为电力行业管理决策者提供及时.准确的多维数据分析结果信息,直观展现数 ...

  8. 电商大数据日志收集系统之EFK

    背景 日志管理的挑战: 关注点很多,任何一个点都有可能引起问题 日志分散在很多机器,出了问题时,才发现日志被删了 很多运维人员是消防员,哪里有问题去哪里 集中化日志管理思路: 日志收集 -->格 ...

  9. 大数据日志分析项目架构

    老是弹出由于您编辑时间过长,页面和服务器之间的连接已断开,请先将文章内容另外保存,再刷新本页面继续编辑让我保存页面我也是醉了,图片多没法一次上传,上传图片还一直失败,我只好都放在一个附件里面了.阿里能 ...

最新文章

  1. [转]MySQL查询表内重复记录
  2. web.config/app.config敏感数据加/解密的二种方法
  3. Neo4j-Cypher语言语法
  4. 使用c#接入华为云-内容审核
  5. TCP/IP协议网络模型
  6. 4乘4方格走的路线_苏州周边4个冷门自驾游路线景点推荐
  7. 用友php漏洞,用友CRM注入漏洞(无需登录通杀所有版本)
  8. python对文本数据进行采样_Python对wav文件的重采样实例
  9. 实时视频流(url)——延时显示
  10. [Java]jvm参数选项中文文档
  11. 怎样在百度地图上画圈_知识地图分享:你不是记忆差,你只是没找对方法
  12. python流水灯程序_单片机流水灯汇编语言源代码大全(六款流水灯汇编语言源代码)...
  13. 数值分析与算法——读书笔记(一)
  14. 5W2H法分析用户流失内因
  15. torch.Generator 随机数生成器
  16. php 屏蔽鼠标右键 复制,FLEX屏蔽鼠标右键
  17. Docker教程(一)入门教程
  18. 联想笔记本声音太小怎么办_笔记本电脑声音变小了怎么办 这里有妙招
  19. 小米路由器3刷openWRT系统的实践过程
  20. 汽车企业数字化转型成熟度评估模型研究

热门文章

  1. 全球3D打印机行业一流服务品牌
  2. JS实现网页开关灯效果
  3. 百度智能小程序巡检调度方案演进之路
  4. 【迭代器】迭代器相应型别
  5. 获取中文字符串的拼音
  6. 【Audio】Unity音频模块:加载、转换、剪切、混音、合并
  7. 计算机主机什么硬件组成,台式电脑机箱里面有哪些硬件组成?
  8. Excel 中的协方差阵
  9. Java集合的接口和类层次结构图以及代码示例
  10. zkh工业/润滑油//服// 务