目录

  • 提前封装好的重复代码
  • 将日志信息调整为ERROR
  • window
  • countByWindow
  • reduceByWindow
  • reduceByKeyAndWindow
    • 不保留数值
    • 保留数值
  • countByValueAndWindow

提前封装好的重复代码

按照我的习惯,先把重复的代码做一个简单的封装,后面直接继承就可,这里的窗口长度为3,滑动频率为1

package com.shujia.testimport org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}abstract class WindowTestTool {var spark: SparkSession = _var sc: SparkContext = _var ssc:StreamingContext = _def main(args: Array[String]): Unit = {spark = SparkSession.builder().master("local[*]").appName("WindowTest").getOrCreate()sc = spark.sparkContextval ssc= new StreamingContext(sc,Durations.seconds(1))ssc.checkpoint("/data/checkpoint")val lines: ReceiverInputDStream[String] = ssc.socketTextStream("master",8888)val words: DStream[String] = lines.flatMap(_.split(" "))this.run(words)ssc.start()ssc.awaitTermination()ssc.stop()}def run(words:DStream[String])
}

这里提前开启一个端口,但要是端口没有正常关闭被占用了,可以先清除

这样就可以了

将日志信息调整为ERROR

在没经过修改之前,日志的等级为INFO,也就是平时所有的信息都会出现,这里修改为ERROR,只让idea在程序报错的时候才出现信息

window

// 1.对每个滑动窗口的数据执行自定义的计算
// window(windowLength, slideInterval)
// 该操作由一个DStream对象调用,传入一个窗口长度参数,
// 一个窗口移动速率参数,然后将当前时刻当前长度窗口中的元素取出形成一个新的DStream

object WindowTest extends WindowTestTool {override def run(words: DStream[String]): Unit = {//    1.对每个滑动窗口的数据执行自定义的计算
//    window(windowLength, slideInterval)
//    该操作由一个DStream对象调用,传入一个窗口长度参数,
//    一个窗口移动速率参数,然后将当前时刻当前长度窗口中的元素取出形成一个新的DStreamval windowWords: DStream[String] = words.window(Durations.seconds(3),Durations.seconds(1))windowWords.print()}
}

countByWindow

//2.对每个滑动窗口的数据执行count操作
//countByWindow(windowLength,slideInterval)
//返回指定长度窗口中的元素个数

object WindowTest extends WindowTestTool {override def run(words: DStream[String]): Unit = {//2.对每个滑动窗口的数据执行count操作//countByWindow(windowLength,slideInterval)//返回指定长度窗口中的元素个数val windowWords: DStream[Long] = words.countByWindow(Durations.seconds(3),Durations.seconds(1))windowWords.print()    }
}

reduceByWindow

//3.对每个滑动窗口的数据执行reduce操作
//这里不再是对整个调用DStream进行reduce操作,
//而是在调用DStream上首先取窗口函数的元素形成新的DStream,
// 然后在窗口元素形成的DStream上进行reduce


object WindowTest extends WindowTestTool {override def run(words: DStream[String]): Unit = {//3.对每个滑动窗口的数据执行reduce操作//这里不再是对整个调用DStream进行reduce操作,//而是在调用DStream上首先取窗口函数的元素形成新的DStream,// 然后在窗口元素形成的DStream上进行reduceval windowWords: DStream[String] = words.reduceByWindow(_+"-"+_,Durations.seconds(3),Durations.seconds(1)) }
}

reduceByKeyAndWindow

不保留数值

//4.对每个滑动窗口的数据执行reduceByKey操作
//reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])
//调用该操作的DStream中的元素格式为(k, v),整个操作类似于前面的reduceByKey,
//只不过对应的数据源不同,reduceByKeyAndWindow的数据源是基于该DStream的窗口长度中的所有数据。该操作也有一个可选的并发数参数

object WindowTest extends WindowTestTool {override def run(words: DStream[String]): Unit = {//4.对每个滑动窗口的数据执行reduceByKey操作//reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])//调用该操作的DStream中的元素格式为(k, v),整个操作类似于前面的reduceByKey,//只不过对应的数据源不同,reduceByKeyAndWindow的数据源是基于该DStream的窗口长度中的所有数据。该操作也有一个可选的并发数参数val pairs: DStream[(String, Int)] = words.map(word => (word, 1))val windowWords: DStream[(String, Int)] = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b),Durations.seconds(3),Durations.seconds(1))windowWords.print()}
}

保留数值

object WindowTest extends WindowTestTool {override def run(words: DStream[String]): Unit = {//两个参数:会对原来的数据进行一个保存val pairs: DStream[(String, Int)] = words.map(word => (word, 1))val windowWords: DStream[(String, Int)] = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b),(a: Int, b: Int) => (a - b),Durations.seconds(3),Durations.seconds(1))windowWords.print()}
}

countByValueAndWindow

//5.对每个滑动窗口的数据执行countByValue操作
//countByValueAndWindow(windowLength,slideInterval, [numTasks])
//类似于前面的countByValue操作,调用该操作的DStream数据格式为(K, v),
// 返回的DStream格式为(K, Long)。统计当前时间窗口中元素值相同的元素的个数

object WindowTest extends WindowTestTool {override def run(words: DStream[String]): Unit = {val windowWords: DStream[(String, Long)] = words.countByValueAndWindow(Durations.seconds(3),Durations.seconds(1))windowWords.print()}
}

感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。

SparkStreaming-相关窗口操作相关推荐

  1. SparkStreaming - 窗口函数(窗口操作)

    窗口操作就是把多个采集周期设置成一个窗口,一起来计算,然后进行滑动,根据设置的滑动大小. 窗口大小和滑动大小,要是采集周期的倍数 package date_10_17_SparkStreamingim ...

  2. java 面向对账 抽象_java开发银行支付、对账时证书相关的操作实例

    java 开发银行支付.对账时证书相关的操作总结. 证书的相关操作主要是在CMD窗口使用keytool工具Keytool 是一个Java数据证书的管理工具 ,Keytool将密钥(key)和证书(ce ...

  3. 医号馆门诊管理软件SaaS系统——新开电子处方以及售药窗口操作手册

    医号馆诊所管理系统具备电子病例,收银管理.患者管理.库存管理.数据统计等功能,涵盖了诊所日常管理的各个方面,可有效提升医护人员的工作效率,提高诊所品牌形象,增加诊所营业收入,增强医患信任度. 为了方便 ...

  4. selenium之窗口操作

    前言 webdriver经常会遇到在多个窗口之间切换的操作,例如在A窗口操作某个元素跳转到B窗口,然后又进入C窗口,此时如果需要进入A\B窗口操作就需要用到特殊的窗口处理方法 窗口处理方法 获取所有窗 ...

  5. Linux中和文件相关的操作

    Linux中和文件(/文件夹)相关的操作 1. 文件:删除.复制.移动.创建链接 2. 文件的解压 和 压缩 3. 文件:列举查看.大小查看.个数统计 3.1 `ls`:文件列举查看 3.2 `ls. ...

  6. 搞定vim的窗口操作

    最近在给学生演示数据结构代码时,发现用一般的方法总会有不方便,如果使用ide又觉得太浪费了,后来觉得用vim就够了,使用buffer总会需要页面调来跳出,学生看起来容易迷糊.所以就研究了下vim的窗口 ...

  7. python的窗口处理模块_Python tkinter模块弹出窗口及传值回到主窗口操作详解

    本文实例讲述了Python tkinter模块弹出窗口及传值回到主窗口操作.分享给大家供大家参考,具体如下: 有些时候,我们需要使用弹出窗口,对程序的运行参数进行设置.有两种选择 一.标准窗口 如果只 ...

  8. MySQL数据库安装-Windows10为例【超级详细安装过程(每一步的截图)、检测安装是否成功、登录数据库并查看版本号、将MySQL添加到环境变量中、在命令行窗口操作数据库】

    哈哈,这是在哔哩哔哩网站上 看的 MySQL安装步骤. 我安装的时候,截截图,希望对大家 有所帮助. 链接:https://pan.baidu.com/s/1fKFQCVHMQOmbg84UyGrKB ...

  9. Boost:与gz文件相关的操作实例

    Boost:与gz文件相关的操作实例 实现功能 C++实现代码 实现功能 与gz文件相关的操作实例,打开,关闭,读写. C++实现代码 #include "zstream.h" # ...

  10. 拷贝构造,深度拷贝,关于delete和default相关的操作,explicit,类赋初值,构造函数和析构函数,成员函数和内联函数,关于内存存储,默认参数,静态函数和普通函数,const函数,友元

     1.拷贝构造 //拷贝构造的规则,有两种方式实现初始化. //1.一个是通过在后面:a(x),b(y)的方式实现初始化. //2.第二种初始化的方式是直接在构造方法里面实现初始化. 案例如下: ...

最新文章

  1. 弃Java、Swift于不顾,为何选Python?
  2. P3398仓鼠(LCA)
  3. 特征值和特征向量(Eigenvalues and Eigenvectors)
  4. UML中类之间的几种关系
  5. 星外主机销售系统源码_业务员大客户销售订货订单管理系统源码开发外包解析...
  6. 简述linux的重要安装过程,对Linux安装过程的理解
  7. SCWS中文分词,功能函数实例应用
  8. How can I add a site title refiner
  9. html5吻胸小游戏,html5气球大战小游戏代码
  10. Hive 优化之 推测执行
  11. 派蒙的奇妙冒险------石之海(C/C++)
  12. Wireshark对pop3抓包分析
  13. python获取发送到手机的短信,使用python将短信更新发送到手机
  14. hdmi怎么支持2k分辨率_选择2K显示器有没有必要?
  15. Parasoft案例研究:医疗器械软件验证与合规性
  16. poi 删除 word文档中的图片
  17. HTML - CSS - JavaScript
  18. 片上网络路由算法综述
  19. Codeforces Global Round 2 B. Alyona and a Narrow Fridge(二分)
  20. 基于蚁群算法的多配送中心的车辆调度问题的研究附Matlab代码

热门文章

  1. STM32启动文件的分析
  2. MacBook Pro选购需要注意哪些,Mac电脑选购建议
  3. Linux 环境编程 day01 Linux系统介绍、GNU编译工具、静态/共享库、环境变量表
  4. 红米ac2100有ipv6吗_【1222多功能版OpenWrt】红米小米AC2100|IPV6|酸奶|SmartDNS|多拨|猫咪,附教程...
  5. android运行activity时报 Unfortunately,TestLayout has stopped 问题
  6. STM32 SWD/JTAG引脚被占用 无法使用Jlink下载代码时的解决方法
  7. Oracle VM VirtualBox 不可用
  8. 教你怎样选择伺服电机控制方式
  9. xshell执行sql脚本
  10. 组合(Composite)模式