Spark Streaming中对DStream的转换会转变成对RDD的转换操作,流程如下:

其中,lines表示转换操作前的DStream,words表示转换操作后生成的DStream。对lines做flatMap转换操作,也就是对它内部的所有RDD做flatMap转换操作。

接下来,列举DStream API 提供的与转换操作相关的方法。

 

在以上表中,列举了一些DStream API提供的与转换操作相关的方法。DStream API提供的与转换操作相关的方法和RDD API有些不同,不同之处在于RDD API 中没有提供transform()和updateStateByKey()两个方法。

1、transform()

(1)启动master,slave1,slave2三个节点的kafka服务。

bin/kafka-server-start.sh.config/server.properties

(2)在master节点下载nc服务并开启服务

下载命令:yum install nc

执行命令 nc-lk 9999启动服务器且监听Socket服务,并输入数据 I am learning Spark Streaming now.

(3)打开IDEA工具,创建一个名为spark03的Maven项目。

(4)配置pom.xml文件,引入Spark Streaming相关依赖和设置源代码的存储路径,具体内容如下:

<dependencies><!--引入Scala编程依赖库--><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>2.11.8</version></dependency><!--引入Spark核心依赖--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>2.0.2</version></dependency><!--引入sparkStreaming--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.0.2</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory></build>

(5)配置好pom.xml文件后,在/src/main和/src/test目录下分别创建scala目录,用来防止sourceDirectory和testDirectory标签提示错误。

(6)在spark03项目的/src/main/scala目录下创建一个名为itcast的包,接着在包下创建名为TransformTest的scala类,编写以下代码:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}object TransformTest {def main(args: Array[String]): Unit = {//创建SparkConf对象val sparkConf : SparkConf = new SparkConf().setAppName("TransformTest").setMaster("local[2]")//创建SparkContext对象val sc : SparkContext = new SparkContext(sparkConf)//设置日志级别sc.setLogLevel("WARN")//创建StreamingContextval ssc : StreamingContext = new StreamingContext(sc,Seconds(5))//连接socket服务val dstream :ReceiverInputDStream[String] =ssc.socketTextStream("192.168.196.101",9999)//使用RDD-to-RDD函数val words : DStream[String] = dstream.transform(rdd => rdd.flatMap(_.split(" ")))//打印输出words.print()//开启流式计算ssc.start()//用于保持程序一直运行ssc.awaitTermination()}
}

运行结果如下图所示:

2、updateStateByKey()

在spark03项目的/src/main/scala/itcast目录下创建一个名为updataStateByKeyTest的scala类,编写以下内容:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}object UpdateStateByKeyTest {//newValues表示当前批次汇总成的(k,v)中的相同k的所有v//runningCount表示历史的所有相同key的value总和def updateFunction(newValues : Seq[Int],runningCount:Option[Int]):Option[Int] = {val newCount = runningCount.getOrElse(0) + newValues.sumSome(newCount)}def main(args: Array[String]): Unit = {//创建SparkConf对象val sparkConf: SparkConf = new SparkConf().setAppName("UpdateStateByKeyTest").setMaster("local[2]")//创建SparkContext对象val sc: SparkContext = new SparkContext(sparkConf)//设置日志级别sc.setLogLevel("WARN")//创建StreamingContextval ssc: StreamingContext = new StreamingContext(sc, Seconds(5))//配置检查点目录ssc.checkpoint("./")//连接socket服务val dstream: ReceiverInputDStream[String] =ssc.socketTextStream("192.168.196.101", 9999)//按空格切分每一行val wordAndOne : DStream[(String,Int)]= dstream.flatMap(_.split(" ")).map(word =>(word,1))//调用updateStateByKey操作var result : DStream[(String,Int)] = wordAndOne.updateStateByKey(updateFunction)//打印输出result.print()//开启流式计算ssc.start()//用于保持程序一直运行ssc.awaitTermination()}
}

先在IDEA中的updataStateByKeyTest运行代码,然后再到master节点不断输入单词,具体内容如下:

nc -lk 9999

hadoop spark itcast

spark itcast

运行结果如下:

DStream转换操作相关推荐

  1. Spark Streaming介绍,DStream,DStream相关操作(来自学习资料)

    一. Spark Streaming介绍 1. SparkStreaming概述 1.1. 什么是Spark Streaming Spark Streaming类似于Apache Storm,用于流式 ...

  2. Spark DStream相关操作

    DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的操作,如:updateStateByK ...

  3. Hive 数仓中常见的日期转换操作

    (1)Hive 数仓中一些常用的dt与日期的转换操作 下面总结了自己工作中经常用到的一些日期转换,这类日期转换经常用于报表的时间粒度和统计周期的控制中 日期变换: (1)dt转日期 to_date(f ...

  4. [scala-spark]10. RDD转换操作

    RDD提供了一组非常丰富的操作来操作数据,如:map,flatMap,filter等转换操作,以及SaveAsTextFile,conutByKey等行动操作.这里仅仅综述了转换操作. map map ...

  5. c#图片base64去转义字符_C#实现字符串与图片的Base64编码转换操作示例

    本文实例讲述了C#实现字符串与图片的Base64编码转换操作.分享给大家供大家参考,具体如下: using System; using System.Collections.Generic; usin ...

  6. c#图片base64去转义字符_C#实现字符串与图片的Base64编码转换操作示例|chu

    本文实例讲述了C#实现字符串与图片的Base64编码转换操作.分享给大家供大家参考,具体如下: using System; using System.Collections.Generic; usin ...

  7. 对图片进行压缩,水印,伸缩变换,透明处理,格式转换操作

    对图片进行压缩,水印,伸缩变换,透明处理,格式转换操作 1 /** 2 * <html> 3 * <body> 4 * <P> Copyright 1994 Jso ...

  8. CAD转Excel,该如何转换操作

    在日常的CAD制图工作中,我们常常会遇到各 种图纸转换问题,其中就有CAD转Excel,CAD图纸该如何转换成Excel文件.如何快 速转换呢?今天我就和大家讨论一下,CAD转Excel的转换的方法. ...

  9. CAD版本转换,手机该如何转换操作呢?

    CAD版本转换,手机该如何转换操作呢?我们都知道,做CAD制图的时候,我们常常需要转换各种CAD版本,若是不使用电脑端的时候,我们该如何操作转换呢?软件会自动搜索手机中相应的文件格式,比如CAD版本转 ...

最新文章

  1. 报名 | 首期AI Time PhD:听清北师兄分享前沿研究成果!
  2. Golang实现简单爬虫框架(4)——队列实现并发任务调度
  3. Python编程基础:第三十节 文件检测File Detection
  4. tp5 cache缓存简单使用
  5. python基础内容_python基础-python介绍
  6. 学PyTorch还是TensorFlow?
  7. 【linux命令】Centos下如何匹配内容在哪个文件中
  8. datax 持续数据同步_DataX数据同步
  9. EZchip将推全球首款100核64位ARM A-53芯片
  10. 95-140-112-源码-transform-算子split 和 select
  11. 求解斐波那契数列复杂度分析
  12. 基于SSM的校园帮系统
  13. 23届计算机专业毕设Java选题参考
  14. 比管理时间重要 1000 倍的,是管理精力
  15. Splitter和Joiner使用手册
  16. matlab中霍夫线检测函数,matlab 霍夫检测
  17. 也许通过社群找工作,是未来的趋势。
  18. python疫苗预约系统毕业设计开题报告
  19. c# ocx控件安装后不显示问题。
  20. ANSYS SIwave 基于S参数模型的信号完整性仿真论文

热门文章

  1. find_in_set学习与思考
  2. 如何从视频中提取音频?
  3. 皮尔逊相关系数和检验P值
  4. VR系列--VR介绍
  5. 盘点国内外25款备具代表性的协同办公软件
  6. 大数据分析02——成都二手房(热度)
  7. 论语 灵公篇(笔记)
  8. python中round函数的精度保留方法---四舍六入五成双
  9. Windows 改变CMD窗口颜色!
  10. 【漫步计算机系统】:发展概览Ⅲ