1、source为http模式,sink为logger模式,将数据在控制台打印出来。
conf配置文件如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = http #该设置表示接收通过http方式发送过来的数据
a1.sources.r1.bind = hadoop-master #运行flume的主机或IP地址都可以
a1.sources.r1.port = 9000#端口
#a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = logger#该设置表示将数据在控制台打印出来
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume命令为:
bin/flume-ng agent -c conf -f conf/http.conf -n a1 -Dflume.root.logger=INFO,console。
显示如下的信息表示启动flume成功。
895 (lifecycleSupervisor-1-3) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
打开另外一个终端,通过http post的方式发送数据:
curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:9000。
hadoop-master就是flume配置文件绑定的主机名,9000就是绑定的端口。
然后在运行flume的窗口就是看到如下的内容:
2018-06-12 08:24:04,472 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{timestampe=1234567, host=master} body: 62 61 64 6F 75 20 66 6C 75 6D 65 badou flume }
2、source为netcat(udp、tcp模式),sink为logger模式,将数据打印在控制台
conf配置文件如下:
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-master#绑定的主机名或IP地址
a1.sources.r1.port = 44444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transcationCapacity = 100
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
启动flume
bin/flume-ng agent -c conf -f conf/netcat.conf -n a1 -Dflume.root.logger=INFO,console。
然后在另外一个终端,使用telnet发送数据:
命令为:telnet hadoop-maser 44444
[root@hadoop-master ~]# telnet hadoop-master 44444
Trying 192.168.194.6...
Connected to hadoop-master.
Escape character is '^]'.
显示上面的信息表示连接flume成功,然后输入:
12213213213
OK
12321313
OK
在flume就会收到相应的信息:
2018-06-12 08:38:51,129 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 32 31 33 32 31 33 32 31 33 0D 12213213213. }
2018-06-12 08:38:51,130 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 33 32 31 33 31 33 0D 12321313. }
3、source为netcat/http模式,sink为hdfs模式,将数据存储在hdfs中。
conf配置文件如下,文件名为hdfs.conf:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =regex_filter
a1.sources.r1.interceptors.i1.regex =^[0-9]*$
a1.sources.r1.interceptors.i1.excludeEvents =true
# Describe the sink
#a1.sinks.k1.type = logger
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs:/flume/events #文件在hdfs文件系统中存放的位置
a1.sinks.k1.hdfs.filePrefix = events- #文件的前缀
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.fileType = DataStream #制定文件的存放格式,这个设置是以text的格式存放从flume传输过来的数据。
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
在hdfs文件系统中创建文件存放的路径:
hadoop fs -mkdir /flume/event1。
启动flume:
bin/flume-ng agent -c conf -f conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console
通过telnet模式向flume中发送文件:
telnet hadoop-master 44444
然后输入:
aaaaaaaa
bbbbbbb
ccccccccc
dddddddddd
通过如下的命令hadoop fs -ls /flume/events/查看hdfs中的文件,可以看到hdfs中有/flume/events有如下文件:
-rw-r--r-- 3 root supergroup 16 2018-06-05 06:02 /flume/events/events-.1528203709070
-rw-r--r-- 3 root supergroup 5 2018-06-05 06:02 /flume/events/events-.1528203755556
-rw-r--r-- 3 root supergroup 11 2018-06-05 06:03 /flume/events/events-.1528203755557
-rw-r--r-- 3 root supergroup 26 2018-06-13 07:28 /flume/events/events-.1528900112215
-rw-r--r-- 3 root supergroup 209 2018-06-13 07:29 /flume/events/events-.1528900112216
-rw-r--r-- 3 root supergroup 72 2018-06-13 07:29 /flume/events/events-.1528900112217
通过hadoop fs -cat /flume/events/events-.1528900112216查看文件events-.1528900112216的内容:
aaaaaaaaaaaaaaaaa
bbbbbbbbbbbbbbbb
ccccccccccccccccccc
dddddddddddddddd
eeeeeeeeeeeeeeeeeee
fffffffffffffffffffffff
gggggggggggggggggg
hhhhhhhhhhhhhhhhhhhhhhh
iiiiiiiiiiiiiiiiiii
jjjjjjjjjjjjjjjjjjj
http模式就是把hdfs.conf文件中的netcat改为http,然后传输文件从telnet改为:
curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:44444。
在hadoop文件中就会看到上面命令传输的内容:badou flume。
4、source为netcat/http模式,sink为hive模式,将数据存储在hive中,并分区存储。
conf配置如下,文件名为hive.conf:
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = hadoop-master
a1.sources.r1.port = 44444
# Describe the sink
#a1.sinks.k1.type = logger
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.hive.metastore=thrift://hadoop-master:9083
a1.sinks.k1.hive.database=default#hive数据库名
a1.sinks.k1.hive.table=flume_user1
a1.sinks.k1.serializer=DELIMITED
a1.sinks.k1.hive.partition=3#如果以netcat模式,只能静态设置分区的值,因为netcat模式传输数据,无法传输某个字段的值,只能按照顺序来。这里设置age的分区值为3。
#a1.sinks.k1.hive.partition=%{age}#如果以http或json等模式,只能动态设置分区的值,因为http模式可以动态传输age的值。
a1.sinks.k1.serializer.delimiter=" "
a1.sinks.k1.serializer.serderSeparator=' '
a1.sinks.k1.serializer.fieldnames=user_id,user_name
a1.sinks.k1.hive.txnsPerBatchAsk = 10
a1.sinks.k1.hive.batchSize = 1500
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
在hive中创建表:
create table flume_user(
user_id int
,user_name string
)
partitioned by(age int)
clustered by (user_id) into 2 buckets
stored as orc
在hive-site.xml中添加如下内容:
<property>
<name>javax.jdo.option.ConnectionPassword</name>
<value>hive</value>
<description>password to use against metastore database</description>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.exec.dynamic.partition.mode</name>
<value>nonstrict</value>
</property>
<property>
<name>hive.txn.manager</name>
<value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
<name>hive.compactor.initiator.on</name>
<value>true</value>
</property>
<property>
<name>hive.compactor.worker.threads</name>
<value>1</value>
</property>
将hive根目录下的/hcatalog/share/hcatalog文件夹中的如下三个文件夹添加到flume的lib目录下。
运行flume:
bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console。
重新打开一个窗口,
启动metastroe服务:
hive --service metastore &
重新打开一个客户端,通过telnet连接到flume
telnet hadoop-master 44444
然后输入:
1 1
3 3
就会在hive中看到如下两行数据:
flume_user1.user_id flume_user1.user_name flume_user1.age
1 1 3
3 3 3
age是在hive.conf中设置的值3。
现在将flume的source换成http模式,然后hive分区通过参数模式动态的传输分区值。
将hive.conf中的
a1.sources.r1.type = netcat改成a1.sources.r1.type = http
a1.sinks.k1.hive.partition=3改成a1.sinks.k1.hive.partition=%{age}。
然后启动flume:
bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console。
在重新打开的窗口中通过http的模式传输数据到flume
curl -X POST -d '[{"headers":{"age":"109"},"body":"11 ligongong"}]' hadoop-master:44444。
在hive中可以看到如下的数据:
flume_user1.user_id flume_user1.user_name flume_user1.age
11 ligongong 109
由此可以看出通过http模式传输数据到hive中时,分区字段的信息是在header中传输,而其他字段的信息是放在bady中传输,并且不同列之间以hive.conf文件定义好的分隔符分隔。
5、使用avro模式,将数据在控制台打印出来。
不同的agent之间传输数据只能通过avro模式。
这里我们需要两台服务器来演示avro的使用,两台服务器分别是hadoop-master和hadoop-slave2
hadoop-master中运行agent2,然后指定agent2的sink为avro,并且将数据发送的主机名设置为hadoop-slave2。hadoop-master中flume的conf文件设置如下,名字为push.conf:
#Name the components on this agent
a2.sources= r1
a2.sinks= k1
a2.channels= c1
#Describe/configure the source
a2.sources.r1.type= netcat
a2.sources.r1.bind= hadoop-master
a2.sources.r1.port = 44444
a2.sources.r1.channels= c1
#Use a channel which buffers events in memory
a2.channels.c1.type=www.thd178.com memory
a2.channels.c1.keep-alive= 10
a2.channels.c1.capacity= 100000
a2.channels.c1.transactionCapacity= 100000
#Describe/configure the source
a2.sinks.k1.type= avro#制定sink为avro
a2.sinks.k1.channel= c1
a2.sinks.k1.hostname= hadoop-slave2#指定sink要发送数据到的目的服务器名
a2.sinks.k1.port= 44444#目的服务器的端口
hadoop-slave2中运行的是agent1,agent1的source为avro。flume配置内容如下,文件名为pull.conf
#Name the components on this agent
a1.sources= r1
a1.sinks= k1
a1.channels= c1
#Describe/configure the source
a1.sources.r1.type= avro
a1.sources.r1.channels= c1
a1.sources.r1.bind= hadoop-slave2
a1.sources.r1.port= 44444
#Describe the sink
a1.sinks.k1.type= logger
a1.sinks.k1.channel = c1
#Use a channel which buffers events in memory
a1.channels.c1.type= memory
a1.channels.c1.keep-alive= 10
a1.channels.c1.capacity= 100000
a1.channels.c1.transactionCapacity= 100000。
现在hadoop-slave2中启动flume,然后在hadoop-master中启动flume,顺序一定要对,否则会报如下的错误:org.apache.flume.FlumeException: java.net.SocketException: Unresolved address
在hadoop-slave2中启动flume:
bin/flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console
在hadoop-master中启动flume:
bin/flume-ng agent -c conf -f www.dongfan178.com conf/push.conf -n a2 -Dflume.root.logger=INFO,console
重新打开一个窗口,通过telnet连接到hadoop-master
telnet hadoop-master 44444
然后发送11111aaaa
在hadoop-slave2的控制台中就会显示之前发送的,11111aaaa,如下所示:
2018-06-14 06:43:00,686 (SinkRunner-PollingRunner-DefaultSinkProcessor) www.mhylpt.com INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 31 31 31 31 61 61 61 61 0D 11111aaaa. }
6、通过flume将数据通传输到kafka,然后通过kafka将数据存储在hdfs和hive中。
首先要配置kafka。配置kafka请参考:https://blog.csdn.net/zxy987872674/article/details/72466504
在分别在hadoop-master、hadoop-slave1、hadoop-slave2上启动zookeeper。
命令为:
然后启动kafka,进入kafka的安装目录,执行命令:
./bin/kafka-server-start.sh config/server.properties &
在kafka中创建topic:
bin/kafka-topics.sh --create --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --replication-factor 1 --partitions 2 --topic flume_kafka
查看kafka中的topic:
bin/kafka-topics.sh --list --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181
启动kafka的消费者:
./kafka-console-consumer.sh --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --topic flume_kafka
配置flume中conf文件,设置source类型为exec,sink为org.apache.flume.sink.kafka.KafkaSink,设置kafka的topic为上面创建的flume_kafka,具体配置如下:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
#设置sources的类型为exec,就是执行命令的意思
a1.sources.r1.type = exec
#设置sources要执行的命令
a1.sources.r1.command = tail -f www.feifanyule188.cn/home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt
# 设置kafka接收器
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
# 设置kafka的broker地址和端口号
a1.sinks.k1.brokerList=hadoop-master:9092
# 设置Kafka的topic
a1.sinks.k1.topic=flume_www.feifanyule188.cn kafka
# 设置序列化的方式
a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
# use a channel which buffers events in memory
a1.channels.c1.type=memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 1000
# Bind the source and sink to the channel
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
启动flume:
只要/home/hadoop/flumeHomeWork/flumeCode/ www.leyou2.net  flume_exec_test.txt中有数据时flume就会加载kafka中,然后被上面启动的kafka消费者消费掉。
我们查看发现/home/hadoop/flumeHomeWork/flumeCode/flume_exec_www.taohuayuan178.com test.txt文件中有如下的数据:
131,dry pasta
131,dry pasta
132,beauty
133,muscles joints pain relief
133,muscles joints pain relief
133,muscles joints pain relief
133,muscles joints pain relief
134,specialty wines champagnes
134,specialty wines champagnes
134,specialty wines champagnes
而在消费者窗口这也是显示上面的内容。

flume将数据发送到kafka、hdfs、hive、http、netcat等模式的使用总结相关推荐

  1. 将CSV的数据发送到kafka(java版)

    为什么将CSV的数据发到kafka flink做流式计算时,选用kafka消息作为数据源是常用手段,因此在学习和开发flink过程中,也会将数据集文件中的记录发送到kafka,来模拟不间断数据: 整个 ...

  2. flume avro java 发送数据_flume将数据发送到kafka、hdfs、hive、http、netcat等模式的使用总结...

    1.source为http模式,sink为logger模式,将数据在控制台打印出来. conf配置文件如下: # Name the components on this agent a1.source ...

  3. 3.2.3 Sqoop 数据迁移工具, 导入数据import, MySQL到HDFS/Hive, 导出数据export,增量数据导入, Sqoop job,常用命令及参数

    目录 数据迁移工具 -- Sqoop 第一部分 Sqoop概述 第二部分 安装配置 第三部分 应用案例 第 1 节 导入数据import MySQL 到 HDFS MySQL 到 Hive 第 2 节 ...

  4. kafka异步发送数据_在Kafka上异步发送数据

    kafka异步发送数据 对于一个项目,我试图记录用户的基本交易,例如添加和删除一个项目以及多种类型的项目,并为每笔交易向kafka发送一条消息. 日志机制的准确性不是至关重要的,在kafka服务器停机 ...

  5. java调用kafka接口发送数据_Java调用Kafka生产者,消费者Api及相关配置说明

    本次的记录内容包括: 1.Java调用生产者APi流程 2.Kafka生产者Api的使用及说明 3.Kafka消费者Api的使用及说明 4.Kafka消费者自动提交Offset和手动提交Offset ...

  6. Docker安装部署MySQL+Canal+Kafka+Camus+HIVE数据实时同步

    因为公司业务需求要将mysql的数据实时同步到hive中,在网上找到一套可用的方案,即MySQL+Canal+Kafka+Camus+HIVE的数据流通方式,因为是首次搭建,所以暂时使用伪分布式的搭建 ...

  7. 【大数据入门核心技术-Flume】(四)使用Flume采集数据到Hive

    [大数据入门核心技术-Kafka](七)Ka 录 一.准备工作 1.Hadoop环境安装 2.Flume安装部署 二.采集数据到HDFS 1.配置任务文件 2.启动传输 3.查看是否同步成功 三.常见 ...

  8. 通过Flume简单实现Kafka与Hive对接(Json格式)

    将以下存储在kafka的topic中的JSON格式字符串,对接存储到Hive的表中 {"id":1,"name":"小李"} {" ...

  9. flume 一对多hdfs_10PB 规模的 HDFS 数据在 eBay 的迁移实战

    导读 INTRODUCTION Hadoop分布式文件系统(HDFS)是指被设计成适合运行在通用硬件上的分布式文件系统(Distributed File System).本文将介绍eBay ADI H ...

最新文章

  1. 《编写可维护的JavaScript》——1.7 直接量
  2. Connect(); // 2015 简要整理
  3. python官方网站地址-哪里能找到 Python 视频教程地址?
  4. 数据结构源码笔记(C语言):置换-选择算法
  5. Oracle数据表和Constraint管理
  6. 套接字错误处理函数的封装思想及函数实现
  7. Linux基础知识(1)
  8. 在路上,继续就好了。。。。
  9. 15行Python代码,帮你理解令牌桶算法
  10. oracle导出客户机使用us7a,导出已复制的文件系统 - Oracle® ZFS Storage Appliance 管理指南,发行版 OS8.6.0...
  11. JAVA CLASS混淆工具:ProGuard简单试用
  12. 软件测试及标准(基于ISO/IEC/IEEE 29119系列)
  13. 自己对ajax的第一次上手被人说菜死还真是菜
  14. Kth Largest Element
  15. debug5x 微信_微信X5内核webview调试
  16. JPA实现领域驱动设计(DDD) 中值对象的持久化
  17. 你的私密照片可能正被“合法”观看
  18. goback history 传递参数_goback 返回上一页触发刷新 / 回调传参
  19. 虚拟机在线迁移Vmotion
  20. fpu测试_解毒盖世G600散热器,3900X超频测试能不能压住?

热门文章

  1. cisco 密码重置
  2. 仿新浪微盾客户端项目简介四
  3. J2EE 重载跟覆盖的概念以及区别
  4. Android之Handler
  5. 微软推出Azure区块链开发套件,重点解决两大难题
  6. Linux的LAMP
  7. 创建外网 ext_net - 每天5分钟玩转 OpenStack(104)
  8. Centos下oracle11g R2的启动与关闭监听、数据库
  9. ACE网络编程思考(二)
  10. 软件过程评估和软件能力评价之间的差异