使用E-MapReduce服务将Kafka数据导入OSS
概述
kafka是一个开源社区常用的消息队列,虽然kafka官方(Confluent公司)提供插件从Kafka直接导入数据到HDFS的connector,但对阿里云对文件存储系统OSS却没有官方的支持。本文会举一个简单的例子,实现kafka的数据写入阿里云OSS。因为阿里云E-MapReduce服务集成了大量开源组件和阿里云产品的对接工具,所以本文直接在E-MapReduce集群上运行这个例子。
这个例子使用开源的Flume工具作为中转,将kafka和OSS连接起来。Flume开源组件将来也可能出现在E-MapReduce平台上。
场景举例
下面举一个最简单的例子,如果已经有一个线上的Kafka集群,则可以直接跳到第4步。
- 在Kafka Home目录下启动Kafka服务进程,配置文件中Zookeeper的地址配置为E-MapReduce自带的服务地址 emr-header-1:2181
bin/kafka-server-start.sh config/server.properties
- 创建一个Kafka的topic,名字为test
bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 1 --partitions 1 --topic test
- 向Kafka test topic内写入数据,数据内容为本机的性能监控数据
vmstat 1 | bin/kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test
- 在Flume Home目录下配置并启动Flume服务
新建一个配置文件:conf/kafka-example.conf。其中source指定为kafka的对应topic,sink使用HDFS Sinker,并且路径指定为OSS的路径。因为E-MapReduce服务为我们实现了一个高效的OSS FileSystem(兼容Hadoop FileSystem),所以可以直接指定OSS路径,HDFS Sinker自动将数据写入OSS。
# Name the components on this agent
a1.sources = source1
a1.sinks = oss1
a1.channels = c1# Describe/configure the source
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.zookeeperConnect = emr-header-1:2181
a1.sources.source1.topic = test
a1.sources.source1.groupId = flume
a1.sources.source1.channels = c1
a1.sources.source1.interceptors = i1
a1.sources.source1.interceptors.i1.type = timestamp
a1.sources.source1.kafka.consumer.timeout.ms = 100# Describe the sink
a1.sinks.oss1.type = hdfs
a1.sinks.oss1.hdfs.path = oss://emr-examples/kafka/%{topic}/%y-%m-%d
a1.sinks.oss1.hdfs.rollInterval = 10
a1.sinks.oss1.hdfs.rollSize = 0
a1.sinks.oss1.hdfs.rollCount = 0
a1.sinks.oss1.hdfs.fileType = DataStream# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.oss1.channel = c1
启动Flume服务:
bin/flume-ng agent --conf conf --conf-file conf/kafka-example.conf --name a1 -Dflume.root.logger=INFO,console --classpath '/usr/lib/hadoop-current/share/hadoop/tools/lib/*'
从日志中可以看到Flume HDFS sinker将数据写到了OSS,并且是每10秒钟轮转一次。
2016-12-05 18:41:04,794 (hdfs-oss1-call-runner-1) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming oss://emr-perform/kafka/test/16-12-05/Flume
Data.1480934454657.tmp to oss://emr-perform/kafka/test/16-12-05/FlumeData.1480934454657
2016-12-05 18:41:04,852 (hdfs-oss1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.
备注:如果遇到如下 Exception,是因为Flume自带的httpclient jar包和EMR冲突:
Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchFieldError: INSTANCE
通过删除Flume自带的 httpclient jar 包可以避免冲突(统一使用EMR Hadoop带的httpclient):
rm ~/apache-flume-1.8.0-bin/lib/httpclient-4.2.1.jar
查看OSS上的结果
$ hadoop fs -ls oss://emr-examples/kafka/test/16-12-05/
Found 6 items
-rw-rw-rw- 1 162691 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934394566
-rw-rw-rw- 1 925 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934407580
-rw-rw-rw- 1 1170 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934418597
-rw-rw-rw- 1 1092 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934430613
-rw-rw-rw- 1 1254 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638
-rw-rw-rw- 1 588 2016-12-05 18:41 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934454657$ hadoop fs -cat oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638 0 0 0 1911216 50036 1343828 0 0 0 0 1341 2396 1 1 98 0 00 0 0 1896964 50052 1343824 0 0 0 112 1982 2511 15 1 84 0 01 0 0 1896552 50052 1343828 0 0 0 76 2314 3329 3 4 94 0 0
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----r b swpd free buff cache si so bi bo in cs us sy id wa st5 0 0 1903016 50052 1343828 0 0 0 0 2277 3249 2 4 94 0 00 0 0 1902892 50052 1343828 0 0 0 0 1417 2366 5 0 95 0 00 0 0 1902892 50052 1343828 0 0 0 0 1072 2243 0 0 99 0 00 0 0 1902892 50068 1343824 0 0 0 144 1275 2283 1 0 99 0 01 0 0 1903024 50068 1343828 0 0 0 24 1099 2071 1 1 99 0 00 0 0 1903272 50068 1343832 0 0 0 0 1294 2238 1 1 99 0 01 0 0 1903412 50068 1343832 0 0 0 0 1024 2094 1 0 99 0 02 0 0 1903148 50076 1343836 0 0 0 68 1879 2766 1 1 98 0 01 0 0 1903288 50092 1343840 0 0 0 92 1147 2240 1 0 99 0 00 0 0 1902792 50092 1343844 0 0 0 28 1456 2388 1 1 98 0 0
参考资料
- http://kafka.apache.org/quickstart
- https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html
使用E-MapReduce服务将Kafka数据导入OSS相关推荐
- Greenplum【部署 04】GPSS扩展安装并使用GPKafka实现Kafka数据导入Greenplum数据库(安装包网盘分享)
链接:https://pan.baidu.com/s/1MO-qL0Pxe6PojfZKsw3_qA 提取码:o7fl Greenplum Stream Server (GPSS)是一个ETL(提取. ...
- Greenplum【环境搭建 04】使用GPKafka实现Kafka数据导入Greenplum数据库(扩展安装文件网盘分享)
分享资源地址及文件列表: 链接:https://pan.baidu.com/s/1XVTxKLkOYrL4pCZpFfs-Tg 提取码:sq90 包含文件: # 命令执行 gpkafka # 扩展安装 ...
- Kafka数据导入导出
用命令行方式读写 Kafka 玩玩还行,真正投入实际使用时,往往需要把数据导入到 Kafka 或导出 Kafka 数据到本地文件中.这里我们学习一下怎么实现这个功能. 先创建一个本地源数据文件: ...
- GPkafka-Kafka数据导入GreenPlum实践
背景 Kafka是分布式消息订阅系统,有非常好的横向扩展性,可实时存储海量数据,是流数据处理中间件的事实标准.当通过Kafka和greenplum搭建流处理管道时,如何高速可靠的完成流数据加载,成为用 ...
- 华为的大数据平台—MapReduce服务
内容: 大数据相关知识,和目前主流的解决方案 MapReduce服务 如何使用 文章整理自:https://edu.huaweicloud.com/courses 大数据的开源解决方案:Hadoop ...
- 02_clickhouse安装,卸载,启动/关闭服务,交互式命令(数据库创建,数据导入,查询),批模式数据导入,MySQL接口操作ClickHouse,DBeaver可视化工具安装与使用(学习笔记)
1 ClickHouse安装 安装文件清单 clickhouse-client-${version}.noarch.rpm clickhouse-common-static-dbg-${version ...
- 使用MapReduce将HDFS数据导入Mysql
使用MapReduce将Mysql数据导入HDFS代码链接 将HDFS数据导入Mysql,代码示例 package com.zhen.mysqlToHDFS;import java.io.DataIn ...
- ClickHouse数据导入(Flink、Spark、Kafka、MySQL、Hive)
本文分享主要是ClickHouse的数据导入方式,本文主要介绍如何使用Flink.Spark.Kafka.MySQL.Hive将数据导入ClickHouse,具体内容包括: 使用Flink导入数据 使 ...
- Doris系列之导入Kafka数据操作
Doris系列 注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Sp ...
最新文章
- 使用python hashlib模块给明文字符串加密,以及如何撞库破解密码
- AI理论知识整理(18)-内积与范数
- linux find命令使用示例
- CNN结构:序列预测复合DNN结构-AcGANs、 ENN误差编码网络
- 2015-03-18 - Deliberately pass a wrong note type for my task creation
- acwing——每日一题——总结
- jenkins启动/重启/停止命令 改端口
- 机器学习基础(六十)—— 凸优化
- Linux 系统 nohup 命令详解
- linux开启远程ssh服务器配置,配置Linux服务器SSH远程密钥登录
- 防线 2020/3/31
- 微信小程序闭环处理 App -- 小程序 -- 企业微信 + 公众号
- 二极管的三种击穿形式
- 打印机只能扫描图片,不能扫描成PDF解决办法
- 6轮面试辛苦拿到阿里Android开发offer,却从22k降到15k,在逗我
- SPFA+寻路(行路难,洛谷2832)
- 银行存款利率C语言程序设计,存款利息的计算 有1000元,想存5年,可按以下5种办法存:...
- c语言储存字母,有两个磁盘文件A和B,各存放一行字母-有两个磁盘文件A和B,各存放一行字母C语言实现-吾爱编程网...
- 关于嵌入式系统的应用分析
- Mac M1快速配置开发环境
热门文章
- LeetCode 42. Trapping Rain Water--算法题--c++解法
- Reids报错解决:Job for redis-server.service failed because the control process exited with error code.
- 解决selenium报错--unknown error: DevToolsActivePort file doesn‘t exist
- oracle 定时器时间分区_oracle分区表按时间自动创建
- java8为什么用不了_为什么不建议使用Date,而是使用Java8新的时间和日期API?
- MySQL中exists与in的使用
- class viewController has no initializers解决办法
- jQuery校验 jQuery Validate 表单验证详解
- matlab定子磁链观测器,一种基于二阶广义积分器的永磁同步电机定子磁链观测方法...
- 兼容性自动化测试 | HUAWEI DevEco Studio云测服务等您来体验