文章目录

  • 一.Spark DataFrame概述
    • 1.1 创建DataFrame
      • 1.1.1 通过json文件创建DataFrame
      • 1.1.2 通过CSV文件创建DataFrame
      • 1.1.3 通过hive table创建DataFrame
      • 1.1.4 通过jdbc数据源创建DataFrame
  • 二.Spark SQL实战
    • 2.1 DataFrame的统计信息
    • 2.2 DataFrame的select操作
    • 2.3 DataFrame对列的操作
    • 2.3 过滤数据
    • 2.4 简单的聚合操作
      • 2.4.1 简单聚合
    • 2.5 自定义函数
    • 2.6 表连接
      • 2.6.1 内连接
      • 2.6.2 外连接
    • 2.7 排序
    • 2.8 SparkSQL操作文件
  • 参考:

一.Spark DataFrame概述

在Spark语义中,DtatFrame是一个分布式的行集合,可以想象为一个关系型数据库的表,或一个带有列头的Excel表格。它和RDD一样,有这样一些特点:

  1. Immuatable: 一旦RDD、DataFrame被创建,就不能更改,只能通过tranformation生成新的RDD、DataFrame
  2. Lazy Evaluations: 只有action才会出发Transformation的执行。
  3. Distributed: DataFrame和RDD一样都是分布式的。

1.1 创建DataFrame

支持的数据源:

  1. Parquet Files
  2. ORC Files
  3. JSON Files
  4. Hive Tables
  5. JDBC
  6. Avro Files

创建DataFrame的语法:

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

Spark SQL的起点: SparkSession
代码:

import org.apache.spark.sql.SparkSession;SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate();

使用SparkSession,应用程序可以从现有的RDD、Hive表或Spark数据源中创建DataFrames。

1.1.1 通过json文件创建DataFrame

Json测试文件:

{"name": "Michael",  "age": 12}
{"name": "Andy",  "age": 13}
{"name": "Justin",  "age": 8}

代码:

package org.example;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;public class SparkSQLTest4 {public static void main(String[] args){SparkSession spark = SparkSession.builder().appName("SparkSQLTest4").config("spark.some.config.option", "some-value").getOrCreate();Dataset<Row> df = spark.read().json("file:///home/pyspark/test.json");df.show();spark.stop();}}

测试记录:

1.1.2 通过CSV文件创建DataFrame

csv测试文件:

代码:

package org.example;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;public class SparkSQLTest5 {public static void main(String[] args){SparkSession spark = SparkSession.builder().appName("SparkSQLTest4").config("spark.some.config.option", "some-value").getOrCreate();Dataset<Row> df = spark.read().format("csv").option("header", "true").load("file:///home/pyspark/emp.csv");df.show();spark.stop();}}

测试记录:

1.1.3 通过hive table创建DataFrame

代码:

package org.example;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;public class SparkSQLTest2 {public static void main(String[] args){SparkSession spark = SparkSession.builder().appName("SparkSQLTest2").config("spark.some.config.option", "some-value").getOrCreate();Dataset<Row> sqlDF = spark.sql("SELECT * FROM test.ods_fact_sale limit 100");sqlDF.show();spark.stop();}}

测试记录:

1.1.4 通过jdbc数据源创建DataFrame

代码:

package org.example;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;public class SparkSQLTest3 {public static void main(String[] args){SparkSession spark = SparkSession.builder().appName("SparkSQLTest3").config("spark.some.config.option", "some-value").getOrCreate();Dataset<Row> jdbcDF = spark.read().format("jdbc").option("url", "jdbc:mysql://10.31.1.123:3306/test").option("dbtable", "(SELECT * FROM EMP) tmp").option("user", "root").option("password", "abc123").load();jdbcDF.printSchema();jdbcDF.show();spark.stop();}}

测试记录:

二.Spark SQL实战

我们选用经典scoot用户下的4张表来模拟Spark SQL实战:

emp
dept
bonus
salgrade

2.1 DataFrame的统计信息

生成DataFrame的时候会保留统计信息,有点类似关系型数据库的统计信息

代码:

package org.example;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;public class SparkSQLTest7 {public static void main(String[] args){SparkSession spark = SparkSession.builder().appName("SparkSQLTest7").config("spark.some.config.option", "some-value").getOrCreate();spark.sql("use test");Dataset<Row> sqlDF = spark.sql("SELECT * FROM emp");sqlDF.describe().show();spark.stop();}}

测试记录:
从下图可以看出,DataFrame给每一列都做了统计信息。

  1. count 是列不为空的总数
  2. mean 平均值
  3. stddev 标准偏差
  4. min 最小值
  5. max 最大值

2.2 DataFrame的select操作

有些应用场景,我们只需要DataFrame的部分列,此时可以通过select实现:

代码:

package org.example;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;public class SparkSQLTest8 {public static void main(String[] args){SparkSession spark = SparkSession.builder().appName("SparkSQLTest8").config("spark.some.config.option", "some-value").getOrCreate();spark.sql("use test");Dataset<Row> sqlDF = spark.sql("SELECT * FROM emp");sqlDF.select("ename","hiredate").show();spark.stop();}
}

测试记录:

2.3 DataFrame对列的操作

有些应用场景,我们需要对列进行别名、新增列、删除列等操作。

代码:

package org.example;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;public class SparkSQLTest9 {public static void main(String[] args){SparkSession spark = SparkSession.builder().appName("SparkSQLTest8").config("spark.some.config.option", "some-value").getOrCreate();spark.sql("use test");Dataset<Row> sqlDF = spark.sql("SELECT * FROM emp");//输出看有哪些列System.out.println("\n" + "\n" + "\n");for ( String col:sqlDF.columns() ){System.out.println(col);}System.out.println("\n" + "\n" + "\n");//删除一列sqlDF.drop("comm").show();//新增(或替换)一列//sqlDF.withColumn("new_comm", "sal").show();//给列进行重命名sqlDF.withColumnRenamed("comm","comm_new").show();spark.stop();}
}

测试记录:
显示列的信息:

删除一列:

替换列名:

2.3 过滤数据

过滤数据用的是filter,其实也可以用where,where是filter的别名

代码:

package org.example;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;public class SparkSQLTest10 {public static void main(String[] args) {SparkSession spark = SparkSession.builder().appName("SparkSQLTest10").config("spark.some.config.option", "some-value").getOrCreate();spark.sql("use test");Dataset<Row> sqlDF = spark.sql("SELECT * FROM emp");sqlDF.where("comm is not null").show();spark.stop();}
}

测试记录:

2.4 简单的聚合操作

常用的聚合操作:

操作 描述
avg/mean 平均值
count 统计个数
countDistinct 统计唯一的个数
max 求最大值
min 求最小值
sum 求和
sumDistinct 统计唯一值的合计
skewness 偏态
stddev 标准偏差

2.4.1 简单聚合

代码:

package org.example;import org.apache.spark.sql.*;public class SparkSQLTest11 {public static void main(String[] args) {SparkSession spark = SparkSession.builder().appName("SparkSQLTest11").config("spark.some.config.option", "some-value").getOrCreate();spark.sql("use test");Dataset<Row> sqlDF = spark.sql("SELECT * FROM emp");sqlDF.groupBy("deptno").agg(functions.avg("sal").alias("avg_sal"),functions.max("comm").alias("max_comm")).show();spark.stop();}}

测试记录:

2.5 自定义函数

一些比较复杂的场景,我们希望使用自定义函数来实现。

代码:

package org.example;import org.apache.spark.sql.*;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;public class SparkSQLTest12 {public static void main(String[] args) {SparkSession spark = SparkSession.builder().appName("SparkSQLTest12").config("spark.some.config.option", "some-value").getOrCreate();spark.udf().register("plusOne", new UDF1<Integer, Integer>() {@Overridepublic Integer call(Integer x) {return x + 1;}}, DataTypes.IntegerType);spark.sql("SELECT plusOne(5)").show();spark.stop();}}

测试记录:

2.6 表连接

语法:

DataFrame.join(other, on=None, how=None)other            需要连接的DataFrame
on                str, list or Column, 可选项
how             str, 可选项default inner. Must be one of: inner, cross, outer, full, fullouter, full_outer, left, leftouter, left_outer, right, rightouter, right_outer, semi, leftsemi, left_semi, anti, leftanti and left_anti

2.6.1 内连接

代码:

package org.example;import org.apache.spark.sql.*;public class SparkSQLTest13 {public static void main(String[] args) {SparkSession spark = SparkSession.builder().appName("SparkSQLTest13").config("spark.some.config.option", "some-value").getOrCreate();spark.sql("use test");Dataset<Row> df1 = spark.sql("SELECT * FROM emp");Dataset<Row> df2 = spark.sql("SELECT * FROM dept");Dataset<Row> df3 = df1.join(df2, df1.col("deptno").equalTo(df2.col("deptno")) ,"inner").select(df1.col("empno"),df1.col("ename"),df2.col("dname"),df2.col("loc"));df3.show();spark.stop();}}

测试记录:

2.6.2 外连接

这里我们使用一个右连接

代码:

package org.example;import org.apache.spark.sql.*;public class SparkSQLTest14 {public static void main(String[] args) {SparkSession spark = SparkSession.builder().appName("SparkSQLTest14").config("spark.some.config.option", "some-value").getOrCreate();spark.sql("use test");Dataset<Row> df1 = spark.sql("SELECT * FROM emp");Dataset<Row> df2 = spark.sql("SELECT * FROM dept");Dataset<Row> df3 = df1.join(df2, df1.col("deptno").equalTo(df2.col("deptno")) ,"right").select(df1.col("empno"),df1.col("ename"),df2.col("dname"),df2.col("loc"));df3.show();spark.stop();}}

测试记录:

2.7 排序

语法:

DataFrame.orderBy(*cols, **kwargs)
-- 返回按指定列排序的新DataFrame参数:      ascending   bool or list,可选项布尔值或布尔值列表(默认为True)。排序升序与降序。为多个排序顺序指定列表。如果指定了列表,则列表的长度必须等于cols的长度。

代码:

package org.example;import org.apache.spark.sql.*;public class SparkSQLTest15 {public static void main(String[] args) {SparkSession spark = SparkSession.builder().appName("SparkSQLTest15").config("spark.some.config.option", "some-value").getOrCreate();spark.sql("use test");Dataset<Row> df1 = spark.sql("SELECT * FROM emp");Dataset<Row> df2 = spark.sql("SELECT * FROM dept");Dataset<Row> df3 = df1.join(df2, df1.col("deptno").equalTo(df2.col("deptno")) ,"right").select(df1.col("empno"),df1.col("ename"),df2.col("dname"),df2.col("loc"));Dataset<Row> df4 = df3.orderBy(df3.col("dname").desc(),df3.col("ename").asc() );df4.show();spark.stop();}
}

测试记录:

2.8 SparkSQL操作文件

SparkSession上的sql函数允许应用程序以编程方式运行sql查询,并将结果作为Dataset返回。

代码:

package org.example;import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;public class SparkSQLTest16 {public static void main(String[] args){SparkSession spark = SparkSession.builder().appName("SparkSQLTest16").config("spark.some.config.option", "some-value").getOrCreate();Dataset<Row> df = spark.read().json("file:///home/pyspark/test.json");df.createOrReplaceTempView("people");spark.sql("select * from people where age = 12").show();spark.stop();}}

测试记录:

参考:

  1. http://spark.apache.org/docs/2.4.2/sql-getting-started.html
  2. http://spark.apache.org/docs/latest/sql-ref-functions-udf-scalar.html
  3. http://spark.apache.org/docs/latest/api/java/index.html

Java-Spark系列6-Spark SQL编程实战相关推荐

  1. 【大数据Spark系列】Spark教程:详细全部

    Spark作为Apache顶级的开源项目,是一个快速.通用的大规模数据处理引擎,和Hadoop的MapReduce计算框架类似,但是相对于MapReduce,Spark凭借其可伸缩.基于内存计算等特点 ...

  2. oracle实战编程,Oracle Databa se 11g PL/SQL编程实战_IT教程网

    资源名称:Oracle Database 11g PL/SQL编程实战 内容简介: <oracle database 11gpl/sql编程实战>通过动手练习.大量的示例以及实际的项目帮助 ...

  3. Spark系列之Spark概述

    title: Spark系列 What is Apache Spark™? Apache Spark™ is a multi-language engine for executing data en ...

  4. Spark系列之Spark体系架构

    title: Spark系列 第四章 Spark体系架构 4.1 Spark核心功能 Alluxio 原来叫 tachyon 分布式内存文件系统 Spark Core提供Spark最基础的最核心的功能 ...

  5. Spark系列之Spark启动与基础使用

    title: Spark系列 第三章 Spark启动与基础使用 3.1 Spark Shell 3.1.1 Spark Shell启动 安装目录的bin目录下面,启动命令: spark-shell $ ...

  6. Spark系列之Spark在不同集群中的架构

    title: Spark系列 第十二章 Spark在不同集群中的架构 ​ Spark 注重建立良好的生态系统,它不仅支持多种外部文件存储系统,提供了多种多样的集群运行模式.部署在单台机器上时,既可以用 ...

  7. Spark系列之Spark的资源调优

    title: Spark系列 第十一章 Spark的资源调优 11.1 概述 ​ 在开发完Spark作业之后,就该为作业配置合适的资源了.Spark的资源参数,基本都可以在sparksubmit命令中 ...

  8. Spark系列之Spark应用程序运行机制

    声明:         文章中代码及相关语句为自己根据相应理解编写,文章中出现的相关图片为自己实践中的截图和相关技术对应的图片,若有相关异议,请联系删除.感谢.转载请注明出处,感谢. By luoye ...

  9. 大数据Spark系列之Spark单机环境搭建

    1. 下载spark与scala Spark下载地址 http://mirrors.hust.edu.cn/apache/spark/spark-2.4.5/spark-2.4.5-bin-hadoo ...

  10. 标杆徐linux云计算视频,标杆徐2018 Linux自动化运维系列④: Shell脚本自动化编程实战...

    2019年录制SHell新课地址 贴切企业脚本编写思路讲解,带你玩Shell脚本编程实战. 本套课程从实际项目案例出发,近100个Shell实例讲解,由浅入深,循序渐进,带你玩转Shell编程的方方面 ...

最新文章

  1. CSDN线上直播操作测试方案
  2. CYQ.Data 轻量数据层之路 使用篇三曲 MAction 取值赋值(十四)
  3. [NOTE] Web For Pentester靶场练习笔记
  4. 前端JavaScripts
  5. Codeforces Round #740 (Div. 2, based on VK Cup 2021 - Final (Engine)) A-F全题解
  6. impdp导入dmp文件
  7. 程序员想找工作怎么办?如果记住这一点,不怕找不到好工作!
  8. (44)FPGA条件编译(选择语句)
  9. oracle 批量杀死 死锁进程
  10. HttpRunnerManager(一)--安装
  11. windows2003密码忘记了该如何处理
  12. WebGIS中利用AGS JS+eCharts实现一些数据展示的探索
  13. Webstorm配置运行React Native
  14. 荔枝派通过usb烧录时出现ERROR: Allwinner USB FEL device not found!
  15. 深度揭秘:诺基亚的百年沧桑
  16. 个人使用整理的部分测试数据整理-输入类2022
  17. 攻防对抗形势下代码重用技术的演进
  18. 16丨数据分析基础篇答疑
  19. android实现基于表情识别和敲击识别的认证系统,表情识别支持自动的连续隐藏式拍照
  20. 生态链 (topsort

热门文章

  1. android打开cad文件怎么打开方式,dwg文件怎么打开_手机上dwg文件用什么程序可以打开...
  2. Python网络数据采集的方法
  3. 高等工程热力学复习01
  4. unc0ver 越狱工具来袭,支持iOS11.0~iOS14.8稳定越狱 更新至V8.0.2 支持 A7-A14
  5. 计算机说课大赛ppt,计算机基础说课大赛省一等奖说课课件PPT作品
  6. MTK刷机工具Flash_Tool部分4032错误解决办法
  7. dpt rp1 android apk,DPT-RP1 新固件
  8. python 声音强度检测_python – 从声音文件中检测频率
  9. 发现并充分发挥你的长处—盖洛普优势测试
  10. html隐藏图片白色部分,css去掉gif透明图片的白边