基于PySpark的航天日志分析(SQL分析)
文章目录
- 1、导入PySpark包
- 2、创建SparkSession实例对象
- 3、读取数据(Schema()信息)
- 读取数据方法1
- 读取数据方法2
- 4、查看DataFrame数据信息(显示完整【列名】不省略)
- 6、SparkSQL模块中,结构化数据分析:DSL和SQL(filter)
- 7、分组聚合(groupBy Rename)
- 8、可视化展示(SparkSQL中DataFrame转换为Pandas中DataFrame)
基于PySpark的航天日志 ——运行环境`jupyter notebook`
1、导入PySpark包
#!/usr/bin/env python
# -*- coding: utf-8 -*-import os
import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
-----------------------------------------------------------
#设置环境变量
os.environ['JAVA_HOME'] = 'C:/Java/jdk1.8.0_91'
#hadoop安装目录
os.environ['HADOOP_HOME'] = 'C:/Java/hadoop-2.6.0-cdh5.7.6'
#设置spark安装目录
os.environ['SPARK_HOME'] = 'C:/Java/spark-2.2.0-bin-2.6.0-cdh5.7.6'
2、创建SparkSession实例对象
#创建spark实例:local[2]本地模式两个线程。
spark = SparkSession.builder\.appName('Python_Spark_test')\.master('local[2]')\.getOrCreate()
------------------------------------------------------------------
# 读取数据
print type(spark)
print spark
输出:<class 'pyspark.sql.session.SparkSession'><pyspark.sql.session.SparkSession object at 0x000000000670BD68>
3、读取数据(Schema()信息)
读取数据方法1
#读取csv格式文件
df = spark.read.csv("flights.csv",header = True)
df.head(5)
输出:[Row(year=u'2014', month=u'1', day=u'1', dep_time=u'1', dep_delay=u'96', arr_time=u'235', arr_delay=u'70', carrier=u'AS', tailnum=u'N508AS', flight=u'145', origin=u'PDX', dest=u'ANC', air_time=u'194', distance=u'1542', hour=u'0', minute=u'1'),Row(year=u'2014', month=u'1', day=u'1', dep_time=u'4', dep_delay=u'-6', arr_time=u'738', arr_delay=u'-23', carrier=u'US', tailnum=u'N195UW', flight=u'1830', origin=u'SEA', dest=u'CLT', air_time=u'252', distance=u'2279', hour=u'0', minute=u'4'),Row(year=u'2014', month=u'1', day=u'1', dep_time=u'8', dep_delay=u'13', arr_time=u'548', arr_delay=u'-4', carrier=u'UA', tailnum=u'N37422', flight=u'1609', origin=u'PDX', dest=u'IAH', air_time=u'201', distance=u'1825', hour=u'0', minute=u'8'),Row(year=u'2014', month=u'1', day=u'1', dep_time=u'28', dep_delay=u'-2', arr_time=u'800', arr_delay=u'-23', carrier=u'US', tailnum=u'N547UW', flight=u'466', origin=u'PDX', dest=u'CLT', air_time=u'251', distance=u'2282', hour=u'0', minute=u'28'),Row(year=u'2014', month=u'1', day=u'1', dep_time=u'34', dep_delay=u'44', arr_time=u'325', arr_delay=u'43', carrier=u'AS', tailnum=u'N762AS', flight=u'121', origin=u'SEA', dest=u'ANC', air_time=u'201', distance=u'1448', hour=u'0', minute=u'34')]
# 查看DataFrame的Schema信息
df.printSchema()输出:root|-- year: string (nullable = true)|-- month: string (nullable = true)|-- day: string (nullable = true)|-- dep_time: string (nullable = true)|-- dep_delay: string (nullable = true)|-- arr_time: string (nullable = true)|-- arr_delay: string (nullable = true)|-- carrier: string (nullable = true)|-- tailnum: string (nullable = true)|-- flight: string (nullable = true)|-- origin: string (nullable = true)|-- dest: string (nullable = true)|-- air_time: string (nullable = true)|-- distance: string (nullable = true)|-- hour: string (nullable = true)|-- minute: string (nullable = true)
读取数据方法2
# 指定程序自定推断Schema数据
df2 = spark.read.csv('flights.csv', header=True, inferSchema=True)
df2.printSchema()输出:root|-- year: integer (nullable = true)|-- month: integer (nullable = true)|-- day: integer (nullable = true)|-- dep_time: string (nullable = true)|-- dep_delay: string (nullable = true)|-- arr_time: string (nullable = true)|-- arr_delay: string (nullable = true)|-- carrier: string (nullable = true)|-- tailnum: string (nullable = true)|-- flight: integer (nullable = true)|-- origin: string (nullable = true)|-- dest: string (nullable = true)|-- air_time: string (nullable = true)|-- distance: integer (nullable = true)|-- hour: string (nullable = true)|-- minute: string (nullable = true)
4、查看DataFrame数据信息(显示完整【列名】不省略)
df.printSchema() # 查看Schame信息
输出:root|-- year: string (nullable = true)|-- month: string (nullable = true)|-- day: string (nullable = true)|-- dep_time: string (nullable = true)|-- dep_delay: string (nullable = true)|-- arr_time: string (nullable = true)|-- arr_delay: string (nullable = true)|-- carrier: string (nullable = true)|-- tailnum: string (nullable = true)|-- flight: string (nullable = true)|-- origin: string (nullable = true)|-- dest: string (nullable = true)|-- air_time: string (nullable = true)|-- distance: string (nullable = true)|-- hour: string (nullable = true)|-- minute: string (nullable = true)
df.schema
输出:
StructType(List(StructField(year,StringType,true),StructField(month,StringType,true),
StructField(day,StringType,true),StructField(dep_time,StringType,true),
StructField(dep_delay,StringType,true),StructField(arr_time,StringType,true),
StructField(arr_delay,StringType,true),StructField(carrier,StringType,true),
StructField(tailnum,StringType,true),StructField(flight,StringType,true),
StructField(origin,StringType,true),StructField(dest,StringType,true),
StructField(air_time,StringType,true),StructField(distance,StringType,true),
StructField(hour,StringType,true),StructField(minute,StringType,true)))
df.show() # 显示前N条数据, 默认显示前20条数据+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+|2014| 1| 1| 1| 96| 235| 70| AS| N508AS| 145| PDX| ANC| 194| 1542| 0| 1||2014| 1| 1| 4| -6| 738| -23| US| N195UW| 1830| SEA| CLT| 252| 2279| 0| 4||2014| 1| 1| 8| 13| 548| -4| UA| N37422| 1609| PDX| IAH| 201| 1825| 0| 8||2014| 1| 1| 28| -2| 800| -23| US| N547UW| 466| PDX| CLT| 251| 2282| 0| 28||2014| 1| 1| 34| 44| 325| 43| AS| N762AS| 121| SEA| ANC| 201| 1448| 0| 34||2014| 1| 1| 37| 82| 747| 88| DL| N806DN| 1823| SEA| DTW| 224| 1927| 0| 37||2014| 1| 1| 346| 227| 936| 219| UA| N14219| 1481| SEA| ORD| 202| 1721| 3| 46||2014| 1| 1| 526| -4| 1148| 15| UA| N813UA| 229| PDX| IAH| 217| 1825| 5| 26||2014| 1| 1| 527| 7| 917| 24| UA| N75433| 1576| SEA| DEN| 136| 1024| 5| 27||2014| 1| 1| 536| 1| 1334| -6| UA| N574UA| 478| SEA| EWR| 268| 2402| 5| 36||2014| 1| 1| 541| 1| 911| 4| UA| N36476| 1569| PDX| DEN| 130| 991| 5| 41||2014| 1| 1| 549| 24| 907| 12| US| N548UW| 649| PDX| PHX| 122| 1009| 5| 49||2014| 1| 1| 550| 0| 837| -12| DL| N660DL| 1634| SEA| SLC| 82| 689| 5| 50||2014| 1| 1| 557| -3| 1134| -16| AA| N3JLAA| 1094| SEA| DFW| 184| 1660| 5| 57||2014| 1| 1| 557| -3| 825| -25| AS| N562AS| 81| SEA| ANC| 188| 1448| 5| 57||2014| 1| 1| 558| -2| 801| -2| AS| N402AS| 200| SEA| SJC| 100| 697| 5| 58||2014| 1| 1| 559| -1| 916| -9| F9| N210FR| 796| PDX| DEN| 125| 991| 5| 59||2014| 1| 1| 600| 0| 1151| -19| AA| N3JFAA| 2240| SEA| ORD| 206| 1721| 6| 0||2014| 1| 1| 600| -10| 842| -8| AS| N786AS| 426| SEA| LAX| 125| 954| 6| 0||2014| 1| 1| 602| -3| 943| 5| F9| N201FR| 144| SEA| DEN| 129| 1024| 6| 2|+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+only showing top 20 rows
# 显示前5条数据, 显示每列字段的所有内容,不进行省略
df.show(n=5, truncate=False)
输出:+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+|2014|1 |1 |1 |96 |235 |70 |AS |N508AS |145 |PDX |ANC |194 |1542 |0 |1 ||2014|1 |1 |4 |-6 |738 |-23 |US |N195UW |1830 |SEA |CLT |252 |2279 |0 |4 ||2014|1 |1 |8 |13 |548 |-4 |UA |N37422 |1609 |PDX |IAH |201 |1825 |0 |8 ||2014|1 |1 |28 |-2 |800 |-23 |US |N547UW |466 |PDX |CLT |251 |2282 |0 |28 ||2014|1 |1 |34 |44 |325 |43 |AS |N762AS |121 |SEA |ANC |201 |1448 |0 |34 |+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+only showing top 5 rows
df.columns # 获取所有列的名称
输出:['year','month','day','dep_time','dep_delay','arr_time','arr_delay','carrier','tailnum','flight','origin','dest','air_time','distance','hour','minute']
df.count() # 统计条目数
输出:52535
----------------------------------------------------------------------------
df.cache() # 缓存数据"""
对RDD数据缓存到内存中,DataFrame = RDD + schema(字段类型、字段名称)也可以缓存到内存中,
但是cache()是一个Lazy 懒惰性操作,必须RDD/DataFrame使用Action函数触发
"""
print df.count()
输出:52535
6、SparkSQL模块中,结构化数据分析:DSL和SQL(filter)
# 注册DataFrame为视图
df.createOrReplaceTempView("view_tmp_flights")# 编写并执行SQL语句,95%类似于MySQL中SQL
sql_str = "SELECT dest, arr_delay FROM view_tmp_flights"
dest_df = spark.sql(sql_str)# 查看分析结果
dest_df.show(n=10)
输出:+----+---------+|dest|arr_delay|+----+---------+| ANC| 70|| CLT| -23|| IAH| -4|| CLT| -23|| ANC| 43|| DTW| 88|| ORD| 219|| IAH| 15|| DEN| 24|| EWR| -6|+----+---------+only showing top 10 rows
# 上述语句可以使用DSL分析,类似Pandas分析
df.select(df['dest'], df['arr_delay']).show(n=10)
输出:+----+---------+|dest|arr_delay|+----+---------+| ANC| 70|| CLT| -23|| IAH| -4|| CLT| -23|| ANC| 43|| DTW| 88|| ORD| 219|| IAH| 15|| DEN| 24|| EWR| -6|+----+---------+only showing top 10 rows
# 过滤
help(df.filter)
>>> df.filter(df.age > 3).collect()
[Row(age=5, name=u'Bob')]
>>> df.where(df.age == 2).collect()
[Row(age=2, name=u'Alice')]
jfk_df = df.filter(df['dest'] == 'JFK')
jfk_df.show(n=3)print jfk_df.count()
输出:+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+|2014| 1| 1| 654| -6| 1455| -10| DL| N686DA| 418| SEA| JFK| 273| 2422| 6| 54||2014| 1| 1| 708| -7| 1510| -19| AA| N3DNAA| 236| SEA| JFK| 281| 2422| 7| 8||2014| 1| 1| 708| -2| 1453| -20| DL| N3772H| 2258| PDX| JFK| 267| 2454| 7| 8|+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+only showing top 3 rows932
# 上述可以使使用如下SQL表示
spark.sql("SELECT * FROM view_tmp_flights WHERE dest = 'JFK'").show(n=3)
输出:+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+|year|month|day|dep_time|dep_delay|arr_time|arr_delay|carrier|tailnum|flight|origin|dest|air_time|distance|hour|minute|+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+|2014| 1| 1| 654| -6| 1455| -10| DL| N686DA| 418| SEA| JFK| 273| 2422| 6| 54||2014| 1| 1| 708| -7| 1510| -19| AA| N3DNAA| 236| SEA| JFK| 281| 2422| 7| 8||2014| 1| 1| 708| -2| 1453| -20| DL| N3772H| 2258| PDX| JFK| 267| 2454| 7| 8|+----+-----+---+--------+---------+--------+---------+-------+-------+------+------+----+--------+--------+----+------+only showing top 3 rows
7、分组聚合(groupBy Rename)
"""使用DataFrame对象中groupBy函数实现分组统计,agg函数实现聚合-1. groupBy(self, *cols)接收一个列表作为参数,用于分组依据-2. agg(self, *exprs) 接收一个 字典作为参数,其中Key表示的是列名称,Value表示的是对列进行操作的函数名称,比如max,min,avg等等
"""daily_delay_df = df.groupBy(df.day).agg({'dep_delay': 'mean', 'arr_delay': 'mean'})
print daily_delay_df.count()
输出:31
daily_delay_df.show(n=10, truncate=False)
输出:+---+--------------------+------------------+|day|avg(arr_delay) |avg(dep_delay) |+---+--------------------+------------------+|7 |0.025215252152521524|5.243243243243243 ||15 |1.0819155639571518 |4.818353236957888 ||11 |5.749170537491706 |7.250661375661376 ||29 |6.407451923076923 |11.32174955062912 ||3 |5.629350893697084 |11.526241799437676||30 |9.433526011560694 |12.31663788140472 ||8 |0.52455919395466 |4.555904522613066 ||22 |-1.0817571690054912 |6.10231425091352 ||28 |-3.4050632911392404 |4.110270951480781 ||16 |0.31582125603864736 |4.2917420132610005|+---+--------------------+------------------+only showing top 10 rows
# 重命名DataFrame中各个字段的名称daily_delay_df.withColumnRenamed?"""
>>> df.withColumnRenamed('age', 'age2').collect()
[Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
"""
# 重命名
daily_delay_df2 = daily_delay_df.withColumnRenamed('avg(arr_delay)', 'avg_arr_delay')daily_delay_df2.printSchema()
输出:root|-- day: string (nullable = true)|-- avg_arr_delay: double (nullable = true)|-- avg(dep_delay): double (nullable = true)
daily_delay_df3 = daily_delay_df2.withColumnRenamed('avg(dep_delay)', 'avg_dep_delay')
daily_delay_df3.show(n=5)+---+--------------------+------------------+|day| avg_arr_delay| avg_dep_delay|+---+--------------------+------------------+| 7|0.025215252152521524| 5.243243243243243|| 15| 1.0819155639571518| 4.818353236957888|| 11| 5.749170537491706| 7.250661375661376|| 29| 6.407451923076923| 11.32174955062912|| 3| 5.629350893697084|11.526241799437676|+---+--------------------+------------------+only showing top 5 rows
8、可视化展示(SparkSQL中DataFrame转换为Pandas中DataFrame)
# 将SparkSQL中DataFrame转换为Pandas中DataFrame, 注意此时DataFrame中数据条目数很少
local_daily_delay_df = daily_delay_df3.toPandas()
local_daily_delay_df.head(10)
day | avg_arr_delay | avg_dep_delay | |
---|---|---|---|
0 | 7 | 0.025215 | 5.243243 |
1 | 15 | 1.081916 | 4.818353 |
2 | 11 | 5.749171 | 7.250661 |
3 | 29 | 6.407452 | 11.321750 |
4 | 3 | 5.629351 | 11.526242 |
5 | 30 | 9.433526 | 12.316638 |
6 | 8 | 0.524559 | 4.555905 |
7 | 22 | -1.081757 | 6.102314 |
8 | 28 | -3.405063 | 4.110271 |
9 | 16 | 0.315821 | 4.291742 |
# 使用matplotlib进行数据可视化,将图像内嵌 到Jupyter中如何操作
%matplotlib inline
基于PySpark的航天日志分析(SQL分析)相关推荐
- 基于实时ETL的日志存储与分析实践
日志大数据下的鱼和熊掌 我们正处于大数据.多样化数据(非结构化)的时代,实时的机器数据快速产生,做一家数据公司的核心之一是如何充分利用好大量日志数据. 由此背景,对日志的采集.存储.分析.管理也提出了 ...
- Mysql慢查询深入剖析_《深入精通Mysql(六)》系列之如何通过慢查询日志进行SQL分析和优化...
深入精通Mysql系列其他文章推荐: 从本系列第一篇<深入精通Mysql(一)>系列之Mysql整体架构和sql执行过程我们就可以知道一条sql语句的执行过程会经过优化器进行优化. 优化器 ...
- 基于Filebeat自动收集Kubernetes日志的分析系统
基于Filebeat自动收集Kubernetes日志的分析系统 摘要:Docker容器产生的日志分散在不同的相互隔离的容器中, 并且容器具有即用即销的特点, 传统的解决方式是将日志文件挂载到宿主机上, ...
- 2018年9月杭州云栖大会Workshop - 基于日志的安全分析实战
基于日志的安全分析实战 背景 越来越多的企业开始重视构建基于日志的安全分析与防护系统.我们会讲述如何使用日志服务从0到1收集海量日志,并从中实时筛选.甄别出可疑操作并快速分析,进一步构建安全大盘与可视 ...
- 基于python的Nginx日志管理分析系统
温馨提示:文末有 CSDN 平台官方提供的学长 Wechat / QQ 名片 :) 1. 项目简介 本项目利用 pandas + sklearn 对 Nginx 的日志数据进行统计分析,并利用 fla ...
- 腾讯技术课|基于Elastic Stack 搭建日志分析平台
为了让读者们可以更好的理解「如何基于Elastic Stack 搭建日志分析平台」,腾讯技术工程公众号特别邀请腾讯基础架构部的陈曦工程师通过语音录播分享的方式在「腾讯技术课」小程序里同步录制了语音+P ...
- oracle sql 查询优化器,基于ORACLE成本优化器的SQL查询优化分析与应用
第 39 卷 第 2 期2018 年 3 月 内蒙古农业大学学报( 自 然 科 学 版 ) Journal of Inner Mongolia Agricultural University ( Na ...
- 干货实战|基于Elastic Stack的日志分析系统
Elastic Stack简介 Elastic Stack是Elastic公司旗下的一系列软件总称,包括Elasticsearch.Logstash.Kibana和Beats.Elasticsearc ...
- php log 行号 debug_backtrace,PHP 基于debug_backtrace的流程日志与日志分析
#PHP 基于debug_backtrace的流程日志与日志分析# 我们都知道php测试性能有一个叫xhprof的(不知道也没事儿的确挺消耗性能的),执行后能看到全部函数的调用关系图,但是我压根不知道 ...
最新文章
- 2020考研公共课_基础精讲课_管理类联考综合能力 联考逻辑(读书笔记)
- Verilog中的加法器(半加器,全加器,串行、超前进位加法器)
- Kubernetes监控之Heapster源码分析
- Cloud Native Infrastructures Meetup 北京 | 活动安排
- bp神经网络和softmax原理_BP人工神经网络常用传递函数
- 打印机如何共享多台电脑_多台电脑打印机共享的方法
- [转载] python json 编码(dump/dumps:字典转化为json)、解码(load/loads:json转化为字典)
- 网络软工个人作业4——Alpha阶段个人总结
- 左手用R右手Python系列14——日期与时间处理
- ELK 把date替换为logstash的@timestamp
- 飞秒激光制备量子计算机,制备出世界最大规模光量子计算芯片
- Oracle 监控索引使用率脚本分享
- Javaweb生成族谱树形图
- 老树新芽,在ES6下使用Express
- igfxpers.exe
- 查看BMP格式图片的十六进制代码
- 国庆节放假调休安排来了!共7天,中疾控:不提倡聚集聚会
- Java分布式跟踪系统Zipkin(五):Brave源码分析-Brave和SpringMVC整合
- 北京航空航天大学软件学院2021年保研复试流程介绍+经验分享
- Chrome 您的连接不是私密连接 NET::ERR_CERT_INVALID
热门文章
- strcmp, strncmp和memcmp的区别
- A Survey of Zero-Shot Learning: Settings, Methods, and Applications [reading notes]
- java 嵌入ppt_Java 插入图片到PPT幻灯片
- 一个数组实现两个栈(共享栈)
- 福建农林大学计算机分数线,福建农林大学录取分数线2021是多少分(附历年录取分数线)...
- 计算机开机自启文件夹,开机启动文件夹在哪
- IPhone、Windows Mobil、Symbian、Android移动开发前景分析
- 零代码技能平台技术实践探索
- 程序员工作中用一机多屏或者大显示器的好处
- 修复iPhone手机白苹果