mapWithState()可以保存流的状态,并能做到当前rdd和前一段时间的rdd进行比较或者聚合。

当stream调用mapWithState()方法的时候,将会返回一个MapWithStateDStreamImpl。

@Experimental
def mapWithState[StateType: ClassTag, MappedType: ClassTag](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType] = {new MapWithStateDStreamImpl[K, V, StateType, MappedType](self,spec.asInstanceOf[StateSpecImpl[K, V, StateType, MappedType]])
}override val mustCheckpoint = true/** Override the default checkpoint duration */
override def initialize(time: Time): Unit = {if (checkpointDuration == null) {checkpointDuration = slideDuration * DEFAULT_CHECKPOINT_DURATION_MULTIPLIER}super.initialize(time)
}private val internalStream =new InternalMapWithStateDStream[KeyType, ValueType, StateType, MappedType](dataStream, spec)
override def compute(validTime: Time): Option[RDD[MappedType]] = {internalStream.getOrCompute(validTime).map { _.flatMap[MappedType] { _.mappedData } }
}

其强制进行checkpoint,并规定了checkpoint的时间间隔。

当其compute()方法被调用的时候,真正经过调用的是其内部类InternalMapWithStateDStream的compute()方法。

override def compute(validTime: Time): Option[RDD[MapWithStateRDDRecord[K, S, E]]] = {// Get the previous state or create a new empty state RDDval prevStateRDD = getOrCompute(validTime - slideDuration) match {case Some(rdd) =>if (rdd.partitioner != Some(partitioner)) {// If the RDD is not partitioned the right way, let us repartition it using the// partition index as the key. This is to ensure that state RDD is always partitioned// before creating another state RDD using itMapWithStateRDD.createFromRDD[K, V, S, E](rdd.flatMap { _.stateMap.getAll() }, partitioner, validTime)} else {rdd}case None =>MapWithStateRDD.createFromPairRDD[K, V, S, E](spec.getInitialStateRDD().getOrElse(new EmptyRDD[(K, S)](ssc.sparkContext)),partitioner,validTime)}// Compute the new state RDD with previous state RDD and partitioned data RDD// Even if there is no data RDD, use an empty one to create a new state RDDval dataRDD = parent.getOrCompute(validTime).getOrElse {context.sparkContext.emptyRDD[(K, V)]}val partitionedDataRDD = dataRDD.partitionBy(partitioner)val timeoutThresholdTime = spec.getTimeoutInterval().map { interval =>(validTime - interval).milliseconds}Some(new MapWithStateRDD(prevStateRDD, partitionedDataRDD, mappingFunction, validTime, timeoutThresholdTime))
}

首先,获取上一个时间窗口的rdd,如果没有,则新建一个空的MapWithStateRDD作为下一个时间窗口可以访问到的前置rdd。如果取得到,那么判断前置rdd是否和当前分区一致,如果一致,则直接获得,否则也是以前置rdd为基础新建一个当前分区情况的MapWithStateRDD。

之后获取当前的实时rdd并分区,将该rdd和在方法之初获得的前置rdd为参数,构造新的MapWithStateRDD。

最后看到这个MapWithStateRDD的compue()方法的实现。

override def compute(partition: Partition, context: TaskContext): Iterator[MapWithStateRDDRecord[K, S, E]] = {val stateRDDPartition = partition.asInstanceOf[MapWithStateRDDPartition]val prevStateRDDIterator = prevStateRDD.iterator(stateRDDPartition.previousSessionRDDPartition, context)val dataIterator = partitionedDataRDD.iterator(stateRDDPartition.partitionedDataRDDPartition, context)val prevRecord = if (prevStateRDDIterator.hasNext) Some(prevStateRDDIterator.next()) else Noneval newRecord = MapWithStateRDDRecord.updateRecordWithData(prevRecord,dataIterator,mappingFunction,batchTime,timeoutThresholdTime,removeTimedoutData = doFullScan // remove timed-out data only when full scan is enabled)Iterator(newRecord)
}

根据分区获得前置rdd和当前rdd的分区的数据,调用MapWithStateRDDRecord的updateRecordWithData()方法根据用户所定义的方法去更新新的rdd。

def updateRecordWithData[K: ClassTag, V: ClassTag, S: ClassTag, E: ClassTag](prevRecord: Option[MapWithStateRDDRecord[K, S, E]],dataIterator: Iterator[(K, V)],mappingFunction: (Time, K, Option[V], State[S]) => Option[E],batchTime: Time,timeoutThresholdTime: Option[Long],removeTimedoutData: Boolean
): MapWithStateRDDRecord[K, S, E] = {// Create a new state map by cloning the previous one (if it exists) or by creating an empty oneval newStateMap = prevRecord.map { _.stateMap.copy() }. getOrElse { new EmptyStateMap[K, S]() }val mappedData = new ArrayBuffer[E]val wrappedState = new StateImpl[S]()// Call the mapping function on each record in the data iterator, and accordingly// update the states touched, and collect the data returned by the mapping functiondataIterator.foreach { case (key, value) =>wrappedState.wrap(newStateMap.get(key))val returned = mappingFunction(batchTime, key, Some(value), wrappedState)if (wrappedState.isRemoved) {newStateMap.remove(key)} else if (wrappedState.isUpdated|| (wrappedState.exists && timeoutThresholdTime.isDefined)) {newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)}mappedData ++= returned}// Get the timed out state records, call the mapping function on each and collect the// data returnedif (removeTimedoutData && timeoutThresholdTime.isDefined) {newStateMap.getByTime(timeoutThresholdTime.get).foreach { case (key, state, _) =>wrappedState.wrapTimingOutState(state)val returned = mappingFunction(batchTime, key, None, wrappedState)mappedData ++= returnednewStateMap.remove(key)}}MapWithStateRDDRecord(newStateMap, mappedData)
}

首先获取前置rdd的所有数据,并遍历当前的rdd的key,如果能够在前置的rdd中获取得到相应的key,那么就获取之前rdd的键值对调用用户定义的mappingFunction执行用户所定义的逻辑。

根据用户方法返回的结果执行更新或者移出操作,并返回,达成实时rdd与历史rdd比较归并的目的。

spark mapWithState 实现相关推荐

  1. 第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密

    第14课:Spark Streaming源码解读之State管理之updateStateByKey和mapWithState解密 /* 王家林老师授课http://weibo.com/ilovepai ...

  2. Spark Streaming之updateStateByKey和mapWithState比较

    一 UpdateStateByKey UpdateStateByKey:统计全局的key的状态,但是就算没有数据输入,他也会在每一个批次的时候返回之前的key的状态.假设5s产生一个批次的数据,那么5 ...

  3. 2021年大数据Spark(三十七):SparkStreaming实战案例二 UpdateStateByKey

    目录 SparkStreaming实战案例二 UpdateStateByKey 需求 1.updateStateByKey 2.mapWithState 代码实现 SparkStreaming实战案例 ...

  4. Spark 1.6发布:引入Dataset接口

    今天,Databricks宣布发布Apache Spark 1.6! 这也是开源社区开发的一个里程碑,2015年代码贡献者达到1000人,是2014一整年的两倍,见下图. \\ \\ 接下来揭开Spa ...

  5. 实时流处理框架Storm、Spark Streaming、Samza、Flink,孰优孰劣?!

    https://mp.weixin.qq.com/s?__biz=MzU1NDA4NjU2MA==&mid=2247486490&idx=1&sn=e25a05be8cf98c ...

  6. 每周一书《Spark与Hadoop大数据分析》分享!

    Spark与Hadoop大数据分析比较系统地讲解了利用Hadoop和Spark及其生态系统里的一系列工具进行大数据分析的方法,既涵盖ApacheSpark和Hadoop的基础知识,又深入探讨所有Spa ...

  7. Storm,Trident,Spark Streaming,Samza和Flink主流流处理框架比较

    文 | Petr Zapletal ,译者 | 侠天 分布式流处理是对无边界数据集进行连续不断的处理.聚合和分析.它跟MapReduce一样是一种通用计算,但我们期望延迟在毫秒或者秒级别.这类系统一般 ...

  8. maven依赖 spark sql_使用Kafka+Spark+Cassandra构建实时处理引擎

    大数据技术与架构点击右侧关注,大数据开发领域最强公众号! 暴走大数据点击右侧关注,暴走大数据! Apache Kafka 是一个可扩展,高性能,低延迟的平台,允许我们像消息系统一样读取和写入数据.我们 ...

  9. SparkStreaming之mapWithState

    与updateStateByKey方法相比,使用mapWithState方法能够得到6倍的低延迟的同时维护的key状态的数量要多10倍,这一性能的提升和扩展性可以从基准测试结果得到验证,所有的结果全部 ...

最新文章

  1. Python正则匹配HTML,python正则匹配html标签_Python爬虫常用正则表达式及HTML网页标签分析总结...
  2. 三个好用的并发工具类
  3. .Net环境下的缓存技术介绍 (转)
  4. python设计模式7-桥接模式
  5. DMAR(DMA remapping)与 IOMMU
  6. 虚拟机VMware的下载与安装——详细教程
  7. ArcGIS拓扑检查教程
  8. 漫画趣解Linux内核
  9. 图像的仿射变换:cv2.warpAffine()
  10. 查看/data/data下的数据库文件
  11. 小程序短视频项目———视频展示页面开发
  12. radisys官方介绍--Promentum ATCA-1200== ATCA 可控 4-AMC 刀片式载板
  13. fcpx教程从入门到精通「1」初步认识Final Cut Pro
  14. 关于32位系统中int、float、short、double等占多少个字节
  15. idea点击接口的方法的跳到实现类的快捷键
  16. 基于php音乐网站平台设计与实现
  17. spring和jump区别_Hop 及jump 的区别
  18. 获得android手机root权限,安卓设备什么是Root? 获得手机Root后可进行哪些高权限的操作?...
  19. 没有后盾,也没有退路
  20. CSS:position:relative;的认识

热门文章

  1. 诗与远方:无题(十)- 小熊夜游走廊记
  2. 最近很喜欢Hello World啊,这次来Groovy的Hello World啦
  3. Target Unreachable, identifier 'userInfoUpdateBean' resolved to null 错误问题描述以及解决
  4. Ranger-Kylin插件安装
  5. 深入分析Java中的关键字static
  6. java url 短链接_推荐几个官方腾讯短链接url接口(含PHP演示代码)
  7. 理解 invokedynamic
  8. Linux用户不在sudoers文件中
  9. cocos2d-x物业现场
  10. Ubuntu 10.04下安装jekyll