Use Case for WritableComparable Sorting

Contents
- Full sort
  - 1) Requirement
  - 2) Requirement analysis
  - 3) Implementation
    - 1. Create the FlowBean class
    - 2. Create the Mapper class
    - 3. Create the Reducer class
    - 4. Create the Driver class
  - 4) Check the results
- Secondary sort + Partition
  - 1) Requirement
  - 2) Requirement analysis
  - 3) Implementation
    - 1. Create the Partitioner class
    - 2. Create the Bean class
    - 3. Create the Mapper class
    - 4. Create the Reducer class
    - 5. Create the Driver class
  - 4) Check the results
Full sort
1) Requirement
Sort the records by total traffic in descending order.
A first job over phone_data.txt produced part-r-00000; that file is now used as the input of this job, and its output is the fully sorted result.
2) Requirement analysis
FlowBean is used as the map output key and implements WritableComparable, so the shuffle phase sorts every record with its compareTo method: descending by total traffic, and ascending by upstream traffic when the totals are equal. The Reducer then swaps key and value back so the phone number stays in the first column of the output.
3) Implementation
1. Create the FlowBean class
package com.yingzi.mapreduce.writableComparable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 1. Define the class and implement the WritableComparable interface
 * 2. Override the serialization and deserialization methods
 * 3. Provide a no-arg constructor
 * 4. Override toString
 *
 * @author 影子
 * @create 2022-01-13-15:57
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // no-arg constructor
    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        // primary: total traffic in descending order
        if (this.sumFlow > o.sumFlow) {
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            // secondary: upstream traffic in ascending order
            if (this.upFlow > o.upFlow) {
                return 1;
            } else if (this.upFlow < o.upFlow) {
                return -1;
            } else {
                return 0;
            }
        }
    }
}
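Since FlowBean is the map output key, it is this compareTo method that the shuffle uses to order the records. The throwaway demo below is not part of the original case; it assumes it lives in the same package as FlowBean and just shows how the comparison behaves on made-up values:

package com.yingzi.mapreduce.writableComparable;

public class FlowBeanCompareDemo {
    public static void main(String[] args) {
        FlowBean a = new FlowBean();
        a.setUpFlow(100);
        a.setDownFlow(200);
        a.setSumFlow();   // sumFlow = 300

        FlowBean b = new FlowBean();
        b.setUpFlow(50);
        b.setDownFlow(100);
        b.setSumFlow();   // sumFlow = 150

        System.out.println(a.compareTo(b)); // -1: the larger total traffic sorts first (descending)
        System.out.println(b.compareTo(a)); //  1
        System.out.println(a.compareTo(a)); //  0: same total and same upstream traffic
    }
}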
2. Create the Mapper class
package com.yingzi.mapreduce.writableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-13-16:12
 */
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // read one line
        String line = value.toString();

        // split it by tab
        String[] split = line.split("\t");

        // fill in the output key (FlowBean) and value (phone number)
        outV.set(split[0]);
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();

        // write out
        context.write(outK, outV);
    }
}
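The mapper assumes every input line follows the layout written by the first job: phone number, upstream, downstream and total traffic separated by tabs; the total column is ignored and recomputed. A tiny standalone sketch of that split (the record below is made up, not taken from the real dataset):

public class SplitSketch {
    public static void main(String[] args) {
        // hypothetical part-r-00000 line: phone \t upFlow \t downFlow \t sumFlow
        String line = "13600000000\t1116\t954\t2070";
        String[] split = line.split("\t");
        System.out.println("phone    = " + split[0]); // becomes the map output value
        System.out.println("upFlow   = " + split[1]); // parsed with Long.parseLong in the mapper
        System.out.println("downFlow = " + split[2]); // sumFlow is recomputed via setSumFlow()
    }
}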
3. Create the Reducer class
package com.yingzi.mapreduce.writableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-13-16:32
 */
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // keys arrive already sorted; swap key and value so the phone number is written first
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
4. Create the Driver class
package com.yingzi.mapreduce.writableComparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-13-16:40
 */
public class FlowDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. set the jar
        job.setJarByClass(FlowDriver.class);

        // 3. associate the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. set the map output key and value types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. set the final output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // 6. set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\_output\\output1"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\_output\\output8"));

        // 7. submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
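The paths in step 6 are hardcoded Windows paths, which is convenient when running the driver straight from the IDE. To submit the same job to a cluster, one common variant (a sketch only, not part of the original code) is to take the paths from the command line instead:

// step 6, reading the input and output paths from the program arguments
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

The job could then be launched with something like hadoop jar flow-sort.jar com.yingzi.mapreduce.writableComparable.FlowDriver <input path> <output path> (the jar name here is chosen purely for illustration).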
4) Check the results

Secondary sort + Partition
1) Requirement
Output the records to different files according to the leading digits of the phone number, and within each file sort them by total traffic in descending order.
2) Requirement analysis
This case builds on the full sort: FlowBean is still the map output key, so the records are sorted by its compareTo, while a custom Partitioner assigns each record to a partition based on the first three digits of the phone number (136, 137, 138, 139, and everything else). With five reduce tasks, each partition gets its own output file, already sorted inside.
3) Implementation
1. Create the Partitioner class
package com.yingzi.mapreduce.PartationAndWritableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * @author 影子
 * @create 2022-01-14-14:45
 */
public class ProvincePartitioner2 extends Partitioner<FlowBean, Text> {

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        // the phone number is carried in the map output value
        String phone = text.toString();
        String prePhone = phone.substring(0, 3);

        int partition;
        if ("136".equals(prePhone)) {
            partition = 0;
        } else if ("137".equals(prePhone)) {
            partition = 1;
        } else if ("138".equals(prePhone)) {
            partition = 2;
        } else if ("139".equals(prePhone)) {
            partition = 3;
        } else {
            partition = 4;
        }
        return partition;
    }
}
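getPartition looks only at the first three digits of the phone number: 136–139 go to partitions 0–3 and every other prefix falls into partition 4. Purely as an illustrative alternative (the class name below is hypothetical and not part of the original case), the same rule can be expressed as a lookup table:

package com.yingzi.mapreduce.PartationAndWritableComparable;

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class ProvincePartitionerLookup extends Partitioner<FlowBean, Text> {

    // phone prefix -> partition id; unknown prefixes fall back to partition 4
    private static final Map<String, Integer> PREFIX_TO_PARTITION = new HashMap<>();

    static {
        PREFIX_TO_PARTITION.put("136", 0);
        PREFIX_TO_PARTITION.put("137", 1);
        PREFIX_TO_PARTITION.put("138", 2);
        PREFIX_TO_PARTITION.put("139", 3);
    }

    @Override
    public int getPartition(FlowBean flowBean, Text text, int numPartitions) {
        String prePhone = text.toString().substring(0, 3);
        return PREFIX_TO_PARTITION.getOrDefault(prePhone, 4);
    }
}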
2. Create the Bean class (identical to the FlowBean above; only the package differs)
package com.yingzi.mapreduce.PartationAndWritableComparable;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * 1. Define the class and implement the WritableComparable interface
 * 2. Override the serialization and deserialization methods
 * 3. Provide a no-arg constructor
 * 4. Override toString
 *
 * @author 影子
 * @create 2022-01-13-15:57
 */
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;   // upstream traffic
    private long downFlow; // downstream traffic
    private long sumFlow;  // total traffic

    // no-arg constructor
    public FlowBean() {
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    public void setSumFlow() {
        this.sumFlow = this.upFlow + this.downFlow;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeLong(upFlow);
        dataOutput.writeLong(downFlow);
        dataOutput.writeLong(sumFlow);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.upFlow = dataInput.readLong();
        this.downFlow = dataInput.readLong();
        this.sumFlow = dataInput.readLong();
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + sumFlow;
    }

    @Override
    public int compareTo(FlowBean o) {
        // primary: total traffic in descending order
        if (this.sumFlow > o.sumFlow) {
            return -1;
        } else if (this.sumFlow < o.sumFlow) {
            return 1;
        } else {
            // secondary: upstream traffic in ascending order
            if (this.upFlow > o.upFlow) {
                return 1;
            } else if (this.upFlow < o.upFlow) {
                return -1;
            } else {
                return 0;
            }
        }
    }
}
3. Create the Mapper class (same as in the full-sort case, apart from the package)
package com.yingzi.mapreduce.PartationAndWritableComparable;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-13-16:12
 */
public class FlowMapper extends Mapper<LongWritable, Text, FlowBean, Text> {

    private FlowBean outK = new FlowBean();
    private Text outV = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // read one line
        String line = value.toString();

        // split it by tab
        String[] split = line.split("\t");

        // fill in the output key (FlowBean) and value (phone number)
        outV.set(split[0]);
        outK.setUpFlow(Long.parseLong(split[1]));
        outK.setDownFlow(Long.parseLong(split[2]));
        outK.setSumFlow();

        // write out
        context.write(outK, outV);
    }
}
4. Create the Reducer class (same as in the full-sort case, apart from the package)
package com.yingzi.mapreduce.PartationAndWritableComparable;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-13-16:32
 */
public class FlowReducer extends Reducer<FlowBean, Text, Text, FlowBean> {

    @Override
    protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // keys arrive already sorted; swap key and value so the phone number is written first
        for (Text value : values) {
            context.write(value, key);
        }
    }
}
5. Create the Driver class
package com.yingzi.mapreduce.PartationAndWritableComparable;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author 影子
 * @create 2022-01-13-16:40
 */
public class FlowDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 1. get the job
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // 2. set the jar
        job.setJarByClass(FlowDriver.class);

        // 3. associate the Mapper and Reducer
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // 4. set the map output key and value types
        job.setMapOutputKeyClass(FlowBean.class);
        job.setMapOutputValueClass(Text.class);

        // 5. set the final output key and value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // use the custom partitioner and match the reduce-task count to its five partitions
        job.setPartitionerClass(ProvincePartitioner2.class);
        job.setNumReduceTasks(5);

        // 6. set the input and output paths
        FileInputFormat.setInputPaths(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\_output\\output1"));
        FileOutputFormat.setOutputPath(job, new Path("G:\\计算机资料\\大数据开发\\尚硅谷大数据技术之Hadoop3.x\\资料\\_output\\output9"));

        // 7. submit the job
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
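In this driver the custom partitioner and the reduce-task count have to agree: ProvincePartitioner2 returns partition ids 0–4, so five reduce tasks are configured. The annotated lines below summarize the usual Hadoop behaviour for other reduce-task counts (worth double-checking against your Hadoop version):

job.setPartitionerClass(ProvincePartitioner2.class);
// the reduce-task count n must cover the partition ids 0-4 produced above:
//   n == 5    -> five output files, part-r-00000 ... part-r-00004, one per partition
//   n > 5     -> the extra reduce tasks simply produce empty output files
//   1 < n < 5 -> records with a partition id >= n make the job fail ("Illegal partition")
//   n == 1    -> the custom partitioner is bypassed and everything ends up in one part-r-00000
job.setNumReduceTasks(5);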
4) Check the results