hadoop 自定义数据类型
根据需要构造自己的数据类型
FlowBean.java
package flow;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;/*** 使用对象构造自己的数据结构,这个对象要实现Writable的接口* 1、该类一定要保留空参构造函数* 2、write方法中输出字段二进制数据的顺序 要与 readFields方法读取数据的顺序一致*/
public class FlowBean implements Writable {private int upFlow;private int dFlow;private int amountFlow;public FlowBean(){}public FlowBean(int upFlow, int dFlow) {this.upFlow = upFlow;//上行流量this.dFlow = dFlow;//下行流量this.amountFlow = upFlow + dFlow;//总流量}public int getUpFlow() {return upFlow;}public void setUpFlow(int upFlow) {this.upFlow = upFlow;}public int getdFlow() {return dFlow;}public void setdFlow(int dFlow) {this.dFlow = dFlow;}public int getAmountFlow() {return amountFlow;}public void setAmountFlow(int amountFlow) {this.amountFlow = amountFlow;}@Overridepublic void write(DataOutput out) throws IOException {//该方法必须重写序列化,让hadoop集群来调用out.writeInt(upFlow);out.writeUTF(phone);out.writeInt(dFlow);out.writeInt(amountFlow);}@Overridepublic void readFields(DataInput in) throws IOException {//该方法必须重写反序列化,让hadoop集群调用this.upFlow = in.readInt();this.phone = in.readUTF();this.dFlow = in.readInt();this.amountFlow = in.readInt();}@Overridepublic String toString() {//返回最终的结果return this.phone + ","+this.upFlow +","+ this.dFlow +"," + this.amountFlow;}}
partitioner
package flow;import java.util.HashMap;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;/*** MapTask通过这个类的getPartition方法,根据partitioner方法决定maper的结果如何分配给reducer*/
public class ProvincePartitioner extends Partitioner<Text, FlowBean>{static HashMap<String,Integer> codeMap = new HashMap<>();static{codeMap.put("135", 0);codeMap.put("136", 1);codeMap.put("137", 2);codeMap.put("138", 3);codeMap.put("139", 4);}@Override//默认是根据key的haskcode的值%reducernum来分配任务public int getPartition(Text key, FlowBean value, int numPartitions) {Integer code = codeMap.get(key.toString().substring(0, 3));return code==null?5:code;}}
mapper
package flow;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();String[] fields = line.split("\t");int upFlow = Integer.parseInt(fields[1]);int dFlow = Integer.parseInt(fields[2]);context.write(new Text(phone), new FlowBean(upFlow, dFlow));}
}
reducer
package flow;import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{/*** key:是某个手机号* values:是这个手机号所产生的所有访问记录中的流量数据*/@Overrideprotected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)throws IOException, InterruptedException {int upSum = 0;int dSum = 0;for(FlowBean value:values){upSum += value.getUpFlow();dSum += value.getdFlow();}context.write(key, new FlowBean(key.toString(), upSum, dSum));}}
JobSubmitter.java
package flow;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class JobSubmitter {public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(JobSubmitter.class);job.setMapperClass(FlowCountMapper.class);job.setReducerClass(FlowCountReducer.class);job.setPartitionerClass(ProvincePartitioner.class);//如果不设定默认使用HashPartitionerjob.setNumReduceTasks(6);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);FileInputFormat.setInputPaths(job, new Path("F:\\input"));FileOutputFormat.setOutputPath(job, new Path("F:\\province-output"));job.waitForCompletion(true);}}
hadoop 自定义数据类型相关推荐
- hadoop map reduce自定义数据类型时注意顺序,否则报错。
2019独角兽企业重金招聘Python工程师标准>>> 自定义数据类型,实现Writable接口,重写write方法和readFields方法时,在操作字段时,必须保证顺序,如果在w ...
- Hadoop之自定义数据类型
2019独角兽企业重金招聘Python工程师标准>>> 1.写一个类实现Writable接口 2.重写write和readFilelds方法 3.自定义数据类型,提供相应的gette ...
- MapReduce编程实践之自定义数据类型
一:任务描述 自定义数据类型完成手机流量的分析 二:example data 格式为:记录报告时间戳.手机号码.AP mac.AC mac.访问的网址.网址种类.上行数据包数.下行数据包数.上行总流量 ...
- TVM自定义数据类型
TVM自定义数据类型 本文将介绍"自定义数据类型"框架,该框架可在TVM中使用自定义数据类型. 介绍 在设计加速器时,关键是如何近似地表示硬件中的实数.这个问题具有长期的行业标准解 ...
- 自主数据类型:在TVM中启用自定义数据类型探索
自主数据类型:在TVM中启用自定义数据类型探索 介绍 在设计加速器时,一个重要的决定是如何在硬件中近似地表示实数.这个问题有一个长期的行业标准解决方案:IEEE 754浮点标准.1.然而,当试图通过构 ...
- Qt信号与槽传递自定义数据类型——两种解决方法
Qt信号与槽传递自定义数据类型--两种解决方法 参考文章: (1)Qt信号与槽传递自定义数据类型--两种解决方法 (2)https://www.cnblogs.com/tid-think/p/9300 ...
- 【剑仙教程】易语言的结构体。自定义数据类型。
1楼. [剑仙教程]易语言的结构体.自定义数据类型. 在易语言中,有基本数据类型,自定义数据类型. 自定义数据类型,在C语言/C++中称为结构体.这是面向对象和类的编程. . . 先上代码.打开易语言 ...
- Oracle自定义数据类型 1
原文 oracle 自定义类型 type / create type 一 Oracle中的类型 类型有很多种,主要可以分为以下几类: 1.字符串类型.如:char.nchar.varchar2.nva ...
- 玩转C语言之自定义数据类型-typedef
玩转C语言之自定义数据类型-typedef 1 引言 引言:在C语言中没有提供字节类型,如果在工程中需要使用表示字节的变量时,该怎么办呢? 我们知道 1 个字节占用 8 个二进制位空间,解决这个问题的 ...
最新文章
- 听障人士的“有声桥梁”:百度智能云曦灵-AI手语平台发布
- [C#]委托和事件(详细讲解)
- [摘记]数值方法04——函数求值
- GDI+ 设置文本对齐方式
- WinCE控制面板添加应用程序
- Linux命令之文件与文件夹的拷贝
- 【微信小程序】关于小程序的协同工作与发布的工作流程
- Python关于pandas中 ValueError: Writing 0 cols but got ”XXX“ aliases的错误
- java获取时间天数间隔
- CAD工具——图纸剪切
- win7出现问题事件名称APPCRASH的解决方法
- Python教你18个高效编程的方法
- 【真.干货】一篇文章了解关于计算机硬件那些事
- 【MATLAB教程案例86】通过matlab实现lorenz混沌系统
- 考研数据结构树——读书摘抄总结
- php获取网址根目录,php获取当前目录_php获得网站根目录的几个方法
- 用计算机寻找素数,找出1到1000所有质数(电脑流程图)
- 【愚公系列】2023年03月 Java教学课程 070-HTTP协议
- ffmpeg编译及使用 (转载)
- 360全景图制作软件,常用VR全景制作软件推荐