目录

案例一:花式查询

案例二:WordCount

基于DSL编程

基于SQL编程

具体演示代码如下:


案例一:花式查询

package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}/*** Author itcast* Desc 演示SparkSQL的各种花式查询*/
object FlowerQueryDemo {case class Person(id:Int,name:String,age:Int)def main(args: Array[String]): Unit = {//1.准备环境-SparkSessionval spark: SparkSession = SparkSession.builder().appName("SparkSQL").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//2.加载数据val lines: RDD[String] = sc.textFile("data/input/person.txt")//3.切割//val value: RDD[String] = lines.flatMap(_.split(" "))//错误的val linesArrayRDD: RDD[Array[String]] = lines.map(_.split(" "))//4.将每一行(每一个Array)转为样例类(相当于添加了Schema)val personRDD: RDD[Person] = linesArrayRDD.map(arr=>Person(arr(0).toInt,arr(1),arr(2).toInt))//5.将RDD转为DataFrame(DF)//注意:RDD的API中没有toDF方法,需要导入隐式转换!import spark.implicits._val personDF: DataFrame = personRDD.toDF//6.查看约束personDF.printSchema()//7.查看分布式表中的数据集personDF.show(6,false)//false表示不截断列名,也就是列名很长的时候不会用...代替//演示SQL风格查询//0.注册表名//personDF.registerTempTable("t_person")//已经过时//personDF.createTempView("t_person")//创建表,如果已存在则报错:TempTableAlreadyExistsException//personDF.createOrReplaceGlobalTempView("t_person")//创建全局表,可以夸session使用,查询的时候使用:SELECT * FROM global_temp.表名;生命周期太大,一般不用personDF.createOrReplaceTempView("t_person")//创建一个临时表,只有当前session可用!且表如果存在会替换!//1.查看name字段的数据spark.sql("select name from t_person").show//2.查看 name 和age字段数据spark.sql("select name,age from t_person").show//3.查询所有的name和age,并将age+1spark.sql("select name,age,age+1 from t_person").show//4.过滤age大于等于25的spark.sql("select name,age from t_person where age >=25").show//5.统计年龄大于30的人数spark.sql("select count(age) from t_person where age >30").show//6.按年龄进行分组并统计相同年龄的人数spark.sql("select age,count(age) from t_person group by age").show//演示DSL风格查询//1.查看name字段的数据import org.apache.spark.sql.functions._personDF.select(personDF.col("name")).showpersonDF.select(personDF("name")).showpersonDF.select(col("name")).showpersonDF.select("name").show//2.查看 name 和age字段数据personDF.select(personDF.col("name"),personDF.col("age")).showpersonDF.select("name","age").show//3.查询所有的name和age,并将age+1//personDF.select("name","age","age+1").show//错误,没有age+1这一列//personDF.select("name","age","age"+1).show//错误,没有age1这一列personDF.select(col("name"),col("age"),col("age")+1).showpersonDF.select($"name",$"age",$"age"+1).show//$表示将"age"变为了列对象,先查询再和+1进行计算personDF.select('name,'age,'age+1).show//'表示将age变为了列对象,先查询再和+1进行计算//4.过滤age大于等于25的,使用filter方法/where方法过滤personDF.select("name","age").filter("age>=25").showpersonDF.select("name","age").where("age>=25").show//5.统计年龄大于30的人数personDF.where("age>30").count()//6.按年龄进行分组并统计相同年龄的人数personDF.groupBy("age").count().show}}

​​​​​​​案例二:WordCount

前面使用RDD封装数据,实现词频统计WordCount功能,从Spark 1.0开始,一直到Spark 2.0,建立在RDD之上的一种新的数据结构DataFrame/Dataset发展而来,更好的实现数据处理分析。DataFrame 数据结构相当于给RDD加上约束Schema,知道数据内部结构(字段名称、字段类型),提供两种方式分析处理数据:DataFrame API(DSL编程)和SQL(类似HiveQL编程),下面以WordCount程序为例编程实现,体验DataFrame使用。

基于DSL编程

使用SparkSession加载文本数据,封装到Dataset/DataFrame中,调用API函数处理分析数据(类似RDD中API函数,如flatMap、map、filter等),编程步骤:

第一步、构建SparkSession实例对象,设置应用名称和运行本地模式;

第二步、读取HDFS上文本文件数据;

第三步、使用DSL(Dataset API),类似RDD API处理分析数据;

第四步、控制台打印结果数据和关闭SparkSession;

基于SQL编程

也可以实现类似HiveQL方式进行词频统计,直接对单词分组group by,再进行count即可,步骤如下:

第一步、构建SparkSession对象,加载文件数据,分割每行数据为单词;

第二步、将DataFrame/Dataset注册为临时视图(Spark 1.x中为临时表);

第三步、编写SQL语句,使用SparkSession执行获取结果;

第四步、控制台打印结果数据和关闭SparkSession;

具体演示代码如下:

package cn.itcast.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}/*** Author itcast* Desc 使用SparkSQL完成WordCount---SQL风格和DSL风格*/
object WordCount {def main(args: Array[String]): Unit = {//1.准备环境val spark: SparkSession = SparkSession.builder().appName("WordCount").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")import spark.implicits._//2.加载数据//val rdd: RDD[String] = sc.textFile("data/input/words.txt")//可以使用该方式,然后使用昨天的知识将rdd转为df/dsval df: DataFrame = spark.read.text("data/input/words.txt")val ds: Dataset[String] = spark.read.textFile("data/input/words.txt")//df.show()//查看分布式表数据//ds.show()//查看分布式表数据//3.做WordCount//切割//df.flatMap(_.split(" ")) //注意:直接这样写报错!因为df没有泛型,不知道_是String!//df.flatMap(row=>row.getAs[String]("value").split(" "))val wordsDS: Dataset[String] = ds.flatMap(_.split(" "))//wordsDS.show()//使用SQL风格做WordCountwordsDS.createOrReplaceTempView("t_words")val sql:String ="""|select value,count(*) as count|from t_words|group by value|order by count desc|""".stripMarginspark.sql(sql).show()//使用DSL风格做WordCountwordsDS.groupBy("value").count().orderBy($"count".desc).show()/*+-----+-----+|value|count|+-----+-----+|hello|    4||  her|    3||  you|    2||   me|    1|+-----+-----++-----+-----+|value|count|+-----+-----+|hello|    4||  her|    3||  you|    2||   me|    1|+-----+-----+*/}
}

无论使用DSL还是SQL编程方式,底层转换为RDD操作都是一样,性能一致,查看WEB UI监控中Job运行对应的DAG图如下:

从上述的案例可以发现将数据封装到Dataset/DataFrame中,进行处理分析,更加方便简洁,这就是Spark框架中针对结构化数据处理模:Spark SQL模块

官方文档:http://spark.apache.org/sql/

2021年大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount相关推荐

  1. 2021年大数据ELK(十七):Elasticsearch SQL 订单统计分析案例

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 订单统计分析案例 一.案例介绍 二.创建索引 三.导入测试数据 四.统计不同支 ...

  2. 2021年大数据Spark(十七):Spark Core的RDD持久化

    目录 RDD 持久化 引入 API 缓存/持久化函数 缓存/持久化级别 释放缓存/持久化 代码演示 总结:何时使用缓存/持久化 RDD 持久化 引入 在实际开发中某些RDD的计算或转换可能会比较耗费时 ...

  3. 2021年大数据Spark(三十六):SparkStreaming实战案例一 WordCount

    目录 SparkStreaming实战案例一 WordCount 需求 准备工作 代码实现 第一种方式:构建SparkConf对象 第二种方式:构建SparkContext对象 完整代码如下所示: 应 ...

  4. 2021年大数据Spark(三十九):SparkStreaming实战案例四 窗口函数

    目录 SparkStreaming实战案例四 窗口函数 需求 代码实现 SparkStreaming实战案例四 窗口函数 需求 使用窗口计算: 每隔5s(滑动间隔)计算最近10s(窗口长度)的数据! ...

  5. 2021年大数据ELK(四):Lucene的美文搜索案例

    全网最详细的大数据ELK文章系列,强烈建议收藏加关注! 新文章都已经列出历史文章目录,帮助大家回顾前面的知识重点. 目录 系列历史文章 美文搜索案例 一.需求 二.准备工作 1.创建IDEA项目 2. ...

  6. 大数据|Spark技术在京东智能供应链预测的应用案例深度剖析(一)

    大数据|Spark技术在京东智能供应链预测的应用案例深度剖析(一) 2017-03-27 11:58  浏览次数:148 1. 背景 前段时间京东公开了面向第二个十二年的战略规划,表示京东将全面走向技 ...

  7. 2021年大数据Spark(二十三):SparkSQL 概述

    目录 SparkSQL 概述 前世今生 Shark 框架-淘汰了 SparkSQL 模块 Hive 与 SparkSQL 官方定义 第一.针对结构化数据处理,属于Spark框架一个部分 第二.抽象数据 ...

  8. 大数据Spark(二十七):SparkSQL案例一花式查询和案例二WordCount

    文章目录 案例一:花式查询 案例二:WordCount 基于DSL编程 基于SQL编程

  9. 2021年大数据Spark(五十二):Structured Streaming 事件时间窗口分析

    目录 事件时间窗口分析 时间概念 ​​​​​​​event-time ​​​​​​​延迟数据处理 ​​​​​​​延迟数据 ​​​​​​​Watermarking 水位 ​​​​​​​官方案例演示 事件 ...

最新文章

  1. 速度快到飞起 如何跟蜻蜓的大脑学习计算?
  2. Java与JavaScript 完美实现字符串拆分(利用数组存储)与合并的互逆操作
  3. 有谁还不会找技能Call跟我来(以打坐为例子)
  4. 理想边界尺寸怎么算_钻石型淋浴房三边尺寸怎么算?安装步骤有哪些?
  5. BZOJ 1588: [HNOI2002]营业额统计
  6. 使命召唤手游迎来欧阳娜娜,这阵容够豪华,玩家期待吗?
  7. Docker 环境下部署 redash
  8. 【LOJ6033】棋盘游戏【二分图博弈】
  9. Nobody can go back and start a new beginning, but anyone can start now and make a new ending.
  10. (六)关于beetlsql版本(分支)的说明
  11. iOS开发之UIAlertController的使用
  12. PCB中 D-Subminiature(DB接口) 连接器系列分类及带有3D封装绘制
  13. Conficker蠕虫作者可随时引爆“网络核武”
  14. 【深度相机系列四】深度相机原理揭秘--结构光(iPhone X 齐刘海原理)
  15. R语言基础图形绘制——箱线图
  16. C语言|const的使用
  17. AtCoder - ABC 167 - E(数学推理+组合数)
  18. 什么是自动化运维?自动化运维必备技能有哪些?
  19. Jmeter入门教程之配置原件(二)
  20. AUTH 使用登录验证

热门文章

  1. 2022-2028年中国硅藻土产业发展态势及市场发展策略报告
  2. 2022-2028年中国防水橡胶布行业市场发展模式及投资前景分析报告
  3. Docker学习(四)-----Docker容器常用命令
  4. Python List extend()方法
  5. Tengine AIFramework框架
  6. GPU上稀疏矩阵的基本线性代数
  7. 计图MPI分布式多卡
  8. 队列:实用程序服务和数据结构
  9. 嵌入式Linux设备驱动程序:编写内核设备驱动程序
  10. 数据治理(三):数据质量管理