PySpark学习案例——北京空气质量分析
下方有数据可免费下载
目录
- 原始数据
- 环境
- 各个组件所遇到的问题
- 各种webUI端口
- Python代码
- azkaban调度
- kibana可视化
原始数据
下载数据: 请点击我.提取码:736f
或者登录:http://stateair.net/web/historical/1/1.html
原始数据(北京2015年的空气质量):
本次分析的目的只是简单的对比北京2015,2016,2017这3年的PM值,最后用柱状图表示出来。
环境
作业运行环境:系统:centos7JDK:1.8.0_91Python:3.6.8azkaban:3.81.0(需要编译好的安装包可以私信我)spark:2.4.3-bin-hadoop2.6kibana:7.7.1elasticsearch:7.7.1
开发环境:系统:Win10PyCharm:2020.1Python:3.8.1
开启的进程:45120 Jps29361 SecondaryNameNode29035 NameNode29179 DataNode30151 NodeManager 30027 ResourceManager28157 Elasticsearch43729 AzkabanExecutorServer43917 AzkabanWebServer
各个组件所遇到的问题
1.azkaban的编译最让人痛苦(搞了整整一天)
主要参考博客:https://blog.csdn.net/qq_42784606/article/details/106191408
并且每次启动AzkabanWebServer前都要命令行中输入:
curl http://hadoop000:12321/executor?action=activate(出现任何问题,直接看日志是最好的解决方法)
2.elasticsearch和kibana官网下载很慢,用华为镜像
elasticsearch:https://mirrors.huaweicloud.com/elasticsearch/
kibana:https://mirrors.huaweicloud.com/kibana/
3.elasticsearch因为内存问题无法启动
主要参考博客:https://blog.csdn.net/y506798278/article/details/94312445
elasticsearch启动时说jdk要11,这个可以忽略,用8依然可以
各种webUI端口
yarn:8088
kibana:5601
elasticsearch:9200
azkaban:8443(此端口为SSL端口,原端口为8081)
Python代码
weather3.pyfrom pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType//根据value值来判定grade
def get_grade(value):if 50 >= value >= 0:return "健康"elif value <= 100:return "中等"elif value <= 150:return "对敏感人群健康"elif value <= 200:return "不健康"elif value <= 300:return "非常健康"elif value <= 500:return "危险"elif value > 500:return "爆表"else:return None//udf函数
grade_function_udf = udf(get_grade, StringType())if __name__ == '__main__':spark = SparkSession.builder.appName("weather").getOrCreate()//数据保存到hdfs中//option("header", "true")表示原始数据的第一行作为列名,option("inferSchema", "true")会自动判定每一列的数据类型Data2017 = spark.read.format("CSV").option("header", "true").option("inferSchema", "true").load("/weather/Beijing_2017_HourlyPM25_created20170803.csv").select("Year", "Month", "Day", "Hour", "Value")Data2016 = spark.read.format("CSV").option("header", "true").option("inferSchema", "true").load("/weather/Beijing_2016_HourlyPM25_created20170201.csv").select("Year", "Month", "Day", "Hour", "Value")Data2015 = spark.read.format("CSV").option("header", "true").option("inferSchema", "true").load("/weather/Beijing_2015_HourlyPM25_created20160201.csv").select("Year", "Month", "Day", "Hour", "Value")//通过grade进行分组,并计算每组的数量group2017 = Data2017.withColumn("Grade", grade_function_udf(Data2017['Value'])).groupBy("Grade").count()group2016 = Data2016.withColumn("Grade", grade_function_udf(Data2016['Value'])).groupBy("Grade").count()group2015 = Data2015.withColumn("Grade", grade_function_udf(Data2015['Value'])).groupBy("Grade").count()//计算每组的数量所占总数的百分比result2017_2 = group2017.select("Grade", "count").withColumn("precent", group2017['count'] / Data2017.count() * 100)result2016_2 = group2016.select("Grade", "count").withColumn("precent", group2016['count'] / Data2016.count() * 100)result2015_2 = group2015.select("Grade", "count").withColumn("precent", group2015['count'] / Data2015.count() * 100)//数据的列名修改后,将数据写入到elasticsearch中result2017_2.selectExpr("Grade as grade", "count", "precent").write.format("org.elasticsearch.spark.sql").option("es.nodes", "hadoop000:9200").mode("overwrite").save("weather2017/pm")result2016_2.selectExpr("Grade as grade", "count", "precent").write.format("org.elasticsearch.spark.sql").option("es.nodes", "hadoop000:9200").mode("overwrite").save("weather2016/pm")result2015_2.selectExpr("Grade as grade", "count", "precent").write.format("org.elasticsearch.spark.sql").option("es.nodes", "hadoop000:9200").mode("overwrite").save("weather2015/pm")spark.stop()
azkaban调度
文件一:weather.job
type=command
command=sh ./wea.sh文件二:wea.sh(spark提交作业的脚本)
/home/hadoop/app/spark-2.4.3-bin-hadoop2.6/bin/spark-submit --master yarn --jars ./elasticsearch-spark-20_2.11-7.7.1.jar ./weather3.py文件三:elasticsearch-spark-20_2.11-7.7.1.jar(可在maven仓库下载)文件四:weather3.py(就是上方的python代码)//将4个文件打包
zip weather.zip weather3.py weather.job wea.sh elasticsearch-spark-20_2.11-7.7.1.jar
kibana可视化
运行的结果图如下所示:
PySpark学习案例——北京空气质量分析相关推荐
- python爬虫可视化题目北京空气质量监测数据获取与分析
任务1:数据采集 网页"http://pm25.in/beijing"中包含北京12个监测点的空气质量监测数据,请编写程序抓取网页(网页样本保存在源素材文件夹下src1目录中)上的 ...
- python空气质量分析报告_Python数据可视化:2018年北上广深空气质量分析
原标题:Python数据可视化:2018年北上广深空气质量分析 作者:法纳斯特,Python爱好者,专注爬虫,数据分析及可视化 就在这周偶然看到一个学弟吐槽天津的空气,不禁想起那段厚德载雾,自强不吸的 ...
- 二十八、接了一单Python北京空气质量数据处理
@Author :Runsen 这是接的一单Python数据分析的,文件我删了,不好意思 文章目录 北京空气质量数据处理 分析解决方法 代码 计算北京每年的PM2.5情况 计算北京每年1-12月的PM ...
- 五、空气质量分析与结果展示
五.空气质量分析与结果展示 5.1 实验背景 近年来随着城市化和工业化的发展,城市空气质量越来越差,从中央到地方各级政府对城市空气质量也越发重视.并对全国各个城市的空气质量进行了长期的采样.下面对全国 ...
- python空气质量分析与预测_干货!如何用 Python+KNN 算法实现城市空气质量分析与预测?...
原标题:干货!如何用 Python+KNN 算法实现城市空气质量分析与预测? 作者 | 李秋键 责编 | 伍杏玲 封图 | CSDN 付费下载自东方 IC 出品 | CSDN(ID:CSDNnews) ...
- Python数据分析系列(2)——美国纽约皇后区空气质量分析
感谢关注天善智能,走好数据之路↑↑↑ 欢迎关注天善智能,我们是专注于商业智能BI,人工智能AI,大数据分析与挖掘领域的垂直社区,学习,问答.求职一站式搞定! 天善智能社区地址:https://www. ...
- 基于Python的2013-2018全国城市空气质量分析
基于Python的2013-2018全国城市空气质量分析 项目摘要 本项目使用pandas/numpy工具包对557424条空气质量数据进行导入及清洗,并使用matplotlib/seaborn/py ...
- 使用 Cloud Insight SDK 监控北京空气质量!
现在越来越多的 App 都开始有广告了.特别是空气质量监测,和天气类的 App,广告还是蛮多的,眼花缭乱,真是够了. 最近刚好在用一款系统监控工具 Cloud Insight,它提供的 SDK 可以把 ...
- 城市空气质量分析与预测
城市空气质量分析与预测 一.AQI分析与预测 1.背景信息 2.任务说明 3.数据集描述 二.数据分析流程 基本流程 三.读取数据 1.导入相关的库 2.加载数据集 四.数据清洗 1.缺失值 1.1. ...
最新文章
- 目标立体检测 红外图像_一种有效的红外小目标检测方法
- android manifest 权限组,Android的单个或多个权限动态申请
- CVPR 2021 | 天津大学提出PISE:形状与纹理解耦的人体图像生成与编辑方法
- Eclipse中代码编辑背景颜色修改和XML字体修改
- 电气毕业生在国家电网都干啥工作?
- 错觉图片生成实验 - 闪现的绿点
- 做一个文字跟随鼠标java_JavaScript实现文字跟随鼠标特效
- 取消管理员取得所有权_win10如何获得管理员所有权?
- 经典Android开发教程!面试字节跳动两轮后被完虐,附面试题答案
- 流利阅读day1 Dysmorphia
- 如何利用微信活码快速裂变100个微信群?
- 关于独立DFS和域DFS板书
- 企业管理必须具备的8大要素!
- 基于EAST和Tesseract的文本检测与识别
- [HTML] HTML常见的元素
- PowerApps教程02-了解软件运行逻辑
- 安装sqlserver2016报错
- ImportError: _C.cpython-37m-x86_64-linux-gnu.so: undefined symbol:_ZN3c107Warning4warnENS_14SourceL
- Weblogic启动时报不能锁定AdminServer.lok文件的错误
- 更改xxxx 的权限: 不允许的操作