1.日志源文件 (各个列分别是: 账户,营业额,花费,日期)

zhangsan@163.com    6000    0    2014-02-20
lisi@163.com    2000    0    2014-02-20
lisi@163.com    0    100    2014-02-20
zhangsan@163.com    3000    0    2014-02-20
wangwu@126.com    9000    0    2014-02-20
wangwu@126.com    0    200    2014-02-20

想要的结果: (计算出每个账户的总营业额和总花费,要求营业额排序降序,如果营业额相同则花费少的在上面)

zhangsan@163.com    9000    0    9000
wangwu@126.com    9000    200    8800
lisi@163.com    2000    100    1900

2.写代码:

InfoBean.java  对账户的后三个字段封装成一个Bean对象

 1 import java.io.DataInput;
 2 import java.io.DataOutput;
 3 import java.io.IOException;
 4
 5 import org.apache.hadoop.io.WritableComparable;
 6
 7 //要和其他的InfoBean类型进行比较,所以此处泛型T为InfoBean
 8 public class InfoBean implements WritableComparable<InfoBean> {
 9
10     private String account;
11     private double income;
12     private double expenses;
13     private double surplus;
14
15     /*
16      *如果不写这个方法,封装InfoBean对象的时候就要分别set这个对象的各个属性.
17      */
18     public void set(String account,double income,double expenses){
19         this.account = account;
20         this.income = income;
21         this.expenses = expenses;
22         this.surplus = income -expenses;
23     }
24     @Override
25     public void write(DataOutput out) throws IOException {
26         out.writeUTF(account);
27         out.writeDouble(income);
28         out.writeDouble(expenses);
29         out.writeDouble(surplus);
30     }
31
32     @Override
33     public void readFields(DataInput in) throws IOException {
34         this.account = in.readUTF();
35         this.income = in.readDouble();
36         this.expenses = in.readDouble();
37         this.surplus = in.readDouble();
38     }
39
40     @Override
41     public int compareTo(InfoBean o) {
42         if(this.income == o.getIncome()){
43             return this.expenses > o.getExpenses() ? 1 : -1;
44         } else {
45             return this.income > o.getIncome() ? -1 : 1;
46         }
47     }
48
49     @Override
50     //toString()方法输出的格式最好和源文件trade_info.txt中的格式一样, 字段通过Tab键分隔.
51     //而且在SumReducer类输出k3,v3的时候会输出k3(context.write(key, v);) 所以这个地方没有必要再输出k3(account)
52     public String toString() {
53 //        return "InfoBean [account=" + account + ", income=" + income
54 //                + ", expenses=" + expenses + ", surplus=" + surplus + "]";
55         return this.income + "\t" + this.expenses+"\t" + this.surplus;
56     }
57     public double getIncome() {
58         return income;
59     }
60
61     public void setIncome(double income) {
62         this.income = income;
63     }
64
65     public double getExpenses() {
66         return expenses;
67     }
68
69     public void setExpenses(double expenses) {
70         this.expenses = expenses;
71     }
72
73     public double getSurplus() {
74         return surplus;
75     }
76
77     public void setSurplus(double surplus) {
78         this.surplus = surplus;
79     }
80
81     public String getAccount() {
82         return account;
83     }
84
85     public void setAccount(String account) {
86         this.account = account;
87     }
88
89 }

SumStep.java

 1 import java.io.IOException;
 2
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.LongWritable;
 6 import org.apache.hadoop.io.Text;
 7 import org.apache.hadoop.mapreduce.Job;
 8 import org.apache.hadoop.mapreduce.Mapper;
 9 import org.apache.hadoop.mapreduce.Reducer;
10 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
11 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
12
13 public class SumStep {
14
15     public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean>{
16         private Text k = new Text();
17         private InfoBean bean = new InfoBean();
18
19         @Override
20         protected void map(LongWritable key, Text value,Mapper<LongWritable, Text, Text, InfoBean>.Context context)
21                 throws IOException, InterruptedException {
22
23             String line = value.toString();
24             String [] fields = line.split("\t");
25             String account = fields[0];
26             double income = Double.parseDouble(fields[1]);
27             double expenses = Double.parseDouble(fields[2]);
28             k.set(account);
29             bean.set(account, income, expenses);
30             context.write(k, bean);
31         }
32     }
33     public static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean>{
34         private InfoBean v = new InfoBean();
35         @Override
36         protected void reduce(Text key, Iterable<InfoBean> values,Reducer<Text, InfoBean, Text, InfoBean>.Context context)
37                 throws IOException, InterruptedException {
38             double sum_in = 0;
39             double sum_out = 0;
40             for(InfoBean bean : values){
41                 sum_in += bean.getIncome();
42                 sum_out += bean.getExpenses();
43             }
44             /*
45              * 在crxy的流量统计的案例中   是如下的方式写出k3和v3的 在reduce方法中new这个封装好的对象.
46              * 但是如果数据量比较大的情况下 是可能会造成内存溢出的.
47              * TrafficWritable v3 = new TrafficWritable(t1, t2, t3, t4);
48              * context.write(k2, v3);
49              *
50              * 所以建议把这个封装的对象写在"脑袋顶上" 如上所示....private InfoBean v = new InfoBean();
51              * 但是如果你Java基础比较好的话可能会说 在Java中是引用传递...所以后面的v会覆盖前面的v,造成最后只有最有一个v
52              * 其实这里是不会产生问题的,因为context.write()方法会直接把v3对应的InfoBean对象序列化.
53              * 虽然之前对象的引用确实覆盖了,但是之前对象的值等都保存了下来.是可以放在这个类的"脑袋顶上"的.
54              * 让这个类公用这个InfoBean对象.
55              */
56
57             v.set(key.toString(),sum_in,sum_out);
58             context.write(key, v);
59         }
60     }
61     public static void main(String[] args) throws Exception {
62         Configuration conf = new Configuration();
63         Job job = Job.getInstance(conf);
64         job.setJarByClass(SumStep.class);
65
66         job.setMapperClass(SumMapper.class);
67         //以下两行可以在满足一定条件的时候省略掉.
68         //在满足k2和k3,v2和v3一一对应的时候就可以省略掉. 看SumReducer类所在行的泛型.
69         job.setMapOutputKeyClass(Text.class);
70         job.setMapOutputValueClass(InfoBean.class);
71
72         FileInputFormat.setInputPaths(job, new Path(args[0]));
73
74         job.setReducerClass(SumReducer.class);
75         job.setOutputKeyClass(Text.class);
76         job.setOutputValueClass(InfoBean.class);
77         FileOutputFormat.setOutputPath(job, new Path(args[1]));
78         job.waitForCompletion(true);
79     }
80 }

项目打成jar包放到Linux中,日志源文件上传到HDFS上.运行结果如下:

hadoop jar /root/itcastmr.jar itcastmr.SumStep /user/root/trade_info.txt /tradeout

但是这个结果并没有排序.还是按照账号的字典排序.

以这个MR的输出当做输入对其根据InfoBean对象进行排序.....

上代码SortStep.java:

 1 import java.io.IOException;
 2
 3 import org.apache.hadoop.conf.Configuration;
 4 import org.apache.hadoop.fs.Path;
 5 import org.apache.hadoop.io.LongWritable;
 6 import org.apache.hadoop.io.NullWritable;
 7 import org.apache.hadoop.io.Text;
 8 import org.apache.hadoop.mapreduce.Job;
 9 import org.apache.hadoop.mapreduce.Mapper;
10 import org.apache.hadoop.mapreduce.Reducer;
11 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
12 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
13
14 public class SortStep {
15     //这个Mapper读取的HDFS文件是SumStep Reduce计算输出的文件.
16     public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable>{
17         private InfoBean k = new InfoBean();
18         @Override
19         protected void map(LongWritable key,Text value,Mapper<LongWritable, Text, InfoBean, NullWritable>.Context context)
20                 throws IOException, InterruptedException {
21             String line = value.toString();
22             String [] fields = line.split("\t");
23             String account = fields[0];
24             double income = Double.parseDouble(fields[1]);
25             double expenses = Double.parseDouble(fields[2]);
26             k.set(account, income, expenses);
27             //现在是要求按照InfoBean对象中的规则排序(InfoBean中有compareTo方法)...所以InfoBean对象当做k2...
28             context.write(k,NullWritable.get());//不能传null,NullWritable.get() 是获得的this对象.
29         }
30     }
31     public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean>{
32         private Text k = new Text();
33         @Override
34         protected void reduce(InfoBean bean, Iterable<NullWritable> values,Reducer<InfoBean, NullWritable, Text, InfoBean>.Context context)
35                 throws IOException, InterruptedException {
36             String account  = bean.getAccount();
37             k.set(account);
38             context.write(k, bean);
39         }
40     }
41
42     public static void main(String[] args) throws Exception {
43         Configuration conf = new Configuration();
44         Job job = Job.getInstance(conf);
45         job.setJarByClass(SortStep.class);
46
47         job.setMapperClass(SortMapper.class);
48         //以下两行可以在满足一定条件的时候省略掉.
49         //在满足k2和k3,v2和v3一一对应的时候就可以省略掉. 看SumReducer类所在行的泛型.
50         job.setMapOutputKeyClass(InfoBean.class);
51         job.setMapOutputValueClass(NullWritable.class);
52
53         FileInputFormat.setInputPaths(job, new Path(args[0]));
54
55         job.setReducerClass(SortReducer.class);
56         job.setOutputKeyClass(Text.class);
57         job.setOutputValueClass(InfoBean.class);
58         FileOutputFormat.setOutputPath(job, new Path(args[1]));
59         job.waitForCompletion(true);
60     }
61 }

打成jar包,然后运行命令....输入为上面SumStep.java的输出

hadoop jar /root/itcastmr.jar itcastmr.SortStep /tradeout /trade_sort_out

排序之后的结果:

在MapReduce读取输入数据的时候,如果这个文件是以下划线开始的话,那么会不会读取这个文件中的内容...."_SUCCESS"文件就不会读取....

如果想对某个类进行排序,

1.这个类要实现WritableComparable接口,

2.还要重写compareTo方法. 根据自己的业务逻辑自定义排序.

只需要把要排序的类当做k2 就可以了...框架自动排序.

要排序对象的compareTo方法是框架调用的,框架在shuffle这个阶段会调用排序.

shuffle后面会讲,shuffle由很多很多的阶段组成,分区,排序,分组,combiner等等...把这些小的细节都讲完了之后再讲shuffle.

本文转自SummerChill博客园博客,原文链接:http://www.cnblogs.com/DreamDrive/p/7398455.html,如需转载请自行联系原作者

MapReduce对交易日志进行排序的Demo(MR的二次排序)相关推荐

  1. java二次排序_使用java 实现二次排序

    二次排序工具类: import java.io.Serializable; import scala.math.Ordered; /** * @author 作者 E-mail: * @version ...

  2. 数据结构排序系列详解之二 希尔排序

    接着上一篇博客,这篇要介绍的是希尔排序(缩小增量排序法). 希尔排序:通过比较相距一定间隔的元素来工作:各趟比较所用的距离(增量)随着算法的进行而减小,直到只比较相邻元素的最后一趟排序为止.是插入排序 ...

  3. hadoop之MapReduce自定义二次排序流程实例详解

    一.概述 MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的.在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求 ...

  4. 详细讲解MapReduce二次排序过程

    2019独角兽企业重金招聘Python工程师标准>>> 我在15年处理大数据的时候还都是使用MapReduce, 随着时间的推移, 计算工具的发展, 内存越来越便宜, 计算方式也有了 ...

  5. MapReduce自定义二次排序流程

    每一条记录开始是进入到map函数进行处理,处理完了之后立马就入自定义分区函数中对其进行分区,当所有输入数据经过map函数和分区函数处理完之后,就调用自定义二次排序函数对其进行排序. MapReduce ...

  6. Hadoop Mapreduce分区、分组、二次排序过程详解

    2019独角兽企业重金招聘Python工程师标准>>> 1.MapReduce中数据流动    (1)最简单的过程:  map - reduce    (2)定制了partition ...

  7. MapReduce二次排序

    2019独角兽企业重金招聘Python工程师标准>>> 默认情况下,Map输出的结果会对Key进行默认的排序,但是有时候需要对Key排序的同时还需要对Value进行排序,这时候就要用 ...

  8. Hadoop Mapreduce分区、分组、二次排序过程详解[转]

    徐海蛟 教学用途 1.MapReduce中数据流动 (1)最简单的过程: map - reduce (2)定制了partitioner以将map的结果送往指定reducer的过程: map - par ...

  9. [Spark的二次排序的实现]

    二次排序原理 二次排序就是首先按照第一字段排序,然后再对第一字段相同的行按照第二字段排序,注意不能破坏第一次排序的结果. 二次排序技术 假设对应的Key = K有如下值: (K,V1), (K,V2) ...

最新文章

  1. 汇编调用c语言函数 call printf,如何在汇编调用c语言函数?
  2. VTK:Utilities之DataAnimation
  3. java考题_Java考题知识点
  4. 如何制作计算机启动盘,一款U盘启动盘制作小工具
  5. web csrf java_在Java Web应用程序中阻止CSRF
  6. 如何在Xubuntu 15.04中安装最新版Eclipse luna
  7. 手把手打造开源新监控利器check_mk
  8. 一维搜索——进退法matlab实现(含代码)
  9. perl-我的第一个程序
  10. 组态软件(Configure)
  11. 情侣天气推送升级简单版 项目上传github实现定时自动推送教程
  12. vue中使用iview修改Cascader的默认数据
  13. seo软件优化工具软件-免费seo软件优化工具以及免费排名软件下载
  14. 不同编程语言下CH347DLL的调用方法
  15. H G W S哪一个不是状态函数_师大版成语词典发布!不学不是师大人!
  16. A5SHB,A5SHB芯片三极管规格书
  17. 安装zsh 、omyzsh
  18. 自定义域名:为自己的CSDN博客添加自定义域名吧!
  19. 推荐系统精排模型中的ID特征
  20. jetsonnano安装cartographer记录

热门文章

  1. 数字化校园passport
  2. swift3.0提示框新用法
  3. node.js学习5--------------------- 返回html内容给浏览器
  4. linux硬盘满了问题排查
  5. 黑客之王!韩国神童登顶黑客界的“极限挑战”
  6. Thrift抛直接内存OOM一点解决思路
  7. mysql 数据目录更改
  8. LomoX 桌面UI框架更新,增加资源管理
  9. HTML学习笔记_002_如何学习HTML
  10. 应用Etherchannel扩展企业服务的高可用性