Spark SQL and DataFrame Guide(1.4.1)——之DataFrames
Spark SQL是处理结构化数据的Spark模块。它提供了DataFrames这样的编程抽象。同一时候也能够作为分布式SQL查询引擎使用。
DataFrames
DataFrame是一个带有列名的分布式数据集合。等同于一张关系型数据库中的表或者R/Python中的data frame,只是在底层做了非常多优化;我们能够使用结构化数据文件、Hive tables,外部数据库或者RDDS来构造DataFrames。
1. 開始入口:
入口须要从SQLContext类或者它的子类開始,当然须要使用SparkContext创建SQLContext;这里我们使用pyspark(已经自带了SQLContext即sc):
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
还能够使用HiveContext,它能够提供比SQLContext很多其它的功能。比如能够使用更完整的HiveQL解析器写查询,使用Hive UDFs。从Hive表中读取数据等。
使用HiveContext并不须要安装hive,Spark默认将HiveContext单独打包避免对hive过多的依赖
2.创建DataFrames
使用JSON文件创建:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)df = sqlContext.read.json("examples/src/main/resources/people.json")# Displays the content of the DataFrame to stdout
df.show()
注意:
这里你可能须要将文件存入HDFS(这里的文件在Spark安装文件夹中,1.4版本号)
hadoop fs -mkdir examples/src/main/resources/
hadoop fs -put /appcom/spark/examples/src/main/resources/* /user/hdpuser/examples/src/main/resources/
3.DataFrame操作
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)# Create the DataFrame
df = sqlContext.read.json("examples/src/main/resources/people.json")# Show the content of the DataFrame
df.show()
## age name
## null Michael
## 30 Andy
## 19 Justin# Print the schema in a tree format
df.printSchema()
## root
## |-- age: long (nullable = true)
## |-- name: string (nullable = true)# Select only the "name" column
df.select("name").show()
## name
## Michael
## Andy
## Justin# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()
## name (age + 1)
## Michael null
## Andy 31
## Justin 20# Select people older than 21
df.filter(df['age'] > 21).show()
## age name
## 30 Andy# Count people by age
df.groupBy("age").count().show()
## age count
## null 1
## 19 1
## 30 1
4.使用编程执行SQL查询
SQLContext能够使用编程执行SQL查询并返回DataFrame。
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.sql("SELECT * FROM table")
5.和RDD交互
将RDD转换成DataFrames有两种方法:
- 利用反射来判断包括特定类型对象的RDD的schema。这样的方法会简化代码而且在你已经知道schema的时候非常适用。
- 使用编程接口。构造一个schema并将其应用在已知的RDD上。
一、利用反射判断Schema
Spark SQL能够将含Row对象的RDD转换成DataFrame。并判断数据类型。通过将一个键值对(key/value)列表作为kwargs传给Row类来构造Rows。
key定义了表的列名,类型通过看第一列数据来判断。
(所以这里RDD的第一列数据不能有缺失)未来版本号中将会通过看很多其它数据来判断数据类型。像如今对JSON文件的处理一样。
# sc is an existing SparkContext.
from pyspark.sql import SQLContext, Row
sqlContext = SQLContext(sc)# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))# Infer the schema, and register the DataFrame as a table.
schemaPeople = sqlContext.createDataFrame(people)
schemaPeople.registerTempTable("people")# SQL can be run over DataFrames that have been registered as a table.
teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")# The results of SQL queries are RDDs and support all the normal RDD operations.
teenNames = teenagers.map(lambda p: "Name: " + p.name)
for teenName in teenNames.collect():print teenName
二、编程指定Schema
通过编程指定Schema须要3步:
- 从原来的RDD创建一个元祖或列表的RDD。
- 用StructType 创建一个和步骤一中创建的RDD中元祖或列表的结构相匹配的Schema。
- 通过SQLContext提供的createDataFrame方法将schema 应用到RDD上。
# Import SQLContext and data types
from pyspark.sql import SQLContext
from pyspark.sql.types import *# sc is an existing SparkContext.
sqlContext = SQLContext(sc)# Load a text file and convert each line to a tuple.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: (p[0], p[1].strip()))# The schema is encoded in a string.
schemaString = "name age"fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)# Apply the schema to the RDD.
schemaPeople = sqlContext.createDataFrame(people, schema)# Register the DataFrame as a table.
schemaPeople.registerTempTable("people")# SQL can be run over DataFrames that have been registered as a table.
results = sqlContext.sql("SELECT name FROM people")# The results of SQL queries are RDDs and support all the normal RDD operations.
names = results.map(lambda p: "Name: " + p.name)
for name in names.collect():print name
转载于:https://www.cnblogs.com/lytwajue/p/7290254.html
Spark SQL and DataFrame Guide(1.4.1)——之DataFrames相关推荐
- Spark SQL and DataFrame for Spark 1.3
Spark SQL and DataFrame Guide 启动Start Shell [jifeng@feng03 spark-1.3.0-bin-hadoop2.4]$ ./bin/spark-s ...
- Spark性能优化 -- Spark SQL、DataFrame、Dataset
本文将详细分析和总结Spark SQL及其DataFrame.Dataset的相关原理和优化过程. Spark SQL简介 Spark SQL是Spark中 具有 大规模关系查询的结构化数据处理 模块 ...
- dataframe记录数_大数据系列之Spark SQL、DataFrame和RDD数据统计与可视化
Spark大数据分析中涉及到RDD.Data Frame和SparkSQL的操作,本文简要介绍三种方式在数据统计中的算子使用. 1.在IPython Notebook运行Python Spark程序 ...
- Spark修炼之道(进阶篇)——Spark入门到精通:第八节 Spark SQL与DataFrame(一)
本节主要内宾 Spark SQL简介 DataFrame 1. Spark SQL简介 Spark SQL是Spark的五大核心模块之一,用于在Spark平台之上处理结构化数据,利用Spark SQL ...
- Spark15:Spark SQL:DataFrame常见算子操作、DataFrame的sql操作、RDD转换为DataFrame、load和save操作、SaveMode、内置函数
前面我们学习了Spark中的Spark core,离线数据计算,下面我们来学习一下Spark中的Spark SQL. 一.Spark SQL Spark SQL和我们之前讲Hive的时候说的hive ...
- Spark SQL之DataFrame概述
产生背景 DataFrame不是Spark SQL提出的,而是早期在R.Pandas语言就已经有了的. Spark RDD API vs MapReduce API Spark诞生之初,其中一个很重要 ...
- Spark SQL编程DataFrame 创建_大数据培训
DataFrame 创建 在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建:从一个存在的RDD ...
- Spark SQL中的DataFrame
在2014年7月1日的 Spark Summit 上,Databricks 宣布终止对 Shark 的开发,将重点放到 Spark SQL 上.在会议上,Databricks 表示,Shark 更多是 ...
- Spark SQL之RDD转DataFrame
准备文件 首先准备好测试文件info.txt,内容如下: 1,vincent,20 2,sarah,19 3,sofia,29 4,monica,26 将RDD转成DataFrame 方式一:反射 可 ...
最新文章
- 定时自动按键软件_[按键精灵手机版教程]QUI界面也可以如此炫酷
- Python pip安装命令
- Interview:算法岗位面试—11.05下午上海某银行信息(总行,四大行之一)技术岗笔试记录
- django中路由匹配规则
- react-template 包含客户端,服务端渲染完整示例
- 处理selinux方法
- 手把手实现YOLOv3(二)
- error=Error Domain=NSURLErrorDomain Code=-1003
- 关于IPv4设置一些常见问题的解答
- JS高级——JSON、数据存储学习笔记
- 毫秒级从百亿大表任意维度筛选数据,是怎么做到的…
- SpringBoot 工程目录 整合mybatis-neo4j(注解类型)
- Python 父类调用子类方法
- Java游戏编程不完全详解-2(1万2千字吐血推荐)
- 腾达ac5第三方固件_腾达AC9官方固件增加KoolProxy版
- (学习笔记)读取PDF/OFD文件
- NAT映射和代理服务器
- python网格交易法详解_干货 | 等分网格交易法详解
- 唯美的英文短文!!!
- 百度编辑器调用135编辑器方法
热门文章
- 对Linux网络配置管理的简单使用经验
- Web前端笔记(10)Grid布局
- c++数据结构与算法 图
- JAVA程序员一定知道的优秀第三方库(2016版)
- python创建自定义函数is_number()来判断一个字符是否是数字
- Zabbix(二)通过API在zabbix系统中查看、删除及创建监控主机
- EPERM: operation not permitted, Please run SwitchHosts
- Linux刚刚安装完anaconda,启动anaconda-navigator
- 经济学家Alex Kruger:DeFi是比特币的自然演变
- Hadoop初步简介