Spark SQL | 目前Spark社区最活跃的组件之一​mp.weixin.qq.com

Spark SQL是一个用来处理结构化数据的Spark组件,前身是shark,但是shark过多的依赖于hive如采用hive的语法解析器、查询优化器等,制约了Spark各个组件之间的相互集成,因此Spark SQL应运而生。

Spark SQL在汲取了shark诸多优势如内存列存储、兼容hive等基础上,做了重新的构造,因此也摆脱了对hive的依赖,但同时兼容hive。除了采取内存列存储优化性能,还引入了字节码生成技术、CBO和RBO对查询等进行动态评估获取最优逻辑计划、物理计划执行等。基于这些优化,使得Spark SQL相对于原有的SQL on Hadoop技术在性能方面得到有效提升。

同时,Spark SQL支持多种数据源,如JDBC、HDFS、HBase。它的内部组件,如SQL的语法解析器、分析器等支持重定义进行扩展,能更好的满足不同的业务场景。与Spark Core无缝集成,提供了DataSet/DataFrame的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。

DataSet/DataFrame

DataSet/DataFrame都是Spark SQL提供的分布式数据集,相对于RDD而言,除了记录数据以外,还记录表的schema信息。

DataSet是自Spark1.6开始提供的一个分布式数据集,具有RDD的特性比如强类型、可以使用强大的lambda表达式,并且使用Spark SQL的优化执行引擎。DataSet API支持Scala和Java语言,不支持Python。但是鉴于Python的动态特性,它仍然能够受益于DataSet API(如,你可以通过一个列名从Row里获取这个字段 row.columnName),类似的还有R语言。

DataFrame是DataSet以命名列方式组织的分布式数据集,类似于RDBMS中的表,或者R和Python中的 data frame。DataFrame API支持Scala、Java、Python、R。在Scala API中,DataFrame变成类型为Row的Dataset:

type DataFrame = Dataset[Row]。

DataFrame在编译期不进行数据中字段的类型检查,在运行期进行检查。但DataSet则与之相反,因为它是强类型的。此外,二者都是使用catalyst进行sql的解析和优化。为了方便,以下统一使用DataSet统称。

DataSet创建

DataSet通常通过加载外部数据或通过RDD转化创建。

1.加载外部数据

以加载json和mysql为例:

val ds = sparkSession.read.json("/路径/people.json")val ds = sparkSession.read.format("jdbc")
.options(Map("url" -> "jdbc:mysql://ip:port/db",
"driver" -> "com.mysql.jdbc.Driver",
"dbtable" -> "tableName", "user" -> "root", "root" -> "123")).load()

2.RDD转换为DataSet通过RDD转化创建DataSet,关键在于为RDD指定schema,通常有两种方式(伪代码):

1.定义一个case class,利用反射机制来推断1) 从HDFS中加载文件为普通RDD
val lineRDD = sparkContext.textFile("hdfs://ip:port/person.txt").map(_.split(" "))2) 定义case class(相当于表的schema)
case class Person(id:Int, name:String, age:Int)3) 将RDD和case class关联
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))4) 将RDD转换成DataFrame
val ds= personRDD.toDF2.手动定义一个schema StructType,直接指定在RDD上val schemaString ="name age"val schema =  StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))val rowRdd = peopleRdd.map(p=>Row(p(0),p(1)))val ds = sparkSession.createDataFrame(rowRdd,schema)

操作DataSet的两种风格语法

DSL语法

1.查询DataSet部分列中的内容personDS.select(col("name"))personDS.select(col("name"), col("age"))

2.查询所有的name和age和salary,并将salary加1000personDS.select(col("name"), col("age"), col("salary") + 1000)personDS.select(personDS("name"), personDS("age"), personDS("salary") + 1000)

3.过滤age大于18的personDS.filter(col("age") > 18)

4.按年龄进行分组并统计相同年龄的人数personDS.groupBy("age").count()

注意:直接使用col方法需要import org.apache.spark.sql.functions._

SQL语法

如果想使用SQL风格的语法,需要将DataSet注册成表personDS.registerTempTable("person")

//查询年龄最大的前两名

val result = sparkSession.sql("select * from person order by age desc limit 2")//保存结果为json文件。注意:如果不指定存储格式,则默认存储为parquet

result.write.format("json").save("hdfs://ip:port/res2")

Spark SQL的几种使用方式

1.sparksql-shell交互式查询

就是利用Spark提供的shell命令行执行SQL

2.编程

首先要获取Spark SQL编程"入口":SparkSession(当然在早期版本中大家可能更熟悉的是SQLContext,如果是操作hive则为HiveContext)。这里以读取parquet为例:

val spark = SparkSession.builder().appName("example").master("local[*]").getOrCreate();val df = sparkSession.read.format("parquet").load("/路径/parquet文件")

然后就可以针对df进行业务处理了。

3.Thriftserverbeeline客户端连接操作

启动spark-sql的thrift服务,sbin/start-thriftserver.sh,启动脚本中配置好Spark集群服务资源、地址等信息。然后通过beeline连接thrift服务进行数据处理。

hive-jdbc驱动包来访问spark-sql的thrift服务

在项目pom文件中引入相关驱动包,跟访问mysql等jdbc数据源类似。示例:

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://ip:port", "root", "123");
try {val stat = conn.createStatement()val res = stat.executeQuery("select * from people limit 1")while (res.next()) {println(res.getString("name"))}
} catch {case e: Exception => e.printStackTrace()
} finally{if(conn!=null) conn.close()
}

Spark SQL 获取Hive数据

Spark SQL读取hive数据的关键在于将hive的元数据作为服务暴露给Spark。除了通过上面thriftserver jdbc连接hive的方式,也可以通过下面这种方式:

首先,配置 $HIVE_HOME/conf/hive-site.xml,增加如下内容:

<property>

<name>hive.metastore.uris</name>

<value>thrift://ip:port</value>

</property>然后,启动hive metastore

最后,将hive-site.xml复制或者软链到$SPARK_HOME/conf/。如果hive的元数据存储在mysql中,那么需要将mysql的连接驱动jar包如mysql-connector-java-5.1.12.jar放到$SPARK_HOME/lib/下,启动spark-sql即可操作hive中的库和表。而此时使用hive元数据获取SparkSession的方式为:

val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()

UDF、UDAF、Aggregator

UDF

UDF是最基础的用户自定义函数,以自定义一个求字符串长度的udf为例:

val udf_str_length = udf{(str:String) => str.length}
spark.udf.register("str_length",udf_str_length)
val ds =sparkSession.read.json("路径/people.json")
ds.createOrReplaceTempView("people")
sparkSession.sql("select str_length(address) from people")

UDAF

定义UDAF,需要继承抽象类UserDefinedAggregateFunction,它是弱类型的,下面的aggregator是强类型的。以求平均数为例:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._object MyAverage extends UserDefinedAggregateFunction {// Data types of input arguments of this aggregate functiondef inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)// Data types of values in the aggregation bufferdef bufferSchema: StructType = {StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)}// The data type of the returned valuedef dataType: DataType = DoubleType// Whether this function always returns the same output on the identical inputdef deterministic: Boolean = true// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides// the opportunity to update its values. Note that arrays and maps inside the buffer are still// immutable.def initialize(buffer: MutableAggregationBuffer): Unit = {buffer(0) = 0Lbuffer(1) = 0L}// Updates the given aggregation buffer `buffer` with new input data from `input`def update(buffer: MutableAggregationBuffer, input: Row): Unit = {if (!input.isNullAt(0)) {buffer(0) = buffer.getLong(0) + input.getLong(0)buffer(1) = buffer.getLong(1) + 1}}// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)}// Calculates the final resultdef evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}// Register the function to access it
spark.udf.register("myAverage", MyAverage)val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()

Aggregator

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregatorcase class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)object MyAverage extends Aggregator[Employee, Average, Double] {// A zero value for this aggregation. Should satisfy the property that any b + zero = bdef zero: Average = Average(0L, 0L)// Combine two values to produce a new value. For performance, the function may modify `buffer`// and return it instead of constructing a new objectdef reduce(buffer: Average, employee: Employee): Average = {buffer.sum += employee.salarybuffer.count += 1buffer}// Merge two intermediate valuesdef merge(b1: Average, b2: Average): Average = {b1.sum += b2.sumb1.count += b2.countb1}// Transform the output of the reductiondef finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count// Specifies the Encoder for the intermediate value typedef bufferEncoder: Encoder[Average] = Encoders.product// Specifies the Encoder for the final output value typedef outputEncoder: Encoder[Double] = Encoders.scalaDouble
}val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()

Spark SQL与Hive的对比

sparksql优化_Spark SQL | 目前Spark社区最活跃的组件之一相关推荐

  1. sparksql优化_Spark SQL amp; Streaming

    最近有机会了解spark,所以看了相关的论文,很久没写文章了. 主要有两篇论文和spark sql有关联,建议先阅读 spark sql, 再阅读 Structured,这篇wiki只说明一些基本的概 ...

  2. sparksql 大小写_Spark SQL函数

    Spark SQL函数 发布时间:2018-10-23 14:35, 浏览次数:942 , 标签: Spark SQL <>Spark SQL函数 <>一.概述 <> ...

  3. spark 写mysql 设置主键_Spark Sql 连接mysql

    1.基本概念和用法(摘自spark官方文档中文版) Spark SQL 还有一个能够使用 JDBC 从其他数据库读取数据的数据源.当使用 JDBC 访问其它数据库时,应该首选 JdbcRDD.这是因为 ...

  4. python中spark有什么功能_Spark SQL是什么,提供的主要功能有哪三种?

    Spark SQL允许大家在Python.Java以及Scala中使用数据帧;利用多种结构化格式读取并写入数据;通过SQL进行大数据查询. Spark SQL属于Spark用于处理结构化与半结构化数据 ...

  5. sparksql 操作hive_Spark SQL 物化视图原理与实践

    导言 物化视图作为一种预计算的优化方式,广泛应用于传统数据库中,如Oracle,MSSQL Server等.随着大数据技术的普及,各类数仓及查询引擎在业务中扮演着越来越重要的数据分析角色,而物化视图作 ...

  6. spark core、spark sql、spark streaming 联系与区别

    SparkCore 是做离线批处理 SparkSql 是做sql高级查询 SparkStreaming是做流式处理 SparkShell 是做交互式查询 区别: Spark Core : Spark的 ...

  7. Spark社区可能放弃Spark 1.7而直接发布Spark 2.x

    最近由Reynold Xin给Spark开发者发布的一封邮件透露,Spark社区很有可能会跳过Spark 1.7版本的发布,而直接转向Spark 2.x. 如果Spark 2.x发布,那么它将: (1 ...

  8. dataframe 筛选_Spark.DataFrame与Spark.ML简介

    本文是PySpark销量预测系列第一篇,后面会陆续通过实战案例详细介绍PySpark销量预测流程,包含特征工程.特征筛选.超参搜索.预测算法. 在零售销量预测领域,销售小票数据动辄上千万条,这个量级在 ...

  9. Day43[20180716]_Spark SQL(二)

    Spark SQL是Spark 框架中一个重要模块 SparkSQL: 结构化(Schema)数据处理分析框架 Spark SQL is Apache Spark's module for worki ...

  10. Spark四大组件包括Spark Streaming、Spark SQL、Spark MLlib和Spark GraphX。

    Spark四大组件包括Spark Streaming.Spark SQL.Spark MLlib和Spark GraphX.它们的主要应用场景是: Spark Streaming: Spark Str ...

最新文章

  1. 【转】对random_state参数的理解
  2. Windows SQL Server 2008 群集(摘自网络)
  3. 转:Ubuntu中安装和配置 Java JDK,并卸载自带OpenJDK(以Ubuntu 14.04为例)
  4. Rulo扫地机器人app_要买这样的扫地机器人 浦桑尼克扫地机器人评测
  5. Wex5铛铛开发环境搭建步骤
  6. Leetcode 剑指 Offer 40. 最小的k个数 (每日一题 20210825)
  7. 符号引用(typeglob,别名)与全局变量的修改
  8. win64环境下的一些配置
  9. php之clone 复制对象以及__clone魔术方法
  10. leetcode刷题:最大子序积
  11. Cocos2d-X-3.0之后的版本的环境搭建
  12. IE下Ajax缓存(转载)
  13. Spring Cloud实战(六)-Spring Cloud Netflix Bus
  14. 计算机boot进入u盘启动,电脑boot设置U盘启动项具体方法
  15. 压缩文件暴力破解(免费党的快乐)
  16. python 画图 实时_Python matplotlib实时画图案例
  17. mini《猜字》游戏,谁玩谁迷糊
  18. html 游戏键盘,用html+js+css做一个模拟键盘
  19. Java代码审计手册(1)
  20. identity和assigned 的区别

热门文章

  1. 将java类的泛型集合转换成json对象
  2. 关于Winform下,获取Treeview中CheckBox选中项的“.NET研究”技巧
  3. DNS在什么情况下才能动态更新|活动目录集成的dns区域
  4. 组织结构及权限模型设计
  5. redis缓存和mysql数据库同步
  6. Java和C#的区别
  7. element表格点击行即选中该行复选框
  8. 使用Go构建区块链 第3部分:持久化和cli
  9. 红帽全年总营收24亿美元,同比增长18%
  10. php ZeroMQ 的使用