DataFrame是什么

DataFrame是一个以命名列方式组织的分布式数据集。在概念上,它跟关系型数据库中的一张表或者1个Python(或者R)中的data frame一样,但是比他们更优化。DataFrame可以根据结构化的数据文件、hive表、外部数据库或者已经存在的RDD构造。

DataFrame的创建

Spark DataFrame可以从一个已经存在的RDD、hive表或者数据源中创建。

以下一个例子就表示一个DataFrame基于一个json文件创建:

val sc: SparkContext // An existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout

df.show()

DataFrame的操作

直接以1个例子来说明DataFrame的操作:

json文件内容:

{"name":"Michael"}

{"name":"Andy", "age":30}

{"name":"Justin", "age":19}

程序内容:

val conf = new SparkConf().setMaster("local").setAppName("DataFrameTest")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

val df = sqlContext.read.json(this.getClass.getResource("/").toString + "people.json")

/** 展示DataFrame的内容

+----+-------+

| age| name|

+----+-------+

|null|Michael|

| 30| Andy|

| 19| Justin|

+----+-------+

**/

df.show()

/** 以树的形式打印出DataFrame的schema

root

|-- age: long (nullable = true)

|-- name: string (nullable = true)

**/

df.printSchema()

/** 打印出name列的数据

+-------+

| name|

+-------+

|Michael|

| Andy|

| Justin|

+-------+

**/

df.select("name").show()

/** 打印出name列和age列+1的数据,DataFrame的apply方法返回Column

+-------+---------+

| name|(age + 1)|

+-------+---------+

|Michael| null|

| Andy| 31|

| Justin| 20|

+-------+---------+

**/

df.select(df("name"), df("age") + 1).show()

/** 添加过滤条件,过滤出age字段大于21的数据

+---+----+

|age|name|

+---+----+

| 30|Andy|

+---+----+

**/

df.filter(df("age") > 21).show()

/** 以age字段分组进行统计

+----+-----+

| age|count|

+----+-----+

|null| 1|

| 19| 1|

| 30| 1|

+----+-----+

**/

df.groupBy(df("age")).count().show()

使用反射推断出Schema

Spark SQL的Scala接口支持将包括case class数据的RDD转换成DataFrame。

case class定义表的schema,case class的属性会被读取并且成为列的名字,这里case class也可以被当成别的case class的属性或者是复杂的类型,比如Sequence或Array。

RDD会被隐式转换成DataFrame并且被注册成一个表,这个表可以被用在查询语句中:

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._

// Define the schema using a case class.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,

// you can use custom classes that implement the Product interface.

case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by field index:

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:

teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]

teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)

// Map("name" -> "Justin", "age" -> 19)

使用编程指定Schema

当case class不能提前确定(例如,记录的结构是经过编码的字符串,或者一个文本集合将会被解析,不同的字段投影给不同的用户),一个 DataFrame 可以通过三步来创建。

1.从原来的 RDD 创建一个行的 RDD

2.创建由一个 StructType 表示的模式与第一步创建的 RDD 的行结构相匹配

3.在行 RDD 上通过 applySchema 方法应用模式

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD

val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string

val schemaString = "name age"

// Import Row.

import org.apache.spark.sql.Row;

// Import Spark SQL data types

import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema

val schema =

StructType(

schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.

val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.

val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.

peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.

val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by field index or by field name.

results.map(t => "Name: " + t(0)).collect().foreach(println)

数据源

Spark SQL默认使用的数据源是parquet(可以通过spark.sql.sources.default修改)。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

可以在读取数据源的时候指定一些往外的参数。数据源也可以使用全名称,比如org.apache.spark.sql.parquet,但是内置的数据源可以使用短名称,比如json, parquet, jdbc。任何类型的DataFrame都可以使用这种方式转换成其他类型:

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")

df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

使用read方法读取数据源得到DataFrame,还可以使用sql直接查询文件的方式:

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

保存模式:

保存方法会需要一个可选参数SaveMode,用于处理已经存在的数据。这些保存模式内部不会用到锁的概念,也不是一个原子操作。如果使用了Overwrite这种保存模式,那么写入数据前会清空之前的老数据。

Scala/Java

具体值

含义

SaveMode.ErrorIfExists (默认值)

“error” (默认值)

当保存DataFrame到数据源的时候,如果数据源文件已经存在,那么会抛出异常

SaveMode.Append

“append”

如果数据源文件已经存在,append到文件末尾

SaveMode.Overwrite

“overwrite”

如果数据源文件已经存在,清空数据

SaveMode.Ignore

“ignore”

如果数据源文件已经存在,不做任何处理。跟SQL中的 CREATE TABLE IF NOT EXISTS 类似

持久化表:

当使用HiveContext的时候,使用saveAsTable方法可以把DataFrame持久化成表。跟registerTempTable方法不一样,saveAsTable方法会把DataFrame持久化成表,并且创建一个数据的指针到HiveMetastore对象中。只要获得了同一个HiveMetastore对象的链接,当Spark程序重启的时候,saveAsTable持久化后的表依然会存在。一个DataFrame持久化成一个table也可以通过SQLContext的table方法,参数就是表的名字。

默认情况下,saveAsTable方法会创建一个”被管理的表”,被管理的表的意思是说表中数据的位置会被HiveMetastore所控制,如果表被删除了,HiveMetastore中的数据也相当于被删除了。

Parquet Files

parquet是一种基于列的存储格式,并且可以被很多框架所支持。Spark SQL支持parquet文件的读和写操作,并且会自动维护原始数据的schema,当写一个parquet文件的时候,所有的列都允许为空。

加载Parquet文件

// sqlContext from the previous example is used in this example.

// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.

people.write.parquet("people.parquet")

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.

// The result of loading a Parquet file is also a DataFrame.

val parquetFile = sqlContext.read.parquet("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.

parquetFile.registerTempTable("parquetFile")

val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

Parquet文件的Partition

Parquet文件可以根据列自动进行分区,只需要调用DataFrameWriter的partitionBy方法即可,该方法需要的参数是需要进行分区的列。比如需要分区成这样:

path

└── to

└── table

├── gender=male

│ ├── ...

│ │

│ ├── country=US

│ │ └── data.parquet

│ ├── country=CN

│ │ └── data.parquet

│ └── ...

└── gender=female

├── ...

├── country=US

│ └── data.parquet

├── country=CN

│ └── data.parquet

└── ...

这个需要DataFrame就需要4列,分别是name,age,gender和country,write的时候如下:

dataFrame.write.partitionBy("gender", "country").parquet("path")

Schema Merging

像ProtocolBuffer,Avro,Thrift一样,Parquet也支持schema的扩展。

由于schema的自动扩展是一次昂贵的操作,所以默认情况下不是开启的,可以根据以下设置打开:

读parquet文件的时候设置参数mergeSchema为true或者设置全局的sql属性spark.sql.parquet.mergeSchema为true:

// sqlContext from the previous example is used in this example.

// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory

val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")

df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,

// adding a new column and dropping an existing column

val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")

df2.write.parquet("data/test_table/key=2")

// Read the partitioned table

val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")

df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together

// with the partitioning column appeared in the partition directory paths.

// root

// |-- single: int (nullable = true)

// |-- double: int (nullable = true)

// |-- triple: int (nullable = true)

// |-- key : int (nullable = true)

JSON数据源

本文之前的一个例子就是使用的JSON数据源,使用SQLContext.read.json()读取一个带有String类型的RDD或者一个json文件。

需要注意的是json文件不是一个典型的json格式的文件,每一行都是一个json对象。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.

// The path can be either a single text file or a directory storing text files.

val path = "examples/src/main/resources/people.json"

val people = sqlContext.read.json(path)

// The inferred schema can be visualized using the printSchema() method.

people.printSchema()

// root

// |-- age: integer (nullable = true)

// |-- name: string (nullable = true)

// Register this DataFrame as a table.

people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by

// an RDD[String] storing one JSON object per string.

val anotherPeopleRDD = sc.parallelize(

"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)

val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

Hive表

需要使用HiveContext。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL

sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

JDBC

直接使用load方法加载:

sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://localhost:3306/your_database?user=your_user&password=your_password", "dbtable" -> "your_table"))

DataFrame是一个以命名列方式组织的分布式数据集。在概念上,它跟关系型数据库中的一张表或者1个Python(或者R)中的data frame一样,但是比他们更优化。DataFrame可以根据结构化的数据文件、hive表、外部数据库或者已经存在的RDD构造。

DataFrame的创建

Spark DataFrame可以从一个已经存在的RDD、hive表或者数据源中创建。

以下一个例子就表示一个DataFrame基于一个json文件创建:

val sc: SparkContext // An existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout

df.show()

DataFrame的操作

直接以1个例子来说明DataFrame的操作:

json文件内容:

{"name":"Michael"}

{"name":"Andy", "age":30}

{"name":"Justin", "age":19}

程序内容:

val conf = new SparkConf().setMaster("local").setAppName("DataFrameTest")

val sc = new SparkContext(conf)

val sqlContext = new SQLContext(sc)

val df = sqlContext.read.json(this.getClass.getResource("/").toString + "people.json")

/** 展示DataFrame的内容

+----+-------+

| age| name|

+----+-------+

|null|Michael|

| 30| Andy|

| 19| Justin|

+----+-------+

**/

df.show()

/** 以树的形式打印出DataFrame的schema

root

|-- age: long (nullable = true)

|-- name: string (nullable = true)

**/

df.printSchema()

/** 打印出name列的数据

+-------+

| name|

+-------+

|Michael|

| Andy|

| Justin|

+-------+

**/

df.select("name").show()

/** 打印出name列和age列+1的数据,DataFrame的apply方法返回Column

+-------+---------+

| name|(age + 1)|

+-------+---------+

|Michael| null|

| Andy| 31|

| Justin| 20|

+-------+---------+

**/

df.select(df("name"), df("age") + 1).show()

/** 添加过滤条件,过滤出age字段大于21的数据

+---+----+

|age|name|

+---+----+

| 30|Andy|

+---+----+

**/

df.filter(df("age") > 21).show()

/** 以age字段分组进行统计

+----+-----+

| age|count|

+----+-----+

|null| 1|

| 19| 1|

| 30| 1|

+----+-----+

**/

df.groupBy(df("age")).count().show()

使用反射推断出Schema

Spark SQL的Scala接口支持将包括case class数据的RDD转换成DataFrame。

case class定义表的schema,case class的属性会被读取并且成为列的名字,这里case class也可以被当成别的case class的属性或者是复杂的类型,比如Sequence或Array。

RDD会被隐式转换成DataFrame并且被注册成一个表,这个表可以被用在查询语句中:

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._

// Define the schema using a case class.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,

// you can use custom classes that implement the Product interface.

case class Person(name: String, age: Int)

// Create an RDD of Person objects and register it as a table.

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by field index:

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// or by field name:

teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]

teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)

// Map("name" -> "Justin", "age" -> 19)

使用编程指定Schema

当case class不能提前确定(例如,记录的结构是经过编码的字符串,或者一个文本集合将会被解析,不同的字段投影给不同的用户),一个 DataFrame 可以通过三步来创建。

1.从原来的 RDD 创建一个行的 RDD

2.创建由一个 StructType 表示的模式与第一步创建的 RDD 的行结构相匹配

3.在行 RDD 上通过 applySchema 方法应用模式

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// Create an RDD

val people = sc.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string

val schemaString = "name age"

// Import Row.

import org.apache.spark.sql.Row;

// Import Spark SQL data types

import org.apache.spark.sql.types.{StructType,StructField,StringType};

// Generate the schema based on the string of schema

val schema =

StructType(

schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

// Convert records of the RDD (people) to Rows.

val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))

// Apply the schema to the RDD.

val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

// Register the DataFrames as a table.

peopleDataFrame.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.

val results = sqlContext.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by field index or by field name.

results.map(t => "Name: " + t(0)).collect().foreach(println)

数据源

Spark SQL默认使用的数据源是parquet(可以通过spark.sql.sources.default修改)。

val df = sqlContext.read.load("examples/src/main/resources/users.parquet")

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet")

可以在读取数据源的时候指定一些往外的参数。数据源也可以使用全名称,比如org.apache.spark.sql.parquet,但是内置的数据源可以使用短名称,比如json, parquet, jdbc。任何类型的DataFrame都可以使用这种方式转换成其他类型:

val df = sqlContext.read.format("json").load("examples/src/main/resources/people.json")

df.select("name", "age").write.format("parquet").save("namesAndAges.parquet")

使用read方法读取数据源得到DataFrame,还可以使用sql直接查询文件的方式:

val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

保存模式:

保存方法会需要一个可选参数SaveMode,用于处理已经存在的数据。这些保存模式内部不会用到锁的概念,也不是一个原子操作。如果使用了Overwrite这种保存模式,那么写入数据前会清空之前的老数据。

Scala/Java

具体值

含义

SaveMode.ErrorIfExists (默认值)

“error” (默认值)

当保存DataFrame到数据源的时候,如果数据源文件已经存在,那么会抛出异常

SaveMode.Append

“append”

如果数据源文件已经存在,append到文件末尾

SaveMode.Overwrite

“overwrite”

如果数据源文件已经存在,清空数据

SaveMode.Ignore

“ignore”

如果数据源文件已经存在,不做任何处理。跟SQL中的 CREATE TABLE IF NOT EXISTS 类似

持久化表:

当使用HiveContext的时候,使用saveAsTable方法可以把DataFrame持久化成表。跟registerTempTable方法不一样,saveAsTable方法会把DataFrame持久化成表,并且创建一个数据的指针到HiveMetastore对象中。只要获得了同一个HiveMetastore对象的链接,当Spark程序重启的时候,saveAsTable持久化后的表依然会存在。一个DataFrame持久化成一个table也可以通过SQLContext的table方法,参数就是表的名字。

默认情况下,saveAsTable方法会创建一个”被管理的表”,被管理的表的意思是说表中数据的位置会被HiveMetastore所控制,如果表被删除了,HiveMetastore中的数据也相当于被删除了。

Parquet Files

parquet是一种基于列的存储格式,并且可以被很多框架所支持。Spark SQL支持parquet文件的读和写操作,并且会自动维护原始数据的schema,当写一个parquet文件的时候,所有的列都允许为空。

加载Parquet文件

// sqlContext from the previous example is used in this example.

// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._

val people: RDD[Person] = ... // An RDD of case class objects, from the previous example.

// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.

people.write.parquet("people.parquet")

// Read in the parquet file created above. Parquet files are self-describing so the schema is preserved.

// The result of loading a Parquet file is also a DataFrame.

val parquetFile = sqlContext.read.parquet("people.parquet")

//Parquet files can also be registered as tables and then used in SQL statements.

parquetFile.registerTempTable("parquetFile")

val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

Parquet文件的Partition

Parquet文件可以根据列自动进行分区,只需要调用DataFrameWriter的partitionBy方法即可,该方法需要的参数是需要进行分区的列。比如需要分区成这样:

path

└── to

└── table

├── gender=male

│ ├── ...

│ │

│ ├── country=US

│ │ └── data.parquet

│ ├── country=CN

│ │ └── data.parquet

│ └── ...

└── gender=female

├── ...

├── country=US

│ └── data.parquet

├── country=CN

│ └── data.parquet

└── ...

这个需要DataFrame就需要4列,分别是name,age,gender和country,write的时候如下:

dataFrame.write.partitionBy("gender", "country").parquet("path")

Schema Merging

像ProtocolBuffer,Avro,Thrift一样,Parquet也支持schema的扩展。

由于schema的自动扩展是一次昂贵的操作,所以默认情况下不是开启的,可以根据以下设置打开:

读parquet文件的时候设置参数mergeSchema为true或者设置全局的sql属性spark.sql.parquet.mergeSchema为true:

// sqlContext from the previous example is used in this example.

// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._

// Create a simple DataFrame, stored into a partition directory

val df1 = sc.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")

df1.write.parquet("data/test_table/key=1")

// Create another DataFrame in a new partition directory,

// adding a new column and dropping an existing column

val df2 = sc.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")

df2.write.parquet("data/test_table/key=2")

// Read the partitioned table

val df3 = sqlContext.read.option("mergeSchema", "true").parquet("data/test_table")

df3.printSchema()

// The final schema consists of all 3 columns in the Parquet files together

// with the partitioning column appeared in the partition directory paths.

// root

// |-- single: int (nullable = true)

// |-- double: int (nullable = true)

// |-- triple: int (nullable = true)

// |-- key : int (nullable = true)

JSON数据源

本文之前的一个例子就是使用的JSON数据源,使用SQLContext.read.json()读取一个带有String类型的RDD或者一个json文件。

需要注意的是json文件不是一个典型的json格式的文件,每一行都是一个json对象。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// A JSON dataset is pointed to by path.

// The path can be either a single text file or a directory storing text files.

val path = "examples/src/main/resources/people.json"

val people = sqlContext.read.json(path)

// The inferred schema can be visualized using the printSchema() method.

people.printSchema()

// root

// |-- age: integer (nullable = true)

// |-- name: string (nullable = true)

// Register this DataFrame as a table.

people.registerTempTable("people")

// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// Alternatively, a DataFrame can be created for a JSON dataset represented by

// an RDD[String] storing one JSON object per string.

val anotherPeopleRDD = sc.parallelize(

"""{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)

val anotherPeople = sqlContext.read.json(anotherPeopleRDD)

Hive表

需要使用HiveContext。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

// Queries are expressed in HiveQL

sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

JDBC

直接使用load方法加载:

sqlContext.load("jdbc", Map("url" -> "jdbc:mysql://localhost:3306/your_database?user=your_user&password=your_password", "dbtable" -> "your_table"))

python中frame是什么意思_DataFrame是什么相关推荐

  1. python中frame中的元素怎么识别_python3.6+selenium实现操作Frame中的页面元素

    有时网页中会嵌套一个或者多个Frame,此时我们直接去找嵌套在Frame里面的元素会抛出异常,所以在操作的时候我们需要将页面焦点切换到Frame里面,下面我们就以一个实例演示一下! 首先先创建三个ht ...

  2. python中frame是什么意思_python-Tkinter中的Frame类的目的是什么?

    下面的代码将构建一个基本的按钮GUI from tkinter import * class App: def __init__(self, master): frame = Frame(master ...

  3. python中frame用法_Python实例之wxpython中Frame使用方法

    本节为大家分享的例子是wxpython Frame的用法. 例子: 代码如下: #!/usr/bin/python # -*- coding: GBK -*- # simple.py import w ...

  4. python中frame是什么意思_“***Oldest frame”在ipdb中是什么意思?

    u是遍历堆栈帧的PDB命令.您已经处于"最上面"的帧中.help u将告诉您有关它的更多信息:u(p) Move the current frame one level up in ...

  5. python中frame是什么意思_Python实例之wxpython中Frame使用方法

    本节为大家分享的例子是wxpython Frame的用法. 例子: #!/usr/bin/python # -*- coding: GBK -*- # simple.py import wx app ...

  6. python中frame=none是什么意思啊_在python的init函数中,master和master=none的目的是什么?...

    我想原因和我得到的自我相似.为什么师父在那里?另外,为什么有时它是master,有时master=none? 例如class Application(tk.Frame): def __init__(s ...

  7. python中frame用法_python操作dataFrame基本知识点

    1)查看DataFrame数据及属性 df_obj = DataFrame() #创建DataFrame对象 df_obj.dtypes #查看各行的数据格式 df_obj['列名'].astype( ...

  8. python中frame用法_python:pandas中dataframe的基本用法汇总

    一. DataFrame的创建 创建一个空的dataframe df=pd.DataFrame(columns={"a":"","b":&q ...

  9. 如何优雅的在python中暂停死循环?

    死循环 有时候在工作中可能会遇到要一直执行某个功能的程序,这时候死循环就派上用途了,python中死循环的具体形式大致如下 while True:run_your_code() 结束死循环 通常我们结 ...

最新文章

  1. Velocity的中文问题
  2. centos恢复图形界面_centos图形界面的开启和关闭
  3. utf-8编码的字符串转成unicode(ucs-4)编码的字符串
  4. Kafka设计原理看了又忘,忘了又看?
  5. 4G内存服务器的MySQL配置优化
  6. 学校计算机教室安全预案,小学校园微机室安全事故应急疏散预案
  7. python list append 相关知识点
  8. 【天梯选拔月赛】参与者人数(并查集模版题!remember find_father写法!)
  9. 电子元器件选型——二极管
  10. 安卓 VNET 抓取 wskey 教程
  11. 天财商龙餐饮系统服务器连接不上,天财商龙餐饮系统操作.docx
  12. amazon aws 亚马逊云服务概述
  13. Bryntum Gantt 5.0 JS
  14. 虚拟拨号服务器名称,怎么设置虚拟拨号服务器
  15. ZeroLogon(CVE-2020-1472) 漏洞的攻击与防御策略(上)
  16. 福昕:十年暗战Adobe
  17. Gitlab与Jaeger集成,实现Tracing链路追踪
  18. 央视新闻30分:开心网流行背后存在的隐忧
  19. 多线程下载神器IDM,永久使用
  20. LaTeX \genfrac 分式命令

热门文章

  1. 收银机和服务器连接不上显示单机,这个收银机修理攻略我秒速收藏了
  2. android手机做音乐软件,安卓手机必备的五个黑科技APP,每个都强大到没有朋友!要低调使用...
  3. linux使用mac触控板,Y470 三点触摸板在linux和mac下的实现
  4. CentOS国内镜像源地址汇总持续更新
  5. java工作日报管理系统_GitHub - LovebuildJ/book-manager: JavaWeb图书管理系统,简单易用功能强大,可拓展性高,集成主流框架...
  6. 2021-03-30 一笔记 STM32基础知识
  7. [足式机器人]Part1 序言+简介Ch01——【Legged Robots that Balance 读书笔记】
  8. 为什么有的标签的实际性能和仿真不一样?---自调谐
  9. webdriver常用方法+鼠标键盘事件+浏览器高级操作
  10. 4000字带你深入掌握字符串