前言

序列化想必大家都很熟悉了,对象在进行网络传输过程中,需要序列化之后才能传输到客户端,或者客户端的数据序列化之后送达到服务端

序列化的标准解释如下:

序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输

对应的反序列化为序列化的逆向过程

反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象

为什么要序列化

一般来说,程序动态创建出来的“活的” 对象只生存在内存里,一旦服务停机或断电就没了。而且“活”对象只能存活于本地进程,不能发送到网络上其他的服务器或者进程中使用。 然而通过序列化之后,则可以存储“活的”对象,从而进行网络传输,提供给其他进程或机器使用。

为什么不使用Java序列化

在Java中,创建一个对象如果希望这个对象是序列化的对象,只需要实现Serializable接口即可,但Java的序列化在Hadoop看来,是一个重量级序列化框架,一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),从而不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制,只需要对象实现Writable接口,重写里面的两个方法。

Hadoop序列化特点

  • 紧凑 :高效使用存储空间
  • 快速:读写数据的额外开销小
  • 互操作:支持多语言的交互

Hadoop序列化业务场景

在真实的业务场景中,类似于wordcount那样的单个字符串的场景很少,而且无法应对各种复杂的大数据场景和海量数据的处理业务,因此在传输过程中,为了更加灵活的进行数据在Map、Reduce中的传输,将解析到的数据以序列化对象的方式传输,是非常便捷的

在Hadoop中,具体实现bean对象序列化步骤如下7步:

  • 实现Writable接口
  • 反序列化时,需要反射调用空参构造函数,即类对象中必须有空参构造
  • 重写序列化write的方法
  • 重写反序列化的readFields方法
  • 注意反序列化的顺序和序列化的顺序完全一致
  • 若想把结果显示在文件中,需重写toString(),可用"\t"分开,方便后续用
  • 如果需将自定义的bean放在key中传输,还需要实现Comparable接口,因为MapReduce框中Shuffle过程要求对key必须能排序

案例业务描述

业务需求描述,如下数据为从某个地方导出来的一批统计手机号峰值流量和低谷流量的文本文件,现在的业务需求是,通过程序,最终输出各个手机号对应的峰值流量、低谷流量以及总流量的统计分析文件

那么最终的效果可按如下格式输出

了解了上面的业务后,下面开始按照前面描述的几个步骤进行编码实现

编码实现

1、定义一个封装手机流量各个属性的对象

从wordcount的案例中我们了解了使用mapreduce编码的基本编码套路,即map逻辑中读取原始数据文件,然后传递到reduce中

同样,在这里的map逻辑中,需要读取上面的原始的流量文本文件,但是既然在reduce中要能实现最终的统计输出,那么从map中出来的数据格式,必然是已经处理好的bean对象,key为手机号,而value值则为封装了当前手机号对应的峰值流量、低谷流量以及计算的总流量信息

了解了这一点,就大概知道这个bean对象该如何定义了

import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class PhoneBean implements Writable {//峰值流量private long upFlow;//低谷流量private long downFlow;//总流量private long sumFlow;//提供无参构造public PhoneBean() {}//提供三个参数的getter和setter方法public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}public void setSumFlow() {this.sumFlow = this.upFlow + this.downFlow;}//实现序列化和反序列化方法,注意顺序一定要保持一致@Overridepublic void write(DataOutput dataOutput) throws IOException {dataOutput.writeLong(upFlow);dataOutput.writeLong(downFlow);dataOutput.writeLong(sumFlow);}@Overridepublic void readFields(DataInput dataInput) throws IOException {this.upFlow = dataInput.readLong();this.downFlow = dataInput.readLong();this.sumFlow = dataInput.readLong();}//重写ToString方法@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}}

2、自定义Mapper类

该类读取和解析文本文件,将各个手机号的属性封装到PhoneBean对象中,并输出到Reduce使用

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class PhoneMapper extends Mapper<LongWritable, Text, Text, PhoneBean> {private Text outK = new Text();private PhoneBean outV = new PhoneBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();//分割数据String[] split = line.split("\t");//抓取需要的数据:手机号,上行流量,下行流量String phone = split[1];String max = split[3];String mine = split[4];//封装outK outVoutK.set(phone);outV.setUpFlow(Long.parseLong(max));outV.setDownFlow(Long.parseLong(mine));outV.setSumFlow();//写出outK outVcontext.write(outK, outV);}
}

4、自定义Reduce类

关于Reduce中的入参类型和出参类型,到这里想必都已经了解,就不再过多解释了

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
import java.util.LinkedList;public class PhoneMapper extends Mapper<LongWritable, Text, Text, PhoneBean> {private Text outK = new Text();private PhoneBean outV = new PhoneBean();@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = value.toString();//分割数据String[] splits = line.split("\t");LinkedList<String> linkedList = new LinkedList<>();for(String str:splits){if(StringUtils.isNotEmpty(str)){linkedList.add(str.trim());}}//抓取需要的数据:手机号,上行流量,下行流量String phone = linkedList.get(1);String max =  linkedList.get(3);String mine = linkedList.get(4);//封装outK outVoutK.set(phone);outV.setUpFlow(Long.parseLong(max));outV.setDownFlow(Long.parseLong(mine));outV.setSumFlow();//写出outK outVcontext.write(outK, outV);}
}

5、job类

依照wordcount案例中的模板做即可

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class PhoneJob {public static void main(String[] args) throws Exception {//1 获取job对象Configuration conf = new Configuration();Job job = Job.getInstance(conf);//2 关联本Driver类job.setJarByClass(PhoneJob.class);//3 关联Mapper和Reducerjob.setMapperClass(PhoneMapper.class);job.setReducerClass(PhoneReducer.class);//4 设置Map端输出KV类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(PhoneBean.class);//5 设置程序最终输出的KV类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(PhoneBean.class);//6 设置程序的输入输出路径String inPath = "F:\\网盘\\csv\\phone_data.txt";String outPath = "F:\\网盘\\csv\\out.txt";FileInputFormat.setInputPaths(job, new Path(inPath));FileOutputFormat.setOutputPath(job, new Path(outPath));//7 提交Jobboolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}}}

运行这段程序,观察是否在输出的目标路径下,生成了统计结果

打开最后那个文件,然后对比下原始的文件,正好满足预期的业务需求

hadoop 实现序列化相关推荐

  1. hadoop的序列化与java的序列化区别

    java的序列化机制 java序列化时会把具体类的数据和类的继承结构信息都序列化传递. 如下图 hadoop的序列化机制 序列化类的数据,但是不序列化类的继承结构信息. 网络传递的时候就少了很多流量, ...

  2. 【MapReduce】Hadoop的序列化机制以及序列化案例求解每个部门工资总额

    Hadoop的序列化机制以及序列化案例求解每个部门工资总额 1 Hadoop的序列化 1.1 序列化定义 1.2 Java序列化编程 1.3 hadoop序列化编程 2 序列化求解每个部门工资总额 手 ...

  3. Hadoop的序列化和反序列化

    1) 什么是序列化 序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁 盘(持久化)和网络传输. 反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换 ...

  4. 一脸懵逼学习Hadoop中的序列化机制——流量求和统计MapReduce的程序开发案例——流量求和统计排序...

    一:序列化概念 序列化(Serialization)是指把结构化对象转化为字节流. 反序列化(Deserialization)是序列化的逆过程.即把字节流转回结构化对象. Java序列化(java.i ...

  5. MapReduce快速入门系列(4) | Hadoop序列化

    Hello,大家好,本次为大家带来的是Hadoop的序列化操作. 目录 一. 序列化的简单介绍 1.1. 什么是序列化 1.2. 为什么要序列化 1.3. 为什么不用Java的序列化 二. 自定义be ...

  6. spark(kryo)、hadoop(writable)、jdk(serializable)-序列化

    一.SRC 一个类在jvm中是有结构的,但即使是在jvm中,也是一堆数据.网络只能传文本,所以需要序列化和反序列化. 通过几种方式的序列化后文本输出到本地文件,可以对比下大小. 二.jdk的序列化 将 ...

  7. Java拾遗:004 - JDK、Hadoop、Hessian序列化

    2019独角兽企业重金招聘Python工程师标准>>> JDK序列化 在分布式架构中,序列化是分布式的基础构成之一,我们需要把单台设备上的数据通过序列化(编码.压缩)后通过网络传输给 ...

  8. Hadoop详解(三)——MapReduce原理和执行过程,远程Debug,Writable序列化接口,MapReduce程序编写

    MapReduce概述 MapReduce是一种分布式计算模型,由Google提出,主要用于搜索领域,解决海量数据的计算问题. MR由两个阶段组成:Map和Reduce,用户只需要实现map()和Re ...

  9. Hadoop 3.x|第九天|序列化及案例代码编写

    目录 Hadoop序列化 定义 为什么需要序列化 为什么不用Java的序列化 源码 序列化案例实操-流量统计 需求 输入数据 输出数据 分析各个阶段的KV 自定义对象实现序列化接口的步骤 创建Flow ...

最新文章

  1. 对于未来的多种可能,这几位中国科学家想说
  2. WPF 的Listbox 滚动处理
  3. 干货笔记|三分钟让你掌握360高级副总裁的产品之道
  4. 项目打包部署到Tomcat
  5. 工作67:el-table问题
  6. 《编写可维护的Javascript》学习总结
  7. ActiveMQ学习总结(2)——ActiveMQ入门实例教程
  8. C语言实现加密解密功能 附带详细注释源码
  9. 安卓命令和linux命令行,scrcpy:用电脑显示和控制Android设备的命令行工具
  10. js与jQuery操作select大全
  11. Linux桌面词典 GoldenDict词典
  12. (五)RewriteBase 与RewriteCond 语法说明
  13. 人们熟知的一句名言是:“天才是1%的灵感加99%的汗水。”可如果没有那1%的灵感,世界上所有的汗水也就仅仅是一桶汗水而已。...
  14. 微信小程序和微信H5有什么区别?
  15. UTF8与GBK字符编码转换
  16. 基于SpringBoot+vue的前后端分离学生成绩管理系统的设计与实现--毕业设计
  17. NBU 配置ORACLE备份
  18. 我的世界手机版虚拟人生服务器,我的世界大型虚拟人生整合包
  19. 2017 年最佳开源网络监控工具
  20. Allow CORS: Access-Control-Allow-Origin插件使用

热门文章

  1. Mysql取分组中前N条记录
  2. XP下安装SQL2000企业版
  3. Apache POI导出Excel
  4. 查询一个ID出现2种结果的情况
  5. Dapper的语法应用
  6. java 的单态模式(只可以创建一个对象)
  7. IOS之NSValue整理
  8. OAuth和OpenID的区别
  9. Mybatis动态sql语句的生成
  10. golang nil 不等于 nil的问题