根据需要构造自己的数据类型

FlowBean.java

package flow;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;/*** 使用对象构造自己的数据结构,这个对象要实现Writable的接口* 1、该类一定要保留空参构造函数* 2、write方法中输出字段二进制数据的顺序  要与  readFields方法读取数据的顺序一致*/
public class FlowBean implements Writable {private int upFlow;private int dFlow;private int amountFlow;public FlowBean(){}public FlowBean(int upFlow, int dFlow) {this.upFlow = upFlow;//上行流量this.dFlow = dFlow;//下行流量this.amountFlow = upFlow + dFlow;//总流量}public int getUpFlow() {return upFlow;}public void setUpFlow(int upFlow) {this.upFlow = upFlow;}public int getdFlow() {return dFlow;}public void setdFlow(int dFlow) {this.dFlow = dFlow;}public int getAmountFlow() {return amountFlow;}public void setAmountFlow(int amountFlow) {this.amountFlow = amountFlow;}@Overridepublic void write(DataOutput out) throws IOException {//该方法必须重写序列化,让hadoop集群来调用out.writeInt(upFlow);out.writeUTF(phone);out.writeInt(dFlow);out.writeInt(amountFlow);}@Overridepublic void readFields(DataInput in) throws IOException {//该方法必须重写反序列化,让hadoop集群调用this.upFlow = in.readInt();this.phone = in.readUTF();this.dFlow = in.readInt();this.amountFlow = in.readInt();}@Overridepublic String toString() {//返回最终的结果return this.phone + ","+this.upFlow +","+ this.dFlow +"," + this.amountFlow;}}

partitioner

package flow;import java.util.HashMap;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;/*** MapTask通过这个类的getPartition方法,根据partitioner方法决定maper的结果如何分配给reducer*/
public class ProvincePartitioner extends Partitioner<Text, FlowBean>{static HashMap<String,Integer> codeMap = new HashMap<>();static{codeMap.put("135", 0);codeMap.put("136", 1);codeMap.put("137", 2);codeMap.put("138", 3);codeMap.put("139", 4);}@Override//默认是根据key的haskcode的值%reducernum来分配任务public int getPartition(Text key, FlowBean value, int numPartitions) {Integer code = codeMap.get(key.toString().substring(0, 3));return code==null?5:code;}}

mapper

package flow;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();String[] fields = line.split("\t");int upFlow = Integer.parseInt(fields[1]);int dFlow = Integer.parseInt(fields[2]);context.write(new Text(phone), new FlowBean(upFlow, dFlow));}
}

reducer

package flow;import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{/***  key:是某个手机号*  values:是这个手机号所产生的所有访问记录中的流量数据*/@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)throws IOException, InterruptedException {int upSum = 0;int dSum = 0;for(FlowBean value:values){upSum += value.getUpFlow();dSum += value.getdFlow();}context.write(key, new FlowBean(key.toString(), upSum, dSum));}}

JobSubmitter.java

package flow;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class JobSubmitter {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(JobSubmitter.class);job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);job.setPartitionerClass(ProvincePartitioner.class);//如果不设定默认使用HashPartitionerjob.setNumReduceTasks(6);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path("F:\\input"));FileOutputFormat.setOutputPath(job, new Path("F:\\province-output"));job.waitForCompletion(true);}}

hadoop 自定义数据类型相关推荐

  1. hadoop map reduce自定义数据类型时注意顺序,否则报错。

    2019独角兽企业重金招聘Python工程师标准>>> 自定义数据类型,实现Writable接口,重写write方法和readFields方法时,在操作字段时,必须保证顺序,如果在w ...

  2. Hadoop之自定义数据类型

    2019独角兽企业重金招聘Python工程师标准>>> 1.写一个类实现Writable接口 2.重写write和readFilelds方法 3.自定义数据类型,提供相应的gette ...

  3. MapReduce编程实践之自定义数据类型

    一:任务描述 自定义数据类型完成手机流量的分析 二:example data 格式为:记录报告时间戳.手机号码.AP mac.AC mac.访问的网址.网址种类.上行数据包数.下行数据包数.上行总流量 ...

  4. TVM自定义数据类型

    TVM自定义数据类型 本文将介绍"自定义数据类型"框架,该框架可在TVM中使用自定义数据类型. 介绍 在设计加速器时,关键是如何近似地表示硬件中的实数.这个问题具有长期的行业标准解 ...

  5. 自主数据类型:在TVM中启用自定义数据类型探索

    自主数据类型:在TVM中启用自定义数据类型探索 介绍 在设计加速器时,一个重要的决定是如何在硬件中近似地表示实数.这个问题有一个长期的行业标准解决方案:IEEE 754浮点标准.1.然而,当试图通过构 ...

  6. Qt信号与槽传递自定义数据类型——两种解决方法

    Qt信号与槽传递自定义数据类型--两种解决方法 参考文章: (1)Qt信号与槽传递自定义数据类型--两种解决方法 (2)https://www.cnblogs.com/tid-think/p/9300 ...

  7. 【剑仙教程】易语言的结构体。自定义数据类型。

    1楼. [剑仙教程]易语言的结构体.自定义数据类型. 在易语言中,有基本数据类型,自定义数据类型. 自定义数据类型,在C语言/C++中称为结构体.这是面向对象和类的编程. . . 先上代码.打开易语言 ...

  8. Oracle自定义数据类型 1

    原文 oracle 自定义类型 type / create type 一 Oracle中的类型 类型有很多种,主要可以分为以下几类: 1.字符串类型.如:char.nchar.varchar2.nva ...

  9. 玩转C语言之自定义数据类型-typedef

    玩转C语言之自定义数据类型-typedef 1 引言 引言:在C语言中没有提供字节类型,如果在工程中需要使用表示字节的变量时,该怎么办呢? 我们知道 1 个字节占用 8 个二进制位空间,解决这个问题的 ...

最新文章

  1. 听障人士的“有声桥梁”:百度智能云曦灵-AI手语平台发布
  2. [C#]委托和事件(详细讲解)
  3. [摘记]数值方法04——函数求值
  4. GDI+ 设置文本对齐方式
  5. WinCE控制面板添加应用程序
  6. Linux命令之文件与文件夹的拷贝
  7. 【微信小程序】关于小程序的协同工作与发布的工作流程
  8. Python关于pandas中 ValueError: Writing 0 cols but got ”XXX“ aliases的错误
  9. java获取时间天数间隔
  10. CAD工具——图纸剪切
  11. win7出现问题事件名称APPCRASH的解决方法
  12. Python教你18个高效编程的方法
  13. 【真.干货】一篇文章了解关于计算机硬件那些事
  14. 【MATLAB教程案例86】通过matlab实现lorenz混沌系统
  15. 考研数据结构树——读书摘抄总结
  16. php获取网址根目录,php获取当前目录_php获得网站根目录的几个方法
  17. 用计算机寻找素数,找出1到1000所有质数(电脑流程图)
  18. 【愚公系列】2023年03月 Java教学课程 070-HTTP协议
  19. ffmpeg编译及使用 (转载)
  20. 360全景图制作软件,常用VR全景制作软件推荐

热门文章

  1. Python标准库datetime中4种基本对象的用法
  2. Python批量提取docx格式Word文档中所有文本框内的文本
  3. Python正则表达式常用flag含义与用法详解
  4. Python多线程编程的一个掉进去不太容易爬出来的坑
  5. Python使用OpenCV+pillow提取AVI视频中关键帧图像
  6. Python计算整数阶乘的几种方法比较
  7. Python实时获取鼠标下窗口文本
  8. python nltk book_nltk book的下载
  9. java 散列集_java数据结构之散列集HashSet与散列表Hashtable
  10. 贪心算法:跳跃游戏总结