Kafka是一个分布式发布—订阅系统,由于其强大的分布式和性能特性,迅速成为数据管道的关键部分。它可完成许多工作,例如消息传递、指标收集、流处理和日志聚合。Kafka的另一个有效用途是将数据导入Hadoop。使用Kafka的关键原因是它将数据生产者和消费者分离,允许拥有多个独立的生产者(可能由不同的开发团队编写)。同样,还有多个独立的消费者(也可能由不同的团队编写)。此外,消费者可以是实时/同步或批量/离线/异步。当对比RabbitMQ等其他pub-sub工具时,后一个属性有很大区别。

要使用Kafka,有一些需要理解的概念:

  • topic—topic是相关消息的订阅源;
  • 分区—每个topic由一个或多个分区组成,这些分区是由日志文件支持的有序消息队列;
  • 生产者和消费者—生产者和消费者将消息写入分区并从分区读取。
  • Brokers—Brokers是管理topic和分区并为生产者和消费者请求提供服务的Kafka流程。

Kafka不保证对topic的“完全”排序,只保证组成topic的各个分区是有序的。消费者应用程序可以根据需要强制执行对“全局”topic排序。

图5.14 显示了Kafka的概念模型

图5.15 显示了如何在Kafka部署分发分区的示例

为了支持容错,可以复制topic,这意味着每个分区可以在不同主机上具有可配置数量的副本。这提供了更高的容错能力,这意味着单个服务器死亡对数据或生产者和消费者的可用性来说不是灾难性的。

此处采用Kafka版本0.8和Camus的0.8.X。

实践:使用Camus将Avro数据从Kafka复制到HDFS

该技巧在已经将数据流入Kafka用于其他目的并且希望将数据置于HDFS中的情况下非常有用。

问题

希望使用Kafka作为数据传递机制来将数据导入HDFS。

解决方案

使用LinkedIn开发的解决方案Camus将Kafka中的数据复制到HDFS。

讨论

Camus是LinkedIn开发的一个开源项目。Kafka在LinkedIn大量部署,而Camus则用作将数据从Kafka复制到HDFS。

开箱即用,Camus支持Kafka中的两种数据格式:JSON和Avro。在这种技术中,我们将通过Camus使用Avro数据。Camus对Avro的内置支持要求Kafka发布者以专有方式编写Avro数据,因此对于这种技术,我们假设希望在Kafka中使用vanilla序列化数据。

让这项技术发挥作用需要完成三个部分的工作:首先要将一些Avro数据写入Kafka,然后编写一个简单的类来帮助Camus反序列化Avro数据,最后运行一个Camus作业来执行数据导入。

为了把Avro记录写入Kafka,在以下代码中,需要通过配置必需的Kafka属性来设置Kafka生成器,从文件加载一些Avro记录,并将它们写出到Kafka:

可以使用以下命令将样本数据加载到名为test的Kafka的topic中:

Kafka控制台使用者可用于验证数据是否已写入Kafka,这会将二进制Avro数据转储到控制台:

完成后,编写一些Camus代码,以便可以在Camus中阅读这些Avro记录。

实践:编写Camus和模式注册表

首先,需要了解三种Camus概念:

  • 解码器—解码器的工作是将从Kafka提取的原始数据转换为Camus格式。
  • 编码器—编码器将解码数据序列化为将存储在HDFS中的格式。
  • Schema注册表—提供有关正在编码的Avro数据的schema信息。

正如前面提到的,Camus支持Avro数据,但确实需要Kafka生产者使用Camus KafkaAvroMessageEncoder类来编写数据,该类为Avro序列化二进制数据添加了部分专有数据,可能是因为Camus中的解码器可以验证它是由该类编写的。

在此示例中,使用 Avro serialization进行序列化,因此需要编写自己的解码器。幸运的是,这很简单:

你可能已经注意到我们在Kafka中写了一个特定的Avro记录,但在Camus中我们将该记录读作通用的Avro记录,而不是特定的Avro记录,这是因为CamusWrapper类仅支持通用Avro记录。否则,特定的Avro记录可以更简单地使用,因为可以使用生成的代码并具有随之而来的所有安全特征。

CamusWrapper对象是从Kafka提取的数据。此类存在的原因是允许将元数据粘贴到envelope中,例如时间戳,服务器名称和服务详细信息。强烈建议使用的任何数据都有一些与每条记录相关的有意义的时间戳(通常这将是创建或生成记录的时间)。然后,可以使用接受时间戳作为参数的CamusWrapper构造函数:

public CamusWrapper(R record, long timestamp) { ... }

如果未设置时间戳,则Camus将在创建包装器时创建新的时间戳。在确定输出记录的HDFS位置时,在Camus中使用此时间戳和其他元数据。

接下来,需要编写一个schema注册表,以便Camus Avro编码器知道正在写入HDFS的Avro记录的schema详细信息。注册架构时,还要指定从中拉出Avro记录的Kafka的topic名称:

运行Camus

Camus在Hadoop集群上作为MapReduce作业运行,希望在该集群中导入Kafka数据。需要向Camus提供一堆属性,可以使用命令行或者使用属性文件来执行此操作,我们将使用此技术的属性文件:

从属性中可以看出,无需明确告诉Camus要导入哪些topic。Camus自动与Kafka通信以发现topic(和分区)以及当前的开始和结束偏移。

如果想要精确控制导入的topic,可以分别使用kafka.whitelist.topics和kafka.blacklist.topics列举白名单(限制topic)和黑名单(排除topic),可以使用逗号作为分隔符指定多个topic,还支持正则表达式,如以下示例所示,其匹配topic的“topic1”或以“abc”开头,后跟一个或多个数字的任何topic,可以使用与value完全相同的语法指定黑名单:

kafka.whitelist.topics=topic1,abc[0-9]+

一旦属性全部设置完毕,就可以运行Camus作业了:

这将导致Avro数据在HDFS中着陆。我们来看看HDFS中的内容:

第一个文件包含已导入的数据,其他供Camus管理。

可以使用AvroDump实用程序查看HDFS中的数据文件:

那么,当Camus工作正在运行时究竟发生了什么? Camus导入过程作为MapReduce作业执行,如图5.16所示。

随着MapReduce中的Camus任务成功,Camus OutputCommitter(允许在任务完成时执行自定义工作的MapReduce构造)以原子方式将任务的数据文件移动到目标目录。OutputCommitter还为任务正在处理的所有分区创建偏移文件,同一作业中的其他任务可能会失败,但这不会影响成功任务的状态——成功任务的数据和偏移输出仍然存在,因此后续的Camus执行将从最后一个已知的成功状态恢复处理。

接下来,让我们看看Camus导入数据的位置以及如何控制行为。

数据分区

之前,我们看到了Camus导入位于Kafka的Avro数据,让我们仔细看看HDFS路径结构,如图5.17所示,看看可以做些什么来确定位置。

图5.17 在HDFS中解析导出数据的Camus输出路径

路径的日期/时间由从CamusWrapper中提取的时间戳确定,可以从MessageDecoder中的Kafka记录中提取时间戳,并将它们提供给CamusWrapper,这将允许按照有意义的日期对数据进行分区,而不是默认值,这只是在MapReduce中读取Kafka记录的时间。

Camus支持可插拔分区程序,允许控制图5.18所示路径的一部分。

图5.18 Camus分区路径

Camus Partitioner接口提供了两种必须实现的方法:

例如,自定义分区程序可创建用于Hive分区的路径。

总结

Camus提供了一个完整的解决方案,可以在HDFS中从Kafka获取数据,并在出现问题时负责维护状态和进行错误处理。通过将其与Azkaban或Oozie集成,可以轻松实现自动化,并根据消息时间组织HDFS数据执行简单的数据管理。值得一提的是,当涉及到ETL时,与Flume相比,它的功能是无懈可击的。

Kafka捆绑了一种将数据导入HDFS的机制。它有一个KafkaETLInputFormat输入格式类,可用于在MapReduce作业中从Kafka提取数据。要求编写MapReduce作业以执行导入,但优点是可以直接在MapReduce流中使用数据,而不是将HDFS用作数据的中间存储。接下来,我们将讨论如何将驻留在Hadoop中的数据传输到其他系统,例如文件系统和其他地方。

如何将kafka中的数据快速导入Hadoop?相关推荐

  1. 查看使用linkedIn Camus 把Kafka中的数据导入HDFS中生成的.deflate文件

    在使用Camus好不容易把kafka中的数据导入了HDFS,但是直接download后打开,显示的会是乱码.经查询,带.deflate后缀的文件是使用DEFLATE算法压缩过的,所以要查看,只需使用h ...

  2. docker导入MySQL文件_Docker容器中Mysql数据的导入/导出详解

    前言 Mysql数据的导入导出我们都知道一个mysqldump命令就能够解决,但如果是运行在docker环境下的mysql呢? 解决办法其实还是用mysqldump命令,但是我们需要进入docker的 ...

  3. wps表格保存html,网页中的数据怎么导入excel表格数据-怎么把网页数据导入到wps表格中...

    如何采集网页中的表格数据到Excel中 1.打开excel表格 2.打开菜单"数据"->入外部数据"->" Web 查询",在" ...

  4. Flink 获取 Kafka 中的数据,分流存储到 Redis、MySQL 中

    文章目录 案例:实时处理电商订单信息 需求一:统计商城实时订单实收金额 需求二:将上面的最后计算的结果,存储到 Redis 中(Key 为:totalprice) Redis Sink 自定义 Red ...

  5. word录入表单数据 java 导入系统,java导入excel | 怎么把excel中的数据批量导入到word中的表格中...

    用javascript怎么实现把excel中的数据批量导入到数据库表中 这个js不能直接实现吧 我们程序用到 先读取excel内容转换成数组 然后放到页面上 再提交表单 储存 MySql如何批量添加数 ...

  6. dataset中的数据批量导入oracle数据库,c#如何将dataset中的数据批量导入oracle数据库...

    c#如何将dataset中的数据批量导入oracle数据库以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧! c#如何将da ...

  7. Spark Streaming从Kafka中获取数据,并进行实时单词统计,统计URL出现的次数

    1.创建Maven项目 创建的过程参考:http://blog.csdn.net/tototuzuoquan/article/details/74571374 2.启动Kafka A:安装kafka集 ...

  8. 使用idea编写SparkStreaming消费kafka中的数据,并对数据进行某一指标的处理【小案例】(五)

    接    使用idea编写SparkStreaming消费kafka中的数据[小案例](四) https://georgedage.blog.csdn.net/article/details/1035 ...

  9. .net excel导入mysql_.NET Core使用NPOI将Excel中的数据批量导入到MySQL - 追逐时光者 - 博客园...

    前言: 在之前的几篇博客中写过.NET Core使用NPOI导出Word和Excel的文章,今天把同样我们日常开发中比较常用的使用Excel导入数据到MySQL数据库中的文章给安排上.与此同时还把NP ...

  10. matlab输入excel高版本,『matlab读取excel指定列』excel中大量数据如何导入matlab当中?超过1000个数据无法一个一个输入...

    如何将excel表格中大量数据导入matlab中并作图 哈哈,选我吧!使用xlsread函数体的语法你在帮助里面搜索xlsread就可以了.我要是现在回答也接翻译帮助文件.xlsread的参数有文件表 ...

最新文章

  1. javascript小数相减会出现一长串的小数位数的原因
  2. Linux 命令 top 学习总结
  3. 记一次 Vue 移动端活动倒计时优化
  4. Counting Triangles
  5. Android下拉刷新效果实现
  6. 边开车边唱K?特斯拉汽车卡拉OK功能即将推出
  7. nginx打开Last_modified
  8. Winrar无广告版下载地址
  9. 测试3.0u盘速度软件,SanDisk USB3.0接口的U盘测试
  10. python贪吃蛇游戏手把手教学 第一课
  11. Imu_heading使用
  12. javax.faces.FacesException: Unable to find CDI BeanManager的解决方法
  13. 各品牌电脑进入BIOS的按键
  14. Hbuilder mui 相册拍照图片上传
  15. 微信小程序实现下载功能(以下载视频为例)
  16. reference pics
  17. Springboot企业资源管理信息系统kvonv计算机毕业设计-课程设计-期末作业-毕设程序代做
  18. EXCEl2013 创建下拉菜单
  19. 坪山体育中心体育馆全景不同高度展示
  20. 通过插入脚注的方式引用参考文献。

热门文章

  1. JMeter设置集合点
  2. 【android】uiselectoer 自动化测试
  3. Python Extension
  4. EMS设置发送连接器和接收连接器邮件大小
  5. 洛谷P3649 [APIO2014]回文串(回文自动机)
  6. gulp-rev-append md5版本号
  7. 纯css控制-表格表头固定,内容多时滚动内容
  8. 数据结构——单人名单
  9. ArcGIS 10 SDE for ORACLE ---迁移 (1)
  10. Java语言基础--集合