SparkStreaming

Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。

SparkStreaming概述

数据处理延迟方式实时:数据处理在毫秒级别,秒离线:数据处理延迟以小时,天为单位
数据处理的方式流式:一个一个数据进行处理批处理:一批一批数据进行处理SparkCore:       离线数据分析框架|批处理
SparkStreaming:  基于SparkCore来完成实时数据处理分析(执行场景在离线批处理和实时流式之间,可以称为准实                   时,微批次数据处理框架)

SparkStreaming架构

背压机制

调整数据采集能力与数据消费能力

Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。

Dstream入门

WordCount案例实操

需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

添加依赖关系

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version>
</dependency>

idea代码

package com.pihao.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object WordCount02 {def main(args: Array[String]): Unit = {//TODO SparkStreaming流式数据处理//TODO 建立环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")//SparkStreaming环境对象的第二个参数表示数据采集周期val ssc = new StreamingContext(sparkConf,Seconds(3))//TODO 执行操作//创建采集器(一行一行)val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999) //监听本机的9999端口//拿到一个一个的单词val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))val wordToCountDS: DStream[(String, Int)] = wordDS.map((_,1)).reduceByKey(_+_)wordToCountDS.print()//启动采集器ssc.start()//等待结束ssc.awaitTermination()}}

在hadoop102启动nc服务,然后启动idea

# nc -lk 9999
# nc -lp 9999
[atguigu@hadoop102 ~]$ nc -lk 9999
pihao
hello
world

Dstream创建

RDD队列

测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。

案例实操

循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount

object RDDStream {def main(args: Array[String]) {//1.初始化Spark配置信息val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")//2.初始化SparkStreamingContextval ssc = new StreamingContext(conf, Seconds(4))//3.创建RDD队列val rddQueue = new mutable.Queue[RDD[Int]]()//4.创建QueueInputDStreamval inputStream = ssc.queueStream(rddQueue,oneAtATime = false)//5.处理队列中的RDD数据val mappedStream = inputStream.map((_,1))val reducedStream = mappedStream.reduceByKey(_ + _)//6.打印结果reducedStream.print()//7.启动任务ssc.start()//8.循环创建并向RDD队列中放入RDDfor (i <- 1 to 5) {rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}ssc.awaitTermination()}
}

自定义数据源

自定义一个数据源,获取其中的数据

package com.pihao.streamingimport java.util.concurrent.TimeUnitimport org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiverimport scala.util.Randomobject SparkStreaming_DIY {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))//执行操作val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver()) //使用自己定义的接收器ds.print()ssc.start()ssc.awaitTermination()}/*** 自定义数据采集器* 1.继承Receiver,定义泛型,并指定存储级别* 2.重写方法*    onStart*    onStop*/class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){private var flag = true//开启接受override def onStart(): Unit = {new Thread(new Runnable {override def run(): Unit = {while(flag){val i: Int = new Random().nextInt(100)store(i+"")TimeUnit.NANOSECONDS.sleep(500)}}}).start()}//关闭接收override def onStop(): Unit = {flag = false}}}

Kafka数据源

版本选型

1.ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用2.DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。(采集和计算都在一个节点,容易控制)

Kafka 0-10 Direct 模式

需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做计算,最终打印到控制台。

提前创建kafka主题

启动zookeeper集群
启动kafka集群# 创建sparkstreaming的topic
[atguigu@hadoop102 ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic sparkstreaming --partitions 3 --replication-factor 2[atguigu@hadoop102 ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list
__consumer_offsets
sparkstreaming
[atguigu@hadoop102 ~]$ 

添加依赖

<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version>
</dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.10.1</version>
</dependency>

编写代码

package com.pihao.streamingimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}object KafkaStream {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))//TODO 执行操作//定义kafka的连接配置val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG -> "spark","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")//Kafka专门用于实时数据生成,提供了很多封装类//ConsumerRecord是一个KVval kafkaDStream: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent, //任务与采集器的位置关系,当前表示自动选择,也可以指定哪个brokerConsumerStrategies.Subscribe[String, String](Set("sparkstreaming"), kafkaPara))//获取ConsumerRecord中的数据val kafkaData: DStream[String] = kafkaDStream.map(_.value())kafkaData.print()ssc.start()ssc.awaitTermination()}}

接收成功

Dstream转换

DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。

无状态化操作

无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。

Transform

Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次(周期性执行)。其实也就是对DStream中的RDD应用转换。

object Transform {def main(args: Array[String]): Unit = {//创建SparkConfval sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//创建StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(3))//创建DStreamval lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("host", 9999)//在某些情况,DS对象不能执行所有的操作//如果想要进行特殊操作,那么可以直接通过底层的RDD进行val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {val words: RDD[String] = rdd.flatMap(_.split(" "))val wordAndOne: RDD[(String, Int)] = words.map((_, 1))val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)value})//打印wordAndCountDStream.print//启动ssc.start()ssc.awaitTermination()}
}

join

两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDD的join效果相同。

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}object JoinTest {def main(args: Array[String]): Unit = {//1.创建SparkConfval sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JoinTest")//2.创建StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(5))//3.从端口获取数据创建流val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("linux2", 8888)//4.将两个流转换为KV类型val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))//5.流的JOINval joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)//6.打印joinDStream.print()//7.启动任务ssc.start()ssc.awaitTermination()}
}

有状态转化操作

有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。需要保存之前每次数据的聚合操作

UpdateStateByKey

package com.pihao.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkStreaming_State {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))//检查点应该设置在hdfs中比较稳妥ssc.checkpoint("cp")//TODO 执行操作val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))//reduceByKey方法属于无状态操作的方法//updateStateByKey方法属于有状态操作的方法//方法参数中的第一个参数,表示相同key的value序列//方法参数中的第二个参数,表示相同key缓冲区的数据值val state: DStream[(String, Int)] = wordToOneDS.updateStateByKey((seq: Seq[Int], buffer: Option[Int]) => {val newValue: Int = seq.sum + buffer.getOrElse(0)Option(newValue)})state.print()ssc.start()ssc.awaitTermination()}
}

WindowOperations

所谓的窗口操作,其实就是将多个采集周期的数据一次性进行处理,而不是一个采集周期处理一次。

SparkStreaming总的窗口范围不能切断采集周期,可以是采集周期的整数倍

SparkStreaming总的窗口滑动幅度也应该是采集周期的整数倍

package com.pihao.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}object SparkStreaming_window{def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))//TODO 执行操作val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))//window方法可以传递一个参数,表示窗口的范围(时间周期,应该为采集周期的整数倍)//window方法可以传递两个参数,//第一个表示窗口的范围(时间周期,应该为采集周期的整数倍)//第二个表示窗口滑动的步长(时间周期,也是采集周期的整数倍,不传默认是一个一个采集周期)//窗口计算的时间以窗口步长做基础的val windowDS: DStream[(String, Int)] = wordToOneDS.window(Seconds(6),Seconds(3))val resultDS: DStream[(String, Int)] = windowDS.reduceByKey(_+_)resultDS.print()ssc.start()ssc.awaitTermination()}
}

增量计算

增量计算 = 当前窗口的值+ 新进入到窗口的数据-排除掉窗口的数据

使用场合:适合在窗口范围大,滑动浮动小的场合,这样就会有大量的重复数据

package com.pihao.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}object Sparkstream_window2 {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))ssc.checkpoint("cp")//TODO 执行操作val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))//增量计算//需要设定检查点路径val windowDS: DStream[(String, Int)] = wordToOneDS.reduceByKeyAndWindow((x: Int, y: Int) => {x + y},(x: Int, y: Int) => {x - y},Seconds(6),Seconds(3))windowDS.print()ssc.start()ssc.awaitTermination()}
}

Dstream输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。

#1 print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。#2 saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。#3 saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。
#4 saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。#5 foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。
注意:
1)连接不能写在driver层面(序列化)
2)如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;
增加foreachPartition,在分区创建(获取)。

优雅的关闭

流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。

使用外部文件系统来控制内部程序关闭。

package com.pihao.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
import org.apache.spark.streaming.dstream.{ ReceiverInputDStream}object SparkStreaming_Close {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))//TODO 执行操作val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)socketDS.print()//stop()方法主要用于停止数据采集和Driver的调度//不可能启动采集后马上停止,所以一般在业务更新或者逻辑更新的场合停止//在main线程中停止是不现实的。应该启动一个新的线程来执行停止操作new Thread(new Runnable {override def run(): Unit = {while(true){Thread.sleep(10000) //时间//这个时间应该是判断数据处理是否可以继续的标记//这个标记应该多线程都可以访问//这个标记一般情况放在第三方的系统中(mysql,redis)val state: StreamingContextState = ssc.getState() //装填if (state == StreamingContextState.ACTIVE){ssc.stop(true,true) //优雅的关闭System.exit(0) //关闭线程}}}}).start()ssc.start()ssc.awaitTermination()}
}

案例

模拟实时生成数据,然后统计分析

package com.pihao.mainimport java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import scala.collection.mutable.ListBuffer
import scala.util.Randomobject SparkStreaming_MockData {def main(args: Array[String]): Unit = {// 创建配置对象val prop = new Properties()// 添加配置prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String,String](prop)val topic = "sparkstreaming"val areas = ListBuffer("华北","华东","华南")val citys = ListBuffer("北京","上海","深圳")while(true){Thread.sleep(2000)//生成数据for (i <- 1 to new Random().nextInt(20)){val area: String = areas(new Random().nextInt(3))val city: String = citys(new Random().nextInt(3))val userId = new Random().nextInt(10)val adId = new Random().nextInt(10)val message = s"${System.currentTimeMillis()} ${area} ${city} ${userId} ${adId}"val record = new ProducerRecord[String,String](topic,message)producer.send(record)}}}
}
package com.pihao.mainimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}object KafkaStream {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))//TODO 执行操作//定义kafka的连接配置val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG -> "spark","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")//Kafka专门用于实时数据生成,提供了很多封装类val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent, //任务与采集器的位置关系,当前表示自动选择,也可以指定哪个brokerConsumerStrategies.Subscribe[String, String](Set("sparkstreaming"), kafkaPara))//获取广告点击的数据val kafkaData: DStream[AdvClickData] = kafkaDStream.map(data => {val kafkaVal: String = data.value()val datas: Array[String] = kafkaVal.split(" ")AdvClickData(datas(0),datas(1),datas(2),datas(3),datas(4))})// todo 1.周期性获取数据库中最新的黑名单数据val reduceDS: DStream[((String, String, String), Int)] = kafkaData.transform(rdd => {// todo 2.判断当前周期内的数据是否已经在黑名单中// 查询数据库操作selectval blackList = List[String]()val filterRDD: RDD[AdvClickData] = rdd.filter(data => !blackList.contains(data.userId))//todo 3.将一个周期内的数据进行统计filterRDD.map(data => {//将数据组装返回 timestamp这里要换成日期格式((data.timestamp, data.userId, data.adId), 1)}).reduceByKey(_ + _)})//todo 4.判断统计的结果是否超过阈值reduceDS.foreachRDD(rdd => {rdd.foreach {case ((timestamp, userId, adId), sum) => {//如果当前采集周期内的sum都大于100,那么将userId就直接拉入黑名单if (sum>=100){//insert into}else{val oldStateVal = 0 //select 根据userId查询mysql中旧的值val newStateVal = 0 + sumif (newStateVal >= 100){// insert into}else{// update 根据数据库newStateVal}}}}})ssc.start()ssc.awaitTermination()}case class AdvClickData(timestamp: String, area: String, city: String, userId: String, adId: String)
}

spark学习之SparkStreaming相关推荐

  1. Hadoop学习系列之Hadoop、Spark学习路线(很值得推荐)

    Hadoop学习系列之Hadoop.Spark学习路线(很值得推荐) 文章出自:http://www.cnblogs.com/zlslch/p/5448857.html 1 Java基础: 视频方面: ...

  2. Apache Spark学习:利用Eclipse构建Spark集成开发环境

    介绍了如何使用Maven编译生成可直接运行在Hadoop 2.2.0上的Spark jar包,而本文则在此基础上, 介绍如何利用Eclipse构建Spark集成开发环境 . 不建议大家使用eclips ...

  3. Apache Spark学习:利用Scala语言开发Spark应用程序

    Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情.如果你对Scala语言还不太熟悉,可以阅读网络教程 A Scala Tutorial for Ja ...

  4. Spark学习之Spark调优与调试(7)

    Spark学习之Spark调优与调试(7) 1. 对Spark进行调优与调试通常需要修改Spark应用运行时配置的选项. 当创建一个SparkContext时就会创建一个SparkConf实例. 2. ...

  5. 用Spark学习FP Tree算法和PrefixSpan算法

    在FP Tree算法原理总结和PrefixSpan算法原理总结中,我们对FP Tree和PrefixSpan这两种关联算法的原理做了总结,这里就从实践的角度介绍如何使用这两个算法.由于scikit-l ...

  6. Spark学习(一) -- Spark安装及简介

    标签(空格分隔): Spark 学习中的知识点:函数式编程.泛型编程.面向对象.并行编程. 任何工具的产生都会涉及这几个问题: 现实问题是什么? 理论模型的提出. 工程实现. 思考: 数据规模达到一台 ...

  7. spark学习-58-Spark的EventLoggingListener

    1.本次调试查看源代码采用 spark学习-57-Spark下Scala版HBase下的根据权重获取最真实数据http://blog.csdn.net/qq_21383435/article/deta ...

  8. spark学习-28-Spark数据倾斜问题

    文章目录 推荐:先看看这个 spark学习-27-Spark性能调优(2) 目的 数据倾斜调优 简述 数据倾斜发生时的现象 数据倾斜发生的原理 上面说了那么多其实我还是没具体见过什么是数据倾斜了 分析 ...

  9. spark学习-Spark算子Transformations和Action使用大全(Action章)

    spark学习-22-Spark算子Transformations和Action使用大全(Transformations章(一)) http://blog.csdn.net/qq_21383435/a ...

  10. spark学习-Spark算子Transformations和Action使用大全(Transformations章(二))

    spark学习-22-Spark算子Transformations和Action使用大全(Transformations章(一)) http://blog.csdn.net/qq_21383435/a ...

最新文章

  1. iOS开发中乱用hook可能导致灾难
  2. 进程与线程 thread (二)——线程概念
  3. php+反序列化方法,PHP序列化反序列化的方法详解
  4. 微信iOS 7.0.9版本更新:今天的朋友圈是一片欢乐的海洋!
  5. [转载] Python基础知识:构造函数中self用法
  6. 栈Stack的相关操作(java)
  7. segment fault 至core dump的原因
  8. window下环境变量立即生效
  9. 简单 黑苹果dsdt教程_从零开始学黑苹果-进阶安装教程(10.12.6)
  10. JDK各版本新特性(完整版)
  11. 番外4. Python OpenCV 中鼠标事件相关处理与常见问题解决方案
  12. 个人成长语录——我愿永远做一个上进的少年,一个敢于拼搏的人
  13. home目录权限linux,linux 文件/文件夹权限
  14. python中mod是什么意思_【python中,mod_python到底做了些什么呢?】mod python 教程
  15. 在Linux下驱动D-link DFE-530TX(最终稿)(转)
  16. 全差分放大器(FDA)的基本知识
  17. mysql的LRU队列详解
  18. 使用Motrix解决浏览器下载速度慢的问题
  19. ubuntu18.04 ros-melodic 在安装ros依赖包时总是出现E软件包无法定位
  20. 美团点评Robust(泛型与热更新方案)

热门文章

  1. 渗透测试工程师(实习生)面试题目
  2. cd40系列芯片_CD4068_CD4068PDF资料详细参数下载_Powered by 奥伟斯
  3. 初秋最佳运动蓝牙耳机推荐,100-500这几款防水蓝牙耳机可以试试
  4. 一场面试过后—移动前端开发
  5. 吉林大学linux校园网客户端 64位操作系统不能上网解决方案
  6. 单片机驱动DM9000
  7. 网卡驱动DM9000-基于uboot
  8. 车牌识别算法实现及其代码实现之三:车牌识别
  9. java事务和分布式事务详解
  10. 在函数前面加上WINAPI、CALLBACK