案例:分组求top1、求topN
====
案例:分组求top1
自定义GroupingComparator求取topN
GroupingComparator是mapreduce当中reduce端的一个功能组件,主要的作用是决定哪些数据作为一组,调用一次reduce的逻辑,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑
3.1 需求
有如下订单数据
订单id 商品id 成交金额
Order_0000001 Pdt_01 222.8
Order_0000001 Pdt_05 25.8
Order_0000002 Pdt_03 522.8
Order_0000002 Pdt_04 122.4
Order_0000002 Pdt_05 722.4
Order_0000003 Pdt_01 222.8
现在需要求出每一个订单中成交金额最大的一笔交易
3.2 分析
1、利用“订单id和成交金额”作为key,可以将map阶段读取到的所有订单数据按照id分区,按照金额排序,发送到reduce
2、在reduce端利用groupingcomparator将订单id相同的kv聚合成组,然后取第一个即是最大值
第一步:读取文件,解析成key, value对。
第二步:自定义map逻辑,接收k1,v1,转换成新的k2,v2。
k1: LongWritable; v1: Text
k2: OrderBean(orderId, price); v2: NullWritable
第一点:一定要保证:相同orderID的数据到同一个reduce里面去,以订单id作为key2。
第二点:我们相同的订单id的数据,需要按照金额进行排序。需要按照谁排序,就把谁作为k2。金额作为key2。
所以,定义一个orderBean,有两个属性,一个是orderId订单号,一个是price金额。以OrderBean作为key2。
默认分区规则HashPartitioner
自定义分区规则,以订单id作为分区一句,相同的订单id发送到同一个reduce里面去。
排序以OrderBean里面的金额进行排序。注意,排序有要求,只有相同的orderId里面的金额才需要排序。
第三步:自定义分区,以我们的OrderBean里面的orderId作为分区的依据。
第四步:排序。按照相同订单的金额进行排序,不同订单金额没有可比性。
第五步:规约。
第六步:分组。接收k2,v2。
k2: OrderBean(orderId, price); v2: NullWritable
每一条数据,都是一个独立的组,每一条数据分完组后,都要去独立地调用一次reduce逻辑。
需要自定义分组,我们的任务相同订单id的数据是同一个组里面的数据,给我们合并为一组。
相同的订单id作为一组,我们这个组里面有多条数据,但是只给我们显示了金额最大的那一条。(分组求top1)
第七步:自定义reduce逻辑接收k2,v2,直接输出k2,就是我们的最大值top1。
第八步:输出文件。
代码实现
第一步:定义OrderBean
package cn.itcast.demo5.top1;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
private String orderId;
private Double price;
/**
* 比较器
* @param o
* @return
*/
@Override
public int compareTo(OrderBean o) {
//第一个问题:不同订号,不用比较价格排序
int i = this.orderId.compareTo(o.orderId);
if(i ==0){
//订单号相等了
//继续比较交个
i = this.price.compareTo(o.price);
i = -i;
}
return i;
}
/**
* 序列化的方法
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeDouble(price);
}
/**
* 反序列化的方法
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.price = in.readDouble();
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return orderId +"\t"+price;
}
}
第二步:自定义分区
package cn.itcast.demo5.top1;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class Top1Partition extends Partitioner<OrderBean,NullWritable> {
//判断我们的orderId,如果orderId相同的数据,一定要发送到同一个reduce里面去
@Override
public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numReduceTasks) {
//动态的获取分区,保证了我们相同订单号的数据一定是发送到了同一个reducetask里面去了
return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
第三步:自定义groupingComparator
package cn.itcast.demo5.top1;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class Top1GroupingComparator extends WritableComparator{
/**
* 重写无参的构造器,告诉我们的父类,我们传入的数据类型是orderBean
*/
public Top1GroupingComparator(){
super(OrderBean.class,true);
}
/**
* 注意我们需要重新一个compare方法,注意,我们使用的comare方法,必须使用WritableComparable这两个参数的
* 如果使用object两个参数,没法进行比较
* @param a
* @param b
* @return
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
//比较订单id,如果相同的订单id,我们就认为是同一组
OrderBean first = (OrderBean) a;
OrderBean second = (OrderBean) b;
//将订单id比较的结果直接返回,直接返回,分组的时候依据我们返回的结果值,就知道两个数据之间进行比较要不要放到同一个组里面去
int i = first.getOrderId().compareTo(second.getOrderId());
return i;
}
}
package cn.itcast.demo5.top1;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class Top1Mapper extends Mapper<LongWritable,Text,OrderBean,NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Order_0000005 Pdt_01 222.8
String[] split = value.toString().split("\t");
OrderBean orderBean = new OrderBean();
orderBean.setOrderId(split[0]);
orderBean.setPrice(Double.valueOf(split[2]));
context.write(orderBean,NullWritable.get());
}
}
package cn.itcast.demo5.top1;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class Top1Reducer extends Reducer<OrderBean,NullWritable,OrderBean,NullWritable> {
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(key,NullWritable.get());
}
}
第四步:程序main函数入口
package cn.itcast.demo5.top1;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class Top1Main extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), "TOP1");
//第一步:读取文件,解析成key,value1
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///F:\\传智播客大数据离线阶段课程资料\\5、大数据离线第五天\\自定义groupingComparator\\input"));
//第二步:自定义map逻辑,接收k1 v1 转换成k2 v2 输出
job.setMapperClass(Top1Mapper.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
//第三步:自定义分区
job.setPartitionerClass(Top1Partition.class);
//第四步:排序
//第五步:规约省掉
//第六步:分组
job.setGroupingComparatorClass(Top1GroupingComparator.class);
//第七步:自定义reduce逻辑
job.setReducerClass(Top1Reducer.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///F:\\传智播客大数据离线阶段课程资料\\5、大数据离线第五天\\自定义groupingComparator\\out_top1"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new Top1Main(), args);
System.exit(run);
}
}
====
案例:分组求topN
k2:
订单id相同的值,作为一组,调用一次reduce逻辑
Order_0000002 Pdt_05 722.4
Order_0000002 Pdt_03 522.8
Order_0000002 Pdt_04 122.4
v2:
[Order_0000002 Pdt_05 722.4, Order_0000002 Pdt_03 522.8, Order_0000002 Pdt_04 122.4]
代码实现:
package cn.itcast.demo6.topN;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
private String orderId;
private Double price;
/**
* 比较器
* @param o
* @return
*/
@Override
public int compareTo(OrderBean o) {
//第一个问题:不同订单号,不用比较价格排序
int i = this.orderId.compareTo(o.orderId);
if(i ==0){
//订单号相等了
//继续比较价格
i = this.price.compareTo(o.price);
i = -i;
}
return i;
}
/**
* 序列化的方法
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeDouble(price);
}
/**
* 反序列化的方法
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.price = in.readDouble();
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return orderId +"\t"+price;
}
}
package cn.itcast.demo6.topN;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
public class TopNPartition extends Partitioner<OrderBean,Text> {
//判断我们的orderId,如果orderId相同的数据,一定要发送到同一个reduce里面去
@Override
public int getPartition(OrderBean orderBean, Text text, int numReduceTasks) {
//动态的获取分区,保证了我们相同订单号的数据一定是发送到了同一个reducetask里面去了
return (orderBean.getOrderId().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}
}
package cn.itcast.demo6.topN;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class TopNGroupingComparator extends WritableComparator{
/**
* 重写无参的构造器,告诉我们的父类,我们传入的数据类型是orderBean
*/
public TopNGroupingComparator(){
super(OrderBean.class,true);
}
/**
* 注意我们需要重新一个compare方法,注意,我们使用的comare方法,必须使用WritableComparable这两个参数的
* 如果使用object两个参数,没法进行比较
* @param a
* @param b
* @return
*/
@Override
public int compare(WritableComparable a, WritableComparable b) {
//比较订单id,如果相同的订单id,我们就认为是同一组
OrderBean first = (OrderBean) a;
OrderBean second = (OrderBean) b;
//将订单id比较的结果直接返回,直接返回,分组的时候依据我们返回的结果值,就知道两个数据之间进行比较要不要放到同一个组里面去
int i = first.getOrderId().compareTo(second.getOrderId());
return i;
}
}
package cn.itcast.demo6.topN;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class TopNMapper extends Mapper<LongWritable,Text,OrderBean,Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//Order_0000005 Pdt_01 222.8
String[] split = value.toString().split("\t");
OrderBean orderBean = new OrderBean();
orderBean.setOrderId(split[0]);
orderBean.setPrice(Double.valueOf(split[2]));
context.write(orderBean,value);
}
}
package cn.itcast.demo6.topN;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class TopNReducer extends Reducer<OrderBean,Text,OrderBean,Text> {
@Override
protected void reduce(OrderBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
//输出每组里面最大的前两个值
int i = 0;
for (Text value : values) {
if(i> 1){
break;
}else{
context.write(key,value);
i ++;
}
}
}
}
package cn.itcast.demo6.topN;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class TopNMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
Job job = Job.getInstance(super.getConf(), "TOPN");
//第一步:读取文件,解析成key,value1
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///F:\\传智播客大数据离线阶段课程资料\\5、大数据离线第五天\\自定义groupingComparator\\input"));
//第二步:自定义map逻辑,接收k1 v1 转换成k2 v2 输出
job.setMapperClass(TopNMapper.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(Text.class);
//第三步:自定义分区
job.setPartitionerClass(TopNPartition.class);
//第四步:排序
//第五步:规约省掉
//第六步:分组
job.setGroupingComparatorClass(TopNGroupingComparator.class);
//第七步:自定义reduce逻辑
job.setReducerClass(TopNReducer.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(Text.class);
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///F:\\传智播客大数据离线阶段课程资料\\5、大数据离线第五天\\自定义groupingComparator\\out_topN"));
boolean b = job.waitForCompletion(true);
return b?0:1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new TopNMain(), args);
System.exit(run);
}
}
====
案例:分组求top1、求topN相关推荐
- ML之ME/LF:机器学习中常见模型评估指标/损失函数(LiR损失、L1损失、L2损失、Logistic损失)求梯度/求导、案例应用之详细攻略
ML之ME/LF:机器学习中常见模型评估指标/损失函数(LiR损失.L1损失.L2损失.Logistic损失)求梯度/求导.案例应用之详细攻略 目录 常见损失函数求梯度案例 1.线性回归求梯度 2.L ...
- oracle的rollup操作---按照小组进行分组,同时求总计
rollup配合goup by使用,照小组进行分组,同时求总计.可以提供信息汇总功能(类似于"小计") ROLLUP在数据统计和报表生成过程中带来极大的便利 rollup操作--- ...
- Python编程经典案例【考题】求某个范围内能被3整除且能被5整除的所有数,及这些数的和
本文和你一起探索Python编程经典案例,让你沉浸式学习Python.你可以拿着题目先思考,然后再对照本文解题方法进行比较.有不同的见解欢迎到公众号中跟我一起探讨. 文章目录 一.经典案例[考题 ...
- MATLAB—离散一元、二元、多元函数求导求梯度(二维、三维、多维空间)(diff和gradient)
(离散)一元函数求导-二维 已知同维度的x和y序列,则可使用diff(y)./diff(x)来估算.设x为n维向量,Dx=diff(x),计算向量x的向前差分,DX(i)=X(i+1)-X(i),0& ...
- ACMNO.16用迭代法求 。求平方根的迭代公式为: X[n+1]=1/2(X[n]+a/X[n]) 要求前后两次求出的得差的绝对值少于0.00001。 输出保留3位小数 输入 X 输出 X的
题目描述 用迭代法求 . 求平方根的迭代公式为: X[n+1]=1/2(X[n]+a/X[n]) 要求前后两次求出的得差的绝对值少于0.00001.输出保留3位小数 输入 X 输出 X的平方根 样例输 ...
- python numpy 多项式函数 求导求根
python numpy 多项式函数 求导求根 """求出多项式的 导函数与根 """import numpy as np import m ...
- XDOJ例题及答案第八更 密码编译 密码强度 排序2 排序3 排序算法比较器 判断奇偶性 平均数 气温波动 亲和数 求交错序列前N项和 求平均值求奇数的和
目录 密码编译 密码强度 排序2 排序3 排序算法比较器 判断奇偶性 平均数 气温波动 亲和数 求交错序列前N项和 求平均值 求奇数的和 密码编译 #include<stdio.h> in ...
- 求sinx的近似值 c语言,用C语言求多项式求sinx的近似值。
用C语言求多项式求sinx的近似值. 來源:互聯網 2010-12-22 19:59:48 評論 分類: 電腦/網絡 >> 程序設計 >> 其他編程語言 問題描述: 3.利 ...
- C语言求随机两个向量乘积,求一个“求向量内积”的C语言程序!
求一个"求向量内积"的C语言程序! 來源:互聯網 2010-03-14 02:52:44 評論 分類: 教育/科學 >> 升學入學 >> 高考 參考答案 ...
最新文章
- 日常办公会用到的python模块-Python如何去实际提高工作的效率?也许这个会有用!...
- 20165320 第十周课上测试补做
- 用PHP实现POP3邮件的收取(一)
- 微软想将新版Edge浏览器引入Linux
- 前端开发 表单元素 0229
- Angular 5和ASP.NET Core入门
- L1-044 稳赢-PAT团体程序设计天梯赛GPLT
- C#字节数组的常用解码处理方法
- D-star Lite算法及其动态路径规划实验研究
- 大学生创新创业训练计划项目申请书
- rust倒地了怎么起来_ggxx出招表
- ceph的rbd使用和理解(全)
- MySql作业练习题
- BI工具和数据中台有什么区别?
- LeetCode1359. 有效的快递序列数目
- 威联通[vNAS內置虚拟机]体验评测 让企业实现无限可能
- 6.xp 开机画面【欢迎使用】四个字更改方法:
- 2023测绘资质申请条件
- win10+TeamVIew+花生壳 0元实现 远程开机教程附带截图超详细 已实践成功
- Inno setup 常用修改技巧