一、创建DataSet

使用SparkSession,应用程序可以从现有的RDD,Hive表的或Spark数据源创建DataFrame 。

(1)基于JSON的内容创建一个DataFrame

//hdfs
Dataset<Row> df = spark.read().json("hdfs://master:9000/test.json");//rdd
RDD<String> jsonRDD = ...
Dataset<Row> df = spark.read().json(jsonRDD);//dataset
Dataset<String> jsonDataset = ...
Dataset<Row> df = spark.read().json(dataSet);

(2)基于parquet的内容创建一个DataFrame

//hdfs
Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.parquet");

(3)基于orc的内容创建一个DataFrame

//hdfs
Dataset<Row> df = spark.read().parquet("hdfs://master:9000/test.orc");

(4)基于txt的内容创建一个DataFrame

//hdfs 创建只有value列的数据
Dataset<Row> df = spark.read().txt("hdfs://master:9000/test.txt");

(5)基于cvs的内容创建一个DataFrame

//hdfs
Dataset<Row> df = spark.read().cvs("hdfs://master:9000/test.cvs");

(6)基于jdbc的内容创建一个DataFrame

Dataset<Row> df1 = spark.read().format("jdbc").option("url", "jdbc:mysql://localhost:3306/man").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "man").option("user", "root").option("password","admin").load();
df1.show();Properties properties = new Properties();
properties.put("user", "root");
properties.put("password","admin");
properties.put("driver", "com.mysql.jdbc.Driver");
Dataset<Row> df2 = spark.read().jdbc("jdbc:mysql://localhost:3306/man",  "man", properties);
df2.show();

(7)基于textFile的内容创建一个DataSet

//hdfs
Dataset<String> ds = spark.read().textFile("hdfs://master:9000/test.txt");

(8)rdd创建DataSet

//反射推断StructType
JavaRDD<Person> peopleRDD = ...
Dataset<Row> peopleDF = spark.createDataFrame(peopleRDD, Person.class);//编程方式指定StructType
String schemaString = ...
List<StructField> fields = new ArrayList<>();
for (String fieldName : schemaString.split(" ")) {StructField field = DataTypes.createStructField(fieldName, DataTypes.StringType, true);fields.add(field);
}
StructType schema = DataTypes.createStructType(fields);
JavaRDD<Row> rowRDD = peopleRDD.map((Function<String, Row>) record -> {String[] attributes = record.split(",");return RowFactory.create(attributes[0], attributes[1].trim());
});
Dataset<Row> peopleDataFrame = spark.createDataFrame(rowRDD, schema);

二、DataSet操作

(1)schema结构

df.printSchema();
StructType type = df.schema();

(2)map一对一映射操作

//dataframe格式转换
Dataset<Row> df1 = df.map(v-> v, RowEncoder.apply(df.schema()));
df1.show();//dataframe格式转换
StructField structField = new StructField("name", DataTypes.StringType, true, null);
StructType structType = new StructType(new StructField[]{structField});
Dataset<Row> df2 = df.map(v-> new GenericRowWithSchema(new Object[]{v.getAs("name")}, structType), RowEncoder.apply(structType));
df2.show();//dataSet格式转换
Dataset<String> dfs =  df.map(v-> v.getAs("name"), Encoders.STRING());
dfs.show();

(3)flatMap一对多映射操作

//dataSet格式转换
Dataset<String> dfs =  df.flatMap(v-> Arrays.asList((String)v.getAs("name")).iterator(), Encoders.STRING());
dfs.show();

(4)filter过滤操作

Dataset<Row>  df1 =  df.filter(new Column("name").$eq$eq$eq("mk"));
Dataset<Row>  df2 =  df.filter(new Column("name").notEqual("mk"));

(5)withColumn加列或者覆盖

Dataset<Row> df1 = df.withColumn("name1", functions.col("name"));
df1.show();
Dataset<Row>  df2 = df.withColumn("name", functions.lit("a"));
df2.show();
Dataset<Row>  df3 = df.withColumn("name", functions.concat(functions.col("name"),  functions.lit("zzz")));
df3.show();

(6)select选择列

Dataset<Row>  df1 = df.select(functions.concat(functions.col("name"),  functions.lit("zzz")).as("name1"));
df1.show();
Dataset<Row>  df2 = df.select(functions.col("name"), functions.concat(functions.col("name"),  functions.lit("zzz")).as("name1"));
df2.show();

(7)selectExpr表达式选择列

Dataset<Row>  df1 = df.selectExpr("name", "'a' as name1");
df1.show();

(8)groupBy agg分组统计

Dataset<Row>  df1 = df.groupBy(functions.col("name")).agg(functions.expr("count(1)").as("c"), functions.expr("max(desc)").as("desc"));
df1.show();

(9)drop删除列

Dataset<Row>  df1 = df.drop("name");
df1.show();

(10)distinct去重

Dataset<Row>  df1 = df.distinct();
df1.show();

(11)dropDuplicates 根据字段去重

Dataset<Row>  df1 = df.dropDuplicates("name");
df1.show();

(12)summary统计count、mean、stddev、min、max、25%、50%、75%,支持统计类型过滤

Dataset<Row>  df1 = df.summary("count");
df1.show();

(13)describe统计count、mean、stddev、min、max,支持列过滤

Dataset<Row>  df1 = df.describe("name");
df1.show();

(14)sort 排序

Dataset<Row>  df1 = df.sort(functions.col("name").asc());
df1.show();

(15)limit 分页

Dataset<Row>  df1 = df.limit(1);
df1.show();

三、DataSet连接

(1)join连接

Dataset<Row>  df1 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name")), "left_outer");
df1.show();Dataset<Row>  df2 = df.as("a").join(df.as("b"), functions.col("a.name").notEqual(functions.col("b.name")));
df2.show();

(2)crossJoin笛卡尔连接

Dataset<Row>  df1 = df.as("a").crossJoin(df.as("b"));
df1.show();

四、DataSet集合运算

(1)except差集

Dataset<Row>  df1 = df.except(df.filter("name='mk'"));
df1.show();

(2)union并集,根据列位置合并行,列数要一致

Dataset<Row>  df1 = df.union(df.filter("name='mk'"));
df1.show();

(3)unionByName并集,根据列名合并行,不同名报错,列数要一致

Dataset<Row>  df1 = df.unionByName(df.filter("name='mk'"));
df1.show();

(4)intersect交集

Dataset<Row>  df1 = df.intersect(df.filter("name='mk'"));
df1.show();

五、DataSet分区

repartition(numPartitions:Int):RDD[T]

coalesce(numPartitions:Int,shuffle:Boolean=false):RDD[T]

两个都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现

假设RDD有N个分区,需要重新划分成M个分区

1、N<M。一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。

2、如果N>M并且N和M相差不多,(假如N是100,M是10)那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这时可以将shuff设置为false。

在shuffl为false的情况下,如果M>N时,coalesce为无效的,不进行shuffle过程,父RDD和子RDD之间是窄依赖关系。

3、如果N>M并且两者相差悬殊,这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能。

如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以讲shuffle设置为true。

DataSet的coalesce是Repartition shuffle=false的简写方法

Dataset<Row>  df1 = df.coalesce(1);
Dataset<Row>  df2 = df.repartition(1);

Spark SQL(二)之DataSet操作相关推荐

  1. Spark Sql对列的操作

    SQL写得好,工作随便找 本篇博客讲的是关于Spark SQL中对于列的操作.在SQL中对列的操作有起别名,转化类型等在Spark SQL中同样也支持,下面来看一看把 Spark withColumn ...

  2. spark sql练习之join操作

    数据集如下:有两个json文件,table1.json和table2.json table1.json {"A":"A1", "B":30, ...

  3. Spark SQL程序操作HiveContext

    HiveContext是对应spark-hive这个项目,与hive有部分耦合, 支持hql,是SqlContext的子类,也就是说兼容SqlContext; 1.添加依赖 <dependenc ...

  4. spark sql 本地调试_干货 | 如何成为大数据Spark高手

    Spark是发源于美国加州大学伯克利分校AMPLab的集群计算平台,它立足于内存计算,性能超过Hadoop百倍,从多迭代批量处理出发,兼收并蓄数据仓库.流处理和图计算等多种计算范式,是罕见的全能选手. ...

  5. 大数据入门之分布式计算框架Spark(2) -- Spark SQL

    1.Spark SQL概述 一个运行在Spark上执行sql的处理框架,可以用来处理结构化的数据[外部数据源(访问hive.json.parquet等文件的数据)]. Spark SQL提供了SQL的 ...

  6. Spark SQL(四)之DataSet与RDD转换

    一.创建DataSet DataSet与RDD相似,但是,它们不使用Java序列化或Kryo,而是使用专用的Encoder对对象进行序列化以进行网络处理或传输.虽然编码器和标准序列化都负责将对象转换为 ...

  7. Spark SQL原理及常用方法详解(二)

    Spark SQL 一.Spark SQL基础知识 1.Spark SQL简介 (1)简单介绍 (2)Datasets & DataFrames (3)Spark SQL架构 (4)Spark ...

  8. 快学Big Data -- Spark SQL总结(二十四)

    Spark  SQL 总结 概述 Spark  Sql 是用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用. 特点 spark  sql 要比 ...

  9. Spark15:Spark SQL:DataFrame常见算子操作、DataFrame的sql操作、RDD转换为DataFrame、load和save操作、SaveMode、内置函数

    前面我们学习了Spark中的Spark core,离线数据计算,下面我们来学习一下Spark中的Spark SQL. 一.Spark SQL Spark SQL和我们之前讲Hive的时候说的hive ...

最新文章

  1. WAL streaming (max_wal_senders 0) requires wal_level replica or logical
  2. jQuery修改数组$.map
  3. 【高校宿舍管理系统】第五章 JWT原理和应用以及实现功能菜单
  4. sprintf函数实现_从Go结构成员的升格到面向对象类的实现
  5. Google是如何设定目标并测量成功的
  6. 新建test.c为什么没有.h文件_新建STM32工程全局声明两个宏的原因
  7. Linux——常用文件管理命令(必会)
  8. matlab数据变成一列数据,用MATLAB处理EXCEL中一列共100000个数据,请问如何将数据导入并将数据做正态曲线拟合...
  9. asp上传服务器后台图片显示,利用ASPUPLOAD,ASPJPEG实现图片上传自动生成缩略图及加上水印...
  10. 数据结构算法——1097. Hub Connection plan
  11. Shiro源码剖析——Subject的创建与获取(一次完整的请求执行流程)
  12. 用 Opencv 和 Python 对汪星人做模糊检测
  13. PCL库中I/O操作
  14. 依图科技发布语音开放平台,联袂微软、华为撬动语音市场
  15. 机器学习实战——逻辑回归和线性判别分析
  16. 数据结构与算法【Java】05---排序算法总结
  17. 开视界 创未来丨酷雷曼第十期合作商交流会圆满举办
  18. 用全开源的协同OA办公平台,可以自己搭建OA啦!
  19. Unity-- Gfx.WaitForPresentOnGfxThread占用CPU过高导致帧率低
  20. WinForm使用Prism和DryIoc实现跨窗体数据绑定

热门文章

  1. c语言prime函数怎么用_C语言 要发就发
  2. mysql decimal型转化为float_5分钟搞懂MySQL数据类型之数值型DECIMAL类型
  3. leetcode139. 单词拆分
  4. [mybatis]映射文件_参数处理
  5. 得到选择框句柄 怎么操作_知道借名买房有风险,只能选择借名买房该怎么操作?...
  6. c语言程序设计7.4思考题答案,C语言程序设计习题集及答案(7)
  7. Ubuntu 上不了网
  8. CF1422F Boring Queries(ST表 + 主席树)
  9. [LOJ]体育成绩统计 / Score (无脑模拟,没有脑子,就是上!)
  10. CF1016F:Road Projects(树形dp)