Spark Streaming整合flume实战
Spark Streaming对接Flume有两种方式
- Poll:Spark Streaming从flume 中拉取数据
- Push:Flume将消息Push推给Spark Streaming
1、安装flume1.6以上
2、下载依赖包
spark-streaming-flume-sink_2.11-2.0.2.jar放入到flume的lib目录下
3、生成数据
服务器上的 /root/data目录下准备数据文件data.txt
vi data.txthadoop spark hive spark
hadoop sqoop flume redis flume hadoop
solr kafka solr hadoop
4、配置采集方案
vi flume-poll.confa1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=hdp-node-01
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000
5、添加依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-flume_2.10</artifactId><version>2.0.2</version>
</dependency>
6、代码实现
package cn.cheng.spark
import java.net.InetSocketAddress
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/*** sparkStreaming整合flume 拉模式Poll*/
object SparkStreaming_Flume_Poll {//newValues 表示当前批次汇总成的(word,1)中相同单词的所有的1//runningCount 历史的所有相同key的value总和def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount =runningCount.getOrElse(0)+newValues.sumSome(newCount)}def main(args: Array[String]): Unit = {//配置sparkConf参数val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Poll").setMaster("local[2]")//构建sparkContext对象val sc: SparkContext = new SparkContext(sparkConf)//构建StreamingContext对象,每个批处理的时间间隔val scc: StreamingContext = new StreamingContext(sc, Seconds(5))//设置checkpointscc.checkpoint("./")//设置flume的地址,可以设置多台val address=Seq(new InetSocketAddress("192.168.200.160",8888))// 从flume中拉取数据val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createPollingStream(scc,address,StorageLevel.MEMORY_AND_DISK)//获取flume中数据,数据存在event的body中,转化为Stringval lineStream: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()))//实现单词汇总val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction)result.print()scc.start()scc.awaitTermination()}}
7、启动flume
flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-poll.conf -Dflume.root.logger=INFO,console
8、启动spark-streaming应用程序
9、查看结果
flume将消息Push推给Spark Streaming
1、配置采集方案
vi flume-push.conf#push mode
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source
a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/data
a1.sources.r1.fileHeader = true
#channel
a1.channels.c1.type =memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity=5000
#sinks
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname=172.16.43.63
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000
注意配置文件中指明的hostname和port是spark应用程序所在服务器的ip地址和端口。
2、代码实现
package cn.cheng.sparkimport java.net.InetSocketAddressimport org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.flume.{FlumeUtils, SparkFlumeEvent}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}/*** sparkStreaming整合flume 推模式Push*/
object SparkStreaming_Flume_Push {//newValues 表示当前批次汇总成的(word,1)中相同单词的所有的1//runningCount 历史的所有相同key的value总和def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount =runningCount.getOrElse(0)+newValues.sumSome(newCount)}def main(args: Array[String]): Unit = {//配置sparkConf参数val sparkConf: SparkConf = new SparkConf().setAppName("SparkStreaming_Flume_Push").setMaster("local[2]")//构建sparkContext对象val sc: SparkContext = new SparkContext(sparkConf)//构建StreamingContext对象,每个批处理的时间间隔val scc: StreamingContext = new StreamingContext(sc, Seconds(5))//设置日志输出级别sc.setLogLevel("WARN")//设置检查点目录scc.checkpoint("./")//flume推数据过来// 当前应用程序部署的服务器ip地址,跟flume配置文件保持一致val flumeStream: ReceiverInputDStream[SparkFlumeEvent] = FlumeUtils.createStream(scc,"172.16.43.63",8888,StorageLevel.MEMORY_AND_DISK)//获取flume中数据,数据存在event的body中,转化为Stringval lineStream: DStream[String] = flumeStream.map(x=>new String(x.event.getBody.array()))//实现单词汇总val result: DStream[(String, Int)] = lineStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(updateFunction)result.print()scc.start()scc.awaitTermination()}}
}
3、启动spark-streaming应用程序
4、生成数据
cp data.txt data2.txt
5、启动flume
flume-ng agent -n a1 -c /opt/bigdata/flume/conf -f /opt/bigdata/flume/conf/flume-push.conf -Dflume.root.logger=INFO,console
6、查看结果
Spark Streaming整合flume实战相关推荐
- DStream实战之Spark Streaming整合fulme实战, Flume向Spark Streaming中push推数据 36
前言 本文所需要的安装包&Flume配置文件,博主都已上传,链接为本文涉及安装包&Flume配置文件本文涉及的安装包&Flume配置文件,请自行下载~ flume作为日志实时采 ...
- spark streaming 整合kafka 报错 KafkaConsumer is not safe for multi-threaded access
问题描述 spark streaming 使用 直连方式 读取kafka 数据,使用窗口时出现 java.util.ConcurrentModificationException: KafkaCons ...
- 大数据Spark “蘑菇云”行动第76课: Kafka+Spark Streaming+Redis项目实战
大数据Spark "蘑菇云"行动第76课: Kafka+Spark Streaming+Redis项目实战 jedis插件 redis <dependency> ...
- Spark Streaming和Flume集成指南V1.4.1
Apache Flume是一个用来有效地收集,聚集和移动大量日志数据的分布式的,有效的服务.这里我们解释一下怎样配置Flume和Spark Streaming来从Flume获取数据.这里有两个方法. ...
- Spark Streaming整合logstash + Kafka wordCount
1.安装logstash,直接解压即可 测试logstash是否可以正常运行 bin/logstash -e 'input { stdin { } } output { stdout {codec = ...
- 分享一下spark streaming与flume集成的scala代码。
文章来自:http://www.cnblogs.com/hark0623/p/4172462.html 转发请注明 object LogicHandle {def main(args: Array[S ...
- Spark Streaming实时流处理学习
目录 1.初识实时流处理 2.分布式日志收集框架Flume 3.分布式发布订阅消息系统Kafka 4.实战环境搭建 5.Spark Streaming入门 6.Spark Streaming核心概念与 ...
- 大数据入门之分布式计算框架Spark(3) -- Spark Streaming
1.概述 Spark Streaming将不同的数据源,经过处理之后,结果输出到外部文件系统. 特点:低延时:能从错误中高效地恢复过来:能够运行在成百上千的节点上:能够将批处理.机器学习.图计算等子框 ...
- <Zhuuu_ZZ>Spark Streaming
Spark Streaming 一 Spark Streaming概述 1.离线和实时概念 2.批量和流式概念 3.Spark Streaming是什么 4.Spark Streaming特点 5.S ...
最新文章
- python在windows下import其他模块的注意事项
- [Unity3D]关于NaN(Not a Number)的问题
- android 经纬度的范围内,在Android里如何判断一个指定的经纬度点是否落在一个多边形区域内...
- UNIX(多线程):25---当前进程的线程哪些数据共享哪些是私有的
- MacOS Monterey12.3和Big Sur11.6.5离线安装包
- linux定时执行脚本
- 【Proteus仿真】BME280温湿度气压传感器数据串口输出
- 1996-2016人工智能各大顶级会议最佳论文best paper
- Vue快速实现通用表单验证
- 你在工作中遇到过印象深刻的困难是什么,你怎么克服的?
- 让div在页面居中(滚动条滚动时也居中)
- java 1 9随机数_Java-随机数详解
- 掘金15W沸点简单分析(二)
- 报数游戏1-3循环报数,报到3的人退出,求原来的序号
- 修改云服务器端口,如何修改云服务器默认3389端口
- mh采样算法推导_科学网—MCMC中的Metropolis Hastings抽样法 - 张金龙的博文
- 开卷有益 今天你读书了么?
- visio如何开启开发工具功能
- freertos与linux区别,μClinux、μC/OS-II、eCos、FreeRTOS和djyos操作系统的特点及不足-嵌入式系统-与非网...
- Oracle 批量插入sql
热门文章
- 小学计算机考查方案,宋家塘街道中心学校2020年理化生实验操作和信息技术考试方案...
- java 实现 tcp_java实现TCP通信
- Source Insight学习教程
- list循环赋值_Python之 for循环
- wxif 判断字符串相等_ES6:字符串、数组、对象的扩展
- python程序写蛇_python蟒蛇绘制程序
- linux禁用用户账号,技术|在 Linux 系统中禁用与解禁用户的账号
- python歌词统计单词词频_Python爬虫网易云歌词及词频统计
- js 单页面ajax缓存策略,浅谈ajax的缓存机制---IE浏览器方面
- php在空值时调用成员函数_当Vlookup函数匹配的结果是时间,或者空值时,显示不正常了...