Hadoop MapReduce Basic Code Overview
JavaBean class
Map class
Partitions class
Reduce class
Driver class
JavaBean class
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class PhoneBean implements Writable {
    // Four fields that need to be serialized and deserialized
    private Integer upFlow;        // upstream traffic
    private Integer downFlow;      // downstream traffic
    private Integer upCountFlow;   // total upstream traffic
    private Integer downCountFlow; // total downstream traffic

    public PhoneBean() {
    }

    public PhoneBean(Integer upFlow, Integer downFlow, Integer upCountFlow, Integer downCountFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.upCountFlow = upCountFlow;
        this.downCountFlow = downCountFlow;
    }

    public Integer getUpFlow() { return upFlow; }
    public void setUpFlow(Integer upFlow) { this.upFlow = upFlow; }
    public Integer getDownFlow() { return downFlow; }
    public void setDownFlow(Integer downFlow) { this.downFlow = downFlow; }
    public Integer getUpCountFlow() { return upCountFlow; }
    public void setUpCountFlow(Integer upCountFlow) { this.upCountFlow = upCountFlow; }
    public Integer getDownCountFlow() { return downCountFlow; }
    public void setDownCountFlow(Integer downCountFlow) { this.downCountFlow = downCountFlow; }

    @Override
    public String toString() {
        return upFlow + "_" + downFlow + "_" + upCountFlow + "_" + downCountFlow;
    }

    // Serialization
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(upFlow);
        out.writeInt(downFlow);
        out.writeInt(upCountFlow);
        out.writeInt(downCountFlow);
    }

    // Deserialization (fields must be read in the same order they were written)
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readInt();
        this.downFlow = in.readInt();
        this.upCountFlow = in.readInt();
        this.downCountFlow = in.readInt();
    }
}
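The key contract in `write`/`readFields` is symmetric, fixed-order serialization. The round trip can be sketched without any Hadoop dependency using plain `java.io` streams (class name and field values below are illustrative):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class WritableRoundTrip {
    public static void main(String[] args) throws IOException {
        // Write four ints in a fixed order, as PhoneBean.write does
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(20);   // upFlow
        out.writeInt(16);   // downFlow
        out.writeInt(4116); // upCountFlow
        out.writeInt(1432); // downCountFlow

        // Read them back in the same order, as PhoneBean.readFields does
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(in.readInt() + "_" + in.readInt() + "_" + in.readInt() + "_" + in.readInt());
        // Prints: 20_16_4116_1432
    }
}
```

If the read order ever diverges from the write order, the fields silently come back swapped, which is why the two methods must be kept in lockstep.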
Map class
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class BMap extends Mapper<LongWritable, Text, Text, PhoneBean> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Sample input line (tab-separated):
        // value = 136315799**** 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 未知 20 16 4116 1432 200
        String[] datas = value.toString().split("\\t");
        String phone = datas[1];
        PhoneBean phoneBean = new PhoneBean();
        phoneBean.setUpFlow(Integer.parseInt(datas[6]));
        phoneBean.setDownFlow(Integer.parseInt(datas[7]));
        phoneBean.setUpCountFlow(Integer.parseInt(datas[8]));
        phoneBean.setDownCountFlow(Integer.parseInt(datas[9]));
        context.write(new Text(phone), phoneBean);
    }
}
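The mapper relies on fixed column positions after splitting on tabs: index 1 is the phone number and indexes 6 through 9 are the four flow fields. A standalone sketch of that extraction (the record below is a made-up line in the same layout):

```java
public class SplitDemo {
    public static void main(String[] args) {
        // Made-up tab-separated record in the layout the mapper expects:
        // index 1 = phone number, indexes 6-9 = upFlow, downFlow, upCountFlow, downCountFlow
        String line = "1\t13631579999\tMAC\tIP1\tIP2\tsite\t20\t16\t4116\t1432\t200";
        String[] datas = line.split("\\t");
        System.out.println(datas[1] + " " + datas[6] + " " + datas[7] + " " + datas[8] + " " + datas[9]);
        // Prints: 13631579999 20 16 4116 1432
    }
}
```

Note that `split("\\t")` splits on a literal tab character (the regex `\t`); splitting on single spaces would break on fields that contain spaces.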
Partitions class
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Custom partitioner. The type parameters must match the map output types
// configured in BDriver (Text key, PhoneBean value); the original post used
// <PhoneBean, NullWritable>, which does not match that configuration.
public class Partitions extends Partitioner<Text, PhoneBean> {
    // The original post compared against an undefined value `x`; it is declared
    // here only so the class compiles. Replace it with the real routing value.
    private static final int x = 0;

    @Override
    public int getPartition(Text key, PhoneBean phoneBean, int numPartitions) {
        // Records whose upCountFlow equals x go to partition 0, all others to partition 1
        if (phoneBean.getUpCountFlow() == x) {
            return 0;
        } else {
            return 1;
        }
    }
}
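Partition selection is just a function from a record to a reducer index in `[0, numReduceTasks)`. The two-way split above can be sketched standalone (the value 4116 stands in for the unspecified `x`; all names here are illustrative):

```java
public class PartitionDemo {
    // Placeholder for the unspecified value `x` in the original getPartition()
    static final int X = 4116;

    // Mirrors the partitioner's decision: bucket 0 for a matching upCountFlow, bucket 1 otherwise
    static int getPartition(int upCountFlow) {
        return upCountFlow == X ? 0 : 1;
    }

    public static void main(String[] args) {
        System.out.println(getPartition(4116) + " " + getPartition(1432));
        // Prints: 0 1
    }
}
```

Since this function only ever returns 0 or 1, the Driver must set exactly two reduce tasks; with more, some output files would simply be empty, and with fewer the job fails at runtime.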
Reduce class
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class BReduce extends Reducer<Text, PhoneBean, Text, PhoneBean> {
    @Override
    protected void reduce(Text key, Iterable<PhoneBean> values, Context context) throws IOException, InterruptedException {
        int upFlow = 0, downFlow = 0, upCountFlow = 0, downCountFlow = 0;
        // Sum the four counters over every bean that shares this phone number
        for (PhoneBean phone : values) {
            upFlow += phone.getUpFlow();
            downFlow += phone.getDownFlow();
            upCountFlow += phone.getUpCountFlow();
            downCountFlow += phone.getDownCountFlow();
        }
        PhoneBean bean = new PhoneBean(upFlow, downFlow, upCountFlow, downCountFlow);
        context.write(key, bean);
    }
}
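The reduce step is a plain per-key summation of the four counters. The same aggregation, sketched without Hadoop using `int[]` arrays as stand-ins for the PhoneBean values of one key (the numbers are made up):

```java
import java.util.Arrays;
import java.util.List;

public class ReduceDemo {
    public static void main(String[] args) {
        // Two records for the same phone key; fields: upFlow, downFlow, upCountFlow, downCountFlow
        List<int[]> values = Arrays.asList(
                new int[]{20, 16, 4116, 1432},
                new int[]{10, 4, 1000, 500});

        int[] sum = new int[4];
        for (int[] v : values) {
            for (int i = 0; i < 4; i++) {
                sum[i] += v[i]; // accumulate each counter across all records
            }
        }
        // Same "_"-joined format as PhoneBean.toString()
        System.out.println(sum[0] + "_" + sum[1] + "_" + sum[2] + "_" + sum[3]);
        // Prints: 30_20_5116_1932
    }
}
```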
Driver class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "phoneBean");
        // Main class of the job
        job.setJarByClass(BDriver.class);
        // Mapper and Reducer classes
        job.setMapperClass(BMap.class);
        job.setReducerClass(BReduce.class);
        // Map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PhoneBean.class);
        // Reduce output key/value types
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(PhoneBean.class);
        // Custom partitioner; the reduce task count matches the two partitions it produces
        job.setPartitionerClass(Partitions.class);
        job.setNumReduceTasks(2);
        // Where to read the input from
        FileInputFormat.addInputPath(job, new Path("E:\\data_flow3.0.txt"));
        // Where to write the final result (the output directory must not already exist)
        FileOutputFormat.setOutputPath(job, new Path("D:\\aaa"));
        // Submit the job and wait for completion
        //job.submit();
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
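With the classes compiled and packaged, the job is typically launched through the `hadoop` CLI. A command sketch (the jar name below is hypothetical; the input and output paths are hardcoded in BDriver above):

```shell
# Hypothetical jar name; BDriver hardcodes the input file and output directory.
# The output directory (D:\aaa) must not exist before the run, or the job fails.
hadoop jar phonebean.jar BDriver
```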