spark学习9:sparkStreaming
1.sparkStreaming是什么?
sparkStreaming 其实是对RDD进行微批量处理,核心还是对RDD的操作。
只不过spark是线程级的应用,实现秒级的运算是可以的。
所以sparkStreaming并不是真正意义上的流处理,最多实现秒级响应,无法做到毫秒级。
所以sparksSreaming比较适合 实时和批量数据相结合的场景。
2.sparkStreaming工作机制
跟spark工作机制非常相似,因为本身就是spark对RDD的操作。
1.每个workerNode中会有一个 receiver
2.receiver 会对接一个input DStream,input DStream负责源源不断的输入数据。
receiver 主要任务就是负责对接input DStream ,接收并处理流数据。
input DStream 对接的数据流可以有多种,
套接字流:通过socket 不断发送数据。
文件流:监听文本,文件一旦发生改变就传输文本
KAFKA:接收kafka的数据流
3.sparkStreaming的编写步骤
3.1创建DStream 来定义输入源
和创建spark-shell的 sparkContext 简称 sc ,
sparkSQL的 sparkSession 简称 spark类似。
sparkStreaming 需要的 streamContext 简称ssc
ps:如果pom.xml中 没有配置sparkStreaming 依赖,需要先导入依赖。
可以参照下边添加sparkSession依赖的文章,添加sparkStreaming
在idea中pom.xml添加sparkSQL依赖_hzp666的博客-CSDN博客_idea添加spark依赖
第一步创建创建 spark的指挥所,
如果在spark-shell中编写的话,因为自动创建了 sparkContext ,即sc
所以创建StreamingContext时,只用 传递 sc 参数
val ssc = new StreamingContext(conf,Seconds(2))
如果在idea中编程,创建StreamingContext 时候,需要声明 SparkConf,并传入 conf, 而不是sc
//create spark commander
val ssc = new StreamingContext(conf,Seconds(2))
val conf = new SparkConf().setMaster("local[2]").setAppName("streamingTest")val ssc = new StreamingContext(conf,Seconds(2))
ps:另外注意 setMaster("local[2]") 中 至少要2个线程,一个负责接收数据,一个负责处理。直接写local 或者 local[1] 都不行。
第二步,生成input 输入流
下边案例是文本流:
//read val lines = ssc.textFileStream("D:/doc/spark/streaming")
3.2 编写对流数据的转换和操作
//transfer val countedRDD: DStream[(String, Int)] = lines.flatMap(_.split("/[\\s\\n]/")).map(x => (x, 1)).reduceByKey(_+_)
//print ,for visual the result countedRDD.print()
3.3 启动程序
//start ssc.start()
3.4 结束
发生错误时结束
//stop ssc.awaitTermination()
3.5 手动结束
ssc.stop()
4.文本流完整案例:
import org.apache.spark._ import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext}object sparkStreamingTest {def main(args: Array[String]): Unit = {//create spark commanderval conf = new SparkConf().setMaster("local[2]").setAppName("streamingTest")val ssc = new StreamingContext(conf,Seconds(5))//readval lines = ssc.textFileStream("D:/doc/spark/streaming")//transferval countedRDD: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map(x => (x, 1)).reduceByKey(_+_)//printcountedRDD.print()//startssc.start()//stopssc.awaitTermination()} }
运行效果:
5.
PS 注意事项:
如果在计算的时候,指定--master时 使用的是local 并且只指定了一个线程,那么只有receiver线程工作,计算的线程不会工作,所以在指定线程数的时候,最少指定2个。
(2)通过输入源创建InputDStream:
在构建好StreamingContext之后,首先我们要读取数据源的数据进行实时处理:
InputDStreams指的是从数据流的源头接收的输入数据流,每个 InputDStream 都关联一个 Receiver 对象,该 Receiver 对象接收数据源传来的数据并将其保存在内存中以便后期 Spark 处理。
Spark Streaming 提供两种原生支持的流数据源和自定义的数据源:
- 直接通过 StreamingContext API 创建,例如文件系统(本地文件系统及分布式文件系统)、 Socket 连接及 Akka 的 Actor。
- Kafka, Flume, Kinesis, Twitter 等,需要借助外部工具类,在运行时需要外部依赖
-Spark Streaming 还支持用户自定义数据源,它需要用户定义 receiver
注意:
- 在本地运行 Spark Streaming 时,master URL 不能使用”local”或”local[1] ”,因为当 Input DStream 与 Receiver(如 sockets, Kafka, Flume 等)关联时,Receiver 自身就需要一个线程 来运行,此时便没有线程去处理接收到的数据。因此,在本地运行 SparkStreaming 程序时,要使用”local[n]”作为 master URL,n 要大于 receiver 的数量。
- 在集群上运行 Spark Streaming 时,分配给 Spark Streaming 程序的 CPU 核数也必须大于 receiver 的数量,否则系统将只接受数据,无法处理数据。
(3)对DStream进行transformation 和 output 操作,这样操作构成了后期流式计算的逻辑
(4)通过streamingContext.start()方法启动接收和处理数据的流程
(5)使用streamingContext.awaitTermination()方法等待程序结束(手动停止或出错停止)
(6)调用streamingContext.stop()方法来结束程序的运行。
在编写sparkStreaming时的注意点:
- streamingContext启动后,增加新的操作将不起作用,一定要在启动之前定义好逻辑,也就是说在调用start方法之后,在对sparkStreaming程序进行逻辑操作是不被允许的
- StreamingContext 是单例对象停止后,不能重新启动,除非重新启动任务,重新执行计算
- 在单个jvm中,一段时间内不能出现两个active状态的StreamingContext
- 当在调用 StreamingContext 的 stop 方法时,默认情况下 SparkContext 也将被 stop 掉, 如果希望 StreamingContext 关闭时,能够保留 SparkContext,则需要在 stop 方法中传入参 数 stop SparkContext=false
- 一个 SparkContext 可以用来创建多个 StreamingContext,只要前一个 StreamingContext 已经停止了。
spark学习9:sparkStreaming相关推荐
- Hadoop学习系列之Hadoop、Spark学习路线(很值得推荐)
Hadoop学习系列之Hadoop.Spark学习路线(很值得推荐) 文章出自:http://www.cnblogs.com/zlslch/p/5448857.html 1 Java基础: 视频方面: ...
- Apache Spark学习:利用Eclipse构建Spark集成开发环境
介绍了如何使用Maven编译生成可直接运行在Hadoop 2.2.0上的Spark jar包,而本文则在此基础上, 介绍如何利用Eclipse构建Spark集成开发环境 . 不建议大家使用eclips ...
- Apache Spark学习:利用Scala语言开发Spark应用程序
Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情.如果你对Scala语言还不太熟悉,可以阅读网络教程 A Scala Tutorial for Ja ...
- Spark学习之Spark调优与调试(7)
Spark学习之Spark调优与调试(7) 1. 对Spark进行调优与调试通常需要修改Spark应用运行时配置的选项. 当创建一个SparkContext时就会创建一个SparkConf实例. 2. ...
- 用Spark学习FP Tree算法和PrefixSpan算法
在FP Tree算法原理总结和PrefixSpan算法原理总结中,我们对FP Tree和PrefixSpan这两种关联算法的原理做了总结,这里就从实践的角度介绍如何使用这两个算法.由于scikit-l ...
- Spark学习(一) -- Spark安装及简介
标签(空格分隔): Spark 学习中的知识点:函数式编程.泛型编程.面向对象.并行编程. 任何工具的产生都会涉及这几个问题: 现实问题是什么? 理论模型的提出. 工程实现. 思考: 数据规模达到一台 ...
- spark学习-58-Spark的EventLoggingListener
1.本次调试查看源代码采用 spark学习-57-Spark下Scala版HBase下的根据权重获取最真实数据http://blog.csdn.net/qq_21383435/article/deta ...
- spark学习-28-Spark数据倾斜问题
文章目录 推荐:先看看这个 spark学习-27-Spark性能调优(2) 目的 数据倾斜调优 简述 数据倾斜发生时的现象 数据倾斜发生的原理 上面说了那么多其实我还是没具体见过什么是数据倾斜了 分析 ...
- spark学习-Spark算子Transformations和Action使用大全(Action章)
spark学习-22-Spark算子Transformations和Action使用大全(Transformations章(一)) http://blog.csdn.net/qq_21383435/a ...
- spark学习-Spark算子Transformations和Action使用大全(Transformations章(二))
spark学习-22-Spark算子Transformations和Action使用大全(Transformations章(一)) http://blog.csdn.net/qq_21383435/a ...
最新文章
- R语言之斐波那契数列
- char和byte的区别
- pku1401 Factorial 计算n!末尾有几个0?
- 跨域405(Method Not Allowed)问题
- 你可能对position和z-index有一些误解
- Listview的OnScrollListener的滑动监听实现分页加载
- 使用dumpbin查看dll有哪些函数
- linux windows市场占有率,Windows 10市场份额罕见倒退:Win7也跌了 Linux暴增111%
- android 获取录音时长_Android、iOS录音时音量大小计算
- 常用英文字体收集备用
- 如何判断时间复杂度和空间复杂度
- 苹果手机照片误删如何找回
- 同花顺股票交易接口定义被类实现
- Docker 容器监控原理及 cAdvisor 的安装与使用
- 字符串(字符串的拼接及一些常用方法)
- Python自述和简介
- Python:学习成绩管理系统
- 分布式事务简介(seata)
- uniapp的checkbox标签属性
- 并行:并行编程的基础概述