一、数据集分析

数据文件按照气象站和日期进行组织,每个气象站都是一个总目录,而且每个气象站下面从 1980 年到 2010 年,每一年又都作为一个子目录。 因为某国有成千上万个气象站,所以整个数据集由大量的小文件组成。通常情况下,处理少量的大型文件更容易、更有效,因此,这些数据需要经过预处理,将每个气象站的数据文件拼接成一个单独的文件。 预处理过的数据文件示例如下所示:

30yr_03103.dat
30yr_03812.dat
30yr_03813.dat
30yr_03816.dat
30yr_03820.dat
30yr_03822.dat
30yr_03856.dat
30yr_03860.dat
30yr_03870.dat
30yr_03872.dat

其中03103、03812、03813、03816、03820、03822、03856、03860、03870、03872代表的是气象站编号。

这些数据按行并以 ASCII 格式存储,其中每一行是一条记录。 下面我们展示一行采样数据,其中重要的字段被突出显示。该行数据被分割成很多行以突出每个字段,但在实际文件中,这些字段被整合成一行且没有任何分隔符。

数据              含义              所占位数
1998            #year               4
03              #month              3
09              #day                3
17              #hour               3
11              #temperature        6
-100            #dew                6
10237           #pressure           6
60              #wind_direction     6
72              #wind_speed         6
0               #sky_condition      6
0               #rain_1h            6
-9999           #rain_6h            6

真实数据如下所示:

2010 09 08 21   200    83 10091   230    72 -9999     0 -9999

数据集下载:请点击这里.

二、将数据导入到HDFS上

通过HDFS Java API封装了一个本地上传文件到HDFS的工具类HDFSUploadFileUtil,目前支持将本地目录下的所有文件上传到HDFS的某个目录,也可以通过正则表达式去过滤本地目录的文件,将不需要的文件过滤掉。实现代码如下:

private static FileSystem fileSystem = null;private static FileSystem localFileSystem = null;public static void uploadFile(String srcDirectory, String tagDirectory, String hdfsUrl, String regex) throws URISyntaxException, IOException {Configuration conf = new Configuration();URI uri = new URI(hdfsUrl);if (fileSystem == null) {fileSystem = FileSystem.get(uri, conf);}if (localFileSystem == null) {localFileSystem = FileSystem.getLocal(conf);}Path path = new Path(hdfsUrl + tagDirectory);if (fileSystem.exists(path)) {if (fileSystem.listStatus(path).length > 0) {fileSystem.close();throw new IllegalArgumentException("目标目录存在其他文件!");}} else {fileSystem.mkdirs(path);}FileStatus[] listStatus = null;if (regex.length() > 0) {listStatus = localFileSystem.globStatus(new Path(srcDirectory), new RegexAcceptPathFilter(regex));} else {listStatus = localFileSystem.globStatus(new Path(srcDirectory));}Path[] sources = FileUtil.stat2Paths(listStatus);for (Path p : sources) {fileSystem.copyFromLocalFile(p, path);System.out.println("文件[" + p.toString() + "]上传成功!");}fileSystem.close();
}

三、spark分析数据

我们要计算每个气象站的平均气温,那首先要得到两个东西:气象站编号气温值大小

气温值大小:通过观察数据,我们知道截取每行数据的14~19位就可以获取到气温值

气象站编号:我们发现,在数据中是获取不到我们所需要的气象站编号,只能通过输入文件的文件名去截取,所以这里的解决方案是通过hadoop的InputSplit去获取文件名,具体代码如下:

val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]val lines = hadoopRDD.mapPartitionsWithInputSplit((inputSplit : InputSplit, iterator : Iterator[(LongWritable, Text)]) =>{val file = inputSplit.asInstanceOf[FileSplit]iterator.map(x => x._2 + " " + file.getPath.toString)
})

这里通过InputSplit获取到输入文件的名称之后,将文件名拼在每行数据的后面,产生新的RDD

然后就可以用截取每行数据获取我们所需要的信息,同时在这里我们可以对数据做校验,过滤掉没用的数据:

val groupRDD = lines.filter(line => line.substring(14, 19).trim().toInt != 9999).map(line => (line.substring(line.length - 9, line.length - 4), line.substring(14, 19).trim().toInt))

获取到的数据为:("03812", 22),key为气象站编号,value为气温值

最后,通过强大的combineByKey进行平均值的计算,并打印出来,也可以将输出结果保存到文件中:

val resultRDD = groupRDD.combineByKey((temp) => (1, temp),(tmp : (Int, Int), temp) => (tmp._1 + 1, tmp._2 + temp),(tmp1 : (Int, Int), tmp2 : (Int, Int)) => (tmp1._1 + tmp2._1, tmp1._2 + tmp2._2)
).map{case(name, (num, temp)) => (name, temp/num)}.collect().foreach(println)

四、完整代码

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.{SparkConf, SparkContext}object WeatherAverage {def main(args: Array[String]): Unit = {val conf = new SparkConf().setAppName("weather-average")val sc = new SparkContext(conf)val input = "/data/spark-example/weather/"val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]val lines = hadoopRDD.mapPartitionsWithInputSplit((inputSplit : InputSplit, iterator : Iterator[(LongWritable, Text)]) =>{val file = inputSplit.asInstanceOf[FileSplit]iterator.map(x => x._2 + " " + file.getPath.toString)})val groupRDD = lines.filter(line => line.substring(14, 19).trim().toInt != 9999).map(line => (line.substring(line.length - 9, line.length - 4), line.substring(14, 19).trim().toInt))val resultRDD = groupRDD.combineByKey((temp) => (1, temp),(tmp : (Int, Int), temp) => (tmp._1 + 1, tmp._2 + temp),(tmp1 : (Int, Int), tmp2 : (Int, Int)) => (tmp1._1 + tmp2._1, tmp1._2 + tmp2._2)).map{case(name, (num, temp)) => (name, temp/num)}.collect().foreach(println)sc.stop()}
}   

五、打包,提交spark任务

通过maven将写好的代码打成jar包,并上传到装有spark的服务器的某个目录上,并运行

bin/spark-submit \
--class cn.oldsix.spark.weather.WeatherAverage \
--master local \
/home/oldsix/app/app-jars/spark/spark-example-0.0.1.jar

从控制台可以看到如下结果:

转载于:https://www.cnblogs.com/oldsix666/articles/9458186.html

spark分析某国气象站平均气温实例相关推荐

  1. mapreduce剖析气象站平均气温

    一.气象数据 按行并以 ASCII 格式存储,每一行是一条记录.下图展示了一行采样数据. 1998 #year 03 #month 09 #day 17 #hour 11 #temperature - ...

  2. mapreduce实战:统计美国各个气象站30年来的平均气温项目分析(MapReduce处理多文件数据)

    气象数据集 我们要写一个气象数据挖掘的程序.气象数据是通过分布在美国各地区的很多气象传感器每隔一小时进行收集,这些数据是半结构化数据且是按照记录方式存储的,因此非常适合使用 MapReduce 程序来 ...

  3. mapreduce实战:统计美国各个气象站30年来的平均气温项目分析

    气象数据集 我们要写一个气象数据挖掘的程序.气象数据是通过分布在美国各地区的很多气象传感器每隔一小时进行收集,这些数据是半结构化数据且是按照记录方式存储的,因此非常适合使用 MapReduce 程序来 ...

  4. 1月全国平均气温-4.1℃ 较常年同期偏高0.9℃

    中新网1月31日电 31日,中国气象局应急减灾与公共服务司司长.新闻发言人张祖强表示,2019年1月(1-29日),全国平均气温-4.1℃,较常年同期偏高0.9℃.全国平均降水量11.9毫米,较常年同 ...

  5. 各纬度气候分布图_读我国一月平均气温分布图,寻找我国冬季气温最高和最低的地方...

    地图是地理学的语言,地图能够科学地反映出自然和社会经济现象的分布特征及其相互关系,所以我们阅读地图也能获取许多的地理信息.之前我们已经通过读中国年平均气温分布图.世界年降水量分布图.世界年平均气温分布 ...

  6. 【数据分享】1901-2021年1km分辨率逐月平均气温栅格数据(全国/分省/免费获取)

    气温数据是我们最常用的气象指标之一,之前我们给大家分享过1950-2022年0.1° x 0.1°精度的逐月平均气温栅格数据和逐年平均气温栅格数据(均可查询之前的文章获悉详情)! 本次我们分享的是精度 ...

  7. 离线轻量级大数据平台Spark之MLib机器学习协同过滤ALS实例

    1.协同过滤 协同过滤(Collaborative Filtering,简称CF,WIKI上的定义是:简单来说是利用某个兴趣相投.拥有共同经验之群体的喜好来推荐感兴趣的资讯给使用者,个人透过合作的机制 ...

  8. 各纬度气候分布图_读中国年平均气温分布图,寻找中国全年平均气温最高和最低的地方...

    自然地理环境包括地形.气候.土壤.植被和水文等要素,一个地区的气候又是由气温.降水.光照等要素来组成的.在分析一个地区的气温特征时,我们主要从年均温和气温年较差两个角度来入手,影响气温高低的因素有很多 ...

  9. 运用自回归滑动平均模型、灰色预测模型、BP神经网络三种模型分别预测全球平均气温,并进行预测精度对比(附代码、数据)

    大家好,我是带我去滑雪,每天教你一个小技巧!全球变暖是近十年来,人们关注度最高的话题.2022年夏天,蔓延全球40℃以上的极端天气不断刷新人们对于高温的认知,人们再也不会像从前那样认为全球变暖离我们遥 ...

最新文章

  1. [leedcode 118] Pascal's Triangle
  2. Codeforces Round #645 (Div. 2)(D.The Best Vacation)
  3. Linux stty
  4. GoAhead 2.5 Web Server 网页ROM化的改进
  5. MFC制作打地鼠小游戏
  6. 编写一个生成器需要编写__iter__和__next__
  7. 自己动手Centos7搭建wordpress网站步骤(LNMP+wordpress)
  8. DOTween Sequence的使用
  9. AngularJs HelloWorld
  10. python批量查询ip归属地_IP地址地理信息批量查询小工具
  11. 使用js获取移动端设备屏幕高度和宽度尺寸的方法
  12. 受移动竞争所致,联通的宽带用户大跌,电信将面临同样遭遇
  13. 【mean teacher】RuntimeError: Integer division of tensors using div or / is no longer suppor的解决
  14. paper的经验和会议排名
  15. 杂谈(20210405)
  16. Crash: ‘SQLiteDatabaseCorruptException: file is encrypted or is not a database‘的分析与解决
  17. idea spring boot 修改 html,js 等不用重启即时生效
  18. 金铜仙人辞汉歌-李贺
  19. Linux系统一键安装可视化桌面环境支持浏览器及RDP访问
  20. 计算机导论sql试题,数据库SQL语句练习题

热门文章

  1. LabVIEW数据采集:配套视频教程第2集(2.1.1节-2.1.13节)
  2. 仿饿了么订餐外卖系统
  3. 列车时刻表的数据存储策略
  4. Python调用OpenCV接口播放本地视频文件、本地和网络摄像头
  5. php iis oracle,部署IIS+PHP+Oracle环境
  6. IKE与IPSec详解
  7. 2023上半年软考记录
  8. 2022软考成绩能不能复核?
  9. 中国薪酬发展报告(2011年)
  10. 海湾gst5000协议号_海湾GST5000/GST9000主机电源盒/DC-DC火灾报警控制器电源盒