MapReduce的序列化案例

  • 需求
    • 文件内容
  • 案例分析
    • 1. 需求:
    • 2. 输入数据格式
    • 3. 期望输出格式
    • 4. Map阶段
    • 5. Reduce阶段
  • 编码实现
    • 1. 编写流量统计的Bean对象
    • 2. 编写Mapper类
    • 3. 编写Reducer类
    • 4. 编写Driver驱动类
  • 结果截图

需求

统计每一个手机号耗费的总上行流量、下行流量、总流量

文件内容

案例分析

1. 需求:

统计每一个手机号耗费的总上行流量、下行流量、总流量

2. 输入数据格式

3. 期望输出格式

4. Map阶段

(1)读取一行数据,切分字段
(2)抽取手机号,上行流量,下行流量
(3)以手机号为key,bean对象为value输出,即context.write(手机号,bean)
(4)bean对象要想能够传输,必须实现序列化接口

5. Reduce阶段

(1)累加上行流量和下行流量得到总流量

编码实现

1. 编写流量统计的Bean对象

package com.atguigu.mr.flowsum;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable {private long upFlow; //上行流量private long downFlow; //下行流量private long sumFlow; //总流量//空参构造器public FlowBean() {super();}public FlowBean(long upFlow, long downFlow) {super();this.upFlow = upFlow;this.downFlow = downFlow;sumFlow = downFlow + upFlow;}//序列化方法@Overridepublic void write(DataOutput out) throws IOException {// TODO Auto-generated method stubout.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}//反序列化方法@Overridepublic void readFields(DataInput in) throws IOException {// 必须要求和序列化方法的顺序一致upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}@Overridepublic String toString() {return  upFlow + "\t" + downFlow + ",\t" + sumFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public void set(long sum_upFlow, long sum_downFlow) {// TODO Auto-generated method stubupFlow = sum_upFlow;downFlow = sum_downFlow;sumFlow = sum_upFlow + sum_downFlow;}
}

2. 编写Mapper类

package com.atguigu.mr.flowsum;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {Text k = new Text();FlowBean v = new FlowBean();@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//获取一行String line = value.toString();//切割\tString[] fileds = line.split("\t");//封装对象k.set(fileds[1]);long upFlow = Long.parseLong(fileds[fileds.length - 3]);long downFlow = Long.parseLong(fileds[fileds.length - 2]);v.setUpFlow(upFlow);v.setDownFlow(downFlow);
//      v.set(upFlow,downFlow);//写出context.write(k, v);}}

3. 编写Reducer类

package com.atguigu.mr.flowsum;import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {FlowBean  v = new FlowBean();@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {long sum_upFlow = 0;long sum_downFlow = 0;//累加求和for (FlowBean flowBean : values) {sum_upFlow += flowBean.getUpFlow();sum_downFlow += flowBean.getDownFlow();}v.set(sum_upFlow, sum_downFlow);//写出context.write(key, v);}}

4. 编写Driver驱动类

package com.atguigu.mr.flowsum;import java.io.File;
import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowsumDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {args = new String[] {"S:\\centos学习笔记\\hadoop\\input","S:\\centos学习笔记\\hadoop\\output"};Configuration  conf = new Configuration();//获取job对象Job job = Job.getInstance(conf);//设置jar路径job.setJarByClass(FlowsumDriver.class);//关联mapper和reducerjob.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);//设置mapper输出的key和value的类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//设置最终输出的key和 value的类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//设置输入输出路径FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));//提交jobboolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}}

结果截图


MapReduce的序列化案例相关推荐

  1. 【MapReduce】Hadoop的序列化机制以及序列化案例求解每个部门工资总额

    Hadoop的序列化机制以及序列化案例求解每个部门工资总额 1 Hadoop的序列化 1.1 序列化定义 1.2 Java序列化编程 1.3 hadoop序列化编程 2 序列化求解每个部门工资总额 手 ...

  2. 大数据之-Hadoop3.x_MapReduce_序列化案例FlowBean---大数据之hadoop3.x工作笔记0097

    然后我们再来看,我们来写一下这个FlowBean,其实就是我们之前分析,这个hadoop序列化案例的时候,那个 用来承接上行流量,下行流量,总流量的,这个类,我们要为这个类实现hadoop的序列化. ...

  3. Hadoop序列化案例

    Hadoop序列化案例 统计每一个手机号耗费的总上行流量.总下行流量.总流量 数据: 1 13736230513 192.196.100.1 www.baidu.com 2481 24681 200 ...

  4. 大数据之-Hadoop3.x_MapReduce_序列化案例FlowReducer---大数据之hadoop3.x工作笔记0099

    然后我们接着去写hadoop这个序列化案例,统计手机号使用流量情况的 reducer类去,我们起个名字叫 FlowReducer类,可以看到这个类的泛型参数,Reducer的输入,就是 mapper类 ...

  5. Mapreduce的序列化和流量统计程序开发

    一.Hadoop数据序列化的数据类型 Java数据类型 => Hadoop数据类型 int IntWritable float FloatWritable long LongWritable d ...

  6. 【MapReduce】综合案例

    文章目录 综合案例 ① 数据文件 ② 具体要求 ③ 具体实现 • 上传文件 • 封装Bean类 • Mapper类缓存information.txt,实现与student.txt的连接 • Reduc ...

  7. MapReduce入门(一)—— MapReduce概述 + WordCount案例实操

    MapReduce入门(一)-- MapReduce概述 文章目录 MapReduce入门(一)-- MapReduce概述 1.1 MapReduce 定义 1.2 MapReduce 优缺点 1. ...

  8. 【MapReduce】基础案例 ---- Reduce Join 实现数据合并(表连接)

    文章目录 一.Reduce Join ① Reduce Join工作原理 ② Reduce Join 案例 ☠ 需求 ☠ 案例分析 ☠ 代码实现 封装Bean对象 Mapper阶段 Reducer阶段 ...

  9. MAPREDUCE的实战案例

    reduce端join算法实现 1.需求: 订单数据表t_order: id date pid amount 1001 20150710 P0001 2 1002 20150710 P0001 3 1 ...

最新文章

  1. Android采用Application总结一下
  2. c++OpenCV操作mp4
  3. 阿里设计师出品!B端产品文案指南
  4. 小狗分类器,你家的狗子是个什么狗?
  5. android xml获取指定,android:如何从xml文件中获取信息?
  6. URLEncoder编码
  7. 实力剖析一个经典笔试题
  8. Spark安装及其sbt和maven 打包工具安装
  9. 分数的大小比较优秀教案_人教版小学数学五年级下册异分母分数加、减法公开课优质课课件教案视频...
  10. python wmi 重启网卡_python使用WMI检测windows系统信息、硬盘信息、网卡信息的方法...
  11. Intel 64/x86_64/IA-32/x86处理器 - 通用指令(1) - 数据传输指令
  12. showModalDialog模态对话框的使用以及浏览器兼容
  13. 如何为新项目创建新的空分支
  14. SEO HTML语义化
  15. 微信小程序之海报生成
  16. CAD彩色线条直接打印成黑白PDF的办法
  17. 【关于测试开发工程师】
  18. Processor ARM7TDMI/ARM920T raised an exception.Cause:Undefined instruction问题的多个解决方法
  19. 目标检测数据集标注-VOC格式
  20. 会员营销体系中,促成会员转化的关注点有哪些

热门文章

  1. linux-RPM安装
  2. PyCharm入门教程——在编辑器中使用拖放
  3. springboot 项目将本地引用打进jar包
  4. 《敏捷制造——敏捷集成基础结构设计》——1.2相关问题的国内外研究现状
  5. netty 对 protobuf 协议的解码与包装探究(2)
  6. linux 设置代理 安装jdk mysql tomcat redis hadoop
  7. 定位城市_北方城市如何利用GPS定位器减轻铲雪工作压力?
  8. Vivado中单端口和双端口RAM的区别
  9. USB入门系列之一:USB概述
  10. 魂力真的存在那么该如何提高魂力呢