声明:代码主要以Scala为主,希望广大读者注意。本博客以代码为主,代码中会有详细的注释。相关文章将会发布在我的个人博客专栏《Spark 2.0机器学习》,欢迎大家关注。


Spark的发展史可以简单概括为三个阶段,分别为:RDD、DataFrame和DataSet。在Spark 2.0之前,使用Spark必须先创建SparkConf和SparkContext,不过在Spark 2.0中只要创建一个SparkSession就可以了,SparkConf、SparkContext和SQLContext都已经被封装在SparkSession当中,它是Spark的一个全新切入点,大大降低了Spark的学习难度。

一、创建SparkSession

创建SparkSession的方式非常简单,如下:

//创建SparkSession
val spark = SparkSession.builder().master("local[*]").appName("dataset").enableHiveSupport()  //支持hive,如果代码中用不到hive的话,可以省略这一条.getOrCreate()

二、DataSet/DataFrame的创建

1、序列创建DataSet

//1、产生序列dataset
val numDS = spark.range(5, 100, 5)
numDS.orderBy(desc("id")).show(5)  //降序排序,显示5个
numDS.describe().show()  //打印numDS的摘要

结果如下所示:

+---+
| id|
+---+
| 95|
| 90|
| 85|
| 80|
| 75|
+---+
only showing top 5 rows+-------+------------------+
|summary|                id|
+-------+------------------+
|  count|                19|
|   mean|              50.0|
| stddev|28.136571693556885|
|    min|                 5|
|    max|                95|
+-------+------------------+

2、集合创建DataSet

首先创建几个可能用到的样例类:

//样例类
case class Person(name: String, age: Int, height: Int)
case class People(age: Int, names: String)
case class Score(name: String, grade: Int)

然后定义隐式转换:

import spark.implicits._

最后,定义集合,创建dataset

//2、集合转成dataset
val seq1 = Seq(Person("xzw", 24, 183), Person("yxy", 24, 178), Person("lzq", 25, 168))
val ds1 = spark.createDataset(seq1)
ds1.show()

结果如下所示:

+----+---+------+
|name|age|height|
+----+---+------+
| xzw| 24|   183|
| yxy| 24|   178|
| lzq| 25|   168|
+----+---+------+

3、RDD转成DataFrame。

//3、RDD转成DataFrame
val array1 = Array((33, 24, 183), (33, 24, 178), (33, 25, 168))
val rdd1 = spark.sparkContext.parallelize(array1, 3).map(f => Row(f._1, f._2, f._3))
val schema = StructType(StructField("a", IntegerType, false) ::StructField("b", IntegerType, true) :: Nil
)
val rddToDataFrame = spark.createDataFrame(rdd1, schema)
rddToDataFrame.show(false)

结果如下所示:

+---+---+
|a  |b  |
+---+---+
|33 |24 |
|33 |24 |
|33 |25 |
+---+---+

4、读取文件

//4、读取文件,这里以csv文件为例
val ds2 = spark.read.csv("C://Users//Machenike//Desktop//xzw//test.csv")
ds2.show()

结果如下所示:

+---+---+----+
|_c0|_c1| _c2|
+---+---+----+
|xzw| 24| 183|
|yxy| 24| 178|
|lzq| 25| 168|
+---+---+----+

5、读取文件,并配置详细参数

//5、读取文件,并配置详细参数
val ds3 = spark.read.options(Map(("delimiter", ","), ("header", "false"))).csv("C://Users//Machenike//Desktop//xzw//test.csv")
ds3.show()

结果如下图所示:

+---+---+----+
|_c0|_c1| _c2|
+---+---+----+
|xzw| 24| 183|
|yxy| 24| 178|
|lzq| 25| 168|
+---+---+----+

三、DataSet的基础函数

为了节省篇幅,以下内容不再给出运行结果~

//1、DataSet存储类型
val seq1 = Seq(Person("xzw", 24, 183), Person("yxy", 24, 178), Person("lzq", 25, 168))
val ds1 = spark.createDataset(seq1)
ds1.show()
ds1.checkpoint()
ds1.cache()
ds1.persist()
ds1.count()
ds1.unpersist(true)//2、DataSet结构属性
ds1.columns
ds1.dtypes
ds1.explain()//3、DataSet rdd数据互换
val rdd1 = ds1.rdd
val ds2 = rdd1.toDS()
ds2.show()
val df2 = rdd1.toDF()
df2.show()//4、保存文件
df2.select("name", "age", "height").write.format("csv").save("./save")

四、DataSet的Actions操作

五、DataSet的转化操作

package sparkmlimport org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._//样例类
case class Person(name: String, age: Int, height: Int)
case class People(age: Int, names: String)
case class Score(name: String, grade: Int)object WordCount2 {def main(args: Array[String]): Unit = {//设置日志输出格式Logger.getLogger("org").setLevel(Level.WARN)//创建SparkSessionval spark = SparkSession.builder().master("local[*]").appName("dataset").getOrCreate()import spark.implicits._//seq创建datasetval seq1 = Seq(Person("leo", 29, 170), Person("jack", 21, 170), Person("xzw", 21, 183))val ds1 = spark.createDataset(seq1)//1、map操作,flatmap操作ds1.map{x => (x.age + 1, x.name)}.show()ds1.flatMap{x =>val a = x.ageval s = x.name.split("").map{x => (a, x)}s}.show()//2、filter操作,where操作ds1.filter("age >= 25 and height >= 170").show()ds1.filter($"age" >= 25 && $"height" >= 170).show()ds1.filter{x => x.age >= 25 && x.height >= 170}.show()ds1.where("age >= 25 and height >= 170").show()ds1.where($"age" >= 25 && $"height" >= 170).show()//3、去重操作ds1.distinct().show()ds1.dropDuplicates("age").show()ds1.dropDuplicates("age", "height").show()ds1.dropDuplicates(Seq("age", "height")).show()ds1.dropDuplicates(Array("age", "height")).show()//4、加法减法操作val seq2 = Seq(Person("leo", 18, 183), Person("jack", 18, 175), Person("xzw", 22, 183), Person("lzq", 23, 175))val ds2 = spark.createDataset(seq2)val seq3 = Seq(Person("leo", 19, 183), Person("jack", 18, 175), Person("xzw", 22, 170), Person("lzq", 23, 175))val ds3 = spark.createDataset(seq3)ds3.union(ds2).show()  //并集ds3.except(ds2).show()  // 差集ds3.intersect(ds2).show()  //交集//5、select操作ds2.select("name", "age").show()ds2.select(expr("height + 1").as[Int].as("height")).show()//6、排序操作ds2.sort("age").show()  //默认升序排序ds2.sort($"age".desc, $"height".desc).show()ds2.orderBy("age").show()  //默认升序排序ds2.orderBy($"age".desc, $"height".desc).show()//7、分割抽样操作val ds4 = ds3.union(ds2)val rands = ds4.randomSplit(Array(0.3, 0.7))println(rands(0).count())println(rands(1).count())rands(0).show()rands(1).show()val ds5 = ds4.sample(true, 0.5)println(ds5.count())ds5.show()//8、列操作val ds6 = ds4.drop("height")println(ds6.columns)ds6.show()val ds7 = ds4.withColumn("add", $"age" + 2)println(ds7.columns)ds7.show()val ds8 = ds7.withColumnRenamed("add", "age_new")println(ds8.columns)ds8.show()ds4.withColumn("add_col", lit(1)).show()//9、join操作val seq4 = Seq(Score("leo", 85), Score("jack", 63), Score("wjl", 70), Score("zyn", 90))val ds9 = spark.createDataset(seq4)val ds10 = ds2.join(ds9, Seq("name"), "inner")ds10.show()val ds11 = ds2.join(ds9, Seq("name"), "left")ds11.show()//10、分组聚合操作val ds12 = ds4.groupBy("height").agg(avg("age").as("avg_age"))ds12.show()}}

六、DataSet的内置函数

七、例子:WordCount

package sparkmlimport org.apache.spark.sql.SparkSessionobject WordCount {def main(args: Array[String]): Unit = {//创建SparkSessionval spark = SparkSession.builder().appName("Dataset").master("local[*]").getOrCreate()import spark.implicits._val data = spark.read.textFile("C://xzw//wordcount").flatMap(_.split(" ")).map(_.toLowerCase()).filter($"value"=!="," && $"value"=!="." && $"value"=!="not")data.groupBy($"value").count().sort($"count".desc).show(50)}}

结果如下图所示:

Spark DataSet介绍相关推荐

  1. java spark dataset_Spark 2.0介绍:Dataset介绍和使用

    <Spark 2.0技术预览:更容易.更快速.更智能>文章中简单地介绍了 Dataset介绍 Dataset是从Spark 1.6开始引入的一个新的抽象,当时还是处于alpha版本:然而在 ...

  2. Spark MLlib介绍

    Spark MLlib介绍 Spark之所以在机器学习方面具有得天独厚的优势,有以下几点原因: (1)机器学习算法一般都有很多个步骤迭代计算的过程,机器学习的计算需要在多次迭代后获得足够小的误差或者足 ...

  3. 学习笔记Spark(八)—— Spark SQL应用(3)—— Spark DataSet基础操作

    三.Spark DataSet基本操作 3.1.DataSet简介 DataSet是分布式的数据集合,DataSet提供了强类型支持,也是在RDD的每行数据加了类型约束. DateSet整合了RDD和 ...

  4. Spark Streaming介绍,DStream,DStream相关操作(来自学习资料)

    一. Spark Streaming介绍 1. SparkStreaming概述 1.1. 什么是Spark Streaming Spark Streaming类似于Apache Storm,用于流式 ...

  5. Spark之Spark角色介绍及运行模式

    Spark之Spark角色介绍及运行模式 集群角色 运行模式 1. 集群模式 从物理部署层面上来看,Spark主要分为两种类型的节点,Master节点和Worker节点: Master节点主要运行集群 ...

  6. Apache Spark开发介绍

    Databricks的工程师,Apache Spark Committer介绍了Databricks和Spark的历史,包括了Spark 1.4中的重要特性和进展,涵盖了Spark早期版本的主要功能和 ...

  7. 分布式离线计算—Spark—基础介绍

    原文作者:饥渴的小苹果 原文地址:[Spark]Spark基础教程 目录 Spark特点 Spark相对于Hadoop的优势 Spark生态系统 Spark基本概念 Spark结构设计 Spark各种 ...

  8. 快速理解Spark Dataset

    1. 前言 RDD.DataFrame.Dataset是Spark三个最重要的概念,RDD和DataFrame两个概念出现的比较早,Dataset相对出现的较晚(1.6版本开始出现),有些开发人员对此 ...

  9. Spark SQL介绍和特点

    一:Spark SQL的简介 Spark SQL是Spark处理数据的一个模块,跟基本的Spark RDD的API不同,Spark SQL中提供的接口将会提供给Spark更多关于结构化数据和计算的信息 ...

最新文章

  1. NSight Compute 用户手册(中)
  2. 联手中科大、浙大、华科大等高校,阿里研发4项最新AI安全技术
  3. dev 获取gridview没有rows属性_虚拟黑群晖不直通也可以获取到SMART?
  4. Oracle创建表空间(转)
  5. Python爬虫beautifulsoup4常用的解析方法总结
  6. 到底什么是IT服务管理
  7. 【ArcGIS风暴】ArcGIS自定义坐标系统案例教程---以阿尔伯斯投影(Albers)为例
  8. oracle索引sys_nc,通过dba_ind_columns表查到索引所在列的名字为SYS_NC00133$
  9. codeforces C. Vanya and Scales
  10. dart js转换_基于dart生态的FaaS前端一体化建设
  11. 安装memcached:error while loading shared libraries: libevent-1.4.so.2
  12. 博客园的积分原来是这样算的哦
  13. 事件---------2
  14. TI DSP位域寄存器文件(Bit Field and Register-File Struc...
  15. java下载文件excel格式错乱,excel表格数据错乱如何修复-excel表格里的文件突然格式全部乱了,怎么恢复?...
  16. 软件需求说明书模板概要书
  17. 计算机网络基础知识 - 物理层
  18. Unity实现人物旋转+移动
  19. 关于安装mmdetection
  20. 关于AC6003、6005、6605版本关联WIFI6代产品方法

热门文章

  1. Java在Quant应用_BigQuant人工智能量化平台使用
  2. 【优秀课设】51单片机LCD1602显示的温度控制系统设计 包括时间显示 校时、设置最值温度报警功能
  3. 50hz 60hz 级联 陷波器,心电信号50Hz陷波器的FPGA实现
  4. 计算机工程与科学不是CSCD吗,计算机工程与科学核心电子期刊发表要求
  5. catiawin10许可证灰色_安装CATIA V5 6R2017 Win64时“许可证管理工具”窗口不弹出解决方案...
  6. MATLAB数字图像小程序设计
  7. ISO27001体系认证的优势
  8. win10 + Anaconda3(python 3.7) + CPU版TensorFlow
  9. Tachyon架构分析和现存问题讨论
  10. JPA踩坑笔记(一) - 数据查询的两种方式