Transformations on DStreams之transform的使用 实现黑名单操作/指定过滤
https://blog.csdn.net/qq_43688472/article/details/86616864
只处理当前批次的数据,所谓的无状态的方式,来一次,处理一次
有状态:改批次的数据和以前批次的数据是需要“累加”的

例如:今天某点到某点什么数据出现的次数

1.在那个基础上加个时间戳,把他放到某处,在进行累加
2.直接的方式完成
官网:http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations-on-dstreams
updateStateByKey(func)
Return a new “state” DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain(维持) arbitrary state data for each key.
累计旧状态进行更新

IDEA操作

package g5.learningimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutable.ListBufferobject UpdateStateByKeyApp {def main(args: Array[String]): Unit = {//准备工作val conf = new SparkConf().setMaster("local[2]").setAppName("UpdateStateByKeyApp")val ssc = new StreamingContext(conf, Seconds(10))ssc.checkpoint("hdfs://hadoop001:8020/ss/logs")//这里要加这个,为什么,因为这是个有状态的数据,你要旧数据一个地方存放才能累加//业务逻辑val lines = ssc.socketTextStream("hadoop001", 9999)val results = lines.flatMap(_.split(",")).map((_,1))
val state = results.updateStateByKey(updateFunction)state.print()//streaming的启动ssc.start() // Start the computationssc.awaitTermination() // Wait for the computation to terminate}def updateFunction(newValues: Seq[Int], runningCount: Option[Int]): Option[Int] = {val newCount = newValues.sumval pre =runningCount.getOrElse(0) // add the new values with the previous running count to get the new countSome(newCount+ pre)}}

问题:

这里你会发现在hdfs上你会产生很多的小文件

Transformations on DStreams之updateStateByKey 的使用和状态累加相关推荐

  1. Transformations on DStreams之transform的使用 实现黑名单操作/指定过滤

    Transformations on DStreams之transform 实现黑名单操作/指定过滤 官网:http://spark.apache.org/docs/latest/streaming- ...

  2. Spark Streaming实现实时WordCount,DStream的使用,updateStateByKey(func)实现累计计算单词出现频率

    一. 实战 1.用Spark Streaming实现实时WordCount 架构图: 说明:在hadoop1:9999下的nc上发送消息,消费端接收消息,然后并进行单词统计计算. * 2.安装并启动生 ...

  3. 【大数据笔记10】SparkStreaming——流式计算

    What it is Spark Streaming类似于Apache Storm,用于流式数据的处理.根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特点.Spark St ...

  4. Spark Streaming 编程指南[中英对照]

    2019独角兽企业重金招聘Python工程师标准>>> 基于Spark 2.0 Preview的材料翻译,原[英]文地址: http://spark.apache.org/docs/ ...

  5. Spark Streaming简介

    Spark Streaming 是core Spark的一个扩展,用来处理实时数据流,数据源可以来自Kafka, Flume, HDFS等,经过复杂的算法处理后,存入HDFS,数据库,或者实时的Das ...

  6. 5W字总结Spark(建议收藏)

    点击上方 "大数据肌肉猿"关注, 星标一起成长 后台回复[加群],进入高质量学习交流群 2021年大数据肌肉猿公众号奖励制度 本文目录: 一.Spark 基础 二.Spark Co ...

  7. 惊了!10万字的Spark全文!

    Hello,大家好,这里是857技术社区,我是社区创始人之一,以后会持续给大家更新大数据各组件的合集内容,路过给个关注吧!!! 今天给大家分享一篇小白易读懂的 Spark万字概念长文,本篇文章追求的是 ...

  8. 【Sparkstreaming_01】

    文章目录 SparkStreaming 1.流处理 /实时计算 2.批处理/离线计算 SparkStreaming简单介绍: SparkStreaming数据源: 总结: SparkStreaming ...

  9. ❤️Spark的关键技术回顾,持续更新!【推荐收藏加关注】❤️

    目录 前言 Spark的关键技术回顾 一.Spark复习题回顾 1.Spark使用的版本 2.Spark几种部署方式? 3.Spark的提交任务的方式? 4.使用Spark-shell的方式也可以交互 ...

最新文章

  1. 简要说明建设城市大脑三条关键标准规范
  2. 5G NR — O-RAN 的系统架构
  3. 差点吓尿,手贱不要乱点support native debug
  4. 2020年有寓意的领证日期_2020年有意义谐音的领证日子 容易记住的领证日期
  5. 贪心 - Dota2 参议院
  6. telnet不能用,提示:-bash: telnet: command not found
  7. 案例:无人测量船水库水下地形测量及库容量计算
  8. 工作75::一直报404
  9. ajax搜索思路,jquery创建一个ajax关键词数据搜索实现思路
  10. 溢价28倍!罗永浩的直播公司要卖了:“真还传”提前上演!
  11. 基于JAVA+SpringMVC+MYSQL的记账管理系统
  12. 一个类中可以没有main方法_一个月可以暴瘦二十斤的减肥方法
  13. HTML5 学习准备1
  14. 编译错误:vulkan/vulkan.h:没有那个文件或目录
  15. 第9章 逻辑回归 学习笔记 中
  16. 告别低效扫码, Barcode Reader高效解决你批量扫码的困扰
  17. 前后端分离开发,如何定义各类错误码?
  18. 中国移动“梧桐杯”大数据应用创新大赛智慧金融初赛TOP1开源
  19. 还在问java架构师路线?学习路线?十年京东架构师教你这样做
  20. UninstallToo卸载软件

热门文章

  1. 【转】模块(configparser+shutil+logging)
  2. Java 高效编程之 Builder 模式
  3. Linux(乌班图 )系统下安装jdk 和eclipse开发IDE
  4. Spring Cloud版——电影售票系统七使用 Zuul 构建微服务网关
  5. 作为前端Web开发者,这12条基本命令不可不会
  6. 201712-2-游戏
  7. DAY1-Workstation and CentOS7.x 快照
  8. 【汇编语言与计算机系统结构笔记16】子程序设计:子程序的嵌套与递归,多个模块之间的参数传送
  9. 【李宏毅2020 ML/DL】P43-44 More about Adversarial Attack | Images Audio
  10. [转]busybox登陆后没要求输入密码的解决办法