MapReduce in Practice 5-1: Data Preprocessing for the Brasília Historical Temperature Analysis
Dataset and description
Data source: conventional_weather_stations_inmet_brazil_1961_2019.csv
No. | Portuguese field | Description (unit) |
---|---|---|
1 | Estacao | Weather station code |
2 | Data | Date (dd/MM/YYYY) |
3 | Hora | Hour (%H%M) |
4 | Precipitacao | Precipitation (mm) |
5 | TempBulboSeco | Dry bulb temperature (°C) |
6 | TempBulboUmido | Wet bulb temperature (°C) |
7 | TempMaxima | Maximum temperature (°C) |
8 | TempMinima | Minimum temperature (°C) |
9 | UmidadeRelativa | Relative humidity (%) |
10 | PressaoAtmEstacao | Station atmospheric pressure (mbar) |
11 | PressaoAtmMar | Atmospheric pressure at sea level (mbar) |
12 | DirecaoVento | Wind direction (tabela) |
13 | VelocidadeVento | Wind speed (m/s) |
14 | Insolacao | Insolation (hs) |
15 | Nebulosidade | Cloudiness (tenths) |
16 | Evaporacao Piche | Piche evaporation (mm) |
17 | Temp Comp Media | Average compensated temperature, i.e. average temperature (°C) |
18 | Umidade Relativa Media | Average relative humidity (%) |
19 | Velocidade do Vento Media | Average wind speed (mps) |
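Requirements analysis
Tasks
- Import the weather data
- Query the weather data for a given day
- Query the highest temperature of each year
- Query the lowest temperature of each year
- Query the average temperature of each year
- Query the number of rainy days in each year
- Predict tomorrow's temperature
- Menu and execution
- Data visualization
Approach
Importing the weather data
Upload the file through the web management interface.
Queries
Hadoop is not suited to second-level response times, so the data should be processed according to the requirements before any query is served.
The requirements involve queries by day and by year, so the entity class needs to record the date, that day's maximum, minimum, and average temperature, and the precipitation. The dataset contains records from more stations than just 83377, so the entity class also carries the station code, which keeps the door open for extending the application to other stations later.
In the dataset each day has three records. Between them they carry the maximum, minimum, and average temperature, plus the precipitation measured at each observation, so the raw data can be compacted by merging each day's records into a single record.
Prediction
Visualization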
Utility class: HadoopUtils.java
MapReduce is launched several times and much of the setup is repetitive, so the shared parts are collected in a utility class.

    package club.kwcoder.weather.util;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;

    /**
     * Hadoop utility class
     * @author zhinushannan
     */
    public class HadoopUtils {

        /** Hadoop configuration */
        private static final Configuration conf;

        /** HDFS file system */
        private static final FileSystem hdfs;

        static {
            conf = new Configuration();
            try {
                hdfs = FileSystem.get(conf);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Deletes the target path if it exists.
         * @param targetPath the target path
         */
        public static void deleteIfExist(Path targetPath) {
            try {
                if (hdfs.exists(targetPath)) {
                    hdfs.delete(targetPath, true);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Prints the content of every file directly under the given path.
         * @param path the path to list
         */
        public static void showAllFiles(Path path) {
            try {
                if (!hdfs.exists(path)) {
                    throw new RuntimeException("target path does not exist!");
                }
                FileStatus[] fileStatuses = hdfs.listStatus(path);
                for (FileStatus fileStatus : fileStatuses) {
                    if (!fileStatus.isFile()) {
                        continue;
                    }
                    // try-with-resources closes the streams of every file, not only the last one
                    try (FSDataInputStream open = hdfs.open(fileStatus.getPath());
                         BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(open))) {
                        bufferedReader.lines().forEach(System.out::println);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        public static Configuration getConf() {
            return conf;
        }

        public static FileSystem getHdfs() {
            return hdfs;
        }
    }
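Utility class: ValidateUtils.java

    package club.kwcoder.weather.util;

    import org.apache.commons.lang3.StringUtils;

    /**
     * Validation utility class with a uniform return convention:
     * in this project, valid input returns false and invalid input returns true.
     *
     * @author zhinushannan
     */
    public class ValidateUtils {

        /**
         * Validates an input line of text.
         * @param line the text
         * @return true if the line is blank
         */
        public static boolean validate(String line) {
            return StringUtils.isBlank(line);
        }

        /**
         * Validates the length of a string array.
         * @param items  the string array
         * @param length the expected length
         * @return true if the lengths differ
         */
        public static boolean validate(String[] items, int length) {
            return items.length != length;
        }

        /**
         * Validates an input line of text and splits it.
         * @param line  the input text
         * @param sep   the separator (a regular expression, as in String.split)
         * @param limit the expected number of fields
         * @return the fields, or null if the line is blank or cannot be split into the expected number of fields
         */
        public static String[] splitAndValidate(String line, String sep, int limit) {
            if (validate(line)) {
                return null;
            }
            String[] split = line.split(sep, limit);
            return validate(split, limit) ? null : split;
        }
    }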
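The behaviour that `splitAndValidate` relies on can be checked without Hadoop. The sketch below is a simplified stand-in, not the article's class: `SplitDemo` is a hypothetical name, and `trim().isEmpty()` is used in place of `StringUtils.isBlank` so that no third-party library is needed. It shows why the limit argument matters when a row has empty trailing fields.

```java
/** Stand-alone demo of the split-and-validate convention (no Hadoop or commons-lang3). */
final class SplitDemo {

    /** Simplified stand-in for ValidateUtils.splitAndValidate: null means "unusable line". */
    static String[] splitAndValidate(String line, String sep, int limit) {
        if (line == null || line.trim().isEmpty()) {
            return null;
        }
        String[] parts = line.split(sep, limit);
        return parts.length == limit ? parts : null;
    }

    public static void main(String[] args) {
        // Without a limit, trailing empty strings are removed from the result.
        System.out.println(";;;;;".split(";").length);
        // With a positive limit, empty fields are kept.
        System.out.println(";;;;;".split(";", 6).length);

        String cleaned = "83377\t01/01/1963\t0.0\t29.0\t16.7\t21.74";
        String[] items = splitAndValidate(cleaned, "\t", 6);
        System.out.println(items == null ? "rejected" : "year field: " + items[1]);

        // A row with too few columns is rejected by returning null.
        System.out.println(splitAndValidate("83377\t01/01/1963", "\t", 6) == null);
    }
}
```

Returning null for rejected lines keeps the mapper code short, at the cost of requiring a null check at every call site.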
Launcher: WeatherStarter.java
MapReduce has to be started several times, which means a fair number of programs, so a launcher is used to organize them in one place.

    package club.kwcoder.weather;

    import club.kwcoder.weather.runner.Runner;
    import club.kwcoder.weather.util.HadoopUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.util.HashMap;
    import java.util.Map;

    /**
     * Launcher class
     */
    public class WeatherStarter {

        /** Runtime parameters, used to register input/output paths */
        private static Map<String, String> PATH;

        static {
            PATH = new HashMap<>();
        }

        public static void main(String[] args) throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, InstantiationException {
        }

        /**
         * Given an implementation of the Runner interface, uses reflection to
         * obtain an instance and its run method, then invokes it.
         *
         * @param step      the Runner implementation class
         * @param jobName   name of the job to run
         * @param inputKey  key of the input path in PATH
         * @param outputKey key of the output path in PATH
         */
        public static void run(Class<? extends Runner> step, String jobName, String inputKey, String outputKey) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException, InstantiationException {
            String input = PATH.get(inputKey);
            String output = PATH.get(outputKey);
            // Look up the run method
            Method run = step.getMethod("run", RunnerBuilder.class);
            RunnerBuilder build = new RunnerBuilder()
                    .setJobName(jobName)
                    .setInput(input)
                    .setOutput(output)
                    .build();
            // Create the instance
            Runner runner = step.newInstance();
            // Invoke the method
            run.invoke(runner, build);
        }

        public static class RunnerBuilder {
            private Configuration conf;
            private String jobName;
            private FileSystem hdfs;
            private Path input;
            private Path output;

            private RunnerBuilder() {
            }

            public RunnerBuilder build() {
                this.conf = HadoopUtils.getConf();
                this.hdfs = HadoopUtils.getHdfs();
                if (jobName == null || input == null || output == null) {
                    throw new RuntimeException("Incomplete parameter configuration!");
                }
                // Remove a stale output directory so the job can be rerun
                try {
                    if (this.hdfs.exists(output)) {
                        hdfs.delete(output, true);
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                return this;
            }

            public RunnerBuilder setJobName(String jobName) {
                this.jobName = jobName;
                return this;
            }

            public RunnerBuilder setInput(String input) {
                this.input = new Path(input);
                return this;
            }

            public RunnerBuilder setOutput(String output) {
                this.output = new Path(output);
                return this;
            }

            public Configuration getConf() { return conf; }

            public String getJobName() { return jobName; }

            public FileSystem getHdfs() { return hdfs; }

            public Path getInput() { return input; }

            public Path getOutput() { return output; }
        }
    }
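The `Runner` interface imported by the launcher is not listed in this article. Judging from the call `step.getMethod("run", RunnerBuilder.class)` and from the implementations further down, it presumably declares a single `run(WeatherStarter.RunnerBuilder)` method. The sketch below illustrates the reflective dispatch under that assumption, with Hadoop stripped out: `ReflectiveRunnerDemo`, `JobSettings` (standing in for `RunnerBuilder`), and `EchoStep` are hypothetical names introduced only for this example.

```java
import java.lang.reflect.Method;

/** Stand-alone sketch of the launcher's reflective dispatch; all names here are illustrative. */
final class ReflectiveRunnerDemo {

    /** Hypothetical stand-in for WeatherStarter.RunnerBuilder; only the job name is kept. */
    static final class JobSettings {
        final String jobName;

        JobSettings(String jobName) {
            this.jobName = jobName;
        }
    }

    /** Assumed shape of the Runner interface: one run method taking the settings. */
    interface Runner {
        void run(JobSettings settings);
    }

    /** Example step that only records which job it was asked to run. */
    public static final class EchoStep implements Runner {
        static String lastJob;

        public EchoStep() {
        }

        @Override
        public void run(JobSettings settings) {
            lastJob = settings.jobName;
        }
    }

    /** Mirrors WeatherStarter.run: look up run, create an instance, invoke it. */
    static void dispatch(Class<? extends Runner> step, String jobName) {
        try {
            Method run = step.getMethod("run", JobSettings.class);
            Runner runner = step.getDeclaredConstructor().newInstance();
            run.invoke(runner, new JobSettings(jobName));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("could not start step " + step.getName(), e);
        }
    }

    public static void main(String[] args) {
        dispatch(EchoStep.class, "DataCleaning");
        System.out.println("ran job: " + EchoStep.lastJob);
    }
}
```

Because the interface already declares `run`, calling `runner.run(settings)` directly on the new instance would work just as well; the explicit `Method` lookup only mirrors what the launcher does.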
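Data cleaning
Requirement
Clean the raw data into the following format:
station code, date, precipitation, maximum temperature, minimum temperature, average temperature
Plan
- Filter the data and keep only the records of station 83377
- Merge the three records of each day into one
- Discard invalid records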
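The merge-and-validate part of the plan can be tried out in plain Java before it is wrapped in a MapReduce job. The sketch below is only an illustration under a few assumptions: `DailyMergeDemo`, `parseOrZero`, and `mergeDay` are hypothetical names, blank fields are treated as 0, and the three input rows are the sample rows quoted in the mapper's comment (they belong to station 82024, so the station filter is left out here).

```java
import java.util.Arrays;

/** Plain-Java sketch of merging one day's raw rows; no Hadoop classes are involved. */
final class DailyMergeDemo {

    /** Parses a possibly blank numeric field; blank is treated as 0. */
    static float parseOrZero(String field) {
        return field == null || field.trim().isEmpty() ? 0f : Float.parseFloat(field.trim());
    }

    /**
     * Merges the raw rows of one station and day into
     * {precipitation, max, min, avg}; returns null when the day fails validation.
     */
    static float[] mergeDay(String[] rawRows) {
        float precipitation = 0f, max = 0f, min = 0f, avg = 0f;
        for (String row : rawRows) {
            String[] items = row.split(";", 19); // the limit keeps empty columns
            if (items.length != 19) {
                continue; // malformed row
            }
            precipitation += parseOrZero(items[3]);
            max += parseOrZero(items[6]);   // TempMaxima
            min += parseOrZero(items[7]);   // TempMinima
            avg += parseOrZero(items[16]);  // Temp Comp Media
        }
        // The average must lie between the minimum and the maximum.
        if (avg > max || avg < min) {
            return null;
        }
        return new float[] {precipitation, max, min, avg};
    }

    public static void main(String[] args) {
        String[] day = {
            "82024;01/01/1961;0000;;;;32.3;;;;;;;4.4;;;26.56;82.5;3;",
            "82024;01/01/1961;1200;;26;23.9;;22.9;83;994.2;;5;5;;8;;;;;",
            "82024;01/01/1961;1800;;32.3;27;;;65;991.6;;5;3;;9;;;;;"
        };
        System.out.println(Arrays.toString(mergeDay(day)));
    }
}
```

Summing works here because each temperature column is filled in by only one of the day's rows; a day where a column appears twice, or not at all, would need different handling.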
Code
Entity class: WeatherWritable.java

    package club.kwcoder.weather.writable;

    import org.apache.hadoop.io.WritableComparable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Objects;

    public class WeatherWritable implements WritableComparable<WeatherWritable> {

        /** Weather station code */
        private String code;
        /** Date */
        private String date;
        /** Precipitation */
        private Float precipitation;
        /** Maximum temperature */
        private Float maxTemperature;
        /** Minimum temperature */
        private Float minTemperature;
        /** Average temperature */
        private Float avgTemperature;

        public static class Builder {
            private String code;
            private String date;
            private Float precipitation;
            private Float maxTemperature;
            private Float minTemperature;
            private Float avgTemperature;

            public Builder setCode(String code) {
                this.code = code;
                return this;
            }

            public Builder setDate(String date) {
                this.date = date;
                return this;
            }

            public Builder setPrecipitation(Float precipitation) {
                this.precipitation = precipitation;
                return this;
            }

            public Builder setMaxTemperature(Float maxTemperature) {
                this.maxTemperature = maxTemperature;
                return this;
            }

            public Builder setMinTemperature(Float minTemperature) {
                this.minTemperature = minTemperature;
                return this;
            }

            public Builder setAvgTemperature(Float avgTemperature) {
                this.avgTemperature = avgTemperature;
                return this;
            }

            public WeatherWritable build() {
                return new WeatherWritable(this);
            }

            public Builder() {
            }
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(this.code);
            out.writeUTF(this.date);
            out.writeFloat(this.precipitation);
            out.writeFloat(this.maxTemperature);
            out.writeFloat(this.minTemperature);
            out.writeFloat(this.avgTemperature);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // Fields are read in exactly the order in which write emits them
            this.code = in.readUTF();
            this.date = in.readUTF();
            this.precipitation = in.readFloat();
            this.maxTemperature = in.readFloat();
            this.minTemperature = in.readFloat();
            this.avgTemperature = in.readFloat();
        }

        @Override
        public int compareTo(WeatherWritable other) {
            if (null == other) {
                return 1;
            }
            // Only the Brasília data of station 83377 is considered, so comparing the date is enough
            return this.date.compareTo(other.getDate());
        }

        public WeatherWritable(Builder builder) {
            this.code = builder.code;
            this.date = builder.date;
            this.precipitation = builder.precipitation;
            this.maxTemperature = builder.maxTemperature;
            this.minTemperature = builder.minTemperature;
            this.avgTemperature = builder.avgTemperature;
        }

        public WeatherWritable() {
        }

        @Override
        public String toString() {
            return code + '\t' + date + '\t' + precipitation + '\t' + maxTemperature + '\t' + minTemperature + '\t' + avgTemperature;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            WeatherWritable that = (WeatherWritable) o;
            if (!Objects.equals(code, that.code)) return false;
            if (!Objects.equals(date, that.date)) return false;
            if (!Objects.equals(precipitation, that.precipitation)) return false;
            if (!Objects.equals(maxTemperature, that.maxTemperature)) return false;
            if (!Objects.equals(minTemperature, that.minTemperature)) return false;
            return Objects.equals(avgTemperature, that.avgTemperature);
        }

        @Override
        public int hashCode() {
            int result = code != null ? code.hashCode() : 0;
            result = 31 * result + (date != null ? date.hashCode() : 0);
            result = 31 * result + (precipitation != null ? precipitation.hashCode() : 0);
            result = 31 * result + (maxTemperature != null ? maxTemperature.hashCode() : 0);
            result = 31 * result + (minTemperature != null ? minTemperature.hashCode() : 0);
            result = 31 * result + (avgTemperature != null ? avgTemperature.hashCode() : 0);
            return result;
        }

        public String getCode() { return code; }

        public void setCode(String code) { this.code = code; }

        public String getDate() { return date; }

        public void setDate(String date) { this.date = date; }

        public Float getPrecipitation() { return precipitation; }

        public void setPrecipitation(Float precipitation) { this.precipitation = precipitation; }

        public Float getMaxTemperature() { return maxTemperature; }

        public void setMaxTemperature(Float maxTemperature) { this.maxTemperature = maxTemperature; }

        public Float getMinTemperature() { return minTemperature; }

        public void setMinTemperature(Float minTemperature) { this.minTemperature = minTemperature; }

        public Float getAvgTemperature() { return avgTemperature; }

        public void setAvgTemperature(Float avgTemperature) { this.avgTemperature = avgTemperature; }

        public WeatherWritable(String code, String date, Float precipitation, Float maxTemperature, Float minTemperature, Float avgTemperature) {
            this.code = code;
            this.date = date;
            this.precipitation = precipitation;
            this.maxTemperature = maxTemperature;
            this.minTemperature = minTemperature;
            this.avgTemperature = avgTemperature;
        }
    }
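The contract behind `write` and `readFields` is that both walk the fields in the same order. It can be exercised with `java.io` alone, since Hadoop's `Writable` methods take the standard `DataOutput` and `DataInput` interfaces. The sketch below uses a trimmed-down, hypothetical `DayRecord` with three fields rather than the full entity class, so it runs without Hadoop on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Stand-alone round trip through write/readFields; DayRecord is a hypothetical mini entity. */
final class RoundTripDemo {

    static final class DayRecord {
        String code;
        String date;
        float maxTemperature;

        void write(DataOutput out) throws IOException {
            out.writeUTF(code);
            out.writeUTF(date);
            out.writeFloat(maxTemperature);
        }

        void readFields(DataInput in) throws IOException {
            // Same order as write; swapping two lines would corrupt the record.
            code = in.readUTF();
            date = in.readUTF();
            maxTemperature = in.readFloat();
        }
    }

    /** Serializes the record to bytes and reads it back into a fresh instance. */
    static DayRecord roundTrip(DayRecord original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
            DayRecord copy = new DayRecord();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        DayRecord record = new DayRecord();
        record.code = "83377";
        record.date = "01/01/1963";
        record.maxTemperature = 29.0f;
        DayRecord copy = roundTrip(record);
        System.out.println(copy.code + " " + copy.date + " " + copy.maxTemperature);
    }
}
```

One consequence for the real entity class: `writeUTF` throws a `NullPointerException` for a null string, and unboxing a null `Float` does the same, so every field that `write` touches has to be set before the object is emitted.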
DataCleaning.java

    package club.kwcoder.weather.runner.runnerimpl;

    import club.kwcoder.weather.WeatherStarter;
    import club.kwcoder.weather.util.ValidateUtils;
    import club.kwcoder.weather.writable.WeatherWritable;
    import club.kwcoder.weather.runner.Runner;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import java.io.IOException;

    /**
     * Step 1: data cleaning, validation, and import; three rows are merged into one.
     * Output format: station code, date, precipitation, maximum temperature, minimum temperature, average temperature
     */
    public class DataCleaning implements Runner {

        @Override
        public void run(WeatherStarter.RunnerBuilder builder) {
            try {
                Job job = Job.getInstance(builder.getConf(), builder.getJobName());
                // Set the job class
                job.setJarByClass(DataCleaning.class);
                // Configure the input
                job.setInputFormatClass(TextInputFormat.class);
                FileInputFormat.setInputPaths(job, builder.getInput());
                // Configure the Mapper
                job.setMapperClass(DataCleanMapper.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(WeatherWritable.class);
                // Configure the Reducer
                job.setReducerClass(DataCleanReducer.class);
                job.setOutputKeyClass(WeatherWritable.class);
                job.setOutputValueClass(NullWritable.class);
                // Configure the output
                job.setOutputFormatClass(TextOutputFormat.class);
                FileOutputFormat.setOutputPath(job, builder.getOutput());
                // Run
                boolean flag = job.waitForCompletion(true);
                if (flag) {
                    System.out.println(builder.getJobName() + " process success");
                }
            } catch (IOException | InterruptedException | ClassNotFoundException e) {
                e.printStackTrace();
            }
        }

        private static class DataCleanMapper extends Mapper<LongWritable, Text, Text, WeatherWritable> {
            /*
            Estacao;Data;Hora;Precipitacao;TempBulboSeco;TempBulboUmido;TempMaxima;TempMinima;UmidadeRelativa;PressaoAtmEstacao;PressaoAtmMar;DirecaoVento;VelocidadeVento;Insolacao;Nebulosidade;Evaporacao Piche;Temp Comp Media;Umidade Relativa Media;Velocidade do Vento Media;
            82024;01/01/1961;0000;;;;32.3;;;;;;;4.4;;;26.56;82.5;3;
            82024;01/01/1961;1200;;26;23.9;;22.9;83;994.2;;5;5;;8;;;;;
            82024;01/01/1961;1800;;32.3;27;;;65;991.6;;5;3;;9;;;;;
            */
            private static final Text outKey = new Text();
            WeatherWritable.Builder builder = new WeatherWritable.Builder();

            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, WeatherWritable>.Context context) throws IOException, InterruptedException {
                String line = value.toString();
                if (ValidateUtils.validate(line)) {
                    return;
                }
                // Skip the header row. With TextInputFormat the key is the byte offset of the line
                // within its file, so offset 0 is the first line of a file; the content check
                // makes sure that line really is the header.
                if (key.equals(new LongWritable(0L)) && line.startsWith("Estacao")) {
                    return;
                }
                // Skip every row that does not belong to station 83377 (Brasília).
                // The station code is the first column, so the prefix is tested rather than
                // searching the whole line for the digits.
                if (!line.startsWith("83377;")) {
                    return;
                }
                /*
                Splitting: with a plain split(";"), trailing empty fields are dropped,
                e.g. ";;;;;".split(";").length is 0.
                Passing a limit forces the row into 19 columns whenever it has enough separators.
                */
                String[] items = line.split(";", 19);
                // Discard malformed rows that do not yield exactly 19 columns
                if (ValidateUtils.validate(items, 19)) {
                    return;
                }
                // Blank measurements are stored as 0
                WeatherWritable weatherWritable = builder
                        .setCode(items[0])
                        .setDate(items[1])
                        .setPrecipitation(StringUtils.isBlank(items[3]) ? 0 : Float.parseFloat(items[3]))
                        .setMaxTemperature(StringUtils.isBlank(items[6]) ? 0 : Float.parseFloat(items[6]))
                        .setMinTemperature(StringUtils.isBlank(items[7]) ? 0 : Float.parseFloat(items[7]))
                        .setAvgTemperature(StringUtils.isBlank(items[16]) ? 0 : Float.parseFloat(items[16]))
                        .build();
                // <83377-01/01/1961, weatherWritable>
                outKey.set(weatherWritable.getCode() + "-" + weatherWritable.getDate());
                context.write(outKey, weatherWritable);
            }
        }

        private static class DataCleanReducer extends Reducer<Text, WeatherWritable, WeatherWritable, NullWritable> {
            WeatherWritable.Builder builder = new WeatherWritable.Builder();

            @Override
            protected void reduce(Text key, Iterable<WeatherWritable> values, Reducer<Text, WeatherWritable, WeatherWritable, NullWritable>.Context context) throws IOException, InterruptedException {
                String code = null, date = null;
                float precipitation = 0.0F, maxTemp = 0.0F, minTemp = 0.0F, avgTemp = 0.0F;
                // Each temperature column is filled in by only one of the day's rows,
                // so adding the values (blank = 0) merges the rows
                for (WeatherWritable value : values) {
                    code = value.getCode();
                    date = value.getDate();
                    precipitation += value.getPrecipitation();
                    maxTemp = maxTemp + value.getMaxTemperature();
                    minTemp = minTemp + value.getMinTemperature();
                    avgTemp = avgTemp + value.getAvgTemperature();
                }
                // Validation: the average must lie between the minimum and the maximum
                if (avgTemp > maxTemp || avgTemp < minTemp) {
                    return;
                }
                WeatherWritable weatherWritable = builder
                        .setCode(code)
                        .setDate(date)
                        .setMaxTemperature(maxTemp)
                        .setMinTemperature(minTemp)
                        .setAvgTemperature(avgTemp)
                        .setPrecipitation(precipitation)
                        .build();
                context.write(weatherWritable, NullWritable.get());
            }
        }
    }
Updating the launcher
Add the following entries to PATH:

    static {
        PATH = new HashMap<>();
        // Input path
        PATH.put("data_cleaning_input", "/weather");
        // Output path
        PATH.put("data_cleaning_output", "/weather_result/data_cleaning");
    }

Add the following call to the main method:

    run(DataCleaning.class, "DataCleaning", "data_cleaning_input", "data_cleaning_output");
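Data analysis: yearly statistics
Requirement
Take the output of the data cleaning step and aggregate it further into one record per year, in the following format:
station code, year, annual precipitation, annual maximum temperature, annual minimum temperature, number of rainy days
Plan
- Convert each day's record into the target format
- Aggregate by year: annual precipitation, annual maximum temperature, annual minimum temperature, and number of rainy days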
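The aggregation itself is a running sum, a running maximum, a running minimum, and a counter, which can be sketched in plain Java. In the sketch below `YearAggregateDemo` and `YearStats` are hypothetical names, and the three input rows reuse values from the sample rows quoted in the yearly mapper's comment purely as test input (in the real data they fall in different years). One detail worth getting right is the starting value of the running maximum: `Float.MIN_VALUE` is the smallest positive float, not the most negative one, so negative infinity is the safer choice.

```java
/** Plain-Java sketch of the yearly roll-up; no Hadoop classes are involved. */
final class YearAggregateDemo {

    /** Hypothetical holder for one year's result. */
    static final class YearStats {
        float precipitation;
        float maxTemperature = Float.NEGATIVE_INFINITY; // safe start for a running maximum
        float minTemperature = Float.POSITIVE_INFINITY; // safe start for a running minimum
        int rainDays;
    }

    /** Extracts the year from a dd/MM/yyyy date string. */
    static String yearOf(String date) {
        return date.split("/")[2];
    }

    /** Each row is {precipitation, maxTemperature, minTemperature} for one cleaned day. */
    static YearStats summarize(float[][] days) {
        YearStats stats = new YearStats();
        for (float[] day : days) {
            stats.precipitation += day[0];
            stats.maxTemperature = Math.max(stats.maxTemperature, day[1]);
            if (day[2] != 0f) { // 0 stands for a missing minimum after cleaning
                stats.minTemperature = Math.min(stats.minTemperature, day[2]);
            }
            if (day[0] > 0f) {
                stats.rainDays++;
            }
        }
        return stats;
    }

    public static void main(String[] args) {
        float[][] days = {
            {0.0f, 29.0f, 16.7f},
            {3.2f, 26.0f, 18.0f},
            {21.2f, 24.7f, 16.6f}
        };
        YearStats stats = summarize(days);
        System.out.println(yearOf("01/01/1963") + "\t" + stats.precipitation + "\t"
                + stats.maxTemperature + "\t" + stats.minTemperature + "\t" + stats.rainDays);
        // Float.MIN_VALUE is positive, so it is a poor seed for a maximum of possibly negative values
        System.out.println(Float.MIN_VALUE > 0f);
    }
}
```

In the MapReduce version the same loop runs inside the reducer, once per `code-year` key.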
Code
Entity class: WeatherWritableSummary.java

    package club.kwcoder.weather.writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Objects;

    public class WeatherWritableSummary extends WeatherWritable {

        /** Year */
        private String year;
        /** Number of rainy days */
        private Integer rainDays;

        public WeatherWritableSummary(WeatherWritableSummary.Builder builder) {
            super(builder);
            this.year = builder.year;
            this.rainDays = builder.rainDays;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            // The date and the average temperature are not part of the yearly record
            out.writeUTF(this.year);
            out.writeInt(this.rainDays);
            out.writeUTF(super.getCode());
            out.writeFloat(super.getPrecipitation());
            out.writeFloat(super.getMaxTemperature());
            out.writeFloat(super.getMinTemperature());
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.year = in.readUTF();
            this.rainDays = in.readInt();
            super.setCode(in.readUTF());
            super.setPrecipitation(in.readFloat());
            super.setMaxTemperature(in.readFloat());
            super.setMinTemperature(in.readFloat());
        }

        @Override
        public String toString() {
            return super.getCode() + '\t' + this.getYear() + '\t' + super.getPrecipitation() + '\t' + super.getMaxTemperature() + '\t' + super.getMinTemperature() + "\t" + this.rainDays;
        }

        public static class Builder extends WeatherWritable.Builder {
            private String year;
            private Integer rainDays;

            public WeatherWritableSummary.Builder setRainDays(Integer rainDays) {
                this.rainDays = rainDays;
                return this;
            }

            public WeatherWritableSummary.Builder setYear(String year) {
                this.year = year;
                return this;
            }

            public WeatherWritableSummary.Builder setCode(String code) {
                super.setCode(code);
                return this;
            }

            public WeatherWritableSummary.Builder setPrecipitation(Float precipitation) {
                super.setPrecipitation(precipitation);
                return this;
            }

            public WeatherWritableSummary.Builder setMaxTemperature(Float maxTemperature) {
                super.setMaxTemperature(maxTemperature);
                return this;
            }

            public WeatherWritableSummary.Builder setMinTemperature(Float minTemperature) {
                super.setMinTemperature(minTemperature);
                return this;
            }

            public WeatherWritableSummary.Builder setAvgTemperature(Float avgTemperature) {
                super.setAvgTemperature(avgTemperature);
                return this;
            }

            public WeatherWritableSummary buildSummary() {
                return new WeatherWritableSummary(this);
            }

            public Builder() {
            }
        }

        public WeatherWritableSummary(String code, Float precipitation, Float maxTemperature, Float minTemperature, Float avgTemperature, String year, Integer rainDays) {
            super(code, null, precipitation, maxTemperature, minTemperature, avgTemperature);
            this.year = year;
            this.rainDays = rainDays;
        }

        public WeatherWritableSummary(String year, Integer rainDays) {
            this.year = year;
            this.rainDays = rainDays;
        }

        public WeatherWritableSummary() {
        }

        public String getYear() { return year; }

        public void setYear(String year) { this.year = year; }

        public Integer getRainDays() { return rainDays; }

        public void setRainDays(Integer rainDays) { this.rainDays = rainDays; }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            if (!super.equals(o)) return false;
            WeatherWritableSummary that = (WeatherWritableSummary) o;
            if (!Objects.equals(year, that.year)) return false;
            return Objects.equals(rainDays, that.rainDays);
        }

        @Override
        public int hashCode() {
            int result = super.hashCode();
            result = 31 * result + (year != null ? year.hashCode() : 0);
            result = 31 * result + (rainDays != null ? rainDays.hashCode() : 0);
            return result;
        }
    }
YearSummary.java

    package club.kwcoder.weather.runner.runnerimpl;

    import club.kwcoder.weather.WeatherStarter;
    import club.kwcoder.weather.runner.Runner;
    import club.kwcoder.weather.util.ValidateUtils;
    import club.kwcoder.weather.writable.WeatherWritable;
    import club.kwcoder.weather.writable.WeatherWritableSummary;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    import java.io.IOException;

    /**
     * Aggregates the data by year. Output: station code, year, annual precipitation,
     * annual maximum temperature, annual minimum temperature, number of rainy days
     */
    public class YearSummary implements Runner {

        @Override
        public void run(WeatherStarter.RunnerBuilder builder) {
            try {
                Job job = Job.getInstance(builder.getConf(), builder.getJobName());
                // Set the job class (this job's own class, not DataCleaning)
                job.setJarByClass(YearSummary.class);
                // Configure the input
                job.setInputFormatClass(TextInputFormat.class);
                FileInputFormat.setInputPaths(job, builder.getInput());
                // Configure the Mapper
                job.setMapperClass(YearSummaryMapper.class);
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(WeatherWritableSummary.class);
                // Configure the Reducer
                job.setReducerClass(YearSummaryReducer.class);
                job.setOutputKeyClass(WeatherWritableSummary.class);
                job.setOutputValueClass(NullWritable.class);
                // Configure the output
                job.setOutputFormatClass(TextOutputFormat.class);
                FileOutputFormat.setOutputPath(job, builder.getOutput());
                // Run
                boolean flag = job.waitForCompletion(true);
                if (flag) {
                    System.out.println(builder.getJobName() + " process success");
                }
            } catch (IOException | InterruptedException | ClassNotFoundException e) {
                e.printStackTrace();
            }
        }

        private static class YearSummaryMapper extends Mapper<LongWritable, Text, Text, WeatherWritableSummary> {
            private final Text outKey = new Text();
            private final WeatherWritableSummary.Builder outValBuilder = new WeatherWritableSummary.Builder();

            @Override
            protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, WeatherWritableSummary>.Context context) throws IOException, InterruptedException {
                /*
                code   date        precipitation  max temp  min temp  avg temp
                83377  01/01/1963  0.0            29.0      16.7      21.74
                83377  01/01/1964  3.2            26.0      18.0      20.84
                83377  01/01/1965  21.2           24.7      16.6      19.66
                83377  01/01/1966  20.0           27.8      17.5      20.7
                83377  01/01/1967  0.1            27.6      15.4      21.2
                83377  01/01/1968  0.0            28.6      17.8      22.28
                83377  01/01/1970  45.2           26.3      16.3      19.8
                83377  01/01/1971  0.0            30.3      18.5      24.02
                */
                String line = value.toString();
                String[] items = ValidateUtils.splitAndValidate(line, "\t", 6);
                if (null == items) {
                    return;
                }
                WeatherWritableSummary outVal = outValBuilder
                        .setCode(items[0])
                        .setYear(items[1].split("/")[2])
                        .setPrecipitation(Float.parseFloat(items[2]))
                        .setMaxTemperature(Float.parseFloat(items[3]))
                        .setMinTemperature(Float.parseFloat(items[4]))
                        // A day counts as rainy when its precipitation is not 0.0
                        .setRainDays(items[2].equals("0.0") ? 0 : 1)
                        .buildSummary();
                outKey.set(outVal.getCode() + "-" + outVal.getYear());
                context.write(outKey, outVal);
            }
        }

        private static class YearSummaryReducer extends Reducer<Text, WeatherWritableSummary, WeatherWritableSummary, NullWritable> {
            private final WeatherWritableSummary.Builder outKeyBuilder = new WeatherWritableSummary.Builder();
            private final NullWritable outVal = NullWritable.get();

            @Override
            protected void reduce(Text key, Iterable<WeatherWritableSummary> values, Reducer<Text, WeatherWritableSummary, WeatherWritableSummary, NullWritable>.Context context) throws IOException, InterruptedException {
                String code = "", year = "";
                int rainDays = 0;
                // Float.MIN_VALUE is the smallest positive float, so the running maximum
                // starts from negative infinity instead
                float precipitation = 0.0f, maxTemp = Float.NEGATIVE_INFINITY, minTemp = Float.MAX_VALUE;
                // Compute annual precipitation, annual maximum and minimum temperature, and rainy days
                for (WeatherWritableSummary value : values) {
                    if (ValidateUtils.validate(code) || ValidateUtils.validate(year)) {
                        code = value.getCode();
                        year = value.getYear();
                    }
                    precipitation += value.getPrecipitation();
                    maxTemp = Math.max(maxTemp, value.getMaxTemperature());
                    // 0.0 marks a missing minimum after cleaning, so it is ignored
                    if (value.getMinTemperature() != 0.0F) {
                        minTemp = Math.min(minTemp, value.getMinTemperature());
                    }
                    rainDays += value.getRainDays();
                }
                WeatherWritableSummary outKey = outKeyBuilder
                        .setCode(code)
                        .setYear(year)
                        .setPrecipitation(precipitation)
                        .setMaxTemperature(maxTemp)
                        .setMinTemperature(minTemp)
                        .setRainDays(rainDays)
                        .buildSummary();
                // Output format: code year precipitation maxTemperature minTemperature rainDays
                context.write(outKey, outVal);
            }
        }
    }
Updating the launcher
Add the following entries to PATH: