概述

kafka是一个开源社区常用的消息队列,虽然kafka官方(Confluent公司)提供插件从Kafka直接导入数据到HDFS的connector,但对阿里云对文件存储系统OSS却没有官方的支持。本文会举一个简单的例子,实现kafka的数据写入阿里云OSS。因为阿里云E-MapReduce服务集成了大量开源组件和阿里云产品的对接工具,所以本文直接在E-MapReduce集群上运行这个例子。

这个例子使用开源的Flume工具作为中转,将kafka和OSS连接起来。Flume开源组件将来也可能出现在E-MapReduce平台上。

场景举例

下面举一个最简单的例子,如果已经有一个线上的Kafka集群,则可以直接跳到第4步。

  1. 在Kafka Home目录下启动Kafka服务进程,配置文件中Zookeeper的地址配置为E-MapReduce自带的服务地址 emr-header-1:2181
    bin/kafka-server-start.sh config/server.properties
  2. 创建一个Kafka的topic,名字为test
    bin/kafka-topics.sh --create --zookeeper emr-header-1:2181 --replication-factor 1 --partitions 1 --topic test
  3. 向Kafka test topic内写入数据,数据内容为本机的性能监控数据
    vmstat 1 | bin/kafka-console-producer.sh --broker-list emr-header-1:9092 --topic test
  4. 在Flume Home目录下配置并启动Flume服务

新建一个配置文件:conf/kafka-example.conf。其中source指定为kafka的对应topic,sink使用HDFS Sinker,并且路径指定为OSS的路径。因为E-MapReduce服务为我们实现了一个高效的OSS FileSystem(兼容Hadoop FileSystem),所以可以直接指定OSS路径,HDFS Sinker自动将数据写入OSS。

# Name the components on this agent
a1.sources = source1
a1.sinks = oss1
a1.channels = c1# Describe/configure the source
a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.source1.zookeeperConnect = emr-header-1:2181
a1.sources.source1.topic = test
a1.sources.source1.groupId = flume
a1.sources.source1.channels = c1
a1.sources.source1.interceptors = i1
a1.sources.source1.interceptors.i1.type = timestamp
a1.sources.source1.kafka.consumer.timeout.ms = 100# Describe the sink
a1.sinks.oss1.type = hdfs
a1.sinks.oss1.hdfs.path = oss://emr-examples/kafka/%{topic}/%y-%m-%d
a1.sinks.oss1.hdfs.rollInterval = 10
a1.sinks.oss1.hdfs.rollSize = 0
a1.sinks.oss1.hdfs.rollCount = 0
a1.sinks.oss1.hdfs.fileType = DataStream# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 10000# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.oss1.channel = c1

启动Flume服务:

bin/flume-ng agent --conf conf --conf-file conf/kafka-example.conf --name a1 -Dflume.root.logger=INFO,console --classpath '/usr/lib/hadoop-current/share/hadoop/tools/lib/*'

从日志中可以看到Flume HDFS sinker将数据写到了OSS,并且是每10秒钟轮转一次。

2016-12-05 18:41:04,794 (hdfs-oss1-call-runner-1) [INFO - org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:618)] Renaming oss://emr-perform/kafka/test/16-12-05/Flume
Data.1480934454657.tmp to oss://emr-perform/kafka/test/16-12-05/FlumeData.1480934454657
2016-12-05 18:41:04,852 (hdfs-oss1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:382)] Writer callback called.

备注:如果遇到如下 Exception,是因为Flume自带的httpclient jar包和EMR冲突:

Exception in thread "SinkRunner-PollingRunner-DefaultSinkProcessor" java.lang.NoSuchFieldError: INSTANCE

通过删除Flume自带的 httpclient jar 包可以避免冲突(统一使用EMR Hadoop带的httpclient):
rm ~/apache-flume-1.8.0-bin/lib/httpclient-4.2.1.jar

查看OSS上的结果

$ hadoop fs -ls oss://emr-examples/kafka/test/16-12-05/
Found 6 items
-rw-rw-rw-   1     162691 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934394566
-rw-rw-rw-   1        925 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934407580
-rw-rw-rw-   1       1170 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934418597
-rw-rw-rw-   1       1092 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934430613
-rw-rw-rw-   1       1254 2016-12-05 18:40 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638
-rw-rw-rw-   1        588 2016-12-05 18:41 oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934454657$ hadoop fs -cat oss://emr-examples/kafka/test/16-12-05/FlumeData.1480934443638 0  0      0 1911216  50036 1343828    0    0     0     0 1341 2396  1  1 98  0  00  0      0 1896964  50052 1343824    0    0     0   112 1982 2511 15  1 84  0  01  0      0 1896552  50052 1343828    0    0     0    76 2314 3329  3  4 94  0  0
procs -----------memory---------- ---swap-- -----io---- --system-- -----cpu-----r  b   swpd   free   buff  cache   si   so    bi    bo   in   cs us sy id wa st5  0      0 1903016  50052 1343828    0    0     0     0 2277 3249  2  4 94  0  00  0      0 1902892  50052 1343828    0    0     0     0 1417 2366  5  0 95  0  00  0      0 1902892  50052 1343828    0    0     0     0 1072 2243  0  0 99  0  00  0      0 1902892  50068 1343824    0    0     0   144 1275 2283  1  0 99  0  01  0      0 1903024  50068 1343828    0    0     0    24 1099 2071  1  1 99  0  00  0      0 1903272  50068 1343832    0    0     0     0 1294 2238  1  1 99  0  01  0      0 1903412  50068 1343832    0    0     0     0 1024 2094  1  0 99  0  02  0      0 1903148  50076 1343836    0    0     0    68 1879 2766  1  1 98  0  01  0      0 1903288  50092 1343840    0    0     0    92 1147 2240  1  0 99  0  00  0      0 1902792  50092 1343844    0    0     0    28 1456 2388  1  1 98  0  0

参考资料

  1. http://kafka.apache.org/quickstart
  2. https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html

使用E-MapReduce服务将Kafka数据导入OSS相关推荐

  1. Greenplum【部署 04】GPSS扩展安装并使用GPKafka实现Kafka数据导入Greenplum数据库(安装包网盘分享)

    链接:https://pan.baidu.com/s/1MO-qL0Pxe6PojfZKsw3_qA 提取码:o7fl Greenplum Stream Server (GPSS)是一个ETL(提取. ...

  2. Greenplum【环境搭建 04】使用GPKafka实现Kafka数据导入Greenplum数据库(扩展安装文件网盘分享)

    分享资源地址及文件列表: 链接:https://pan.baidu.com/s/1XVTxKLkOYrL4pCZpFfs-Tg 提取码:sq90 包含文件: # 命令执行 gpkafka # 扩展安装 ...

  3. Kafka数据导入导出

    用命令行方式读写 Kafka 玩玩还行,真正投入实际使用时,往往需要把数据导入到 Kafka 或导出 Kafka 数据到本地文件中.这里我们学习一下怎么实现这个功能.   先创建一个本地源数据文件: ...

  4. GPkafka-Kafka数据导入GreenPlum实践

    背景 Kafka是分布式消息订阅系统,有非常好的横向扩展性,可实时存储海量数据,是流数据处理中间件的事实标准.当通过Kafka和greenplum搭建流处理管道时,如何高速可靠的完成流数据加载,成为用 ...

  5. 华为的大数据平台—MapReduce服务

    内容: 大数据相关知识,和目前主流的解决方案 MapReduce服务 如何使用 文章整理自:https://edu.huaweicloud.com/courses 大数据的开源解决方案:Hadoop ...

  6. 02_clickhouse安装,卸载,启动/关闭服务,交互式命令(数据库创建,数据导入,查询),批模式数据导入,MySQL接口操作ClickHouse,DBeaver可视化工具安装与使用(学习笔记)

    1 ClickHouse安装 安装文件清单 clickhouse-client-${version}.noarch.rpm clickhouse-common-static-dbg-${version ...

  7. 使用MapReduce将HDFS数据导入Mysql

    使用MapReduce将Mysql数据导入HDFS代码链接 将HDFS数据导入Mysql,代码示例 package com.zhen.mysqlToHDFS;import java.io.DataIn ...

  8. ClickHouse数据导入(Flink、Spark、Kafka、MySQL、Hive)

    本文分享主要是ClickHouse的数据导入方式,本文主要介绍如何使用Flink.Spark.Kafka.MySQL.Hive将数据导入ClickHouse,具体内容包括: 使用Flink导入数据 使 ...

  9. Doris系列之导入Kafka数据操作

    Doris系列 注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Sp ...

最新文章

  1. 使用python hashlib模块给明文字符串加密,以及如何撞库破解密码
  2. AI理论知识整理(18)-内积与范数
  3. linux find命令使用示例
  4. CNN结构:序列预测复合DNN结构-AcGANs、 ENN误差编码网络
  5. 2015-03-18 - Deliberately pass a wrong note type for my task creation
  6. acwing——每日一题——总结
  7. jenkins启动/重启/停止命令 改端口
  8. 机器学习基础(六十)—— 凸优化
  9. Linux 系统 nohup 命令详解
  10. linux开启远程ssh服务器配置,配置Linux服务器SSH远程密钥登录
  11. 防线 2020/3/31
  12. 微信小程序闭环处理 App -- 小程序 -- 企业微信 + 公众号
  13. 二极管的三种击穿形式
  14. 打印机只能扫描图片,不能扫描成PDF解决办法
  15. 6轮面试辛苦拿到阿里Android开发offer,却从22k降到15k,在逗我
  16. SPFA+寻路(行路难,洛谷2832)
  17. 银行存款利率C语言程序设计,存款利息的计算 有1000元,想存5年,可按以下5种办法存:...
  18. c语言储存字母,有两个磁盘文件A和B,各存放一行字母-有两个磁盘文件A和B,各存放一行字母C语言实现-吾爱编程网...
  19. 关于嵌入式系统的应用分析
  20. Mac M1快速配置开发环境

热门文章

  1. LeetCode 42. Trapping Rain Water--算法题--c++解法
  2. Reids报错解决:Job for redis-server.service failed because the control process exited with error code.
  3. 解决selenium报错--unknown error: DevToolsActivePort file doesn‘t exist
  4. oracle 定时器时间分区_oracle分区表按时间自动创建
  5. java8为什么用不了_为什么不建议使用Date,而是使用Java8新的时间和日期API?
  6. MySQL中exists与in的使用
  7. class viewController has no initializers解决办法
  8. jQuery校验 jQuery Validate 表单验证详解
  9. matlab定子磁链观测器,一种基于二阶广义积分器的永磁同步电机定子磁链观测方法...
  10. 兼容性自动化测试 | HUAWEI DevEco Studio云测服务等您来体验