logstash redis kafka传输 haproxy日志
logstash 客户端收集 haproxy tcp日志
input {
file {
path => "/data/haproxy/logs/haproxy_http.log"
start_position => "beginning"
type => "haproxy_http"
}
file {
path => "/data/haproxy/logs/haproxy_tcp.log"
start_position => "beginning"
type => "haproxy_tcp"
}
}
filter {
if [type] == "haproxy_http" {
grok{
patterns_dir => "/data/logstash/patterns"
match => {"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{IPORHOST:syslog_server} %{SYSLOGPROG}: %{IP:client_ip}:%{INT:client_port} \[%{HAPROXYDATE:accept_date}\] %{NOTSPACE:frontend_name} %{NOTSPACE:backend_name}/%{NOTSPACE:server_name} %{INT:time_request}/%{INT:time_queue}/%{INT:time_backend_connect}/%{INT:time_backend_response}/%{NOTSPACE:time_duration} %{INT:http_status_code} %{NOTSPACE:bytes_read} %{FENG:captured_request_cookie} %{FENG:captured_response_cookie} %{NOTSPACE:termination_state} %{INT:actconn}/%{INT:feconn}/%{INT:beconn}/%{INT:srvconn}/%{NOTSPACE:retries} %{INT:srv_queue}/%{INT:backend_queue} \"%{WORD:verb} %{URIPATHPARAM:request} %{WORD:http_socke}/%{NUMBER:http_version}\""}
}
geoip {
source => "client_ip"
fields => ["ip","city_name","country_name","location"]
add_tag => [ "geoip" ]
}
} else if [type] == "haproxy_tcp" {
grok {
match => { "message" => "(?:%{SYSLOGTIMESTAMP:syslog_timestamp}|%{TIMESTAMP_ISO8601:timestamp8601}) %{IPORHOST:syslog_server} %{SYSLOGPROG}: %{IP:client_ip}:%{INT:client_port} \[%{HAPROXYDATE:accept_date}\] %{NOTSPACE:frontend_name} %{NOTSPACE:backend_name}/%{NOTSPACE:server_name} %{INT:time_queue}/%{INT:time_backend_connect}/%{NOTSPACE:time_duration} %{NOTSPACE:bytes_read} %{NOTSPACE:termination_state} %{INT:actconn}/%{INT:feconn}/%{INT:beconn}/%{INT:srvconn}/%{NOTSPACE:retries} %{INT:srv_queue}/%{INT:backend_queue}" }
}
}
}
output {
if [type] == "haproxy_http" {
redis {
host => "192.168.20.166"
port => "6379"
db => "5"
data_type => "list"
key => "haproxy_http.log"
}
} else if [type] == "haproxy_tcp" {
redis {
host => "192.168.20.166"
port => "6379"
db => "4"
data_type => "list"
key => "haproxy_tcp.log"
}
}
}
logstash 服务器端把 haproxy tcp日志写入到elasticsearch中
[root@logstashserver etc]# cat logstash.conf
input {
if [type] == "haproxy_http" {
redis {
host => "192.168.20.166"
port => "6379"
db => "5"
data_type => "list"
key => "haproxy_http.log"
}
} else if [type] == "haproxy_tcp" {
redis {
host => "192.168.20.166"
port => "6379"
db => "4"
data_type => "list"
key => "haproxy_tcp.log"
}
}
}
output {
if [type] == "haproxy_http" {
elasticsearch {
hosts => ["es1:9200","es2:9200","es3:9200"]
manage_template => true
index => "logstash-haproxy-http.log-%{+YYYY-MM-dd}"
}
}
if [type] == "haproxy_tcp" {
elasticsearch {
hosts => ["es1:9200","es2:9200","es3:9200"]
manage_template => true
index => "logstash-haproxy-tcp.log-%{+YYYY-MM-dd}"
}
}
}
#########################################kafka###############################################
客户端
input {
file {
path => "/data/haproxy/logs/haproxy_http.log"
start_position => "beginning"
type => "haproxy_http"
}
file {
path => "/data/haproxy/logs/haproxy_tcp.log"
start_position => "beginning"
type => "haproxy_tcp"
}
}
filter {
if [type] == "haproxy_http" {
grok{
patterns_dir => "/data/logstash/patterns"
match => {"message" => "%{SYSLOGTIMESTAMP:syslog_timestamp} %{IPORHOST:syslog_server} %{SYSLOGPROG}: %{IP:client_ip}:%{INT:client_port} \[%{HAPROXYDATE:accept_date}\] %{NOTSPACE:frontend_name} %{NOTSPACE:backend_name}/%{NOTSPACE:server_name} %{INT:time_request}/%{INT:time_queue}/%{INT:time_backend_connect}/%{INT:time_backend_response}/%{NOTSPACE:time_duration} %{INT:http_status_code} %{NOTSPACE:bytes_read} %{FENG:captured_request_cookie} %{FENG:captured_response_cookie} %{NOTSPACE:termination_state} %{INT:actconn}/%{INT:feconn}/%{INT:beconn}/%{INT:srvconn}/%{NOTSPACE:retries} %{INT:srv_queue}/%{INT:backend_queue} \"%{WORD:verb} %{URIPATHPARAM:request} %{WORD:http_socke}/%{NUMBER:http_version}\""}
}
geoip {
source => "client_ip"
fields => ["ip","city_name","country_name","location"]
add_tag => [ "geoip" ]
}
} else if [type] == "haproxy_tcp" {
grok {
match => { "message" => "(?:%{SYSLOGTIMESTAMP:syslog_timestamp}|%{TIMESTAMP_ISO8601:timestamp8601}) %{IPORHOST:syslog_server} %{SYSLOGPROG}: %{IP:client_ip}:%{INT:client_port} \[%{HAPROXYDATE:accept_date}\] %{NOTSPACE:frontend_name} %{NOTSPACE:backend_name}/%{NOTSPACE:server_name} %{INT:time_queue}/%{INT:time_backend_connect}/%{NOTSPACE:time_duration} %{NOTSPACE:bytes_read} %{NOTSPACE:termination_state} %{INT:actconn}/%{INT:feconn}/%{INT:beconn}/%{INT:srvconn}/%{NOTSPACE:retries} %{INT:srv_queue}/%{INT:backend_queue}" }
}
}
}
output {
if [type] == "haproxy_http" {
kafka { #输出到kafka
bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092" #他们就是生产者
topic_id => "haproxy_http.log" #这个将作为主题的名称,将会自动创建
compression_type => "snappy" #压缩类型
}
} else if [type] == "haproxy_tcp" {
kafka { #输出到kafka
bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092" #他们就是生产者
topic_id => "haproxy_tcp.log" #这个将作为主题的名称,将会自动创建
compression_type => "snappy" #压缩类型
}
}
}
服务器端
input {
if [type] == "haproxy_http" {
kafka {
zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
topic_id => "haproxy_http.log"
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
} else if [type] == "haproxy_tcp" {
kafka {
zk_connect => "zookeeper1:2181,zookeeper2:2181,zookeeper3:2181"
topic_id => "haproxy_tcp.log"
reset_beginning => false
consumer_threads => 5
decorate_events => true
}
}
}
output {
if [type] == "haproxy_http" {
elasticsearch {
hosts => ["es1:9200","es2:9200","es3:9200"]
manage_template => true
index => "logstash-haproxy-http.log-%{+YYYY-MM-dd}"
}
}
if [type] == "haproxy_tcp" {
elasticsearch {
hosts => ["es1:9200","es2:9200","es3:9200"]
manage_template => true
index => "logstash-haproxy-tcp.log-%{+YYYY-MM-dd}"
}
}
}
转载于:https://www.cnblogs.com/fengjian2016/p/5919702.html
logstash redis kafka传输 haproxy日志相关推荐
- logstash通过kafka传输nginx日志(三)
单个进程 logstash 可以实现对数据的读取.解析和输出处理.但是在生产环境中,从每台应用服务器运行 logstash 进程并将数据直接发送到 Elasticsearch 里,显然不是第一选择:第 ...
- ELK之收集haproxy日志
由于HAProxy的运行信息不写入日志文件,但它依赖于标准的系统日志协议将日志发送到远程服务器(通常位于同一系统上),所以需要借助rsyslog来收集haproxy的日志.haproxy代理nginx ...
- ELK日志服务使用-kafka传输日志(bbotte.com)
本文转载于 http://bbotte.com/ ELK日志服务使用-kafka传输日志 对于日志传输,rsyslog或者logstash也就够用了,一般的redis,kafka,主要是作为缓冲或序 ...
- CENTOS6.5安装日志分析ELK elasticsearch + logstash + redis + kibana
1.日志平台的工作流程 多个独立的agent(Shipper)负责收集不同来源的数据,一个中心agent(Indexer)负责汇总和分析数据,在中心agent前的Broker(使用redis实现)作为 ...
- Logstash+Redis+Elasticsearch+Kibana+Nginx搭建日志分析系统
为什么80%的码农都做不了架构师?>>> 前言: 随着实时分析技术的发展及成本的降低,用户已经不仅仅满足于离线分析.目前我们服务的用户包括微博.微盘.云存储.弹性计算平台等十多 ...
- ELK学习3_使用redis+logstash+elasticsearch+kibana快速搭建日志平台
日志的分析和监控在系统开发中占非常重要的地位,系统越复杂,日志的分析和监控就越重要,常见的需求有: 根据关键字查询日志详情 监控系统的运行状况 统计分析,比如接口的调用次数.执行时间.成功率等 异常数 ...
- ELK+Kafka集群日志分析系统
因为是自己本地写好的word文档复制进来的.格式有些出入还望体谅.如有错误请回复.谢谢! 一. 系统介绍 2 二. 版本说明 3 三. 服务部署 3 1) JDK部署 3 2) Elasticsear ...
- Elasticsearch、Logstash、Kibana搭建统一日志分析平台
ELKstack是Elasticsearch.Logstash.Kibana三个开源软件的组合.目前都在Elastic.co公司名下. ELK是一套常用的开源日志监控和分析系统,包括一个分布式索引与搜 ...
- 为什么物联网大数据平台,使用TDengine,可不要redis, kafka, spark等软件?
为什么物联网大数据平台,使用TDengine,可不要redis, kafka, spark等软件? TDengine是一高效的时序空间大数据处理引擎,因为充分利用物联网.车联网.工业互联网等场景的数据 ...
最新文章
- php要怎么使用imagettftext_延长防腐木使用要怎么做呢?
- 统计学习导论 Chapter4--Classification
- redis-sentinel高可用配置(2)
- ES6 Proxy的简单使用
- HT For Web 拓扑图背景设置
- java中可以作为GC Roots的对象
- CRTMPServer 在CentOS 64-bit下的编译(转)
- python嵩天第七章课后题答案_python语言程序设计嵩天第七章答案
- Android 4 2官方文档chm格式下载
- VCIX-NV学习指南
- 20201016:力扣第210周周赛题解(下)
- 开源社群系统 ThinkSNS+ 0.7.4 版本发布
- 《代码大全》阅读笔记01
- 夏至日计算公式及“三伏”的日期算法问题
- solid works旋转、抽壳的应用
- VUE前端应用部署页面访问404问题
- Tests run: 1, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 8.074 s <<< FAILURE - in com.xxx.X
- 细谈JVM垃圾回收与部分底层实现
- K8S故障排查指南:部分节点无法启动Pod资源-Pod处于ContainerCreating状态
- 怎样提高深度睡眠时间,五个助眠小妙招帮助你快速入睡