DStream转换操作
Spark Streaming中对DStream的转换会转变成对RDD的转换操作,流程如下:
其中,lines表示转换操作前的DStream,words表示转换操作后生成的DStream。对lines做flatMap转换操作,也就是对它内部的所有RDD做flatMap转换操作。
接下来,列举DStream API 提供的与转换操作相关的方法。
在以上表中,列举了一些DStream API提供的与转换操作相关的方法。DStream API提供的与转换操作相关的方法和RDD API有些不同,不同之处在于RDD API 中没有提供transform()和updateStateByKey()两个方法。
1、transform()
(1)启动master,slave1,slave2三个节点的kafka服务。
bin/kafka-server-start.sh.config/server.properties
(2)在master节点下载nc服务并开启服务
下载命令:yum install nc
执行命令 nc-lk 9999启动服务器且监听Socket服务,并输入数据 I am learning Spark Streaming now.
(3)打开IDEA工具,创建一个名为spark03的Maven项目。
(4)配置pom.xml文件,引入Spark Streaming相关依赖和设置源代码的存储路径,具体内容如下:
<dependencies><!--引入Scala编程依赖库--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency><!--引入Spark核心依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.0.2</version></dependency><!--引入sparkStreaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.0.2</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory></build>
(5)配置好pom.xml文件后,在/src/main和/src/test目录下分别创建scala目录,用来防止sourceDirectory和testDirectory标签提示错误。
(6)在spark03项目的/src/main/scala目录下创建一个名为itcast的包,接着在包下创建名为TransformTest的scala类,编写以下代码:
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}object TransformTest {def main(args: Array[String]): Unit = {//创建SparkConf对象val sparkConf : SparkConf = new SparkConf().setAppName("TransformTest").setMaster("local[2]")//创建SparkContext对象val sc : SparkContext = new SparkContext(sparkConf)//设置日志级别sc.setLogLevel("WARN")//创建StreamingContextval ssc : StreamingContext = new StreamingContext(sc,Seconds(5))//连接socket服务val dstream :ReceiverInputDStream[String] =ssc.socketTextStream("192.168.196.101",9999)//使用RDD-to-RDD函数val words : DStream[String] = dstream.transform(rdd => rdd.flatMap(_.split(" ")))//打印输出words.print()//开启流式计算ssc.start()//用于保持程序一直运行ssc.awaitTermination()}
}
运行结果如下图所示:
2、updateStateByKey()
在spark03项目的/src/main/scala/itcast目录下创建一个名为updataStateByKeyTest的scala类,编写以下内容:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}object UpdateStateByKeyTest {//newValues表示当前批次汇总成的(k,v)中的相同k的所有v//runningCount表示历史的所有相同key的value总和def updateFunction(newValues : Seq[Int],runningCount:Option[Int]):Option[Int] = {val newCount = runningCount.getOrElse(0) + newValues.sumSome(newCount)}def main(args: Array[String]): Unit = {//创建SparkConf对象val sparkConf: SparkConf = new SparkConf().setAppName("UpdateStateByKeyTest").setMaster("local[2]")//创建SparkContext对象val sc: SparkContext = new SparkContext(sparkConf)//设置日志级别sc.setLogLevel("WARN")//创建StreamingContextval ssc: StreamingContext = new StreamingContext(sc, Seconds(5))//配置检查点目录ssc.checkpoint("./")//连接socket服务val dstream: ReceiverInputDStream[String] =ssc.socketTextStream("192.168.196.101", 9999)//按空格切分每一行val wordAndOne : DStream[(String,Int)]= dstream.flatMap(_.split(" ")).map(word =>(word,1))//调用updateStateByKey操作var result : DStream[(String,Int)] = wordAndOne.updateStateByKey(updateFunction)//打印输出result.print()//开启流式计算ssc.start()//用于保持程序一直运行ssc.awaitTermination()}
}
先在IDEA中的updataStateByKeyTest运行代码,然后再到master节点不断输入单词,具体内容如下:
nc -lk 9999
hadoop spark itcast
spark itcast
运行结果如下:
DStream转换操作相关推荐
- Spark Streaming介绍,DStream,DStream相关操作(来自学习资料)
一. Spark Streaming介绍 1. SparkStreaming概述 1.1. 什么是Spark Streaming Spark Streaming类似于Apache Storm,用于流式 ...
- Spark DStream相关操作
DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的操作,如:updateStateByK ...
- Hive 数仓中常见的日期转换操作
(1)Hive 数仓中一些常用的dt与日期的转换操作 下面总结了自己工作中经常用到的一些日期转换,这类日期转换经常用于报表的时间粒度和统计周期的控制中 日期变换: (1)dt转日期 to_date(f ...
- [scala-spark]10. RDD转换操作
RDD提供了一组非常丰富的操作来操作数据,如:map,flatMap,filter等转换操作,以及SaveAsTextFile,conutByKey等行动操作.这里仅仅综述了转换操作. map map ...
- c#图片base64去转义字符_C#实现字符串与图片的Base64编码转换操作示例
本文实例讲述了C#实现字符串与图片的Base64编码转换操作.分享给大家供大家参考,具体如下: using System; using System.Collections.Generic; usin ...
- c#图片base64去转义字符_C#实现字符串与图片的Base64编码转换操作示例|chu
本文实例讲述了C#实现字符串与图片的Base64编码转换操作.分享给大家供大家参考,具体如下: using System; using System.Collections.Generic; usin ...
- 对图片进行压缩,水印,伸缩变换,透明处理,格式转换操作
对图片进行压缩,水印,伸缩变换,透明处理,格式转换操作 1 /** 2 * <html> 3 * <body> 4 * <P> Copyright 1994 Jso ...
- CAD转Excel,该如何转换操作
在日常的CAD制图工作中,我们常常会遇到各 种图纸转换问题,其中就有CAD转Excel,CAD图纸该如何转换成Excel文件.如何快 速转换呢?今天我就和大家讨论一下,CAD转Excel的转换的方法. ...
- CAD版本转换,手机该如何转换操作呢?
CAD版本转换,手机该如何转换操作呢?我们都知道,做CAD制图的时候,我们常常需要转换各种CAD版本,若是不使用电脑端的时候,我们该如何操作转换呢?软件会自动搜索手机中相应的文件格式,比如CAD版本转 ...
最新文章
- 报名 | 首期AI Time PhD:听清北师兄分享前沿研究成果!
- Golang实现简单爬虫框架(4)——队列实现并发任务调度
- Python编程基础:第三十节 文件检测File Detection
- tp5 cache缓存简单使用
- python基础内容_python基础-python介绍
- 学PyTorch还是TensorFlow?
- 【linux命令】Centos下如何匹配内容在哪个文件中
- datax 持续数据同步_DataX数据同步
- EZchip将推全球首款100核64位ARM A-53芯片
- 95-140-112-源码-transform-算子split 和 select
- 求解斐波那契数列复杂度分析
- 基于SSM的校园帮系统
- 23届计算机专业毕设Java选题参考
- 比管理时间重要 1000 倍的,是管理精力
- Splitter和Joiner使用手册
- matlab中霍夫线检测函数,matlab 霍夫检测
- 也许通过社群找工作,是未来的趋势。
- python疫苗预约系统毕业设计开题报告
- c# ocx控件安装后不显示问题。
- ANSYS SIwave 基于S参数模型的信号完整性仿真论文