Spark大数据分析中涉及到RDD、Data Frame和SparkSQL的操作,本文简要介绍三种方式在数据统计中的算子使用。


1、在IPython Notebook运行Python Spark程序

IPython Notebook具备交互式界面,可以在Web界面输入Python命令后立刻看到结果,还可将数据分析的过程和运行后的命令与结果存储成笔记本,下次可以打开笔记本,重新执行这些命令,IPython Notebook笔记本可以包含文字、数学公式、程序代码、结果、图形等。

1.1 安装IPython

1)若无gcc,需先安装gcc

[root@tango-spark01 /]# gcc –v[root@tango-spark01 /]# yum install gcc

2)若无pip,安装pip

[root@tango-spark01 /]# pip –v[root@tango-spark01 /]# wget https://bootstrap.pypa.io/get-pip.py --no-check-certificate

3)安装Python开发包

[root@tango-spark01 /]# yum install python-devel

4)执行以下命令安装IPython和IPython Notebook:

[root@tango-spark01 /]# pip install ipython[root@tango-spark01 /]# pip install urllib3[root@tango-spark01 /]# pip install jupyter

5)输入ipython进入交互界面

6)输入jupyter notebook

1.2 IPython配置

1)创建远程连接密码

In [2]: from notebook.auth import passwd;In [3]: passwd()Enter password:Verify password:Out[3]: 'sha1:fea9052f4d92:114eb32c684004bf4a6196faac0b26a28156fe5d'

2)生成jupyter配置文件

[root@tango-spark01 /]# jupyter notebook --generate-configWriting default config to: /root/.jupyter/jupyter_notebook_config.py

3)打开配置文件,设置以下内容

## The IP address the notebook server will listen on.#c.NotebookApp.ip = 'localhost'c.NotebookApp.ip = '0.0.0.0'## The directory to use for notebooks and kernels.#c.NotebookApp.notebook_dir = u''c.NotebookApp.notebook_dir = u'/usr/local/spark/ipynotebook'## Hashed password to use for web authentication.#  To generate, type in a python/IPython shell:#    from notebook.auth import passwd; passwd()#  The string should be of the form type:salt:hashed-password.#c.NotebookApp.password = u''c.NotebookApp.password = u'sha1:fea9052f4d92:114eb32c684004bf4a6196faac0b26a28156fe5d'

4)打开jupyter notebook

[root@tango-spark01 /]# jupyter notebook --allow-root[I 14:20:05.618 NotebookApp] Serving notebooks from local directory: /usr/local/spark/ipynotebook[I 14:20:05.618 NotebookApp] The Jupyter Notebook is running at:[I 14:20:05.619 NotebookApp] http://(tango-spark01 or 127.0.0.1):8888/[I 14:20:05.619 NotebookApp] Use Control-C to stop this server and shut down all kernels (twice to skip confirmation).[W 14:20:05.619 NotebookApp] No web browser found: could not locate runnable browser.[I 14:21:00.346 NotebookApp] 302 GET / (192.168.112.1) 2.50ms[I 14:21:00.352 NotebookApp] 302 GET /tree? (192.168.112.1) 1.71ms[I 14:22:16.241 NotebookApp] 302 POST /login?next=%2Ftree%3F (192.168.112.1) 1.58ms

5)浏览器输入地址和端口

输入密码登录进去

1.3 在IPython Notebook中使用Spark

1)进入ipynotebook工作目录

[root@tango-spark01 /]# cd /usr/local/spark/ipynotebook[root@tango-spark01 ipynotebook]#

2)在IPython Notebook界面中运行pyspark

[root@tango-spark01 ipynotebook]# PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook"  pyspark

3)单击New选择Python 2,新建Notebook

4)新建Notebook后会出现新的页面,默认notebook名称为Untitled,单击后修改名称

5)在Notebook运行程序代码

6)保存Notebook下次可继续打开使用

2、Spark SQL、DataFrame、RDD数据统计与可视化

2.1 RDD、DataFrame和Spark SQL比较

RDD和Data Frame都是Spark平台下分布式弹性数据集,都有惰性机制,在进行创建、转换时不会立即执行,等到Action时才会遍历运算。

  1. RDD API进行数据统计,主要使用map配合reduceByKey,需要有Map/Reduce概念

  2. 与RDD不同的是Data Frame更像是传统的数据库表格,除了数据以外,还记录了数据的结构信息

  3. Spark SQL则是由DataFrame派生出来,必须先创建DataFrame,然后通过登录Spark SQL temp table就可以使用Spark SQL语句,直接使用SQL语句进行查询

下表列出在进行数据统计计算时候,RDD、Data Frame和Spark SQL使用的不同方法。

Items 功能描述
RDD API userRDD.map(lambda x:(x[2],1)).reduceByKey(lambda x,y: x+y).collect()
DataFrame user_df.select(“gender”).groupby(“gender”).count().show()
Spark SQL sqlContext.sql(“””SELECT gender,count(*) counts FROM user_table GROUP BY gender”””).show()
2.2 创建RDD、DataFrame和Spark SQL

在Hadoop YARN-client模式运行IPython Notebook

[root@tango-spark01 ipynotebook]# PYSPARK_DRIVER_PYTHON=jupyter PYSPARK_DRIVER_PYTHON_OPTS="notebook"  HADOOP_CONF_DIR=/usr/local/spark/hadoop-2.9.0/etc/hadoop  pyspark --master yarn --deploy-mode client
  • 创建RDD

1)配置文件读取路径

global Pathif sc.master[0:5] =="local":Path="file:/usr/local/spark/ipynotebook/"else:Path="hdfs://tango-spark01:9000/input/"
  1. 如果sc.master[0:5]是“local”,代表当前在本地运行,读取本地文件

  2. 如果sc.master[0:5]不是“local”,有可能是YARN client或Spark Stand Alone,必须读取HDFS文件

2)读取文本文件并且查看数据项数

RawUserRDD=sc.textFile(Path+"data/u.user")RawUserRDD.count()RawUserRDD.take(5)

3)获取每一个字段

userRDD= RawUserRDD.map(lambda line:line.split("|"))userRDD.take(5)

  • 创建Data Frame

1)创建sqlContext:在Spark早期版本中,spark context是Spark的入口、SQLContext是SQL入口、HiveContext是hive入口。在Spark 2.0中,使用Spark Session可同时具备spark context、sqlContext、HiveContext功能

sqlContext=SparkSession.builder.getOrCreate()

2)定义Schema:定义DataFrames的每个字段名与数据类型

from pyspark.sql import Rowuser_Rows = userRDD.map(lambda p:    Row(userid=int(p[0]),age=int(p[1]),gender=p[2],occupation=p[3],zipcode=p[4]))user_Rows.take(5)

3)创建DataFrames:使用sqlContext.createDataFrame()方法创建DataFrame

user_df=sqlContext.createDataFrame(user_Rows)user_df.printSchema()

4)查看DataFrames数据

user_df.show(5)

5)为DataFrame创建别名:可以使用.alias帮DataFrame创建别名

df=user_df.alias("df")df.show(5)

  • 使用SparkSQL

创建DataFrame后,使用该DataFrame登录Spark SQL temp table,登录后可以使用Spark SQL

1)登录临时表

user_df.registerTempTable("user_table")

2)使用Spark SQL查看项数

sqlContext.sql("SELECT count(*) counts FROM user_table").show()

3)多行输入Spark SQL语句,需要使用3个双引号引住SQL

sqlContext.sql("""    SELECT count(*) counts    FROM user_table    """).show()

4)使用SparkSQL查看数据,限定数据项

sqlContext.sql("SELECT * FROM user_table").show()sqlContext.sql("SELECT * FROM user_table").show(5)sqlContext.sql("SELECT * FROM user_table LIMIT 5").show()

2.3 数据统计操作
2.3.1 筛选数据
  • 使用RDD筛选数据

RDD中使用filter方法筛选每一项数据,配合lambda语句创建匿名函数传入参数

userRDD.filter(lambda r:r[3]=='technician' and r[2]=='M' and r[1]=='24').take(5)
  • 输入DataFrames筛选数据

user_df.filter((df.occupation=='technician')&(df.gender=='M')&(df.age==24)).show()
  • 使用Spark SQL筛选数据

sqlContext.sql("""SELECT *FROM user_tablewhere occupation='technician' and gender='M' and age=24""").show(5)
2.3.2 按字段给数据排序
  • RDD按字段给数据排序

userRDD.takeOrdered(5,key=lambda x:int(x[1]))——升序排序userRDD.takeOrdered(5,key=lambda x:-1*int(x[1]))——降序排序userRDD.takeOrdered(5,key=lambda x:(-int(x[1]),x[2]))——多个字段排序
  • 使用DataFrame排序

user_df.select("userid","occupation","gender","age").orderBy("age").show(5)——升序user_df.select("userid","occupation","gender","age").orderBy("age",ascending=0).show(5)df.orderBy(["age","gender"],ascending=[0,1]).show(5)——多个字段排序
  • 使用Spark SQL排序

sqlContext.sql("""SELECT userid,occupation,gender,age FROM user_tableorder by age desc,gender""").show(5)
2.3.3 显示不重复数据
  • RDD显示不重复数据

userRDD.map(lambda x:x[2]).distinct().collect()
  • DataFrame显示不重复数据

user_df.select("gender").distinct().show()
  • Spark SQL显示不重复数据

sqlContext.sql("select distinct gender FROM user_table").show()
2.3.4 分组统计数据

1)RDD分组统计数据

userRDD.map(lambda x:(x[2],1)).reduceByKey(lambda x,y:x+y).collect()

2)DataFrames分组统计数据

user_df.select("gender").groupby("gender").count().show()

3)Spark SQL分组统计数据

sqlContext.sql("""SELECT gender,count(*) counts FROM user_tablegroup by gender""").show()
2.3.5 Join联接数据
  • 准备zipcode数据

1)拷贝数据到HDFS目录下

[root@tango-spark01 data]# hadoop fs -copyFromLocal -f /usr/local/spark/ipynotebook/data/free-zipcode-database-Primary.csv  /input/data

2)读取并查看数据

Path="hdfs://tango-spark01:9000/input/"rawDataWithHeader=sc.textFile(Path+"data/free-zipcode-database-Primary.csv")rawDataWithHeader.take(5)

3)删除第一项数据

header = rawDataWithHeader.first()rawData = rawDataWithHeader.filter(lambda x:x !=header)

4)删除特殊符号

rawData.first()rData=rawData.map(lambda x:x.replace("\"",""))rData.first()

5)获取每一个字段

zipRDD=rData.map(lambda x:x.split(","))zipRDD.first()
  • 创建zipcode_tab

1)创建zipCode Row的schema

from pyspark.sql import Rowzipcode_data = zipRDD.map(lambda p:    Row(zipcode=int(p[0]),zipCodeType=p[1],city=p[2],state=p[3]))zipcode_data.take(5)

2)Row类型数据创建DataFrames

zipcode_df=sqlContext.createDataFrame(zipcode_data)zipcode_df.printSchema()

3)创建登录临时表

zipcode_df.registerTempTable("zipcode_table")zipcode_df.show(10)
  • Spark SQL联接zipcode_table

sqlContext.sql("""select u.*,z.city,z.state from user_table uleft join zipcode_table z ON u.zipcode=z.zipcodewhere z.state='NY'""").show(10)

2.3.6 使用Pandas DataFrame绘图
  • 按照不同的州统计并以直方图显示

1)转换为Pandas DataFrames

import pandas as pdGroupByState_pandas_df = GroupByState_df.toPandas().set_index('state')GroupByState_pandas_df

2)使用Pandas DataFrames绘出直方图

import matplotlib.pyplot as plt#matplotlib inlineax=GroupByState_pandas_df['count'].plot(kind='bar',title='State',figsize=(12,6),legend=True,fontsize=12)plt.show()

  • 按照不同的职业统计并以饼图显示

1)创建Occupation_df

Occupation_df=sqlContext.sql("""SELECT u.occupation,count(*) countsFROM user_table uGROUP BY occupation""")Occupation_df.show(5)

2)创建Occupation_pandas_df

Occupation_pandas_df=Occupation_df.toPandas().set_index('occupation')Occupation_pandas_df

3)用Pandas DataFrame是绘出饼图PieChart

ax=Occupation_pandas_df['counts'].plot(kind='pie',        title='occupation',figsize=(8,8),startangle=90,autopct='%1.1f%%')ax.legend(bbox_to_anchor=(1.05,1),loc=2,borderaxespad=0.)plt.show()
  1. kind='pie':绘制饼图

  2. startangle=90:设置图形旋转角度

  3. autopct='%1.1f%%':设置显示饼图%


参考资料

  1. Python+Spark 2.0+Hadoop机器学习与大数据实战,林大贵

dataframe记录数_大数据系列之Spark SQL、DataFrame和RDD数据统计与可视化相关推荐

  1. Spark SQL程序实现RDD转换DataFrame

    通过反射推断Schema 在Spark SQL中有两种方式可以在DataFrame和RDD进行转换 利用反射机制,推导包含某种类型的RDD,通过反射将其转换为指定类型的DataFrame,适用于提前知 ...

  2. 元数据与数据治理|Spark SQL结构化数据分析(第六篇)

    数据科学家们早已熟悉的R和Pandas等传统数据分析框架 虽然提供了直观易用的API,却局限于单机,无法覆盖分布式大数据场景.在Spark1.3.0以Spark SQL原有的SchemaRDD为蓝本, ...

  3. sql azure 语法_Azure SQL数据同步–在Azure SQL数据库之间复制数据和架构更改

    sql azure 语法 In this article, we will review how to configure the sync group to replicate data betwe ...

  4. Spark SQL DataFrame新增一列的四种方法

    Spark SQL DataFrame新增一列的四种方法 方法一:利用createDataFrame方法,新增列的过程包含在构建rdd和schema中 方法二:利用withColumn方法,新增列的过 ...

  5. sparkstreaming监听hdfs目录_大数据系列之Spark Streaming接入Kafka数据

    Spark Streaming官方提供Receiver-based和Direct Approach两种方法接入Kafka数据,本文简单介绍两种方式的pyspark实现. 1.Spark Streami ...

  6. 魔方大数据系列圆桌论坛(22)之 “数据智能助力产业升级“ 暨国内首个大数据微应用实验室落户上海超级计算中心签约仪式

    大数据技术已经深入应用到各个垂直产业,逐步成为助力产业转型升级的重要推动力.依托海量数据.秒级运算能力.如何围绕具体应用场景,开发更多行业落地应用,成为产业数字化升级的关键. 本着协同发展,共同促进大 ...

  7. php mysql query 行数_如何在PHP中获取MYSQL数据库返回的数据的行数?

    展开全部 1. mysql_num_rows 可得到e69da5e887aa3231313335323631343130323136353331333337383861查询记录数<?php $c ...

  8. java recordset 记录数_【求助】asp中怎样获得一个记录集(Recordset)的所有行数?

    你的位置: 问答吧 -> 网络编程 -> 问题详情 [求助]asp中怎样获得一个记录集(Recordset)的所有行数? 如标题所示 [ 本帖由 smilekiki 最后编辑于 2006- ...

  9. 大数据Hadoop之——Spark SQL+Spark Streaming

    文章目录 一.Spark SQL概述 二.SparkSQL版本 1)SparkSQL的演变之路 2)shark与SparkSQL对比 3)SparkSession 三.RDD.DataFrames和D ...

最新文章

  1. Android设置Settings:预读取设置的选项和更新设置结果【2】
  2. MySQL常用存储引擎之Federated
  3. java 网站开发实例_完整的javaweb项目
  4. 如何在WinForm中发送HTTP请求
  5. java组合与继承始示例_Java 9功能与示例
  6. UDS协议(车辆控制单元诊断系统开发架构及DID读取数据流程)
  7. 3C数码行业供应商管理方案——与供应商结为“成长共同体”-数商云
  8. 软件测试需求分析方法
  9. 4399怎么修复游戏服务器,[ 服务器 ]4399官方服务器(改ip了)
  10. 北京飞漫软件魏永明:浏览器技术与三网融合
  11. android系统时间获取方式
  12. 简单的集装箱号码识别
  13. PPT文件不能编辑,什么情况?
  14. azkaban任务一直处于preparing,解决办法
  15. Titan框架入门指南:Titan如何工作
  16. 石英晶体振荡器的基本原理
  17. 优达twitter 清理_优达资源 | 12个数据可视化工具,人人都能做出超炫图表
  18. 我来说说百度的问题吧。。别和谐就行。
  19. flask(一):简介
  20. 73 [backtrader期货策略]十大经典策略-菲阿里四价(逻辑优化版)

热门文章

  1. apachejmeter_java源码_自定义编写jmeter的Java测试代码
  2. 半素数c语言,非常简单的c题目 不懂 紧急求助
  3. Java SecurityManager getThreadGroup()方法与示例
  4. MySQL 事务的面试题总结
  5. 高质量SQL的30条建议!(后端必备)
  6. Windows任务管理 连接用户登录信息 通用类[C#版]
  7. Android软件开发之盘点所有Dialog对话框大合集(一)
  8. WinSCP中文版FTP工具 V5.19.5
  9. JSONObject与GSON的一些常用的方法的使用
  10. 剑指 Offer 32 . 从上到下打印二叉树