第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密
Spark Streaming的DStream为我们提供了一个updateStateByKey方法,它的主要功能是可以随着时间的流逝在Spark Streaming中为每一个key维护一份state状态,通过更新函数对该key的状态不断更新。对每一个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新(对每个新出现的key,会同样执行state的更新函数操作),但是如果通过更新函数对state更新后返回none的话,此时刻key对应的state状态被删除掉,需要特别说明的是state可以是任意类型的数据结构,这就为我们的计算带来无限的想象空间;
重点来了!!!如果要不断的更新每个key的state,就一定会涉及到状态的保存和容错,这个时候就需要开启checkpoint机制和功能,需要说明的是checkpoint可以保存一切可以存储在文件系统上的内容,例如:程序未处理的数据及已经拥有的状态。
补充说明:关于流式处理对历史状态进行保存和更新具有重大实用意义,例如进行广告(投放广告和运营广告效果评估的价值意义,热点随时追踪、热力图)
简单的来说,如果我们需要进行wordcount,每个batchInterval都会计算出新的一批数据,这批数据如何更新到以前计算的结果上?updateStateByKey就能实现此功能。
函数定义如下:
def updateStateByKey[S: ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStream[(K, S)] = ssc.withScope {updateStateByKey(updateFunc, defaultPartitioner()) }
updateStateByKey 需要传入一个函数,该函数有两个参数Seq[V]表示最新一次reduce的值的序列,Option[s]表示的是key对应的以前的值。返回的时一个key的最新值。
下面我们用实例演示:
package com.dt.spark.streamingimport org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Created by Administrator on 2016/5/3.*/ object UpdateStateByKeyDemo {def main(args: Array[String]) {val conf = new SparkConf().setAppName("UpdateStateByKeyDemo")val ssc = new StreamingContext(conf,Seconds(20))//要使用updateStateByKey方法,必须设置Checkpoint。ssc.checkpoint("/checkpoint/")val socketLines = ssc.socketTextStream("spark-master",9999)socketLines.flatMap(_.split(",")).map(word=>(word,1)).updateStateByKey((currValues:Seq[Int],preValue:Option[Int]) =>{val currValue = currValues.sumSome(currValue + preValue.getOrElse(0))}).print()ssc.start()ssc.awaitTermination()ssc.stop()} }
打包上传至spark集群。
打开nc,发送测试数据
root@spark-master:~# nc -lk 9999 hadoop,spark,scala,hive hadoop,Hbase,spark
运行spark 程序
root@spark-master:~# /usr/local/spark/spark-1.6.0-bin-hadoop2.6/bin/spark-submit --class com.dt.spark.streaming.UpdateStateByKeyDemo --master spark://spark-master:7077 ./spark.jar
查看运行结果:
------------------------------------------- Time: 1462282180000 ms ------------------------------------------- (scala,1) (hive,1) (spark,2) (hadoop,2) (Hbase,1)
我们在nc中再输入一些数据
root@spark-master:~# nc -lk 9999 hadoop,spark,scala,hive hadoop,Hbase,spark hadoop,spark,scala,hive hadoop,Hbase,spark
再次查看结果:
------------------------------------------- Time: 1462282200000 ms ------------------------------------------- (scala,2) (hive,2) (spark,4) (hadoop,4) (Hbase,2)
可见,它将我们两次统计结果合并了。
备注:
1、DT大数据梦工厂微信公众号DT_Spark
2、IMF晚8点大数据实战YY直播频道号:68917580
3、新浪微博: http://www.weibo.com/ilovepains
转载于:https://blog.51cto.com/lqding/1769852
第93课:SparkStreaming updateStateByKey 基本操作综合案例实战和内幕源码解密相关推荐
- 第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密 java.lang.ClassNotFoundException 踩坑解决问题详细内幕版本
第91课:SparkStreaming基于Kafka Direct案例实战和内幕源码解密 /* * *王家林老师授课http://weibo.com/ilovepains */ 每天晚上20: ...
- 【Unity 3D】VR飞机起飞喷火游戏案例实战(附源码和演示视频 超详细)
需要源码和资源包请点赞关注收藏后评论区留言私信~~~ 下面我们用Unity 3D搭建一个飞机在跑道上喷火起飞的3D游戏案例,效果如下面的图片和视频所示 一.效果演示 视频如下 VR飞机 二.实现步骤 ...
- 《Spark SQL大数据实例开发》9.2 综合案例实战——电商网站搜索排名统计
<Spark SQL大数据实例开发>9.2 综合案例实战--电商网站搜索排名统计 9.2.1 案例概述 本节演示一个网站搜索综合案例:以京东为例,用户登录京东网站,在搜索栏中输入搜 ...
- 【PySpark】综合案例实战:处理加州房屋信息,构建线性回归模型
通过之前的学习,我们对 Spark 各种 API 的基本用法有了一定的了解,还通过统计词频的实例掌握了如何从零开始写一个 Spark 程序.那么现在,让我们从一个真实的数据集出发,看看如何用 Spar ...
- c语言案例分析105,C语言实战105例源码
C语言实战105例源码 以下程序大家如有兴趣可在文件夹下载即可 第1部分 基础篇 实例1 一个价值"三天"的BUG 2 实例2 灵活使用递增(递减)操作符 5 实例3 算术运算符计 ...
- 基于java校园课室资源预约系统设计与实现计算机毕业设计源码+系统+lw文档+mysql数据库+调试部署
基于java校园课室资源预约系统设计与实现计算机毕业设计源码+系统+lw文档+mysql数据库+调试部署 基于java校园课室资源预约系统设计与实现计算机毕业设计源码+系统+lw文档+mysql数据库 ...
- 计算机毕业设计ssm情报综合管理系统36zgo系统+程序+源码+lw+远程部署
计算机毕业设计ssm情报综合管理系统36zgo系统+程序+源码+lw+远程部署 计算机毕业设计ssm情报综合管理系统36zgo系统+程序+源码+lw+远程部署 本源码技术栈: 项目架构:B/S架构 开 ...
- 基于JAVA医院住院综合服务管理系统计算机毕业设计源码+系统+数据库+lw文档+部署
基于JAVA医院住院综合服务管理系统计算机毕业设计源码+系统+数据库+lw文档+部署 基于JAVA医院住院综合服务管理系统计算机毕业设计源码+系统+数据库+lw文档+部署 本源码技术栈: 项目架构:B ...
- java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运行部署
java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运行部署 java计算机毕业设计社区养老综合服务平台服务端源码+系统+数据库+lw文档+mybatis+运 ...
最新文章
- postman请求soap 请求_postman中请求如何传递对象到spring controller?
- 315汽车大型排雷现场,数据总结避雷规律
- BPM实例方案分享:表单子表自动填入数据
- python 类 对象 知乎_python基础知识:类,对象,模块三者的区别
- 从C/C++零基础到月入9K我用了9个月
- git checkout切换到指定commit
- java SSL 简单操作demo
- 使用内容脚本将代码插入页面上下文
- 在MATLAB中处理RAW图像
- 怎么恢复苹果内置的计算机,苹果还原系统的方法_苹果电脑Mac如何恢复出厂系统-win7之家...
- Abaqus2022安装教程
- 诺奖经济大师,数学天才赌徒,和“神秘的股市财富公式”
- 视频教程-沐风老师3DMAX室内建模挤出法详解-3Dmax
- (27)Verilog实现倍频【方法四】
- KPI与360度考核结合的应用落地方案
- 软考中级 真题 2018年上半年 信息系统管理工程师 基础知识
- 计算机游戏模式怎么开144hz,刚开始换成了144hz现在为何最高只有60hz了求大神解答...
- Datax及Datax-web 下载使用
- 较全的OA系统功能需求
- 道德经-二十章-独异于人解
热门文章
- webservice 存根方式
- delegate、Lambda表达式、Func委托和Expression(TDelegate)表达式目录树
- 系统集成项目管理工程师-项目风险管理笔记
- 【数字信号处理】周期序列 ( 周期序列表示方法 | 主值区间表示法 | 模 N 表示法 )
- 【错误记录】Python 安装依赖库报错 ( ERROR: Could not find a version that satisfies the requirement elftools )
- 【Android 逆向】Linux 文件分类 ( 普通文件 | 目录文件 | 链接文件 | 字符设备文件 | 管道文件 | 块设备文件 )
- 【AOP 面向切面编程】Android Studio 中配置 AspectJ ( 下载并配置AS中 jar 包 | 配置 Gradle 和 Gradle 插件版本 | 配置 Gradle 构建脚本 )
- 【Android 插件化】“ 插桩式 “ 插件化框架 ( 类加载器创建 | 资源加载 )
- 【错误记录】NDK 导入外部 so 动态库报错 ( java.lang.UnsatisfiedLinkError | Android Studio 配置外部 so 动态库两种方法 )
- 【OpenGL】三、Visual Studio 2019 配置 GitHub ( 将项目上传到 GitHub )