翻译自:How to use SparkSession in Apache Spark 2.0
转载自:
  - SparkSession简单介绍 (是否为原创初始翻译不详)
  - Spark 2.0系列之SparkSession详解 (最后三节)

向原文作者以及原创翻译者的辛勤劳动致敬!

前言

  Apache Spark 2.0引入了SparkSession,为用户提供了一个统一的切入点来使用Spark的各项功能,并且允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序。最重要的是,它减少了用户需要了解的一些概念,使得我们可以很容易地与Spark交互。

  本文将介绍在Spark 2.0中如何使用SparkSession。

探索SparkSession统一的功能

  首先,我们介绍一个简单的Spark应用案例:SparkSessionZipsExample,其从JSON文件中读取邮政编码,并且通过DataFrame API进行一些分析,之后使用Spark SQL进行一些查询,这些操作并没有使用到SparkContext, SQLContext 或者HiveContext。

创建SparkSession

  在2.0版本之前,与Spark交互之前必须先创建SparkConf和SparkContext,代码如下:

//set up the spark configuration and create contexts
val sparkConf = new SparkConf().setAppName("SparkSessionZipsExample").setMaster("local")
// your handle to SparkContext to access other context like SQLContext
val sc = new SparkContext(sparkConf).set("spark.some.config.option", "some-value")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)  

  然而在Spark 2.0中,我们可以通过SparkSession来实现同样的功能,而不需要显式地创建SparkConf, SparkContext 以及 SQLContext,因为这些对象已经封装在SparkSession中。使用生成器的设计模式(builder design pattern),如果我们没有创建SparkSession对象,则会实例化出一个新的SparkSession对象及其相关的上下文。

// Create a SparkSession. No need to create SparkContext
// You automatically get it as part of the SparkSession
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val spark = SparkSession  .builder()  .appName("SparkSessionZipsExample")  .config("spark.sql.warehouse.dir", warehouseLocation)  .enableHiveSupport()  .getOrCreate()  

  到现在我们可以使用上面创建好的spark对象,并且访问其public方法。

配置Spark运行相关属性

  一旦我们创建好了SparkSession,我们就可以配置Spark运行相关属性。比如下面代码片段我们修改了已经存在的运行配置选项。

//set new runtime options
spark.conf.set("spark.sql.shuffle.partitions", 6)
spark.conf.set("spark.executor.memory", "2g")
//get all settings
val configMap:Map[String, String] = spark.conf.getAll()  

获取Catalog元数据

  通常我们想访问当前系统的Catalog元数据。SparkSession提供了catalog实例来操作metastore。这些方法放回的都是Dataset类型的,所有我们可以使用Dataset相关的API来访问其中的数据。如下代码片段,我们展示了所有的表并且列出当前所有的数据库:

//fetch metadata data from the catalog
scala> spark.catalog.listDatabases.show(false)
+--------------+---------------------+--------------------------------------------------------+
|name          |description          |locationUri                                             |
+--------------+---------------------+--------------------------------------------------------+
|default       |Default Hive database|hdfs://iteblogcluster/user/iteblog/hive/warehouse       |
+--------------+---------------------+--------------------------------------------------------+  scala> spark.catalog.listTables.show(false)
+----------------------------------------+--------+-----------+---------+-----------+
|name                                    |database|description|tableType|isTemporary|
+----------------------------------------+--------+-----------+---------+-----------+
|iteblog                                 |default |null       |MANAGED  |false      |
|table2                                  |default |null       |EXTERNAL |false      |
|test                                    |default |null       |MANAGED  |false      |
+----------------------------------------+--------+-----------+---------+-----------+  

创建Dataset和Dataframe

  使用SparkSession APIs创建 DataFrames 和 Datasets的方法有很多,其中最简单的方式就是使用spark.range方法来创建一个Dataset。当我们学习如何操作Dataset API的时候,这个方法非常有用。操作如下:

scala> val numDS = spark.range(5, 100, 5)
numDS: org.apache.spark.sql.Dataset[Long] = [id: bigint]  scala> numDS.orderBy(desc("id")).show(5)
+---+
| id|
+---+
| 95|
| 90|
| 85|
| 80|
| 75|
+---+
only showing top 5 rows  scala> numDS.describe().show()
+-------+------------------+
|summary|                id|
+-------+------------------+
|  count|                19|
|   mean|              50.0|
| stddev|28.136571693556885|
|    min|                 5|
|    max|                95|
+-------+------------------+
scala> val langPercentDF = spark.createDataFrame(List(("Scala", 35),  | ("Python", 30), ("R", 15), ("Java", 20)))
langPercentDF: org.apache.spark.sql.DataFrame = [_1: string, _2: int]  scala> val lpDF = langPercentDF.withColumnRenamed("_1", "language").withColumnRenamed("_2", "percent")
lpDF: org.apache.spark.sql.DataFrame = [language: string, percent: int]  scala> lpDF.orderBy(desc("percent")).show(false)
+--------+-------+
|language|percent|
+--------+-------+
|Scala   |35     |
|Python  |30     |
|Java    |20     |
|R       |15     |
+--------+-------+  

使用SparkSession读取CSV

  创建完SparkSession之后,我们就可以使用它来读取数据,下面代码片段是使用SparkSession来从csv文件中读取数据:

val df = sparkSession.read.option("header","true").  csv("src/main/resources/sales.csv")  

上面代码非常像使用SQLContext来读取数据,我们现在可以使用SparkSession来替代之前使用SQLContext编写的代码。下面是完整的代码片段:

package com.iteblog  import org.apache.spark.sql.SparkSession  /**  * Spark Session example  *  */
object SparkSessionExample {  def main(args: Array[String]) {  val sparkSession = SparkSession.builder.  master("local")  .appName("spark session example")  .getOrCreate()  val df = sparkSession.read.option("header","true").csv("src/main/resources/sales.csv")  df.show()  }  }  

使用SparkSession API读取JSON数据

  我们可以使用SparkSession来读取JSON、CVS或者TXT文件,甚至是读取parquet表。比如在下面代码片段里面,我将读取邮编数据的JSON文件,并且返回DataFrame对象:

// read the json file and create the dataframe
scala> val jsonFile = "/user/iteblog.json"
jsonFile: String = /user/iteblog.json
scala> val zipsDF = spark.read.json(jsonFile)
zipsDF: org.apache.spark.sql.DataFrame = [_id: string, city: string ... 3 more fields]  scala> zipsDF.filter(zipsDF.col("pop") > 40000).show(10, false)
+-----+----------+-----------------------+-----+-----+
|_id  |city      |loc                    |pop  |state|
+-----+----------+-----------------------+-----+-----+
|01040|HOLYOKE   |[-72.626193, 42.202007]|43704|MA   |
|01085|MONTGOMERY|[-72.754318, 42.129484]|40117|MA   |
|01201|PITTSFIELD|[-73.247088, 42.453086]|50655|MA   |
|01420|FITCHBURG |[-71.803133, 42.579563]|41194|MA   |
|01701|FRAMINGHAM|[-71.425486, 42.300665]|65046|MA   |
|01841|LAWRENCE  |[-71.166997, 42.711545]|45555|MA   |
|01902|LYNN      |[-70.941989, 42.469814]|41625|MA   |
|01960|PEABODY   |[-70.961194, 42.532579]|47685|MA   |
|02124|DORCHESTER|[-71.072898, 42.287984]|48560|MA   |
|02146|BROOKLINE |[-71.128917, 42.339158]|56614|MA   |
+-----+----------+-----------------------+-----+-----+
only showing top 10 rows  

在SparkSession中还用Spark SQL

  通过SparkSession我们可以访问Spark SQL中所有函数,正如你使用SQLContext访问一样。下面代码片段中,我们创建了一个表,并在其中使用SQL查询:

// Now create an SQL table and issue SQL queries against it without
// using the sqlContext but through the SparkSession object.
// Creates a temporary view of the DataFrame
scala> zipsDF.createOrReplaceTempView("zips_table")  scala> zipsDF.cache()
res3: zipsDF.type = [_id: string, city: string ... 3 more fields]  scala> val resultsDF = spark.sql("SELECT city, pop, state, _id FROM zips_table")
resultsDF: org.apache.spark.sql.DataFrame = [city: string, pop: bigint ... 2 more fields]  scala> resultsDF.show(10)
+------------+-----+-----+-----+
|        city|  pop|state|  _id|
+------------+-----+-----+-----+
|      AGAWAM|15338|   MA|01001|
|     CUSHMAN|36963|   MA|01002|
|       BARRE| 4546|   MA|01005|
| BELCHERTOWN|10579|   MA|01007|
|   BLANDFORD| 1240|   MA|01008|
|   BRIMFIELD| 3706|   MA|01010|
|     CHESTER| 1688|   MA|01011|
|CHESTERFIELD|  177|   MA|01012|
|    CHICOPEE|23396|   MA|01013|
|    CHICOPEE|31495|   MA|01020|
+------------+-----+-----+-----+
only showing top 10 rows  

使用SparkSession读写Hive表

  下面我们将使用SparkSession创建一个Hive表,并且对这个表进行一些SQL查询,正如你使用HiveContext一样:

scala> spark.sql("DROP TABLE IF EXISTS iteblog_hive")
res5: org.apache.spark.sql.DataFrame = []  scala> spark.table("zips_table").write.saveAsTable("iteblog_hive")
16/08/24 21:52:59 WARN HiveMetaStore: Location: hdfs://iteblogcluster/user/iteblog/hive/warehouse/iteblog_hive specified for non-external table:iteblog_hive  scala> val resultsHiveDF = spark.sql("SELECT city, pop, state, _id FROM iteblog_hive WHERE pop > 40000")
resultsHiveDF: org.apache.spark.sql.DataFrame = [city: string, pop: bigint ... 2 more fields]  scala> resultsHiveDF.show(10)
+----------+-----+-----+-----+
|      city|  pop|state|  _id|
+----------+-----+-----+-----+
|   HOLYOKE|43704|   MA|01040|
|MONTGOMERY|40117|   MA|01085|
|PITTSFIELD|50655|   MA|01201|
| FITCHBURG|41194|   MA|01420|
|FRAMINGHAM|65046|   MA|01701|
|  LAWRENCE|45555|   MA|01841|
|      LYNN|41625|   MA|01902|
|   PEABODY|47685|   MA|01960|
|DORCHESTER|48560|   MA|02124|
| BROOKLINE|56614|   MA|02146|
+----------+-----+-----+-----+
only showing top 10 rows  

  正如你所见,你使用DataFrame API, Spark SQL 以及 Hive查询的结果都一样。关于这个例子的完整代码可以到这里获取。

Spark REPL和Databricks Notebook中的SparkSession对象

  在之前的Spark版本中,Spark shell会自动创建一个SparkContext对象sc。2.0中Spark shell则会自动创建一个SparkSession对象(spark),在输入spark时就会发现它已经存在了。

  在Databricks notebook中创建集群时也会自动生成一个SparkSession,这里用的名字也是spark。

SparkSession和SparkContext

  下图说明了SparkContext在Spark中的主要功能。

  从图中可以看到SparkContext起到的是一个中介的作用,通过它来使用Spark其他的功能。每一个JVM都有一个对应的SparkContext,driver program通过SparkContext连接到集群管理器来实现对集群中任务的控制。Spark配置参数的设置以及对SQLContext、HiveContext和StreamingContext的控制也要通过SparkContext进行。

  不过在Spark2.0中上述的一切功能都是通过SparkSession来完成的,同时SparkSession也简化了DataFrame/Dataset API的使用和对数据的操作。

小结

  本文通过一个简单的例子演示了SparkSession的使用方法并与Spark2.0之前版本中的SparkContext进行了对比。总而言之SparkSession是既容易学又方便用。

Spark 2.0的SparkSession详解相关推荐

  1. Spark SQL原理及常用方法详解(二)

    Spark SQL 一.Spark SQL基础知识 1.Spark SQL简介 (1)简单介绍 (2)Datasets & DataFrames (3)Spark SQL架构 (4)Spark ...

  2. Spark: sortBy和sortByKey函数详解

    在很多应用场景都需要对结果数据进行排序,Spark中有时也不例外.在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数.sortBy是对标准的RDD进行排序,它是 ...

  3. ASP.NET2.0 ReportingServices使用详解

    ASP.NET2.0 ReportingServices使用详解 作者:清清月儿 主页:http://blog.csdn.net/21aspnet/          时间:2007.4.9 本文先做 ...

  4. Python Numpy多维数组.sum(axis=0/1/2...) 详解

    Python Numpy多维数组.sum(axis=0/1/2-) 详解 numpy中axis取值的说明 首先对numpy中axis取值进行说明:一维数组时axis=0,二维数组时axis=0,1,维 ...

  5. Linux redis安装教程,Linux 下redis5.0.0安装教程详解

    Linux redis5.0.0安装,教程如下所示: 1.从官网下载,然后传到服务器,tar -zxvf解压 2.进入redis ? 3.安装:make, (1)若提示:: gcc: Command ...

  6. Zabbix5.0监控系统安装详解

    Zabbix5.0监控系统安装详解 一.Zabbix介绍 二.Zbbix的LAMP环境安装 1.防火墙和SElinux配置 2.安装LAMP环境 三.安装Zabbix软件 四.Zabbix的Mysql ...

  7. 稳扎稳打Silverlight(17) - 2.0数据之详解DataGrid, 绑定数据到ListBox

    [索引页] [源码下载] 稳扎稳打Silverlight(17) - 2.0数据之详解DataGrid, 详解ListBox 作者:webabcd 介绍 Silverlight 2.0 详解DataG ...

  8. Spring 3.0 注解注入详解

    Spring 3.0 注解注入详解 2011-04-15 09:44 17ZOUGUO ITEYE博客 我要评论(1) 字号:T | T AD: 一.各种注解方式 1.@Autowired注解(不推荐 ...

  9. spark on yarn 内存分配详解

    spark on yarn 内存分配详解

最新文章

  1. AI就是“大数据+机器学习”?答案是否定的
  2. 安全性问题(数据篡改(拿到别人的URL,篡改数据(金额)发送给系统))
  3. 最短路径——Dijkstra算法以及二叉堆优化(含证明)
  4. 【SSM面向CRUD编程专栏 7】springAop 事务控制
  5. ES6 let 和 const 关键字
  6. open() 函数以 w+ 模式打开文件
  7. 7-234 两个有序序列的中位数 (25 分)
  8. php 后端刷新页面
  9. 外网访问Vmware虚拟机中的某个服务(如http)
  10. 继CDH收费之后,这家公司率先推出了免费版大数据套件服务!
  11. foobar android 目录,最强手机音乐播放器?Foobar2K安卓版体验
  12. android手机usb设置在哪里,手机usb调试在哪,教您安卓手机怎么打开USB调试
  13. Node.js meitulu图片批量下载爬虫 1.05版(Final最终版)
  14. android手机界面分区,Android手机fastboot 刷机命令(示例代码)
  15. 后台批量刷新金蝶K3物料名称操作步骤
  16. 【计算机网络】计算机网络的组成
  17. mysql容灾方案_mysql 架构 ~异地容灾
  18. 窗函数法FIR滤波器设计
  19. 格式工厂去广告版|格式工厂(Format Factory)免费版下载 v4.8.0
  20. 第3章 直流电机的工作原理及特性 学习笔记(一)

热门文章

  1. 玩lol哪个服务器最新,LOL:坑最多的四个服务器,有没有你的大区?
  2. 智能家居Homekit解决方案
  3. Oracle特殊语句
  4. 微信小程序周报(第六期)
  5. magic4.0什么时候升级鸿蒙,科技知识:magic4.0什么时候更新 magic4.0更新时间介绍...
  6. python新式类c3算法_Python新式类的方法解析顺序MRO与Super
  7. vue-devtools工具的安装和使用
  8. python数据分析实战之用户分析及RFM模型分析
  9. VS2013 ocx去除安全警告
  10. 【图像分类】2022-CMT CVPR