文章目录

  • 前言
  • 概述
  • Spark和MR的数据处理流程对比
  • Spark的组成示意图
  • Spark模块
  • Spark特点
  • Spark的运行模式
  • Spark官方测试案例
  • SparkWebUI
  • Spark通用运行简易流程
  • Spark核心概念
  • RDD特点
  • WordCount案例
  • 数据分区
  • 算子
  • 转换算子
  • 行动算子
  • 序列化
  • 血缘关系:
  • RDD的持久化和检查点:
  • RDD的分区器:
  • 文件数据的读取和存储
  • 广播变量:
  • 累加器:
  • 自定义累加器:
  • 案例:练习: 计算每个省份广告点击量的TopN
  • SparkSQL
  • RDD和DataFrame的交互:
  • DataSet
  • RDD、DataFrame、DataSet转换关系图
  • SparkSql自定义函数
  • SparkSql的文件读取
  • Spark整合外部Hive
  • SparkStreaming(流式处理)
  • SparkStreaming架构
  • SparkStreaming WordCount案例
  • DStream创建
  • 自定义数据源:
  • Kafka数据源
  • DStream的转换:
  • SparkStreaming的window操作
  • DStream的输出:
  • SparkStreaming中累加器和广播变量
  • GitHub:
  • DF常用操作:
  • 总结

前言

Spark作为一代比较成熟的大数据计算引擎,在其大数据领域占据一席之地,Spark应该是每个在大数据领域奋斗的人必不可少的技能。
本文分享本菜鸟的Spark学习笔记,笔记中偏实操性的内容比较多,原理性的东西比较少,对新手来说可能有点不大友好。整理的东西相对来说有点多,显得有些乱,请谅解!
本菜鸟QQ:599903582
笨鸟先飞,熟能生巧。
比心心 ~


提示:以下是本篇文章正文内容,下面案例可供参考

概述

Spark在多任务之间基于内存处理。
计算单元:RDD
Spark相较于MapReduce提供了丰富的数据处理模型,更方便在并行场合下使用

Spark和MR的数据处理流程对比


Spark的组成示意图


Spark模块


Spark特点

快速、易用、通用、兼容性

Spark的运行模式

Local
Standalone
Yarn  yarn-client yarn-cluster(生产环境)
Mesosbin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar 100

Spark官方测试案例

bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.11-2.1.1.jar 100--master  运行环境  local  一个线程
local[K]  k个线程
local[*]  CPU核数线程

SparkWebUI

http://hadoop201:4040

Spark通用运行简易流程


Spark核心概念

Master
Worker
DriverProgram
Executor
RDDs
ClusterManagers
DeployMode  运行模式
Task:任务。每一个分区分配一个Task
Job:一个job对应一个action
Stage:阶段,上一阶段结束才能开始下阶段;Stage由Task组成
DAG: 有向无环图

RDD特点

弹性
分区
只读
依赖
缓存
checkpoint

WordCount案例

导入pom文件:
<dependencies>    <dependency>       <groupId>org.apache.spark</groupId>        <artifactId>spark-core_2.11</artifactId>        <version>2.1.1</version>    </dependency>
</dependencies><build>    <plugins>        <!-- 打包插件, 否则 scala 类不会编译并打包进去 -->        <plugin>            <groupId>net.alchim31.maven</groupId>           <artifactId>scala-maven-plugin</artifactId>            <version>3.4.6</version>            <executions>                <execution>                    <goals>                        <goal>compile</goal>                        <goal>testCompile</goal>                   </goals>                </execution>           </executions>        </plugin>    </plugins>
</build>object WordCount {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")val sc: SparkContext = new SparkContext(conf)sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).collect().foreach(println)sc.stop()}
}

数据分区

//在内存中创建RDD,分区数默认为当前环境cores(cpu核数)的值。
val inputRDD: RDD[Int] = sc.parallelize(list)
val inputRDD: RDD[Int] = sc.makeRDD(list)
//也可以通过第二个参数指定分区数。
val inputRDD: RDD[Int] = sc.makeRDD(list,2)//通过文件创建RDD,分区数默认为当前环境cores(cpu核数)和 2 的最小值
val inputRDD: RDD[String] = sc.textFile("input")
//也可以通过第二个参数来设置最小分区数,但是实际上根据源码中的算法得出最终的分区数
// 最小分区数只会对最终分区数有一定的影响
val inputRDD: RDD[String] = sc.textFile("input",3)

算子

https://www.cnblogs.com/kpsmile/p/10434390.html

转换算子

单Value类型:map()mapPartitions() 注意:以下形式效率更高,一次拿取一个partition中的数据,然后内存里做map缺点:有可能会造成内存溢出inputRDD.mapPartitions(datas=>{datas.map(_ * 2)})mapPartitionsWithIndex()flatMap()glom()    // 将每个分区封装为数组集合groupBy()filter()sample()  //随机取样。 参数1:是否放回 2:抽样比较熟知  3:随机数种子,一般默认或当前的时间distinct()  //去重, 参数可以改变分区coalesce() //缩减分区, 第二个参数决定是否shuffle; repartition() // 重分区, 一定会shuffle, 底层调用的就是coalesce()sortBy() // Tuple类型默认先比较第一个,然后第二个...; 参数可以决定升序还是降序pipe()   //管道,针对每一个分区,把RDD中的数据通过管道传递给shell命令或脚本。每个分区执行一次这个命令双Value类型:union()  //并集subtract()  // 交集cartesian()  // 笛卡尔积zip()   // 拉链。 注意:两个RDD的数量和分区数都必须相同Key-Value类型://RDD本身并不提供对key-value类型数据的操作,而是需要进行隐式转换,转换成PairRDDFunctions进行操作。partitionBy()  //根据指定的规则进行重分区; 参数需要传入一个PartitionerreduceByKey()  // 相较与groupByKey会对数据分区内进行提前聚合(shuffle前),效率更高// 需要分区内和分区间的计算规则一样groupByKey()aggregateByKey()() //聚合。 两个参数列表, 1:zeroValue初始值  2:分区内计算规则,分区间计算规则foldByKey()()  //聚合。 两个参数列表: 当分区内和分区间的计算规则一样时,可以使用combineByKey()  // 三个参数:1. 将分区内第一个值进行结构的转换 2.分区内计算规则 3.分区间计算规则 // 由于无法推断出类型,计算规则中需要标明数据类型。sortByKey()mapValues()  // 只对value'进行处理,返回值中含有keyjoin() // 相同的key进行连接: (k,v) (k,w) => (k,(v,w))leftOuterJoin()rightOuterJoin()fullOuterJoin()cogroup()  //先在自己集合上做一个聚合,然后再和另一个集合聚合

行动算子

reduce()
collect()
count()
take()
first()
takeOrdered()
aggregate()()    // 注意:分区间的计算也会使用zeroValue
saveAsTextFile()
saveAsSequenceFile()
saveAsObjectFile()
countByKey()
countByValue()
foreach()  // 会将数据发送到Executor

序列化

涉及到Driver和Executor之间对象传递的类都需要实现序列化
class User extends Serializable    // 闭包检测
样例类默认序列化Kryo序列化:java序列化太大;Kryo更简洁。Spark2.0内部开始使用Kryo。使用Kryo也要继承Serializable。使用:需要在SparkConf中声明序列化方式和类; 类也要继承Serializable。
val conf: SparkConf = new SparkConf()
.setAppName("SerDemo")
.setMaster("local[*]")
// 替换默认的序列化机制
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册需要使用 kryo 序列化的自定义类            .
.registerKryoClasses(Array(classOf[Searcher]))    Driver: 算子之外运行位置
Executor; 算子运行位置

血缘关系:

rdd.toDebugString   // 查看血缘关系
rdd.dependencies    // 查看依赖关系// 判断依据:数据是否打乱重组
窄依赖:oneToOne
宽依赖:oneToSome   // shuffle 一定是宽依赖

RDD的持久化和检查点:

cache()  //  底层 peresist() ,确定存储的级别; 会增加血缘关系,不改变原有的血缘关系//聚合算子默认缓存checkpoint()  // 可以将计算结果保存到检查点中长时间存储。// 检查点操作一般会重头执行一遍完整的流程,所以一般会和cache()联合使用。// cache方法和checkpoint方法没有关联性,可以随意放置// 检查点会切断血缘

RDD的分区器:

k-v类型才需要分区器,Spark目前支持Hash分区和Range分区。
指定的规则其实就是分区规则:分区器
也可以自定义分区器:
// 自定义分区器
// 1. 继承Partitioner
// 2. 重写方法
class MyPartitioner(num:Int) extends Partitioner {// 获取分区数量override def numPartitions: Int = {num}// 根据数据key获取所在的分区号码(从0开始)override def getPartition(key: Any): Int = {if ( key.isInstanceOf[String] ) {val keyString: String = key.asInstanceOf[String]if ( keyString == "cba" ) {0} else if ( keyString == "nba" ) {1} else {2}} else {2}}
}

文件数据的读取和存储

textFile(path)  // 读取Text文件
saveAsTextFile(path) //存储Text文件import scala.util.parsing.json.JSON
rdd.map(JSON.parseFull)  // 解析json字符串;JSON字符串必须在一行读取Json文件
// 注意:按行读,一行中应该满足json格式
// 注意:每一行必须满足json格式,不然会返回NonesequenceFile[keyClass, valueClass](path) //读取文件
saveAsSequenceFile(path) //保存为sequenceFile文件// 可以将对象序列化保存下来,采用java的序列化机制。就是objectFile
objectFile[(String,Int)](path)  //读取
saveAsObjectFile(path) //存储// 从HDFS读写文件(集群执行不需要添加前缀)
// 注意:端口要与core-site.xml中设置的端口号一致
val inputRDD: RDD[String] = sc.textFile("hdfs://192.168.2.201:9000/hdfsTest/test1.txt")
inputRDD.saveAsTextFile("hdfs://192.168.2.201:9000/hdfsTest/test")// 从MySQL数据读取文件
// https://www.cnblogs.com/wcgstudy/p/10984550.html
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version>
</dependency>// 读
object JDBCDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")val sc = new SparkContext(conf)//定义连接mysql的参数val driver = "com.mysql.jdbc.Driver"val url = "jdbc:mysql://hadoop201:3306/rdd"val userName = "root"val passWd = "aaa"val rdd = new JdbcRDD(sc,() => {Class.forName(driver)DriverManager.getConnection(url, userName, passWd)},"select id, name from user where id >= ? and id <= ?",1,20,2,result => (result.getInt(1), result.getString(2)))rdd.collect.foreach(println)}
}// 写
object JDBCDemo2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")val sc = new SparkContext(conf)//定义连接mysql的参数val driver = "com.mysql.jdbc.Driver"val url = "jdbc:mysql://hadoop201:3306/rdd"val userName = "root"val passWd = "aaa"val rdd: RDD[(Int, String)] = sc.parallelize(Array((110, "police"), (119, "fire")))// 对每个分区执行 参数函数rdd.foreachPartition(it => {Class.forName(driver)val conn: Connection = DriverManager.getConnection(url, userName, passWd)it.foreach(x => {val statement: PreparedStatement = conn.prepareStatement("insert into user values(?, ?)")statement.setInt(1, x._1)statement.setString(2, x._2)statement.executeUpdate()})})}
}//  从 Hbase 读写文件
<dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-server</artifactId><version>1.3.1</version>
</dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>1.3.1</version>
</dependency>// 读
object HBaseDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")val sc = new SparkContext(conf)val hbaseConf: Configuration = HBaseConfiguration.create()hbaseConf.set("hbase.zookeeper.quorum", "hadoop201,hadoop202,hadoop203")hbaseConf.set(TableInputFormat.INPUT_TABLE, "student")val rdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(hbaseConf,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])val rdd2: RDD[String] = rdd.map {case (_, result) => Bytes.toString(result.getRow)}rdd2.collect.foreach(println)sc.stop()}
}// 写
object HBaseDemo2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("Practice").setMaster("local[2]")val sc = new SparkContext(conf)val hbaseConf = HBaseConfiguration.create()hbaseConf.set("hbase.zookeeper.quorum", "hadoop201,hadoop202,hadoop203")hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "student")// 通过job来设置输出的格式的类val job = Job.getInstance(hbaseConf)job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])job.setOutputKeyClass(classOf[ImmutableBytesWritable])job.setOutputValueClass(classOf[Put])val initialRDD = sc.parallelize(List(("100", "apple", "11"), ("200", "banana", "12"), ("300", "pear", "13")))val hbaseRDD = initialRDD.map(x => {val put = new Put(Bytes.toBytes(x._1))put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(x._2))put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("weight"), Bytes.toBytes(x._3))(new ImmutableBytesWritable(), put)})hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration)}
}

广播变量:

分布式共享只读变量
https://blog.csdn.net/Android_xue/article/details/79780463/简单案例:
object BroadcastTest {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("BroadcastTest")val sc: SparkContext = new SparkContext(conf)val inputRDD: RDD[(Int, Int)] = sc.makeRDD(List((1, 2), (2, 3)))val list: List[(Int, Int)] = List((1, 3), (2, 4))val listBroadcast: Broadcast[List[(Int, Int)]] = sc.broadcast(list)inputRDD.map{case (k1, v1) =>{var v2:Int = 0for (elem <- listBroadcast.value) {if(elem._1 == k1){v2 = elem._2}}(k1,(v1,v2))}}.collect().foreach(println)sc.stop()}
}

累加器:

分布式共享只写变量
(只写:不同的Executor写入的值不能读)
https://blog.csdn.net/Android_xue/article/details/79780463/自带的累加器:longAccumulator、doubleAccumulator、collectionAccumulator简单案例:
object AccumulatorTest {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("AccumulatorTest")val sc: SparkContext = new SparkContext(conf)val inputRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4), ("a", 5)))val sum: LongAccumulator = sc.longAccumulator("sum")inputRDD.foreach{case (word, count)=>{sum.add(count)}}println(sum.value)sc.stop()}
}

自定义累加器:

class MyAccumulate extends AccumulatorV2[String, mutable.Map[String, Int]]{private var resultMap = mutable.Map[String, Int]()// 是否为空override def isZero: Boolean = {resultMap.isEmpty}// 复制override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = {new MyAccumulate}// 重置override def reset(): Unit = {resultMap.clear()}// 添加override def add(v: String): Unit = {resultMap(v) = resultMap.getOrElse(v, 0) + 1}// 合并override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = {resultMap = resultMap.foldLeft(other.value)((innerMap, kv) => {innerMap(kv._1) =  innerMap.getOrElse(kv._1, 0) + kv._2innerMap})}// 获取valueoverride def value: mutable.Map[String, Int] = resultMap
}

案例:练习: 计算每个省份广告点击量的TopN

/***  练习: 计算每个省份广告点击量的TopN*  数据模型 :1516609143867 6 7 64 16*            时间戳  省份  城市  用户  广告*/
object SparkExerDemo {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkExerDemo")val sc: SparkContext = new SparkContext(conf)// 1.读取文件val inputRDD: RDD[String] = sc.textFile("WordCountTest/src/main/resources/agent.log")// 2.对数据进行转型 (省份-广告,1)val mapRDD: RDD[(String, Int)] = inputRDD.map(line => {val words: Array[String] = line.split(" ")(words(1) + "_" + words(4), 1)})// 3.聚合,求得每个省份每个广告的和 (省份-广告,sum)val adsAndSumRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)// 4. 转型: (省份-广告,sum)=> (省份,(广告,sum))// map 和case一起使用,模式匹配;val adsAndCitySumRDD: RDD[(String, (String, Int))] = adsAndSumRDD.map {// (ads, sum) 会报错;推断错误case (ads, sum) => {val words: Array[String] = ads.split("_")(words(0), (words(1), sum))}}// 5.对数据进行分组val resultRDD: RDD[(String, Iterable[(String, Int)])] = adsAndCitySumRDD.groupByKey()// 6.对同组的数据进行排序取前三val finalRDD: RDD[(String, List[(String, Int)])] = resultRDD.mapValues(iter => {iter.toList.sortWith((left, right) => {left._2 > right._2}).take(3)})finalRDD.collect().foreach(println)sc.stop()}
}

SparkSQL

Spark SQL能够将Spark SQL转换成RDD,然后执行,执行效率非常快提供了两个编程抽象:
DataFrame
DataSetSpark SQL可以清楚的知道该数据集中包含哪些列,每列的名称各是类型各是什么
RDD不知道Spark SQL具有查询优化器,可以对sql进行优化,得到高效的执行流程DataFrame是DataSet的特列,DataFrame=DataSet[Row]SparkSession内部封装了SparkContext创建DataFrame:
1.通过Spark的数据源创建
2.通过已知的RDD来创建
3.通过查询一个HIVE表来创建DataFrame语法风格:
SQL:    spark.sql("select *  from people")
DSL:    (面向对象)  不需要创建临时视图,可以直接进行一些查询操作注意:
临时视图只能在当前session有效,在新的Session中无效
可以创建全局视图,访问全局视图需要全路径:  global_tmp.xxxpom.xml
<dependency>    <groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.1.1</version>
</dependency>SQL:
------------------------------------------------------
//读取json文件
val df = spark.read.json("/opt/module/spark-local/examples/src/main/resources/employees.json")//展示结果
df.show//创建临时表
df.createOrReplaceTempView("people")//sql查询,并展示
spark.sql("select * from people").show//创建全局临时视图
df.createGlobalTempView("people")//简历新的Session去执行sql操作
spark.newSession.sql("select * from global_temp.people")
--------------------------------------------------
DSL://查看Schema信息
df.printSchema//查询等操作
df.select($"name").show
df.select("name").show
df.select("name", "age").show
df.select($"name", $"age" + 1).show
df.filter($"age" > 21).show
df.groupBy("age").count.show注意:设计到运算的时候, 每列都必须使用$

RDD和DataFrame的交互:

涉及到RDD,DataFrame,DataSet之间的操作时,需要导入import spark.implicits._
注意:上一行的spark不是包名,.而是表示SparkSession的那个对象,所以必须先创建SparkSession对象再导入//rdd=>DataFormat
rdd2.toDF("name", "age").show//通过样例类反射转换
case class People(name :String, age: Int)
val rdd2 = rdd1.map(line => { val paras = line.split(", "); People(paras(0), paras(1).toInt) })//通过API的方式转换    ||  rdd=>DataFormat  rdd.toDF("id","name")  ||Dataframe => rdd  frame.rdd就是说rdd,编程DataFrame需要提供结构,frame到rdd,去除结构// 创建Spark环境配置对象val conf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL02_DataFrame01")// 创建上下文环境对象val spark = SparkSession.builder().config(conf).getOrCreate()// 导入隐式转换规则 : A -> B// 这里的spark不是包名,是上下文环境的对象名称import spark.implicits._//  从RDD转换为DataFrameval dataRDD: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1, "zhangsan", 30), (2, "lisi", 40), (3, "wangwu", 50)))// 给RDD数据增加结构信息val dataDF: DataFrame = dataRDD.toDF("id", "name", "age")//dataDF.show()// 将DataFrame转换为RDD,得到的数据类型是ROWval rdd: RDD[Row] = dataDF.rdd//rdd.foreach(row=>row.get)// 释放资源spark.close()

DataSet

DataSet:强类型,既需要知道数据的结构,又要知道数据的类型
//使用样例类得到DataSet
case class Person(name: String, age: Int)
val ds = Seq(Person("lisi", 20), Person("zs", 21)).toDS
//使用基本类型的序列得到DataSet
val ds = Seq(1,2,3,4,5,6).toDS注意:实际使用很少把序列转换成DataSet,更多的是通过RDD来得到DataSet=========================================
RDD和DataSet的交互:使用反射来推断包含特定类对象的RDD的schema样例类定义了表结构:样例类参数名通过反射被读到,然后成为列名case class Person(name: String, age: Long)peopleRDD.map(line => {val para = line.split(",");Person(para(0),para(1).trim.toInt)}).toDS从DataSet到RDD:val ds = Seq(Person("lisi", 40), Person("zs", 20)).toDSval rdd = ds.rdd================================================
DataFrame和DataSet之间的交互:DataFrame=>DataSet   提供数据类型val df = spark.read.json("examples/src/main/resources/people.json")case class People(name: String, age: Long)val ds = df.as[People]DataSet=>DataFrameval df = ds.toDF============================================
RDD,DataFrame,DataSet共性:惰性机制注意import spark.implicits._ 的使用===================================
区别:DataFrame每一行的数据固定为ROWDataFrame是DataSet的特例

RDD、DataFrame、DataSet转换关系图


SparkSql自定义函数

自定义UDF函数:
spark.udf.register("toUpper", (str:String) => {str.toUpperCase
})一,自定义UDAF类:// 自定义聚合函数
// 1. 继承UserDefinedAggregateFunction
// 2. 重写 方法
class AvgAgeUDAF extends UserDefinedAggregateFunction{// 输入数据的结构信息override def inputSchema: StructType = {StructType(Array( StructField("age", LongType) ))}// 缓冲区的数据结构信息override def bufferSchema: StructType = {StructType(Array( StructField("sum", LongType), StructField("count", LongType) ))}// 计算结果的类型override def dataType: DataType = DoubleType// 稳定性override def deterministic: Boolean = true// 初始化override def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0Lbuffer(1) = 0L}// 更新缓冲区的数据override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {buffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1L}// 合并缓冲区override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}// 计算结果override def evaluate(buffer: Row): Any = {buffer.getLong(0).toDouble / buffer.getLong(1)}
}主类中使用方法:// 创建自定义聚合函数val udaf = new AvgAgeUDAF()// 注册自定义函数spark.udf.register("avgAge", udaf)// 应用聚合函数spark.sql("select avgAge(age) from user").show()二,自定义UDAF类
case class UserX( username:String, age:Long )
case class AvgBuffer( var sum:Long, var count:Long )// 自定义聚合函数 (强类型)0
// 1. 继承Aggregator
// 2. 重写 方法
class AvgAgeUDAFClass extends Aggregator[UserX, AvgBuffer, Double]{override def zero: AvgBuffer = {AvgBuffer(0L,0L)}override def reduce(buff: AvgBuffer, in: UserX): AvgBuffer = {buff.sum = buff.sum + in.agebuff.count = buff.count + 1Lbuff}override def merge(buff1: AvgBuffer, buff2: AvgBuffer): AvgBuffer = {buff1.sum = buff1.sum + buff2.sumbuff1.count = buff1.count + buff2.countbuff1}override def finish(buff: AvgBuffer): Double = {buff.sum.toDouble / buff.count}override def bufferEncoder: Encoder[AvgBuffer] = Encoders.productoverride def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}使用方法:// 创建自定义聚合函数val udaf = new AvgAgeUDAFClass()// 将聚合函数当成查询的列val column: TypedColumn[UserX, Double] = udaf.toColumn// 用户自定义聚合函数 : UDAFval df: DataFrame = spark.read.json("input/user.json")val ds: Dataset[UserX] = df.as[UserX]// 采用DSL语法ds.select(column).show()// 释放资源spark.close()

SparkSql的文件读取

默认的数据源是parquet
通用加载:spark.read.load
保存数据:df.write.saveal peopleDF = spark.read.format("json").load("examples/src/main/resources/people.json")
peopleDF.select("name", "age").write.format("parquet").save("namesAndAges.parquet")// 通用的加载功能 : 默认加载的数据格式为:parquetval df: DataFrame = spark.read.format("json").load("input/user.json")// 通用的加载功能 : 默认加载的数据格式为:parquetval df: DataFrame = spark.read.format("json").load("input/user.json")// spark.read.load// spark.write.save// 通用的保存功能 : 默认保存的数据格式为:parquetdf.write.mode("overwrite").format("json").save("output")// 访问JDBCval jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://linux1:3306/rdd").option("user", "root").option("password", "000000").option("dbtable", "user").load()//jdbcDF.show()jdbcDF.write.format("jdbc")//.mode("append").option("url", "jdbc:mysql://linux1:3306/rdd").option("user", "root").option("password", "000000").option("dbtable", "user3").save()
对于JDBC数据源的读写:
读:
import org.apache.spark.sql.SparkSessionobject JDBCDemo {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test").getOrCreate()import spark.implicits._val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://hadoop201:3306/rdd").option("user", "root").option("password", "aaa").option("dbtable", "user").load()jdbcDF.show}
}
写:object JDBCDemo3 {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().master("local[*]").appName("Test").getOrCreate()import spark.implicits._val rdd: RDD[User1] = spark.sparkContext.parallelize(Array(User1("lisi", 20), User1("zs", 30)))val ds: Dataset[User1] = rdd.toDSds.write.format("jdbc").option("url", "jdbc:mysql://hadoop201:3306/rdd").option("user", "root").option("password", "aaa").option("dbtable", "user").mode(SaveMode.Append).save()val props: Properties = new Properties()props.setProperty("user", "root")props.setProperty("password", "aaa")ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop201:3306/rdd", "user", props)}
}case class User1(name: String, age: Long)


Spark整合外部Hive

Spark 要接管 Hive 需要把 hive-site.xml copy 到conf/目录下.
把 Mysql 的驱动 copy 到 jars/目录下.
如果访问不到hdfs, 则需要把core-site.xml和hdfs-site.xml 拷贝到conf/目录下.代码中:拷贝 hive-site.xml 到 resources 目录下。
pom.xml:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.1.1</version>
</dependency>val spark: SparkSession = SparkSession.builder().master("local[2]").appName("AreaClickApp").enableHiveSupport().getOrCreate()

SparkStreaming(流式处理)


注意:
spark Streaming中,处理数据的单位是一批而不是单条,数据采集却是逐条进行的
批处理间隔

DStream:
连续的数据流
在内部是由一个RDD序列来表示的


SparkStreaming架构


SparkStreaming WordCount案例


pom.xml文件:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.1.1</version>
</dependency>优雅的关闭:
ssc.start()
ssc.awaitTermination()pom.xml文件:
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>2.1.1</version>
</dependency>优雅的关闭:
ssc.start()
ssc.awaitTermination()WordCount:
object StreamingWordCount {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamingWordCount")val ssc = new StreamingContext(conf, Seconds(3))val inputStream: ReceiverInputDStream[String] = ssc.socketTextStream("127.0.0.1", 9999)val flatMapRDD: DStream[String] = inputStream.flatMap(line => {val words: Array[String] = line.split(" ")words})val mapRDD: DStream[(String, Int)] = flatMapRDD.map((_, 1))val resultRDD: DStream[(String, Int)] = mapRDD.reduceByKey(_ + _)resultRDD.print()ssc.start()ssc.awaitTermination()}
}自定义数据源:
class MySource(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit = {// 启动一个新的线程来接收数据new Thread("Socket Receiver"){override def run(): Unit = {receive()}}.start()}override def onStop(): Unit = {}def receive() = {val socket = new Socket(host, port)val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))var line:String = reader.readLine()while (!isStopped() && line != null){store(line)line = reader.readLine()}reader.close()socket.close()restart("Trying to connect again")}
}注意事项:
1.一旦StreamingContext已经启动,则不能在添加新的streamingcomputations
2.一旦一个StreamingContext已经停止,他也就不能在重启
3.在一个JVM内,同一时间智能启动一个StreamingContext
4.stop()的方式停止StreamingContext,也会把SparkContext停掉.如果仅仅想停止StreamingContext,则应该这样:stop(false)
5.一个SparkContext可以重用去创建StreamingContext,前提是以前的StreamingContext已经停掉,并且SparkContext没有被停掉

DStream创建

可以使用ssc.queueStream(queueOfRDDs)来创建DStream,每一个推送到这个队列中的RDD,都会作为一个DStream处理object SparkStreamingTest2 {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("Test")val scc: StreamingContext = new StreamingContext(conf, Seconds(5))val sc: SparkContext = scc.sparkContext//创建一个可变队列val queue: mutable.Queue[RDD[Int]] = mutable.Queue[RDD[Int]]()val rddDS: InputDStream[Int] = scc.queueStream(queue, true)rddDS.reduce(_+_).print()scc.start()//循环的方式向队列中添加RDDfor (elem <- 1 to 5){queue += sc.parallelize(1 to 100)Thread.sleep(2000)}scc.awaitTermination()}
}

自定义数据源:

其实就是自定义接收器
需要继承Receiver,并实现onStart,onStop方法来自定义数据源采集需求:自定义数据源,实现监控某个款口号,获取该端口号内容
自定义数据源
object MySource{def apply(host: String, port: Int): MySource = new MySource(host, port)
}class MySource(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY){/*接收器启动的时候调用该方法. This function must initialize all resources (threads, buffers, etc.) necessary for receiving data.这个函数内部必须初始化一些读取数据必须的资源该方法不能阻塞, 所以 读取数据要在一个新的线程中进行.*/override def onStart(): Unit = {// 启动一个新的线程来接收数据new Thread("Socket Receiver"){override def run(): Unit = {receive()}}.start()}// 此方法用来接收数据def receive()={val socket = new Socket(host, port)val reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))var line: String = null// 当 receiver没有关闭, 且reader读取到了数据则循环发送给sparkwhile (!isStopped && (line = reader.readLine()) != null ){// 发送给sparkstore(line)}// 循环结束, 则关闭资源reader.close()socket.close()// 重启任务restart("Trying to connect again")}override def onStop(): Unit = {}
}object MySourceDemo {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[*]")// 1. 创建SparkStreaming的入口对象: StreamingContext  参数2: 表示事件间隔val ssc = new StreamingContext(conf, Seconds(5))// 2. 创建一个DStreamval lines: ReceiverInputDStream[String] = ssc.receiverStream[String](MySource("hadoop201", 9999))// 3. 一个个的单词val words: DStream[String] = lines.flatMap(_.split("""\s+"""))// 4. 单词形成元组val wordAndOne: DStream[(String, Int)] = words.map((_, 1))// 5. 统计单词的个数val count: DStream[(String, Int)] = wordAndOne.reduceByKey(_ + _)//6. 显示count.print//7. 启动流式任务开始计算ssc.start()//8. 等待计算结束才退出主程序ssc.awaitTermination()ssc.stop(false)}
}nc -lk 9999  进行测试

Kafka数据源

高级Api和低级Api的区别: 低级Api对于Offset的操作更加灵活。

pom.xml
<dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-8_2.11</artifactId><version>2.1.1</version>
</dependency>object HighKafka {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")val ssc = new StreamingContext(conf, Seconds(3))// kafka 参数//kafka参数声明val brokers = "hadoop201:9092,hadoop202:9092,hadoop203:9092"val topic = "first"val group = "bigdata"val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"val kafkaParams = Map(ConsumerConfig.GROUP_ID_CONFIG -> group,ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization)val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))dStream.print()ssc.start()ssc.awaitTermination()}
}object HighKafka2 {def createSSC(): StreamingContext = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")val ssc = new StreamingContext(conf, Seconds(3))// 偏移量保存在 checkpoint 中, 可以从上次的位置接着消费ssc.checkpoint("./ck1")// kafka 参数//kafka参数声明val brokers = "hadoop201:9092,hadoop202:9092,hadoop203:9092"val topic = "first"val group = "bigdata"val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"val kafkaParams = Map("zookeeper.connect" -> "hadoop201:2181,hadoop202:2181,hadoop203:2181",ConsumerConfig.GROUP_ID_CONFIG -> group,ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization)val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(topic))dStream.print()ssc}def main(args: Array[String]): Unit = {val ssc: StreamingContext = StreamingContext.getActiveOrCreate("./ck1", () => createSSC())ssc.start()ssc.awaitTermination()}
}低级API:object LowKafka {// 获取 offsetdef getOffset(kafkaCluster: KafkaCluster, group: String, topic: String): Map[TopicAndPartition, Long] = {// 最终要返回的 Mapvar topicAndPartition2Long: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()// 根据指定的主体获取分区信息val topicMetadataEither: Either[Err, Set[TopicAndPartition]] = kafkaCluster.getPartitions(Set(topic))// 判断分区是否存在if (topicMetadataEither.isRight) {// 不为空, 则取出分区信息val topicAndPartitions: Set[TopicAndPartition] = topicMetadataEither.right.get// 获取消费消费数据的进度val topicAndPartition2LongEither: Either[Err, Map[TopicAndPartition, Long]] =kafkaCluster.getConsumerOffsets(group, topicAndPartitions)// 如果没有消费进度, 表示第一次消费if (topicAndPartition2LongEither.isLeft) {// 遍历每个分区, 都从 0 开始消费topicAndPartitions.foreach {topicAndPartition => topicAndPartition2Long = topicAndPartition2Long + (topicAndPartition -> 0)}} else { // 如果分区有消费进度// 取出消费进度val current: Map[TopicAndPartition, Long] = topicAndPartition2LongEither.right.gettopicAndPartition2Long ++= current}}// 返回分区的消费进度topicAndPartition2Long}// 保存消费信息def saveOffset(kafkaCluster: KafkaCluster, group: String, dStream: InputDStream[String]) = {dStream.foreachRDD(rdd => {var map: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long]()// 把 RDD 转换成HasOffsetRanges对val hasOffsetRangs: HasOffsetRanges = rdd.asInstanceOf[HasOffsetRanges]// 得到 offsetRangsval ranges: Array[OffsetRange] = hasOffsetRangs.offsetRangesranges.foreach(range => {// 每个分区的最新的 offsetmap += range.topicAndPartition() -> range.untilOffset})kafkaCluster.setConsumerOffsets(group,map)})}def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("HighKafka")val ssc = new StreamingContext(conf, Seconds(3))// kafka 参数//kafka参数声明val brokers = "hadoop201:9092,hadoop202:9092,hadoop203:9092"val topic = "first"val group = "bigdata"val deserialization = "org.apache.kafka.common.serialization.StringDeserializer"val kafkaParams = Map("zookeeper.connect" -> "hadoop201:2181,hadoop202:2181,hadoop203:2181",ConsumerConfig.GROUP_ID_CONFIG -> group,ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> brokers,ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> deserialization,ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> deserialization)// 读取 offsetval kafkaCluster = new KafkaCluster(kafkaParams)val fromOffset: Map[TopicAndPartition, Long] = getOffset(kafkaCluster, group, topic)val dStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](ssc,kafkaParams,fromOffset,(message: MessageAndMetadata[String, String]) => message.message())dStream.print()// 保存 offsetsaveOffset(kafkaCluster, group, dStream)ssc.start()ssc.awaitTermination()}
}

DStream的转换:

map(func)
flatMap(func)
filter(func)
repartition(numPartitions)
union(otherStream)
count()
reduce(func)
countByValue()
reduceByKey(func, [numTasks])
join(otherStream, [numTasks])
cogroup(otherStream, [numTasks])
transform(func)  // 能实现比较复杂的转化, 能在转换的同时进行一些其他的操作。
updateStateByKey(func)updateStateByKey操作允许在使用新信息不断更新状态的同时能够保留他的状态.需要做两件事情:
定义状态. 状态可以是任意数据类型
定义状态更新函数. 指定一个函数, 这个函数负责使用以前的状态和新值来更新状态.def updateFunction(newValue: Seq[Int], runningCount: Option[Int]): Option[Int] = {// 新的总数和状态进行求和操作val newCount: Int = (0 /: newValue) (_ + _) + runningCount.getOrElse(0)Some(newCount)}// 设置检查点: 使用updateStateByKey必须设置检查点ssc.sparkContext.setCheckpointDir("hdfs://hadoop201:9000/checkpoint")val stateDS: DStream[(String, Int)] = wordAndOne.updateStateByKey[Int](updateFunction _)//结束// 显示stateDS.print

SparkStreaming的window操作

注意:窗口的大小 和 窗口的滑动长度要和 采集的时间 成整数倍的关系object WindowTest {def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WindowTest")val ssc = new StreamingContext(conf, Seconds(2))val inputDS: ReceiverInputDStream[String] = ssc.socketTextStream("127.0.0.1", 9999)// 窗口长度为6s, 滑动为4s
//    val winDS: DStream[String] = inputDS.window(Seconds(6), Seconds(4))
//     滚动窗口val winDS: DStream[String] = inputDS.window(Seconds(4))val flatMapRDD: DStream[String] = winDS.flatMap(line => {val words: Array[String] = line.split(" ")words})val mapRDD: DStream[(String, Int)] = flatMapRDD.map((_, 1))val resultRDD: DStream[(String, Int)] = mapRDD.reduceByKey(_ + _)resultRDD.print()ssc.start()ssc.awaitTermination()}
}窗口的其他操作:
reduceByKeyAndWindow(reduceFunc: (V, V) => V, windowDuration: Duration)
reduceByKeyAndWindow(reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, windowDuration: Duration, slideDuration: Duration)
window(windowLength, slideInterval)
countByWindow(windowLength, slideInterval)
countByValueAndWindow(windowLength, slideInterval, [numTasks])

DStream的输出:

print()
saveAsTextFiles(prefix, [suffix])
saveAsObjectFiles(prefix, [suffix])
saveAsHadoopFiles(prefix, [suffix])
foreachRDD(func)注意:
连接不能写在driver层面(序列化);
如果写在foreach则每个RDD中的每一条数据都创建,得不偿失;
增加foreachPartition,在分区创建(获取)。

SparkStreaming中累加器和广播变量

和RDD中的用法完全一样


GitHub:

https://github.com/MrXuSS/SparkExer


DF常用操作:

https://www.cnblogs.com/Frank99/p/8295949.html

总结

以上就是本菜鸟的Spark学习笔记,拿出来和大家分享一下,还是有点小乱,但是感觉还能看,谅解。有关大数据相关的问题都可以与本菜鸟一起讨论,一起学习。

本菜鸟QQ:599903582

笨鸟先飞,熟能生巧。

比心心~

Spark-学习笔记分享相关推荐

  1. Spark学习笔记1——第一个Spark程序:单词数统计

    Spark学习笔记1--第一个Spark程序:单词数统计 笔记摘抄自 [美] Holden Karau 等著的<Spark快速大数据分析> 添加依赖 通过 Maven 添加 Spark-c ...

  2. Spark学习笔记[1]-scala环境安装与基本语法

    Spark学习笔记[1]-scala环境安装与基本语法   正所谓工欲善其事必先利其器,Spark的开发语言不是java而是scala,虽然都是运行于JVM,但是两门语言的基本特性还是有些不一样,这里 ...

  3. CSDN-我的在线学习笔记分享平台

    CSDN-我的在线学习笔记分享平台 1.你来自哪里?来CSDN想收获什么? 4.你用过哪些开源项目让你忍不住推荐给朋友? 5.有什么事情想做很久了?还没去做的原因是什么? 6.你和朋友讨论过的哪些有趣 ...

  4. 尚硅谷Vue2学习笔记分享

    前言 这里是尚硅谷Vue2的学习笔记分享. 原视频是尚硅谷Vue2.0+Vue3.0全套教程丨vuejs从入门到精通 Vue3的笔记链接 文章目录 前言 初识Vue 模板语法 数据绑定 el和data ...

  5. spark 学习笔记

    spark 学习笔记 spark介绍 Spark是是一种快速通用的集群计算系统,它的主要特点是能够在内存中进行计算.它包含了 spark 核心组件 spark-core,用于 SQL 和结构化处理数据 ...

  6. Linux_红帽8学习笔记分享_3(文件操作管理)

    Linux_红帽8学习笔记分享_3(文件操作管理) 文章目录 Linux_红帽8学习笔记分享_3(文件操作管理) 1.Vi编辑器 1.1两种模式 1.2十种技巧 2.用户的家目录 2.1 su命令的使 ...

  7. Spark学习笔记(8)---Spark Streaming学习笔记

    Spark Streaming学习笔记 同Spark SQL一样,Spark Streaming学习也是放在了github https://github.com/yangtong123/RoadOfS ...

  8. Spark学习笔记(7)---Spark SQL学习笔记

    Spark SQL学习笔记 Spark SQL学习笔记设计到很多代码操作,所以就放在github, https://github.com/yangtong123/RoadOfStudySpark/bl ...

  9. 软件测试-柠檬班python全栈自动化50期测试学习笔记分享

    数组(1174536086)←v是有序的元素序列.用于差异数组的各个元素的数字编号称为下标.若将有限个类型相同的变量的集结命名,那么这个名称为数组名.数组是一个固定长度的存储相同数据类型的数据结构,数 ...

  10. ssm 转发请求_千呼万唤!阿里内部终于把这份SSM框架技术学习笔记分享出来了...

    SSM SSM(Spring+SpringMVC+MyBatis)框架集由Spring.MyBatis两个开源框架整合而成(SpringMVC是Spring中的部分内容).常作为数据源较简单的web项 ...

最新文章

  1. VTK:结构化网格之Vol
  2. endnote x9中科大版_文献管理软件Endnote的一些使用经验
  3. Disabling contextual LOB creation as createClob()
  4. 继涉黄被约谈 “比心陪练”App因内容涉宣扬暴力再被处罚
  5. ubuntu16.4 配置logstash6.3.2 kibanan6.3.2
  6. 19【CTR15】会话兴趣⽹络
  7. 选择尽可能多的不相交区间
  8. 管理思维的逻辑之案例作业
  9. 数字图像处理-空间滤波
  10. 系统迁移工具迁移操作系统到别的硬盘
  11. 人工智能之模式识别(二)
  12. Google Guava简介
  13. gds文件 导出_GaussDB 200使用GDS服务导入导出数据
  14. 8.15美团笔试和奇葩赛码网的输入坑
  15. 星级评价的代码php,JavaScript_使用jQuery实现星级评分代码分享,前面有一篇原生js实现星级评 - phpStudy...
  16. Python数学问题16:百钱买百鸡
  17. 使用完整拼音查找汉字(完整拼音,不是网上散布的首字符拼音那种方法)
  18. C++:评估二伽玛或 psi 功能(附完整源码)
  19. 中年人学C语言Windows程序设计,9 窗口绘图:直线的画法
  20. 凡科网怎么样啊,中小企业建站凡科网值得选择吗,口碑如何?

热门文章

  1. win10 改变输入法的切换快捷键
  2. 32张世界杯海报,惊艳全场
  3. 关于浙政钉水印的问题
  4. 工程项目管理的主要内容都是什么?
  5. Intellij IDEA自动导入jar包
  6. 前端经典面试题(一)-李游Leo-专题视频课程
  7. 【测试开发】几种常见的自动化测试框架
  8. Windows 下使用 grep 命令
  9. ForkJoin 线程池
  10. Java中的线程安全集合类