Flume之——配置多个Sink源(一个Source对应多个Channel和Sink)
转载请注明出处:https://blog.csdn.net/l1028386804/article/details/98055100
配置模型如下图:
Flume的配置如下:
myagent.sources = r1
myagent.sinks = k1 k2
myagent.channels = c1 c2
myagent.sources.r1.selector.type = replicating
# 订阅/配置Source源
myagent.sources.r1.type = http
myagent.sources.r1.port = 5140
myagent.sources.r1.handler = org.apache.flume.source.http.JSONHandlermyagent.sources.r1.channels = c1 c2# 设置Memmory Channel
myagent.sinks.k1.channel = c1
myagent.channels.c1.type = memory
myagent.channels.c1.capacity = 1000
myagent.channels.c1.transactionCapacity = 100myagent.sinks.k2.channel = c2
myagent.channels.c2.type = memory
myagent.channels.c2.capacity = 1000
myagent.channels.c2.transactionCapacity = 100#####发送到Kafka####
# 订阅k1 Sink
myagent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.k1.topic = test
myagent.sinks.k1.brokerList = binghe100:9092
myagent.sinks.k1.requiredAcks = 1
myagent.sinks.k1.batchSize = 20#####发送到HDFS#####
# 订阅Sink
myagent.sinks.k2.type = hdfs
myagent.sinks.k2.hdfs.path = hdfs://binghe100:9000/flume/%Y%m%d
myagent.sinks.k2.hdfs.filePrefix = log_%H_%M
myagent.sinks.k2.hdfs.fileSuffix = .log
myagent.sinks.k2.hdfs.useLocalTimeStamp = true
myagent.sinks.k2.hdfs.writeFormat = Text
myagent.sinks.k2.hdfs.fileType = DataStream
#### 一个小时保存一次
myagent.sinks.k2.hdfs.round = true
myagent.sinks.k2.hdfs.roundValue = 1
myagent.sinks.k2.hdfs.roundUnit = hour
#### 文件达到1M写新文件
myagent.sinks.k2.hdfs.rollInterval = 0
myagent.sinks.k2.hdfs.rollSize=1048576
myagent.sinks.k2.hdfs.rollCount=0myagent.sinks.k2.hdfs.batchSize = 100
myagent.sinks.k2.hdfs.threadsPoolSize = 10
myagent.sinks.k2.hdfs.idleTimeout = 0
myagent.sinks.k2.hdfs.minBlockReplicas = 1
配置说明如下图所示:
直接监听Nginx日志变化的配置如下:
myagent.sources = s1
myagent.sinks = k1 k2
myagent.channels = c1 c2
myagent.sources.s1.selector.type = replicating
# 订阅/配置Source源
myagent.sources.s1.batchsize=10
myagent.sources.s1.type = exec
myagent.sources.s1.command = tail -F /usr/local/nginx-1.17.2/logs/access.logmyagent.sources.s1.channels = c1 c2# 设置Memmory Channel
myagent.sinks.k1.channel = c1
myagent.channels.c1.type = memory
myagent.channels.c1.capacity = 1000
myagent.channels.c1.transactionCapacity = 100myagent.sinks.k2.channel = c2
myagent.channels.c2.type = memory
myagent.channels.c2.capacity = 1000
myagent.channels.c2.transactionCapacity = 100#####发送到Kafka####
# 订阅k1 Sink
myagent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.k1.topic = test
myagent.sinks.k1.brokerList = binghe100:9092
myagent.sinks.k1.requiredAcks = 1
myagent.sinks.k1.batchSize = 20#####发送到HDFS#####
# 订阅Sink
myagent.sinks.k2.type = hdfs
myagent.sinks.k2.hdfs.path = hdfs://binghe100:9000/flume/%Y%m%d
myagent.sinks.k2.hdfs.filePrefix = log_%H_%M
myagent.sinks.k2.hdfs.fileSuffix = .log
myagent.sinks.k2.hdfs.useLocalTimeStamp = true
myagent.sinks.k2.hdfs.writeFormat = Text
myagent.sinks.k2.hdfs.fileType = DataStream
#### 一个小时保存一次
myagent.sinks.k2.hdfs.round = true
myagent.sinks.k2.hdfs.roundValue = 1
myagent.sinks.k2.hdfs.roundUnit = hour
#### 文件达到1M写新文件
myagent.sinks.k2.hdfs.rollInterval = 0
myagent.sinks.k2.hdfs.rollSize=1048576
myagent.sinks.k2.hdfs.rollCount=0myagent.sinks.k2.hdfs.batchSize = 100
myagent.sinks.k2.hdfs.threadsPoolSize = 10
myagent.sinks.k2.hdfs.idleTimeout = 0
myagent.sinks.k2.hdfs.minBlockReplicas = 1
注意:flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用,可以将minBlockReplicas设置为1。如下设置项:
myagent.sinks.k2.hdfs.minBlockReplicas = 1
参考文章链接如下:
1.Flume中的HDFS Sink配置参数说明:
http://lxw1234.com/archives/2015/10/527.htm
2.Flume(NG)架构设计要点及配置实践
http://shiyanjun.cn/archives/915.html
3.Flume NG 简介及配置实战
https://yq.aliyun.com/articles/50487
4.flume官网
http://flume.apache.org/FlumeUserGuide.html#hdfs-sink
Flume之——配置多个Sink源(一个Source对应多个Channel和Sink)相关推荐
- Flume 1.7 源码分析(四)从Source写数据到Channel
Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...
- 大数据——Flume组件Source、Channel和Sink具体使用
Flume组件Source.Channel和Sink使用说明 Flume Sources Avro Source 配置范例 Thrift Source 配置范例 Exec Source 配置范例 JM ...
- flume 写入文件服务器,Flume环境配置以及基本操作
flume的作用是从接受外界的日志信息,然后输出到本地的一个框架. agent是Flume很重要的组成,包括有source,channel,sink. source是从外部接受日志. channel跟 ...
- Flume篇---Flume安装配置与相关使用
一.前述 Copy过来一段介绍Apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务,或者数集中机制.flume具有高可用, ...
- 大数据——Flume安装配置和使用
Flume安装配置和使用 Flume安装和配置 测试 监控端口数据 监控文件信息 监控文件夹中的新文件 监控文件写入HDFS中 监控文件上传到HDFS并去除首条字段信息(拦截器) 内置拦截器 自定义拦 ...
- redhat配置centos的yum源
redhat默认不支持第三方的yum源,要求有rhn账号,下面就rhel5和rhel6改为centos源 一.rhel5.5 X86_64更改为centos的yum源 查看系统自带的yum相关的rpm ...
- 【转载翻译】Debian配置Networking 和 apt-get 源信息 开启root远程登录权限
1 简介 初始安装完debian 7.7.0时,需要首先配置网络及apt-get源,才能正常使用. 2 debian配置 2.1 debian 7.7.0配置网络及apt-get源 2.1.1 ...
- 在Nutz中如何配置多个数据库源,并且带事务控制
在Nutz中如何配置多个数据库源,并且带事务控制 发布于 560天前 作者 Longitude 995 次浏览 复制 上一个帖子 下一个帖子 标签: 无 在Nutz中如何配置多个数据库源, ...
- 什么是Mybatis配置解析?(源码+图文)
什么是Mybatis配置解析?(源码+图文) 1. 核心配置文件 mybatis-config.xml configuration(配置)properties(属性)settings(设置)typeA ...
最新文章
- OPENGFILER存储柜
- 如果redis哨兵宕机了怎么办_Spring集成Redis做缓存,Redis宕机时Spring处理的问题
- python tcp协议加代理_python实现简单的TCP代理服务器
- Python处理小学体育中的跑步计时数据并统计得分
- Linux内核第六节 20135332武西垚
- SAP License:2021年如何做一个被人喜欢的SAP顾问?
- c++矩阵转置_python3 单行代码实现矩阵相乘
- fscokopen 中执行超时 使用stream_set_timeout设置超时
- poj 3278 Catch That Cow (简单的bfs)
- R爬虫可视化第五季-图解欧洲足球五大联赛
- Python3 递归算法
- c语言标准流程图,c语言设计流程图!设计流程图
- android 表情包下载,表情包制作大师下载
- 开发者的拯救者还是掘墓人?解密低代码开发平台
- C语言训练-1522-对称矩阵的判定
- 网络学习---HTTPS的升级
- 全新小旋风万能蜘蛛池9.02开心版/站长必备SEO+带教程
- 手把手看如何制作本地yun源
- Artificial Intelligence Computer Vision ML and DL
- 美女主持直播,被突发意外打断!湾区网友却高喊: 我懂!超甜
热门文章
- Python学习笔记Day 3
- [转载备用]极酷SevenColorPlayer网页播放器(炫彩广告版),最强播放器定制
- html.tex 下拉框,winform ComboBox 下拉框 显示图片效果 附完整源码
- 有限差分法matlab两点边值代码,两点边值问题的有限差分法.doc
- 2014年中南大学研究生复试机试题(字符串、基础dp、最短路)
- 学生个人网页制作html
- 国产免费数据仓库ETL调度自动化运维专家—TASKCTL
- Android Audio AudioHardwareALSA::openOutputStream函数
- 2023年,如何自学通过PMP?(含pmp资料)
- dota2游戏c语言,新手科普:Dota2操作按键设置和游戏设置详解