====
案例:分组求top1

自定义GroupingComparator求取topN
GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑
3.1 需求
有如下订单数据
订单id    商品id    成交金额
Order_0000001    Pdt_01    222.8
Order_0000001    Pdt_05    25.8
Order_0000002    Pdt_03    522.8
Order_0000002    Pdt_04    122.4
Order_0000002    Pdt_05    722.4
Order_0000003    Pdt_01    222.8

现在需要求出每一个订单中成交金额最大的一笔交易

3.2 分析
1、利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce
2、在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值

第一步:读取文件,解析成key, value对。
第二步:自定义map逻辑,接收k1,v1,转换成新的k2,v2。
k1: LongWritable; v1: Text
k2: OrderBean(orderId, price); v2: NullWritable
第一点:一定要保证:相同orderID的数据到同一个reduce里面去,以订单id作为key2。
第二点:我们相同的订单id的数据,需要按照金额进行排序。需要按照谁排序,就把谁作为k2。金额作为key2。
所以,定义一个orderBean,有两个属性,一个是orderId订单号,一个是price金额。以OrderBean作为key2。
默认分区规则HashPartitioner
自定义分区规则,以订单id作为分区一句,相同的订单id发送到同一个reduce里面去。
排序以OrderBean里面的金额进行排序。注意,排序有要求,只有相同的orderId里面的金额才需要排序。

第三步:自定义分区,以我们的OrderBean里面的orderId作为分区的依据。
第四步:排序。按照相同订单的金额进行排序,不同订单金额没有可比性。
第五步:规约。
第六步:分组。接收k2,v2。
k2: OrderBean(orderId, price); v2: NullWritable
每一条数据,都是一个独立的组,每一条数据分完组后,都要去独立地调用一次reduce逻辑。
需要自定义分组,我们的任务相同订单id的数据是同一个组里面的数据,给我们合并为一组。
相同的订单id作为一组,我们这个组里面有多条数据,但是只给我们显示了金额最大的那一条。(分组求top1)

第七步:自定义reduce逻辑接收k2,v2,直接输出k2,就是我们的最大值top1。
第八步:输出文件。

代码实现
第一步:定义OrderBean
package cn.itcast.demo5.top1;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {

private String orderId;
    private Double price;

/**
     * 比较器
     * @param o
     * @return
     */
    @Override
    public int compareTo(OrderBean o) {
        //第一个问题:不同订号,不用比较价格排序
        int i = this.orderId.compareTo(o.orderId);
        if(i ==0){
            //订单号相等了
            //继续比较交个
           i = this.price.compareTo(o.price);
           i = -i;
        }
        return i;
    }

/**
     * 序列化的方法
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);

}

/**
     * 反序列化的方法
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();

}

public String getOrderId() {
        return orderId;
    }

public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

public Double getPrice() {
        return price;
    }

public void setPrice(Double price) {
        this.price = price;
    }

@Override
    public String toString() {
        return orderId +"\t"+price;
    }
}

第二步:自定义分区
package cn.itcast.demo5.top1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class Top1Partition extends Partitioner<OrderBean,NullWritable> {
    //判断我们的orderId,如果orderId相同的数据,一定要发送到同一个reduce里面去
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numReduceTasks) {

//动态的获取分区,保证了我们相同订单号的数据一定是发送到了同一个reducetask里面去了
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

第三步:自定义groupingComparator
package cn.itcast.demo5.top1;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class Top1GroupingComparator extends WritableComparator{

/**
     * 重写无参的构造器,告诉我们的父类,我们传入的数据类型是orderBean
     */
    public Top1GroupingComparator(){
        super(OrderBean.class,true);
    }

/**
     * 注意我们需要重新一个compare方法,注意,我们使用的comare方法,必须使用WritableComparable这两个参数的
     * 如果使用object两个参数,没法进行比较
     * @param a
     * @param b
     * @return
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //比较订单id,如果相同的订单id,我们就认为是同一组
        OrderBean first = (OrderBean) a;
        OrderBean second = (OrderBean) b;
        //将订单id比较的结果直接返回,直接返回,分组的时候依据我们返回的结果值,就知道两个数据之间进行比较要不要放到同一个组里面去
        int i = first.getOrderId().compareTo(second.getOrderId());
        return i;

}
}

package cn.itcast.demo5.top1;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Top1Mapper extends Mapper<LongWritable,Text,OrderBean,NullWritable> {

@Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Order_0000005    Pdt_01    222.8
        String[] split = value.toString().split("\t");
        OrderBean orderBean = new OrderBean();
        orderBean.setOrderId(split[0]);
        orderBean.setPrice(Double.valueOf(split[2]));
        context.write(orderBean,NullWritable.get());
    }
}

package cn.itcast.demo5.top1;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Top1Reducer extends Reducer<OrderBean,NullWritable,OrderBean,NullWritable> {
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        context.write(key,NullWritable.get());
    }
}

第四步:程序main函数入口
package cn.itcast.demo5.top1;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Top1Main extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "TOP1");

//第一步:读取文件,解析成key,value1
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///F:\\传智播客大数据离线阶段课程资料\\5、大数据离线第五天\\自定义groupingComparator\\input"));
        //第二步:自定义map逻辑,接收k1   v1   转换成k2   v2  输出
        job.setMapperClass(Top1Mapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        //第三步:自定义分区
        job.setPartitionerClass(Top1Partition.class);

//第四步:排序
        //第五步:规约省掉
        //第六步:分组
        job.setGroupingComparatorClass(Top1GroupingComparator.class);

//第七步:自定义reduce逻辑
        job.setReducerClass(Top1Reducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///F:\\传智播客大数据离线阶段课程资料\\5、大数据离线第五天\\自定义groupingComparator\\out_top1"));

boolean b = job.waitForCompletion(true);

return b?0:1;
    }

public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new Top1Main(), args);
        System.exit(run);
    }
}
====

案例:分组求topN

k2:
订单id相同的值,作为一组,调用一次reduce逻辑
Order_0000002 Pdt_05 722.4
Order_0000002 Pdt_03 522.8
Order_0000002 Pdt_04 122.4

v2:
[Order_0000002 Pdt_05 722.4, Order_0000002 Pdt_03 522.8, Order_0000002 Pdt_04 122.4]

代码实现:
package cn.itcast.demo6.topN;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {

private String orderId;
    private Double price;

/**
     * 比较器
     * @param o
     * @return
     */
    @Override
    public int compareTo(OrderBean o) {
        //第一个问题:不同订单号,不用比较价格排序
        int i = this.orderId.compareTo(o.orderId);
        if(i ==0){
            //订单号相等了
            //继续比较价格
           i = this.price.compareTo(o.price);
           i = -i;
        }
        return i;
    }

/**
     * 序列化的方法
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);

}

/**
     * 反序列化的方法
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();

}

public String getOrderId() {
        return orderId;
    }

public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

public Double getPrice() {
        return price;
    }

public void setPrice(Double price) {
        this.price = price;
    }

@Override
    public String toString() {
        return orderId +"\t"+price;
    }
}

package cn.itcast.demo6.topN;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class TopNPartition extends Partitioner<OrderBean,Text> {
    //判断我们的orderId,如果orderId相同的数据,一定要发送到同一个reduce里面去
    @Override
    public int getPartition(OrderBean orderBean, Text text, int numReduceTasks) {

//动态的获取分区,保证了我们相同订单号的数据一定是发送到了同一个reducetask里面去了
        return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}

package cn.itcast.demo6.topN;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class TopNGroupingComparator extends WritableComparator{

/**
     * 重写无参的构造器,告诉我们的父类,我们传入的数据类型是orderBean
     */
    public TopNGroupingComparator(){
        super(OrderBean.class,true);
    }

/**
     * 注意我们需要重新一个compare方法,注意,我们使用的comare方法,必须使用WritableComparable这两个参数的
     * 如果使用object两个参数,没法进行比较
     * @param a
     * @param b
     * @return
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //比较订单id,如果相同的订单id,我们就认为是同一组
        OrderBean first = (OrderBean) a;
        OrderBean second = (OrderBean) b;
        //将订单id比较的结果直接返回,直接返回,分组的时候依据我们返回的结果值,就知道两个数据之间进行比较要不要放到同一个组里面去
        int i = first.getOrderId().compareTo(second.getOrderId());
        return i;

}
}

package cn.itcast.demo6.topN;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TopNMapper extends Mapper<LongWritable,Text,OrderBean,Text> {

@Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //Order_0000005    Pdt_01    222.8
        String[] split = value.toString().split("\t");
        OrderBean orderBean = new OrderBean();
        orderBean.setOrderId(split[0]);
        orderBean.setPrice(Double.valueOf(split[2]));
        context.write(orderBean,value);
    }
}

package cn.itcast.demo6.topN;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TopNReducer extends Reducer<OrderBean,Text,OrderBean,Text> {
    @Override
    protected void reduce(OrderBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
       //输出每组里面最大的前两个值
        int i = 0;
        for (Text value : values) {
            if(i> 1){
                break;
            }else{
                context.write(key,value);
                i ++;
            }
        }
    }
}

package cn.itcast.demo6.topN;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TopNMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(super.getConf(), "TOPN");

//第一步:读取文件,解析成key,value1
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job,new Path("file:///F:\\传智播客大数据离线阶段课程资料\\5、大数据离线第五天\\自定义groupingComparator\\input"));
        //第二步:自定义map逻辑,接收k1   v1   转换成k2   v2  输出
        job.setMapperClass(TopNMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(Text.class);
        //第三步:自定义分区
        job.setPartitionerClass(TopNPartition.class);

//第四步:排序
        //第五步:规约省掉
        //第六步:分组
        job.setGroupingComparatorClass(TopNGroupingComparator.class);

//第七步:自定义reduce逻辑
        job.setReducerClass(TopNReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(Text.class);

job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job,new Path("file:///F:\\传智播客大数据离线阶段课程资料\\5、大数据离线第五天\\自定义groupingComparator\\out_topN"));

boolean b = job.waitForCompletion(true);

return b?0:1;
    }

public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new TopNMain(), args);
        System.exit(run);
    }
}
====

案例:分组求top1、求topN相关推荐

  1. ML之ME/LF:机器学习中常见模型评估指标/损失函数(LiR损失、L1损失、L2损失、Logistic损失)求梯度/求导、案例应用之详细攻略

    ML之ME/LF:机器学习中常见模型评估指标/损失函数(LiR损失.L1损失.L2损失.Logistic损失)求梯度/求导.案例应用之详细攻略 目录 常见损失函数求梯度案例 1.线性回归求梯度 2.L ...

  2. oracle的rollup操作---按照小组进行分组,同时求总计

    rollup配合goup by使用,照小组进行分组,同时求总计.可以提供信息汇总功能(类似于"小计") ROLLUP在数据统计和报表生成过程中带来极大的便利 rollup操作--- ...

  3. Python编程经典案例【考题】求某个范围内能被3整除且能被5整除的所有数,及这些数的和

    本文和你一起探索Python编程经典案例,让你沉浸式学习Python.你可以拿着题目先思考,然后再对照本文解题方法进行比较.有不同的见解欢迎到公众号中跟我一起探讨.    文章目录 一.经典案例[考题 ...

  4. MATLAB—离散一元、二元、多元函数求导求梯度(二维、三维、多维空间)(diff和gradient)

    (离散)一元函数求导-二维 已知同维度的x和y序列,则可使用diff(y)./diff(x)来估算.设x为n维向量,Dx=diff(x),计算向量x的向前差分,DX(i)=X(i+1)-X(i),0& ...

  5. ACMNO.16用迭代法求 。求平方根的迭代公式为: X[n+1]=1/2(X[n]+a/X[n]) 要求前后两次求出的得差的绝对值少于0.00001。 输出保留3位小数 输入 X 输出 X的

    题目描述 用迭代法求 . 求平方根的迭代公式为: X[n+1]=1/2(X[n]+a/X[n]) 要求前后两次求出的得差的绝对值少于0.00001.输出保留3位小数 输入 X 输出 X的平方根 样例输 ...

  6. python numpy 多项式函数 求导求根

    python numpy 多项式函数 求导求根 """求出多项式的 导函数与根 """import numpy as np import m ...

  7. XDOJ例题及答案第八更 密码编译 密码强度 排序2 排序3 排序算法比较器 判断奇偶性 平均数 气温波动 亲和数 求交错序列前N项和 求平均值求奇数的和

    目录 密码编译 密码强度 排序2 排序3 排序算法比较器 判断奇偶性 平均数 气温波动 亲和数 求交错序列前N项和 求平均值 求奇数的和 密码编译 #include<stdio.h> in ...

  8. 求sinx的近似值 c语言,用C语言求多项式求sinx的近似值。

    用C语言求多项式求sinx的近似值. 來源:互聯網  2010-12-22 19:59:48  評論 分類: 電腦/網絡 >> 程序設計 >> 其他編程語言 問題描述: 3.利 ...

  9. C语言求随机两个向量乘积,求一个“求向量内积”的C语言程序!

    求一个"求向量内积"的C语言程序! 來源:互聯網  2010-03-14 02:52:44  評論 分類: 教育/科學 >> 升學入學 >> 高考 參考答案 ...

最新文章

  1. 日常办公会用到的python模块-Python如何去实际提高工作的效率?也许这个会有用!...
  2. 20165320 第十周课上测试补做
  3. 用PHP实现POP3邮件的收取(一)
  4. 微软想将新版Edge浏览器引入Linux
  5. 前端开发 表单元素 0229
  6. Angular 5和ASP.NET Core入门
  7. L1-044 稳赢-PAT团体程序设计天梯赛GPLT
  8. C#字节数组的常用解码处理方法
  9. D-star Lite算法及其动态路径规划实验研究
  10. 大学生创新创业训练计划项目申请书
  11. rust倒地了怎么起来_ggxx出招表
  12. ceph的rbd使用和理解(全)
  13. MySql作业练习题
  14. BI工具和数据中台有什么区别?
  15. LeetCode1359. 有效的快递序列数目
  16. 威联通[vNAS內置虚拟机]体验评测 让企业实现无限可能
  17. 6.xp 开机画面【欢迎使用】四个字更改方法:
  18. 2023测绘资质申请条件
  19. win10+TeamVIew+花生壳 0元实现 远程开机教程附带截图超详细 已实践成功
  20. Inno setup 常用修改技巧

热门文章

  1. 【项目管理案例】第三期:如何应对项目中的风险
  2. IntelliJ IDEA生成时序图的插件(超级好用)
  3. 详解苹果Face ID,将让深度摄像头成主流
  4. 转行IT要趁早,多迪教育新就业数据告诉你真相
  5. 多迪技术总监揭秘:如何成为一个合格的Java程序员?
  6. 复盘今年寒气逼人的秋招
  7. java gui快速设计,Java GUI简单设计 360天气
  8. 2023最新最新PHP代挂网站源码+无需域名授权/支持燃鹅代抽
  9. 基于51单片机设计的呼吸灯
  10. openGauss开源2周年,破解数据库生态痛点