Spark Streaming的DStream为我们提供了一个updateStateByKey方法,它的主要功能是可以随着时间的流逝在Spark Streaming中为每一个key维护一份state状态,通过更新函数对该key的状态不断更新。对每一个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新(对每个新出现的key,会同样执行state的更新函数操作),但是如果通过更新函数对state更新后返回none的话,此时刻key对应的state状态被删除掉,需要特别说明的是state可以是任意类型的数据结构,这就为我们的计算带来无限的想象空间;

重点来了!!!如果要不断的更新每个key的state,就一定会涉及到状态的保存和容错,这个时候就需要开启checkpoint机制和功能,需要说明的是checkpoint可以保存一切可以存储在文件系统上的内容,例如:程序未处理的数据及已经拥有的状态。

补充说明:关于流式处理对历史状态进行保存和更新具有重大实用意义,例如进行广告(投放广告和运营广告效果评估的价值意义,热点随时追踪、热力图)

简单的来说,如果我们需要进行wordcount,每个batchInterval都会计算出新的一批数据,这批数据如何更新到以前计算的结果上?updateStateByKey就能实现此功能。

函数定义如下:

def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)] = ssc.withScope {updateStateByKey(updateFunc, defaultPartitioner())
}

updateStateByKey 需要传入一个函数,该函数有两个参数Seq[V]表示最新一次reduce的值的序列,Option[s]表示的是key对应的以前的值。返回的时一个key的最新值。

下面我们用实例演示:

package com.dt.spark.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Created by Administrator on 2016/5/3.*/
object UpdateStateByKeyDemo {def main(args: Array[String]) {val conf = new SparkConf().setAppName("UpdateStateByKeyDemo")val ssc = new StreamingContext(conf,Seconds(20))//要使用updateStateByKey方法,必须设置Checkpoint。ssc.checkpoint("/checkpoint/")val socketLines = ssc.socketTextStream("spark-master",9999)socketLines.flatMap(_.split(",")).map(word=>(word,1)).updateStateByKey((currValues:Seq[Int],preValue:Option[Int]) =>{val currValue = currValues.sumSome(currValue + preValue.getOrElse(0))}).print()ssc.start()ssc.awaitTermination()ssc.stop()}
}

打包上传至spark集群。

打开nc,发送测试数据

root@spark-master:~# nc -lk 9999
hadoop,spark,scala,hive
hadoop,Hbase,spark

运行spark 程序

root@spark-master:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.UpdateStateByKeyDemo  --master spark://spark-master:7077 ./spark.jar

查看运行结果:

-------------------------------------------
Time: 1462282180000 ms
-------------------------------------------
(scala,1)
(hive,1)
(spark,2)
(hadoop,2)
(Hbase,1)

我们在nc中再输入一些数据

root@spark-master:~# nc -lk 9999
hadoop,spark,scala,hive
hadoop,Hbase,spark
hadoop,spark,scala,hive
hadoop,Hbase,spark

再次查看结果:

-------------------------------------------
Time: 1462282200000 ms
-------------------------------------------
(scala,2)
(hive,2)
(spark,4)
(hadoop,4)
(Hbase,2)

可见,它将我们两次统计结果合并了。

备注:

1、DT大数据梦工厂微信公众号DT_Spark 
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains

转载于:https://blog.51cto.com/lqding/1769852

第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密相关推荐

  1. 第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密 java.lang.ClassNotFoundException 踩坑解决问题详细内幕版本

    第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密    /* * *王家林老师授课http://weibo.com/ilovepains */  每天晚上20: ...

  2. 【Unity 3D】VR飞机起飞喷火游戏案例实战(附源码和演示视频 超详细)

    需要源码和资源包请点赞关注收藏后评论区留言私信~~~ 下面我们用Unity 3D搭建一个飞机在跑道上喷火起飞的3D游戏案例,效果如下面的图片和视频所示 一.效果演示 视频如下 VR飞机 二.实现步骤 ...

  3. 《Spark SQL大数据实例开发》9.2 综合案例实战——电商网站搜索排名统计

    <Spark SQL大数据实例开发>9.2 综合案例实战--电商网站搜索排名统计 9.2.1 案例概述     本节演示一个网站搜索综合案例:以京东为例,用户登录京东网站,在搜索栏中输入搜 ...

  4. 【PySpark】综合案例实战:处理加州房屋信息,构建线性回归模型

    通过之前的学习,我们对 Spark 各种 API 的基本用法有了一定的了解,还通过统计词频的实例掌握了如何从零开始写一个 Spark 程序.那么现在,让我们从一个真实的数据集出发,看看如何用 Spar ...

  5. c语言案例分析105,C语言实战105例源码

    C语言实战105例源码 以下程序大家如有兴趣可在文件夹下载即可 第1部分 基础篇 实例1 一个价值"三天"的BUG 2 实例2 灵活使用递增(递减)操作符 5 实例3 算术运算符计 ...

  6. 基于java校园课室资源预约系统设计与实现计算机毕业设计源码+系统+lw文档+mysql数据库+调试部署

    基于java校园课室资源预约系统设计与实现计算机毕业设计源码+系统+lw文档+mysql数据库+调试部署 基于java校园课室资源预约系统设计与实现计算机毕业设计源码+系统+lw文档+mysql数据库 ...

  7. 计算机毕业设计ssm情报综合管理系统36zgo系统+程序+源码+lw+远程部署

    计算机毕业设计ssm情报综合管理系统36zgo系统+程序+源码+lw+远程部署 计算机毕业设计ssm情报综合管理系统36zgo系统+程序+源码+lw+远程部署 本源码技术栈: 项目架构:B/S架构 开 ...

  8. 基于JAVA医院住院综合服务管理系统计算机毕业设计源码+系统+数据库+lw文档+部署

    基于JAVA医院住院综合服务管理系统计算机毕业设计源码+系统+数据库+lw文档+部署 基于JAVA医院住院综合服务管理系统计算机毕业设计源码+系统+数据库+lw文档+部署 本源码技术栈: 项目架构:B ...

  9. java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运行部署

    java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运行部署 java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运 ...

最新文章

  1. postman请求soap 请求_postman中请求如何传递对象到spring controller?
  2. 315汽车大型排雷现场,数据总结避雷规律
  3. BPM实例方案分享:表单子表自动填入数据
  4. python 类 对象 知乎_python基础知识:类,对象,模块三者的区别
  5. 从C/C++零基础到月入9K我用了9个月
  6. git checkout切换到指定commit
  7. java SSL 简单操作demo
  8. 使用内容脚本将代码插入页面上下文
  9. 在MATLAB中处理RAW图像
  10. 怎么恢复苹果内置的计算机,苹果还原系统的方法_苹果电脑Mac如何恢复出厂系统-win7之家...
  11. Abaqus2022安装教程
  12. 诺奖经济大师,数学天才赌徒,和“神秘的股市财富公式”
  13. 视频教程-沐风老师3DMAX室内建模挤出法详解-3Dmax
  14. (27)Verilog实现倍频【方法四】
  15. KPI与360度考核结合的应用落地方案
  16. 软考中级 真题 2018年上半年 信息系统管理工程师 基础知识
  17. 计算机游戏模式怎么开144hz,刚开始换成了144hz现在为何最高只有60hz了求大神解答...
  18. Datax及Datax-web 下载使用
  19. 较全的OA系统功能需求
  20. 道德经-二十章-独异于人解

热门文章

  1. webservice 存根方式
  2. delegate、Lambda表达式、Func委托和Expression(TDelegate)表达式目录树
  3. 系统集成项目管理工程师-项目风险管理笔记
  4. 【数字信号处理】周期序列 ( 周期序列表示方法 | 主值区间表示法 | 模 N 表示法 )
  5. 【错误记录】Python 安装依赖库报错 ( ERROR: Could not find a version that satisfies the requirement elftools )
  6. 【Android 逆向】Linux 文件分类 ( 普通文件 | 目录文件 | 链接文件 | 字符设备文件 | 管道文件 | 块设备文件 )
  7. 【AOP 面向切面编程】Android Studio 中配置 AspectJ ( 下载并配置AS中 jar 包 | 配置 Gradle 和 Gradle 插件版本 | 配置 Gradle 构建脚本 )
  8. 【Android 插件化】“ 插桩式 “ 插件化框架 ( 类加载器创建 | 资源加载 )
  9. 【错误记录】NDK 导入外部 so 动态库报错 ( java.lang.UnsatisfiedLinkError | Android Studio 配置外部 so 动态库两种方法 )
  10. 【OpenGL】三、Visual Studio 2019 配置 GitHub ( 将项目上传到 GitHub )