第四天:Spark Streaming
Spark Streaming概述
1. Spark Streaming是什么
Spark Streaming用于流式
数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka
、Flume
、Twitter
、ZeroMQ
和简单的TCP
套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算
。而结果也能保存在很多地方,如HDFS,数据库等。
和Spark基于RDD的概念很相似,Spark Streaming使用离散化流
(discretized stream)作为抽象表示,叫作DStream
。DStream 是随时间推移而收到的数据的序列。在内部,每个时间区间(采集周期)收到的数据都作为 RDD 存在,而DStream是由这些RDD所组成的序列(因此得名离散化
)。
2. Spark Streaming特点
- 易用
- 容错
- 易整合到Spark体系
3. Spark Streaming架构
架构图
多了一个接收器,一个StreamingContext。
整体架构图:
形象图
背压机制
Spark 1.5以前
版本,用户如果要限制Receiver
的数据接收速率,可以通过设置静态配制参数spark.streaming.receiver.maxRate
的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。
为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率
来适配集群数据处理能力。背压机制
(即Spark Streaming Backpressure
):
根据JobScheduler
反馈作业的执行信息来动态调整Receiver数据接收率。 通过属性
spark.streaming.backpressure.enabled来控制是否启用
backpressure`机制,默认值false,即不启用。
第2章 Dstream入门
WordCount案例实操
需求
:使用netcat
工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数
添加依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>${spark.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version><!--<scope>provided</scope>--></dependency></dependencies>
scala
package com.atguigu.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Created by wuyufei on 06/09/2017.*/
object WorldCount {def main(args: Array[String]) {val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")// 间隔采集周期, 若干半生类val ssc = new StreamingContext(conf, Seconds(2))ssc.checkpoint("./checkpoint")// Create a DStream that will connect to hostname:port, like localhost:9999, 一行一行的接受数据val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9000)val linesFile: DStream[String] = ssc.textFileStream("test")//监控文件夹里的内容,然后从别的地方把文件移动到test中即可。不过Flume 也可以做并且做的更好, 一般不会用上述方法。// Split each line into wordsval words: DStream[String] = lines.flatMap(_.split(" "))//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3// Count each word in each batchval pairs: DStream[(String, Int)] = words.map(word => (word, 1))val wordCounts: DStream[(String, Int)] = pairs.reduceByKey(_ + _)wordCounts.print()// 启动采集器ssc.start()// Driver等待采集器执行ssc.awaitTermination()//ssc.stop() // 把采集流停止 一般不用 因为是不间断的}
}
测试:
[atguigu@hadoop102 spark]$ nc -lk 9000
hello sowhat
hello hello
hello hello
s s s
注意
:如果程序运行时,log日志太多,可以将spark conf目录下的log4j文件里面的日志级别改成WARN或者ERROR。
WordCount解析
Discretized Stream
是Spark Streaming的基础抽象,代表持续性的数据流和经过各种Spark原语操作后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每个RDD含有一段时间间隔内的数据,如下图:
第三章 Dstream创建
1. RDD队列
用法及说明
测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。
需求
:间隔性的发送数据, 间隔性的从内存队列取出数据,统计取出数据%10的结果个数
编写代码
package com.atguigu.streamingimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutable/*** 间隔性的发送数据, 间隔性的从内存队列取出数据,统计取出数据%10的结果个数* */
object QueueRdd {def main(args: Array[String]) {val conf = new SparkConf().setMaster("local[*]").setAppName("QueueRdd")val ssc = new StreamingContext(conf, Seconds(1))// Create the queue through which RDDs can be pushed to// a QueueInputDStream//创建RDD队列val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()// Create the QueueInputDStream and use it do some processing// 创建QueueInputDStreamval inputStream = ssc.queueStream(rddQueue)//处理队列中的RDD数据val mappedStream = inputStream.map(x => (x % 10, 1))val reducedStream = mappedStream.reduceByKey(_ + _)//打印结果reducedStream.print()//启动计算ssc.start()// Create and push some RDDs intofor (i <- 1 to 30) {rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)//通过程序停止StreamingContext的运行//ssc.stop()}ssc.awaitTermination()}
}
2. 自定义数据源
用法及说明
需要继承Receiver,并实现onStart、onStop方法来自定义数据源采集。
案例实操
需求
:自定义数据源,实现监控某个端口号,获取该端口号内容。
代码:
package com.atguigu.streamingimport java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsetsimport org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 模仿 package org.apache.spark.streaming.dstream.SocketReceiver*/
// String就是接收數據的類型
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {override def onStart(): Unit = {// Start the thread that receives data over a connectionnew Thread("Socket Receiver") {override def run() {receive()}}.start()}override def onStop(): Unit = {// There is nothing much to do as the thread calling receive()// is designed to stop by itself if isStopped() returns false}/** Create a socket connection and receive data until receiver is stopped */private def receive() {var socket: Socket = nullvar userInput: String = nulltry {// Connect to host:portsocket = new Socket(host, port)// Until stopped or connection broken continue readingval reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))userInput = reader.readLine()while (!isStopped && userInput != null) {// socket 接受数据 需要一个结束的信号// 內部的函數,將數據存儲下倆store(userInput)userInput = reader.readLine()}reader.close()socket.close()// Restart in an attempt to connect again when server is active againrestart("Trying to connect again")} catch {case e: java.net.ConnectException =>// restart if could not connect to serverrestart("Error connecting to " + host + ":" + port, e)case t: Throwable =>// restart if there is any other errorrestart("Error receiving data", t)}}
}object CustomReceiver {def main(args: Array[String]) {val conf = new SparkConf().setMaster("local[*]").setAppName("NetworkWordCount")val ssc = new StreamingContext(conf, Seconds(1))// Create a DStream that will connect to hostname:port, like localhost:9999,自定义数据源的操作val lines = ssc.receiverStream(new CustomReceiver("localhost", 9999))// Split each line into wordsval words = lines.flatMap(_.split(" "))//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3// Count each word in each batchval pairs = words.map(word => (word, 1))val wordCounts = pairs.reduceByKey(_ + _)// Print the first ten elements of each RDD generated in this DStream to the consolewordCounts.print()ssc.start() // Start the computationssc.awaitTermination() // Wait for the computation to terminate//ssc.stop()}
}
总结:依葫芦画瓢,继承必须的类,重写方法实现自己业务逻辑。
3. Kafka数据源(面试开发重点)
用法及说明
在工程中需要引入Maven工件spark-streaming-kafka-0-8_2.11
来使用它。包内提供的 KafkaUtils
对象可以在StreamingContext
和JavaStreamingContext
中以你的Kafka消息创建出 DStream。
两个核心类
:KafkaUtils、KafkaCluster
案例实操
需求
:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算(WordCount),最终打印到控制台。
简单版
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>sparkStreaming</artifactId><groupId>com.atguigu</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>sparkstreaming_windowWordCount</artifactId><dependencies><!-- 用来连接Kafka的工具类 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.2.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>${spark.version}</version></dependency></dependencies><build><finalName>statefulwordcount</finalName><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><configuration><archive><manifest><mainClass>com.atguigu.streaming.WorldCount</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin></plugins></build>
</project>
代码
package com.atguigu.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}object WorldCount {def main(args: Array[String]) {val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")// 间隔采集周期, 若干半生类val ssc = new StreamingContext(conf, Seconds(2))ssc.checkpoint("./checkpoint")// Create a DStream that will connect to hostname:port, like localhost:9999, 一行一行的接受数据val lines: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9000)// 这里主要是创建好 sparkStream如何跟消费Kafka的 逻辑代码val value: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(ssc, "zpIp:2181", "sowhatGroup", Map("sowhat" -> 3))// Split each line into words 接受到的Kafka数据都是KV对,一般情况下不传K而已,val words: DStream[String] = value.flatMap(_._2.split(" "))//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3// Count each word in each batchval pairs: DStream[(String, Int)] = words.map(word => (word, 1))val wordCounts: DStream[(String, Int)] = pairs.reduceByKey(_ + _)wordCounts.print()// 启动采集器ssc.start()// Driver等待采集器执行ssc.awaitTermination()//ssc.stop() // 把采集流停止 一般不用 因为是不间断的}
}
连接池版
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><artifactId>sparkstreaming_kafka</artifactId><groupId>com.atguigu</groupId><version>1.0-SNAPSHOT</version><dependencies><!-- 提供对象连接池的一种方式 --><dependency><groupId>org.apache.commons</groupId><artifactId>commons-pool2</artifactId><version>2.4.2</version></dependency><!-- 用来连接Kafka的工具类 --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.2.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency></dependencies><build><finalName>kafkastreaming</finalName><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><configuration><archive><manifest><mainClass>com.atguigu.streaming.KafkaStreaming</mainClass></manifest></archive><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration></plugin></plugins></build>
</project>
KafkaStreaming:
package com.atguigu.streamingimport org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}//单例对象
object createKafkaProducerPool {//用于返回真正的对象池GenericObjectPooldef apply(brokerList: String, topic: String): GenericObjectPool[KafkaProducerProxy] = {val producerFactory = new BaseKafkaProducerFactory(brokerList, defaultTopic = Option(topic))val pooledProducerFactory = new PooledKafkaProducerAppFactory(producerFactory)//指定了你的kafka对象池的大小val poolConfig = {val c = new GenericObjectPoolConfigval maxNumProducers = 10c.setMaxTotal(maxNumProducers)c.setMaxIdle(maxNumProducers)c}//返回一个对象池new GenericObjectPool[KafkaProducerProxy](pooledProducerFactory, poolConfig)}
}object KafkaStreaming {def main(args: Array[String]) {//设置sparkconfval conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")//新建了streamingContextval ssc = new StreamingContext(conf, Seconds(1))//kafka的地址val brobrokers = "192.168.56.150:9092,192.168.56.151:9092,192.168.56.152:9092"//kafka的队列名称val sourcetopic = "source1";//kafka的队列名称val targettopic = "target1";//创建消费者组名var group = "con-consumer-group"//kafka消费者配置val kafkaParam = Map("bootstrap.servers" -> brobrokers, //用于初始化链接到集群的地址"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],//用于标识这个消费者属于哪个消费团体"group.id" -> group,//如果没有初始化偏移量或者当前的偏移量不存在任何服务器上,可以使用这个配置属性//可以使用这个配置,latest自动重置偏移量为最新的偏移量"auto.offset.reset" -> "latest",//如果是true,则这个消费者的偏移量会在后台自动提交"enable.auto.commit" -> (false: java.lang.Boolean),//ConsumerConfig.GROUP_ID_CONFIG);//创建DStream,返回接收到的输入数据val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(sourcetopic), kafkaParam))//每一个stream都是一个ConsumerRecordstream.map(s => ("id:" + s.key(), ">>>>:" + s.value())).foreachRDD(rdd => {//对于RDD的每一个分区执行一个操作rdd.foreachPartition(partitionOfRecords => {// kafka连接池。val pool = createKafkaProducerPool(brobrokers, targettopic)//从连接池里面取出了一个Kafka的连接val p = pool.borrowObject()//发送当前分区里面每一个数据partitionOfRecords.foreach { message => System.out.println(message._2); p.send(message._2, Option(targettopic)) }// 使用完了需要将kafka还回去pool.returnObject(p)})//更新offset信息val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges})ssc.start()ssc.awaitTermination()}
}
第4章 DStream转换
DStream上的操作与RDD的类似,分为Transformations
(转换)和Output Operations
(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
1. 无状态转化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次
上,也就是转化DStream中的每一个RDD。部分无状态转化操作列在了下表中。注意,针对键值对的DStream转化操作(比如 reduceByKey())要添加import StreamingContext._
才能在Scala中使用。
需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上
的。
例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。
package com.atguigu.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Created by wuyufei on 06/09/2017.*/
object WorldCount {def main(args: Array[String]) {val conf:SparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")val ssc:StreamingContext = new StreamingContext(conf, Seconds(1))ssc.checkpoint("./checkpoint")// Create a DStream that will connect to hostname:port, like localhost:9999val lines = ssc.socketTextStream("localhost", 9999)// Split each line into wordsval words = lines.flatMap(_.split(" "))// Count each word in each batchval pairs = words.map(word => (word, 1))val wordCounts = pairs.reduceByKey(_ + _)// 注意此处是无状态的,每次只处理对应的时间间隔数据!!// Print the first ten elements of each RDD generated in this DStream to the consolewordCounts.print()ssc.start() // Start the computationssc.awaitTermination() // Wait for the computation to terminate//ssc.stop()}
}
2. 有状态转化操作
1. UpdateStateByKey
UpdateStateByKey原语用于记录历史
记录,有时,我们需要在DStream中跨批次
维护状态(例如流计算中累加wordcount)。针对这种情况,updateStateByKey()
为我们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的DStream
,其内部的RDD 序列是由每个时间区间对应的(键,状态)对组成的。
updateStateByKey操作使得我们可以在用新信息进行更新时保持任意的状态。为使用这个功能,需要做下面两步:
- 定义状态,状态可以是一个任意的数据类型。
- 定义状态更新函数,用此函数阐明如何使用之前的状态和来自输入流的新值对状态进行更新。
使用updateStateByKey
需要对检查点目录进行配置,会使用检查点来保存状态。 - 整体过程有点类似 SparkSQL中的 自定义函数求均值,其中中间和跟个数的存储,不过这里是存储早checkpoint中,保存到硬盘。
更新版的 wordcount
package com.atguigu.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** Created by wuyufei on 06/09/2017.*/
object WorldCount {def main(args: Array[String]) {val conf:SparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")val ssc:StreamingContext = new StreamingContext(conf, Seconds(1))ssc.checkpoint("./checkpoint")// Create a DStream that will connect to hostname:port, like localhost:9999val lines = ssc.socketTextStream("localhost", 9999)// Split each line into wordsval words = lines.flatMap(_.split(" "))// Count each word in each batchval pairs = words.map(word => (word, 1))// 上面给每个数据提供了 个数1, 然后 根据key 分组后就有个Seq[Int]的数据,然后不同的时间段数据需要累计 需要一个中间缓冲变量 buffer .val wordCounts: DStream[(String, Int)] = pairs.updateStateByKey {case (seq, buffer) => {val sum: Int = buffer.getOrElse(0) + seq.sumOption(sum)}}// Print the first ten elements of each RDD generated in this DStream to the consolewordCounts.print()ssc.start() // Start the computationssc.awaitTermination() // Wait for the computation to terminate//ssc.stop()}
}
2. WindowOperations
Window Operations可以设置窗口的大小
和滑动窗口的间隔
来动态的获取当前Steaming的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长。
- 窗口时长:计算内容的时间范围;
- 滑动步长:隔多久触发一次计算。
注意
:这两者都必须为批次大小的整数倍。
如下图所示WordCount案例:窗口大小为批次的2倍,滑动步等于批次大小。
WordCount第三版:3秒一个批次,窗口12秒,滑步6秒。
package com.atguigu.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object WorldCount {def main(args: Array[String]) {val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")val ssc = new StreamingContext(conf, Seconds(3))ssc.checkpoint("./ck")// Create a DStream that will connect to hostname:port, like localhost:9999val lines = ssc.socketTextStream("hadoop102", 9999)// Split each line into wordsval words = lines.flatMap(_.split(" "))// Count each word in each batchval pairs = words.map(word => (word, 1))val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6))// Print the first ten elements of each RDD generated in this DStream to the consolewordCounts.print()ssc.start() // Start the computationssc.awaitTermination() // Wait for the computation to terminate}
}
关于Window的操作还有如下方法:
- window(windowLength, slideInterval):
基于对源DStream窗化的批次进行计算返回一个新的Dstream;
- countByWindow(windowLength, slideInterval):
返回一个滑动窗口计数流中的元素个数;
- reduceByWindow(func, windowLength, slideInterval):
通过使用自定义函数整合滑动区间流元素来创建一个新的单元素流;
- reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):
当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用reduce函数来整合每个key的value值。
- reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]):
这个函数是上述函数的变化版本,每个窗口的reduce值都是通过用前一个窗的reduce值来递增计算。通过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对keys的“加”“减”计数。通过前边介绍可以想到,这个函数只适用于”可逆的reduce函数”,也就是这些reduce函数有相应的”反reduce”函数(以参数invFunc形式传入)。如前述函数,reduce任务的数量通过可选参数来配置。
al ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow({(x, y) => x + y},{(x, y) => x - y},Seconds(30),Seconds(10))//加上新进入窗口的批次中的元素 //移除离开窗口的老批次中的元素 //窗口时长// 滑动步长
countByWindow()
和countByValueAndWindow()
作为对数据进行计数操作的简写。countByWindow()
返回一个表示每个窗口中元素个数的DStream,而countByValueAndWindow()
返回的DStream
则包含窗口中每个值的个数。
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
3. 其他重要操作
1. Transform
Transform原语允许DStream上执行任意的RDD-to-RDD函数,即使这些函数并没有在DStream的API中暴露处理,通过改函数可以方便的扩展Spark API,改函数每一批次调用一次,其实也就是DStream中的RDD应用转换, 比如下面的例子,单词统计要过滤掉spam信息
val spamRDD = ssc.sparkContext.newAPIHadoopRDD()
val cleanDStream = wordCounts.transform{rdd=> {rdd.join(spamInfoRDD).filter()}
}
---
关键是理解 执行次数的不同!
// 转换
// TODO 代码 Driver 执行 1次
val a = 1
socketLineDStream.map{case x =>{// TODO executor执行 n次val a = 1 // 执行N 次x}
}// TODO Driver中执行一次
socketLineDStream.transform{case rdd=>{// TODO Driver 执行 执行 周期次rdd.map{case x=> {// todo Executor 执行 N次x}}}
}
2. Join
连接操作(leftOuterJoin、rightOutJoin、fullOuterJoin也可以),可以连接Stream-Stream,windows-Stream to windows-stream
第5章 DStream输出
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。
输出操作如下:
- print():
在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。
- saveAsTextFiles(prefix, [suffix]):
以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。
- saveAsObjectFiles(prefix, [suffix]):
以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。
- saveAsHadoopFiles(prefix, [suffix]):
将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。
- foreachRDD(func):
这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。
注意
:
6. 数据库的连接不能写在driver层面(因为链接无法序列化)
7. 如果写在foreach则每个RDD中的每一条数据都创建,得不偿失
8. 增加foreachPartition,在分区创建(获取)
参考
Spark全套资料
第四天:Spark Streaming相关推荐
- Spark(四)— Spark Streaming
Spark(四)- Spark Streaming 一.概述 二.使用 2.1 基础环境 (wordcount测试) 2.2 DStream输出 2.3 自定义采集器 - 对接Kafka 2.4 DS ...
- 编程实现将rdd转换为dataframe:源文件内容如下(_第四篇|Spark Streaming编程指南(1)
Spark Streaming是构建在Spark Core基础之上的流处理框架,是Spark非常重要的组成部分.Spark Streaming于2013年2月在Spark0.7.0版本中引入,发展至今 ...
- spark算子大全glom_(七)Spark Streaming 算子梳理 — repartition算子
目录 天小天:(一)Spark Streaming 算子梳理 - 简单介绍streaming运行逻辑 天小天:(二)Spark Streaming 算子梳理 - flatMap和mapPartitio ...
- 大数据求索(8):Spark Streaming简易入门一
大数据求索(8):Spark Streaming简易入门一 一.Spark Streaming简单介绍 Spark Streaming是基于Spark Core上的一个应用程序,可伸缩,高吞吐,容错( ...
- Kafka:ZK+Kafka+Spark Streaming集群环境搭建(十二)VMW安装四台CentOS,并实现本机与它们能交互,虚拟机内部实现可以上网。...
Centos7出现异常:Failed to start LSB: Bring up/down networking. 按照<Kafka:ZK+Kafka+Spark Streaming集群环境搭 ...
- 【Spark Streaming】(四)基于 Spark Structured Streaming 的开发与数据处理
文章目录 一.前言 二.Spark Streaming vs Structured Streaming 2.1 Spark Streaming 2.2 Structured Streaming 2.3 ...
- Spark Streaming 实战案例(四) Spark Streaming 缓存、Checkpoint机制
主要内容 Spark Stream 缓存 Checkpoint 案例 1. Spark Stream 缓存 通过前面一系列的课程介绍,我们知道DStream是由一系列的RDD构成的,它同一般的RDD一 ...
- sparkstreaming监听hdfs目录如何终止_四十六、Spark Streaming简介及入门
1.什么是Spark Streaming Spark Streaming是基于Spark Core之间的实时计算框架,可以从很多数据源消费数据并对数据进行处理.它是Spark核心API的一个扩展与封装 ...
- Spark Streaming简介 (三十四)
Spark Streaming简介 Spark Streaming 是 Spark 提供的对实时数据进行流式计算的组件.它是 Spark 核心 API 的一个扩展,具有吞吐量高.容错能力强的实时流数据 ...
- [Spark]Spark Streaming 指南四 输入DStreams和Receivers
1. 输入DStream与Receiver 输入DStreams表示从源中获取输入数据流的DStreams.在指南一示例中,lines表示输入DStream,它代表从netcat服务器获取的数据流.每 ...
最新文章
- 盘点——那些你不能不知道的自动化测试面试题
- AI综述专栏 | 神经科学启发的人工智能
- 成功解决tensorflow\contrib\learn\python\learn\datasets\mnist.py: maybe_download (from tensorflow.contri
- 深度解读NLP文本情感分析Pipeline
- bash下: () {} [] [[]] (())的解释
- maven 打包普通java配置_配置pom.xml用maven打包java工程的方法(推荐)
- mysql lenenc int_MySQL-NonMySQL同步工具源码解读——确定同步位置
- linux qt sql,linux qt联接sqlserver怎么配置服务器
- 散点图 横纵坐标_厉害了我的Python!散点图还能这么画
- 【设计模式】—— 解释器模式Interpret
- HashMap为什么是线程不安全的
- (11)Redis------分布式锁的实现方式之一(基于Springboot项目搭建)
- hp打印机一直显示正在打印中_安装惠普打印机出现“新设备现已连接”一直不动怎么办?...
- 如何学好3D游戏引擎编程
- wps怎样删除空白页 WPS文档的空白页如何删除
- ME525+刷机2.3.6版本过程分享
- 2021年中国百强区总体发展概况分析:深圳南山区、广州天河区、深圳福田区等城区高质量发展水平领跑全国[图]
- 纳什均衡与极大极小值算法
- 程序员必备技能之约会倍增术
- 解决之道:从互联网安全到IoT安全,如何关上潘多拉魔盒?
热门文章
- DS18B20 数字温度传感器的使用和基于RT-Thread操作系统的实现
- 郑捷《机器学习算法原理与编程实践》学习笔记(第一章 机器学习基础)
- 郑捷《机器学习算法原理与编程实践》学习笔记(第四章 推荐系统原理)(一)推荐系统概述...
- Seaborn使用violinplot函数可视化分组小提琴图(violin plot)、使用inner函数设置在小提琴图中使用虚线显示分位数位置(inner = ‘quartile‘)
- web错误代码ERR_BLOCKED_BY_RESPONSE
- nmap下载和扫描教程
- 更新 mac 系统,clion 不能用
- css实现3D书本翻页动画
- JavaWeb的学习(上)
- unity 场景背景替换2D图片方法