python中frame是什么意思_DataFrame是什么
DataFrame是什么
DataFrame是一个以命名列方式组织的分布式数据集。在概念上,它跟关系型数据库中的一张表或者1个Python(或者R)中的data frame一样,但是比他们更优化。DataFrame可以根据结构化的数据文件、hive表、外部数据库或者已经存在的RDD构造。
DataFrame的创建
Spark DataFrame可以从一个已经存在的RDD、hive表或者数据源中创建。
以下一个例子就表示一个DataFrame基于一个json文件创建:
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
DataFrame的操作
直接以1个例子来说明DataFrame的操作:
json文件内容:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
程序内容:
val conf = new SparkConf().setMaster("local").setAppName("DataFrameTest")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json(this.getClass.getResource("/").toString + "people.json")
/** 展示DataFrame的内容
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
**/
df.show()
/** 以树的形式打印出DataFrame的schema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
**/
df.printSchema()
/** 打印出name列的数据
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
**/
df.select("name").show()
/** 打印出name列和age列+1的数据,DataFrame的apply方法返回Column
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
**/
df.select(df("name"), df("age") + 1).show()
/** 添加过滤条件,过滤出age字段大于21的数据
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
**/
df.filter(df("age") > 21).show()
/** 以age字段分组进行统计
+----+-----+
| age|count|
+----+-----+
|null| 1|
| 19| 1|
| 30| 1|
+----+-----+
**/
df.groupBy(df("age")).count().show()
使用反射推断出Schema
Spark SQL的Scala接口支持将包括case class数据的RDD转换成DataFrame。
case class定义表的schema,case class的属性会被读取并且成为列的名字,这里case class也可以被当成别的case class的属性或者是复杂的类型,比如Sequence或Array。
RDD会被隐式转换成DataFrame并且被注册成一个表,这个表可以被用在查询语句中:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)
使用编程指定Schema
当case class不能提前确定(例如,记录的结构是经过编码的字符串,或者一个文本集合将会被解析,不同的字段投影给不同的用户),一个 DataFrame 可以通过三步来创建。
1.从原来的 RDD 创建一个行的 RDD
2.创建由一个 StructType 表示的模式与第一步创建的 RDD 的行结构相匹配
3.在行 RDD 上通过 applySchema 方法应用模式
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
数据源
Spark SQL默认使用的数据源是parquet(可以通过spark.sql.sources.default修改)。
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
可以在读取数据源的时候指定一些往外的参数。数据源也可以使用全名称,比如org.apache.spark.sql.parquet,但是内置的数据源可以使用短名称,比如json, parquet, jdbc。任何类型的DataFrame都可以使用这种方式转换成其他类型:
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
使用read方法读取数据源得到DataFrame,还可以使用sql直接查询文件的方式:
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
保存模式:
保存方法会需要一个可选参数SaveMode,用于处理已经存在的数据。这些保存模式内部不会用到锁的概念,也不是一个原子操作。如果使用了Overwrite这种保存模式,那么写入数据前会清空之前的老数据。
Scala/Java
具体值
含义
SaveMode.ErrorIfExists (默认值)
“error” (默认值)
当保存DataFrame到数据源的时候,如果数据源文件已经存在,那么会抛出异常
SaveMode.Append
“append”
如果数据源文件已经存在,append到文件末尾
SaveMode.Overwrite
“overwrite”
如果数据源文件已经存在,清空数据
SaveMode.Ignore
“ignore”
如果数据源文件已经存在,不做任何处理。跟SQL中的 CREATE TABLE IF NOT EXISTS 类似
持久化表:
当使用HiveContext的时候,使用saveAsTable方法可以把DataFrame持久化成表。跟registerTempTable方法不一样,saveAsTable方法会把DataFrame持久化成表,并且创建一个数据的指针到HiveMetastore对象中。只要获得了同一个HiveMetastore对象的链接,当Spark程序重启的时候,saveAsTable持久化后的表依然会存在。一个DataFrame持久化成一个table也可以通过SQLContext的table方法,参数就是表的名字。
默认情况下,saveAsTable方法会创建一个”被管理的表”,被管理的表的意思是说表中数据的位置会被HiveMetastore所控制,如果表被删除了,HiveMetastore中的数据也相当于被删除了。
Parquet Files
parquet是一种基于列的存储格式,并且可以被很多框架所支持。Spark SQL支持parquet文件的读和写操作,并且会自动维护原始数据的schema,当写一个parquet文件的时候,所有的列都允许为空。
加载Parquet文件
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")
// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
Parquet文件的Partition
Parquet文件可以根据列自动进行分区,只需要调用DataFrameWriter的partitionBy方法即可,该方法需要的参数是需要进行分区的列。比如需要分区成这样:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
这个需要DataFrame就需要4列,分别是name,age,gender和country,write的时候如下:
dataFrame.write.partitionBy("gender", "country").parquet("path")
Schema Merging
像ProtocolBuffer,Avro,Thrift一样,Parquet也支持schema的扩展。
由于schema的自动扩展是一次昂贵的操作,所以默认情况下不是开启的,可以根据以下设置打开:
读parquet文件的时候设置参数mergeSchema为true或者设置全局的sql属性spark.sql.parquet.mergeSchema为true:
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
JSON数据源
本文之前的一个例子就是使用的JSON数据源,使用SQLContext.read.json()读取一个带有String类型的RDD或者一个json文件。
需要注意的是json文件不是一个典型的json格式的文件,每一行都是一个json对象。
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)
// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)
// Register this DataFrame as a table.
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
Hive表
需要使用HiveContext。
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
JDBC
直接使用load方法加载:
sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://localhost:3306/your_database?user=your_user&password=your_password", "dbtable" -> "your_table"))
DataFrame是一个以命名列方式组织的分布式数据集。在概念上,它跟关系型数据库中的一张表或者1个Python(或者R)中的data frame一样,但是比他们更优化。DataFrame可以根据结构化的数据文件、hive表、外部数据库或者已经存在的RDD构造。
DataFrame的创建
Spark DataFrame可以从一个已经存在的RDD、hive表或者数据源中创建。
以下一个例子就表示一个DataFrame基于一个json文件创建:
val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("examples/src/main/resources/people.json")
// Displays the content of the DataFrame to stdout
df.show()
DataFrame的操作
直接以1个例子来说明DataFrame的操作:
json文件内容:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
程序内容:
val conf = new SparkConf().setMaster("local").setAppName("DataFrameTest")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.json(this.getClass.getResource("/").toString + "people.json")
/** 展示DataFrame的内容
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
**/
df.show()
/** 以树的形式打印出DataFrame的schema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
**/
df.printSchema()
/** 打印出name列的数据
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
**/
df.select("name").show()
/** 打印出name列和age列+1的数据,DataFrame的apply方法返回Column
+-------+---------+
| name|(age + 1)|
+-------+---------+
|Michael| null|
| Andy| 31|
| Justin| 20|
+-------+---------+
**/
df.select(df("name"), df("age") + 1).show()
/** 添加过滤条件,过滤出age字段大于21的数据
+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+
**/
df.filter(df("age") > 21).show()
/** 以age字段分组进行统计
+----+-----+
| age|count|
+----+-----+
|null| 1|
| 19| 1|
| 30| 1|
+----+-----+
**/
df.groupBy(df("age")).count().show()
使用反射推断出Schema
Spark SQL的Scala接口支持将包括case class数据的RDD转换成DataFrame。
case class定义表的schema,case class的属性会被读取并且成为列的名字,这里case class也可以被当成别的case class的属性或者是复杂的类型,比如Sequence或Array。
RDD会被隐式转换成DataFrame并且被注册成一个表,这个表可以被用在查询语句中:
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// this is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Define the schema using a case class.
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface.
case class Person(name: String, age: Int)
// Create an RDD of Person objects and register it as a table.
val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index:
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
// or by field name:
teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
// Map("name" -> "Justin", "age" -> 19)
使用编程指定Schema
当case class不能提前确定(例如,记录的结构是经过编码的字符串,或者一个文本集合将会被解析,不同的字段投影给不同的用户),一个 DataFrame 可以通过三步来创建。
1.从原来的 RDD 创建一个行的 RDD
2.创建由一个 StructType 表示的模式与第一步创建的 RDD 的行结构相匹配
3.在行 RDD 上通过 applySchema 方法应用模式
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Create an RDD
val people = sc.textFile("examples/src/main/resources/people.txt")
// The schema is encoded in a string
val schemaString = "name age"
// Import Row.
import org.apache.spark.sql.Row;
// Import Spark SQL data types
import org.apache.spark.sql.types.{StructType,StructField,StringType};
// Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
// Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
// Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
// Register the DataFrames as a table.
peopleDataFrame.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val results = sqlContext.sql("SELECT name FROM people")
// The results of SQL queries are DataFrames and support all the normal RDD operations.
// The columns of a row in the result can be accessed by field index or by field name.
results.map(t => "Name: " + t(0)).collect().foreach(println)
数据源
Spark SQL默认使用的数据源是parquet(可以通过spark.sql.sources.default修改)。
val df = sqlContext.read.load("examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
可以在读取数据源的时候指定一些往外的参数。数据源也可以使用全名称,比如org.apache.spark.sql.parquet,但是内置的数据源可以使用短名称,比如json, parquet, jdbc。任何类型的DataFrame都可以使用这种方式转换成其他类型:
val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")
df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")
使用read方法读取数据源得到DataFrame,还可以使用sql直接查询文件的方式:
val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")
保存模式:
保存方法会需要一个可选参数SaveMode,用于处理已经存在的数据。这些保存模式内部不会用到锁的概念,也不是一个原子操作。如果使用了Overwrite这种保存模式,那么写入数据前会清空之前的老数据。
Scala/Java
具体值
含义
SaveMode.ErrorIfExists (默认值)
“error” (默认值)
当保存DataFrame到数据源的时候,如果数据源文件已经存在,那么会抛出异常
SaveMode.Append
“append”
如果数据源文件已经存在,append到文件末尾
SaveMode.Overwrite
“overwrite”
如果数据源文件已经存在,清空数据
SaveMode.Ignore
“ignore”
如果数据源文件已经存在,不做任何处理。跟SQL中的 CREATE TABLE IF NOT EXISTS 类似
持久化表:
当使用HiveContext的时候,使用saveAsTable方法可以把DataFrame持久化成表。跟registerTempTable方法不一样,saveAsTable方法会把DataFrame持久化成表,并且创建一个数据的指针到HiveMetastore对象中。只要获得了同一个HiveMetastore对象的链接,当Spark程序重启的时候,saveAsTable持久化后的表依然会存在。一个DataFrame持久化成一个table也可以通过SQLContext的table方法,参数就是表的名字。
默认情况下,saveAsTable方法会创建一个”被管理的表”,被管理的表的意思是说表中数据的位置会被HiveMetastore所控制,如果表被删除了,HiveMetastore中的数据也相当于被删除了。
Parquet Files
parquet是一种基于列的存储格式,并且可以被很多框架所支持。Spark SQL支持parquet文件的读和写操作,并且会自动维护原始数据的schema,当写一个parquet文件的时候,所有的列都允许为空。
加载Parquet文件
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.
// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.
people.write.parquet("people.parquet")
// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.
// The result of loading a Parquet file is also a DataFrame.
val parquetFile = sqlContext.read.parquet("people.parquet")
//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerTempTable("parquetFile")
val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
Parquet文件的Partition
Parquet文件可以根据列自动进行分区,只需要调用DataFrameWriter的partitionBy方法即可,该方法需要的参数是需要进行分区的列。比如需要分区成这样:
path
└── to
└── table
├── gender=male
│ ├── ...
│ │
│ ├── country=US
│ │ └── data.parquet
│ ├── country=CN
│ │ └── data.parquet
│ └── ...
└── gender=female
├── ...
│
├── country=US
│ └── data.parquet
├── country=CN
│ └── data.parquet
└── ...
这个需要DataFrame就需要4列,分别是name,age,gender和country,write的时候如下:
dataFrame.write.partitionBy("gender", "country").parquet("path")
Schema Merging
像ProtocolBuffer,Avro,Thrift一样,Parquet也支持schema的扩展。
由于schema的自动扩展是一次昂贵的操作,所以默认情况下不是开启的,可以根据以下设置打开:
读parquet文件的时候设置参数mergeSchema为true或者设置全局的sql属性spark.sql.parquet.mergeSchema为true:
// sqlContext from the previous example is used in this example.
// This is used to implicitly convert an RDD to a DataFrame.
import sqlContext.implicits._
// Create a simple DataFrame, stored into a partition directory
val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")
df1.write.parquet("data/test_table/key=1")
// Create another DataFrame in a new partition directory,
// adding a new column and dropping an existing column
val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")
df2.write.parquet("data/test_table/key=2")
// Read the partitioned table
val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")
df3.printSchema()
// The final schema consists of all 3 columns in the Parquet files together
// with the partitioning column appeared in the partition directory paths.
// root
// |-- single: int (nullable = true)
// |-- double: int (nullable = true)
// |-- triple: int (nullable = true)
// |-- key : int (nullable = true)
JSON数据源
本文之前的一个例子就是使用的JSON数据源,使用SQLContext.read.json()读取一个带有String类型的RDD或者一个json文件。
需要注意的是json文件不是一个典型的json格式的文件,每一行都是一个json对象。
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// A JSON dataset is pointed to by path.
// The path can be either a single text file or a directory storing text files.
val path = "examples/src/main/resources/people.json"
val people = sqlContext.read.json(path)
// The inferred schema can be visualized using the printSchema() method.
people.printSchema()
// root
// |-- age: integer (nullable = true)
// |-- name: string (nullable = true)
// Register this DataFrame as a table.
people.registerTempTable("people")
// SQL statements can be run by using the sql methods provided by sqlContext.
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
// Alternatively, a DataFrame can be created for a JSON dataset represented by
// an RDD[String] storing one JSON object per string.
val anotherPeopleRDD = sc.parallelize(
"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
val anotherPeople = sqlContext.read.json(anotherPeopleRDD)
Hive表
需要使用HiveContext。
// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
// Queries are expressed in HiveQL
sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)
JDBC
直接使用load方法加载:
sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://localhost:3306/your_database?user=your_user&password=your_password", "dbtable" -> "your_table"))
python中frame是什么意思_DataFrame是什么相关推荐
- python中frame中的元素怎么识别_python3.6+selenium实现操作Frame中的页面元素
有时网页中会嵌套一个或者多个Frame,此时我们直接去找嵌套在Frame里面的元素会抛出异常,所以在操作的时候我们需要将页面焦点切换到Frame里面,下面我们就以一个实例演示一下! 首先先创建三个ht ...
- python中frame是什么意思_python-Tkinter中的Frame类的目的是什么?
下面的代码将构建一个基本的按钮GUI from tkinter import * class App: def __init__(self, master): frame = Frame(master ...
- python中frame用法_Python实例之wxpython中Frame使用方法
本节为大家分享的例子是wxpython Frame的用法. 例子: 代码如下: #!/usr/bin/python # -*- coding: GBK -*- # simple.py import w ...
- python中frame是什么意思_“***Oldest frame”在ipdb中是什么意思?
u是遍历堆栈帧的PDB命令.您已经处于"最上面"的帧中.help u将告诉您有关它的更多信息:u(p) Move the current frame one level up in ...
- python中frame是什么意思_Python实例之wxpython中Frame使用方法
本节为大家分享的例子是wxpython Frame的用法. 例子: #!/usr/bin/python # -*- coding: GBK -*- # simple.py import wx app ...
- python中frame=none是什么意思啊_在python的init函数中,master和master=none的目的是什么?...
我想原因和我得到的自我相似.为什么师父在那里?另外,为什么有时它是master,有时master=none? 例如class Application(tk.Frame): def __init__(s ...
- python中frame用法_python操作dataFrame基本知识点
1)查看DataFrame数据及属性 df_obj = DataFrame() #创建DataFrame对象 df_obj.dtypes #查看各行的数据格式 df_obj['列名'].astype( ...
- python中frame用法_python:pandas中dataframe的基本用法汇总
一. DataFrame的创建 创建一个空的dataframe df=pd.DataFrame(columns={"a":"","b":&q ...
- 如何优雅的在python中暂停死循环?
死循环 有时候在工作中可能会遇到要一直执行某个功能的程序,这时候死循环就派上用途了,python中死循环的具体形式大致如下 while True:run_your_code() 结束死循环 通常我们结 ...
最新文章
- Velocity的中文问题
- centos恢复图形界面_centos图形界面的开启和关闭
- utf-8编码的字符串转成unicode(ucs-4)编码的字符串
- Kafka设计原理看了又忘,忘了又看?
- 4G内存服务器的MySQL配置优化
- 学校计算机教室安全预案,小学校园微机室安全事故应急疏散预案
- python list append 相关知识点
- 【天梯选拔月赛】参与者人数(并查集模版题!remember find_father写法!)
- 电子元器件选型——二极管
- 安卓 VNET 抓取 wskey 教程
- 天财商龙餐饮系统服务器连接不上,天财商龙餐饮系统操作.docx
- amazon aws 亚马逊云服务概述
- Bryntum Gantt 5.0 JS
- 虚拟拨号服务器名称,怎么设置虚拟拨号服务器
- ZeroLogon(CVE-2020-1472) 漏洞的攻击与防御策略(上)
- 福昕:十年暗战Adobe
- Gitlab与Jaeger集成,实现Tracing链路追踪
- 央视新闻30分:开心网流行背后存在的隐忧
- 多线程下载神器IDM,永久使用
- LaTeX \genfrac 分式命令
热门文章
- 收银机和服务器连接不上显示单机,这个收银机修理攻略我秒速收藏了
- android手机做音乐软件,安卓手机必备的五个黑科技APP,每个都强大到没有朋友!要低调使用...
- linux使用mac触控板,Y470 三点触摸板在linux和mac下的实现
- CentOS国内镜像源地址汇总持续更新
- java工作日报管理系统_GitHub - LovebuildJ/book-manager: JavaWeb图书管理系统,简单易用功能强大,可拓展性高,集成主流框架...
- 2021-03-30 一笔记 STM32基础知识
- [足式机器人]Part1 序言+简介Ch01——【Legged Robots that Balance 读书笔记】
- 为什么有的标签的实际性能和仿真不一样?---自调谐
- webdriver常用方法+鼠标键盘事件+浏览器高级操作
- 4000字带你深入掌握字符串