1.1 什么是RDD?

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。

1.2 RDD的属性

(1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

(2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

(3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

(4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

(5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

1.3 WordCount粗图解RDD

其中hello.txt

2、DataSet、DataFrame是什么?

DataSet是分布式数据集合。DataSet是Spark 1.6中添加的一个新接口,它提供了RDD的优势(强类型,使用强大的lambda函数的能力)和Spark SQL优化执行引擎的优点。DataSet可以被构造从JVM对象,然后使用功能性的转换(操作mapflatMapfilter等等)。DataSet API在Scala和 Java中可用。Python没有对Dataset API的支持。但由于Python的动态特性,数据集API的许多好处已经可用(即您可以自然地按名称访问行的字段row.columnName)。R的情况类似。

DataFrame是一个组织成命名列的DataSet。它在概念上等同于关系数据库中的表或R / Python中的数据框,但在底层具有更丰富的优化。DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。DataFrame API在Scala,Java,Python和R中可用。在Scala和Java中,DataFrame由Rows 的数据集表示。在Scala API中,DataFrame它只是一个类型别名Dataset[Row]。而在Java API中,用户需要使用Dataset<Row>来表示DataFrame

在本文档中,我们经常将Rows的Scala / Java数据集称为DataFrame。

Dataset<Row>  == DataFream 示例:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

DataSet 示例:

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;public static class Person implements Serializable {private String name;private int age;public String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}
}// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(Collections.singletonList(person),personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map((MapFunction<Integer, Integer>) value -> value + 1,integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;// Create an RDD of Person objects from a text file
JavaRDD<Person> peopleRDD = spark.read().textFile("examples/src/main/resources/people.txt").javaRDD().map(line -> {String[] parts = line.split(",");Person person = new Person();person.setName(parts[0]);person.setAge(Integer.parseInt(parts[1].trim()));return person;});// Apply a schema to an RDD of JavaBeans to get a DataFrame
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people");// SQL statements can be run by using the sql methods provided by spark
Dataset<Row> teenagersDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19");// The columns of a row in the result can be accessed by field index
Encoder<String> stringEncoder = Encoders.STRING();
Dataset<String> teenagerNamesByIndexDF = teenagersDF.map((MapFunction<Row, String>) row -> "Name: " + row.getString(0),stringEncoder);
teenagerNamesByIndexDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+// or by field name
Dataset<String> teenagerNamesByFieldDF = teenagersDF.map((MapFunction<Row, String>) row -> "Name: " + row.<String>getAs("name"),stringEncoder);
teenagerNamesByFieldDF.show();
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+import java.util.ArrayList;
import java.util.List;import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;// Create an RDD
JavaRDD<String> peopleRDD = spark.sparkContext().textFile("examples/src/main/resources/people.txt", 1).toJavaRDD();// The schema is encoded in a string
String schemaString = "name age";// Generate the schema based on the string of schema
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);// Convert records of the RDD (people) to Rows
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {String[] attributes = record.split(",");return RowFactory.create(attributes[0], attributes[1].trim());
});// Apply the schema to the RDD
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);// Creates a temporary view using the DataFrame
peopleDataFrame.createOrReplaceTempView("people");// SQL can be run over a temporary view created using DataFrames
Dataset<Row> results = spark.sql("SELECT name FROM people");// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
Dataset<String> namesDS = results.map((MapFunction<Row, String>) row -> "Name: " + row.getString(0),Encoders.STRING());
namesDS.show();
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

3、小结:

RDD适用于迭代计算、数据各种操作等。(Spark Core)

DataSet、DataFrame使用与规范化数据处理。(SparkSQL)

RDD、DataSet与DataFream相关推荐

  1. sql能查到数据 dataset对象里面没有值_spark系列:RDD、DataSet、DataFrame的区别

    RDD与DataSet的区别 二者都是由元素构成的分布式数据集合 1. 对于spark来说,并不知道RDD元素的内部结构,仅仅知道元素本身的类型,只有用户才了解元素的内部结构,才可以进行处理.分析:但 ...

  2. Spark总结之RDD(四)

    Spark总结之RDD(四) 1. 背景 Spark针对RDD的整体设计思想很多地方和MapReduce相似,但又做了很多优化. Spark整体API设计针对分布式数据处理做了很多优化,并且做了更高层 ...

  3. Apache Spark 3.0 SQL DataFrame和DataSet指南

    目录 简介 SQL 数据集和数据框 入门 起点:SparkSession Scala语言 Java语言 Python语言 R语言 创建DataFrame Scala语言 Java语言 Python语言 ...

  4. SparkSQL系列-5、什么是Dataset?

    版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明. 传送门:大数据系列文章目录 官方网址:http://spark.apache.org/. ht ...

  5. SparkSQL核心编程

    目录 基本介绍 DataFrame 创建 DataFrame DataSet 创建 DataSet RDD 转换为 DataSet DataSet 转换为 RDD DataFrame 和 DataSe ...

  6. 初识Spark2.0之Spark SQL

    内存计算平台Spark在今年6月份的时候正式发布了spark2.0,相比上一版本的spark1.6版本,在内存优化,数据组织,流计算等方面都做出了较大的改变,同时更加注重基于DataFrame数据组织 ...

  7. Spark之SparkSQL理论篇

    Spark SQL 理论学习: 简介 Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用. 特点 1)易整合 2) ...

  8. 详细解读Spark的数据分析引擎:Spark SQL

    一.spark SQL:类似于Hive,是一种数据分析引擎 什么是spark SQL? spark SQL只能处理结构化数据 底层依赖RDD,把sql语句转换成一个个RDD,运行在不同的worker上 ...

  9. 基于java的随机森林算法_基于Spark实现随机森林代码

    本文实例为大家分享了基于Spark实现随机森林的具体代码,供大家参考,具体内容如下 public class RandomForestClassficationTest extends TestCas ...

最新文章

  1. 构造函数(constructor)与原型链(prototype)关系
  2. FD.io/VPP — L2TP
  3. 字符集GBK和UTF8的区别说明
  4. pandas教程:series和dataframe
  5. java 冒泡排序_Java冒泡排序详解
  6. JDK 1.5 主要新特性
  7. 做一个更好的A牌 从《Artifact》2.0看Valve的设计思路
  8. 如何通过 C# 实现对象的变更跟踪 ?
  9. Redis HyperLogLog 是什么?这些场景使用它~
  10. C语言图像处理二值图细化,Visual C 实现二值图像处理
  11. 设置linux RHEL6.6本地镜像作为yum源
  12. using的一种用法
  13. 主流Ajax框架介绍
  14. 「Medical Image Analysis」Note on 3D U-Net
  15. (批处理)批量文件夹重命名,要求是在原文件夹名前加上英文字母前缀aa
  16. java实现 poi XWPFDocument 读取word文档
  17. 03.项目管理实践工具-团队绩效评价
  18. 嵌入式实操----基于RT1170 移植mbw做SDRAM带宽测试(三十一)
  19. c语言问题 角谷猜想,hdu 1279 验证角谷猜想(简单的模拟)
  20. C语言学生成绩管理系统大纲,C语言教学大纲

热门文章

  1. 电脑金山毒霸彻底卸载的方法
  2. css的边界虚线如何拉长dashed
  3. python中文件的write语句_Python之文件读写
  4. python接收邮件内容启动程序_如何使用Python脚本来处理电子邮件?
  5. 【C++】SGI一级空间配置器
  6. PDF怎么转成PPT文件免费?分享几个方法!
  7. 流行学习Manifold Learning
  8. Autodesk AutoCAD 2024 Mac软件安装包下载Autodesk CAD2024安装教程支持M1/2芯片
  9. 系统学习OpenCVSharp的课程大纲
  10. 随smart登陆欧洲,亿咖通科技踏上出海新征程