转载请注明出处:https://blog.csdn.net/l1028386804/article/details/98055100

配置模型如下图:

Flume的配置如下:

myagent.sources = r1
myagent.sinks = k1 k2
myagent.channels = c1 c2
myagent.sources.r1.selector.type = replicating
# 订阅/配置Source源
myagent.sources.r1.type = http
myagent.sources.r1.port = 5140
myagent.sources.r1.handler = org.apache.flume.source.http.JSONHandlermyagent.sources.r1.channels = c1 c2# 设置Memmory Channel
myagent.sinks.k1.channel = c1
myagent.channels.c1.type = memory
myagent.channels.c1.capacity = 1000
myagent.channels.c1.transactionCapacity = 100myagent.sinks.k2.channel = c2
myagent.channels.c2.type = memory
myagent.channels.c2.capacity = 1000
myagent.channels.c2.transactionCapacity = 100#####发送到Kafka####
# 订阅k1 Sink
myagent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.k1.topic = test
myagent.sinks.k1.brokerList = binghe100:9092
myagent.sinks.k1.requiredAcks = 1
myagent.sinks.k1.batchSize = 20#####发送到HDFS#####
# 订阅Sink
myagent.sinks.k2.type = hdfs
myagent.sinks.k2.hdfs.path = hdfs://binghe100:9000/flume/%Y%m%d
myagent.sinks.k2.hdfs.filePrefix = log_%H_%M
myagent.sinks.k2.hdfs.fileSuffix = .log
myagent.sinks.k2.hdfs.useLocalTimeStamp = true
myagent.sinks.k2.hdfs.writeFormat = Text
myagent.sinks.k2.hdfs.fileType = DataStream
#### 一个小时保存一次
myagent.sinks.k2.hdfs.round = true
myagent.sinks.k2.hdfs.roundValue = 1
myagent.sinks.k2.hdfs.roundUnit = hour
#### 文件达到1M写新文件
myagent.sinks.k2.hdfs.rollInterval = 0
myagent.sinks.k2.hdfs.rollSize=1048576
myagent.sinks.k2.hdfs.rollCount=0myagent.sinks.k2.hdfs.batchSize = 100
myagent.sinks.k2.hdfs.threadsPoolSize = 10
myagent.sinks.k2.hdfs.idleTimeout = 0
myagent.sinks.k2.hdfs.minBlockReplicas = 1

配置说明如下图所示:

直接监听Nginx日志变化的配置如下:

myagent.sources = s1
myagent.sinks = k1 k2
myagent.channels = c1 c2
myagent.sources.s1.selector.type = replicating
# 订阅/配置Source源
myagent.sources.s1.batchsize=10
myagent.sources.s1.type = exec
myagent.sources.s1.command = tail -F /usr/local/nginx-1.17.2/logs/access.logmyagent.sources.s1.channels = c1 c2# 设置Memmory Channel
myagent.sinks.k1.channel = c1
myagent.channels.c1.type = memory
myagent.channels.c1.capacity = 1000
myagent.channels.c1.transactionCapacity = 100myagent.sinks.k2.channel = c2
myagent.channels.c2.type = memory
myagent.channels.c2.capacity = 1000
myagent.channels.c2.transactionCapacity = 100#####发送到Kafka####
# 订阅k1 Sink
myagent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
myagent.sinks.k1.topic = test
myagent.sinks.k1.brokerList = binghe100:9092
myagent.sinks.k1.requiredAcks = 1
myagent.sinks.k1.batchSize = 20#####发送到HDFS#####
# 订阅Sink
myagent.sinks.k2.type = hdfs
myagent.sinks.k2.hdfs.path = hdfs://binghe100:9000/flume/%Y%m%d
myagent.sinks.k2.hdfs.filePrefix = log_%H_%M
myagent.sinks.k2.hdfs.fileSuffix = .log
myagent.sinks.k2.hdfs.useLocalTimeStamp = true
myagent.sinks.k2.hdfs.writeFormat = Text
myagent.sinks.k2.hdfs.fileType = DataStream
#### 一个小时保存一次
myagent.sinks.k2.hdfs.round = true
myagent.sinks.k2.hdfs.roundValue = 1
myagent.sinks.k2.hdfs.roundUnit = hour
#### 文件达到1M写新文件
myagent.sinks.k2.hdfs.rollInterval = 0
myagent.sinks.k2.hdfs.rollSize=1048576
myagent.sinks.k2.hdfs.rollCount=0myagent.sinks.k2.hdfs.batchSize = 100
myagent.sinks.k2.hdfs.threadsPoolSize = 10
myagent.sinks.k2.hdfs.idleTimeout = 0
myagent.sinks.k2.hdfs.minBlockReplicas = 1

注意:flume中sink到hdfs,文件系统频繁产生文件,文件滚动配置不起作用,可以将minBlockReplicas设置为1。如下设置项:

myagent.sinks.k2.hdfs.minBlockReplicas = 1

参考文章链接如下:

1.Flume中的HDFS Sink配置参数说明:
http://lxw1234.com/archives/2015/10/527.htm
2.Flume(NG)架构设计要点及配置实践
http://shiyanjun.cn/archives/915.html
3.Flume NG 简介及配置实战
https://yq.aliyun.com/articles/50487
4.flume官网
http://flume.apache.org/FlumeUserGuide.html#hdfs-sink

Flume之——配置多个Sink源(一个Source对应多个Channel和Sink)相关推荐

  1. Flume 1.7 源码分析(四)从Source写数据到Channel

    Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...

  2. 大数据——Flume组件Source、Channel和Sink具体使用

    Flume组件Source.Channel和Sink使用说明 Flume Sources Avro Source 配置范例 Thrift Source 配置范例 Exec Source 配置范例 JM ...

  3. flume 写入文件服务器,Flume环境配置以及基本操作

    flume的作用是从接受外界的日志信息,然后输出到本地的一个框架. agent是Flume很重要的组成,包括有source,channel,sink. source是从外部接受日志. channel跟 ...

  4. Flume篇---Flume安装配置与相关使用

    一.前述 Copy过来一段介绍Apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务,或者数集中机制.flume具有高可用, ...

  5. 大数据——Flume安装配置和使用

    Flume安装配置和使用 Flume安装和配置 测试 监控端口数据 监控文件信息 监控文件夹中的新文件 监控文件写入HDFS中 监控文件上传到HDFS并去除首条字段信息(拦截器) 内置拦截器 自定义拦 ...

  6. redhat配置centos的yum源

    redhat默认不支持第三方的yum源,要求有rhn账号,下面就rhel5和rhel6改为centos源 一.rhel5.5 X86_64更改为centos的yum源 查看系统自带的yum相关的rpm ...

  7. 【转载翻译】Debian配置Networking 和 apt-get 源信息 开启root远程登录权限

    1 简介 初始安装完debian 7.7.0时,需要首先配置网络及apt-get源,才能正常使用. 2 debian配置 2.1 debian 7.7.0配置网络及apt-get源     2.1.1 ...

  8. 在Nutz中如何配置多个数据库源,并且带事务控制

    在Nutz中如何配置多个数据库源,并且带事务控制 发布于 560天前  作者 Longitude  995 次浏览  复制  上一个帖子  下一个帖子  标签: 无 在Nutz中如何配置多个数据库源, ...

  9. 什么是Mybatis配置解析?(源码+图文)

    什么是Mybatis配置解析?(源码+图文) 1. 核心配置文件 mybatis-config.xml configuration(配置)properties(属性)settings(设置)typeA ...

最新文章

  1. OPENGFILER存储柜
  2. 如果redis哨兵宕机了怎么办_Spring集成Redis做缓存,Redis宕机时Spring处理的问题
  3. python tcp协议加代理_python实现简单的TCP代理服务器
  4. Python处理小学体育中的跑步计时数据并统计得分
  5. Linux内核第六节 20135332武西垚
  6. SAP License:2021年如何做一个被人喜欢的SAP顾问?
  7. c++矩阵转置_python3 单行代码实现矩阵相乘
  8. fscokopen 中执行超时 使用stream_set_timeout设置超时
  9. poj 3278 Catch That Cow (简单的bfs)
  10. R爬虫可视化第五季-图解欧洲足球五大联赛
  11. Python3 递归算法
  12. c语言标准流程图,c语言设计流程图!设计流程图
  13. android 表情包下载,表情包制作大师下载
  14. 开发者的拯救者还是掘墓人?解密低代码开发平台
  15. C语言训练-1522-对称矩阵的判定
  16. 网络学习---HTTPS的升级
  17. 全新小旋风万能蜘蛛池9.02开心版/站长必备SEO+带教程
  18. 手把手看如何制作本地yun源
  19. Artificial Intelligence Computer Vision ML and DL
  20. 美女主持直播,被突发意外打断!湾区网友却高喊: 我懂!超甜

热门文章

  1. Python学习笔记Day 3
  2. [转载备用]极酷SevenColorPlayer网页播放器(炫彩广告版),最强播放器定制
  3. html.tex 下拉框,winform ComboBox 下拉框 显示图片效果 附完整源码
  4. 有限差分法matlab两点边值代码,两点边值问题的有限差分法.doc
  5. 2014年中南大学研究生复试机试题(字符串、基础dp、最短路)
  6. 学生个人网页制作html
  7. 国产免费数据仓库ETL调度自动化运维专家—TASKCTL
  8. Android Audio AudioHardwareALSA::openOutputStream函数
  9. 2023年,如何自学通过PMP?(含pmp资料)
  10. dota2游戏c语言,新手科普:Dota2操作按键设置和游戏设置详解