原理

架构


实战

RDD 队列

val rddQueue = new mutable.QueueRDD[Int]

自定义数据源

用法及说明
需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

class CustomerReceiver(host: String, port: Int) extends
Receiver[String](StorageLevel.MEMORY_ONLY) {//最初启动的时候,调用该方法,作用为:读数据并将数据发送给 Sparkoverride def onStart(): Unit = {new Thread("Socket Receiver") {override def run() {receive()}}.start()}//读数据并将数据发送给 Sparkdef receive(): Unit = {//创建一个 Socketvar socket: Socket = new Socket(host, port)//定义一个变量,用来接收端口传过来的数据var input: String = null//创建一个 BufferedReader 用于读取端口传来的数据val reader = new BufferedReader(new InputStreamReader(socket.getInputStream,
StandardCharsets.UTF_8))//读取数据input = reader.readLine()//当 receiver 没有关闭并且输入数据不为空,则循环发送数据给 Sparkwhile (!isStopped() && input != null) {store(input)input = reader.readLine()}//跳出循环则关闭资源reader.close()socket.close()//重启任务restart("restart")}override def onStop(): Unit = {}
}

使用自定义的数据源采集数据

object FileStream {def main(args: Array[String]): Unit = {//1.初始化 Spark 配置信息
val sparkConf = new SparkConf().setMaster("local[*]")
.setAppName("StreamWordCount")//2.初始化 SparkStreamingContextval ssc = new StreamingContext(sparkConf, Seconds(5))
//3.创建自定义 receiver 的 Streaming
val lineStream = ssc.receiverStream(new CustomerReceiver("hadoop102", 9999))//4.将每一行数据做切分,形成一个个单词val wordStream = lineStream.flatMap(_.split("\t"))//5.将单词映射成元组(word,1)
val wordAndOneStream = wordStream.map((_, 1))//6.将相同的单词次数做统计val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)//7.打印wordAndCountStream.print()//8.启动 SparkStreamingContextssc.start()ssc.awaitTermination()}
}

kafka数据源

Kafka 0-10 Direct 模式
1)需求:通过 SparkStreaming 从 Kafka 读取数据,并将读取过来的数据做简单计算,最终打印
到控制台。
2)导入依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version>
</dependency>
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.10.1</version></dependency>

3)编写代码


import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils,
LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
object DirectAPI {def main(args: Array[String]): Unit = {//1.创建 SparkConfval sparkConf: SparkConf = new
SparkConf().setAppName("ReceiverWordCount").setMaster("local[*]")//2.创建 StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(3))//3.定义 Kafka 参数val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG ->
"linux1:9092,linux2:9092,linux3:9092",ConsumerConfig.GROUP_ID_CONFIG -> "atguigu","key.deserializer" ->
"org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" ->
"org.apache.kafka.common.serialization.StringDeserializer")//4.读取 Kafka 数据创建 DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] =
KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara))//5.将每条消息的 KV 取出val valueDStream: DStream[String] = kafkaDStream.map(record => record.value())//6.计算 WordCountvalueDStream.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()//7.开启任务ssc.start()ssc.awaitTermination()}
}

查看 Kafka 消费进度

bin/kafka-consumer-groups.sh --describe --bootstrap-server linux1:9092 --group
atguigu

DStream 转换

无状态转化操作

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。

Transform

Transform 允许 DStream 上执行任意的 RDD-to-RDD 函数。

join

两个流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是
对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

有状态转化操作

UpdateStateByKey

针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。
updateStateByKey 操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:

  1. 定义状态,状态可以是一个任意的数据类型。
  2. 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更
    新。
    使用 updateStateByKey 需要对检查点目录进行配置,会使用检查点来保存状态

WindowOperations

Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
➢ 窗口时长:计算内容的时间范围;

Window 的操作
(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream;
(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
(3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
(4)reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]): 当在一个(K,V)对的 DStream 上调用此函数,会返回一个新(K,V)对的 DStream,此处通过对滑动窗口中批次数据使用 reduce 函数来整合每个 key 的 value 值。
(5)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。
通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对 keys 的“加”“减”计数。

DStream 输出

输出操作如下:
➢ print():在运行流程序的驱动结点上打印 DStream 中每一批次数据的最开始 10 个元素。这用于开发和调试。在 Python API 中,同样的操作叫 print()。
➢ saveAsTextFiles(prefix, [suffix]):以 text 文件形式存储这个 DStream 的内容。每一批次的存储文件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”。
➢ saveAsObjectFiles(prefix, [suffix]):以 Java 对象序列化的方式将 Stream 中的数据保存为SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]“. Python中目前不可用。
➢ saveAsHadoopFiles(prefix, [suffix]):将 Stream 中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]”。Python API 中目前不可用。
➢ foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream 的每一个RDD。其中参数传入的函数 func 应该实现将每一个 RDD 中数据推送到外部系统,如将
RDD 存入文件或者通过网络将其写入数据库。

优雅关闭

使用外部文件系统来控制内部程序关闭。

【Spark | SparkStreaming】相关推荐

  1. 【Spark篇】---Spark解决数据倾斜问题

    [Spark篇]---Spark解决数据倾斜问题 参考文章: (1)[Spark篇]---Spark解决数据倾斜问题 (2)https://www.cnblogs.com/LHWorldBlog/p/ ...

  2. 【Spark Streaming】(二)DStream 编码实战

    文章目录 一.前言 二.DStream 编程模型 三.DStream 操作 3.1 套接字流:通过监听 Socket 端口来接收数据 3.2 文件流 3.2 RDD队列流 3.4 带状态的处理 Sta ...

  3. 【Spark篇】---SparkStream初始与应用

    一.前述 SparkStreaming是流式处理框架,是Spark API的扩展,支持可扩展.高吞吐量.容错的实时数据流处理,实时数据的来源可以是:Kafka, Flume, Twitter, Zer ...

  4. 【Spark篇】---Spark初始

    一.前述 Spark是基于内存的计算框架,性能要优于Mapreduce,可以实现hadoop生态圈中的多个组件,是一个非常优秀的大数据框架,是Apache的顶级项目.One stack  rule  ...

  5. 【Spark篇】---Spark中Master-HA和historyServer的搭建和应用

    一.前述 本节讲述Spark Master的HA的搭建,为的是防止单点故障. Spark-UI 的使用介绍,可以更好的监控Spark应用程序的执行. 二.具体细节 1.Master HA 1.Mast ...

  6. 【Spark篇】---Spark调优之代码调优,数据本地化调优,内存调优,SparkShuffle调优,Executor的堆外内存调优...

    一.前述 Spark中调优大致分为以下几种 ,代码调优,数据本地化,内存调优,SparkShuffle调优,调节Executor的堆外内存. 二.具体    1.代码调优 1.避免创建重复的RDD,尽 ...

  7. 【Spark笔记】Windows10 本地搭建单机版Spark开发环境

    0x00 环境及软件 1.系统环境 OS:Windows10_x64 专业版 2.所需软件或工具 JDK1.8.0_131 spark-2.3.0-bin-hadoop2.7.tgz hadoop-2 ...

  8. 【Spark Core】【RDD】【01】核心属性 执行原理

    理解RDD 刚从地里挖出来的土豆食材.清洗过后的干净土豆.生薯片.烤熟的薯片,流水线上这些食材的不同形态,就像是 Spark 中 RDD 对于不同数据集合的抽象. RDD 具有 4 大属性,分别是 p ...

  9. 【spark使用】4. Dataset转换算子使用

    1.groupByKey.mapGroups.flatMapGroups结合使用 package com.DataSet;import bean.Dept; import bean.Employee; ...

最新文章

  1. python装饰器-如何理解Python装饰器?
  2. Java读取xml文件的四种方法
  3. boost::mp11::mp_transform_if_q相关用法的测试程序
  4. BackGroundWorker用法
  5. LeetCode 1409. 查询带键的排列(map模拟)
  6. totalspider爬虫批量重启报错Connection Error
  7. vue + elementui 通过父子组件实现弹框
  8. BZOJ 10628 Luogu 2633
  9. linux进阶之gitlab仓库搭建及免密使用
  10. Python入门之面向对象module,library,package之间区别
  11. web基础_$POST 在线http接口测试网址
  12. 树莓派通过网络共享USB设备
  13. 微软拼音变成繁体,如何修改为简体
  14. 【Unity3D】Android 打包 ① ( Android 编译选项 | 安装 Android Build Support 模块 )
  15. 使用Selenium模拟登陆百度盘
  16. ios Objective-c 字体样式大全(UIFont 可设置的)
  17. 基于Quartus II 软件(VHDL)设计
  18. SQLServer数据库备份的使用
  19. 怎么把图片按12345....顺序排列
  20. 【图结构】之图注意力网络GAT详解

热门文章

  1. C# 保存窗口为图片(保存纵断面图)
  2. JAVA进行图片压缩
  3. ERP的实施--把握三大计划
  4. 算法笔记(1)-常用推荐算法总结
  5. STM32的HAL库分析及使用
  6. php twig if,twig基本语法
  7. WebSocket + Redis简单快速实现Web网站单设备登录功能
  8. QlikView 笔记(一) 初次使用时最让我惊喜的函数
  9. CMMI(能力成熟度集成)四个等级
  10. Git与bitbucket简单使用