作用:全局计数
在简单Spark Streaming上

  1. 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可
     jssc.checkpoint("hdfs://master:9000/wordcount_checkpoint");
  1. updateStateByKey
public Optional<Integer> call(List<Integer> values,Optional<Integer> state) throws Exception {

这里的values指的是此时此刻出现的值
state指的是之前的值

例子:

package cn.spark.study.streaming;import java.util.Arrays;
import java.util.List;
import java.util.Properties;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;import com.google.common.base.Optional;import scala.Tuple2;/*** 基于updateStateByKey算子实现缓存机制的实时wordcount程序* @author Administrator**/public class UpdateStateByKeyWordCount {public static void main(String[] args) {Properties properties = System.getProperties();properties.setProperty("HADOOP_USER_NAME", "root");SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyWordCount");  JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));// 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制// 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份// 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在// 内存数据丢失的时候,可以从checkpoint中恢复数据// 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可jssc.checkpoint("hdfs://master:9000/wordcount_checkpoint");  // 然后先实现基础的wordcount逻辑//JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);JavaReceiverInputDStream<String> lines = jssc.socketTextStream("192.168.142.11", 9999);JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {private static final long serialVersionUID = 1L;@Overridepublic Iterable<String> call(String line) throws Exception {return Arrays.asList(line.split(" "));  }});JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {private static final long serialVersionUID = 1L;@Overridepublic Tuple2<String, Integer> call(String word)throws Exception {return new Tuple2<String, Integer>(word, 1);}});// 到了这里,就不一样了,之前的话,是不是直接就是pairs.reduceByKey// 然后,就可以得到每个时间段的batch对应的RDD,计算出来的单词计数// 然后,可以打印出那个时间段的单词计数// 但是,有个问题,你如果要统计每个单词的全局的计数呢?// 就是说,统计出来,从程序启动开始,到现在为止,一个单词出现的次数,那么就之前的方式就不好实现// 就必须基于redis这种缓存,或者是mysql这种db,来实现累加// 但是,我们的updateStateByKey,就可以实现直接通过Spark维护一份每个单词的全局的统计次数JavaPairDStream<String, Integer> wordCounts = pairs.updateStateByKey(//import com.google.common.base.Optional;//注意是这个包// 这里的Optional,相当于Scala中的样例类,就是Option,可以这么理解// 它代表了一个值的存在状态,可能存在,也可能不存在new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {private static final long serialVersionUID = 1L;// 这里两个参数// 实际上,对于每个单词,每次batch计算的时候,都会调用这个函数// 第一个参数,values,相当于是这个batch中,这个key的新的值,可能有多个吧// 比如说一个hello,可能有2个1,(hello, 1) (hello, 1),那么传入的是(1,1)// 第二个参数,就是指的是这个key之前的状态,state,其中泛型的类型是你自己指定的@Overridepublic Optional<Integer> call(List<Integer> values,Optional<Integer> state) throws Exception {System.out.println("+++++++++++++++++++++++++++++++");System.out.println("values:   "+values);System.out.println("state:    "+state);System.out.println("===================================");// 首先定义一个全局的单词计数Integer newValue = 0;// 其次,判断,state是否存在,如果不存在,说明是一个key第一次出现// 如果存在,说明这个key之前已经统计过全局的次数了if(state.isPresent()) {newValue = state.get();}// 接着,将本次新出现的值,都累加到newValue上去,就是一个key目前的全局的统计// 次数for(Integer value : values) {newValue += value;}return Optional.of(newValue);  }});// 到这里为止,相当于是,每个batch过来是,计算到pairs DStream,就会执行全局的updateStateByKey// 算子,updateStateByKey返回的JavaPairDStream,其实就代表了每个key的全局的计数// 打印出来wordCounts.print();jssc.start();jssc.awaitTermination();jssc.close();}}

Spark之UpdateStateByKey算子相关推荐

  1. updateStateByKey算子入门案例

    概念 参数为函数 def updateStateByKey[S : ClassTag](updateFunc: (Seq[V], Option[S]) => Option[S]): DStrea ...

  2. spark常用RDD算子 汇总(java和scala版本)

    github: https://github.com/zhaikaishun/spark_tutorial  spark RDD的算子挺多,有时候如何灵活的使用,该如何用一下子想不起来,这一段时间将s ...

  3. 【Spark】Spark的常用算子

    Spark的常用算子 目录内容 Spark的常用算子 一.转换算子(Transformation) 二.行动算子(Action) 三.键值对算子(PairRDDFunctions) 四.文件系统算子( ...

  4. 深入理解spark高阶算子combineByKey

    今天来详细说说spark中的一个比较底层的算子combineByKey. 熟悉spark的朋友应该知道,spark里面有很多类型的算子,有些比较基础,什么map,filter,可能看一眼就会了,有些稍 ...

  5. updateStateByKey算子入门案例之wordCount

    概念 有一个参数,是个函数,该函数有两个参数,第一个是序列类型,第二个是Option类型 def updateStateByKey[S : ClassTag](updateFunc: (Seq[V], ...

  6. Spark RDD-行动算子

    2.4 Action 行动算子:触发运算,在 Executor 执行,如果想直接在 Driver 端看到结果可以使用 collect 和 foreach 都可以将数据拉取到 Driver 端. 2.4 ...

  7. Spark的Transformations算子(理解+实例)

    把每个Transformations算子都敲着练习几遍会理解的更深刻 Transformations算子之后要写action算子才会进行计算. 1. map(func) 描述:返回一个新的RDD,该R ...

  8. Spark常用的算子以及Scala函数总结

    上海站 | 高性能计算之GPU CUDA培训 4月13-15日 三天密集式学习  快速带你晋级 阅读全文 > 正文共11264个字,7张图,预计阅读时间28分钟. Spark与Scala 首先, ...

  9. Spark 基础——RDD 算子

    RDD弹性分布式数据集(Resilient Distributed Dataset)是 Spark 最基本也是最根本的数据抽象 RDD 它具备像 MapReduce 等数据流模型的容错性(fault- ...

  10. spark常见转换算子(transformation)的操作

    package com.meng.nan.day717import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkCon ...

最新文章

  1. 提高安全意识,保护自身安全
  2. 转 MySQL问题排查工具介绍
  3. 【五线谱】五线谱的常用符号 ( 花连谱号 | 高音谱号 | 低音谱号 | 休止符 | 小节线 )
  4. 避免button处理事件过程中 点击按钮触发事件的方法
  5. 智能计算机科学的奠基人,【编注】神经网络算法奠基人之一沃尔特·皮茨的传奇故事...
  6. STM32的8种GPIO输入输出模式深入详解
  7. Python super超类方法
  8. 带有无参数的存储过程
  9. php oracle 配置,关于php:为Windows 64位配置Oracle OCI8
  10. 容器中用uwsgi协议部署注意的问题以及用flask部署
  11. php文本文件操作,文本文件操作的php类
  12. Cloudarrow V2.0 正式发布!
  13. thymeleaf中的条件判断用法
  14. 罗永浩与锤子手机撇清关系;微软回应「高管传奇」经历;Rust 1.38 稳定版发布 | 极客头条...
  15. 写作是最好的学习方法
  16. python中根据字符串导入模块module
  17. windows无法新建计算机对象,教您activex部件不能创建对象怎么解决
  18. 地图服务 纬度、经度对应坐标轴x,y
  19. 三维重建:特征检测+匹配+RT恢复+稠密重建方法
  20. python客户价值分析_Python实现RFM客户价值分析

热门文章

  1. 3. Longest Substring Without Repeating Characters
  2. 使用ExtJs实现文件下载
  3. 转 layout_weight体验(实现按比例显示)
  4. Python MySQL操作
  5. weblogic 找不到数据源问题
  6. Internet 信息服务承载说明 即IIS安装说明
  7. Tomcat中配置文件conf修改的一些常识
  8. (1) 还原二叉树 (25 分)
  9. JAVA程序提取PDF中间页
  10. oracle psu版本确认,Oracle PSU更新