pySpark创建DataFrame的方式

有时候需要在迭代的过程中将多个dataframe进行合并(union),这时候需要一个空的初始dataframe。创建空dataframe可以通过spark.createDataFrame()方法来创建:

# 先定义dataframe各列的数据类型
from pyspark.sql.types import *
schema = StructType([StructField("a", IntegerType(), True),StructField("b", IntegerType(), True),StructField("c", IntegerType(), True)])# 通过定义好的dataframe的schema来创建空dataframe
df1 = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)df2 = sc.parallelize([(4,5,6)]).toDF(['a','b','c'])df1.union(df2).show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  4|  5|  6|
+---+---+---+

通过上面的方法可以创建指定列名和数据类型的dataframe。但是有时候我们需要创建的dataframe的数据结构是跟某个dataframe结构是相同的,而这个结构有非常复杂,难以直接创建,这时候就可以直接使用已有的dataframe的schema来创建新的dataframe了:

df3 = spark.createDataFrame(spark.sparkContext.emptyRDD(), df2.schema)
df3.union(df2).show()
+---+---+---+
|  a|  b|  c|
+---+---+---+
|  4|  5|  6|
+---+---+---+

对于Spark 2.0来说,所有的功能都可以以类SparkSession类作为切入点。要创建SparkSession,只需要使用SparkSession.builder()

使用Spark Session,应用程序可以从现有的RDD,Hive表或Spark数据源创建DataFrame,Spark SQL可以使用DataFrame接口在各种数据源上运行。使用Spark SQL DataFrame,我们可以创建一个临时视图。在DataFrame的临时视图中,可以对数据运行SQL查询。

Spark SQL DataFrame API没有提供编译时类型安全性。因此,如果结构未知,就无法操纵数据,一旦我们将域对象转换为数据帧,就不可能重新生成域对象; Spark SQL中的DataFrame API提高了Spark的性能和可伸缩性。它避免了为数据集中的每一行构造单个对象的垃圾收集成本。


from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, StringType
from pyspark.sql import Row
from pyspark.sql import Column
import pandas as pd
import numpy as np# 创建SparkSession连接到Spark集群-SparkSession.builder.appName('name').getOrCreate()
spark=SparkSession \
.builder \
.appName('my_app_name') \
.getOrCreate()# 创建DataFrame,可以从不同的数据创建,以下进行对个数据源读取创建说明def create_json_file():df=pd.DataFrame(np.random.rand(5,5),columns=['a','b','c','d','e']).applymap(lambda x: int(x*10))file=r"random.csv"df.to_csv(file,index=False)def create_df_from_rdd():# 从集合中创建新的RDDstringCSVRDD = spark.sparkContext.parallelize([(123, "Katie", 19, "brown"),(456, "Michael", 22, "green"),(789, "Simone", 23, "blue")])# 设置dataFrame将要使用的数据模型,定义列名,类型和是否为能为空schema = StructType([StructField("id", LongType(), True),StructField("name", StringType(), True),StructField("age", LongType(), True),StructField("eyeColor", StringType(), True)])# 创建DataFrameswimmers = spark.createDataFrame(stringCSVRDD,schema)# 注册为临时表swimmers.registerTempTable("swimmers")# 使用Sql语句data=spark.sql("select * from swimmers")# 将数据转换List,这样就可以查看dataframe的数据元素的样式print(data.collect())# 以表格形式展示数据data.show()print("{}{}".format("swimmer numbers : ",swimmers.count()) )def create_df_from_json():'''read的类型是DataFrameReader'''df = spark.read.json('pandainfo.json')df.show()def create_df_from_csv():df=spark.read.csv('random.csv',header=True, inferSchema=True)df.show()def create_df_from_postgres():"""format : 指定数据源格式 - 如 jdbc , json , csv等options: 为数据源添加相关特性选项"""df=spark.read.format('jdbc').options(url='jdbc:postgresql://localhost:5432/northwind',dbtable='public.orders',user='postgres',password='iamroot').load()df.show()def create_df_from_mysql():""""""df=spark.read.format('jdbc').options(url='jdbc:mysql://localhost:3306',dbtable='mysql.db',user='root',password='iamneo').load()df.show()def create_df_from_pandas():"""从Python pandas获取数据"""df = pd.DataFrame(np.random.random((4,4)))spark_df = spark.createDataFrame (df,schema=['a','b','c','d'])spark_df.show()def create_df_from_hive(hive):# 创建支持Hive的Spark SessionappName = "PySpark Hive Example"master = "local"spark = SparkSession.builder \.appName(appName) \.master(master) \.enableHiveSupport() \.getOrCreate()df = spark.sql("select * from test_db.test_table")df.show()# 将数据保存到Hive新表df.write.mode("overwrite").saveAsTable("test_db.test_table2")# 查看数据spark.sql("select * from test_db.test_table2").show()if __name__=='__main__':create_json_file()create_df_from_rdd() create_df_from_csv()create_df_from_json()create_df_from_db()create_df_from_mysql()create_df_from_pandas()

参考:
https://stackoverflow.com/questions/54503014/how-to-get-the-schema-definition-from-a-dataframe-in-pyspark

https://www.jianshu.com/p/f79838ddb534

https://blog.csdn.net/sinat_26811377/article/details/101217071

pySpark创建DataFrame的方式相关推荐

  1. SparkSQL 创建 DataFrame 的方式

    1.读取 json 格式的文件创建 DataFrame 注意: 可以两种方式读取 json 格式的文件. df.show()默认显示前 20 行数据. DataFrame 原生 API 可以操作 Da ...

  2. pandas创建DataFrame的几种方式(建议收藏)

    pandas创建DataFrame的几种方式 如果你是一个pandas初学者,那么不知道你会不会像我一样.在学用列表或者数组创建DataFrame时理不清怎样用数据生成以及想要形状的的Datafram ...

  3. pandas之创建DataFrame

    pandas创建DataFrame的方式主要有两种,通过列表创建与通过字典创建,下面一一演示一下. import pandas as pddf1 = pd.DataFrame([[1,2,3],[7, ...

  4. SparkSQL之DataFrame 编程(创建DataFrame ,DataFrame数据运算操作 ,输出存储DataFrame)(11)

    一  新的编程入口 SparkSession SparkSession 是 Spark 最新的 SQL 查询起始点 ,实质上是 SQLcontext 和 SparkContext 的组合 ,所以在 S ...

  5. Spark _22 _创建DataFrame的几种方式(一)

    创建DataFrame的几种方式 读取json格式的文件创建DataFrame 注意: json文件中的json数据不能嵌套json格式数据. DataFrame是一个一个Row类型的RDD,df.r ...

  6. PySpark之DataFrame的常用函数(创建、查询、修改、转换)

    import findspark findspark.init()from pyspark import SparkContext sc = SparkContext.getOrCreate()fro ...

  7. python dataframe创建指定大小_pandas DataFrame创建方法的方式

    在pandas里,DataFrame是最经常用的数据结构,这里总结生成和添加数据的方法: ①.把其他格式的数据整理到DataFrame中: ②在已有的DataFrame中插入N列或者N行. 1. 字典 ...

  8. Spark _24 _读取JDBC中的数据创建DataFrame/DataSet(MySql为例)(三)

    两种方式创建DataSet 现在数据库中创建表不能给插入少量数据. javaapi: package SparkSql;import org.apache.spark.SparkConf; impor ...

  9. Pandas创建DataFrame对象的几种常用方法

    DataFrame是pandas常用的数据类型之一,表示带标签的可变二维表格.本文介绍如何创建DataFrame对象,后面会陆续介绍DataFrame对象的用法. 首先,使用pip.conda或类似工 ...

最新文章

  1. ERP_基于Oracle SOA的企业服务总线整合
  2. 面向对象三大特性之一:继承(C++)
  3. python-matplotlib
  4. jbpm系列之五--使用decision节点判断分支情况
  5. 软件工程----9软件实现
  6. 剑指offer--替换空格
  7. php中括号的优先级是不是最高的,理解php中操作符的优先级和结合性
  8. matlab int8 函数,未定义与 'uint8' 类型的输入参数相对应的函数 'fitnessty'
  9. axios的数据拦截(拦截器)
  10. android menu width,如何更改Android PopupMenu宽度
  11. 实战演练:MySQL手动注册binlog文件造成主从同步异常
  12. Hadoop处理HDF文件
  13. Java经典设计模式(3):十一种行为型模式(附实例和详解)
  14. linux通过usb给windows,如何从Linux创建Windows USB安装程序 | MOS86
  15. Android开发的消消乐游戏
  16. 基础篇——View和ViewGroup的区别
  17. 18年12月计算机英语统考成绩查询,没查的抓紧,18年12月四六级成绩查询入口将关闭...
  18. WPS个人版如何启用VBA(宏)
  19. Google Code Jam 2010 Qualification Round 资格赛 Problem A. Snapper Chain 问题A.按扣链条
  20. STM32普通IO模拟SPI和W25Q32通信调试

热门文章

  1. html快速创建块,CAD怎么快速创建带编号的块?
  2. 期刊模板-如何去除左下角的横线
  3. MBA-day13数学-年龄问题
  4. taobao.item_sku获取sku详细信息 API接口的调用
  5. python js 性能_lua与python性能测试比较
  6. unity3dwebgl building之后没有反应_晚会是在考验明星临场反应吗?王源开场无伴奏阿云嘎差点原地跳舞...
  7. python 取整法(进一取值)
  8. 水库大坝隧道安全监测通用的无线解决方案
  9. python分组统计数据_数据分组统计
  10. sql查询数据库中所有表名