Apache Flume是一个用来有效地收集,聚集和移动大量日志数据的分布式的,有效的服务。这里我们解释一下怎样配置Flume和Spark Streaming来从Flume获取数据。这里有两个方法。

Python API:Flume现在还不支持PythonAPI

方法1:Flume风格的推方法

Flume被设计用来在Flume代理之间推送数据。在这种方法中,Spark Streaming本质上设置了一个接收器作为Flume的一个Avro代理,Flume把数据推送到接收器上。下面是配置的步骤。

一般需求

在你的集群中选择一台机器满足如下条件:

1.      当你的Flume+Spark Streaming程序启动之后,Spark节点之一必须运行在那台机器上。

2.      Flume可以配置为推送数据到那台机器的端口上。

根据推模型,流程序需要启动,同时接收器按计划运行并监听选择的端口,以让Flume能够推送数据。

配置Flume

配置Flume代理来发送数据到一个Avro Sink,需要在配置文件中加入如下的内容。

agent.sinks = avroSink
agent.sinks.avroSink.type = avro
agent.sinks.avroSink.channel = memoryChannel
agent.sinks.avroSink.hostname = <chosen machine's hostname>
agent.sinks.avroSink.port = <chosen port on the machine>

查看Flume文档来获取更多关于配置Flume代理的信息。

配置Spark Streaming程序

1.连接:在你的SBT/Maven项目定义中,通过下面的内容连接你的流程序(在主编程指南中的连接章节寻找更多的信息)。

groupId = org.apache.spark
artifactId = spark-streaming-flume_2.10
version = 1.4.1

2.编程:在流处理程序的代码中,引入FlumeUtils并如下创建一个输入DStream流。

Scala

importorg.apache.spark.streaming.flume._
val flumeStream = FlumeUtils.createStream(streamingContext, [chosenmachine's hostname], [chosen port])

Java

import org.apache.spark.streaming.flume.*;
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =FlumeUtils.createStream(streamingContext, [chosen machine'shostname], [chosen port]);

查看API文档和示例。

注意,这里的主机名应该和集群中的资源管理器使用的主机名相同(Mesos,YARN或Spark Standalone),这样资源分配可以匹配名字并在正确的机器上启动接收器。

3.部署:将spark-streaming-flume_2.10包和它的依赖(除了由spark-submit提供的spark-core_2.10和spark-streaming_2.10)添加到程序包中。然后使用spark-submit来启动你的应用程序(在主编程指南中查看部署章节)。

方法2:使用自定义Sink的拉方式

不是Flume直接推送数据到SparkStreaming,这种方法运行了一个如下所示的Flume Sink。

1.      Flume将数据推送到Sink中,然后数据在此处缓存。

2.      Spark Streaming使用一个可靠的Flume接收器和操作从Sink中拉取数据。只有在Spark Streaming接收到数据并且把数据复制后才认为操作成功。

这个方法比前面的方法提供了强大的可靠性和容错保证。然而,这需要配置Flume运行一个自定义Sink。下面是配置步骤。

一般需求

选择一台在Flume代理中运行自定义Sink的机器。Flume其余的管道被配置为向那个代理发送数据。Spark集群中的机器都能连接到运行自定义Sink的那台机器上。

配置Flume

在选定的机器上配置Flume需要如下的两步。

1.Sink JAR包:添加如下的JAR包到要运行自定义Sink的机器中的Flume的classpath中(查看Flume的文档https://flume.apache.org/documentation.html)。

(i)自定义Sink JAR包:下载与下面内容一致的JAR包(或直接下载的地址https://repo1.maven.org/maven2/org/apache/spark/spark-streaming-flume-sink_2.10/1.4.1/spark-streaming-flume-sink_2.10-1.4.1.jar)

        groupId= org.apache.sparkartifactId =spark-streaming-flume-sink_2.10version = 1.4.1

(ii)Scala库JAR包:下载Scala库2.10.4版本JAR包。它可以用下面的内容找到(或直接在这里下载https://repo1.maven.org/maven2/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar)

        groupId= org.scala-langartifactId = scala-libraryversion = 2.10.4

(iii)CommonsLang3 JAR包:下载Commons Lang 3 JAR包。它可以用下面的内容找到(或者直接下载https://repo1.maven.org/maven2/org/apache/commons/commons-lang3/3.3.2/commons-lang3-3.3.2.jar)

        groupId= org.apache.commonsartifactId = commons-lang3version = 3.3.2

2.配置文件:在那台机器上,通过下面的配置文件配置Flume代理发送数据到一个Avro Sink中。

 agent.sinks = sparkagent.sinks.spark.type =org.apache.spark.streaming.flume.sink.SparkSinkagent.sinks.spark.hostname = <hostname ofthe local machine>agent.sinks.spark.port = <port to listen onfor connection from Spark>agent.sinks.spark.channel = memoryChannel

也要确保上行流的Flume管道配置了发送数据到运行这个Sink的Flume代理。

查看Flume文档寻找更多关于配置Flume代理的信息。

配置Spark Streaming程序

1.      连接:在你的SBT/Maven项目定义中,通过下面的内容连接你的流程序(在主编程指南中的连接章节寻找更多的信息http://spark.apache.org/docs/latest/streaming-programming-guide.html#linking)

2.      编程:在流处理程序的代码中,引入FlumeUtils并如下创建一个输入DStream流。

Scala

importorg.apache.spark.streaming.flume._val flumeStream =FlumeUtils.createPollingStream(streamingContext, [sink machine hostname], [sinkport])

Java

importorg.apache.spark.streaming.flume.*;JavaReceiverInputDStream<SparkFlumeEvent>flumeStream=FlumeUtils.createPollingStream(streamingContext, [sink machinehostname], [sink port]);

查看Scala示例https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples/streaming/FlumePollingEventCount.scala

注意每个输入DStream可以配置为从多个Sink中接收数据。

3.      部署:将spark-streaming-flume_2.10包和它的依赖(除了由spark-submit提供的spark-core_2.10和spark-streaming_2.10)添加到程序包中。然后使用spark-submit来启动你的应用程序(在主编程指南中查看部署章节http://spark.apache.org/docs/latest/streaming-programming-guide.html#deploying-applications)。

Spark Streaming和Flume集成指南V1.4.1相关推荐

  1. 分享一下spark streaming与flume集成的scala代码。

    文章来自:http://www.cnblogs.com/hark0623/p/4172462.html 转发请注明 object LogicHandle {def main(args: Array[S ...

  2. Spark Streaming整合flume实战

    Spark Streaming对接Flume有两种方式 Poll:Spark Streaming从flume 中拉取数据 Push:Flume将消息Push推给Spark Streaming 1.安装 ...

  3. spark kafka java api_java实现spark streaming与kafka集成进行流式计算

    java实现spark streaming与kafka集成进行流式计算 2017/6/26补充:接手了搜索系统,这半年有了很多新的心得,懒改这篇粗鄙之文,大家看综合看这篇新博文来理解下面的粗鄙代码吧, ...

  4. Spark Streaming 编程新手入门指南

    Spark Streaming 是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理.可以从许多数据源(例如Kafka,Flume,Kinesis或TCP sockets)中 ...

  5. Flume 开发者指南V1.5.2

    介绍 概述 Apache Flume是一个用来从很多不同的源有效地收集,聚集和移动大量的日志数据到一个中心数据仓库的分布式的,可靠的和可用的系统. Apache Flume是Apache软件基金会的顶 ...

  6. SPARK STREAMING之1:编程指南(翻译v1.4.1)

    SPARK STREAMING之1:编程指南(翻译v1.4.1) @(SPARK)[spark, 大数据] SPARK STREAMING之1编程指南翻译v141 概述 快速入门例子 基本概念 Lin ...

  7. [Spark]Spark Streaming 指南四 输入DStreams和Receivers

    1. 输入DStream与Receiver 输入DStreams表示从源中获取输入数据流的DStreams.在指南一示例中,lines表示输入DStream,它代表从netcat服务器获取的数据流.每 ...

  8. DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36

    前言 本文所需要的安装包&Flume配置文件,博主都已上传,链接为本文涉及安装包&Flume配置文件本文涉及的安装包&Flume配置文件,请自行下载~ flume作为日志实时采 ...

  9. Spark Streaming实时数据分析

    1.Spark Streaming功能介绍 1)定义 Spark Streaming is an extension of the core Spark API that enables scalab ...

最新文章

  1. python3 字符串操作总结
  2. springboot与分布式(zookeeper+dubbo)
  3. 【手把手教你Maven】构建过程
  4. 「递归」第5集 | 从网瘾少年到极客大神:没有什么是一段代码解决不了的
  5. dump分析工具_Java应用CPU过高,如何排查?参考解决思路和常用工具总结
  6. U3D的有限状态机系统
  7. 电脑的服务器操作系统是什么,电脑的服务器操作系统是什么
  8. hmcl启动器java下载_我的世界hmcl启动器
  9. 第四章 爬取西刺免费代理ip 并应用到scrapy
  10. case when的使用方法
  11. Transformer 权重共享
  12. 移动建站工具(一):分秒钟将Web网站移动化
  13. 一种通用的Qt数据库接口操作方法
  14. 经纬度计算两地之间的距离(原理与方法)
  15. 机器学习基本模型与算法在线实验闯关
  16. udacity深度学习--2. 深度学习简介--LESSON5 Jupyter notebook
  17. typename和class
  18. Visual Studio 2012 代码块注释快捷键和格式化快捷键
  19. 第四十九回 七星坛诸葛祭风  三江口周瑜纵火
  20. 关于win7 出现两个本地连接不能上网的问题?

热门文章

  1. Standard C++ Episode 10
  2. yum 安装包的用法
  3. php大文件下載,使用apache/nginx x-sendFile模塊替換
  4. W3c 中文 文档,很不错
  5. 数据结构—链表-单链表基本操作实现
  6. php三个表格,phpspreadsheet-excel工作表中有多个“格式为表”的表
  7. java 获取远程文件_java获取远程文件
  8. dxf转g代码_恶意代码分析系列几种常用技术(2)
  9. SQL优化:你真的知道国家字符集的性能影响吗?
  10. 解读 SSDB、LevelDB 和 RocksDB 到 GaussDB(for Redis) 的迁移