spark学习之SparkStreaming
SparkStreaming
Spark Streaming用于流式数据的处理。Spark Streaming支持的数据输入源很多,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后可以用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在很多地方,如HDFS,数据库等。
SparkStreaming概述
数据处理延迟方式实时:数据处理在毫秒级别,秒离线:数据处理延迟以小时,天为单位
数据处理的方式流式:一个一个数据进行处理批处理:一批一批数据进行处理SparkCore: 离线数据分析框架|批处理
SparkStreaming: 基于SparkCore来完成实时数据处理分析(执行场景在离线批处理和实时流式之间,可以称为准实 时,微批次数据处理框架)
SparkStreaming架构
背压机制
调整数据采集能力与数据消费能力
Spark 1.5以前版本,用户如果要限制Receiver的数据接收速率,可以通过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然可以通过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。比如:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会造成资源利用率下降等问题。为了更好的协调数据接收速率与资源处理能力,1.5版本开始Spark Streaming可以动态控制数据接收速率来适配集群数据处理能力。背压机制(即Spark Streaming Backpressure): 根据JobScheduler反馈作业的执行信息来动态调整Receiver数据接收率。
通过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用。
Dstream入门
WordCount案例实操
需求:使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数
添加依赖关系
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version>
</dependency>
idea代码
package com.pihao.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object WordCount02 {def main(args: Array[String]): Unit = {//TODO SparkStreaming流式数据处理//TODO 建立环境val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")//SparkStreaming环境对象的第二个参数表示数据采集周期val ssc = new StreamingContext(sparkConf,Seconds(3))//TODO 执行操作//创建采集器(一行一行)val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999) //监听本机的9999端口//拿到一个一个的单词val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))val wordToCountDS: DStream[(String, Int)] = wordDS.map((_,1)).reduceByKey(_+_)wordToCountDS.print()//启动采集器ssc.start()//等待结束ssc.awaitTermination()}}
在hadoop102启动nc服务,然后启动idea
# nc -lk 9999
# nc -lp 9999
[atguigu@hadoop102 ~]$ nc -lk 9999
pihao
hello
world
Dstream创建
RDD队列
测试过程中,可以通过使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理。
案例实操
循环创建几个RDD,将RDD放入队列。通过SparkStream创建Dstream,计算WordCount
object RDDStream {def main(args: Array[String]) {//1.初始化Spark配置信息val conf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")//2.初始化SparkStreamingContextval ssc = new StreamingContext(conf, Seconds(4))//3.创建RDD队列val rddQueue = new mutable.Queue[RDD[Int]]()//4.创建QueueInputDStreamval inputStream = ssc.queueStream(rddQueue,oneAtATime = false)//5.处理队列中的RDD数据val mappedStream = inputStream.map((_,1))val reducedStream = mappedStream.reduceByKey(_ + _)//6.打印结果reducedStream.print()//7.启动任务ssc.start()//8.循环创建并向RDD队列中放入RDDfor (i <- 1 to 5) {rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}ssc.awaitTermination()}
}
自定义数据源
自定义一个数据源,获取其中的数据
package com.pihao.streamingimport java.util.concurrent.TimeUnitimport org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.receiver.Receiverimport scala.util.Randomobject SparkStreaming_DIY {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))//执行操作val ds: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver()) //使用自己定义的接收器ds.print()ssc.start()ssc.awaitTermination()}/*** 自定义数据采集器* 1.继承Receiver,定义泛型,并指定存储级别* 2.重写方法* onStart* onStop*/class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY){private var flag = true//开启接受override def onStart(): Unit = {new Thread(new Runnable {override def run(): Unit = {while(flag){val i: Int = new Random().nextInt(100)store(i+"")TimeUnit.NANOSECONDS.sleep(500)}}}).start()}//关闭接收override def onStop(): Unit = {flag = false}}}
Kafka数据源
版本选型
1.ReceiverAPI:需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。早期版本中提供此方式,当前版本不适用2.DirectAPI:是由计算的Executor来主动消费Kafka的数据,速度由自身控制。(采集和计算都在一个节点,容易控制)
Kafka 0-10 Direct 模式
需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做计算,最终打印到控制台。
提前创建kafka主题
启动zookeeper集群
启动kafka集群# 创建sparkstreaming的topic
[atguigu@hadoop102 ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic sparkstreaming --partitions 3 --replication-factor 2[atguigu@hadoop102 ~]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list
__consumer_offsets
sparkstreaming
[atguigu@hadoop102 ~]$
添加依赖
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.12</artifactId><version>3.0.0</version>
</dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId><version>2.10.1</version>
</dependency>
编写代码
package com.pihao.streamingimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}object KafkaStream {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))//TODO 执行操作//定义kafka的连接配置val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG -> "spark","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")//Kafka专门用于实时数据生成,提供了很多封装类//ConsumerRecord是一个KVval kafkaDStream: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent, //任务与采集器的位置关系,当前表示自动选择,也可以指定哪个brokerConsumerStrategies.Subscribe[String, String](Set("sparkstreaming"), kafkaPara))//获取ConsumerRecord中的数据val kafkaData: DStream[String] = kafkaDStream.map(_.value())kafkaData.print()ssc.start()ssc.awaitTermination()}}
接收成功
Dstream转换
DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种Window相关的原语。
无状态化操作
无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD。
Transform
Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次(周期性执行)。其实也就是对DStream中的RDD应用转换。
object Transform {def main(args: Array[String]): Unit = {//创建SparkConfval sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")//创建StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(3))//创建DStreamval lineDStream: ReceiverInputDStream[String] = ssc.socketTextStream("host", 9999)//在某些情况,DS对象不能执行所有的操作//如果想要进行特殊操作,那么可以直接通过底层的RDD进行val wordAndCountDStream: DStream[(String, Int)] = lineDStream.transform(rdd => {val words: RDD[String] = rdd.flatMap(_.split(" "))val wordAndOne: RDD[(String, Int)] = words.map((_, 1))val value: RDD[(String, Int)] = wordAndOne.reduceByKey(_ + _)value})//打印wordAndCountDStream.print//启动ssc.start()ssc.awaitTermination()}
}
join
两个流之间的join需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的RDD进行join,与两个RDD的join效果相同。
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}object JoinTest {def main(args: Array[String]): Unit = {//1.创建SparkConfval sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("JoinTest")//2.创建StreamingContextval ssc = new StreamingContext(sparkConf, Seconds(5))//3.从端口获取数据创建流val lineDStream1: ReceiverInputDStream[String] = ssc.socketTextStream("linux1", 9999)val lineDStream2: ReceiverInputDStream[String] = ssc.socketTextStream("linux2", 8888)//4.将两个流转换为KV类型val wordToOneDStream: DStream[(String, Int)] = lineDStream1.flatMap(_.split(" ")).map((_, 1))val wordToADStream: DStream[(String, String)] = lineDStream2.flatMap(_.split(" ")).map((_, "a"))//5.流的JOINval joinDStream: DStream[(String, (Int, String))] = wordToOneDStream.join(wordToADStream)//6.打印joinDStream.print()//7.启动任务ssc.start()ssc.awaitTermination()}
}
有状态转化操作
有时,我们需要在DStream中跨批次维护状态(例如流计算中累加wordcount)。需要保存之前每次数据的聚合操作
UpdateStateByKey
package com.pihao.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object SparkStreaming_State {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))//检查点应该设置在hdfs中比较稳妥ssc.checkpoint("cp")//TODO 执行操作val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))//reduceByKey方法属于无状态操作的方法//updateStateByKey方法属于有状态操作的方法//方法参数中的第一个参数,表示相同key的value序列//方法参数中的第二个参数,表示相同key缓冲区的数据值val state: DStream[(String, Int)] = wordToOneDS.updateStateByKey((seq: Seq[Int], buffer: Option[Int]) => {val newValue: Int = seq.sum + buffer.getOrElse(0)Option(newValue)})state.print()ssc.start()ssc.awaitTermination()}
}
WindowOperations
所谓的窗口操作,其实就是将多个采集周期的数据一次性进行处理,而不是一个采集周期处理一次。
SparkStreaming总的窗口范围不能切断采集周期,可以是采集周期的整数倍
SparkStreaming总的窗口滑动幅度也应该是采集周期的整数倍
package com.pihao.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}object SparkStreaming_window{def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))//TODO 执行操作val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))//window方法可以传递一个参数,表示窗口的范围(时间周期,应该为采集周期的整数倍)//window方法可以传递两个参数,//第一个表示窗口的范围(时间周期,应该为采集周期的整数倍)//第二个表示窗口滑动的步长(时间周期,也是采集周期的整数倍,不传默认是一个一个采集周期)//窗口计算的时间以窗口步长做基础的val windowDS: DStream[(String, Int)] = wordToOneDS.window(Seconds(6),Seconds(3))val resultDS: DStream[(String, Int)] = windowDS.reduceByKey(_+_)resultDS.print()ssc.start()ssc.awaitTermination()}
}
增量计算
增量计算 = 当前窗口的值+ 新进入到窗口的数据-排除掉窗口的数据
使用场合:适合在窗口范围大,滑动浮动小的场合,这样就会有大量的重复数据
package com.pihao.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}object Sparkstream_window2 {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))ssc.checkpoint("cp")//TODO 执行操作val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)val wordDS: DStream[String] = socketDS.flatMap(_.split(" "))val wordToOneDS: DStream[(String, Int)] = wordDS.map((_,1))//增量计算//需要设定检查点路径val windowDS: DStream[(String, Int)] = wordToOneDS.reduceByKeyAndWindow((x: Int, y: Int) => {x + y},(x: Int, y: Int) => {x - y},Seconds(6),Seconds(3))windowDS.print()ssc.start()ssc.awaitTermination()}
}
Dstream输出
输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。
#1 print():在运行流程序的驱动结点上打印DStream中每一批次数据的最开始10个元素。这用于开发和调试。在Python API中,同样的操作叫print()。#2 saveAsTextFiles(prefix, [suffix]):以text文件形式存储这个DStream的内容。每一批次的存储文件名基于参数中的prefix和suffix。”prefix-Time_IN_MS[.suffix]”。#3 saveAsObjectFiles(prefix, [suffix]):以Java对象序列化的方式将Stream中的数据保存为 SequenceFiles . 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]". Python中目前不可用。
#4 saveAsHadoopFiles(prefix, [suffix]):将Stream中的数据保存为 Hadoop files. 每一批次的存储文件名基于参数中的为"prefix-TIME_IN_MS[.suffix]"。Python API 中目前不可用。#5 foreachRDD(func):这是最通用的输出操作,即将函数 func 用于产生于 stream的每一个RDD。其中参数传入的函数func应该实现将每一个RDD中数据推送到外部系统,如将RDD存入文件或者通过网络将其写入数据库。通用的输出操作foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform() 有些类似,都可以让我们访问任意RDD。在foreachRDD()中,可以重用我们在Spark中实现的所有行动操作。比如,常见的用例之一是把数据写到诸如MySQL的外部数据库中。
注意:
1)连接不能写在driver层面(序列化)
2)如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;
增加foreachPartition,在分区创建(获取)。
优雅的关闭
流式任务需要7*24小时执行,但是有时涉及到升级代码需要主动停止程序,但是分布式程序,没办法做到一个个进程去杀死,所有配置优雅的关闭就显得至关重要了。
使用外部文件系统来控制内部程序关闭。
package com.pihao.streamingimport org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}
import org.apache.spark.streaming.dstream.{ ReceiverInputDStream}object SparkStreaming_Close {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))//TODO 执行操作val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("hadoop102",9999)socketDS.print()//stop()方法主要用于停止数据采集和Driver的调度//不可能启动采集后马上停止,所以一般在业务更新或者逻辑更新的场合停止//在main线程中停止是不现实的。应该启动一个新的线程来执行停止操作new Thread(new Runnable {override def run(): Unit = {while(true){Thread.sleep(10000) //时间//这个时间应该是判断数据处理是否可以继续的标记//这个标记应该多线程都可以访问//这个标记一般情况放在第三方的系统中(mysql,redis)val state: StreamingContextState = ssc.getState() //装填if (state == StreamingContextState.ACTIVE){ssc.stop(true,true) //优雅的关闭System.exit(0) //关闭线程}}}}).start()ssc.start()ssc.awaitTermination()}
}
案例
模拟实时生成数据,然后统计分析
package com.pihao.mainimport java.util.Propertiesimport org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}import scala.collection.mutable.ListBuffer
import scala.util.Randomobject SparkStreaming_MockData {def main(args: Array[String]): Unit = {// 创建配置对象val prop = new Properties()// 添加配置prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String,String](prop)val topic = "sparkstreaming"val areas = ListBuffer("华北","华东","华南")val citys = ListBuffer("北京","上海","深圳")while(true){Thread.sleep(2000)//生成数据for (i <- 1 to new Random().nextInt(20)){val area: String = areas(new Random().nextInt(3))val city: String = citys(new Random().nextInt(3))val userId = new Random().nextInt(10)val adId = new Random().nextInt(10)val message = s"${System.currentTimeMillis()} ${area} ${city} ${userId} ${adId}"val record = new ProducerRecord[String,String](topic,message)producer.send(record)}}}
}
package com.pihao.mainimport org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}object KafkaStream {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Streaming")val ssc = new StreamingContext(sparkConf,Seconds(3))//TODO 执行操作//定义kafka的连接配置val kafkaPara: Map[String, Object] = Map[String, Object](ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",ConsumerConfig.GROUP_ID_CONFIG -> "spark","key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer","value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer")//Kafka专门用于实时数据生成,提供了很多封装类val kafkaDStream: InputDStream[ConsumerRecord[String, String]] =KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent, //任务与采集器的位置关系,当前表示自动选择,也可以指定哪个brokerConsumerStrategies.Subscribe[String, String](Set("sparkstreaming"), kafkaPara))//获取广告点击的数据val kafkaData: DStream[AdvClickData] = kafkaDStream.map(data => {val kafkaVal: String = data.value()val datas: Array[String] = kafkaVal.split(" ")AdvClickData(datas(0),datas(1),datas(2),datas(3),datas(4))})// todo 1.周期性获取数据库中最新的黑名单数据val reduceDS: DStream[((String, String, String), Int)] = kafkaData.transform(rdd => {// todo 2.判断当前周期内的数据是否已经在黑名单中// 查询数据库操作selectval blackList = List[String]()val filterRDD: RDD[AdvClickData] = rdd.filter(data => !blackList.contains(data.userId))//todo 3.将一个周期内的数据进行统计filterRDD.map(data => {//将数据组装返回 timestamp这里要换成日期格式((data.timestamp, data.userId, data.adId), 1)}).reduceByKey(_ + _)})//todo 4.判断统计的结果是否超过阈值reduceDS.foreachRDD(rdd => {rdd.foreach {case ((timestamp, userId, adId), sum) => {//如果当前采集周期内的sum都大于100,那么将userId就直接拉入黑名单if (sum>=100){//insert into}else{val oldStateVal = 0 //select 根据userId查询mysql中旧的值val newStateVal = 0 + sumif (newStateVal >= 100){// insert into}else{// update 根据数据库newStateVal}}}}})ssc.start()ssc.awaitTermination()}case class AdvClickData(timestamp: String, area: String, city: String, userId: String, adId: String)
}
spark学习之SparkStreaming相关推荐
- Hadoop学习系列之Hadoop、Spark学习路线(很值得推荐)
Hadoop学习系列之Hadoop.Spark学习路线(很值得推荐) 文章出自:http://www.cnblogs.com/zlslch/p/5448857.html 1 Java基础: 视频方面: ...
- Apache Spark学习:利用Eclipse构建Spark集成开发环境
介绍了如何使用Maven编译生成可直接运行在Hadoop 2.2.0上的Spark jar包,而本文则在此基础上, 介绍如何利用Eclipse构建Spark集成开发环境 . 不建议大家使用eclips ...
- Apache Spark学习:利用Scala语言开发Spark应用程序
Spark内核是由Scala语言开发的,因此使用Scala语言开发Spark应用程序是自然而然的事情.如果你对Scala语言还不太熟悉,可以阅读网络教程 A Scala Tutorial for Ja ...
- Spark学习之Spark调优与调试(7)
Spark学习之Spark调优与调试(7) 1. 对Spark进行调优与调试通常需要修改Spark应用运行时配置的选项. 当创建一个SparkContext时就会创建一个SparkConf实例. 2. ...
- 用Spark学习FP Tree算法和PrefixSpan算法
在FP Tree算法原理总结和PrefixSpan算法原理总结中,我们对FP Tree和PrefixSpan这两种关联算法的原理做了总结,这里就从实践的角度介绍如何使用这两个算法.由于scikit-l ...
- Spark学习(一) -- Spark安装及简介
标签(空格分隔): Spark 学习中的知识点:函数式编程.泛型编程.面向对象.并行编程. 任何工具的产生都会涉及这几个问题: 现实问题是什么? 理论模型的提出. 工程实现. 思考: 数据规模达到一台 ...
- spark学习-58-Spark的EventLoggingListener
1.本次调试查看源代码采用 spark学习-57-Spark下Scala版HBase下的根据权重获取最真实数据http://blog.csdn.net/qq_21383435/article/deta ...
- spark学习-28-Spark数据倾斜问题
文章目录 推荐:先看看这个 spark学习-27-Spark性能调优(2) 目的 数据倾斜调优 简述 数据倾斜发生时的现象 数据倾斜发生的原理 上面说了那么多其实我还是没具体见过什么是数据倾斜了 分析 ...
- spark学习-Spark算子Transformations和Action使用大全(Action章)
spark学习-22-Spark算子Transformations和Action使用大全(Transformations章(一)) http://blog.csdn.net/qq_21383435/a ...
- spark学习-Spark算子Transformations和Action使用大全(Transformations章(二))
spark学习-22-Spark算子Transformations和Action使用大全(Transformations章(一)) http://blog.csdn.net/qq_21383435/a ...
最新文章
- iOS开发中乱用hook可能导致灾难
- 进程与线程 thread (二)——线程概念
- php+反序列化方法,PHP序列化反序列化的方法详解
- 微信iOS 7.0.9版本更新:今天的朋友圈是一片欢乐的海洋!
- [转载] Python基础知识:构造函数中self用法
- 栈Stack的相关操作(java)
- segment fault 至core dump的原因
- window下环境变量立即生效
- 简单 黑苹果dsdt教程_从零开始学黑苹果-进阶安装教程(10.12.6)
- JDK各版本新特性(完整版)
- 番外4. Python OpenCV 中鼠标事件相关处理与常见问题解决方案
- 个人成长语录——我愿永远做一个上进的少年,一个敢于拼搏的人
- home目录权限linux,linux 文件/文件夹权限
- python中mod是什么意思_【python中,mod_python到底做了些什么呢?】mod python 教程
- 在Linux下驱动D-link DFE-530TX(最终稿)(转)
- 全差分放大器(FDA)的基本知识
- mysql的LRU队列详解
- 使用Motrix解决浏览器下载速度慢的问题
- ubuntu18.04 ros-melodic 在安装ros依赖包时总是出现E软件包无法定位
- 美团点评Robust(泛型与热更新方案)