设置SparkConf的时候不能设置为local,会报错,应当设置成local[N],N>1。这是因为需要一个核接收数据,另一个核处理数据,如果只分配一个线程处理,这个线程会被用来接收数据,就没有办法处理接收到的数据

Spark Streaming快速入门

文章目录

  • 一、Spark Streaming初体验——WordCount
    • 1.1 添加依赖
    • 1.2 wordcount
  • 二、DStreams输入
    • 2.1文件数据源
    • 2.2 RDD队列
    • 2.3自定义输入源
  • 三、DStream原语
    • 3.1 updateStateByKey原语
    • 3.2 mapWithState原语
    • 3.3 updateStateByKey和mapWithState比较
    • 3.4 transform原语——案例:统计网站黑名单
  • 四、SparkStreaming集成Kafka
    • 4.1导入依赖
    • 4.2 SparkStream对接Kafka——Driver方式
    • 4.3 从Kafka获取消息并存储到Redis中(Kafka+SparkSteaming+Redis)
  • 五、window窗口操作

一、Spark Streaming初体验——WordCount

1.1 添加依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.2.0</version>
</dependency>

1.2 wordcount

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 注意事项:*      这里master一定要写成 local[2] 或者 local[*], 千万不要只写local。*      因为,如果分配一个线程去处理的话,这个线程将被用来接收数据。*      此时,接收到的数据,将没有办法去处理了,因为已经没有资源了。*/
object _03_SparkStreamingWordCount {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[2]").setAppName("streamingWordCount")// 1. 找到程序的入口//    需要配置一个SparkConf,和一个数据采集的时间(即一个批次处理多少时间的数据)val sc: StreamingContext = new StreamingContext(conf, Seconds(5))// 2. 获取实时的数据,从nc获取数据val dStream: ReceiverInputDStream[String] = sc.socketTextStream("qianfeng01", 6666)// 3. 进行数据的处理val res: DStream[(String, Int)] = dStream.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)// 4. 直接输出到控制台res.print()// 5. 开启任务sc.start()// 6. 等待下一个批次的处理sc.awaitTermination()}
}

使用命令nc -lv 端口号进行测试

1.创建StreamingContext对象 同Spark初始化需要创建SparkContext对象一样,使用Spark Streaming就需要创建StreamingContext对象。创建StreamingContext对象所需的参数与SparkContext基本一致,包括指明Master,设定名称(如NetworkWordCount)。需要注意的是参数Seconds(1),Spark Streaming需要指定处理数据的时间间隔,如上例所示的1s,那么Spark Streaming会以1s为时间窗口进行数据处理。此参数需要根据用户的需求和集群的处理能力进行适当的设置;

2.创建InputDStream如同Storm的Spout,Spark Streaming需要指明数据源。如上例所示的socketTextStream,Spark Streaming以socket连接作为数据源读取数据。当然Spark Streaming支持多种不同的数据源,包括Kafka、 Flume、HDFS/S3、Kinesis和Twitter等数据源;

3.操作DStream对于从数据源得到的DStream,用户可以在其基础上进行各种操作,如上例所示的操作就是一个典型的WordCount执行流程:对于当前时间窗口内从数据源得到的数据首先进行分割,然后利用Map和ReduceByKey方法进行计算,当然最后还有使用print()方法输出结果;

4.启动Spark Streaming之前所作的所有步骤只是创建了执行流程,程序没有真正连接上数据源,也没有对数据进行任何操作,只是设定好了所有的执行计划,当ssc.start()启动后程序才真正进行所有预期的操作。

二、DStreams输入

2.1文件数据源

⽂件数据流:能够读取所有HDFS API兼容的⽂件系统⽂件,通过fileStream⽅法进⾏读取,Spark Streaming 将会监控 dataDirectory ⽬录并不断处理移动进来的⽂件,记住目前不⽀持嵌套目录。(如果监控的文件夹中又进来一个文件夹,那么该文件夹里面的数据不会被监控)

1)⽂件需要有相同的数据格式
2)⽂件进⼊ dataDirectory 的⽅式需要通过移动或者重命名来实现;
3)⼀旦⽂件移动进⽬录,则不能再修改,即便修改了也不会读取新数据;

ps:这个流只需要了解就⾏,效果不是很明显,本地的话也有可能会明显⼀下,所以了解即可我们多⽤于flume来完成⽂件的监控

如果需要访问hdfs文件系统,那么需要将core-site.xml文件添加进resources中,在textFileSystem中直接从hdfs根目录开始写,会自动读取hdfs中的文件

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 将文件作为输入源。* 可以监控一个文件夹,SparkStreaming会检测这个文件夹中的内容发生变化。** ps:*      这个流,只需要了解即可。 这个效果不明显。**/
object _01_InputFile {def main(args: Array[String]): Unit = {val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("input"), Seconds(3))// 作为SparkStreaming监控的目录//val path: String = "file:///C:\\Users\\luds\\Desktop\\data"// 监控这个目录,并生成DStreamval dirStream: DStream[String] = ssc.textFileStream(path)// 处理数据val resDStream: DStream[(String, Int)] = dirStream.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)// 输出数据resDStream.print()ssc.start()ssc.awaitTermination()}
}

2.2 RDD队列

oneAtATime – Whether only one RDD should be consumed from the queue in every interva 是否在每个间隔时间内只有一个RDD能够从队列中取出并被消费

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}import scala.collection.mutableobject _02_RddQueue {def main(args: Array[String]): Unit = {val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("rddQueue"), Seconds(3))// 1. 创建一个RDD的队列val queue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()// 2. 初始化DStream, 监控一个队列// oneAtTime表示在每个间隔中是否每个rdd只能消费一次val rddStream: InputDStream[Int] = ssc.queueStream(queue, oneAtATime = false)// 3. 数据处理: 统计每一个数字出现的次数val resStream: DStream[(Int, Int)] = rddStream.map((_, 1)).reduceByKey(_ + _)// 4. 输出统计结果resStream.print()// 先开启作业ssc.start()// 5. 往队列中,添加RDDfor (i <- 0 to 5) {val rdd: RDD[Int] = ssc.sparkContext.parallelize(1 to 500)queue += rdd        // 添加到队列Thread.sleep(2500)  //让每一个批次都有数据处理}ssc.awaitTermination()}
}

2.3自定义输入源

⾃定义数据源其实就是需要继承Receiver,并实现onStart、onStop⽅法来⾃定义数据源采集。

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socketimport org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver/*** 自定义的输入源*/
object _03_CustomReceiverTest {def main(args: Array[String]): Unit = {val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("custom"), Seconds(3))// 从一个自定义的输入源读取数据,创建DStreamval dStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomReceiver("qianfeng01", 6666))// 处理数据val res: DStream[(String, Int)] = dStream.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)res.print()res.saveAsTextFiles("C:\\Users\\luds\\Desktop\\output\\myLog_", "log")ssc.start()ssc.awaitTermination()}
}/*** 自定义的输入源,需要继承自 org.apache.spark.streaming.receiver.Receiver* 在继承的时候,需要设置父类Receiver的主构造器* 并重写 onStart() 和 onStop() 方法* 该类含有store()、isStopped()方法*/
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {// 启动的时候调用的// 读取数据,并将数据发送给Sparkoverride def onStart(): Unit = {new Thread() {override def run(): Unit = {receive()}}.start()}def receive(): Unit = {val socket: Socket = new Socket(host, port)// 实例化一个流,用来读取Socket中的数据val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF8"))// 读取数据var line: String = reader.readLine()// 如果receiver还没有停止,就需要继续读取while (line != null && !isStopped()) {// 将接收到的数据块存储到spark内存汇总store(line)line = reader.readLine()}reader.close()socket.close()}// 停止的时候调用override def onStop(): Unit = {}
}

三、DStream原语

(1)Transformations
(2)Output Operations
(3)特殊的原语
updateStateByKey ---- ⽤于记录历史记录
transform
window

这里只说明特殊的原语用法,因为前面两个都在之前的学习中学习过

3.1 updateStateByKey原语

updateStateByKey() 的结果会是⼀个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。
updateStateByKey操作使得我们可以在⽤新信息进⾏更新时保持任意的状态。为使⽤这个功能,你需要做下⾯两步:

  1. 定义状态,状态可以是⼀个任意的数据类型。
  2. 定义状态更新函数,⽤此函数阐明如何使⽤之前的状态和来⾃输⼊流的新值对状态进⾏更新
    使⽤updateStateByKey需要对检查点⽬录进⾏配置,会使⽤检查点来保存状态

该原语只能保存5个批次的数据,保存的文件夹会随时间不断刷新

package day18_SparkStreamingimport org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 使用 updateStateByKey 实现wordcount* 要将每一个批次中处理的结果累加到一起** updateStateByKey: 将之前的批次处理的结果,存储在磁盘上的**/
object _04_UpdateStateByKeyTest {def main(args: Array[String]): Unit = {// 写入hdfs文件系统// System.setProperty("HADOOP_USER_NAME", "root")val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("wc"), Seconds(3))// 1. 如果要使用updateStateByKey的话,需要指定一个临时的文件保存目录//    SparkStreaming在处理的过程中,会将临时的数据,保存在这个目录下。//    保存的就是之前批次的处理结果,历史记录。//    这里,只会保留最近的5个批次的数据。ssc.checkpoint("C:\\Users\\luds\\Desktop\\ck")// ssc.checkpoint("hdfs://host01:9000/2020-10-21/20-25")// 2. 监听数据,创建DStreamval stream: ReceiverInputDStream[String] = ssc.socketTextStream("host01", 6666)// 3. 数据处理val mappedStream: DStream[(String, Int)] = stream.flatMap(_.split("\\s+")).map((_, 1))// val res: DStream[(String, Int)] = mappedStream.updateStateByKey(updateState)val res: DStream[(String, Int)] = mappedStream.updateStateByKey(updateState2 _, new HashPartitioner(ssc.sparkContext.defaultParallelism), rememberPartitioner = true)res.print()ssc.start()ssc.awaitTermination()}// (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)]// Iterator中的参数// String : Key// Seq[Int] : 这个Key对应的出现的次数// Option[Int] : 这个批次中这个Key对应的次数def updateState2(iterator: Iterator[(String, Seq[Int], Option[Int])]): Iterator[(String, Int)] = {iterator.map(tuple => {(tuple._1, tuple._2.sum + tuple._3.getOrElse(0))})}/*** seq: Seq[V]  => 历史记录中的,之前的数据累加的结果。 Seq(1, 1, 1, 1, 1, 1, 1, 1...)* value: Option[S]   => 当前批次的结果*/def updateState(seq: Seq[Int], value: Option[Int]): Option[Int] = {Some(seq.sum + value.getOrElse(0))}
}

测试:nc -lv 6666

3.2 mapWithState原语

mapWithState只返回变化后的key的值,这样做的好处是,可以只是关⼼那些已经发⽣的变化的key,对于没有数据输⼊,则不会返回那些没有变化的key的数据。这样的话,即使数据量很⼤,checkpoint也不会像updateStateByKey那样,占⽤太多的存储,效率⽐较⾼(再⽣产环境中建议使⽤这个)
注意

  1. mapWithState是1.6版本之后推出的;
  2. 必须设置checkpoint来储存历史数据;
  3. mapWithState和updateStateByKey的区别 : 它们是类似的,都是有状态DStream操作, 区别在于,
    updateStateByKey是输出增量数据,随着时间的增加, 输出的数据越来越多,这样会影响计算的效率, 对CPU和内存压⼒较⼤。⽽mapWithState则输出本批次数据,也含有状态更新;(mapWithState会在缓存中维护数据的状态)
  4. checkpoint的数据会分散存储在不同的分区中, 在进⾏状态更新时, ⾸先会对当前 key 做 hash , 再到对应的分区中去更新状态 , 这种⽅式⼤⼤提⾼了效率。

需要用到的函数和参数类型

Params:
spec – Specification of this transformation
Type parameters:
StateType – Class type of the state data
MappedType – Class type of the mapped datadef mapWithState[StateType: ClassTag, MappedType: ClassTag](spec: StateSpec[K, V, StateType, MappedType]): MapWithStateDStream[K, V, StateType, MappedType] = {}Params:
mappingFunction – The function applied on every data item to manage the associated state and generate the mapped data
Type parameters:
ValueType – Class of the values
StateType – Class of the states data
MappedType – Class of the mapped datadef function[KeyType, ValueType, StateType, MappedType](mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType): StateSpec[KeyType, ValueType, StateType, MappedType] = {}

测试代码:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext}object _03_MapWithStateTest {def main(args: Array[String]): Unit = {val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("map"), Seconds(3))// 设置一个文件存储的路径ssc.checkpoint("file:///C:/Users/luds/Desktop/ck")// 监控nc的输入val stream: ReceiverInputDStream[String] = ssc.socketTextStream("host01", 6666)// 简单的处理val mappedStream: DStream[(String, Int)] = stream.flatMap(_.split("\\s+")).map((_, 1))val res: MapWithStateDStream[String, Int, Int, (String, Int)] = mappedStream.mapWithState(StateSpec.function(mapping _))res.stateSnapshots().print()ssc.start()ssc.awaitTermination()}// (KeyType, Option[ValueType], State[StateType]) => MappedTypedef mapping(key: String, value: Option[Int], state: State[Int]): (String, Int) = {// 累加新的单词数量val sum: Int = value.getOrElse(0) + state.getOption().getOrElse(0)// 更新state的值state.update(sum)// 将这个键,对应的新的值返回(key, sum)}
}

测试:nc -lv 6666

3.3 updateStateByKey和mapWithState比较

3.4 transform原语——案例:统计网站黑名单

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** 案例: 过滤网站的黑名单* 预设: 网站访问的格式如下: zhangsan 192.168.10.101 20201010120856675  => (zhangsan, zhangsan 192.168.10.101 20201010120856675)*/
object _05_TransformTest {def main(args: Array[String]): Unit = {val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("transform"), Seconds(3))// 预设一个黑名单val blackList: Array[(String, Boolean)] = Array(("zhangsan", true), ("lisi", true))// 将黑名单做成RDDval blackListRDD: RDD[(String, Boolean)] = ssc.sparkContext.parallelize(blackList)val stream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 6666)// 流中的数据映射,映射成 (用户名, 访问信息)val mappedStream: DStream[(String, String)] = stream.map(line => (line.split(" ")(0), line))// 需要将这个批次的所有的用户访问和黑名单列表join起来// 但是 mappedStream是一个DStream, 不支持和RDD直接joinval filteredStream: DStream[String] = mappedStream.transform(rdd => {// 第一个String: 用户名// 第二个String: 用户访问信息// 第三个Option[Boolean]: 用户在黑名单中的状态val joinedRDD: RDD[(String, (String, Option[Boolean]))] = rdd.leftOuterJoin(blackListRDD)// 过滤掉这个批次的所有访问信息中,存在于黑名单内的数据// joinedRDD.filter(!_._2._2.getOrElse(false))val filteredRDD: RDD[(String, (String, Option[Boolean]))] = joinedRDD.filter(tuple => {if (!tuple._2._2.getOrElse(false)) {true} else {false}})// 将数据还原为初始的访问数据filteredRDD.map(_._2._1)})// filteredStream.print()filteredStream.foreachRDD(rdd =>{val rdd2: RDD[(String, String, String)] = rdd.map(line => {val arr: Array[String] = line.split(" ")(arr(0), arr(1), arr(2))})rdd2.foreach(println)})ssc.start()ssc.awaitTermination()}
}

四、SparkStreaming集成Kafka

有两种方式,分别是Receiver方式(已过时)和Driver方式

4.1导入依赖

     <dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.12</artifactId><version>0.10.2.0</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>0.10.2.0</version></dependency>

4.2 SparkStream对接Kafka——Driver方式

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}/*** SparkStream对接Kafka——Driver方式*/
object _01_basic {def main(args: Array[String]): Unit = {val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("streaming2kafka"), Seconds(3))// 准备消费者策略需要的数据// 1. 准备一个所有的topicval topics: Array[String] = Array("streaming2kafka")// 2. 配置消费的属性val configuration: Map[String, Object] = Map[String, Object]("bootstrap.servers" -> "qianfeng01:9092,qianfeng02:9092,qianfeng03:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"enable.auto.commit" -> "true","auto.offset.reset" -> "earliest","group.id" -> "streaming2KafkaGroup")// 创建DStreamval kafkaDStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(ssc,// 本地化策略:// 均匀的将需要消费的topic的每一个分区的数据,分配到每一个SparkStreaming的Executor中LocationStrategies.PreferConsistent,// 消费者策略ConsumerStrategies.Subscribe[String, String](topics, configuration))// 这里需要对数据做一个映射// 因为在上方,得到的Stream中的数据是ConsumerRecord// 需要从这里解出来valuekafkaDStream.map(_.value).print()ssc.start()ssc.awaitTermination()}
}

4.3 从Kafka获取消息并存储到Redis中(Kafka+SparkSteaming+Redis)

HasOffeSetRang类型存储的是所有的offset
手动管理offset

自动提交offset点击这里

import java.utilimport org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPoolConfig}/*** SparkStreaming对接Kafka,完成手动的offset提交** 在程序中维护一个Map,存储每一个tpoic的每一个分区,消费到了哪一个offset。* 每当有消费者在消费的时候,获取到消费的最新的offset,并更新到Map中。* 需要将Map中的数据,在Redis中存储一份。**/
object _02_Offset {def main(args: Array[String]): Unit = {val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("offset"), Seconds(3))// 获取Jedis连接对象val jedis: JedisCluster = RedisConn.getJedis// 准备消费者策略需要的数据// 0. 将消费者组单独列出来val groupid: String = "streaming2KafkaGroup"// 1. 准备一个所有的topicval topics: Array[String] = Array("streaming2kafka")// 2. 配置消费的属性val configuration: Map[String, Object] = Map[String, Object]("bootstrap.servers" -> "host01:9092,host03:9092,host04:9092","key.deserializer" -> classOf[StringDeserializer],      //反射方式反序列化"value.deserializer" -> classOf[StringDeserializer],"enable.auto.commit" -> "true",     //关闭offset自动提交"auto.offset.reset" -> "earliest",      //offset设置到最新的位置,从最新的位置开始消费"group.id" -> "streaming2KafkaGroup"    //设置消费者组id)// 从Redis中查询之前的数据var offsets: Map[TopicPartition, Long] = Offset(jedis, groupid)// 3. 创建DStream//    在消费的时候,分为两种情况: 第一次消费和非第一次消费val kafkaStream: InputDStream[ConsumerRecord[String, String]] = if (offsets.isEmpty) {// offsets中没有记录之前的offset,说明是第一次消费KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, configuration))} else {KafkaUtils.createDirectStream(ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Assign[String, String](offsets.keys, configuration, offsets))}// 4. 处理消息,消费消息kafkaStream.foreachRDD(rdd => {// 模拟消费数据: 直接输出到控制台rdd.map(_.value).foreach(println)// 获取最新的offset,并将其同步给Redisval ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRangesfor (or <- ranges) {jedis.hset(groupid, or.topic + "_" + or.partition, or.untilOffset.toString)}})ssc.start()ssc.awaitTermination()}
}object RedisConn {// 设置主节点信息val hosts:util.Set[HostAndPort] = {val set: util.HashSet[HostAndPort] = new util.HashSet[HostAndPort]()set.add(new HostAndPort("192.168.10.101", 7001))set.add(new HostAndPort("192.168.10.101", 7002))set.add(new HostAndPort("192.168.10.101", 7003))set}// 配置连接池的信息val config: JedisPoolConfig = new JedisPoolConfigconfig.setMaxTotal(20)config.setMaxIdle(10)// 连接Redis集群的对象val cluster: JedisCluster = new JedisCluster(hosts, 10000, config)def getJedis: JedisCluster = cluster
}object Offset {def apply(jedis: JedisCluster, groupid: String): Map[TopicPartition, Long] = {// 1. 实例化一个新的Map,用来存储从Redis查询到的数据var offsets: Map[TopicPartition, Long] = Map[TopicPartition, Long]()// 2. 从Redis查询数据val topicsAndPartitionsAndOffsets: util.Map[String, String] = jedis.hgetAll(groupid)// 3. 引入一个转换包import scala.collection.JavaConversions._// 4. 将Map转成Scala的List集合,方便遍历val list: List[(String, String)] = topicsAndPartitionsAndOffsets.toList// 5. 遍历集合,存入offsetMap中for ((topicAndPartition, offset) <- list) {// 按照下划线,切分,得到topic和partitionval arr: Array[String] = topicAndPartition.split("_")offsets += (new TopicPartition(arr(0), arr(1).toInt) -> offset.toLong)}offsets}
}

五、window窗口操作

这块需要关注窗口的大小步长


案例一

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}/*** window的使用:* 案例: 热搜,统计过去的20秒内,所有的词条搜索量,每隔10秒更新一次**/
object WindowUsage {def main(args: Array[String]): Unit = {// 批次的间隔, 可以设置为毫秒, 使用Milliseconds即可val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("window"), Milliseconds(5000))// 监测nc的输入, 获取一个DStream对象val stream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 6666)// wordcountval res: DStream[(String, Int)] = stream.window(Seconds(20), Seconds(10))   // 设置一个窗口的大小为20秒,滑动的间隔为10秒.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)// 需求进阶: 需要让每一个window中的次品搜索量,降序排序取top3// res.transform(rdd => {})// res.foreachRDD(rdd => rdd.sortBy(-_._2).take(3))ssc.start()ssc.awaitTermination()}
}

案例二

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}object WindowUsage2 {def main(args: Array[String]): Unit = {// 批次的间隔, 可以设置为毫秒, 使用Milliseconds即可val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("window"), Milliseconds(5000))// 监测nc的输入, 获取一个DStream对象val stream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 6666)// wordcount// val res: DStream[(String, Int)] = stream//     .flatMap(_.split("\\s+"))//     .map((_, 1))//     .window(Seconds(20), Seconds(10))   // 设置一个窗口的大小为20秒,滑动的间隔为10秒//     .reduceByKey(_ + _)val res: DStream[(String, Int)] = stream.flatMap(_.split("\\s+")).map((_, 1)).reduceByKeyAndWindow((x: Int, y: Int) => x + y , Seconds(20), Seconds(10))res.print()ssc.start()ssc.awaitTermination()}
}

【大数据开发】SparkStreaming——DStream输入源、原语、SparkStream与Kafka和Redis三者的交互相关推荐

  1. 每日7千次的跨部门任务调度,有赞怎么设计大数据开发平台?

    随着公司规模的增长,对大数据的离线应用开发的需求越来越多,这些需求包括但不限于离线数据同步(MySQL/Hive/Hbase/Elastic Search 等之间的离线同步).离线计算(Hive/Ma ...

  2. 阿里年薪80W+大数据开发技能全套教程(源码+视频)都在这儿!

    随着大数据.云计算.物联网.人工智能这些行业的发展崛起,对于大数据人才的需求越来越大,而大数据人才的培养发展周期相对较长,导致了大数据人才短缺,市场供不应求.所以也就出现了大数据开发工程师.数仓工程师 ...

  3. (视频+源码)助力年后跳槽:对标阿里P8的大数据开发全套教程

    随着人工智能.大数据.云计算.区块链等新技术出现,加速了产业互联网的到来,加速了传统行业产业链快速涌入到互联网的新世界,所以它是未来的大趋势,而大数据是这些基石,万物互联.机器学习都是大数据应用场景! ...

  4. 【首次免费】下载价值16880元转型人工智能、大数据开发全套教程(视频+源码)......

    如今随着环境的改变,物联网.AI.大数据.人工智能等,是未来的大趋势,而大数据是这些基石,万物互联,机器学习都是大数据应用场景! 为什么要学习大数据? 好比问一个程序员的人为什么要学编程! 大数据技术 ...

  5. 大数据开发面试知识点复习3

    文章目录 大数据开发复习课程 10.scala 10.1.scala介绍 10.2.scala解释器 10.3.scala的基本语法 10.3.1.声明变量 10.3.2.字符串 10.3.3.数据类 ...

  6. 【Spark】黑马-大数据开发2

    Scala+Spark-大数据开发复习课程 10.scala 10.1.scala介绍 10.2.scala解释器 10.3.scala的基本语法 10.3.1.声明变量 10.3.2.字符串 10. ...

  7. 小白如何学习大数据开发,大数据学习路线是怎样的?

    零基础的同学学习大数据开发不能急于求成,要分阶段分步骤来一步步完成,科多大数据给大家来分享一下大数据的学习路线是什么?小白该怎么学习大数据呢,大概可以分为四步: 大数据学习资料分享群142973723 ...

  8. 大数据开发实战:数据流图及相关数据技术

    1.大数据流程图 2.大数据各个环节主要技术 2.1.数据处理主要技术 Sqoop:(发音:skup)作为一款开源的离线数据传输工具,主要用于Hadoop(Hive) 与传统数据库(MySql,Pos ...

  9. python工程师干什么的_大数据开发工程师薪资待遇及招聘要求?

     目录: 大数据开发是干什么的? 大数据开发需要掌握哪些技术?学习路线如何? 大数据开发需要掌握数学知识吗? 大数据开发就业前景如何? 大数据开发工程师薪资待遇及招聘要求? 大数据开发是干什么的? 大 ...

最新文章

  1. R语言层次聚类模型示例
  2. java线程 yield_Java线程中yield与join方法的区别
  3. 电脑如何下载python3-python3可以在哪里下载
  4. linux VIRT内存占太大,Java top VIRT 内存占用有关问题
  5. CodeForces - 86D Powerful array(莫队)
  6. 你知道C#中的Lambda表达式的演化过程吗
  7. 前端学习(2457):文章发布
  8. ubuntu下django的项目相关软件安装测试
  9. 禁掉或启用firefox 的 javascript 脚本
  10. 面试官:你的缺点是什么?这样回答漂亮!
  11. hibernate 基础方法(二)【相关配置详解】
  12. java中赛场统分的情况
  13. 苹果企业开发账号申请三步走
  14. 高通骁龙845与骁龙710处理器参数对比分析
  15. BUPT Summer Journey #test11 A
  16. element-ui el-date-picker日期选择器 value-format问题
  17. skipped: maximum number of running instances reached (1)
  18. DM达梦数据库集群之分布式集群(MPP)主备
  19. 王者荣耀进阶教学攻速/移速/减伤机制/视野/意识
  20. 家用计算机如何关机,win7如何快速关机_win7快速关机的方法

热门文章

  1. 为Array对象添加一个去除重复项的方法
  2. 5G关键技术之波束成型
  3. 解决:Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-defin
  4. 最新版HBuilderx + 夜神模拟器 模拟器调试设置
  5. cropper.js 裁剪图片并上传(文档翻译+demo)(转)
  6. 一位清华差生9年的北京生活
  7. Python计算商品复购率
  8. 使用vuejs 2.x (不是nuxt) 做个demo: 使用 vuex, router ( store, action , mutation)
  9. 北航计算机专业录取线,北航各专业录取分数线
  10. 什么是Galil(加利尔)运动控制卡,它是用来干嘛的呢?galil开发文件dmc32.dll,动态链接库,API