项目github地址:bitcarmanlee easy-algorithm-interview-and-practice
欢迎大家star,留言,一起学习进步

1.为什么要有DataFrame

Spark中的RDD叫做分布式弹性数据集。RDD是一个粗粒度的分布式计算,用函数声明式的api就能完成分布式的计算,比如wordcount,在mapreduce要写比较冗长的代码,而在Spark中可以用一行代码搞定。
既然RDD这么简单方便,为什么还要搞出一个DataFrame来呢?DataFrame是借鉴了R与pandas众DataFrame的思想,是业界处理标准化数据集的事实标准。DataFrame跟RDD相比,多了一些限制条件,但同时也有了更多的优化。比如基于Spark Catalyst优化器,提供如列裁剪,谓词下推,map join等优化。同时,采用code generation ,动态编译表达式,提升性能,比用rdd的自定义函数性能高5倍左右。

Example:

rdd.map{x => x.split("\t"); (lines(0), lines(1))}.filter(x => x._2 >= 18).select(x => (x._1, x._2))

而如果采用类似DataFrame的结构:

sqlContext.table("people").filter(col("age") >= 19).select("id", "name")

用rdd读结构化文本要用map函数,需要按位置获取数据,没有schema,性能和可读性都不好。
而用dataframe可以直接通过sede读取结构化数据,性能比RDD高2到3倍左右,比MR高5倍左右,同时,具有结构化的数据,可读性更好。

Spark2.0以后的版本推出了DataSet,是更加强类型的API,用了scala的泛型,能在编译是发现更多的编译问题DataFrame是DataSet〈Row〉类型,DS在接口上和DataFrame很相似。

2.通过toDF方法创建DataFrame

sparkSQL中implicits里面有toDF方法,toDF方法可以将本地的Seq,Array或者RDD转化为DataFrame

import sqlContext.implicits._
val wordDataFrame = Seq((0, Array("Hi", "I", "heard", "about", "Spark")),(1, Array("I", "wish", "Java", "could", "use", "case", "classes")),(2, Array("Logistic", "regression", "models", "are", "neat"))).toDF("label", "words")

在spark shell中运行上面代码以后,结果如下:

scala> val wordDataFrame = Seq((0, Array("Hi", "I", "heard", "about", "Spark")),(1, Array("I", "wish", "Java", "could", "use", "case", "classes")),(2, Array("Logistic", "regression", "models", "are", "neat"))).toDF("label", "words")
wordDataFrame: org.apache.spark.sql.DataFrame = [label: int, words: array<string>]

可以看到我们已经得到了一个DataFrame。
如果toDF方法不指定字段名称,那么默认列名为"_1", “_2”, …

scala> val wordDataFrame = Seq((0, Array("Hi", "I", "heard", "about", "Spark")),(1, Array("I", "wish", "Java", "could", "use", "case", "classes")),(2, Array("Logistic", "regression", "models", "are", "neat"))).toDF()
wordDataFrame: org.apache.spark.sql.DataFrame = [_1: int, _2: array<string>]

我们还可以通过case class 与toDF的方式得到DataFrame

scala> import sqlContext.implicits._
import sqlContext.implicits._scala> import scala.collection.JavaConversions._
import scala.collection.JavaConversions._scala> case class word(num: Int, wordlist:Array[String])
defined class wordsc.parallelize(Seq((0, Array("Hi", "I", "heard", "about", "Spark")),(1, Array("I", "wish", "Java", "could", "use", "case", "classes")),(2, Array("Logistic", "regression", "models", "are", "neat")))).map(x => word(x._1.toInt, x._2)).foreach(x => println(x.num, x.wordlist.mkString("-")))
(1,I-wish-Java-could-use-case-classes)
(2,Logistic-regression-models-are-neat)
(0,Hi-I-heard-about-Spark)

3.通过createDataFrame创建DataFrame

在SqlContext中使用createDataFrame也可以创建DataFrame。跟toDF一样,这里创建DataFrame的数据形态也可以是本地数组或者RDD。这种方法在由于数据的结构以字符串的形式编码而无法提前定义定制类的情况下非常实用。

import org.apache.spark.sql._
import org.apache.spark.sql.types._scala> val schema = StructType(List(StructField("integer_column", IntegerType, nullable = false),StructField("string_column", StringType, nullable = true),StructField("date_column", DateType, nullable = true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(integer_column,IntegerType,false), StructField(string_column,StringType,true), StructField(date_column,DateType,true))scala> val rdd = sc.parallelize(Seq(Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))))
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[4] at parallelize at <console>:39scala> val df = sqlContext.createDataFrame(rdd, schema)
df: org.apache.spark.sql.DataFrame = [integer_column: int, string_column: string, date_column: date]

4.通过parquet文件或者json文件创建DataFrame

val df = sqlContext.read.parquet("xxx/file")
val df = spark.read.json("xxx/file.json")

spark生成DataFrame相关推荐

  1. [Spark][Python]Spark 访问 mysql , 生成 dataframe 的例子:

    [Spark][Python]Spark 访问 mysql , 生成 dataframe 的例子: mydf001=sqlContext.read.format("jdbc").o ...

  2. spark to mysql date_[Spark][Python]Spark 访问 mysql , 生成 dataframe 的例子:

    [Spark][Python]Spark 访问 mysql , 生成 dataframe 的例子: mydf001=sqlContext.read.format("jdbc").o ...

  3. 向Spark的DataFrame增加一列数据

    前言 先说个题外话,如何给hive表增加一个列,并且该把该列的所有字段设为'China'? 如果仅仅是增加一列倒是很简单: alter table test add columns(flag stri ...

  4. Spark将dataframe存为csv文件

    楓尘君一直觉得将df存为csv文件是一个非常简单操作处理,但是当查阅资料之后发现网上有很多误导和错误,本着从实际出发的原则,记录一下过程: 1. 发现问题 背景:楓尘君想利用spark处理较大的数据集 ...

  5. pandas使用date_range函数按照指定的频率(freq)和指定的个数(periods)生成dataframe的时间格式数据列、基于dataframe的日期数据列生成日期索引

    pandas使用date_range函数按照指定的频率(freq)和指定的个数(periods)生成dataframe的时间格式数据列.基于dataframe的日期数据列生成日期索引(dates in ...

  6. pandas使用date_range函数按照指定的频率(freq)和指定的个数(periods)生成dataframe的时间格式数据列

    pandas使用date_range函数按照指定的频率(freq)和指定的个数(periods)生成dataframe的时间格式数据列 目录

  7. 从 Spark 的 DataFrame 中取出具体某一行详解

    Spark 中 DataFrame 是 RDD 的扩展,限于其分布式与弹性内存特性,我们没法直接进行类似 df.iloc(r, c) 的操作来取出其某一行. 如何从 Spark 的 DataFrame ...

  8. python数据框的横向贾总_[Spark][Python]DataFrame的左右连接例子

    [Spark][Python]DataFrame的左右连接例子 $ hdfs dfs -cat people.json {"name":"Alice",&quo ...

  9. 通过Spark生成HFile,并以BulkLoad方式将数据导入到HBase

    在实际生产环境中,将计算和存储进行分离,是我们提高集群吞吐量.确保集群规模水平可扩展的主要方法之一,并且通过集群的扩容.性能的优化,确保在数据大幅增长时,存储不能称为系统的瓶颈. 具体到我们实际的项目 ...

  10. 【求助】如何从 Spark 的 DataFrame 中取出具体某一行?我自己的一些思考

    如何从 Spark 的 DataFrame 中取出具体某一行? 根据阿里专家Spark的DataFrame不是真正的DataFrame-秦续业的文章-知乎的文章: DataFrame 应该有『保证顺序 ...

最新文章

  1. PostgreSQL-8-数据合并
  2. 牛X的web报表设计工具Grid++Report
  3. QT中使用QCustomplot设置坐标原点在左上或者反转镜像坐标轴
  4. DMLC深盟分布式深度机器学习开源平台解析
  5. 【NLP】N-LTP:基于预训练模型的中文自然语言处理平台
  6. git 提交修改到github上
  7. C# winform 窗体怎么隐藏标题栏,不显示标题栏
  8. 实时获取ccd图像_图像处理基础
  9. mybatis 鉴别其_MyBatis之Mapper XML 文件详解(四)-JDBC 类型和嵌套查询
  10. 电脑文件夹加密软件_上海靠谱电脑资料加密软件解决方案
  11. String类的流程控制
  12. 双机热备份VRRP当接入方式为PPPOE拨号的图文教程
  13. EBS AP 创建会计科目失败
  14. AForge.Video.FFMPEG桌面录屏
  15. 2020年电工(技师)找答案及电工(技师)考试申请表
  16. 【解决方案】完全修改Windows用户名
  17. Win7批量离线更新补丁方法
  18. 【NGUI】实现半圆形进度条,技能CD效果
  19. 保研面试-中英文问题及回答总结
  20. latex常见错误之缺少sty文件

热门文章

  1. MongoDB lsm降低 disk lantency
  2. 实例讲解遗传算法——基于遗传算法的自动组卷系统【实践篇】
  3. 通过javaBean反射转换成mybatis映射文件
  4. 终端安全求生指南(三)--脆弱性管理
  5. 简述RHEL7新特性(二)
  6. install mysql with source code
  7. 安装realmedia多路分配器
  8. http 请求中的 referer
  9. 【干货】js 数组操作合集(前端自我修养)
  10. AOP切面五大通知类型