将RDD[Map[String,String]] 转化为展平 DataFrame,类似于pyspark 中 dict 结构toDF的效果。

input

val mapRDD: RDD[Map[String, String]] = sc.parallelize(Seq(Map("name" -> "zhangsan", "age" -> "18", "addr" -> "bj"),Map("name" -> "lisi", "age" -> "20", "addr" -> "hz"),
))

output

name     age addr
zhangsan 18  bj
lisi     20  hz

1. Map中元素固定

每个 Map 只有三个元素的情况下

val columns=mapRDD.take(1).flatMap(_.keys)val resultantDF=mapRDD.filter(_.nonEmpty).map{m=>val seq=m.values.toSeq(seq(0),seq(1),seq(2))}.toDF(columns:_*)resultantDF.show()

2. Map中元素不固定
RDD[Map[String,String]] -> RDD[Row] -> DataFrame

  def map2DF(spark: SparkSession, rdd: RDD[Map[String, String]]): DataFrame = {val cols = rdd.take(1).flatMap(_.keys)val resRDD = rdd.filter(_.nonEmpty).map { m =>val seq = m.values.toSeqRow.fromSeq(seq)}val fields = cols.map(fieldName => StructField(fieldName, StringType, nullable = true))val schema = StructType(fields)spark.createDataFrame(resRDD, schema)}

spark convert RDD[Map] to DataFrame相关推荐

  1. Spark中RDD转换成DataFrame的两种方式(分别用Java和scala实现)

    一:准备数据源       在项目下新建一个student.txt文件,里面的内容为:         [plain] view plain copy print? <code class=&q ...

  2. Spark中RDD、DataFrame和DataSet的区别与联系

    一.RDD.DataFrame和DataSet的定义 在开始Spark RDD与DataFrame与Dataset之间的比较之前,先让我们看一下Spark中的RDD,DataFrame和Dataset ...

  3. Spark SQL 1.3.0 DataFrame介绍、使用及提供了些完整的数据写入

     问题导读 1.DataFrame是什么? 2.如何创建DataFrame? 3.如何将普通RDD转变为DataFrame? 4.如何使用DataFrame? 5.在1.3.0中,提供了哪些完整的 ...

  4. Spark 把RDD数据保存到hdfs单个文件中,而不是目录

    相比于Hadoop,Spark在数据的处理方面更加灵活方便.然而在最近的使用中遇到了一点小麻烦:Spark保存文件的的函数(如saveAsTextFile)在保存数据时都需要新建一个目录,然后在这个目 ...

  5. Spark的RDD持久化

    RDD持久化 1. RDD Cache 缓存 说明 RDD 通过Cache 或者Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存在JVM 的堆内存中.但是并不是这两个方法被调用时立即 ...

  6. Spark的RDD依赖关系

    RDD依赖关系 RDD 血缘关系 RDD 只支持粗粒度转换,即在大量记录上执行的单个操作.将创建 RDD 的一系列Lineage(血统)记录下来,以便恢复丢失的分区.RDD 的Lineage 会记录R ...

  7. Spark的RDD行动算子

    目录 基本概念 算子介绍 1. reduce 2. collect 3. count 4. first 5. take 6. takeOrdered 案例实操1-6 7. aggregate 8. f ...

  8. Spark的RDD转换算子

    目录 RDD转换算子 Value 类型 1. map 2. mapPartitions map 和mapPartitions 的区别 3. mapPartitionsWithIndex 4. flat ...

  9. spark算子大全glom_2小时入门Spark之RDD编程

    公众号后台回复关键字:pyspark,获取本项目github地址. 本节将介绍RDD数据结构的常用函数.包括如下内容: 创建RDD 常用Action操作 常用Transformation操作 常用Pa ...

最新文章

  1. 图片提取文字功能很神奇?Java几行代码搞定它!
  2. Visual Studio 2005 创建Windows服务程序(C#)
  3. 车主无忧:天下武功,唯快不破,神策让我们快人一步
  4. Scala集合:ListBuffer可变集合的head/tail/last/init方法
  5. nginx 禁止通过IP,未绑定域名访问服务器
  6. VC6安装错误——Error Launching acmboot.exe
  7. phaser java_【Java并发编程实战】-----“J.U.C”:Phaser
  8. win8系统如何开服务器,Win8.1怎么打开IIS服务器?Win8.1专业版64位系统中打开IIS服务器的方法...
  9. 【Python-3.3】字典中存储列表
  10. 卸载不了mysql2008_卸载SQL2008遇到问题(重启计算机失败、找不到SQL卸载程序)的解决办法...
  11. 当c语言学到大成时,教孩子学编程(信息学奥赛C语言版)
  12. [模板] 球 体积交 体积并
  13. 学前端进度慢怎么办?前端学不懂怎么办?
  14. 频繁默认网关不可用_图文修复win7系统默认网关不可用频繁掉线的办法
  15. 计算机未响应怎样解决方案,电脑提示Internet Explorer未响应怎么办?解决IE浏览器未响应的解决方法...
  16. 天宝数字水准数据处理和生成
  17. excel插入图片(利用vba)
  18. 用卡尔曼滤波器跟踪导弹(量测更新频率与时间更新频率不相等)
  19. 2个免费CAJ转PDF的方法,而且不限页数和大小
  20. project-clean的作用

热门文章

  1. reg51.h、intrins.h这源文件
  2. JAVA 酒店预订系统
  3. GD32F30x系列ADC源码,对初学者参考价值巨大,(非常详细篇)万字源码
  4. mysql统计用户留存_SQL 统计用户留存
  5. java命令行打包war_命令行打包 war文件
  6. Linux在加载模块时报insmod: error inserting xxx.ko -1 File exists这个错
  7. E销宝:dsp广告应该怎么投放?
  8. #WebStorm激活码失效解决方法!
  9. 【离散】如何利用顶点数求树叶或知树叶求顶点
  10. JS:Caesars Cipher(凯撒密码)