首发于公众号“大数据风控与机器学习”。

Spark SQL 是 Spark 处理结构化数据的一个模块, 与基础的 Spark RDD API 不同, Spark SQL 提供了查询结构化数据及计算结果等信息的接口.在内部, Spark SQL 使用这个额外的信息去执行额外的优化.有几种方式可以跟 Spark SQL 进行交互, 包括 SQL 和 Dataset API.当使用相同执行引擎进行计算时, 无论使用哪种 API / 语言都可以快速的计算。

SQL

Spark SQL 的功能之一是执行 SQL 查询,Spark SQL 也能够被用于从已存在的 Hive 环境中读取数据。当以另外的编程语言运行SQL 时, 查询结果将以 Dataset/DataFrame的形式返回,也可以使用 命令行或者通过 JDBC/ODBC与 SQL 接口交互.

DataFrames

从RDD里可以生成类似大家在pandas中的DataFrame,同时可以方便地在上面完成各种操作。

1.构建SparkSession

Spark SQL中所有功能的入口点是 SparkSession 类. 要创建一个 SparkSession, 仅使用 SparkSession.builder()就可以了:

from 

2.创建 DataFrames

在一个 SparkSession中, 应用程序可以从一个 已经存在的 RDD 或者 hive表, 或者从Spark数据源中创建一个DataFrames.

举个例子, 下面就是基于一个JSON文件创建一个DataFrame:

df = spark.read.json("data/people.json")
df.show()#必须使用show()不然不会打印

3.DataFrame 操作

DataFrames 提供了一个特定的语法用在 Scala, Java, Python and R中机构化数据的操作。

在Python中,可以通过(df.age) 或者(df['age'])来获取DataFrame的列. 虽然前者便于交互式操作, 但是还是建议用户使用后者, 这样不会破坏列名,也能引用DataFrame的类。

通过以下操作进行select

#查看字段属性
df.printSchema()

root
|-- age: long (nullable = true)
|-- name: string (nullable = true)

df.select("name").show()

df.select(["name",'age']).show()

df.select(df['name'], df['age'] + 1).show()

以下操作的filter做条件过滤

df.filter(df['age'] > 21).show()

df.groupBy("age").count().show()

还可以创建视图,然后使用SQL语句进行处理。得到的也是dataframe。

df.createOrReplaceTempView("people")
sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

spark DataFrame与RDD交互

Spark SQL 支持两种不同的方法用于转换已存在的 RDD 成为 Dataset

第一种方法是使用反射去推断一个包含指定的对象类型的 RDD 的 Schema.在你的 Spark 应用程序中当你已知 Schema 时这个基于方法的反射可以让你的代码更简洁.

第二种用于创建 Dataset 的方法是通过一个允许你构造一个 Schema 然后把它应用到一个已存在的 RDD 的编程接口.然而这种方法更繁琐, 当列和它们的类型知道运行时都是未知时它允许你去构造 Dataset.

当数据不规整,无法像csv或者excel等文件一样直接读取时,可以通过如下形式自定义dataframe样式。

from pyspark.sql import Rowsc = spark.sparkContext
lines = sc.textFile("data/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people)
schemaPeople.createOrReplaceTempView("people")
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
type(teenagers)

pyspark.sql.dataframe.DataFrame

type(teenagers.rdd)

pyspark.rdd.RDD

teenagers.rdd.first()

Row(name='Justin')

teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:print(name)

Name: Justi

以编程的方式指定Schema

也可以通过以下的方式去初始化一个 DataFrame。

  • RDD从原始的RDD创建一个RDD的toples或者一个列表;
  • Step 1 被创建后, 创建 Schema 表示一个 StructType 匹配 RDD 中的结构.
  • 通过 SparkSession 提供的 createDataFrame 方法应用 Schema 到 RDD .
from pyspark.sql.types import *sc = spark.sparkContext# Load a text file and convert each line to a Row.
lines = sc.textFile("data/people.txt")
parts = lines.map(lambda l: l.split(","))
# Each line is converted to a tuple.
people = parts.map(lambda p: (p[0], p[1].strip()))# The schema is encoded in a string.
schemaString = "name age"fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)# Apply the schema to the RDD.
schemaPeople = spark.createDataFrame(people, schema)schemaPeople.createOrReplaceTempView("people")
results = spark.sql("SELECT name FROM people")
results.show()

sparksql dataframe变成csv保存_Spark大数据分析(三):DataFrame和SQL相关推荐

  1. [Pyhon疫情大数据分析] 三.新闻信息抓取及词云可视化、文本聚类和LDA主题模型文本挖掘

    思来想去,虽然很忙,但还是挤时间针对这次肺炎疫情写个Python大数据分析系列博客,包括网络爬虫.可视化分析.GIS地图显示.情感分析.舆情分析.主题挖掘.威胁情报溯源.知识图谱.预测预警及AI和NL ...

  2. [Pyhon大数据分析] 五.人民网新闻话题抓取及Gephi构建主题知识图谱

    该系列文章是Python大数据分析系列博客,包括网络爬虫.可视化分析.GIS地图显示.情感分析.舆情分析.主题挖掘.威胁情报溯源.知识图谱.预测预警及AI和NLP应用等.希望该系列文章对您有所帮助. ...

  3. 使用python读取和保存为excel、csv、txt文件以及对DataFrame文件的基本操作

    文章目录 一.对excel文件的处理 1.读取excel文件并将其内容转化DataFrame和矩阵形式 2.将数据写入xlsx文件 3.将数据保存为xlsx文件 4.使用excel对数据进行处理的缺点 ...

  4. dataframe记录数_大数据系列之Spark SQL、DataFrame和RDD数据统计与可视化

    Spark大数据分析中涉及到RDD.Data Frame和SparkSQL的操作,本文简要介绍三种方式在数据统计中的算子使用. 1.在IPython Notebook运行Python Spark程序 ...

  5. 如何应对大数据分析工程师面试Spark考察,看这一篇就够了

    作者丨斌迪.HappyMint 来源丨大数据与人工智能(ID:ai-big-data) [导读]本篇文章为大家带来spark面试指南,文内会有两种题型,问答题和代码题,题目大部分来自于网络上,有小部分 ...

  6. 大数据分析工程师面试集锦5--Spark面试指南

    点击上方"大数据与人工智能","星标或置顶公众号" 第一时间获取好内容 作者丨斌迪.HappyMint 编辑丨Zandy 导语 本篇文章为大家带来spark面试 ...

  7. 大数据分析工程师入门8--Spark基础

    本文为<大数据分析师入门课程>系列的第8篇,主要讲解大数据分析师必须了解的Spark基础知识,前7篇分别是JAVA基础.SCALA基础.SQL基础.SQL进阶.HIVE基础.HIVE进阶. ...

  8. 每周一书《Spark与Hadoop大数据分析》分享!

    Spark与Hadoop大数据分析比较系统地讲解了利用Hadoop和Spark及其生态系统里的一系列工具进行大数据分析的方法,既涵盖ApacheSpark和Hadoop的基础知识,又深入探讨所有Spa ...

  9. python xlwings追加数据_大数据分析Python库xlwings提升Excel工作效率教程

    原标题:大数据分析Python库xlwings提升Excel工作效率教程 Excel在当今的企业中非常非常普遍.在AAA教育,我们通常建议出于很多原因使用代码,并且我们的许多数据科学课程旨在教授数据分 ...

最新文章

  1. 教育部:住宿费可以退!你的学校退了吗?
  2. C言语教程第五章:函数(1)
  3. assert宏的用法
  4. 2020-11-19(栈帧)
  5. java 学习知识汇总
  6. 【二分】防具布置/秦腾与教学评估(ybtoj 二分-1-2/jzoj 1253/luogu 4403)
  7. Python3bytes、hex、字符串之间相互转换
  8. C++开源矩阵计算工具——Eigen的简单用法(一)
  9. 周末ROS学习沙龙第二期——自定义消息通信、视频传输、人脸检测、小车动手编程
  10. centos mysql5.5 编译安装_centos6.4下编译安装MySQL-5.5.33
  11. 安卓手机反应慢又卡怎么办_安卓手机出现卡顿反应慢的具体处理方法
  12. Linux下套接字详解(八)----select模式下服务器(非阻塞,单进程+多进程+多线程)
  13. 金蝶K3Cloud中DBUtils的用法
  14. 力扣 2104. 子数组范围和
  15. 百度云不限速下载最新破解真好用
  16. mac无法关机_Mac无法关机时该怎么办
  17. python 多线程实现多任务,多进程实行多任务
  18. 合工大宣城计算机科学与技术,2017秋大学计算机(合肥工业大学 宣城校区)
  19. 【云原生 | Kubernetes 系列】--Gitops持续交付 Tekton Pipeline使用进阶(pvc和Results)
  20. Android post参数太长请求失败问题解决

热门文章

  1. 【Python】牛客的输入输出到底怎么整??
  2. Caffe学习笔记2
  3. [云炬创业学笔记]第一章创业是什么测试4
  4. [云炬创业基础笔记] 第四章测试16
  5. [云炬python3玩转机器学习] 5-7,8 多元线性回归正规解及其实现
  6. [python调试笔记] 编译运行h5py
  7. 云炬Android开发笔记 18解决沉浸栏的Bug和被手机状态栏遮挡的问题
  8. 云炬Android开发笔记 4单Activity界面架构设计与验证
  9. python 中map()和lamda的简单实用
  10. SQL SERVER 跨服务器查询