1 Spark SQL

  • 编程方式:(1)SQL;(2) DataFrame API
scala> case class Customer(id:Int,name:String,age:Int)
defined class Customerscala> val arr = Array("1,Mike,20","2,Mary,19","3,Jerry,23")
arr: Array[String] = Array(1,Mike,20, 2,Mary,19, 3,Jerry,23)scala> val rdd1 = sc.parallelize(arr)
rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:26scala> rdd1.collect
res1: Array[String] = Array(1,Mike,20, 2,Mary,19, 3,Jerry,23)scala> :paste
// Entering paste mode (ctrl-D to finish)rdd1.map(e=>{
val arr = e.split(",")
Customer(arr(0).toInt,arr(1),arr(2).toInt)
})// Exiting paste mode, now interpreting.res2: org.apache.spark.rdd.RDD[Customer] = MapPartitionsRDD[2] at map at <console>:31scala> val rdd2 = res2
rdd2: org.apache.spark.rdd.RDD[Customer] = MapPartitionsRDD[2] at map at <console>:31scala> rdd2.collect
res3: Array[Customer] = Array(Customer(1,Mike,20), Customer(2,Mary,19), Customer(3,Jerry,23))scala> val df = spark.createDataFrame(rdd2)
18/12/28 18:38:10 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]scala> df.printSchema
root|-- id: integer (nullable = false)|-- name: string (nullable = true)|-- age: integer (nullable = false)scala> df.show
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1| Mike| 20|
|  2| Mary| 19|
|  3|Jerry| 23|
+---+-----+---+
scala> df.createTempView("customer")scala> val df2 = spark.sql("select * from customer")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]scala> df2.show
+---+-----+---+
| id| name|age|
+---+-----+---+
|  1| Mike| 20|
|  2| Mary| 19|
|  3|Jerry| 23|
+---+-----+---+scala> val df2 = spark.sql("select * from customer where id <2")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]scala> df2.show
+---+----+---+
| id|name|age|
+---+----+---+
|  1|Mike| 20|
+---+----+---+
scala> val df1 = spark.sql("select * from customer where id < 2")
df1: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]scala> df1.show
+---+----+---+
| id|name|age|
+---+----+---+
|  1|Mike| 20|
+---+----+---+scala> val df2 = spark.sql("select * from customer where id > 2")
df2: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]scala> df2.show
+---+-----+---+
| id| name|age|
+---+-----+---+
|  3|Jerry| 23|
+---+-----+---+// union => 纵向连接
scala> val dff = spark.sql("select * from c1 union select * from c2")
dff: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]scala> dff.show
+---+-----+---+
| id| name|age|
+---+-----+---+
|  3|Jerry| 23|
|  1| Mike| 20|
+---+-----+---+

2 Spark SQL 读取 json 文件

  • Dataset<Row> === DataFrame,类似于table的操作
  • SparkSession.read().json()
  • SparkSession.write().json()

2.1 Spark SQL Java 版本

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;public class SQLJava {public static void main(String[] args) {SparkSession session = SparkSession.builder().appName("SQLJava").config("spark.master", "local[2]").getOrCreate();Dataset<Row> df = session.read().json("d:/json.json");df.createOrReplaceTempView("stu");df = session.sql("select * from stu");df.show();}
}

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;import java.util.function.Consumer;public class SQLJava {public static void main(String[] args) {SparkSession session = SparkSession.builder().appName("SQLJava").config("spark.master", "local[2]").getOrCreate();Dataset<Row> df1 = session.read().json("d:/json.json");df1.createOrReplaceTempView("stu");df1 = session.sql("select * from stu");df1.show();Dataset<Row> df2 = session.sql("select * from stu where age > 20");df2.show();System.out.println("=============================");//聚合查询Dataset<Row> dfCount = session.sql("select count(*) from stu");dfCount.show();/** DataFrame 转换为 RDD* */JavaRDD<Row> rdd = df1.toJavaRDD();rdd.collect().forEach(new Consumer<Row>() {public void accept(Row row) {Long id = row.getAs("id");String name = row.getAs("name");Long age = row.getAs("age");System.out.println(id + "-" + name + "-" + age);}});}
}

大数据实时计算Spark学习笔记(9)—— Spar SQL(1) 读取 json 文件相关推荐

  1. .NET 大数据实时计算--学习笔记

    摘要 纯 .Net 自研大数据实时计算平台,在中通快递服务数百亿包裹,处理数据万亿计!将分享大数据如何落地以及设计思路,技术重难点. 目录 背景介绍 计算平台架构 项目实战 背景介绍 计算平台架构 分 ...

  2. 1. 大数据实时计算介绍

    Spark Streaming,其实就是一种Spark提供的,对于大数据,进行实时计算的一种框架.它的底层,其实,也是基于我们之前讲解的Spark Core的.基本的计算模型,还是基于内存的大数据实时 ...

  3. 【大数据实时计算框架】Storm框架

    一.大数据实时计算框架 1.什么是实时计算?流式计算? (一)什么是Storm? Storm为分布式实时计算提供了一组通用原语,可被用于"流处理"之中,实时处理消息并更新数据库.这 ...

  4. 大数据实时计算工程师/Hadoop工程师/数据分析师职业路线图

    大数据实时计算工程师/Hadoop工程师/数据分析师职业路线图 http://edu.51cto.com/roadmap/view/id-29.html http://my.oschina.net/i ...

  5. 接近淘宝 80%的大数据实时计算平台,从0搭建的经验和坑

    上周一,来自武汉的直播平台斗鱼TV宣布C轮融资,腾讯领投的 15 亿人民币,距其获得 B 轮1亿美元不到半年,也是大写的牛逼. 但小寻更关心他们的大数据架构,作为一个在 2 年多时间里崛起的公司,其流 ...

  6. Flink大数据实时计算系列-案例初体验:HotPages

    Flink大数据实时计算系列-案例初体验:HotPages 目录 HotPages代码 输入日志 运行结果 HotPages代码 /*** Copyright (c) 2018-2028 尚硅谷 Al ...

  7. Flink大数据实时计算系列-Flink的Operator Chains的优化机制

    Flink大数据实时计算系列-Flink的Operator Chains的优化机制 目录 Flink改变并行度 并行度改为3 并行度改为2 Flink Operator Chains Flink gr ...

  8. Flink大数据实时计算系列-Flink写出多个parquet小文件处理方法、Presto的介绍与使用场景

    Flink大数据实时计算系列-Flink写出多个parquet小文件处理方法.Presto的介绍与使用场景 Presto的安装与使用 目录 Flink写出多个parquet小文件处理方法 Presto ...

  9. Flink大数据实时计算系列-列式存储parquet文件格式介绍、Flink进行rowformat格式文件保存

    Flink大数据实时计算系列-列式存储parquet文件格式介绍 Flink进行rowformat格式文件保存 列式存储parquet文件格式介绍

最新文章

  1. SQL Server中Identity标识列
  2. 关于学习Python的一点学习总结(19->if及相关的符号运算)
  3. mysql与sqlserver之间的关系转换
  4. [3]工欲善其事必先利其器-------UML常用的图(三)
  5. [云炬创业学笔记]第一章创业是什么测试14
  6. pl0源码(可在delphi7中运行)
  7. OpenCV cv :: UMat与DirectX9曲面的互操作性的实例(附完整代码)
  8. Delphi全局热键的注册
  9. MooTools1.3.1 API(Core)学习及试译(三)——Types(二)
  10. SharePoint REST API - 一个请求批量操作
  11. 【广告技术】隐私集合交集运算结合同态加密,在保障数据安全的同时追踪广告效果
  12. linux asm 使用情况,在Linux 6上使用UDEV解决 ASM存储设备问题( single path)
  13. python学到什么程度可以找到工作-Python学到什么程度可以面试工作?
  14. 如何利用Python爬虫获取网络小说
  15. c语言运行时电脑蓝屏,Windows系统蓝屏时系统都在后台做了什么?-系统蓝屏
  16. Vue回炉重造之图片加载性能优化
  17. 学习型组织的思维方式:保持努力,终身成长!
  18. udev源码开源下载地址分享
  19. 联想服务器控制口登录地址_常用服务器管理口IP及账号密码(欢迎补充)
  20. 使用scaffold-eth脚手架快速构建 Web3 Dapp 应用

热门文章

  1. 【Photoshop调色教程】通用调色技巧详解
  2. Appium 测试遇到问题解决方案
  3. R语言时间序列代码整理
  4. 一个高效的通用光学卫星数据正射校正程序
  5. MySQL高可用与读写分离
  6. iconv java_如何使用iconv(3)将宽字符串转换为UTF-8?
  7. redis详细介绍附实例代码--看一篇就够了
  8. 06 echarts的基本使用6-地图
  9. 说说谷歌Chrome浏览器无痕浏览器窗口
  10. 简单介绍apache网页优化