SparkStreaming DStream入门及其算子应用
什么是Dstream?
离散流(DStream
)是Spark Streaming
中的基本抽象,是表示相同数据流的RDD
(相同类型)的连续序列。 DStreams可以使用StreamingContext
从实时数据(例如来自TCP套接字,Kafka,Flume等的数据)创建,也可以通过使用map
,Window
和reduceByKeyAndWindow
等操作转换现有DStreams
来生成。在运行Spark Streaming
程序时,每个DStream
都会根据实时数据或通过转换父DStream生成的RDD定期生成RDD。
此类包含所有DStream
上可用的基本操作,例如map
,filter
和window
。另外,PairDStreamFunctions
包含仅在键-值对的DStream
上可用的操作,例如groupByKeyAndWindow
和join
。通过隐式转换,这些操作可在任何成对的DStream(例如DStream [(Int,Int)])
上自动使用。
DStream内部具有一些基本属性:
DStream依赖的其他DStream的列表
DStream生成RDD的时间间隔
每个时间间隔后用于生成RDD的函数
Dstream与RDD的关系
看源码可以知道,RRD结合时间尺度就是Dstream
.
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
案例
DStream的算子相对RDD比较少,简单示例如下
以nc作为源发送数据
# nc -lk mypc01 10087
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}//sparkStreaming 算子测试案例
object Test extends App {private val conf = new SparkConf().setAppName("test").setMaster("local[*]").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")private val ssc: StreamingContext = new StreamingContext(conf, Seconds(5))//以nc作为源转为Dstreamprivate val dstream: ReceiverInputDStream[String] = ssc.socketTextStream("mypc01", 10087)//filetrDemo()//unionDemo()//countDemo()//countByvalue()//((o,6.0),2)//joinDemo()//(k,(1,2))//统计单词长度大于4的频率def filetrDemo(): Unit = {dstream.flatMap((_.split(" "))).filter(_.length > 4).map((_, 1)).reduceByKey(_ + _).print()}//测试union算子def unionDemo(): Unit = {val d1: DStream[(String, Int)] = dstream.map((_, 1))val d2: DStream[(String, Int)] = dstream.map((_, 1)).reduceByKey(_ + _)val d3: DStream[(String, Int)] = d1.union(d2)d3.print()}//测试count算子//统计每一批的个数def countDemo(): Unit = {val d1: DStream[Long] = dstream.count()d1.print()}//测试countByvalue算子def countByvalue(): Unit = {dstream.map((_, Math.floor(Math.random() * 10))).countByValue(2).print()}//测试join算子def joinDemo(): Unit = {val d1: DStream[(String, Int)] = dstream.map((_, 1))val d2: DStream[(String, Int)] = dstream.map((_, 1)).reduceByKey(_ + _)val d3: DStream[(String, (Int, Int))] = d1.join(d2)d3.print()}def cogroupDemo(): Unit = {val d1: DStream[(String, Int)] = dstream.map((_, 1))val d2: DStream[(String, Int)] = dstream.map((_, 1)).reduceByKey(_ + _)val d3: DStream[(String, (Iterable[Int], Iterable[Int]))] = d1.cogroup(d2)d3.print()}ssc.start()ssc.awaitTermination()
}
Dstream方法解析
Dstream
应用map
算子后返回值还是 Dstream
类型,这个就好比Rdd
应用map
算子后返回值还是Rdd
类型一样.
def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {new MappedDStream(this, context.sparkContext.clean(mapFunc))}
Dstream
同样可以用filter
算子
def filter(filterFunc: T => Boolean): DStream[T] = ssc.withScope {new FilteredDStream(this, context.sparkContext.clean(filterFunc))}
foreachRDD
和上述的不同之处有亮点
1 没有返回值
2 参数为函数,且该函数的参数为Dstream
所蕴含的RDD,这个就意味着内部也可以用RDD
相关的算子.
def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {val cleanedF = context.sparkContext.clean(foreachFunc, false)foreachRDD((r: RDD[T], _: Time) => cleanedF(r), displayInnerRDDOps = true)}
transform
和foreachRDD
是类似的,参数为函数,且该函数的参数是RDD
类型
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {// because the DStream is reachable from the outer object here, and because// DStreams can't be serialized with closures, we can't proactively check// it for serializability and so we pass the optional false to SparkContext.cleanval cleanedF = context.sparkContext.clean(transformFunc, false)transform((r: RDD[T], _: Time) => cleanedF(r))}
总结
Dstream
就是RDD+时间尺度- 注意理解
foreachRDD
算子以及transform
算子与其他算子的不同之处
SparkStreaming DStream入门及其算子应用相关推荐
- 尚硅谷大数据技术Spark教程-笔记09【SparkStreaming(概念、入门、DStream入门、案例实操、总结)】
尚硅谷大数据技术-教程-学习路线-笔记汇总表[课程资料下载] 视频地址:尚硅谷大数据Spark教程从入门到精通_哔哩哔哩_bilibili 尚硅谷大数据技术Spark教程-笔记01[SparkCore ...
- spark-streaming从入门到精通
1.spark streaming获取kafka的数据有两种形式:(现在基本都是用direct方式了) receiver 通过zookeeper来连接kafka队列来获取数据.如果要做到容错,就要启用 ...
- 【大数据开发】SparkStreaming——DStream输入源、原语、SparkStream与Kafka和Redis三者的交互
设置SparkConf的时候不能设置为local,会报错,应当设置成local[N],N>1.这是因为需要一个核接收数据,另一个核处理数据,如果只分配一个线程处理,这个线程会被用来接收数据,就没 ...
- Dstream的action算子与RDD的action算子
Dstream action算子 print() 在运行流应用程序的驱动程序节点上打印DStream中每批数据的前10个元素.这对于开发和调试非常有用.这在Python API中称为pprint(). ...
- SparkStreaming窗口入门
window操作就是窗口函数.Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作.每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生 ...
- sparkstreaming监听hdfs目录如何终止_四十六、Spark Streaming简介及入门
1.什么是Spark Streaming Spark Streaming是基于Spark Core之间的实时计算框架,可以从很多数据源消费数据并对数据进行处理.它是Spark核心API的一个扩展与封装 ...
- Dstream如何应用RDD特有算子?
比如collectAsMap算子Dstream并没有,如何用呢? 可以通过foreachRDD或者transform算子间接使用. 比如 //sparkStreaming 算子测试案例wordcoun ...
- SparkStreaming 入门案例之wordcount
案例概述 以nc作为源发送数据 案例演示 创建nc源,用于发送数据. [root@mypc01 ~]# nc -lk mypc01 10086 创建maven工程,导入依赖 <dependenc ...
- SparkStreaming编程
0. SparkStreaming 流式计算简介 SparkStreaming实时处理入门案例 SparkStreaming和HDFS整合 SparkStreaming与Kafka整合 SparkSt ...
最新文章
- NLP分词数据准备及模型训练实例
- javascript总结9:JavaScript三目运算符
- ECCV 2020 SenseHuman Workshop:人类感知、理解与生成
- Python 线程条件变量 Condition - Python零基础入门教程
- js 文件上传进度条
- Linux USB驱动程序设计
- 客户端可以查询到数据,程序却查询不到数据
- 删缓存,数据库更新谁先执行,及延时双删
- w3cschool离线手册
- Python3学习笔记_F(垃圾回收)
- stm32——使用串口下载程序
- 基于RFID定位技术的精神病人员定位解决方案--新导智能
- 百度网盘python客户端——筑梦之路
- 卫星导航定位误差之多路径地球自转相位缠绕相位中心误差地球潮汐
- 重磅!景驰科技公司CTO Tony Han加入硅谷科技论坛!
- 3款常见的网站文章采集工具推荐(2019最新)
- PS 2019 Mac版 自学入门系列(八)—— 替换背景
- python123测验答案-python123国二选择题
- 概率论:均值、方差与协方差矩阵
- Hibernate对象状态之间的神奇转换
热门文章
- pythonopencv算法_OpenCV3-Python基于Kalman和CAMShift算法应用
- hdfs java操作_hdfs java操作
- 系统无法执行指定的程序。_自制操作系统-函数代码副本跳转无法正确执行的问题...
- Error while waiting for device: The emulator process for AVD Pixel_API_30 has terminated.
- ERROR: Could not install Gradle distribution from ‘https://services.gradle.org/distributions/gradle
- 磁盘调度算法课程设计(附源代码)
- 编译原理 - SLR(1)
- linux怎么开启httpd服务公钥,在Apache httpd服务器上部署SSL证书
- java requirenonnull_Java null判断新方法:Objects.requireNonNull 你过用吗?
- 绘图的尺寸_Auto CAD机械绘图尺寸标注教程10(标注多重引线)