再见 Logstash,是时候拥抱下一代开源日志收集系统 Fluentd 了
公众号关注 「奇妙的 Linux 世界」
设为「星标」,每天带你玩转 Linux !
fluentd 是一个实时的数据收集系统,不仅可以收集日志,还可以收集定期执行的命令输出和 HTTP 请求内容。数据被收集后按照用户配置的解析规则,形成一系列 event。每一个 event 包含如下内容:
tag = xxx
time = xxx
record = {"key1": "value1","key2": "value2"
}
其中:
tag:为数据流的标记。fluentd 中可以具有多个数据源,解析器,过滤器和数据输出。他们之前使用 tag 来对应。类似于数据流按照 tag 分组。数据流向下游的时候只会进入 tag 相匹配的处理器。
time:event 产生的时间,该字段通常由日志内的时间字段解析出来。
record:日志的内容,为 JSON 格式。
fluentd 支持多种数据的解析过滤和输出操作。其中常用的有:
tail 输入:增量读取日志文件作为数据源,支持日志滚动。
exec 输入:定时执行命令,获取输出解析后作为数据源。
syslog 输出:解析标准的 syslog 日志作为输入。
forward 输入:接收其他 fluentd 转发来的数据作为数据源。
dummy:虚拟数据源,可以定时产生假数据,用于测试。
regexp 解析器:使用正则表达式命名分组的方式提取出日志内容为 JSON 字段。
record_transformer 过滤器:人为修改 record 内的字段。
file 输出:用于将 event 落地为日志文件。
stdout:将 event 输出到 stdout。如果 fluentd 以 daemon 方式运行,输出到 fluentd 的运行日志中。
forward:转发 event 到其他 fluentd 节点。
copy:多路输出,复制 event 到多个输出端。
kafka:输出 event 到 Kafka。
webhdfs:输出 event 到 HDFS。
elasticsearch:输出 event 到 HDFS。
接下来以官网介绍为基础,穿插自己的理解,介绍下 fluentd 的使用方法。
安装启动方法
官网安装步骤链接:https://docs.fluentd.org/installation/install-by-rpm
下面是精简的在 CentOS 下的安装步骤。打开 shell,执行如下命令:
curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | shsystemctl start td-agent
可以安装并启动 fluentd。
配置文件位置
编辑 fluentd 配置文件的方法:
vim /etc/td-agent/td-agent.conf
修改运行用户和组
默认来说 fluentd 使用 td-agent 用户启动。如果需要修改 fluentd 的用户,需要执行:
vim /usr/lib/systemd/system/td-agent.service
文件内容如下所示:
[Unit]
Description=td-agent: Fluentd based data collector for Treasure Data
Documentation=https://docs.treasuredata.com/articles/td-agent
After=network-online.target
Wants=network-online.target[Service]
User=td-agent
Group=td-agent
LimitNOFILE=65536
Environment=LD_PRELOAD=/opt/td-agent/embedded/lib/libjemalloc.so
Environment=GEM_HOME=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=GEM_PATH=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=FLUENT_CONF=/etc/td-agent/td-agent.conf
Environment=FLUENT_PLUGIN=/etc/td-agent/plugin
Environment=FLUENT_SOCKET=/var/run/td-agent/td-agent.sock
Environment=TD_AGENT_LOG_FILE=/var/log/td-agent/td-agent.log
Environment=TD_AGENT_OPTIONS=
EnvironmentFile=-/etc/sysconfig/td-agent
PIDFile=/var/run/td-agent/td-agent.pid
RuntimeDirectory=td-agent
Type=forking
ExecStart=/opt/td-agent/embedded/bin/fluentd --log $TD_AGENT_LOG_FILE --daemon /var/run/td-agent/td-agent.pid $TD_AGENT_OPTIONS
ExecStop=/bin/kill -TERM ${MAINPID}
ExecReload=/bin/kill -HUP ${MAINPID}
Restart=always
TimeoutStopSec=120[Install]
WantedBy=multi-user.target
修改Service
部分User
和Group
配置项可以更改 fluentd 进程的用户和组。
检测配置文件是否正确的方法
在 shell 中运行:
/opt/td-agent/embedded/bin/fluentd -c /etc/td-agent/td-agent.conf
观察输出,如果有错误会给出对应提示。
数据流逻辑
fluentd 以 tag 值为基准,决定数据的流经哪些处理器。
数据的流向为:source -> parser -> filter -> output
input 配置
tail
增量读取日志文件。需要提供一个用于标记已经读取到位置的文件(position file)所在的路径。
tail 针对日志滚动的支持:tail 方式采用跟踪文件 inode 的方式进行。比如日志名为app.log
,如果日志发生滚动,被重命名为app.log.1
。文件重命名的时候 inode 是不会改变的。因此发生滚动时写入到旧文件末尾的日志也可以被收集到。tail 会跟踪旧文件的 inode 一段时间(rotate_wait
配置),这段时间过去之后,tail 不再监听app.log.1
,开始监听新的app.log
文件。
tail 方式的示例配置:
<source>@type tailpath /var/log/httpd-access.logpos_file /var/log/td-agent/httpd-access.log.postag apache.access<parse>@type apache2</parse>
</source>
注意:如果文件发生修改会输出全量文件内容。
配置项解释
tag:数据源的 tag 值。*
号可以扩展为 path(/
替换为.
)。例如
path /path/to/file
tag foo.*
tag 会被扩展为foo.path.to.file
path:配置读取的路径。可以使用*
或者是strftime
。例如:
path /path/to/%Y/%m/%d/*
如果今天是 2020 年 1 月 2 日,fluentd 会读取/path/to/2020/01/02
目录下的内容。也可以配置多个路径,使用逗号分隔:
path /path/to/a/*,/path/to/b/c.log
exclude_path:排除部分目录或文件,使用数组格式配置。
path /path/to/*
exclude_path ["/path/to/*.gz", "/path/to/*.zip"]
refresh_interval:多长时间刷新一次文件监听列表,配合*
使用才有意义。
pos_file:位置文件地址。这个文件保存了监听的日志文件已经读取到第几行。该项一定要配置。注意,不要在多个 source 之间共用 pos file,否则会出现问题。pos_file_compaction_interval:pos file 文件压缩时间间隔。用于压缩 pos file 中不再监听的记录,不可解析的记录以及重复的记录。
parse 标签:用于指定 log 的解析器(必须的配置项)。例如:
# json
<parse>@type json
</parse># regexp
<parse>@type regexpexpression ^(?<name>[^ ]*) (?<user>[^ ]*) (?<age>\d*)$
</parse>
path_key:如果配置此项,监控文件的 path 会在 event 中,此项的 key 为path_key
。例如:
path /path/to/access.log
path_key tailed_path
生成的数据如下所示:
{"tailed_path":"/path/to/access.log","k1":"v1",...,"kN":"vN"}
rotate_wait:日志发生滚动的时候,可能会有部分日志仍然输出在旧的日志文件,此时需要保持监听旧日志文件一段时间,这个时间配置就是rotate_wait
。
exec
周期性执行命令,抽取命令输出为 event。
示例配置:
<source>@type execcommand cmd arg arg<parse>keys k1,k2,k3</parse><extract>tag_key k1time_key k2time_format %Y-%m-%d %H:%M:%S</extract>run_interval 10s
</source>
以上命令的含义为每 10 秒钟执行cmd arg arg
命令,提取命令执行结果,以空白字符分隔三个字段的值为 k1,k2,k3。其中 k1 的值作为 tag,k2 作为时间字段,使用%Y-%m-%d %H:%M:%S
格式。
一个例子,周期获取系统的平均负载。配置方法如下:
<source>@type exectag system.loadavgcommand cat /proc/loadavg | cut -d ' ' -f 1,2,3run_interval 1m<parse>@type tsvkeys avg1,avg5,avg15delimiter " "</parse>
</source>
输出的日志格式为:
2018-06-29 17:27:35.115878527 +0900 system.loadavg: {"avg1":"0.30","avg5":"0.20","avg15":"0.05"}
syslog
连接 rsyslog。可以作为 rsyslog 的接收端。
一个配置的例子:
<source>@type syslogport 5140bind 0.0.0.0tag system
</source>
fluentd 打开 5140 端口监听 rsyslog 发来的 log。
rsyslog 配置文件/etc/rsyslog.conf
设置为:
# Send log messages to Fluentd
*.* @127.0.0.1:5140
fluentd 解析到的 event 格式如下:
tag = "#{@tag}.#{facility}.#{priority}"
time = 1353436518,
record = {"host": "host","ident": "ident","pid": "12345","message": "text"
}
dummy
专用于测试的数据源。周期产生假数据。
配置举例:
<source>@type dummydummy {"hello":"world"}
</source>
dummy 常用参数:
tag: 标记值
size:每次发送的 event 数量
rate:每秒产生多少个 event
auto_increment_key:自增键名。如果配置了此项,会有一个 key 为该配置项值的自增键
suspend:重启后自增值是否重新开始
dummy:测试数据内容
forward
用于接收其他 fluentd forward 过来的 event。
示例配置:
<source>@type forwardport 24224bind 0.0.0.0
</source>
output 配置
file
输出 event 为文件。默认每天输出一个日志文件。
示例配置:
<match pattern>@type filepath /var/log/fluent/myappcompress gzip<buffer>timekey 1dtimekey_use_utc truetimekey_wait 10m</buffer>
</match>
包含的参数类型:
path:path 支持 placeholder,可以在日志路径中嵌入时间,tag 和 record 中的字段值。例如:
path /path/to/${tag}/${key1}/file.%Y%m%d
<buffer tag,time,key1># buffer parameters
</buffer>
注意:buffer 标签后面的内容为 buffer chunk key。Buffer 根据这些 key 分段。
append:flush 的 chuck 是否追加到已存在的文件后。默认为 false,便于文件的并行处理。
format 标签,用来规定文件内容的格式,默认值为 out_file。
inject 标签,用来为 event 增加 time 和 tag 等字段。
add_path_suffix:是否增加 path 后缀
path_suffix:path 后缀内容,默认为
.log
。compress:采用什么压缩格式,默认不压缩。
recompress:是否在 buffer chunk 已经压缩的情况再次压缩,默认为 false。
forward
将 event 转发到其他的 fluentd 节点。如果配置了多个 fluentd 节点,会使用负载均衡和支持容错的方式发送。如果需要发送多份数据,需要使用 copy。
配置示例:
<match pattern>@type forwardsend_timeout 60srecover_wait 10shard_timeout 60s<server>name myserver1host 192.168.1.3port 24224weight 60</server><server>name myserver2host 192.168.1.4port 24224weight 60</server>...<secondary>@type filepath /var/log/fluent/forward-failed</secondary>
</match>
server 标签内可以配置如下字段:
host
name
port
shared_key
username
password
standby 标记 server 为备用,只有其他 node 不可用的时候才会启用 standby 的 node
weight 负载均衡的权重配置
copy
多路输出(复制 event 到多个输出端)
示例配置
<match pattern>@type copy<store>@type filepath /var/log/fluent/myapp1...</store><store>...</store><store>...</store>
</match>
其中每一个 store 是一路输出。
重要参数:
copy_mode:复制模式。可选值有
no_copy:每路输出共享 event。
shallow:浅拷贝,如果不修改嵌套字段可以使用。
deep:深拷贝,使用
msgpack-ruby
方式。marshal:深拷贝,使用
marshal
方式。
store 标签的 ignore_error 参数:如果被标记 ignore_error 的 store 出现错误,不会影响其他的 store。官网的例子为:
<match app.**>@type copy<store>@type plugin1</store><store>@type plugin2</store>
</match>
假如 plugin1 出现错误,plugin2 也不会执行。如果在 plugin1 的 store 添加上 ignore_error 参数,如下所示:
<match app.**>@type copy<store ignore_error>@type plugin1</store><store>@type plugin2</store>
</match>
上述情况 plugin2 的运行不受影响。通常为不重要的 store 添加 ignore_error 参数。
http
通过 http 请求的方式发送 event。payload 的格式由 format 标签决定。
示例配置:
<match pattern>@type httpendpoint http://logserver.com:9000/apiopen_timeout 2<format>@type json</format><buffer>flush_interval 10s</buffer>
</match>
该例子使用 http 方式将 event 发送到http://logserver.com:9000/api
,使用 post 方式,连接超时时间为 2 秒。输出格式为 json,每 10 秒钟输出一次。
注意:
如果使用 JSON 的方式发送,HTTP 请求的 content-type 为 application/x-ndjson (newline-delimited JSONs)。如果用 spring mvc 接收会提示不支持。可以使用HTTPServletRequest
接收 request body。
stdout
标准输出的模式,如果使用后台模式运行 fluentd,输出到 fluentd 的日志。多用于 debug 的时候。
配置方法:
<match pattern>@type stdout
</match>
elasticsearch
输出 event 到 elasticsearch。
示例配置:
<match my.logs>@type elasticsearchhost localhostport 9200logstash_format true
</match>
可选参数:
host:单个 elasticsearch 节点地址
port:单个 elasticsearch 节点的端口号
hosts:elasticsearch 集群地址。格式为 ip1:port1,ip2:port2...
user 和 password:elasticsearch 的认证信息
scheme:使用 https 还是 http。默认为 http 模式
path:REST 接口路径,默认为空
index_name:index 名称
logstash_format:index 是否使用 logstash 命名方式(
logstash-%Y.%m.%d
),默认不启用logstash_prefix:logstash_format 启用的时候,index 命名前缀是什么。默认为
logstash
kafka
把 event 输出到 kafka。
示例配置如下:
<match pattern>@type kafka2# list of seed brokersbrokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>use_event_time true# buffer settings<buffer topic>@type filepath /var/log/td-agent/buffer/tdflush_interval 3s</buffer># data type settings<format>@type json</format># topic settingstopic_key topicdefault_topic messages# producer settingsrequired_acks -1compression_codec gzip
</match>
重要的参数为:
brokers:Kafka brokers 的地址和端口号
topic_key:record 中哪个 key 对应的值用作 Kafka 消息的 key
default_topic:如果没有配置 topic_key,默认使用的 topic 名字
format 标签:确定发送的数据格式
use_event_time:是否使用 fluentd event 的时间作为 Kafka 消息的时间。默认为 false。意思为使用当前时间作为发送消息的时间
required_acks:producer acks 的值
compression_codec:压缩编码方式
webhdfs
event 通过 REST 方式写入到 HDFS。
HADOOP 启用 webhdfs 的方法
core-site.xml
<configuration><property><name>fs.defaultFS</name><value>hdfs://10.180.210.172:9000</value></property>
</configuration>
hdfs-site.xml
<configuration><property><name>dfs.replication</name><value>1</value></property><property><name>dfs.http.address</name><value>0.0.0.0:50070</value></property><property><name>dfs.webhdfs.enabled</name><value>true</value></property><property><name>dfs.support.append</name><value>true</value></property><property><name>dfs.support.broken.append</name><value>true</value></property>
</configuration>
最后执行$HADOOP_HOME/sbin/httpfs.sh start
命令启动 webhdfs 支持。
注意:此时 webhdfs 的端口号为 50070。
示例配置和参数
示例配置:
<match access.**>@type webhdfshost namenode.your.cluster.localport 50070path "/path/on/hdfs/access.log.%Y%m%d_%H.#{Socket.gethostname}.log"<buffer>flush_interval 10s</buffer>
</match>
注意:需要保证 HDFS 的目标目录具有写入权限。debug 过程发现 fluentd 请求 webhdfs 没有使用 user proxy,HDFS 认为操作的用户为 dr.who,无法创建文件。为了解决这个问题,设置 HDFS 目标目录的权限为 777。
重要参数:
host:namenode 的地址
port:namenode 的端口号
path:写入文件路径。可以使用占位符或者 ruby 表达式。可以使用如下方式表示时间:
\%Y: year including the century (at least 4 digits)
\%m: month of the year (01..12)
\%d: Day of the month (01..31)
\%H: Hour of the day, 24-hour clock (00..23)
\%M: Minute of the hour (00..59)
\%S: Second of the minute (00..60)
输出参数:
timekey:多久输出一次文件到 HDFS。如果 path 中没有配置占位符,默认为 86400(1 天)。如果指定了和时间相关的占位符,则文件输出周期自动和最小的时间占位符单位一致
timekey_wait:允许等待来迟日志的最长时间
flush_interval:flush 间隔时间,默认为不设置
flush_at_shutdown:关闭的时候是否 flush。如果使用内存类型的 buffer,需要配置为 true
parser 配置
regexp
使用正则表达式命名分组的方式从日志(一行或多行)中提取信息。可以通过 time_key 指定 event 的 time 字段的名字。名字为 time 字段名的分组内容会被抽取为 event 时间。
一个在线测试正则表达式的工具:http://fluentular.herokuapp.com/
基本配置格式:
<parse>@type regexpexpression /.../
</parse>
正则表达式可以添加额外的参数:忽略大小写:/.../i 多行匹配:/.../m。注意,此时.
匹配新行 同时使用忽略大小写和多行匹配:/.../im
一个例子,示例配置如下:
<parse>@type regexpexpression /^\[(?<logtime>[^\]]*)\] (?<name>[^ ]*) (?<title>[^ ]*) (?<id>\d*)$/time_key logtimetime_format %Y-%m-%d %H:%M:%S %ztypes id:integer
</parse>
如下的数据:
[2013-02-28 12:00:00 +0900] alice engineer 1
会被解析为:
time:
1362020400 (2013-02-28 12:00:00 +0900)record:
{"name" : "alice","title": "engineer","id" : 1
}
filter 配置
record_transformer
record_transformer 用来修改 event 的结构,增加或修改字段。
一个 record_transformer 的例子:
<filter foo.bar>@type record_transformer<record>hostname "#{Socket.gethostname}"tag ${tag}</record>
</filter>
这个 filter 匹配 tag 为foo.bar
的 source。event 增加了两个新的字段:hostname 和 tag。
其中 hostname 这里使用了 ruby 表达式。tag 使用了字符串插值。
如果数据为:
{ "message": "hello world!" }
会被转换为:
{"message": "hello world!","hostname": "db001.internal.example.com","tag": "foo.bar"
}
可以通过添加 enable_ruby 配置,在${}
中使用 ruby 表达式。
例如:
<filter foo.bar>@type record_transformerenable_ruby<record>avg ${record["total"] / record["count"]}</record>
</filter>
如下输入:
{ "total": 100, "count": 10 }
会被转换为:
{ "total": 100, "count": 10, "avg": "10" }
注意,可以启用auto_typecast true
配置实现自动类型转换。
修改字段的例子:
<filter foo.bar>@type record_transformer<record>message yay, ${record["message"]}</record>
</filter>
如下输入:
{ "message": "hello world!" }
会被修改为:
{ "message": "yay, hello world!" }
可以在表达式中配置 tag_parts 变量,引用 tag 的第 n 部分。如下所示:
<filter web.*>@type record_transformer<record>service_name ${tag_parts[1]}</record>
</filter>
如果遇到 tag 为web.auth
的数据:
{ "user_id": 1, "status": "ok" }
会被转换为:
{ "user_id": 1, "status": "ok", "service_name": "auth" }
record 标签
record 标签的语法为:
<record>NEW_FIELD NEW_VALUE
</record>
表达式中可以配置如下变量:
record:获取 record 中某些字段的内容。例如
record["count"]
tag:获取 tag 的内容
time:获取日志的时间戳
hostname:获取主机名字,和
#{Socket.gethostname}
作用一样tag_parts[N]:tag 以
.
分隔,获取 tag 的第 N 部分tag_prefix[N]:获取 tag 的 0-N 部分
tag_suffix[N]:获取 tag 的 N-结尾部分
例如 tag 为debug.my.app
,tag_parts[1]
返回my
。tag_prefix
和tag_suffix
的结果如下:
tag_prefix[0] = debug tag_suffix[0] = debug.my.app
tag_prefix[1] = debug.my tag_suffix[1] = my.app
tag_prefix[2] = debug.my.app tag_suffix[2] = app
配置文件使用通配符和扩展
<match>
和<filter>
标签可以使用通配符和扩展。
tag 以.
为分隔符,分隔为多个部分。
fluentd 支持的通配符和扩展有:*
:只匹配一个部分。比如a.*
匹配a.b
,但是不匹配a
或a.b.c
。**
:匹配 0 个或多个部分。比如a.**
匹配a
,a.b
和a.b.c
。{X,Y,Z}
:匹配 X 或 Y 或 Z。#{expression}
:使用嵌入的 ruby 表达式。有一些快捷变量可以直接使用,例如#{hostname}
和#{worker_id}
。${..}
:使用变量值,tag,record 可以使用如下的方式指定默认值。例如:#{ENV["FOOBAR"] || use_default}
。如果 FOOBAR 环境变量不存在,则使用use_default
这个值。
注意:match 标签的匹配过程是有顺序的。比如说下面的例子:
<match **>@type blackhole_plugin
</match><match myapp.access>@type filepath /var/log/fluent/access
</match>
因为上面的 match 总是能被匹配到,下面的 match 永远没有机会执行。
Buffer
buffer 为 fluentd 很关键的配置,意为缓冲区。可以决定收集的数据存入什么介质,多长时间输出一次等。
buffer 标签必须配置在 match 标签内(即在输出端配置)。
buffer 具有一个@type 属性,用来配置 buffer 的储存介质:
<buffer>@type file
</buffer>
@type 有两个值:
file:存入文件
memory:存入内存,这个是默认值
buffer 标签后面可以跟随 chunk keys,用来决定 buffer 以 record 的什么字段来分段存放。例如:
<buffer ARGUMENT_CHUNK_KEYS># ...
</buffer>
注意:
可以指定多个 buffer chunk keys,使用逗号分隔。
如果没有配置 chunk key,所有的 event 都会写入同一个 chunk file,直到 buffer 滚动。
buffer 如果使用 time 作为 chunk key,可以按照时间对 buffer 进行分段。其中:
timekey:时间的跨度
timekey_wait:flush 延迟时间,用于等待迟到的数据
官网的例子如下:
<match tag.**># ...<buffer time>timekey 1h # chunks per hours ("3600" also available)timekey_wait 5m # 5mins delay for flush ("300" also available)</buffer>
</match># Time chunk key: events will be separated for hours (by timekey 3600)11:59:30 web.access {"key1":"yay","key2":100} ------> CHUNK_A12:00:01 web.access {"key1":"foo","key2":200} --||---> CHUNK_B
12:00:25 ssh.login {"key1":"yay","key2":100} --|
部分经常用到的配置参数:
timekey_use_utc:使用国际标准时间还是当地时间,默认是使用当地时间。
timekey_zone:指定时区。
chunk_limit_size:chunk 大小限制,默认 8MB。
chunk_limit_records:chunk event 条数限制。
total_limit_size:总 buffer 大小限制。
chunk_full_threshold:chunk 大小超过 chunk_limit_size * chunk_full_threshold 时会自动 flush。
queued_chunks_limit_size:限制队列中的 chunk 数目,防止频繁 flush 产生过多的 chunk。
compress:压缩格式,可使用 text 或 gzip。默认为 text。
flush_at_shutdown:关闭时候是否 flush。对于非持久化 buffer 默认值为 true,持久化 buffer 默认值为 false。
flush_interval:多长时间 flush 一次。
retry_timeout:重试 flush 的超时时间。在这个时间后不再会 retry。
retry_forever:是否永远尝试 flush。如果设置为 true 会忽略 retry_timeout 的配置。
retry_max_times:重试最大次数。
retry_type:有两个配置值:retry 时间间隔,指数级增长或者是固定周期重试。
retry_wait:每次重试等待时间。
retry_exponential_backoff_base:retry 时间指数扩大倍数。
retry_max_interval:最长 retry 时间间隔。
retry_randomize:是否随机 retry 时间间隔。
配置文件重用
可以通过@include 配置文件路径
方式,引用其他配置文件片段到 fluentd 主配置文件中。
配置文件路径可以使用绝对路径或相对路径。相对路径的基准路径为 fluentd 主配置文件所在的路径。
@include
可以出现在主配置文件的任何位置。
Docker 日志输出到 fluentd
通过配置 fluentd logging driver 的方式实现。该 driver 发送的 log 信息包含:
字段 | 描述 |
---|---|
container_id | 64 字符的 container id |
container_name | container 名字 |
source | stdout 或 stderr |
log | container 的 log |
全局配置方式
修改/etc/docker/daemon.json
,增加如下内容:
{"log-driver": "fluentd","log-opts": {"fluentd-address": "fluentdhost:24224"}
}
然后重启 docker daemon 使配置生效。
也可以通过添加--log-driver
和--log-opt
参数的方式指定某个 container 使用 fluentd logging driver。如下所示:
docker run --log-driver=fluentd --log-opt fluentd-address=fluentdhost:24224
可以通过在--log-opt
后指定 tag 的方式,确定 source 的 tag。
Docker 官网参考链接:https://docs.docker.com/config/containers/logging/fluentd/
配置实例
实例 1
采集/root/my.txt
文件(内容格式为 key value),并发送到http://localhost:9090/
。
fluentd 的配置文件如下:
<source>@type tailpath /root/my.txtpos_file /root/my.txt.postag my<parse>@type regexpexpression /(?<key>\w+)\s(?<value>\w+)/</parse>
</source><match my>@type httpendpoint http://localhost:9090/open_timeout 2http_method post<format>@type json</format><buffer>flush_interval 3s</buffer>
</match>
实例 2
提取用户操作记录,打印到 fluentd 日志。
<source>@type tail# 这里使用HISTFILE环境变量,如果没有设置,使用默认值/root/.bash_historypath "#{ENV["HISTFILE"] || /root/.bash_history}"pos_file /root/.bash_history.postag history<parse>@type none</parse>
</source><filter history>@type record_transformer<record>hostname ${hostname}</record>
</filter><match history>@type stdout
</match>
实例 3
收集用户操作记录转发到另一个 fluentd 节点,同时将数据发送到 Kafka 和存入 HDFS。
数据流为:fluentd 采集端 -> fluentd 收集端 -> kafka 和 HDFS
示例用户操作记录数据为:
root pts/1 2020-03-26 10:59 (10.180.206.1):root 2020-03-26 11:00:09 130 tail -f /var/log/command.his.log
采集节点的配置:
<source>@type tailpath /var/log/command.his.logpos_file /var/log/command.his.log.postag history<parse>@type regexp# 使用正则解析日志文件expression /^(?<who_user>\w+)\s(?<pts>\S+)\s(?<who_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2})\s\((?<remote_ip>\d+\.\d+\.\d+\.\d+)\):(?<user>\w+)\s(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\s(?<res>\d+)\s(?<command>.+)$/time_key time</parse>
</source>
<filter history>@type record_transformer<record># event内容增加hostname这一行hostname ${hostname}</record>
</filter><match history>@type forwardsend_timeout 60srecover_wait 10shard_timeout 60s<buffer># 1秒钟向另一个fluentd节点转发一次flush_interval 1s</buffer><server>name myserver1host 10.180.210.172port 24225weight 60</server>
</match>
fluentd 收集节点的配置:
<source>@type forwardport 24225bind 0.0.0.0tag remote
</source><match remote># 使用copy方式,分两路输出@type copy<store>@type kafka2brokers 10.180.210.172:9092use_event_time true<buffer topic>@type filepath /var/log/td-agent/buffer/tdflush_interval 3s</buffer><format>@type json</format>default_topic historyrequired_acks -1</store><store>@type webhdfshost 10.180.210.172port 50070path "/history/access.log.%Y%m%d_%H.#{Socket.gethostname}.log"<buffer>flush_interval 60s</buffer></store>
</match>
本文转载自:「k8s技术圈」,原文:http://t.cn/A6cRchID,版权归原作者所有。欢迎投稿,投稿邮箱: editor@hi-linux.com。
你可能还喜欢
点击下方图片即可阅读
重磅 | AWS 免费开源增强版 Elasticsearch 分支 OpenSearch
点击上方图片,打开小程序,加入「玩转 Linux」圈子
更多有趣的互联网新鲜事,关注「奇妙的互联网」视频号全了解!
再见 Logstash,是时候拥抱下一代开源日志收集系统 Fluentd 了相关推荐
- 号称下一代日志收集系统!来看看它有多强
点击下方公众号「关注」和「星标」 回复"1024"获取独家整理的学习资料! 关于日志收集.处理.分析的方案,其实是很多,常见的就是ELK组合,即:Elasticsearch + L ...
- 借鉴开源框架自研日志收集系统
踏浪无痕 岂安科技高级架构师 十余年数据研发经验,擅长数据处理领域工作,如爬虫.搜索引擎.大数据应用高并发等.担任过架构师,研发经理等岗位.曾主导开发过大型爬虫,搜索引擎及大数据广告DMP系统目前负责 ...
- 基于Flume的美团日志收集系统(一)架构和设计
背景 美团的日志收集系统负责美团的所有业务日志的收集,并分别给Hadoop平台提供离线数据和Storm平台提供实时数据流.美团的日志收集系统基于Flume设计和搭建而成. <基于Flume的美团 ...
- 基于Flume的美团日志收集系统-----架构和设计
问题导读: 1.Flume-NG与Scribe对比,Flume-NG的优势在什么地方? 2.架构设计考虑需要考虑什么问题? 3.Agent死机该如何解决? 4.Collector死机是否会有影响? 5 ...
- python分布式日志收集系统_Go实现海量日志收集系统(一)
项目背景 每个系统都有日志,当系统出现问题时,需要通过日志解决问题 当系统机器比较少时,登陆到服务器上查看即可满足 当系统机器规模巨大,登陆到机器上查看几乎不现实 当然即使是机器规模不大,一个系统通常 ...
- 你居然还去服务器上捞日志,搭个日志收集系统难道不香么!
摘要 ELK日志收集系统进阶使用,本文主要讲解如何打造一个线上环境真实可用的日志收集系统.有了它,你就可以和去服务器上捞日志说再见了! ELK环境安装 ELK是指Elasticsearch.Kiban ...
- java 如何去掉http debug日志_你居然还去服务器上捞日志,搭个日志收集系统难道不香吗?...
作者:MacroZheng 链接:https://juejin.im/post/5eef217d51882565d74fb4eb 来源:掘金 SpringBoot实战电商项目mall(35k+star ...
- java 如何去掉http debug日志_你居然还去服务器上捞日志,搭个日志收集系统难道不香么!...
作者:MacroZheng 链接:https://juejin.im/post/5eef217d51882565d74fb4eb 摘要 ELK日志收集系统进阶使用,本文主要讲解如何打造一个线上环境真实 ...
- Go语言学习之11 日志收集系统kafka库实战
本节主要内容: 1. 日志收集系统设计 2. 日志客户端开发 1. 项目背景 a. 每个系统都有日志,当系统出现问题时,需要通过日志解决问题 b. 当系统机器比较少时,登陆到服务器上查 ...
最新文章
- Capital one TPS整理
- 预测2019:数据中心将有哪些变化
- 【分割模型解读】感受野与分辨率的控制术—空洞卷积
- 矩阵论:线性空间与线性变换
- redis 笔记06 发布与订阅、事务、慢查询日志、监视器
- 数据结构 | 链表队列(基本操作及图示)
- json 例子_json-简单的例子
- 电商数据应用体系建设总结(二)—— 数据应用层架构介绍和规范总结
- Android SDK的环境变量配置
- 如何使用phpDesigner 编写一个表格
- Freeradius安装和配置
- 海思3559a资料目录整理
- windows .exe 文件默认打开方式变成记事本的解决方法
- 什么是邮箱域名,企业邮箱域名有什么好处?
- 最近的一些杂感-20220107
- 从一个class文件深入理解Java字节码结构
- 倍福 TwinCAT背景知识
- 【024】Vue+Springboot+mysql员工考勤管理系统(多角色登录、请假、打卡)(含源码、数据库、运行教程、实验报告)
- zcu102_1_PS端LED开关
- Jlink使用技巧系列教程索引
热门文章
- 纯 html 实现一个简单的个人简历
- 【实例】使用 PHPExcel 读取excel 文件
- WebRTC通话原理-网络协商-NAT-ICE-STUN-TURN
- 企业自动运行系统——渠道策略
- 自然语言c,自然语言处理_自然语言处理常用方法举例说明 - 人工智能 - 电子发烧友网...
- Android Studio Chipmunk 2021.2.1.15下载地址
- U-Boot 启动过程和源码分析(第一阶段)
- 全国二级c语言库理论,全国计算机等级考试二级C语言理论基础习.doc
- C++设计并测试一个名为Rectangle的矩形类,其属性为矩形的左下角与右上角两个点的坐标,根据坐标能计算矩形的面积。
- 成员变量、类变量和静态变量三者的区别