Flume之生产正确的使用方式二(Multiple Agent+ Multiple Channel+Custom)
1.多Agent使用概述
上一篇文章我们详细介绍了单Agent的生产演进过程,但是生产上是需要从多台机器上采集数据的,故更多的是多Agent的串联和并联组合使用。如下图串联的Agent
- 串联的Agent一定是采用Avro Sink和 Avro Source方式进行数据传输
2.两个串联Agent实现数据采集到控制台
Agent的结构:source -> channel -> sink -> source -> channel -> sink
Agent的选择:exec memory avro avro memory logger
###exec-avro-agent.conf文件###
exec-avro-agent.sources = exec-source
exec-avro-agent.channels = memory-channel
exec-avro-agent.sinks = avro-sinkexec-avro-agent.sources.exec-source.type = exec
exec-avro-agent.sources.exec-source.command = tail -F /home/hadoop/data/flume/multiple/chuanlian/input/avro_access.dataexec-avro-agent.channels.memory-channel.type = memoryexec-avro-agent.sinks.avro-sink.type = avro
exec-avro-agent.sinks.avro-sink.hostname = localhost
exec-avro-agent.sinks.avro-sink.port = 44444exec-avro-agent.sources.exec-source.channels = memory-channel
exec-avro-agent.sinks.avro-sink.channel = memory-channel###avro-logger-agent.conf文件###
avro-logger-agent.sources = avro-source
avro-logger-agent.channels = memory-channel
avro-logger-agent.sinks = logger-sinkavro-logger-agent.sources.avro-source.type = avro
avro-logger-agent.sources.avro-source.bind = localhost
avro-logger-agent.sources.avro-source.port = 44444avro-logger-agent.channels.memory-channel.type = memoryavro-logger-agent.sinks.logger-sink.type = loggeravro-logger-agent.sources.avro-source.channels = memory-channel
avro-logger-agent.sinks.logger-sink.channel = memory-channel###先启动 avro-logger agent ####
flume-ng agent \
--name avro-logger-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/avro-logger-agent.conf \
-Dflume.root.logger=INFO,console###再启动 exec-avro agent ####
flume-ng agent \
--name exec-avro-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/exec-avro-agent.conf \
-Dflume.root.logger=INFO,console
注意
1) Avro source 以及 Avro sinlk的绑定的端口和地址要一致
2) 要先启动Avro soruce的Agent再启动Avro sink的Agent,关闭发的顺序反过来
3.Multiplexing the flow(多路传输流)
- Multiplexing Channel Selector :多路Channel选择器,是将根据自定义的选择器规则,将数据发送到指定Channel上。
- Replicating Channel Selector :多副本Channel选择器,每个Channel数据是一样的。
Multiplexing the flow即单Source多Channel、Sink,在生产中我们可能有这样的需求,采集到的数据一份存储HDFS(离线)一份存储Kafka(实时),这时我们就需要采用如下图的组合方式。
3.1(重要) Replicating Channel Selector实现将数据采集到hdfs以及控制台
Agent的结构:
Source --》Chianel--》Sink--》Chianel--》Sink
Agent的选择:
netcat--》memory--》hdfs--》memory--》logger
配置文件:
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444a1.sources.r1.selector.type = replicatinga1.channels.c1.type = memory
a1.channels.c2.type = memorya1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop001:9000/flume/multipleFlow/%Y%m%d%H%M
a1.sinks.k1.hdfs.useLocalTimeStamp=true
a1.sinks.k1.hdfs.filePrefix = wsktest-
a1.sinks.k1.hdfs.rollInterval = 30
a1.sinks.k1.hdfs.rollSize = 100000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.fileType=DataStream
a1.sinks.k1.hdfs.writeFormat=Texta1.sinks.k2.type = loggera1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
启动命令;
flume-ng agent \
--name a1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/multiple-channel-agent.conf \
-Dflume.root.logger=INFO,console
注意
踩坑,官网的Replicating Channel Selector的配置为a1.source.r1.selector.type = replicating 这是错误的配置source少了s
3.2 Multiplexing Channel Selector实现将数据采集到不同的Channel然后移动到hdfs
它实现了将生产上不同业务的数据使用统一的Flume进行数据收集
- 官网Multiplexing Channel Selector配置
- 官网Static Interceptor配置
如下图:实现将不同state(业务类型)的数据发送到不同的channel
实现思路:
- 第一步:首先配置三个Agent分别抽取US、CN、CA的业务数据,每个Agent都需要添加静态拦截器,添加state=v信息
- 第二步:配置聚合的Agent,该Agent用于收集之前三个Agent的数据,并根据Event的head的State的vule值选择具体的Channel,不同的Channel将数据分别写到不同的hdfs上
4.(重要)Consolidation并联Agent采集数据到HDFS
生产上客户的服务器可能有成千上万台,而大数据集群是我们自己内部,首先客户业务集群和咱们的大数据集群网络肯定是不通的,其次上万台同时实时写hdfs,那hdfs网络开销太大了。实际是先通过先写到具有大数据集群getway权限的一个或多个落盘机器上,然后由该落盘机器将数据写到hdfs上。此流程实现也是非常的简单的,在 2.两个串联Agent实现数据采集到控制台基础上并联复制avro sink Agent到每台机器即可,avro source Agent的sink要写成hdfs。
注意:为了保证 sink端挂了如hdfs无法使用时channael不会被撑爆以及重启后数据丢失,channel最好选择为File
5.(了解)Sink Processors(sink group)
上面提到的都是一个sink只能消费一个channel,若那个唯一的sink异常了,那么channel就会阻塞,同时无法移动数据,故Flume提出了Sink group(Processors)概念,一个组内的所有sink可以消费一个channel,生产上一般processor.type采用failover,即向组内优先级高的sink发送数据,若挂了就向次优先级高的sink发送数据。
6.(重要)Custom Source
官网提供的souce以及sink有时并不能满足我们的需求,此时我们就需要custom source以及sink 了。可参考如下文档
- 官网custom-source配置文档
- 官网custom-source开发文档
- 官网custom-sink配置文档
- 官网custom-sink开发文档
完全idea要添加Flume core依赖:
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.7.0</version>
</dependency>
去github上搜custom的source以及sink借鉴学习:
- github上自定义的DB Source
7.(重要)生产中的Flume架构
- 第一层Agent:
Taildir Source -> File Channel -> AVRO Sink
- 第二层Agent:
AVRO Source -> File Channel -> HDFS-> File Channel -> Kafka
Flume之生产正确的使用方式二(Multiple Agent+ Multiple Channel+Custom)相关推荐
- Flume之生产正确的使用方式一(Singel Agent)
1.什么Flume? Flume是一个分布式的.高可靠的.高可用的将大批量的不同数据源的日志数据收集.聚合.移动到数据中心(hdfs)进行存储的系统.即是日志采集和汇总的工具 Logstash.Fil ...
- 劳保防护用品正确的打开方式,保障职业安全(二)
安全防护用品又称劳保用品,是指在施工作业过程中能够对作业人员的人身起保护作用使作业人员减轻或免遭各种人身伤害和职业伤害的用品.上一篇与大家讲解了3种劳保防护用品使用注意事项.今天百华小编继续与大家介绍 ...
- 你真的佩戴好劳保防护用品了吗?这才是正确的打开方式
对于劳保市场而言任何时候都是采购旺季,许多企业一年四季都需要做好储备的劳保物资.劳保用品是保护工作人员在生产过程中的人身安全所必备的一种防御性装备,是作业人员减轻或免受于各种人身伤害和职业伤害的用品. ...
- 如何才是有效的、正确的工作方式?
如何才是有效的.正确的工作方式? 高效人士善于结果驱动.他们未必动作更快,而是更善于判断,什么事情对结果是最有效的,然后有计划的工作,用尽一切办法保证结果达成,而不是死守着原有的工作任务. 而低效人士 ...
- Flume 1.7 源码分析(二)整体架构
Flume 1.7 源码分析(一)源码编译 Flume 1.7 源码分析(二)整体架构 Flume 1.7 源码分析(三)程序入口 Flume 1.7 源码分析(四)从Source写数据到Channe ...
- BYOD提升企业生产力的5种方式
自带设备(BYOD)是近年来管理圈流行的一种技术趋势.然而,很多管理者依旧选择忽略它或者贬低它的重要性.BYOD是一个允许员工使用自己私人设备(笔记本,智能手机,平板电脑)访问公司网络的政策. 基于云 ...
- 请打开正确的提问方式
请打开正确的提问方式 许多项目在他们的使用协助/说明网页中链接了本指南,这么做很好,我们也鼓励大家都这么做.但如果你是负责管理这个项目网页的人,请在超链接附近的显著位置上注明: 本指南不提供此项目的实 ...
- 商务电子邮箱用什么邮箱好?商务邮件正确的打开方式
随着电子邮件的普及,电子邮箱的使用成为了商务人士.外贸人的"必需品".很多商务人士及出差在外的工作人员为了提高自身职业素养,都需要选择一款好用的商务邮箱.下面TOM小编给大家分析一 ...
- SpringBoot中使用AMQ的两种方式二(Java配置、注解方式)
使用@JmsListener注解方式 1. 工程目录 2. 引入依赖 <?xml version="1.0" encoding="UTF-8"?> ...
- 【mongoDB】mongoDB的正确的连接方式
一.单机版 # systemctl status mongod # cat /etc/mongod.conf 未开启认证 在admin库创建数据库超管用户 # echo -e "use ad ...
最新文章
- 2022-2028年中国辉石行业市场全景调查及发展前景分析报告
- fetch git pull 切换_每天提交 Git 太烦?直接用 Python 就好了!
- Opengl-基本概念-着色器(都是固定的)
- 区块链BaaS云服务(28)TOP Network 之全分片主链(Layer-1)
- Android 权限问题
- 前端学习(2907):Vite的特点
- 第512章 河系量子计算机,第512章 河系量子计算机
- 解决 No module named ‘tensorflow.examples.tutorials‘
- Openpyxl:读取/写入Excel文件的Python模块
- 写给海布里之王—亨利
- choerodon-ui/pro入门 - dataset 的使用
- 晶振(crystal)与晶振(oscillator)的区别
- 权威发布丨2022 中国开源先锋 33 人之心尖上的开源人物
- ConcurrentHashMap中有十个提升性能的细节,你都知道吗?
- 计算机毕业设计php+vue基于微信小程序的房屋租赁小程序
- 学shell和python哪个难_shell与python的优劣对比到底用哪个
- 如何用C语言汉字编码输出汉字,【C语言学习】C语言汉字编码。。。C语言中汉字的输入...
- Ceph 学习——OSD读写流程与源码分析(一)
- oracle实现剪刀石头布,C#使用Unity实现剪刀石头布游戏
- 教育数据集——豆豆云助教