需求:

订单id 商品id 成交金额
Order_0000001 Pdt_01 222.8
Order_0000001 Pdt_05 25.8
Order_0000002 Pdt_05 325.8
Order_0000002 Pdt_03 522.8
Order_0000002 Pdt_04 122.4
Order_0000003 Pdt_01 222.8

按照订单的编号分组,计算出每组的商品价格最大值。

分析:

  我们可以把订单编号当做key,然后按照在reduce端去找出每组的最大值。在这里,我想介绍另外一种方法,顺便介绍GroupingComparator。

  我们可以自定义一个类型,然后通过GroupingComparator来让其被看成一组(到达reduce端),如果我们对类型进行从大到小的排序,根据MapReduce的规则,同一组的内容到达reduce端,是取第一个内容的key作为reduce的key的,我们不妨利用这个规则,写一个OrderBean的类型,只要让其orderid相同,就被分到同一组,这样一来,到达reduce时,相同id的所有bean已经被看成一组,且金额最大的那个一排在第一位,就是我们想要的结果。

代码:

OrderBean.java:

package com.darrenchan.mr.groupingcomparator;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;public class OrderBean implements WritableComparable<OrderBean>{private Text itemid;private DoubleWritable amount;public OrderBean() {}public OrderBean(Text itemid, DoubleWritable amount) {set(itemid, amount);}public void set(Text itemid, DoubleWritable amount) {this.itemid = itemid;this.amount = amount;}public Text getItemid() {return itemid;}public DoubleWritable getAmount() {return amount;}@Overridepublic int compareTo(OrderBean o) {//        int cmp = this.itemid.compareTo(o.getItemid());
//        if (cmp == 0) {int    cmp = -this.amount.compareTo(o.getAmount());
//        }return cmp;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(itemid.toString());out.writeDouble(amount.get());}@Overridepublic void readFields(DataInput in) throws IOException {String readUTF = in.readUTF();double readDouble = in.readDouble();this.itemid = new Text(readUTF);this.amount= new DoubleWritable(readDouble);}@Overridepublic String toString() {return itemid.toString() + "\t" + amount.get();}}

ItemidGroupingComparator.java

package com.darrenchan.mr.groupingcomparator;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;/*** 利用reduce端的GroupingComparator来实现将一组bean看成相同的key**/
public class ItemidGroupingComparator extends WritableComparator {//传入作为key的bean的class类型,以及制定需要让框架做反射获取实例对象protected ItemidGroupingComparator() {super(OrderBean.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {OrderBean abean = (OrderBean) a;OrderBean bbean = (OrderBean) b;//比较两个bean时,指定只比较bean中的orderidreturn abean.getItemid().compareTo(bbean.getItemid());}}

ItemIdPartitioner.java

package com.darrenchan.mr.groupingcomparator;import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;public class ItemIdPartitioner extends Partitioner<OrderBean, NullWritable>{@Overridepublic int getPartition(OrderBean bean, NullWritable value, int numReduceTasks) {//相同id的订单bean,会发往相同的partition//而且,产生的分区数,是会跟用户设置的reduce task数保持一致return (bean.getItemid().hashCode() & Integer.MAX_VALUE) % numReduceTasks;}}

SecondarySort.java

package com.darrenchan.mr.groupingcomparator;import java.io.IOException;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import com.sun.xml.bind.v2.schemagen.xmlschema.List;/*** */
public class SecondarySort {static class SecondarySortMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable>{OrderBean bean = new OrderBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();String[] fields = StringUtils.split(line, ",");bean.set(new Text(fields[0]), new DoubleWritable(Double.parseDouble(fields[2])));context.write(bean, NullWritable.get());}}static class SecondarySortReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable>{//到达reduce时,相同id的所有bean已经被看成一组,且金额最大的那个一排在第一位@Overrideprotected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {context.write(key, NullWritable.get());}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(SecondarySort.class);job.setMapperClass(SecondarySortMapper.class);job.setReducerClass(SecondarySortReducer.class);job.setOutputKeyClass(OrderBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path("/grouping/srcdata"));FileOutputFormat.setOutputPath(job, new Path("/grouping/output"));//在此设置自定义的Groupingcomparator类 job.setGroupingComparatorClass(ItemidGroupingComparator.class);//在此设置自定义的partitioner类job.setPartitionerClass(ItemIdPartitioner.class);job.setNumReduceTasks(3);job.waitForCompletion(true);}}

运行结果:

作者: DarrenChan陈驰
出处:https://www.cnblogs.com/DarrenChan/p/6773277.html
版权:本站使用「CC BY 4.0」创作共享协议,转载请在文章明显位置注明作者及出处。

下一篇:MR之Shuffle机制(Partition分区、WritableComparable排序、Combiner合并、数据压缩)

GroupingComparator辅助排序相关推荐

  1. hadoop中GroupingComparator分组(辅助排序)

    GroupingComparator分组(辅助排序) 这里举例说明: 需求: 由上图结合GroupingComparator分组,我们可以做出这样的分析: 需要利用"订单id和成交金额&qu ...

  2. GroupingComparator分组(辅助排序)的作用以及通过GroupingComparator实现分组topN

    mapreduce框架在记录到达reducer之前按照键值对记录排序,但是键所对应的值并没有排序.甚至在不同的执行轮次中,这些值的排序也不固定,因为他们来自不同的map任务且这些map任务在不同的轮次 ...

  3. GroupingComparator分组(辅助排序)的作用以及GroupingComparator分组案例实操

    问题分析: partioner是在MapTask阶段将数据写入环形缓冲区中进行的分区操作,其目的是为了划分出几个结果文件(ReduceTask,但是partioner必须小于ReduceTask个数) ...

  4. 辅助排序和Mapreduce整体流程

    一.辅助排序 需求:先有一个订单数据文件,包含了订单id.商品id.商品价格,要求将订单id正序,商品价格倒序,且生成结果文件个数为订单id的数量,每个结果文件中只要一条该订单最贵商品的数据. 思路: ...

  5. Hadoop 教程目录

    目录:基于 Hadoop3.1.3版本 介绍 Hadoop 专栏: 大数据技术生态体系 Hadoop 自定义脚本汇总 Hadoop 入门基本了解 Hadoop 2.10.1源码编译(Apache版-L ...

  6. GroupingComparator分组

    GroupingComparator分组 GroupingComparator分组介绍 概念 分组排序步骤 GroupingComparator分组案例 需求 1. 需求说明 2. 文件 案例分析 1 ...

  7. Hadoop之mapReduce有几种排序及排序发生的阶段

    1)排序的分类: (1)部分排序: MapReduce根据输入记录的键对数据集排序.保证输出的每个文件内部排序. (2)全排序: 如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区. ...

  8. 10大经典排序算法,20+张图就搞定

    作者 | 李肖遥 来源 | 技术让梦想更伟大 冒泡排序 简介 冒泡排序是因为越小的元素会经由交换以升序或降序的方式慢慢浮到数列的顶端,就如同碳酸饮料中二氧化碳的气泡最终会上浮到顶端一样,故名冒泡排序. ...

  9. 这或许是东半球分析十大排序算法最好的一篇文章

    作者 | 不该相遇在秋天 转载自五分钟学算法(ID:CXYxiaowu) 前言 本文全长 14237 字,配有 70 张图片和动画,和你一起一步步看懂排序算法的运行过程. 预计阅读时间 47 分钟,强 ...

最新文章

  1. CentOS 7.0,启用iptables防火墙
  2. 成为顶尖机器学习算法专家需要知道哪些算法?
  3. python能在生活中做什么-Python可以解决哪些生活中的小问题
  4. CKEditor代码高亮显示插件Code Snippet安装及使用方法
  5. springboot actuator_Spring Boot 服务监控,健康检查,线程信息,JVM堆信息,指标收集,运行情况监控...
  6. 关于“粪”,你知道多少?【Feign是个什么玩意儿?】
  7. 信息收集 ——情报分析
  8. python有哪些常用的库
  9. OpenShift 4 之Istio-Tutorial (8) 在服务之间配置Mutual TLS双向传输安全
  10. 成为一名优秀架构师有标准吗?这12点或许能带给你一些启发
  11. Cloud Container Service experimentation
  12. NP、P、NPC、NP-hard 概念辨析
  13. 一文讲清楚ojdbc、Oracle和JDK之间的兼容性关系
  14. 【C语言基础学习笔记】一、初始C语言(总结篇)
  15. cad断点快捷键_CAD打断和打断于点怎么使用?CAD打断快捷键命令及操作方法
  16. css3 匀速运动的圆
  17. JAVA:实现线性丢番图方程求解器算法(附完整源码)
  18. mysql 保留小数位数
  19. 数据治理的成功要素2:数据架构设计
  20. python怎样安装whl文件

热门文章

  1. 【视频】编码基础知识之I帧、P帧、B帧
  2. 数据库知识点【复试】
  3. np在计算机语言中是什么意思,理解-NumPy
  4. Linux for循环语句
  5. Java刷新Jpanel_java更新Jpanel组件
  6. UNET详解和UNET++介绍(零基础)
  7. 树莓派3B入门使用教程
  8. HCIP-DATACOM H12-831(121-140)
  9. 企企通携手塑料新材料行业龙头【佛塑科技】,以数字化采购引领行业转型升级
  10. 廖雪峰python教程有用吗_后怕!当初要是坚持学Python用廖雪峰教程,我肯定坚持不了39天...