一、Hadoop数据序列化的数据类型

  Java数据类型 => Hadoop数据类型

  int         IntWritable

  float        FloatWritable

  long        LongWritable

  double         DoubleWritable

  String       Text

  boolean      BooleanWritable

  byte        ByteWritable

  map          MapWritable

  array        ArrayWritable

二、Hadoop的序列化

  1.什么是序列化?

   在java中,序列化接口是Serializable,它下面又实现了很多的序列化接口,所以java的序列化是一个重量级的序列化框架,一个对象被java序列化之后会附带很多额外的信息(校验信息、header、继承体系等),不便于在网络中进行高效的传输,所以Hadoop开发了一套自己的序列化框架——Writable。

      序列化就是把内存当中的对象,转化为字节序列以便于存储和网络传输;

   反序列化是将收到的字节序列或硬盘当中的持续化数据,转换成内存中的对象。

  2.序列化的理解方法(自己悟的,不对勿喷~~)

    比如下面流量统计案例中,流量的封装类FlowBean实现了Writable接口,其中定义了变量upFlow、dwFlow、flowSum;

    在Mapper和Reducer类中初始化封装类FlowBean时,内存会分配空间加载这些对象,而这些对象不便于在网络中高效的传输,这是封装类FlowBean中的序列化方法将这些对象转换为字节序列,方便了存储和传输;

    当Mapper或Reducer需要将这些对象的字节序列写出到磁盘时,封装类FlowBean中的反序列化方法将字节序列转换为对象,然后写道磁盘中。

  3.序列化特点

   序列化与反序列化时分布式数据处理当中经常会出现的,比如hadoop通信是通过远程调用(rpc)实现的,这个过程就需要序列化。

  特点:1)紧凑;

     2)快速

     3)可扩展

     4)可互操作

三、Mapreduce的流量统计程序案例

  1.代码

/*** @author: PrincessHug* @date: 2019/3/23, 23:38* @Blog: https://www.cnblogs.com/HelloBigTable/*/
public class FlowBean implements Writable {private long upFlow;private long dwFlow;private long flowSum;public long getUpFlow() {return upFlow;}public void setUpFlow(long upFlow) {this.upFlow = upFlow;}public long getDwFlow() {return dwFlow;}public void setDwFlow(long dwFlow) {this.dwFlow = dwFlow;}public long getFlowSum() {return flowSum;}public void setFlowSum(long flowSum) {this.flowSum = flowSum;}public FlowBean() {}public FlowBean(long upFlow, long dwFlow) {this.upFlow = upFlow;this.dwFlow = dwFlow;this.flowSum = upFlow + dwFlow;}/*** 序列化* @param out 输出流* @throws IOException*/@Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(dwFlow);out.writeLong(flowSum);}/*** 反序列化* @param in* @throws IOException*/@Overridepublic void readFields(DataInput in) throws IOException {upFlow = in.readLong();dwFlow = in.readLong();flowSum = in.readLong();}@Overridepublic String toString() {return upFlow + "\t" + dwFlow + "\t" + flowSum;}
}public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//获取数据String line = value.toString();//切分数据String[] fields = line.split("\t");//封装数据String phoneNum = fields[1];long upFlow = Long.parseLong(fields[fields.length - 3]);long dwFlow = Long.parseLong(fields[fields.length - 2]);//发送数据context.write(new Text(phoneNum),new FlowBean(upFlow,dwFlow));}
}public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> {@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {//聚合数据long upFlow_sum = 0;long dwFlow_sum = 0;for (FlowBean f:values){upFlow_sum += f.getUpFlow();dwFlow_sum += f.getDwFlow();}//发送数据context.write(key,new FlowBean(upFlow_sum,dwFlow_sum));}
}public class FlowPartitioner extends Partitioner<Text,FlowBean> {@Overridepublic int getPartition(Text key, FlowBean value, int i) {//获取用来分区的电话号码前三位String phoneNum = key.toString().substring(0, 3);//设置分区逻辑int partitionNum = 4;if ("135".equals(phoneNum)){return 0;}else if ("137".equals(phoneNum)){return 1;}else if ("138".equals(phoneNum)){return 2;}else if ("139".equals(phoneNum)){return 3;}return partitionNum;}
}
public class FlowCountDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {//获取配置,定义工具Configuration conf = new Configuration();Job job = Job.getInstance();//设置运行类job.setJarByClass(FlowCountDriver.class);//设置Mapper类及Mapper输出数据类型job.setMapperClass(FlowCountMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);//设置Reducer类及其输出数据类型job.setReducerClass(FlowCountReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//设置自定义分区job.setPartitionerClass(FlowPartitioner.class);job.setNumReduceTasks(5);//设置文件输入输出流FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\in"));FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\inpartitionout"));//返回运行完成if (job.waitForCompletion(true)){System.out.println("运行完毕!");}else {System.out.println("运行出错!");}}
}

  

转载于:https://www.cnblogs.com/HelloBigTable/p/10590705.html

Mapreduce的序列化和流量统计程序开发相关推荐

  1. Hadoop学习_mapreduce提交方式+实现简单流量统计程序(有注释)+shuffle

    注:以下内容来源于互联用,用于个人读书笔记. mapreduce提交方式 MR程序的几种提交运行模式: 本地模型运行 1/在windows的eclipse里面直接运行main方法,就会将job提交给本 ...

  2. 【MapReduce】实战:流量统计(完整Java代码)

    [MapReduce]系列学习笔记: 第一部分:基本介绍 第二部分:MapReduce的编程 第三部分:MapReduce的分区 第四部分:MaoReduce的排序 第五部分:MapReduce实战: ...

  3. 使用mapreduce进行流量汇总程序开发

    现有文件关于流量文件内容如下 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 2 ...

  4. 使用Mapreduce案例编写用于统计文本中单词出现的次数的案例、mapreduce本地运行等,Combiner使用及其相关的知识,流量统计案例和流量总和以及流量排序案例,自定义Partitioner

    工程结构: 在整个案例过程中,代码如下: WordCountMapper的代码如下: package cn.toto.bigdata.mr.wc; import java.io.IOException ...

  5. MapReduce编程规范及实践(流量统计)

    一.MapReduce编码规范 Map阶段2个步骤 设置 InputFormat 类, 将数据切分为 Key-Value(K1和V1) 对, 输入到第二步 自定义 Map 逻辑, 将第一步的结果转换成 ...

  6. php 开源 流量统计,5款开源的PHP网站流量统计应用程序

    下面是5款开源的PHP网站流量统计应用程序. piwik Piwik 是一套基于Php+MySQL技术构建的开源网站访问统计系统,前身是phpMyVisites.Piwik可以给你详细的统计信息,比如 ...

  7. AI绘画绘图流量主小程序开发

    AI绘画绘图流量主小程序开发 响应式设计--响应式布局,手机.平板.PC自适应匹配. 自定义模型--自定义内容模型.自定义字段.自定义表单. 付费阅读--支持企业支付宝.企业微信支付.余额支付无缝整合 ...

  8. MapReduce案例:手机流量的统计

    案例:手机流量的统计 对于记录用户手机信息的文件,得出统计每一个用户(手机号)所耗费的总上行流量.下行流量,总流量结果. 分析 1. 实现自定义的 bean 来封装流量信息,使用手机号码作为Key,B ...

  9. 5款开源网站流量统计应用程序

    网站流量分析是每个站长的必备工作.我们之前介绍过一些提供在线流量统计服务的网站: 下面则是5款开源的PHP网站流量统计应用程序. piwik Piwik 是一套基于Php+MySQL技术构建的开源网站 ...

最新文章

  1. IOS单例的两种实现方式
  2. windows XP常见进程(个人总结)
  3. gcc对C语言的扩展:语句内嵌表达式(statement-embedded expression)
  4. python的六个类型_介绍Python中6个序列的内置类型
  5. c++ map 获取key列表_一日一技:举例说明python中的map()方法
  6. 程序在发布前就应该发现的一些错误
  7. python subprocess.Popen简明总结
  8. 阻止brew自动更新
  9. Android快速开发框架ZBLibrary源码分享
  10. 上海交通大学计算机应用作业,上海交通大学继续教育学院计算机应用基础(二)第六次作业计算机安全多媒体_1...
  11. Restarting data prefetching from start repeated many times one by one. why?
  12. 【论文研读】Category-level Adversaries for Semantics Consistent Domain Adaptation(cvpr2019)
  13. Asp.net Web Api开发Help Page配置和扩展
  14. Leetcode 120. Triangle 三角形问题(动态规划经典) 解题报告
  15. Flyme-Substratum主题
  16. 人民币小写转大写的一般方法
  17. 信号量机制实现进程互斥与同步,生产者消费者
  18. 打开相机拍照或从相册中选择照片
  19. APIcoud 手机二维码or条码 生成与扫描模块
  20. 中国十大军工集团介绍

热门文章

  1. [精华][推荐]SSO CAS单点登录框架学习 搭建详细步骤及源码
  2. 20150206--JS巩固与加强4-02
  3. Android内核剖析
  4. vue项目工程中npm run dev 到底做了什么
  5. vue --- 提交表单到服务器
  6. 修改chrome记住密码后自动填充表单的背景
  7. 别再管你的API叫微服务了
  8. KAFKA介绍(分布式架构)
  9. 第一个PowerShell脚本——PowerShell三分钟(九)
  10. Jenkins 使用 maven 出现C:\Windows\system32\config\systemprofile的解决