MapReduce的序列化案例
MapReduce的序列化案例
- 需求
- 文件内容
- 案例分析
- 1. 需求:
- 2. 输入数据格式
- 3. 期望输出格式
- 4. Map阶段
- 5. Reduce阶段
- 编码实现
- 1. 编写流量统计的Bean对象
- 2. 编写Mapper类
- 3. 编写Reducer类
- 4. 编写Driver驱动类
- 结果截图
需求
统计每一个手机号耗费的总上行流量、下行流量、总流量
文件内容
案例分析
1. 需求:
统计每一个手机号耗费的总上行流量、下行流量、总流量
2. 输入数据格式
3. 期望输出格式
4. Map阶段
(1)读取一行数据,切分字段
(2)抽取手机号,上行流量,下行流量
(3)以手机号为key,bean对象为value输出,即context.write(手机号,bean)
(4)bean对象要想能够传输,必须实现序列化接口
5. Reduce阶段
(1)累加上行流量和下行流量得到总流量
编码实现
1. 编写流量统计的Bean对象
package com.atguigu.mr.flowsum;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable {private long upFlow; //上行流量private long downFlow; //下行流量private long sumFlow; //总流量//空参构造器public FlowBean() {super();}public FlowBean(long upFlow, long downFlow) {super();this.upFlow = upFlow;this.downFlow = downFlow;sumFlow = downFlow + upFlow;}//序列化方法@Overridepublic void write(DataOutput out) throws IOException {// TODO Auto-generated method stubout.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}//反序列化方法@Overridepublic void readFields(DataInput in) throws IOException {// 必须要求和序列化方法的顺序一致upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}@Overridepublic String toString() {return upFlow + "\t" + downFlow + ",\t" + sumFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public void set(long sum_upFlow, long sum_downFlow) {// TODO Auto-generated method stubupFlow = sum_upFlow;downFlow = sum_downFlow;sumFlow = sum_upFlow + sum_downFlow;}
}
2. 编写Mapper类
package com.atguigu.mr.flowsum;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {Text k = new Text();FlowBean v = new FlowBean();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//获取一行String line = value.toString();//切割\tString[] fileds = line.split("\t");//封装对象k.set(fileds[1]);long upFlow = Long.parseLong(fileds[fileds.length - 3]);long downFlow = Long.parseLong(fileds[fileds.length - 2]);v.setUpFlow(upFlow);v.setDownFlow(downFlow);
// v.set(upFlow,downFlow);//写出context.write(k, v);}}
3. 编写Reducer类
package com.atguigu.mr.flowsum;import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {FlowBean v = new FlowBean();@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {long sum_upFlow = 0;long sum_downFlow = 0;//累加求和for (FlowBean flowBean : values) {sum_upFlow += flowBean.getUpFlow();sum_downFlow += flowBean.getDownFlow();}v.set(sum_upFlow, sum_downFlow);//写出context.write(key, v);}}
4. 编写Driver驱动类
package com.atguigu.mr.flowsum;import java.io.File;
import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowsumDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {args = new String[] {"S:\\centos学习笔记\\hadoop\\input","S:\\centos学习笔记\\hadoop\\output"};Configuration conf = new Configuration();//获取job对象Job job = Job.getInstance(conf);//设置jar路径job.setJarByClass(FlowsumDriver.class);//关联mapper和reducerjob.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);//设置mapper输出的key和value的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//设置最终输出的key和 value的类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//设置输入输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}}
结果截图
MapReduce的序列化案例相关推荐
- 【MapReduce】Hadoop的序列化机制以及序列化案例求解每个部门工资总额
Hadoop的序列化机制以及序列化案例求解每个部门工资总额 1 Hadoop的序列化 1.1 序列化定义 1.2 Java序列化编程 1.3 hadoop序列化编程 2 序列化求解每个部门工资总额 手 ...
- 大数据之-Hadoop3.x_MapReduce_序列化案例FlowBean---大数据之hadoop3.x工作笔记0097
然后我们再来看,我们来写一下这个FlowBean,其实就是我们之前分析,这个hadoop序列化案例的时候,那个 用来承接上行流量,下行流量,总流量的,这个类,我们要为这个类实现hadoop的序列化. ...
- Hadoop序列化案例
Hadoop序列化案例 统计每一个手机号耗费的总上行流量.总下行流量.总流量 数据: 1 13736230513 192.196.100.1 www.baidu.com 2481 24681 200 ...
- 大数据之-Hadoop3.x_MapReduce_序列化案例FlowReducer---大数据之hadoop3.x工作笔记0099
然后我们接着去写hadoop这个序列化案例,统计手机号使用流量情况的 reducer类去,我们起个名字叫 FlowReducer类,可以看到这个类的泛型参数,Reducer的输入,就是 mapper类 ...
- Mapreduce的序列化和流量统计程序开发
一.Hadoop数据序列化的数据类型 Java数据类型 => Hadoop数据类型 int IntWritable float FloatWritable long LongWritable d ...
- 【MapReduce】综合案例
文章目录 综合案例 ① 数据文件 ② 具体要求 ③ 具体实现 • 上传文件 • 封装Bean类 • Mapper类缓存information.txt,实现与student.txt的连接 • Reduc ...
- MapReduce入门(一)—— MapReduce概述 + WordCount案例实操
MapReduce入门(一)-- MapReduce概述 文章目录 MapReduce入门(一)-- MapReduce概述 1.1 MapReduce 定义 1.2 MapReduce 优缺点 1. ...
- 【MapReduce】基础案例 ---- Reduce Join 实现数据合并(表连接)
文章目录 一.Reduce Join ① Reduce Join工作原理 ② Reduce Join 案例 ☠ 需求 ☠ 案例分析 ☠ 代码实现 封装Bean对象 Mapper阶段 Reducer阶段 ...
- MAPREDUCE的实战案例
reduce端join算法实现 1.需求: 订单数据表t_order: id date pid amount 1001 20150710 P0001 2 1002 20150710 P0001 3 1 ...
最新文章
- Android采用Application总结一下
- c++OpenCV操作mp4
- 阿里设计师出品!B端产品文案指南
- 小狗分类器,你家的狗子是个什么狗?
- android xml获取指定,android:如何从xml文件中获取信息?
- URLEncoder编码
- 实力剖析一个经典笔试题
- Spark安装及其sbt和maven 打包工具安装
- 分数的大小比较优秀教案_人教版小学数学五年级下册异分母分数加、减法公开课优质课课件教案视频...
- python wmi 重启网卡_python使用WMI检测windows系统信息、硬盘信息、网卡信息的方法...
- Intel 64/x86_64/IA-32/x86处理器 - 通用指令(1) - 数据传输指令
- showModalDialog模态对话框的使用以及浏览器兼容
- 如何为新项目创建新的空分支
- SEO HTML语义化
- 微信小程序之海报生成
- CAD彩色线条直接打印成黑白PDF的办法
- 【关于测试开发工程师】
- Processor ARM7TDMI/ARM920T raised an exception.Cause:Undefined instruction问题的多个解决方法
- 目标检测数据集标注-VOC格式
- 会员营销体系中,促成会员转化的关注点有哪些