1. Requirements

Given a set of weather records, one per line with a timestamp and a temperature, find the two days with the highest temperatures in each month of each year.

2. Approach

Wrap year, month, day, and temperature in a custom composite key (TQ). Sort the map output by year and month ascending and by temperature descending, spread it over two reduce tasks with a custom partitioner, and group records that share a year and month with a grouping comparator, so that each reduce call sees one month's records hottest-first. The reducer then writes the first record (the hottest day) and the first subsequent record that falls on a different day (the second-hottest day).

3. Code implementation

3.1 MyWeather class code:

This class sets up the Hadoop job configuration and registers the classes the job must load when the computation runs.

package com.hadoop.mr.weather;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyWeather {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration(true);
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyWeather.class);

        // ---------- conf -----------------------
        // --- begin Map:
        // input format class (TextInputFormat is the default)
//        job.setInputFormatClass(ooxx.class);
        // set the mapper class
        job.setMapperClass(TMapper.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(IntWritable.class);
        // set the partitioner class
        job.setPartitionerClass(TPartitioner.class);
        // set the sort comparator class
        job.setSortComparatorClass(TSortComparator.class);
        // set the combiner class
//        job.setCombinerClass(TCombiner.class);
        // --- end Map

        // --- begin Reduce:
        // set the grouping comparator class
        job.setGroupingComparatorClass(TGroupingComparator.class);
        // set the reducer class
        job.setReducerClass(TReducer.class);
        // --- end Reduce

        // input data path
        Path input = new Path("/data/tq/input");
        FileInputFormat.addInputPath(job, input);

        // output data path; if the directory already exists, delete it recursively
        Path output = new Path("/data/tq/output");
        if (output.getFileSystem(conf).exists(output)) {
            output.getFileSystem(conf).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);

        // number of reduce tasks, matching the number of partitions
        job.setNumReduceTasks(2);
        // ---------------------------------------
        job.waitForCompletion(true);
    }
}
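Incidentally, the job log in section 4.4 below warns "Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner". A minimal sketch of the same driver wrapped in ToolRunner (MyWeatherTool is a name invented here, not a class from the original post) would silence that warning and let the job accept -D options on the command line:

package com.hadoop.mr.weather;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

// hypothetical alternative driver, same wiring as MyWeather.main()
public class MyWeatherTool extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf());
        job.setJarByClass(MyWeatherTool.class);
        job.setMapperClass(TMapper.class);
        job.setMapOutputKeyClass(TQ.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setPartitionerClass(TPartitioner.class);
        job.setSortComparatorClass(TSortComparator.class);
        job.setGroupingComparatorClass(TGroupingComparator.class);
        job.setReducerClass(TReducer.class);
        FileInputFormat.addInputPath(job, new Path("/data/tq/input"));
        Path output = new Path("/data/tq/output");
        if (output.getFileSystem(getConf()).exists(output)) {
            output.getFileSystem(getConf()).delete(output, true);
        }
        FileOutputFormat.setOutputPath(job, output);
        job.setNumReduceTasks(2);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic options (-D, -files, ...) before calling run()
        System.exit(ToolRunner.run(new Configuration(true), new MyWeatherTool(), args));
    }
}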

3.2 TMapper class code

This class extends Mapper; its main job is to preprocess each record of the input file.

package com.hadoop.mr.weather;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.StringUtils;

// TextInputFormat.class -- the key type is LongWritable (byte offset), the value is Text
public class TMapper extends Mapper<LongWritable, Text, TQ, IntWritable> {

    // reusable map output key/value objects
    TQ mkey = new TQ();                   // map ---> key
    IntWritable mval = new IntWritable(); // map ---> value

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, TQ, IntWritable>.Context context)
            throws IOException, InterruptedException {
        /**
         * 1949-10-01 14:21:02    34c
         * 1949-10-01 19:21:02    38c
         * 1949-10-02 14:01:02    36c
         * 1950-01-01 11:21:02    32c
         * 1950-10-01 12:21:02    37c
         **/
        try {
            // split the line on the tab character
            String[] strs = StringUtils.split(value.toString(), '\t');
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
            Date date = sdf.parse(strs[0]);
            Calendar cal = Calendar.getInstance();
            cal.setTime(date);
            mkey.setYear(cal.get(Calendar.YEAR));
            mkey.setMonth(cal.get(Calendar.MONTH) + 1); // Calendar months start at 0, so add 1
            mkey.setDay(cal.get(Calendar.DAY_OF_MONTH));
            // strip the trailing 'c' and parse the temperature as an int
            int wd = Integer.parseInt(strs[1].substring(0, strs[1].length() - 1));
            mkey.setWd(wd);
            mval.set(wd);
            context.write(mkey, mval);
        } catch (ParseException e) {
            e.printStackTrace();
        }
    }
}

3.3 TQ class code

This class implements the WritableComparable interface; it defines the key's fields and overrides the write, read, and compare methods.

package com.hadoop.mr.weather;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class TQ implements WritableComparable<TQ> {

    // key fields
    private int year;
    private int month;
    private int day;
    private int wd; // temperature

    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }
    public int getMonth() { return month; }
    public void setMonth(int month) { this.month = month; }
    public int getDay() { return day; }
    public void setDay(int day) { this.day = day; }
    public int getWd() { return wd; }
    public void setWd(int wd) { this.wd = wd; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(day);
        out.writeInt(wd);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // must read the fields back in the same order they were written
        this.year = in.readInt();
        this.month = in.readInt();
        this.day = in.readInt();
        this.wd = in.readInt();
    }

    @Override
    public int compareTo(TQ that) {
        // returns 0 if x == y; a value less than 0 if x < y; a value greater than 0 if x > y
        // dates in ascending order: compare the years first
        int c1 = Integer.compare(this.year, that.getYear());
        if (c1 == 0) {
            // same year: compare the months
            int c2 = Integer.compare(this.month, that.getMonth());
            if (c2 == 0) {
                // same month: compare the days
                return Integer.compare(this.day, that.getDay());
            }
            return c2;
        }
        return c1;
    }
}

3.4 TPartitioner class code

This class defines how the output is distributed across partitions, with the aim of avoiding data skew.

package com.hadoop.mr.weather;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class TPartitioner extends Partitioner<TQ, IntWritable> {

    // Conventional rule for avoiding data skew: gather the small data sets
    // into one reduce task group and give the large one its own task group.
    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        // note: TQ does not override hashCode(), so this falls back to
        // Object.hashCode(); see the discussion of the skewed output in 4.6
        return key.hashCode() % numPartitions;
    }
}

3.5 TSortComparator class code:

This class defines the sort comparator.

package com.hadoop.mr.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TSortComparator extends WritableComparator {

    public TSortComparator() {
        super(TQ.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ t1 = (TQ) a;
        TQ t2 = (TQ) b;
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            int c2 = Integer.compare(t1.getMonth(), t2.getMonth());
            if (c2 == 0) {
                // the minus sign puts temperatures in descending order
                return -Integer.compare(t1.getWd(), t2.getWd());
            }
        }
        return super.compare(a, b);
    }
}

3.6 TGroupingComparator class code:

This class groups records by the two dimensions of year and month.

package com.hadoop.mr.weather;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TGroupingComparator extends WritableComparator {

    public TGroupingComparator() {
        super(TQ.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        TQ t1 = (TQ) a;
        TQ t2 = (TQ) b;
        int c1 = Integer.compare(t1.getYear(), t2.getYear());
        if (c1 == 0) {
            // same year: group by the month comparison
            return Integer.compare(t1.getMonth(), t2.getMonth());
        }
        return c1;
    }
}

3.7 TReducer class code

This class defines the format and content of the output data.

package com.hadoop.mr.weather;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TReducer extends Reducer<TQ, IntWritable, Text, IntWritable> {

    Text rkey = new Text();
    IntWritable rval = new IntWritable();

    @Override
    protected void reduce(TQ key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // keys that compare equal (same year and month) form one group, e.g.:
        // 1970 01 01 88   88
        // 1970 01 11 78   78
        // 1970 01 21 68   68
        // 1970 01 01 58   58
        // note: as the values are iterated, the fields of `key` are updated
        // to those of the current record
        int flag = 0; // iteration count
        int day = 0;
        for (IntWritable v : values) {
            if (flag == 0) {
                // first record is the hottest day; format the key as 1970-01-01:88
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay() + ":" + key.getWd());
                // set the reduce value to the temperature
                rval.set(key.getWd());
                flag++;
                day = key.getDay();
                context.write(rkey, rval);
            }
            // past the first record, the first record on a different day is the
            // month's second-hottest day: write it and stop
            if (flag != 0 && day != key.getDay()) {
                rkey.set(key.getYear() + "-" + key.getMonth() + "-" + key.getDay() + ":" + key.getWd());
                rval.set(key.getWd());
                context.write(rkey, rval);
                break;
            }
        }
    }
}

4. Running the program

4.1 Export the project as a jar and upload it to the server

4.2 Create the HDFS input directory

hdfs dfs -mkdir -p /data/tq/input

4.3 Upload the test file to the new HDFS directory

[root@node01 ~]# cat tq.txt
1949-10-01 14:21:02    34c
1949-10-01 19:21:02    38c
1949-10-02 14:01:02    36c
1950-01-01 11:21:02    32c
1950-10-01 12:21:02    37c
1951-12-01 12:21:02    23c
1950-10-02 12:21:02    41c
1950-10-03 12:21:02    27c
1951-07-01 12:21:02    45c
1951-07-02 12:21:02    46c
1951-07-03 12:21:03    47c

[root@node01 ~]# hdfs dfs -put tq.txt /data/tq/input

4.4 Run the program on the server

[root@node01 ~]# hadoop jar Myweather.jar com.hadoop.mr.weather.MyWeather
2018-12-29 22:42:01,101 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm2
2018-12-29 22:42:01,484 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
2018-12-29 22:42:01,548 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /user/root/.staging/job_1546092355023_0004
2018-12-29 22:42:02,025 INFO input.FileInputFormat: Total input files to process : 1
2018-12-29 22:42:02,922 INFO mapreduce.JobSubmitter: number of splits:1
2018-12-29 22:42:02,975 INFO Configuration.deprecation: yarn.resourcemanager.zk-address is deprecated. Instead, use hadoop.zk.address
2018-12-29 22:42:02,976 INFO Configuration.deprecation: yarn.resourcemanager.system-metrics-publisher.enabled is deprecated. Instead, use yarn.system-metrics-publisher.enabled
2018-12-29 22:42:03,643 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1546092355023_0004
2018-12-29 22:42:03,644 INFO mapreduce.JobSubmitter: Executing with tokens: []
2018-12-29 22:42:03,932 INFO conf.Configuration: resource-types.xml not found
2018-12-29 22:42:03,932 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2018-12-29 22:42:04,012 INFO impl.YarnClientImpl: Submitted application application_1546092355023_0004
2018-12-29 22:42:04,064 INFO mapreduce.Job: The url to track the job: http://node04:8088/proxy/application_1546092355023_0004/
2018-12-29 22:42:04,065 INFO mapreduce.Job: Running job: job_1546092355023_0004
2018-12-29 22:42:13,301 INFO mapreduce.Job: Job job_1546092355023_0004 running in uber mode : false
2018-12-29 22:42:13,302 INFO mapreduce.Job:  map 0% reduce 0%
2018-12-29 22:42:20,490 INFO mapreduce.Job:  map 100% reduce 0%
2018-12-29 22:42:35,850 INFO mapreduce.Job:  map 100% reduce 50%
2018-12-29 22:42:38,877 INFO mapreduce.Job:  map 100% reduce 100%
2018-12-29 22:42:39,899 INFO mapreduce.Job: Job job_1546092355023_0004 completed successfully
2018-12-29 22:42:40,043 INFO mapreduce.Job: Counters: 53
    File System Counters
        FILE: Number of bytes read=254
        FILE: Number of bytes written=653891
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=366
        HDFS: Number of bytes written=141
        HDFS: Number of read operations=13
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=4
    Job Counters
        Launched map tasks=1
        Launched reduce tasks=2
        Rack-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=4437
        Total time spent by all reduces in occupied slots (ms)=29074
        Total time spent by all map tasks (ms)=4437
        Total time spent by all reduce tasks (ms)=29074
        Total vcore-milliseconds taken by all map tasks=4437
        Total vcore-milliseconds taken by all reduce tasks=29074
        Total megabyte-milliseconds taken by all map tasks=4543488
        Total megabyte-milliseconds taken by all reduce tasks=29771776
    Map-Reduce Framework
        Map input records=11
        Map output records=11
        Map output bytes=220
        Map output materialized bytes=254
        Input split bytes=102
        Combine input records=0
        Combine output records=0
        Reduce input groups=6
        Reduce shuffle bytes=254
        Reduce input records=11
        Reduce output records=9
        Spilled Records=22
        Shuffled Maps =2
        Failed Shuffles=0
        Merged Map outputs=2
        GC time elapsed (ms)=351
        CPU time spent (ms)=1640
        Physical memory (bytes) snapshot=419917824
        Virtual memory (bytes) snapshot=8213352448
        Total committed heap usage (bytes)=164515840
        Peak Map Physical memory (bytes)=206139392
        Peak Map Virtual memory (bytes)=2733309952
        Peak Reduce Physical memory (bytes)=108830720
        Peak Reduce Virtual memory (bytes)=2740228096
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters
        Bytes Read=264
    File Output Format Counters
        Bytes Written=141

4.5 Pull the output files generated on HDFS down to the local machine

[root@node01 ~]# hdfs dfs -get  /data/tq/output/* ./test

4.6 Inspect the output files

[root@node01 test]# ls
part-r-00000  part-r-00001  _SUCCESS
[root@node01 test]# cat part-r-00000
[root@node01 test]# cat part-r-00001
1951-7-3:47    47
1951-7-2:46    46
1950-10-2:41    41
1950-10-3:27    27
1951-12-1:23    23
1950-1-1:32    32
1950-10-1:37    37
1949-10-1:38    38
1949-10-2:36    36

Partition 0 is empty while partition 1 holds all of the program's key/value output: the data is skewed, and the distribution rule defined in the TPartitioner class above is the likely culprit. TQ never overrides hashCode(), so key.hashCode() falls back to Object.hashCode(); and because the mapper reuses a single TQ object for every record, that identity hash is the same value on every call, sending every record to the same partition. (The listing also shows three records for 1950-10 and no overall date order, which is what happens when compareTo reports records from different years as equal: same-month records are no longer adjacent after the sort, so one logical group splits into several.)
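A minimal sketch of one possible fix (TFieldPartitioner is a name invented here, not from the original post): derive the partition from the key's fields rather than its identity hash. The partition must depend only on year and month, because the grouping comparator groups by year and month and all records of one group have to reach the same reducer; the sign bit is masked off so the modulo result can never be negative:

package com.hadoop.mr.weather;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class TFieldPartitioner extends Partitioner<TQ, IntWritable> {

    @Override
    public int getPartition(TQ key, IntWritable value, int numPartitions) {
        // hash only year and month so every record of a group lands on the
        // same reducer; masking with Integer.MAX_VALUE keeps the result non-negative
        int h = key.getYear() * 31 + key.getMonth();
        return (h & Integer.MAX_VALUE) % numPartitions;
    }
}

Equivalently, TQ could override hashCode() over year and month, and the original TPartitioner would then spread the months across both reducers (modulo the same sign caveat).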

5. Notes on the Combiner

Since the data volume here is small, no Combiner class was implemented for this job.

Each map may produce a large amount of local output. The Combiner's role is to merge the map-side output once before it leaves the node, reducing the volume of data transferred between the map and reduce nodes and improving network I/O performance. It is one of MapReduce's optimization techniques, and its specific roles are described below.

(1) At its most basic, the Combiner performs local aggregation by key, sorting the map output keys and iterating over their values, as shown below:

  map: (K1, V1) → list(K2, V2)
  combine: (K2, list(V2)) → list(K2, V2)
  reduce: (K2, list(V2)) → list(K3, V3)

(2) The Combiner can also act as a local reduce (it is essentially a reduce). For example, in Hadoop's built-in wordcount and in programs that find the maximum value, the combiner and the reducer are identical, as shown below:

  map: (K1, V1) → list(K2, V2)
  combine: (K2, list(V2)) → list(K3, V3)
  reduce: (K3, list(V3)) → list(K4, V4)

If wordcount ran without a combiner, all aggregation would be done by the reducers, which is relatively inefficient. With a combiner, maps that finish early aggregate their output locally, which speeds things up. In Hadoop's built-in wordcount, the value is just a count to be summed, so summation can begin as soon as each map finishes, instead of waiting for all maps to end before the reducers add up the values.
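As a concrete illustration of case (2), here is a sketch of the summing reducer used in the wordcount pattern; it mirrors the IntSumReducer in Hadoop's WordCount example but is not code from the weather job above. Registered as both the combiner and the reducer, it works because summing partial sums yields the same total:

package com.hadoop.mr.wordcount; // hypothetical package for this sketch

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// sums the counts for one word; usable as both the combiner and the reducer
public class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable result = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // add up the partial counts for this word
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

The driver then registers the same class twice: job.setCombinerClass(IntSumReducer.class); job.setReducerClass(IntSumReducer.class);. A combiner is only safe when the operation is commutative and associative, as summing is; the top-two-days logic in TReducer above is not a plain fold like this, which is another reason the weather job leaves setCombinerClass commented out.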

Reposted from: https://www.cnblogs.com/benjamin77/p/10196704.html
