SparkStreaming-相关窗口操作
目录
- 提前封装好的重复代码
- 将日志信息调整为ERROR
- window
- countByWindow
- reduceByWindow
- reduceByKeyAndWindow
- 不保留数值
- 保留数值
- countByValueAndWindow
提前封装好的重复代码
按照我的习惯,先把重复的代码做一个简单的封装,后面直接继承就可,这里的窗口长度为3,滑动频率为1
package com.shujia.testimport org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}abstract class WindowTestTool {var spark: SparkSession = _var sc: SparkContext = _var ssc:StreamingContext = _def main(args: Array[String]): Unit = {spark = SparkSession.builder().master("local[*]").appName("WindowTest").getOrCreate()sc = spark.sparkContextval ssc= new StreamingContext(sc,Durations.seconds(1))ssc.checkpoint("/data/checkpoint")val lines: ReceiverInputDStream[String] = ssc.socketTextStream("master",8888)val words: DStream[String] = lines.flatMap(_.split(" "))this.run(words)ssc.start()ssc.awaitTermination()ssc.stop()}def run(words:DStream[String])
}
这里提前开启一个端口,但要是端口没有正常关闭被占用了,可以先清除
这样就可以了
将日志信息调整为ERROR
在没经过修改之前,日志的等级为INFO,也就是平时所有的信息都会出现,这里修改为ERROR,只让idea在程序报错的时候才出现信息
window
// 1.对每个滑动窗口的数据执行自定义的计算
// window(windowLength, slideInterval)
// 该操作由一个DStream对象调用,传入一个窗口长度参数,
// 一个窗口移动速率参数,然后将当前时刻当前长度窗口中的元素取出形成一个新的DStream
object WindowTest extends WindowTestTool {override def run(words: DStream[String]): Unit = {// 1.对每个滑动窗口的数据执行自定义的计算
// window(windowLength, slideInterval)
// 该操作由一个DStream对象调用,传入一个窗口长度参数,
// 一个窗口移动速率参数,然后将当前时刻当前长度窗口中的元素取出形成一个新的DStreamval windowWords: DStream[String] = words.window(Durations.seconds(3),Durations.seconds(1))windowWords.print()}
}
countByWindow
//2.对每个滑动窗口的数据执行count操作
//countByWindow(windowLength,slideInterval)
//返回指定长度窗口中的元素个数
object WindowTest extends WindowTestTool {override def run(words: DStream[String]): Unit = {//2.对每个滑动窗口的数据执行count操作//countByWindow(windowLength,slideInterval)//返回指定长度窗口中的元素个数val windowWords: DStream[Long] = words.countByWindow(Durations.seconds(3),Durations.seconds(1))windowWords.print() }
}
reduceByWindow
//3.对每个滑动窗口的数据执行reduce操作
//这里不再是对整个调用DStream进行reduce操作,
//而是在调用DStream上首先取窗口函数的元素形成新的DStream,
// 然后在窗口元素形成的DStream上进行reduce
object WindowTest extends WindowTestTool {override def run(words: DStream[String]): Unit = {//3.对每个滑动窗口的数据执行reduce操作//这里不再是对整个调用DStream进行reduce操作,//而是在调用DStream上首先取窗口函数的元素形成新的DStream,// 然后在窗口元素形成的DStream上进行reduceval windowWords: DStream[String] = words.reduceByWindow(_+"-"+_,Durations.seconds(3),Durations.seconds(1)) }
}
reduceByKeyAndWindow
不保留数值
//4.对每个滑动窗口的数据执行reduceByKey操作
//reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])
//调用该操作的DStream中的元素格式为(k, v),整个操作类似于前面的reduceByKey,
//只不过对应的数据源不同,reduceByKeyAndWindow的数据源是基于该DStream的窗口长度中的所有数据。该操作也有一个可选的并发数参数
object WindowTest extends WindowTestTool {override def run(words: DStream[String]): Unit = {//4.对每个滑动窗口的数据执行reduceByKey操作//reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])//调用该操作的DStream中的元素格式为(k, v),整个操作类似于前面的reduceByKey,//只不过对应的数据源不同,reduceByKeyAndWindow的数据源是基于该DStream的窗口长度中的所有数据。该操作也有一个可选的并发数参数val pairs: DStream[(String, Int)] = words.map(word => (word, 1))val windowWords: DStream[(String, Int)] = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b),Durations.seconds(3),Durations.seconds(1))windowWords.print()}
}
保留数值
object WindowTest extends WindowTestTool {override def run(words: DStream[String]): Unit = {//两个参数:会对原来的数据进行一个保存val pairs: DStream[(String, Int)] = words.map(word => (word, 1))val windowWords: DStream[(String, Int)] = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b),(a: Int, b: Int) => (a - b),Durations.seconds(3),Durations.seconds(1))windowWords.print()}
}
countByValueAndWindow
//5.对每个滑动窗口的数据执行countByValue操作
//countByValueAndWindow(windowLength,slideInterval, [numTasks])
//类似于前面的countByValue操作,调用该操作的DStream数据格式为(K, v),
// 返回的DStream格式为(K, Long)。统计当前时间窗口中元素值相同的元素的个数
object WindowTest extends WindowTestTool {override def run(words: DStream[String]): Unit = {val windowWords: DStream[(String, Long)] = words.countByValueAndWindow(Durations.seconds(3),Durations.seconds(1))windowWords.print()}
}
感谢阅读,我是啊帅和和,一位大数据专业大四学生,祝你快乐。
SparkStreaming-相关窗口操作相关推荐
- SparkStreaming - 窗口函数(窗口操作)
窗口操作就是把多个采集周期设置成一个窗口,一起来计算,然后进行滑动,根据设置的滑动大小. 窗口大小和滑动大小,要是采集周期的倍数 package date_10_17_SparkStreamingim ...
- java 面向对账 抽象_java开发银行支付、对账时证书相关的操作实例
java 开发银行支付.对账时证书相关的操作总结. 证书的相关操作主要是在CMD窗口使用keytool工具Keytool 是一个Java数据证书的管理工具 ,Keytool将密钥(key)和证书(ce ...
- 医号馆门诊管理软件SaaS系统——新开电子处方以及售药窗口操作手册
医号馆诊所管理系统具备电子病例,收银管理.患者管理.库存管理.数据统计等功能,涵盖了诊所日常管理的各个方面,可有效提升医护人员的工作效率,提高诊所品牌形象,增加诊所营业收入,增强医患信任度. 为了方便 ...
- selenium之窗口操作
前言 webdriver经常会遇到在多个窗口之间切换的操作,例如在A窗口操作某个元素跳转到B窗口,然后又进入C窗口,此时如果需要进入A\B窗口操作就需要用到特殊的窗口处理方法 窗口处理方法 获取所有窗 ...
- Linux中和文件相关的操作
Linux中和文件(/文件夹)相关的操作 1. 文件:删除.复制.移动.创建链接 2. 文件的解压 和 压缩 3. 文件:列举查看.大小查看.个数统计 3.1 `ls`:文件列举查看 3.2 `ls. ...
- 搞定vim的窗口操作
最近在给学生演示数据结构代码时,发现用一般的方法总会有不方便,如果使用ide又觉得太浪费了,后来觉得用vim就够了,使用buffer总会需要页面调来跳出,学生看起来容易迷糊.所以就研究了下vim的窗口 ...
- python的窗口处理模块_Python tkinter模块弹出窗口及传值回到主窗口操作详解
本文实例讲述了Python tkinter模块弹出窗口及传值回到主窗口操作.分享给大家供大家参考,具体如下: 有些时候,我们需要使用弹出窗口,对程序的运行参数进行设置.有两种选择 一.标准窗口 如果只 ...
- MySQL数据库安装-Windows10为例【超级详细安装过程(每一步的截图)、检测安装是否成功、登录数据库并查看版本号、将MySQL添加到环境变量中、在命令行窗口操作数据库】
哈哈,这是在哔哩哔哩网站上 看的 MySQL安装步骤. 我安装的时候,截截图,希望对大家 有所帮助. 链接:https://pan.baidu.com/s/1fKFQCVHMQOmbg84UyGrKB ...
- Boost:与gz文件相关的操作实例
Boost:与gz文件相关的操作实例 实现功能 C++实现代码 实现功能 与gz文件相关的操作实例,打开,关闭,读写. C++实现代码 #include "zstream.h" # ...
- 拷贝构造,深度拷贝,关于delete和default相关的操作,explicit,类赋初值,构造函数和析构函数,成员函数和内联函数,关于内存存储,默认参数,静态函数和普通函数,const函数,友元
1.拷贝构造 //拷贝构造的规则,有两种方式实现初始化. //1.一个是通过在后面:a(x),b(y)的方式实现初始化. //2.第二种初始化的方式是直接在构造方法里面实现初始化. 案例如下: ...
最新文章
- 弃Java、Swift于不顾,为何选Python?
- P3398仓鼠(LCA)
- 特征值和特征向量(Eigenvalues and Eigenvectors)
- UML中类之间的几种关系
- 星外主机销售系统源码_业务员大客户销售订货订单管理系统源码开发外包解析...
- 简述linux的重要安装过程,对Linux安装过程的理解
- SCWS中文分词,功能函数实例应用
- How can I add a site title refiner
- html5吻胸小游戏,html5气球大战小游戏代码
- Hive 优化之 推测执行
- 派蒙的奇妙冒险------石之海(C/C++)
- Wireshark对pop3抓包分析
- python获取发送到手机的短信,使用python将短信更新发送到手机
- hdmi怎么支持2k分辨率_选择2K显示器有没有必要?
- Parasoft案例研究:医疗器械软件验证与合规性
- poi 删除 word文档中的图片
- HTML - CSS - JavaScript
- 片上网络路由算法综述
- Codeforces Global Round 2 B. Alyona and a Narrow Fridge(二分)
- 基于蚁群算法的多配送中心的车辆调度问题的研究附Matlab代码
热门文章
- STM32启动文件的分析
- MacBook Pro选购需要注意哪些,Mac电脑选购建议
- Linux 环境编程 day01 Linux系统介绍、GNU编译工具、静态/共享库、环境变量表
- 红米ac2100有ipv6吗_【1222多功能版OpenWrt】红米小米AC2100|IPV6|酸奶|SmartDNS|多拨|猫咪,附教程...
- android运行activity时报 Unfortunately,TestLayout has stopped 问题
- STM32 SWD/JTAG引脚被占用 无法使用Jlink下载代码时的解决方法
- Oracle VM VirtualBox 不可用
- 教你怎样选择伺服电机控制方式
- xshell执行sql脚本
- 组合(Composite)模式