Spark Streaming对接Flume有两种方式

  • Poll:Spark Streaming从flume 中拉取数据
  • Push:Flume将消息Push推给Spark Streaming

1、安装flume1.6以上

2、下载依赖包

spark-streaming-flume-sink_2.11-2.0.2.jar放入到flume的lib目录下

3、生成数据

服务器上的 /root/data目录下准备数据文件data.txt

vi data.txthadoop spark hive spark
hadoop sqoop flume redis flume hadoop
solr kafka solr hadoop

4、配置采集方案

vi flume-poll.confa1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=hdp-node-01
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000   

5、添加依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-flume_2.10</artifactId><version>2.0.2</version>
</dependency>

6、代码实现

package cn.cheng.spark
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/*** sparkStreaming整合flume 拉模式Poll*/
object SparkStreaming_Flume_Poll {//newValues 表示当前批次汇总成的(word,1)中相同单词的所有的1//runningCount 历史的所有相同key的value总和def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount =runningCount.getOrElse(0)+newValues.sumSome(newCount)}def main(args: Array[String]): Unit = {//配置sparkConf参数val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")//构建sparkContext对象val sc: SparkContext = new SparkContext(sparkConf)//构建StreamingContext对象,每个批处理的时间间隔val scc: StreamingContext = new StreamingContext(sc, Seconds(5))//设置checkpointscc.checkpoint("./")//设置flume的地址,可以设置多台val address=Seq(new InetSocketAddress("192.168.200.160",8888))// 从flume中拉取数据val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc,address,StorageLevel.MEMORY_AND_DISK)//获取flume中数据,数据存在event的body中,转化为Stringval lineStream: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()))//实现单词汇总val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction)result.print()scc.start()scc.awaitTermination()}}

7、启动flume

flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-poll.conf -Dflume.root.logger=INFO,console

8、启动spark-streaming应用程序

9、查看结果

flume将消息Push推给Spark Streaming

1、配置采集方案

vi flume-push.conf#push mode
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=172.16.43.63
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000     

注意配置文件中指明的hostname和port是spark应用程序所在服务器的ip地址和端口。

2、代码实现

package cn.cheng.sparkimport java.net.InetSocketAddressimport org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/*** sparkStreaming整合flume  推模式Push*/
object SparkStreaming_Flume_Push {//newValues 表示当前批次汇总成的(word,1)中相同单词的所有的1//runningCount 历史的所有相同key的value总和def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount =runningCount.getOrElse(0)+newValues.sumSome(newCount)}def main(args: Array[String]): Unit = {//配置sparkConf参数val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Push").setMaster("local[2]")//构建sparkContext对象val sc: SparkContext = new SparkContext(sparkConf)//构建StreamingContext对象,每个批处理的时间间隔val scc: StreamingContext = new StreamingContext(sc, Seconds(5))//设置日志输出级别sc.setLogLevel("WARN")//设置检查点目录scc.checkpoint("./")//flume推数据过来// 当前应用程序部署的服务器ip地址,跟flume配置文件保持一致val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(scc,"172.16.43.63",8888,StorageLevel.MEMORY_AND_DISK)//获取flume中数据,数据存在event的body中,转化为Stringval lineStream: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()))//实现单词汇总val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction)result.print()scc.start()scc.awaitTermination()}}
}

3、启动spark-streaming应用程序

4、生成数据

cp data.txt data2.txt

5、启动flume

flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-push.conf -Dflume.root.logger=INFO,console

6、查看结果

Spark Streaming整合flume实战相关推荐

  1. DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36

    前言 本文所需要的安装包&Flume配置文件,博主都已上传,链接为本文涉及安装包&Flume配置文件本文涉及的安装包&Flume配置文件,请自行下载~ flume作为日志实时采 ...

  2. spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access

    问题描述 spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现 java.util.ConcurrentModificationException: KafkaCons ...

  3. 大数据Spark “蘑菇云”行动第76课: Kafka+Spark Streaming+Redis项目实战

    大数据Spark "蘑菇云"行动第76课:   Kafka+Spark Streaming+Redis项目实战 jedis插件 redis <dependency>   ...

  4. Spark Streaming和Flume集成指南V1.4.1

    Apache Flume是一个用来有效地收集,聚集和移动大量日志数据的分布式的,有效的服务.这里我们解释一下怎样配置Flume和Spark Streaming来从Flume获取数据.这里有两个方法. ...

  5. Spark Streaming整合logstash + Kafka wordCount

    1.安装logstash,直接解压即可 测试logstash是否可以正常运行 bin/logstash -e 'input { stdin { } } output { stdout {codec = ...

  6. 分享一下spark streaming与flume集成的scala代码。

    文章来自:http://www.cnblogs.com/hark0623/p/4172462.html 转发请注明 object LogicHandle {def main(args: Array[S ...

  7. Spark Streaming实时流处理学习

    目录 1.初识实时流处理 2.分布式日志收集框架Flume 3.分布式发布订阅消息系统Kafka 4.实战环境搭建 5.Spark Streaming入门 6.Spark Streaming核心概念与 ...

  8. 大数据入门之分布式计算框架Spark(3) -- Spark Streaming

    1.概述 Spark Streaming将不同的数据源,经过处理之后,结果输出到外部文件系统. 特点:低延时:能从错误中高效地恢复过来:能够运行在成百上千的节点上:能够将批处理.机器学习.图计算等子框 ...

  9. <Zhuuu_ZZ>Spark Streaming

    Spark Streaming 一 Spark Streaming概述 1.离线和实时概念 2.批量和流式概念 3.Spark Streaming是什么 4.Spark Streaming特点 5.S ...

最新文章

  1. python在windows下import其他模块的注意事项
  2. [Unity3D]关于NaN(Not a Number)的问题
  3. android 经纬度的范围内,在Android里如何判断一个指定的经纬度点是否落在一个多边形区域内...
  4. UNIX(多线程):25---当前进程的线程哪些数据共享哪些是私有的
  5. MacOS Monterey12.3和Big Sur11.6.5离线安装包
  6. linux定时执行脚本
  7. 【Proteus仿真】BME280温湿度气压传感器数据串口输出
  8. 1996-2016人工智能各大顶级会议最佳论文best paper
  9. Vue快速实现通用表单验证
  10. 你在工作中遇到过印象深刻的困难是什么,你怎么克服的?
  11. 让div在页面居中(滚动条滚动时也居中)
  12. java 1 9随机数_Java-随机数详解
  13. 掘金15W沸点简单分析(二)
  14. 报数游戏1-3循环报数,报到3的人退出,求原来的序号
  15. 修改云服务器端口,如何修改云服务器默认3389端口
  16. mh采样算法推导_科学网—MCMC中的Metropolis Hastings抽样法 - 张金龙的博文
  17. 开卷有益 今天你读书了么?
  18. visio如何开启开发工具功能
  19. freertos与linux区别,μClinux、μC/OS-II、eCos、FreeRTOS和djyos操作系统的特点及不足-嵌入式系统-与非网...
  20. Oracle 批量插入sql

热门文章

  1. 小学计算机考查方案,宋家塘街道中心学校2020年理化生实验操作和信息技术考试方案...
  2. java 实现 tcp_java实现TCP通信
  3. Source Insight学习教程
  4. list循环赋值_Python之 for循环
  5. wxif 判断字符串相等_ES6:字符串、数组、对象的扩展
  6. python程序写蛇_python蟒蛇绘制程序
  7. linux禁用用户账号,技术|在 Linux 系统中禁用与解禁用户的账号
  8. python歌词统计单词词频_Python爬虫网易云歌词及词频统计
  9. js 单页面ajax缓存策略,浅谈ajax的缓存机制---IE浏览器方面
  10. php在空值时调用成员函数_当Vlookup函数匹配的结果是时间,或者空值时,显示不正常了...