通过反射推断Schema

在Spark SQL中有两种方式可以在DataFrame和RDD进行转换

  • 利用反射机制,推导包含某种类型的RDD,通过反射将其转换为指定类型的DataFrame,适用于提前知道RDD的schema。
  • 通过编程接口与RDD进行交互获取schema,并动态创建DataFrame,在运行时决定列及其类型。

1、创建maven工程添加依赖

    <properties><scala.version>2.11.8</scala.version><hadoop.version>2.7.4</hadoop.version><spark.version>2.0.2</spark.version></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.38</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>2.0.2</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>2.0.2</version></dependency></dependencies><build><sourceDirectory>src/main/scala</sourceDirectory><testSourceDirectory>src/test/scala</testSourceDirectory><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version><executions><execution><goals><goal>compile</goal><goal>testCompile</goal></goals><configuration><args><arg>-dependencyfile</arg><arg>${project.build.directory}/.scala_dependencies</arg></args></configuration></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass></mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>

2、代码实现

Scala支持使用case class类型导入RDD转换为DataFrame,通过case class创建schema,case class的参数名称会被反射读取并成为表的列名。这种RDD可以高效的转换为DataFrame并注册为表。

package cn.cheng.sql
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}/*** RDD转化成DataFrame:利用反射机制*/
//todo:定义一个样例类Person
case class Person(id:Int,name:String,age:Int) extends Serializableobject InferringSchema {def main(args: Array[String]): Unit = {//todo:1、构建sparkSession 指定appName和master的地址val spark: SparkSession = SparkSession.builder().appName("InferringSchema").master("local[2]").getOrCreate()//todo:2、从sparkSession获取sparkContext对象val sc: SparkContext = spark.sparkContextsc.setLogLevel("WARN")//设置日志输出级别//todo:3、加载数据val dataRDD: RDD[String] = sc.textFile("D:\\person.txt")//todo:4、切分每一行记录val lineArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))//todo:5、将RDD与Person类关联val personRDD: RDD[Person] = lineArrayRDD.map(x=>Person(x(0).toInt,x(1),x(2).toInt))//todo:6、创建dataFrame,需要导入隐式转换import spark.implicits._val personDF: DataFrame = personRDD.toDF()//todo-------------------DSL语法操作 start--------------//1、显示DataFrame的数据,默认显示20行personDF.show()//2、显示DataFrame的schema信息personDF.printSchema()//3、显示DataFrame记录数println(personDF.count())//4、显示DataFrame的所有字段personDF.columns.foreach(println)//5、取出DataFrame的第一行记录println(personDF.head())//6、显示DataFrame中name字段的所有值personDF.select("name").show()//7、过滤出DataFrame中年龄大于30的记录personDF.filter($"age" > 30).show()//8、统计DataFrame中年龄大于30的人数println(personDF.filter($"age">30).count())//9、统计DataFrame中按照年龄进行分组,求每个组的人数personDF.groupBy("age").count().show()//todo-------------------DSL语法操作 end-------------//todo--------------------SQL操作风格 start-----------//todo:将DataFrame注册成表personDF.createOrReplaceTempView("t_person")//todo:传入sql语句,进行操作spark.sql("select * from t_person").show()spark.sql("select * from t_person where name='zhangsan'").show()spark.sql("select * from t_person order by age desc").show()//todo--------------------SQL操作风格 end-------------sc.stop()}
}

通过StructType直接指定Schema

1、当case class不能提前定义时,可以通过以下三步创建DataFrame

  • 1、将RDD转为包含row对象的RDD
  • 1、基于structType类型创建schema,与第一步创建的RDD相匹配
  • 2、通过sparkSession的createDataFrame方法对第一步的RDD应用 
    schema创建DataFrame

2、代码实现

maven依赖同Spark SQL程序实现RDD转换DataFrame(一)

package cn.cheng.sqlimport org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}/*** RDD转换成DataFrame:通过指定schema构建DataFrame*/
object SparkSqlSchema {def main(args: Array[String]): Unit = {//todo:1、创建SparkSession,指定appName和masterval spark: SparkSession = SparkSession.builder().appName("SparkSqlSchema").master("local[2]").getOrCreate()//todo:2、获取sparkContext对象val sc: SparkContext = spark.sparkContext//todo:3、加载数据val dataRDD: RDD[String] = sc.textFile("d:\\person.txt")//todo:4、切分每一行val dataArrayRDD: RDD[Array[String]] = dataRDD.map(_.split(" "))//todo:5、加载数据到Row对象中val personRDD: RDD[Row] = dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))//todo:6、创建schemaval schema:StructType= StructType(Seq(StructField("id", IntegerType, false),StructField("name", StringType, false),StructField("age", IntegerType, false)))//todo:7、利用personRDD与schema创建DataFrameval personDF: DataFrame = spark.createDataFrame(personRDD,schema)//todo:8、DSL操作显示DataFrame的数据结果personDF.show()//todo:9、将DataFrame注册成表personDF.createOrReplaceTempView("t_person")//todo:10、sql语句操作spark.sql("select * from t_person").show()spark.sql("select count(*) from t_person").show()sc.stop()}
}

Spark SQL程序实现RDD转换DataFrame相关推荐

  1. Spark15:Spark SQL:DataFrame常见算子操作、DataFrame的sql操作、RDD转换为DataFrame、load和save操作、SaveMode、内置函数

    前面我们学习了Spark中的Spark core,离线数据计算,下面我们来学习一下Spark中的Spark SQL. 一.Spark SQL Spark SQL和我们之前讲Hive的时候说的hive ...

  2. Spark-SQL从MySQL中加载数据以及将数据写入到mysql中(Spark Shell方式,Spark SQL程序)

    1. JDBC Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中. 1.1. 从MySQ ...

  3. spark mysql 写_Spark-SQL从MySQL中加载数据以及将数据写入到mysql中(Spark Shell方式,Spark SQL程序)...

    1. JDBC Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中. 1.1. 从MySQ ...

  4. 【未完成】[Spark SQL_2] 在 IDEA 中编写 Spark SQL 程序

    0. 说明 在 IDEA 中编写 Spark SQL 程序,分别编写 Java 程序 & Scala 程序 1. 编写 Java 程序 待补充 2. 编写 Scala 程序 待补充 转载于:h ...

  5. Spark SQL编程之RDD-RDD转换

    背景 本文使用idea编程 spark 版本 <scala.version>2.11.8</scala.version> <spark.version>2.2.0& ...

  6. Spark SQL程序操作HiveContext

    HiveContext是对应spark-hive这个项目,与hive有部分耦合, 支持hql,是SqlContext的子类,也就是说兼容SqlContext; 1.添加依赖 <dependenc ...

  7. hive编程指南电子版_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  8. spark-sql建表语句限制_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

  9. hive编程指南_第三篇|Spark SQL编程指南

    在<第二篇|Spark Core编程指南>一文中,对Spark的核心模块进行了讲解.本文将讨论Spark的另外一个重要模块--Spark SQL,Spark SQL是在Shark的基础之上 ...

最新文章

  1. 完整的由客户端登录(注册)思路
  2. 结果期待3年多的看图软件
  3. 对windows更多的理解
  4. 【Flink】Flink 状态恢复报错 StateMigrationException For heap backendsthe new state serializer must not be
  5. asp.net 小记
  6. 开幕倒计时3天 | 2019中国大数据技术大会(BDTC)邀您一同共赴大数据+AI盛宴!...
  7. 经典神经网络 -- DPN : 设计原理与pytorch实现
  8. 最新 PMP 考试真题概要及答案分析(中文版)(1)
  9. 大屏导航Linux系统下载,掌讯方案MTK3561大屏导航ROOT固件
  10. 超级详细的Junit单元测试教程
  11. php ip纯真数据库Dat,php读取纯真ip数据库使用示例
  12. 一键生成sprite(雪碧图)以及 动态加载1X 2X3X 图片
  13. 10.原码、反码、补码
  14. wxwidget编译安装_linux下编译安装wxWidgets-2.8.12和audacity-2.0.3教程
  15. Jet Aviation Basel基于Siemens PLM Software解决方案构建其未来数字化战略
  16. 快应用上架时,你最关心的问题都在这里!
  17. 【递归 动态规划 备忘录法】Fibonacci数列(斐波那契数列)(C++)
  18. JAVA--equal、length、Arrays、Static
  19. STM32F4移植EMWIN(RA8875驱动显示屏)
  20. 未来的计算机漫画,漫画电脑

热门文章

  1. redis key存在则删除_Redis加锁的几种实现
  2. python在函数内部访问外部全局变量的方法_在函数外部访问函数变量,而无需使用“全局”...
  3. ‘MicrosoftWebDriver.exe‘ executable needs to be in PATH.
  4. python中乘法和除法_python – NumPy的性能:uint8对比浮动和乘法与除法?
  5. linux笔记之 raid
  6. FatFs源码剖析(2)
  7. 微信小程序 RTMP 音视频 通话 ffmpeg_音视频常见问题分析和解决:HLS切片丢帧引起的视频卡顿问题排查...
  8. 优朋普乐大数据_优朋普乐黑维炜:互动电视市场已进入成熟发展期
  9. unity3d 不规则外发光描边_Shader案例之内发光和边缘泛光效果
  10. linux中postscript如何生成,【转载】如何为Linux生成和打上patch