fluentd是何方神圣

fluentd是一个实时的数据收集系统,不仅可以收集日志,还可以收集定期执行的命令输出和HTTP请求内容。数据被收集后按照用户配置的解析规则,形成一系列event。每一个event包含如下内容:

tag = xxx
time = xxx
record = {"key1": "value1","key2": "value2"
}

其中:

  • tag:为数据流的标记。fluentd中可以具有多个数据源,解析器,过滤器和数据输出。他们之前使用tag来对应。类似于数据流按照tag分组。数据流向下游的时候只会进入tag相匹配的处理器。
  • time:event产生的时间,该字段通常由日志内的时间字段解析出来。
  • record:日志的内容,为JSON格式。

fluentd支持多种数据的解析过滤和输出操作。其中常用的有:

  • tail输入:增量读取日志文件作为数据源,支持日志滚动。
  • exec输入:定时执行命令,获取输出解析后作为数据源。
  • syslog输出:解析标准的syslog日志作为输入。
  • forward输入:接收其他fluentd转发来的数据作为数据源。
  • dummy:虚拟数据源,可以定时产生假数据,用于测试。
  • regexp解析器:使用正则表达式命名分组的方式提取出日志内容为JSON字段。
  • record_transformer过滤器:人为修改record内的字段。
  • file输出:用于将event落地为日志文件。
  • stdout:将event输出到stdout。如果fluentd以daemon方式运行,输出到fluentd的运行日志中。
  • forward:转发event到其他fluentd节点。
  • copy:多路输出,复制event到多个输出端。
  • kafka:输出event到Kafka。
  • webhdfs:输出event到HDFS。
  • elasticsearch:输出event到HDFS。

接下来以官网介绍为基础,穿插自己的理解,介绍下fluentd的使用方法。

安装启动方法

官网安装步骤链接:https://docs.fluentd.org/installation/install-by-rpm

下面是精简的在CentOS下的安装步骤。打开shell,执行如下命令:

curl -L https://toolbelt.treasuredata.com/sh/install-redhat-td-agent3.sh | shsystemctl start td-agent

可以安装并启动fluentd。

配置文件位置

编辑fluentd配置文件的方法:

vim /etc/td-agent/td-agent.conf

修改运行用户和组

默认来说fluentd使用td-agent用户启动。如果需要修改fluentd的用户,需要执行:

vim /usr/lib/systemd/system/td-agent.service

文件内容如下所示:

[Unit]
Description=td-agent: Fluentd based data collector for Treasure Data
Documentation=https://docs.treasuredata.com/articles/td-agent
After=network-online.target
Wants=network-online.target[Service]
User=td-agent
Group=td-agent
LimitNOFILE=65536
Environment=LD_PRELOAD=/opt/td-agent/embedded/lib/libjemalloc.so
Environment=GEM_HOME=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=GEM_PATH=/opt/td-agent/embedded/lib/ruby/gems/2.4.0/
Environment=FLUENT_CONF=/etc/td-agent/td-agent.conf
Environment=FLUENT_PLUGIN=/etc/td-agent/plugin
Environment=FLUENT_SOCKET=/var/run/td-agent/td-agent.sock
Environment=TD_AGENT_LOG_FILE=/var/log/td-agent/td-agent.log
Environment=TD_AGENT_OPTIONS=
EnvironmentFile=-/etc/sysconfig/td-agent
PIDFile=/var/run/td-agent/td-agent.pid
RuntimeDirectory=td-agent
Type=forking
ExecStart=/opt/td-agent/embedded/bin/fluentd --log $TD_AGENT_LOG_FILE --daemon /var/run/td-agent/td-agent.pid $TD_AGENT_OPTIONS
ExecStop=/bin/kill -TERM ${MAINPID}
ExecReload=/bin/kill -HUP ${MAINPID}
Restart=always
TimeoutStopSec=120[Install]
WantedBy=multi-user.target

修改Service部分UserGroup配置项可以更改fluentd进程的用户和组。

检测配置文件是否正确的方法

在shell中运行:

/opt/td-agent/embedded/bin/fluentd -c /etc/td-agent/td-agent.conf

观察输出,如果有错误会给出对应提示。

数据流逻辑

fluentd以tag值为基准,决定数据的流经哪些处理器。

数据的流向为:source -> parser -> filter -> output

input配置

tail

增量读取日志文件。需要提供一个用于标记已经读取到位置的文件(position file)所在的路径。

tail针对日志滚动的支持:
tail方式采用跟踪文件inode的方式进行。比如日志名为app.log,如果日志发生滚动,被重命名为app.log.1。文件重命名的时候inode是不会改变的。因此发生滚动时写入到旧文件末尾的日志也可以被收集到。tail会跟踪旧文件的inode一段时间(rotate_wait配置),这段时间过去之后,tail不再监听app.log.1,开始监听新的app.log文件。

tail方式的示例配置:

<source>@type tailpath /var/log/httpd-access.logpos_file /var/log/td-agent/httpd-access.log.postag apache.access<parse>@type apache2</parse>
</source>

注意:如果文件发生修改会输出全量文件内容。

配置项解释

tag:数据源的tag值。*号可以扩展为path(/替换为.)。例如

path /path/to/file
tag foo.*

tag会被扩展为foo.path.to.file

path:配置读取的路径。可以使用*或者是strftime。例如:

path /path/to/%Y/%m/%d/*

如果今天是2020年1月2日,fluentd会读取/path/to/2020/01/02目录下的内容。
也可以配置多个路径,使用逗号分隔:

path /path/to/a/*,/path/to/b/c.log

exclude_path:排除部分目录或文件,使用数组格式配置。

path /path/to/*
exclude_path ["/path/to/*.gz", "/path/to/*.zip"]

refresh_interval:多长时间刷新一次文件监听列表,配合*使用才有意义。

pos_file:位置文件地址。这个文件保存了监听的日志文件已经读取到第几行。该项一定要配置。
注意,不要再多个source之间共用pos file,否则会出现问题。
pos_file_compaction_interval:pos file文件压缩时间间隔。用于压缩pos file中不再监听的记录,不可解析的记录以及重复的记录。

parse标签:用于指定log的解析器(必须的配置项)。
例如:

# json
<parse>@type json
</parse># regexp
<parse>@type regexpexpression ^(?<name>[^ ]*) (?<user>[^ ]*) (?<age>\d*)$
</parse>

path_key:如果配置此项,监控文件的path会在event中,此项的key为path_key
例如:

path /path/to/access.log
path_key tailed_path

生成的数据如下所示:

{"tailed_path":"/path/to/access.log","k1":"v1",...,"kN":"vN"}

rotate_wait:日志发生滚动的时候,可能会有部分日志仍然输出在旧的日志文件,此时需要保持监听旧日志文件一段时间,这个时间配置就是rotate_wait

exec

周期性执行命令,抽取命令输出为event。

示例配置:

<source>@type execcommand cmd arg arg<parse>keys k1,k2,k3</parse><extract>tag_key k1time_key k2time_format %Y-%m-%d %H:%M:%S</extract>run_interval 10s
</source>

以上命令的含义为每10秒钟执行cmd arg arg命令,提取命令执行结果,以空白字符分隔三个字段的值为k1,k2,k3。其中k1的值作为tag,k2作为时间字段,使用%Y-%m-%d %H:%M:%S格式。

一个例子,周期获取系统的平均负载。配置方法如下:

<source>@type exectag system.loadavgcommand cat /proc/loadavg | cut -d ' ' -f 1,2,3run_interval 1m<parse>@type tsvkeys avg1,avg5,avg15delimiter " "</parse>
</source>

输出的日志格式为:

2018-06-29 17:27:35.115878527 +0900 system.loadavg: {"avg1":"0.30","avg5":"0.20","avg15":"0.05"}

syslog

连接rsyslog。可以作为rsyslog的接收端。

一个配置的例子:

<source>@type syslogport 5140bind 0.0.0.0tag system
</source>

fluentd打开5140端口监听rsyslog发来的log。

rsyslog配置文件/etc/rsyslog.conf设置为:

# Send log messages to Fluentd
*.* @127.0.0.1:5140

fluentd解析到的event格式如下:

tag = "#{@tag}.#{facility}.#{priority}"
time = 1353436518,
record = {"host": "host","ident": "ident","pid": "12345","message": "text"
}

dummy

专用于测试的数据源。周期产生假数据。

配置举例:

<source>@type dummydummy {"hello":"world"}
</source>

dummy常用参数:

  • tag: 标记值
  • size:每次发送的event数量
  • rate:每秒产生多少个event
  • auto_increment_key:自增键名。如果配置了此项,会有一个key为该配置项值的自增键
  • suspend:重启后自增值是否重新开始
  • dummy:测试数据内容

forward

用于接收其他fluentd forward过来的event。

示例配置:

<source>@type forwardport 24224bind 0.0.0.0
</source>

output配置

file

输出event为文件。默认每天输出一个日志文件。

示例配置:

<match pattern>@type filepath /var/log/fluent/myappcompress gzip<buffer>timekey 1dtimekey_use_utc truetimekey_wait 10m</buffer>
</match>

包含的参数类型:

  • path:path支持placeholder,可以在日志路径中嵌入时间,tag和record中的字段值。例如:
path /path/to/${tag}/${key1}/file.%Y%m%d
<buffer tag,time,key1># buffer parameters
</buffer>

注意:buffer标签后面的内容为buffer chunk key。Buffer根据这些key分段。

  • append:flush的chuck是否追加到已存在的文件后。默认为false,便于文件的并行处理。
  • format标签,用来规定文件内容的格式,默认值为out_file。
  • inject标签,用来为event增加time和tag等字段。
  • add_path_suffix:是否增加path后缀
  • path_suffix:path后缀内容,默认为.log
  • compress:采用什么压缩格式,默认不压缩。
  • recompress:是否在buffer chunk已经压缩的情况再次压缩,默认为false。

forward

将event转发到其他的fluentd节点。如果配置了多个fluentd节点,会使用负载均衡和支持容错的方式发送。如果需要发送多份数据,需要使用copy。

配置示例:

<match pattern>@type forwardsend_timeout 60srecover_wait 10shard_timeout 60s<server>name myserver1host 192.168.1.3port 24224weight 60</server><server>name myserver2host 192.168.1.4port 24224weight 60</server>...<secondary>@type filepath /var/log/fluent/forward-failed</secondary>
</match>

server标签内可以配置如下字段:

  • host
  • name
  • port
  • shared_key
  • username
  • password
  • standby 标记server为备用,只有其他node不可用的时候才会启用standby的node
  • weight 负载均衡的权重配置

copy

多路输出(复制event到多个输出端)

示例配置

<match pattern>@type copy<store>@type filepath /var/log/fluent/myapp1...</store><store>...</store><store>...</store>
</match>

其中每一个store是一路输出。

重要参数:

  • copy_mode:复制模式。可选值有

    • no_copy:每路输出共享event。
    • shallow:浅拷贝,如果不修改嵌套字段可以使用。
    • deep:深拷贝,使用msgpack-ruby方式。
    • marshal:深拷贝,使用marshal方式。
  • store标签的ignore_error参数:如果被标记ignore_error的store出现错误,不会影响其他的store。官网的例子为:
<match app.**>@type copy<store>@type plugin1</store><store>@type plugin2</store>
</match>

假如plugin1出现错误,plugin2也不会执行。如果在plugin1的store添加上ignore_error参数,如下所示:

<match app.**>@type copy<store ignore_error>@type plugin1</store><store>@type plugin2</store>
</match>

上述情况plugin2的运行不受影响。通常为不重要的store添加ignore_error参数。

http

通过http请求的方式发送event。
payload的格式由format标签决定。

示例配置:

<match pattern>@type httpendpoint http://logserver.com:9000/apiopen_timeout 2<format>@type json</format><buffer>flush_interval 10s</buffer>
</match>

该例子使用http方式将event发送到http://logserver.com:9000/api,使用post方式,连接超时时间为2秒。输出格式为json,每10秒钟输出一次。

注意:

如果使用JSON的方式发送,HTTP请求的content-type为application/x-ndjson (newline-delimited JSONs)。如果用spring mvc接收会提示不支持。可以使用HTTPServletRequest接收request body。

stdout

标准输出的模式,如果使用后台模式运行fluentd,输出到fluentd的日志。多用于debug的时候。

配置方法:

<match pattern>@type stdout
</match>

elasticsearch

输出event到elasticsearch。

示例配置:

<match my.logs>@type elasticsearchhost localhostport 9200logstash_format true
</match>

可选参数:

  • host:单个elasticsearch节点地址
  • port:单个elasticsearch节点的端口号
  • hosts:elasticsearch集群地址。格式为ip1:port1,ip2:port2...
  • user和password:elasticsearch的认证信息
  • scheme:使用https还是http。默认为http模式
  • path:REST接口路径,默认为空
  • index_name:index名称
  • logstash_format:index是否使用logstash命名方式(logstash-%Y.%m.%d),默认不启用
  • logstash_prefix:logstash_format启用的时候,index命名前缀是什么。默认为logstash

kafka

把event输出到kafka。

示例配置如下:

<match pattern>@type kafka2# list of seed brokersbrokers <broker1_host>:<broker1_port>,<broker2_host>:<broker2_port>use_event_time true# buffer settings<buffer topic>@type filepath /var/log/td-agent/buffer/tdflush_interval 3s</buffer># data type settings<format>@type json</format># topic settingstopic_key topicdefault_topic messages# producer settingsrequired_acks -1compression_codec gzip
</match>

重要的参数为:

  • brokers:Kafka brokers的地址和端口号
  • topic_key:record中哪个key对应的值用作Kafka消息的key
  • default_topic:如果没有配置topic_key,默认使用的topic名字
  • format标签:确定发送的数据格式
  • use_event_time:是否使用fluentd event的时间作为Kafka消息的时间。默认为false。意思为使用当前时间作为发送消息的时间
  • required_acks:producer acks的值
  • compression_codec:压缩编码方式

webhdfs

event通过REST方式写入到HDFS。

HADOOP启用webhdfs的方法

core-site.xml

<configuration><property><name>fs.defaultFS</name><value>hdfs://10.180.210.172:9000</value></property>
</configuration>

hdfs-site.xml

<configuration><property><name>dfs.replication</name><value>1</value></property><property><name>dfs.http.address</name><value>0.0.0.0:50070</value></property><property><name>dfs.webhdfs.enabled</name><value>true</value></property><property><name>dfs.support.append</name><value>true</value></property><property><name>dfs.support.broken.append</name><value>true</value></property>
</configuration>

最后执行$HADOOP_HOME/sbin/httpfs.sh start命令启动webhdfs支持。

注意:此时webhdfs的端口号为50070。

示例配置和参数

示例配置:

<match access.**>@type webhdfshost namenode.your.cluster.localport 50070path "/path/on/hdfs/access.log.%Y%m%d_%H.#{Socket.gethostname}.log"<buffer>flush_interval 10s</buffer>
</match>

注意:需要保证HDFS的目标目录具有写入权限。debug过程发现fluentd请求webhdfs没有使用user proxy,HDFS认为操作的用户为dr.who,无法创建文件。为了解决这个问题,设置HDFS目标目录的权限为777。

重要参数:

  • host:namenode的地址
  • port:namenode的端口号
  • path:写入文件路径。可以使用占位符或者ruby表达式。可以使用如下方式表示时间:
\%Y: year including the century (at least 4 digits)
\%m: month of the year (01..12)
\%d: Day of the month (01..31)
\%H: Hour of the day, 24-hour clock (00..23)
\%M: Minute of the hour (00..59)
\%S: Second of the minute (00..60)

输出参数:

  • timekey:多久输出一次文件到HDFS。如果path中没有配置占位符,默认为86400(1天)。如果指定了和时间相关的占位符,则文件输出周期自动和最小的时间占位符单位一致
  • timekey_wait:允许等待来迟日志的最长时间
  • flush_interval:flush间隔时间,默认为不设置
  • flush_at_shutdown:关闭的时候是否flush。如果使用内存类型的buffer,需要配置为true

parser配置

regexp

使用正则表达式命名分组的方式从日志(一行或多行)中提取信息。可以通过time_key指定event的time字段的名字。名字为time字段名的分组内容会被抽取为event时间。

一个在线测试正则表达式的工具:http://fluentular.herokuapp.com/

基本配置格式:

<parse>@type regexpexpression /.../
</parse>

正则表达式可以添加额外的参数:
忽略大小写:/.../i
多行匹配:/.../m。注意,此时.匹配新行
同时使用忽略大小写和多行匹配:/.../im

一个例子,示例配置如下:

<parse>@type regexpexpression /^\[(?<logtime>[^\]]*)\] (?<name>[^ ]*) (?<title>[^ ]*) (?<id>\d*)$/time_key logtimetime_format %Y-%m-%d %H:%M:%S %ztypes id:integer
</parse>

如下的数据:

[2013-02-28 12:00:00 +0900] alice engineer 1

会被解析为:

time:
1362020400 (2013-02-28 12:00:00 +0900)record:
{"name" : "alice","title": "engineer","id"   : 1
}

filter配置

record_transformer

record_transformer用来修改event的结构,增加或修改字段。

一个record_transformer的例子:

<filter foo.bar>@type record_transformer<record>hostname "#{Socket.gethostname}"tag ${tag}</record>
</filter>

这个filter匹配tag为foo.bar的source。event增加了两个新的字段:hostname和tag。

其中hostname这里使用了ruby表达式。tag使用了字符串插值。

如果数据为:

{"message":"hello world!"}

会被转换为:

{"message":"hello world!", "hostname":"db001.internal.example.com", "tag":"foo.bar"}

可以通过添加enable_ruby配置,在${}中使用ruby表达式。

例如:

<filter foo.bar>@type record_transformerenable_ruby<record>avg ${record["total"] / record["count"]}</record>
</filter>

如下输入:

{"total":100, "count":10}

会被转换为:

{"total":100, "count":10, "avg":"10"}

注意,可以启用auto_typecast true配置实现自动类型转换。

修改字段的例子:

<filter foo.bar>@type record_transformer<record>message yay, ${record["message"]}</record>
</filter>

如下输入:

{"message":"hello world!"}

会被修改为:

{"message":"yay, hello world!"}

可以在表达式中配置tag_parts变量,引用tag的第n部分。如下所示:

<filter web.*>@type record_transformer<record>service_name ${tag_parts[1]}</record>
</filter>

如果遇到tag为web.auth的数据:

{"user_id":1, "status":"ok"}

会被转换为:

{"user_id":1, "status":"ok", "service_name":"auth"}

record标签

record标签的语法为:

<record>NEW_FIELD NEW_VALUE
</record>

表达式中可以配置如下变量:

  • record:获取record中某些字段的内容。例如record["count"]
  • tag:获取tag的内容
  • time:获取日志的时间戳
  • hostname:获取主机名字,和#{Socket.gethostname}作用一样
  • tag_parts[N]:tag以.分隔,获取tag的第N部分
  • tag_prefix[N]:获取tag的0-N部分
  • tag_suffix[N]:获取tag的N-结尾部分

例如tag为debug.my.apptag_parts[1]返回mytag_prefixtag_suffix的结果如下:

tag_prefix[0] = debug          tag_suffix[0] = debug.my.app
tag_prefix[1] = debug.my       tag_suffix[1] = my.app
tag_prefix[2] = debug.my.app   tag_suffix[2] = app

配置文件使用通配符和扩展

<match><filter>标签可以使用通配符和扩展。

tag以.为分隔符,分隔为多个部分。

fluentd支持的通配符和扩展有:
*:只匹配一个部分。比如a.*匹配a.b,但是不匹配aa.b.c
**:匹配0个或多个部分。比如a.**匹配aa.ba.b.c
{X,Y,Z}:匹配X或Y或Z。
#{expression}:使用嵌入的ruby表达式。有一些快捷变量可以直接使用,例如#{hostname}#{worker_id}
${..}:使用变量值,tag,record
可以使用如下的方式指定默认值。例如:#{ENV["FOOBAR"] || use_default}。如果FOOBAR环境变量不存在,则使用use_default这个值。

注意:match标签的匹配过程是有顺序的。比如说下面的例子:

<match **>@type blackhole_plugin
</match><match myapp.access>@type filepath /var/log/fluent/access
</match>

因为上面的match总是能被匹配到,下面的match永远没有机会执行。

Buffer

buffer为fluentd很关键的配置,意为缓冲区。可以决定收集的数据存入什么介质,多长时间输出一次等。

buffer标签必须配置在match标签内(即在输出端配置)。

buffer具有一个@type属性,用来配置buffer的储存介质:

<buffer>@type file
</buffer>

@type有两个值:

  • file:存入文件
  • memory:存入内存,这个是默认值

buffer标签后面可以跟随chunk keys,用来决定buffer以record的什么字段来分段存放。例如:

<buffer ARGUMENT_CHUNK_KEYS># ...
</buffer>

注意:

  1. 可以指定多个buffer chunk keys,使用逗号分隔。
  2. 如果没有配置chunk key,所有的event都会写入同一个chunk file,直到buffer滚动。

buffer如果使用time作为chunk key,可以按照时间对buffer进行分段。其中:

  • timekey:时间的跨度
  • timekey_wait:flush延迟时间,用于等待迟到的数据

官网的例子如下:

<match tag.**># ...<buffer time>timekey      1h # chunks per hours ("3600" also available)timekey_wait 5m # 5mins delay for flush ("300" also available)</buffer>
</match># Time chunk key: events will be separated for hours (by timekey 3600)11:59:30 web.access {"key1":"yay","key2":100}  ------> CHUNK_A12:00:01 web.access {"key1":"foo","key2":200}  --||---> CHUNK_B
12:00:25 ssh.login  {"key1":"yay","key2":100}  --|

部分经常用到的配置参数:

  • timekey_use_utc:使用国际标准时间还是当地时间,默认是使用当地时间。
  • timekey_zone:指定时区。
  • chunk_limit_size:chunk大小限制,默认8MB。
  • chunk_limit_records:chunk event条数限制。
  • total_limit_size:总buffer大小限制。
  • chunk_full_threshold:chunk大小超过chunk_limit_size * chunk_full_threshold时会自动flush。
  • queued_chunks_limit_size:限制队列中的chunk数目,防止频繁flush产生过多的chunk。
  • compress:压缩格式,可使用text或gzip。默认为text。
  • flush_at_shutdown:关闭时候是否flush。对于非持久化buffer默认值为true,持久化buffer默认值为false。
  • flush_interval:多长时间flush一次。
  • retry_timeout:重试flush的超时时间。在这个时间后不再会retry。
  • retry_forever:是否永远尝试flush。如果设置为true会忽略retry_timeout的配置。
  • retry_max_times:重试最大次数。
  • retry_type:有两个配置值:retry时间间隔,指数级增长或者是固定周期重试。
  • retry_wait:每次重试等待时间。
  • retry_exponential_backoff_base:retry时间指数扩大倍数。
  • retry_max_interval:最长retry时间间隔。
  • retry_randomize:是否随机retry时间间隔。

配置文件重用

可以通过@include 配置文件路径方式,引用其他配置文件片段到fluentd主配置文件中。

配置文件路径可以使用绝对路径或相对路径。相对路径的基准路径为fluentd主配置文件所在的路径。

@include可以出现在主配置文件的任何位置。

Docker日志输出到fluentd

通过配置fluentd logging driver的方式实现。
该driver发送的log信息包含:

字段 描述
container_id 64字符的container id
container_name container名字
source stdout或stderr
log container的log

全局配置方式

修改/etc/docker/daemon.json,增加如下内容:

{"log-driver": "fluentd","log-opts": {"fluentd-address": "fluentdhost:24224"}
}

然后重启docker daemon使配置生效。

也可以通过添加--log-driver--log-opt参数的方式指定某个container使用fluentd logging driver。如下所示:

docker run --log-driver=fluentd --log-opt fluentd-address=fluentdhost:24224

可以通过在--log-opt后指定tag的方式,确定source的tag。

Docker官网参考链接:https://docs.docker.com/config/containers/logging/fluentd/

配置实例

实例1

采集/root/my.txt文件(内容格式为key value),并发送到http://localhost:9090/

fluentd的配置文件如下:

<source>@type tailpath /root/my.txtpos_file /root/my.txt.postag my<parse>@type regexpexpression /(?<key>\w+)\s(?<value>\w+)/</parse>
</source><match my>@type httpendpoint http://localhost:9090/open_timeout 2http_method post<format>@type json</format><buffer>flush_interval 3s</buffer>
</match>

实例2

提取用户操作记录,打印到fluentd日志。

<source>@type tail# 这里使用HISTFILE环境变量,如果没有设置,使用默认值/root/.bash_historypath "#{ENV["HISTFILE"] || /root/.bash_history}"pos_file /root/.bash_history.postag history<parse>@type none</parse>
</source><filter history>@type record_transformer<record>hostname ${hostname}</record>
</filter><match history>@type stdout
</match>

实例3

收集用户操作记录转发到另一个fluentd节点,同时将数据发送到Kafka和存入HDFS。

数据流为:fluentd采集端 -> fluentd收集端 -> kafka和HDFS

示例用户操作记录数据为:

root pts/1 2020-03-26 10:59 (10.180.206.1):root 2020-03-26 11:00:09 130  tail -f /var/log/command.his.log

采集节点的配置:

<source>@type tailpath /var/log/command.his.logpos_file /var/log/command.his.log.postag history<parse>@type regexp# 使用正则解析日志文件expression /^(?<who_user>\w+)\s(?<pts>\S+)\s(?<who_time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2})\s\((?<remote_ip>\d+\.\d+\.\d+\.\d+)\):(?<user>\w+)\s(?<time>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2})\s(?<res>\d+)\s(?<command>.+)$/time_key time</parse>
</source>
<filter history>@type record_transformer<record># event内容增加hostname这一行hostname ${hostname}</record>
</filter><match history>@type forwardsend_timeout 60srecover_wait 10shard_timeout 60s<buffer># 1秒钟向另一个fluentd节点转发一次flush_interval 1s</buffer><server>name myserver1host 10.180.210.172port 24225weight 60</server>
</match>

fluentd收集节点的配置:

<source>@type forwardport 24225bind 0.0.0.0tag remote
</source><match remote># 使用copy方式,分两路输出@type copy<store>@type kafka2brokers 10.180.210.172:9092use_event_time true<buffer topic>@type filepath /var/log/td-agent/buffer/tdflush_interval 3s</buffer><format>@type json</format>default_topic historyrequired_acks -1</store><store>@type webhdfshost 10.180.210.172port 50070path "/history/access.log.%Y%m%d_%H.#{Socket.gethostname}.log"<buffer>flush_interval 60s</buffer></store>
</match>

本文为原创内容,欢迎大家讨论、批评指正与转载。转载时请注明出处。

作者:AlienPaul
链接:https://www.jianshu.com/p/d60f2f286808
来源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

Fluentd日志采集使用教程相关推荐

  1. 容器日志采集利器Log-Pilot

    容器时代越来越多的传统应用将会逐渐容器化,而日志又是应用的一个关键环节,那么在应用容器化过程中,如何方便快捷高效地来自动发现和采集应用的日志,如何与日志存储系统协同来高效存储和搜索应用日志.本文将主要 ...

  2. 直击痛点,详解 K8s 日志采集最佳实践

    作者 | 元乙 阿里云存储服务技术专家 导读:上一篇文章主要介绍 Kubernetes 日志输出的一些注意事项,日志输出最终的目的还是做统一的采集和分析.在 Kubernetes 中,日志采集和普通虚 ...

  3. Kubernetes入门——Kubernetes日志采集与监控告警

    作者简介: 郭川磊 百度基础架构部研发工程师 负责云原生产品的研发 本文基于百度云原生团队『云原生基础知识概述及实践』系列视频课程--『Kubernetes入门-Kubernetes实现应用高可用』梳 ...

  4. golang 日志分析_容器日志采集利器:Filebeat深度剖析与实践

    在云原生时代和容器化浪潮中,容器的日志采集是一个看起来不起眼却又无法忽视的重要议题.对于容器日志采集我们常用的工具有filebeat和fluentd,两者对比各有优劣,相比基于ruby的fluentd ...

  5. iLogtail使用入门-K8S环境日志采集到SLS

    ​简介:iLogtail是阿里云中简单日志服务又名"SLS"的采集部分. 它用于收集遥测数据,例如日志.跟踪和指标,目前已经正式开源(https://github.com/alib ...

  6. 系列文章:Kubernetes日志采集最佳实践

    前言 上一期主要介绍Kubernetes日志输出的一些注意事项,日志输出最终的目的还是做统一的采集和分析.在Kubernetes中,日志采集和普通虚拟机的方式有很大不同,相对实现难度和部署代价也略大, ...

  7. LC3视角:Kubernetes下日志采集、存储与处理技术实践

    摘要: 在Kubernetes服务化.日志处理实时化以及日志集中式存储趋势下,Kubernetes日志处理上也遇到的新挑战,包括:容器动态采集.大流量性能瓶颈.日志路由管理等问题.本文介绍了" ...

  8. Rancher体系下容器日志采集

    引言 一个完整的容器平台,容器日志都是很重要的一环.尤其在微服务架构大行其道状况下,程序的访问监控健康状态很多都依赖日志信息的收集,由于Docker的存在,让容器平台中的日志收集和传统方式很多不一样, ...

  9. KubeSphere 多行日志采集方案深度探索

    作者:大飞哥,视源电子运维工程师,KubeSphere 用户委员会广州站站长 采集落盘日志 日志采集,通常使用 EFK 架构,即 ElasticSearch,Filebeat,Kibana,这是在主机 ...

  10. 日志采集中的关键技术分析

    概述 日志从最初面向人类演变到现在的面向机器发生了巨大的变化.最初的日志主要的消费者是软件工程师,他们通过读取日志来排查问题,如今,大量机器日夜处理日志数据以生成可读性的报告以此来帮助人类做出决策.在 ...

最新文章

  1. Python学习之路 拓展篇 Pychram的应用
  2. for oracle中pivot_oracle关键字pivot行转列【坑爹的三小时,动脑经真累 】 | 学步园...
  3. SpringBoot2 Redis连接池
  4. 3D游戏引擎中常见的三维场景管理方法
  5. IE发现新的零日攻击漏洞 用户可采取缓解措施
  6. 等式约束凸二次规划(拉格朗日乘子法)_python
  7. 三相全桥整流电路_什么是三相全波整流电路,三相全波整流电路的工作原理是什么,三相全波整流电路电路图...
  8. 2018.12.3比赛题目:电子警察
  9. 【Unity开发小技巧】Unity混音器Mixer控制全局音量
  10. 记一次小白的手游脚本破解过程及难题
  11. C#开发WPF/Silverlight动画及游戏系列教程(Game Tutorial):(二十四) Be careful!前方怪物出没...
  12. 工作展望简短_工作展望简短_时间2017工作展望
  13. 我的世界服务器退出信息,我的世界模仿他人进入/退出服务器
  14. 微信头像失效_微信头像地址失效踩坑记附带解决方案
  15. 100以内加减法练习题,打印到A4纸
  16. 坚持使用Ubuntu
  17. golang+websocket实现
  18. Prometheus 普罗米修斯
  19. 为何面试时都会问你的职业规划呢?该如何回答呢?
  20. NASA通过WebAPP提醒你抬头看看人类连续常驻达12年的太空站

热门文章

  1. 人事管理系统 C语言,人事管理系统----C语言设计
  2. HTTP/HTTPS/SOCKS5协议的区别
  3. c#万能视频播放器(附代码)
  4. 苹果手机Apple ID 忘记密码 ,频繁跳出登录iCloud如何关闭
  5. 安卓和win环境下扫描局域网下设备IP的工具
  6. leadbbs 上ID为Robin·H的东西,有空瞧瞧....
  7. Mac删除声音输出设备
  8. android获取系统剪贴板内容,android系统如何如何恢复剪贴板内容
  9. php侧边客服,利用jquery实现网页侧边栏在线客服代码
  10. c语言病毒编写教程,来来来,教你一个用C语言写个小病毒