简单介绍

1、什么是序列化

  • 序列化:把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。
  • 反序列化:将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

2、 为什么要序列化

对象的序列化(Serialization)用于将对象编码成一个字节流,以及从字节流中重新构建对象。"将一个对象编码成一个字节流"称为序列化该对象(SeTializing);相反的处理过程称为反序列化(Deserializing)。 序列化有三种主要的用途:

  1. 作为一种持久化格式:一个对象被序列化以后,它的编码可以被存储到磁盘上,供以后反序列化用。

  2. 作为一种通信数据格式:序列化结果可以从一个正在运行的虚拟机,通过网络被传递到另一个虚拟机上。

  3. 作为一种拷贝、克隆(clone)机制:将对象序列化到内存的缓存区中。然后通过反序列化,可以得到一个对已存对象进行深拷贝的新对象。

在分布式数据处理中,主要使用上面提到的前两种功能:数据持久化和通信数据格式

需求

统计每一个手机号耗费的总上行流量、下行流量、总流量(txt文档在/Users/lizhengi/test/input/目录下)

1       13736230513     192.196.2.1     www.shouhu.com  2481    24681   200
2       13846544121     192.196.2.2                     264     0       200
3       13956435636     192.196.2.3                     132     1512    200
4       13966251146     192.168.2.1                     240     0       404
5       18271575951     192.168.2.2     www.shouhu.com  1527    2106    200
6       18240717138     192.168.2.3     www.hao123.com  4116    1432    200
7       13590439668     192.168.2.4                     1116    954     200
8       15910133277     192.168.2.5     www.hao123.com  3156    2936    200
9       13729199489     192.168.2.6                     240     0       200
10      13630577991     192.168.2.7     www.shouhu.com  6960    690     200
11      15043685818     192.168.2.8     www.baidu.com   3659    3538    200
12      15959002129     192.168.2.9     www.hao123.com  1938    180     500
13      13560439638     192.168.2.10                    918     4938    200
14      13470253144     192.168.2.11                    180     180     200
15      13682846555     192.168.2.12    www.qq.com      1938    2910    200
16      13992314666     192.168.2.13    www.gaga.com    3008    3720    200
17      13509468723     192.168.2.14    www.qinghua.com 7335    110349  404
18      18390173782     192.168.2.15    www.sogou.com   9531    2412    200
19      13975057813     192.168.2.16    www.baidu.com   11058   48243   200
20      13768778790     192.168.2.17                    120     120     200
21      13568436656     192.168.2.18    www.alibaba.com 2481    24681   200
22      13568436656     192.168.2.19                    1116    954     200

实现过程

1、新建Maven工程,pom.xml依赖如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.lizhengi</groupId><artifactId>Hadoop-API</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>RELEASE</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>3.2.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.2.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.2.1</version></dependency></dependencies></project>

2、src/main/resources目录下,新建一个文件,命名为“log4j.properties”,添加内容如下

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n

3、编写Bean类-FlowBean

package com.lizhengi.flow;import org.apache.hadoop.io.Writable;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;/*** @author lizhengi* @create 2020-07-20*/
// 1 实现writable接口
public class FlowBean implements Writable {private long upFlow;    //上行流量private long downFlow;  //下行流量private long sumFlow;   //总流量//2  反序列化时,需要反射调用空参构造函数,所以必须有public FlowBean() {}@Overridepublic String toString() {return upFlow + "\t" + downFlow + "\t" + sumFlow;}public void set(long upFlow, long downFlow) {this.upFlow = upFlow;this.downFlow = downFlow;this.sumFlow = upFlow + downFlow;}public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDownFlow() {return downFlow;}public void setDownFlow(long downFlow) {this.downFlow = downFlow;}public long getSumFlow() {return sumFlow;}public void setSumFlow(long sumFlow) {this.sumFlow = sumFlow;}//3  写序列化方法public void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}//4 反序列化方法//5 反序列化方法读顺序必须和写序列化方法的写顺序必须一致public void readFields(DataInput in) throws IOException {upFlow = in.readLong();downFlow = in.readLong();sumFlow = in.readLong();}}

4、编写Mapper类-FlowMapper

package com.lizhengi.flow;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/*** @author lizhengi* @create 2020-07-20*/
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {private Text phone = new Text();private FlowBean flow = new FlowBean();@Overrideprotected void map(LongWritable key, Text value, Context context)  throws IOException, InterruptedException {String[] fields = value.toString().split("\t");phone.set(fields[1]);long upFlow = Long.parseLong(fields[fields.length - 3]);long downFlow = Long.parseLong(fields[fields.length - 2]);flow.set(upFlow,downFlow);context.write(phone, flow);}
}

5、编写Reducer类-FlowReducer

package com.lizhengi.flow;import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;/*** @author lizhengi* @create 2020-07-20*/
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {private FlowBean sunFlow = new FlowBean();@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context)throws IOException, InterruptedException {long sum_upFlow = 0;long sum_downFlow = 0;// 1 遍历所用bean,将其中的上行流量,下行流量分别累加for (FlowBean value : values) {sum_upFlow += value.getUpFlow();sum_downFlow += value.getDownFlow();}// 2 封装对象sunFlow.set(sum_upFlow, sum_downFlow);// 3 写出context.write(key, sunFlow);}
}

6、编写Drvier类-FlowDriver

package com.lizhengi.flow;/*** @author lizhengi* @create 2020-07-20*/import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class FlowDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1 获取job实例Job job = Job.getInstance(new Configuration());// 2.设置类路径job.setJarByClass(FlowDriver.class);// 3 指定本业务job要使用的mapper/Reducer业务类job.setMapperClass(FlowMapper.class);job.setReducerClass(FlowReducer.class);// 4 指定mapper输出数据的kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);// 5 指定最终输出的数据的kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);// 6 指定job的输入原始文件所在目录FileInputFormat.setInputPaths(job, "/Users/marron27/test/input");FileOutputFormat.setOutputPath(job, new Path("/Users/marron27/test/output"));//FileInputFormat.setInputPaths(job, new Path(args[0]));//FileOutputFormat.setOutputPath(job, new Path(args[1]));// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行boolean result = job.waitForCompletion(true);System.exit(result ? 0 : 1);}
}

结果展示

Carlota:output marron27$ pwd
/Users/marron27/test/output
Carlota:output marron27$ cat part-r-00000
13470253144 180 180 360
13509468723 7335    110349  117684
13560439638 918 4938    5856
13568436656 3597    25635   29232
13590439668 1116    954 2070
13630577991 6960    690 7650
13682846555 1938    2910    4848
13729199489 240 0   240
13736230513 2481    24681   27162
13768778790 120 120 240
13846544121 264 0   264
13956435636 132 1512    1644
13966251146 240 0   240
13975057813 11058   48243   59301
13992314666 3008    3720    6728
15043685818 3659    3538    7197
15910133277 3156    2936    6092
15959002129 1938    180 2118
18240717138 4116    1432    5548
18271575951 1527    2106    3633
18390173782 9531    2412    11943

代码实现——MapReduce实现Hadoop序列化相关推荐

  1. (2)Hadoop核心 -- java代码对MapReduce的例子1

    案例一:wordcount字数统计功能 1.1 先准备两个txt文件,并上传到hdfs上 test1.txt hello zhangsan lisi nihao hai zhangsan nihao ...

  2. MapReduce快速入门系列(4) | Hadoop序列化

    Hello,大家好,本次为大家带来的是Hadoop的序列化操作. 目录 一. 序列化的简单介绍 1.1. 什么是序列化 1.2. 为什么要序列化 1.3. 为什么不用Java的序列化 二. 自定义be ...

  3. 【MapReduce】Hadoop的序列化机制以及序列化案例求解每个部门工资总额

    Hadoop的序列化机制以及序列化案例求解每个部门工资总额 1 Hadoop的序列化 1.1 序列化定义 1.2 Java序列化编程 1.3 hadoop序列化编程 2 序列化求解每个部门工资总额 手 ...

  4. MapReduce程序之序列化原理与Writable案例

    [TOC] MapReduce程序之序列化原理与Writable案例 前言 在编写MapReduce程序时,我们会发现,对于MapReduce的输入输出数据(key-value),我们只能使用Hado ...

  5. Hadoop之Hadoop序列化

    Hadoop之Hadoop序列化 目录 什么是序列化 为什么要序列化 为什么不用Java的序列化 常用数据序列化类型 自定义bean对象实现序列化接口(Writable) 1. 什么是序列化 序列化就 ...

  6. 他来了他来了,Hadoop序列化和切片机制了解一下?

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 切片机制 一个超大文件在HDFS上存储时,是以多个Block存储在不同的节点上,比如一个512M的 ...

  7. Hadoop序列化案例

    Hadoop序列化案例 统计每一个手机号耗费的总上行流量.总下行流量.总流量 数据: 1 13736230513 192.196.100.1 www.baidu.com 2481 24681 200 ...

  8. Hadoop序列化与Java序列化

    序列化就是把内存中的对象的状态信息转换成字节序列,以便于存储(持久化)和网络传输 反序列化就是就将收到的字节序列或者是硬盘的持久化数据,转换成内存中的对象. 1.JDK的序列化 只要实现了serial ...

  9. MapReduce优劣,理解MapReduce与Hadoop

    MapReduce是一种计算模型,用于大规模数据集(大于1TB)的并行运算.概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程 ...

最新文章

  1. web服务器(IIS)的操作步骤
  2. 常见的计算机系统结构不包括,计算机系统结构
  3. “中国智造”为System x提供创新源动力
  4. 《Python游戏编程入门》——1.2 初识Python
  5. LR中url和html两种录制模式
  6. usb扩展坞同时接键盘鼠标_笔记本扩展伴侣,轻松解决接口烦恼,毕亚兹USB-C扩展坞体验...
  7. DataTable操作相关实例
  8. oracle db2备份数据库,datagurad 使用备库的备份恢复主库的数据文件
  9. 从JVM的角度看JAVA代码1
  10. JSP九大内置对象及四个作用域
  11. safari 浏览器提示添加到主屏幕_Safari浏览器的秘密技能
  12. 程序员必备的css工具,8个提高效率的CSS实用工具
  13. 小米Android 4.3.1刷机包,终于来了:小米4 Win10刷机包下载!附刷机教程
  14. Anaconda下载安装与手动配置环境变量
  15. 未能连接到驱动人生服务器,更新显卡驱动提示“无法连接到Nvidia”,驱动人生来解决。...
  16. Error: The specified query does not exist\nResponse from attempted peer comms was an error
  17. 冰狐智能辅助入门教程
  18. 笔记本html到电视,笔记本怎么连接液晶电视 笔记本连接液晶电视方法【详解】...
  19. 蓝代斯克和玖道在华设立合资企业
  20. 公募基金主要业务逻辑

热门文章

  1. 将服务器get到的响应打印,得到HttpResponse的响应主体
  2. 计算机组成原理第八章课后答案6,计算机组成原理 第八章 复习
  3. Python str 函数 - Python零基础入门教程
  4. jq之fadeOut()
  5. printf函数的格式修饰符
  6. android xml 列表展示,Android中ListView实现展示列表数据
  7. python外星人入侵怎么发给别人_python_外星人入侵(1-1)
  8. mysql集群需要几个ip_rac集群3组机器,scan到底需要几个IP?
  9. 无法删除计算机文件是什么意思,为何计算机的文件删除不了,说是被占用要解除占用...
  10. vue读取服务器文件跨域,新版vue-cli模板下本地开发环境使用node服务器跨域的方法...