1. 气象数据集

我们要写一个气象数据挖掘的程序。气象数据是通过分布在美国各地区的很多气象传感器每隔一小时进行收集,这些数据是半结构化数据且是按照记录方式存储的,因此非常适合使用 MapReduce 程序来统计分析。

  1. 数据格式

我们使用的数据来自美国国家气候数据中心、美国国家海洋和大气管理局(简称 NCDC NOAA),这些数据按行并以 ASCII 格式存储,其中每一行是一条记录。

  1. 下面我们展示一行采样数据,其中重要的字段被突出显示。该行数据被分割成很多行以突出每个字段,但在实际文件中,这些字段被整合成一行且没有任何分隔符。

1998 # 年

03     # 月

09     # 日

17     # 时

11     # 气温

-100 # 湿度

10237 # 气压

60     # 风向

72     # 风速

0     # 天气状况

0     # 每一小时的降雨量

-9999 # 每一小时的降雨量

  1. 数据文件按照气象站和日期进行组织,数据文件示例如下所示:

  1. 分析

MapReduce 任务过程分为两个处理阶段:map 阶段和reduce阶段。每个阶段都以键值对作为输入和输出,其类型由我们自己选择。 我们还需要写两个函数:map 函数和reduce 函数。

在这里,map阶段的输入是NCDC NOAA原始数据。我们选择文本格式作为输入格式,将数据集的每一行作为文本输入。键是某一行起始位置相对于文件起始位置的偏移量,不过我们不需要这个信息,所以将其忽略。

我们的map函数很简单。由于我们只对气象站和气温感兴趣,所以只需要取出这两个字段数据。在本实战中,map 函数只是一个数据准备阶段,通过这种方式来准备数据,使 reducer 函数继续对它进行处理:即统计出每个气象站30年来的平均气温。map 函数还是一个比较合适去除已损记录的地方,在 map 函数里面,我们可以筛掉缺失的或者错误的气温数据。

为了全面了解 map 的工作方式,输入以下数据作为演示

1985 07 31 02 200 94 10137 220 26 1 0 -9999

1985 07 31 03 172 94 10142 240 0 0 0 -9999

1985 07 31 04 156 83 10148 260 10 0 0 -9999

1985 07 31 05 133 78 -9999 250 0 -9999 0 -9999

1985 07 31 06 122 72 -9999 90 0 -9999 0 0

1985 07 31 07 117 67 -9999 60 0 -9999 0 -9999

1985 07 31 08 111 61 -9999 90 0 -9999 0 -9999

1985 07 31 09 111 61 -9999 60 5 -9999 0 -9999

1985 07 31 10 106 67 -9999 80 0 -9999 0 -9999

1985 07 31 11 100 56 -9999 50 5 -9999 0 -9999

这些数据,以键/值对的方式作为 map函数的输入,如下所示

(0, 1985 07 31 02 200 94 10137 220 26 1 0 -9999)

(62, 1985 07 31 03 172 94 10142 240 0 0 0 -9999)

(124,1985 07 31 04 156 83 10148 260 10 0 0 -9999)

(186,1985 07 31 05 133 78 -9999 250 0 -9999 0 -9999)

(248,1985 07 31 06 122 72 -9999 90 0 -9999 0 0)

(310,1985 07 31 07 117 67 -9999 60 0 -9999 0 -9999)

(371,1985 07 31 08 111 61 -9999 90 0 -9999 0 -9999)

(434,1985 07 31 09 111 61 -9999 60 5 -9999 0 -9999)

(497,1985 07 31 10 106 67 -9999 80 0 -9999 0 -9999)

(560,1985 07 31 11 100 56 -9999 50 5 -9999 0 -9999)

键(key)是文件中的偏移量,这里不需要这个信息,所以将其忽略。map 函数的功能仅限于提取气象站和气温信息,并将它们输出;map 函数的输出经由 MapReduce 框架处理后,最后发送到reduce函数。这个处理过程基于键来对键值对进行排序和分组。因此在这个示例中,reduce 函数看到的是如下输入:

(03103,[200,172,156,133,122,117,111,111,106,100])

每个气象站后面紧跟着一系列气温数据,reduce 函数现在要做的是遍历整个列表并统计出平均气温:

03103        132

上面就是最终输出结果即每一个气象站历年的平均气温。

  1. 实现

    上面已经分析完毕,下面我们就着手实现它。这里需要编写三块代码内容:

    1. map 函数、

    2. reduce函数

    3. 一些用来运行作业的代码。

    1. map 函数

    下面我们来编写 Mapper 类,实现 map() 函数,提取气象站和气温数据

    public static class TemperatureMapper extends Mapper< LongWritable, Text, Text, IntWritable> {

    /**

    * 解析气象站数据

    */

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

    // 每行气象数据

    String line = value.toString();

    // 每小时气温值

    int temperature = Integer.parseInt(line.substring(14, 19).trim());

    // 过滤无效数据

    if (temperature != -9999) {

    FileSplit fileSplit = (FileSplit) context.getInputSplit();

    // 通过文件名称提取气象站id

    String weatherStationId = fileSplit.getPath().getName().substring(5, 10);

    context.write(new Text(weatherStationId), new IntWritable(temperature));

    }

    }

    这个 Mapper 类是一个泛型类型,它有四个形参类型,分别指定 map 函数的输入键、输入值、输出键和输出值的类型。 就本示例来说,输入键是一个长整数偏移量,输入值是一行文本,输出键是气象站id,输出值是气温(整数)。Hadoop 本身提供了一套可优化网络序列化传输的基本类型,而不是使用 java 内嵌的类型。这些类型都在 org.apache.hadoop.io 包中。 这里使用 LongWritable 类型(相当于 Java 的 Long 类型)、Text 类型(相当于 Java 中的 String 类型)和 IntWritable 类型(相当于 Java 的 Integer 类型)。

    map() 方法的输入是一个键(key)和一个值(value),我们首先将 Text 类型的 value 转换成 Java 的 String 类型, 之后使用 substring()方法截取我们业务需要的值。map() 方法还提供了 Context 实例用于输出内容的写入。 在这种情况下,我们将气象站id按Text对象进行读/写(因为我们把气象站id当作键),将气温值封装在 IntWritale 类型中。只有气温数据不缺失,这些数据才会被写入输出记录中。

    1. reduce函数

    下面我们来编写 Reducer类,实现reduce函数,统计每个气象站的平均气温。

    public static class TemperatureReducer extends Reducer< Text, IntWritable, Text, IntWritable> {

    /**

    * 统计美国各个气象站的平均气温

    */

    public void reduce(Text key, Iterable< IntWritable> values,Context context) throws IOException, InterruptedException {

    IntWritable result = new IntWritable();

    int sum = 0;

    int count = 0;

    // 统计每个气象站的气温值总和

    for (IntWritable val : values) {

    sum += val.get();

    count++;

    }

    // 求每个气象站的气温平均值

    result.set(sum / count);

    context.write(key, result);

    }

    }

    同样,reduce 函数也有四个形式参数类型用于指定输入和输出类型。reduce 函数的输入类型必须匹配 map 函数的输出类型:即 Text 类型和 IntWritable 类型。 在这种情况下,reduce 函数的输出类型也必须是 Text 和 IntWritable 类型,分别是气象站id和平均气温。在 map 的输出结果中,所有相同的气象站(key)被分配到同一个reduce执行,这个平均气温就是针对同一个气象站(key),通过循环所有气温值(values)求和并求平均数所得到的。

    1. 一些用来运行作业的代码

    /**

    * 任务驱动方法

    *

    * @param arg0

    * @throws Exception

    */

    @Override

    public int run(String[] arg0) throws Exception {

    // TODO Auto-generated method stub

    // 读取配置文件

    Configuration conf = new Configuration();

    Path mypath = new Path(arg0[1]);

    FileSystem hdfs = mypath.getFileSystem(conf);

    if (hdfs.isDirectory(mypath)) {

    hdfs.delete(mypath, true);

    }

    // 新建一个任务

    Job job = new Job(conf, "temperature");

    // 设置主类

    job.setJarByClass(Temperature.class);

    // 输入路径

    FileInputFormat.addInputPath(job, new Path(arg0[0]));

    // 输出路径

    FileOutputFormat.setOutputPath(job, new Path(arg0[1]));

    // Mapper

    job.setMapperClass(TemperatureMapper.class);

    // Reducer

    job.setReducerClass(TemperatureReducer.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    // 提交任务

    return job.waitForCompletion(true)?0:1;

    }

    /**

    * main 方法

    *

    * @param args

    * @throws Exception

    */

    public static void main(String[] args) throws Exception {

    // 数据输入路径和输出路径

    String[] args0 = {

    "hdfs://ljc:9000/buaa/weather/",

    "hdfs://ljc:9000/buaa/weatherout/"

    };

    int ec = ToolRunner.run(new Configuration(), new Temperature(), args0);

    System.exit(ec);

    }

    Configuration 类读取 Hadoop 的配置文件,如 site-core.xml、mapred-site.xml、hdfs-site.xml 等。

    Job 对象指定作业执行规范,我们可以用它来控制整个作业的运行。我们在 Hadoop 集群上运行这个作业时,要把代码打包成一个JAR文件(Hadoop在集群上发布这个文件)。 不必明确指定 JAR 文件的名称,在 Job 对象的 setJarByClass 方法中传递一个类即可,Hadoop 利用这个类来查找包含它的 JAR 文件,进而找到相关的 JAR 文件。

    构造 Job 对象之后,需要指定输入和输出数据的路径。

    1. 调用 FileInputFormat 类的静态方法 addInputPath() 来定义输入数据的路径,这个路径可以是单个的文件、一个目录(此时,将目录下所有文件当作输入)或符合特定文件模式的一系列文件。由函数名可知,可以多次调用 addInputPath() 来实现多路径的输入。
    2. 调用 FileOutputFormat 类中的静态方法 setOutputPath() 来指定输出路径(只能有一个输出路径)。这个方法指定的是 reduce 函数输出文件的写入目录。 在运行作业前该目录是不应该存在的,否则 Hadoop 会报错并拒绝运行作业。这种预防措施的目的是防止数据丢失(长时间运行的作业如果结果被意外覆盖,肯定是件可怕的事情)。
    3. 通过 setMapperClass() 和 setReducerClass() 指定 map 类型和reduce 类型。
    4. 通过setOutputKeyClass() 和 setOutputValueClass() 控制 map 和 reduce 函数的输出类型,正如本例所示,这两个输出类型一般都是相同的。如果不同,则通过 setMapOutputKeyClass()和setMapOutputValueClass()来设置 map 函数的输出类型。
    5. 输入的类型通过 InputFormat 类来控制,我们的例子中没有设置,因为使用的是默认的 TextInputFormat(文本输入格式)。
    6. Job 中的 waitForCompletion() 方法提交作业并等待执行完成。该方法中的布尔参数是个详细标识,所以作业会把进度写到控制台。 waitForCompletion() 方法返回一个布尔值,表示执行的成(true)败(false),这个布尔值被转换成程序的退出代码 0 或者 1。
  2. 结果

mapreduce实战:统计美国各个气象站30年来的平均气温项目分析(MapReduce处理多文件数据)相关推荐

  1. mapreduce实战:统计美国各个气象站30年来的平均气温项目分析

    气象数据集 我们要写一个气象数据挖掘的程序.气象数据是通过分布在美国各地区的很多气象传感器每隔一小时进行收集,这些数据是半结构化数据且是按照记录方式存储的,因此非常适合使用 MapReduce 程序来 ...

  2. MapReduce实战之美国气候数据MaxTemperatureVeryYear

    使用的数据来自美国国家气候数据中心,首先我们要在Linux上去下载数据,打开linux终端,在终端下写入如下命令:wget -r -c fttp://ftp.ncdc.noaa.gov/pub/dat ...

  3. mapreduce剖析气象站平均气温

    一.气象数据 按行并以 ASCII 格式存储,每一行是一条记录.下图展示了一行采样数据. 1998 #year 03 #month 09 #day 17 #hour 11 #temperature - ...

  4. Google Earth Engine(GEE)——美国大陆(CONUS)30米土壤属性概率图数据库

    SSURGO(POLARIS)土壤属性的概率重绘--美国大陆(CONUS)30米土壤属性概率图数据库.绘制的CONUS变量包括土壤质地.有机物.pH值.饱和导水率.Brooks-Corey和Van G ...

  5. 《Hadoop MapReduce实战手册》一1.4 给WordCount MapReduce程序增加combiner步骤

    本节书摘来异步社区<Hadoop MapReduce实战手册>一书中的第1章,第1.4节,作者: [美]Srinath Perera , Thilina Gunarathne 译者: 杨卓 ...

  6. 《OD大数据实战》MapReduce实战

    一.github使用手册 1. 我也用github(2)--关联本地工程到github 2. Git错误non-fast-forward后的冲突解决 3. Git中从远程的分支获取最新的版本到本地 4 ...

  7. 《Hadoop MapReduce实战手册》一1.10 使用MapReduce监控UI

    本节书摘来异步社区<Hadoop MapReduce实战手册>一书中的第1章,第1.10节,作者: [美]Srinath Perera , Thilina Gunarathne 译者: 杨 ...

  8. php+mysql统计7天、30天每天数据没有补0

    php+mysql统计7天.15天.30天没有补0: 先来看效果图 问题描述 查询数据库表中最近7天的记录 select count(id) count,FROM_UNIXTIME(dateline, ...

  9. Mapreduce程序 统计文件中每个单词出现次数

    mapreduce程序 统计文件中每个单词出现次数 调用MapReduce对文件中各个单词出现次数进行统计 一.安装环境 二.准备工作 1.创建Hadoop账户 2.更新 apt 3.安装vim 4. ...

最新文章

  1. PHP里switch用法举例,PHP Switch语句的功能实例
  2. 汇编中的条件转移指令
  3. mac下为什么光标按方向键只能一个字一个字地蹦
  4. Android 中文 API ——对话框 AlertDialog.Builder
  5. 控制好你的 Wordpress 侧边栏
  6. asp-Webshell免杀
  7. Ext4核心组件Grid的变化及学习(3):可编辑的grid
  8. 小白如何学习大神的小项目
  9. python vbs库_Python语言之requests库
  10. 不想在博问中提出的问题
  11. SAP License:如何学好SAP BASIS
  12. 【LINQ】LINQ 简介
  13. python连接sqlserver_python连接SQL Server数据库
  14. java栈链_Java实现链栈
  15. python贝叶斯分析方法实例_python 贝叶斯分析对应的代码
  16. 如何将XML转换为HL7
  17. 「暑期训练」「基础DP」 Monkey and Banana (HDU-1069)
  18. C语言中宏定义函数的运算优先级问题
  19. excel表格分割线一分为二_PDF转Excel的Python代码
  20. 迅为RK3568开发板实现的NVR/XVR方案

热门文章

  1. 电路基础学习笔记6:实验验证戴维南定理
  2. Flash air应用重启功能实现
  3. 电子税务局网上申报系统弹出Acrobat错误的解决方法
  4. 20-40男人对爱的真实想法
  5. PHP判断输入数据是否合法常用的类
  6. 宇视摄像机——枪机壁装及护罩安装步骤
  7. 企鹅极光盒子显示服务器连接失败,企鹅极光盒呢?这三种方法告诉你如何解决他们...
  8. Webots学习笔记 4.机器人控制器代码介绍
  9. GNN在生化任务上的工程实现学习笔记
  10. 运维面试题之系统运维