6. Find the three highest temperatures in each month.

Input format: date (yyyy-MM-dd) SPACE time (HH:mm:ss) TAB temperature

inputfile:

1949-10-01 14:21:02    34c
1949-10-02 14:01:02    36c
1950-01-01 11:21:02    32c
1950-10-01 12:21:02    37c
1951-12-01 12:21:02    23c
1950-10-02 12:21:02    41c
1950-10-03 12:21:02    27c
1951-07-01 12:21:02    45c
1951-07-02 12:21:02    46c
1951-07-03 12:21:03    47c
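
The answer can be derived by hand from this sample, which is useful for checking the job's output later; per month, the top three readings are:

1949-10: 36, 34        (only two readings exist)
1950-01: 32
1950-10: 41, 37, 27
1951-07: 47, 46, 45
1951-12: 23

The solution below is the classic secondary-sort pattern: a composite key (year, month, day, temperature), a partitioner that spreads years across reducers, a sort comparator that orders each month's records hottest-first, and a grouping comparator that hands a whole month to a single reduce() call, which then simply emits the first three values.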

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WRunner {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJobName("weather");
        job.setJarByClass(WRunner.class);
        job.setMapperClass(WMapper.class);
        job.setReducerClass(WReducer.class);
        job.setMapOutputKeyClass(MyKey.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setPartitionerClass(MyPartitioner.class);
        job.setSortComparatorClass(MySort.class);
        job.setGroupingComparatorClass(MyGroup.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setNumReduceTasks(3);

        Path in = new Path("/home/jinzhao/mrtest/input");
        FileInputFormat.setInputPaths(job, in);
        Path out = new Path("/home/jinzhao/mrtest/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(out))
            fs.delete(out, true);
        FileOutputFormat.setOutputPath(job, out);
        job.waitForCompletion(true);
    }

    static class WMapper extends Mapper<Text, Text, MyKey, DoubleWritable> {

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            try {
                // KeyValueTextInputFormat splits each line at the first TAB:
                // key = "1949-10-01 14:21:02", value = "34c".
                Date date = sdf.parse(key.toString());
                Calendar c = Calendar.getInstance();
                c.setTime(date);
                int year = c.get(Calendar.YEAR);
                int month = c.get(Calendar.MONTH);      // 0-based
                int day = c.get(Calendar.DAY_OF_MONTH); // 1-based
                String h = value.toString().trim();
                double hot = Double.parseDouble(h.substring(0, h.length() - 1)); // strip trailing 'c'
                context.write(new MyKey(year, month, day, hot), new DoubleWritable(hot));
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
    }

    static class WReducer extends Reducer<MyKey, DoubleWritable, Text, NullWritable> {

        @Override
        protected void reduce(MyKey key, Iterable<DoubleWritable> values, Context context)
                throws IOException, InterruptedException {
            // MySort delivers each (year, month) group hottest-first,
            // so the first three values are the answer.
            int i = 0;
            for (DoubleWritable v : values) {
                ++i;
                // MONTH needs +1 (0-based); DAY_OF_MONTH is already 1-based,
                // so no +1 here.
                String msg = key.getYear() + "\t" + (key.getMonth() + 1) + "\t"
                        + key.getDay() + "\t" + v.get();
                context.write(new Text(msg), NullWritable.get());
                if (i == 3)
                    break;
            }
        }
    }
}
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * Composite key serialized between map and reduce.
 */
public class MyKey implements WritableComparable<MyKey> {

    private int year;
    private int month;
    private int day;
    private double hot;

    public MyKey() {
        super();
    }

    public MyKey(int year, int month, int day, double hot) {
        this.year = year;
        this.month = month;
        this.day = day;
        this.hot = hot;
    }

    public int getYear() { return year; }
    public void setYear(int year) { this.year = year; }
    public int getMonth() { return month; }
    public void setMonth(int month) { this.month = month; }
    public int getDay() { return day; }
    public void setDay(int day) { this.day = day; }
    public double getHot() { return hot; }
    public void setHot(double hot) { this.hot = hot; }

    // write() and readFields() must use the same field order;
    // both use (year, month, hot, day).
    @Override
    public void readFields(DataInput in) throws IOException {
        this.year = in.readInt();
        this.month = in.readInt();
        this.hot = in.readDouble();
        this.day = in.readInt();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeDouble(hot);
        out.writeInt(day);
    }

    /**
     * Default ordering when the object is used as a key:
     * year, then month, then temperature.
     */
    @Override
    public int compareTo(MyKey o) {
        int r1 = Integer.compare(this.year, o.getYear());
        if (r1 != 0)
            return r1;
        int r2 = Integer.compare(this.month, o.getMonth());
        if (r2 != 0)
            return r2;
        return Double.compare(this.hot, o.getHot());
    }
}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator: MyKeys with the same year and month form one group,
 * i.e. they are handed to a single reduce() call.
 */
public class MyGroup extends WritableComparator {

    public MyGroup() {
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey k1 = (MyKey) a;
        MyKey k2 = (MyKey) b;
        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 != 0)
            return r1;
        return Integer.compare(k1.getMonth(), k2.getMonth());
    }
}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator: year and month ascending, temperature descending,
 * so each month's records reach the reducer hottest-first.
 */
public class MySort extends WritableComparator {

    public MySort() {
        super(MyKey.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        MyKey k1 = (MyKey) a;
        MyKey k2 = (MyKey) b;
        int r1 = Integer.compare(k1.getYear(), k2.getYear());
        if (r1 != 0)
            return r1;
        int r2 = Integer.compare(k1.getMonth(), k2.getMonth());
        if (r2 != 0)
            return r2;
        return -Double.compare(k1.getHot(), k2.getHot()); // descending by temperature
    }
}
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Partitioner: each partition is handled by one reduce task;
 * records are spread across the reducers by year.
 */
public class MyPartitioner extends Partitioner<MyKey, DoubleWritable> {

    @Override
    public int getPartition(MyKey key, DoubleWritable value, int numReduceTasks) {
        return (key.getYear() - 1949) % numReduceTasks;
    }
}
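
The comparator chain can be exercised in isolation. Below is a minimal local check, an illustrative sketch that is not part of the original post and assumes MyKey and MySort are on the classpath:

public class SortCheck {
    public static void main(String[] args) {
        MySort sorter = new MySort();
        // month 6 == July, since Calendar.MONTH is 0-based
        MyKey cooler = new MyKey(1951, 6, 1, 45.0);
        MyKey hotter = new MyKey(1951, 6, 3, 47.0);
        // Prints a negative number: within the same year and month the hotter
        // record sorts first, which is why WReducer can take the first three.
        System.out.println(sorter.compare(hotter, cooler));
    }
}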

7. Friend recommendation for a social network.

Format: user TAB friend1 SPACE friend2 SPACE ...

inputfile:

小明    老王 如花 林志玲
老王    小明 凤姐
如花    小明 李刚 凤姐
林志玲    小明 李刚 凤姐 郭美美
李刚    如花 凤姐 林志玲
郭美美    凤姐 林志玲
凤姐    如花 老王 林志玲 郭美美

Output of the first job:

Format: user1 SPACE user2 TAB count

Output of the second job:

Format: user TAB recommendation1 SPACE recommendation2 SPACE ...
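
To make the counting concrete, trace one line by hand. From 小明's list (老王 如花 林志玲) the first job's mapper emits a 0-marked pair for each direct friendship (小明-老王, 小明-如花, 小明-林志玲) and a 1-marked pair for every two people who co-occur on the list (老王-如花, 老王-林志玲, 如花-林志玲). 凤姐's list also contains both 如花 and 老王, and since those two are not direct friends, no 0 marker exists for the pair, so the first job outputs:

如花 老王    2

A 0 marker vetoes a pair: people who are already friends must not be recommended to each other.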

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Friends {

    // Job 1: count, for every non-friend pair, how many friend lists
    // they co-occur in. A 0 marks a direct friendship and vetoes the pair.
    static class FofMapper extends Mapper<Text, Text, Fof, IntWritable> {

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            String user = key.toString();
            String[] friends = value.toString().split(" ");
            for (int i = 0; i < friends.length; ++i) {
                // Direct friendship: marker 0.
                context.write(new Fof(user, friends[i]), new IntWritable(0));
                // Every two people on the same friend list share this user.
                for (int j = i + 1; j < friends.length; ++j)
                    context.write(new Fof(friends[i], friends[j]), new IntWritable(1));
            }
        }
    }

    static class FofReducer extends Reducer<Fof, IntWritable, Fof, IntWritable> {

        @Override
        protected void reduce(Fof key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            boolean flag = true;
            for (IntWritable i : values) {
                if (i.get() == 0) { // the pair are already direct friends
                    flag = false;
                    break;
                } else {
                    sum = sum + i.get();
                }
            }
            if (flag)
                context.write(key, new IntWritable(sum));
        }
    }

    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(Friends.class);
            job.setJobName("friend-I");
            job.setMapperClass(FofMapper.class);
            job.setReducerClass(FofReducer.class);
            job.setMapOutputKeyClass(Fof.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class);

            Path in = new Path("/home/jinzhao/mrtest/input");
            FileInputFormat.setInputPaths(job, in);
            Path out = new Path("/home/jinzhao/mrtest/output");
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(out))
                fs.delete(out, true);
            FileOutputFormat.setOutputPath(job, out);

            if (job.waitForCompletion(true)) {
                // Job 2: per user, sort candidates by shared-friend count
                // and concatenate them into one recommendation line.
                Job job2 = Job.getInstance(conf);
                job2.setJarByClass(Friends.class);
                job2.setJobName("friend-II");
                job2.setMapperClass(SortMapper.class);
                job2.setReducerClass(SortReducer.class);
                job2.setMapOutputKeyClass(User.class);
                job2.setMapOutputValueClass(User.class);
                job2.setInputFormatClass(KeyValueTextInputFormat.class);
                job2.setSortComparatorClass(FSort.class);
                job2.setGroupingComparatorClass(FGroup.class);

                Path in2 = new Path("/home/jinzhao/mrtest/output");
                FileInputFormat.setInputPaths(job2, in2);
                Path out2 = new Path("/home/jinzhao/mrtest/output2");
                if (fs.exists(out2))
                    fs.delete(out2, true);
                FileOutputFormat.setOutputPath(job2, out2);
                job2.waitForCompletion(true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    static class SortMapper extends Mapper<Text, Text, User, User> {

        @Override
        protected void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            // A job-1 output line is "user1 user2 TAB count" — emit both directions.
            String[] friends = key.toString().split(" ");
            int count = Integer.parseInt(value.toString());
            context.write(new User(friends[0], count), new User(friends[1], count));
            context.write(new User(friends[1], count), new User(friends[0], count));
        }
    }

    static class SortReducer extends Reducer<User, User, Text, Text> {

        @Override
        protected void reduce(User key, Iterable<User> values, Context context)
                throws IOException, InterruptedException {
            // FGroup groups by username; FSort has already ordered the
            // candidates by shared-friend count, strongest first.
            StringBuilder sb = new StringBuilder();
            for (User i : values)
                sb.append(i.getUsername() + "," + i.getCount() + " ");
            context.write(new Text(key.getUsername()), new Text(sb.toString().trim()));
        }
    }
}
import org.apache.hadoop.io.Text;

public class Fof extends Text {

    public Fof() {
        super();
    }

    public Fof(String a, String b) {
        super(getFof(a, b));
    }

    /** Orders the two names so that "a b" and "b a" become the same key. */
    public static String getFof(String a, String b) {
        int r = a.compareTo(b);
        if (r < 0)
            return a + " " + b;
        else
            return b + " " + a;
    }
}
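
A quick local check of the symmetry this class provides (an illustrative sketch assuming Fof is on the classpath):

public class FofCheck {
    public static void main(String[] args) {
        // getFof orders the two names, so the pair key is order-insensitive
        // and both directions land in the same reduce group.
        System.out.println(new Fof("老王", "如花").equals(new Fof("如花", "老王"))); // true
    }
}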
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class User implements WritableComparable<User> {

    private String username;
    private int count;

    public User() {
    }

    public User(String username, int count) {
        this.username = username;
        this.count = count;
    }

    public String getUsername() { return username; }
    public void setUsername(String username) { this.username = username; }
    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(username);
        out.writeInt(count);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.username = in.readUTF();
        this.count = in.readInt();
    }

    @Override
    public int compareTo(User o) {
        int c1 = this.username.compareTo(o.username);
        if (c1 != 0)
            return c1;
        return Integer.compare(this.count, o.getCount()); // avoids overflow of a - b
    }
}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Grouping comparator for job 2: all candidates of the same user
 * go to one reduce() call.
 */
public class FGroup extends WritableComparator {

    public FGroup() {
        super(User.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        User u1 = (User) a;
        User u2 = (User) b;
        return u1.getUsername().compareTo(u2.getUsername());
    }
}
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Sort comparator for job 2: username ascending, then
 * shared-friend count descending.
 */
public class FSort extends WritableComparator {

    public FSort() {
        super(User.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        User u1 = (User) a;
        User u2 = (User) b;
        int c1 = u1.getUsername().compareTo(u2.getUsername());
        if (c1 != 0)
            return c1;
        return Integer.compare(u2.getCount(), u1.getCount()); // descending by count
    }
}
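
As in the weather example, the job-2 sort/group pair can be checked locally; a minimal sketch assuming User and FSort are on the classpath:

public class FriendSortCheck {
    public static void main(String[] args) {
        FSort sorter = new FSort();
        User stronger = new User("小明", 2); // two shared friends
        User weaker = new User("小明", 1);   // one shared friend
        // Prints a negative number: for the same user, the candidate with more
        // shared friends sorts first, so SortReducer emits recommendations
        // strongest-first.
        System.out.println(sorter.compare(stronger, weaker));
    }
}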
