Hadoop学习_mapreduce提交方式+实现简单流量统计程序(有注释)+shuffle
注:以下内容来源于互联用,用于个人读书笔记。
mapreduce提交方式
MR程序的几种提交运行模式:
本地模型运行
1/在windows的eclipse里面直接运行main方法,就会将job提交给本地执行器localjobrunner执行
—-输入输出数据可以放在本地路径下(c:/wc/srcdata/)
—-输入输出数据也可以放在hdfs中(hdfs://weekend110:9000/wc/srcdata)
2/在linux的eclipse里面直接运行main方法,但是不要添加yarn相关的配置,也会提交给localjobrunner执行(即本地服务器)
—-输入输出数据可以放在本地路径下(/home/hadoop/wc/srcdata/)
—-输入输出数据也可以放在hdfs中(hdfs://weekend110:9000/wc/srcdata)
集群模式运行
1/将工程打成jar包,上传到服务器,然后用hadoop命令提交 hadoop jar wc.jar cn.itcast.hadoop.mr.wordcount.WCRunner
2/在linux的eclipse中直接运行main方法,也可以提交到集群中去运行,但是,必须采取以下措施:
—-在工程src目录下加入 mapred-site.xml 和 yarn-site.xml
—-将工程打成jar包(wc.jar),同时在main方法中添加一个conf的配置参数 conf.set(“mapreduce.job.jar”,”wc.jar”);
上面的图片就是在Linux的IDE中运行集群模式的两种配置方法(任选其一)。
3/在windows的eclipse中直接运行main方法,也可以提交给集群中运行,但是因为平台不兼容,需要做很多的设置修改(不推荐)
—-要在windows中存放一份hadoop的安装包(解压好的)
—-要将其中的lib和bin目录替换成根据你的windows版本重新编译出的文件
—-再要配置系统环境变量 HADOOP_HOME 和 PATH
—-修改YarnRunner这个类的源码
补充:https://blog.csdn.net/u010171031/article/details/53024516 Hadoop intellij windows本地环境配置。
实现简单流量统计程序
数据格式:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
任务是:
对每一个用户(手机号,第二列)统计他的上行流量(第7列),下行流量(第8列)和总的流量(前两个加起来)。
大体思路:
map阶段用手机号做key,上行流量和下行流量作为value传出去供reduce汇总。在这过程有一个问题是原来统计每一个key的次数,直接使用封装好的 LongWrite 类,现在我们这个value必须包含上行流量和下行流量,所以我们必须学着像 LongWrite 类一样去构造我们需要的数据类型类。同时,这个类产生的数据对象还要符合hadoop中序列化传输的要求(什么是序列化;怎么写这个数据类型的类)。
什么是序列化:
在计算机内存中,数据是分块存储的,但是在网络中进行传输是用 流 串行传输的(如下图)。接收端必须反序列化,读取对应的内容。
注意:JDK自带的序列化机制,会将原来的继承信息一起传递过去,可以复原。但hadoop没有传递继承结构(因为hadoop主要用于大数据量的存储和计算,不需要对象之间的继承结构,所以不需要再序列化的传输过程中带上继承信息)。
如何写这个数据类型的类:
第一次接触,肯定不知道相关概念,只有去参照hadoop已经写好的(如 LongWrite 是对数据类型Long的封装)(这是一种学习方式,没事可以多读读源码)。下面是LongWrite 的源码。
package org.apache.hadoop.io;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;@Public
@Stable
public class LongWritable implements WritableComparable<LongWritable> {private long value;public LongWritable() {}public LongWritable(long value) {this.set(value);}public void set(long value) {this.value = value;}public long get() {return this.value;}public void readFields(DataInput in) throws IOException {this.value = in.readLong();}public void write(DataOutput out) throws IOException {out.writeLong(this.value);}public boolean equals(Object o) {if (!(o instanceof LongWritable)) {return false;} else {LongWritable other = (LongWritable)o;return this.value == other.value;}}public int hashCode() {return (int)this.value;}public int compareTo(LongWritable o) {long thisValue = this.value;long thatValue = o.value;return thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1);}public String toString() {return Long.toString(this.value);}static {WritableComparator.define(LongWritable.class, new LongWritable.Comparator());}public static class DecreasingComparator extends LongWritable.Comparator {public DecreasingComparator() {}public int compare(WritableComparable a, WritableComparable b) {return super.compare(b, a);}public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return super.compare(b2, s2, l2, b1, s1, l1);}}public static class Comparator extends WritableComparator {public Comparator() {super(LongWritable.class);}public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {long thisValue = readLong(b1, s1);long thatValue = readLong(b2, s2);return thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1);}}
}
这里面最重要的是定义数据类型(你要传输什么样子的数据),即成员变量;两个构造函数(无参数是反序列化机制过程要用,有参数的是序列化时对成员变量赋值);序列化函数:write(DataOutput out) ,通过java的DataOutput 将成员变量送到网络中传给其他节点;反序列函数:readFields(DataOutput in) ,也是通过java的机制从流中读取数据给成员变量赋值。
下面是flowSum的map程序:
package hadoop_llb_flowmr;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*** Created by llb on 2018/6/18.**/
/**wordcount中是: extends Mapper<LongWritable, Text, Text, LongWritable>* 表示的是:1.输入:偏移量(数字,key)和该行内容(Text)-> <key,value>;* 2.输出:统计name(字符串,Text)和在本行中的统计值(如次数,value) -> <key,value>;* 其中,输入是自动传入的,一行一行的;输出是通过context将<key,value> 发送出去的。* 在这过程中,key和value的类型可以指定,也可以自己写,但是自己写要注意符合序列化的机制。* 因为要统计每一个用户的上行流量,下行流量,以及总的流量和(每个用户多条记录),而其他一般的* 基础类型一次只传一个类目,如只传上行流量,现在我要一次传上行和下行,所以要封装一个自己的数据类型,* 将每条记录中的手机号,上行流量和下行流量封装到一个类的对象中,一起当成是value,即本例中的FlowBean。* FlowBean 是我们自定义的一种数据类型,要在hadoop的各个节点之间传输,应该遵循hadoop的序列化机制* 就必须实现hadoop相应的序列化接口*/
public class flowSumMap extends Mapper<LongWritable,Text,Text,FlowBean> { //指定输入输出的数据类型//拿到日志中的一行数据,切分各个字段,抽取出我们需要的字段:手机号,上行流量,下行流量,然后封装成kv发送出去protected void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{//拿一行数据,在内部都是用的原始数据类型String line = value.toString();//切分成各个字段String[] fields = StringUtils.split(line,"\t");//拿到需要的字段String phoneNB = fields[1];long u_flow = Long.parseLong(fields[7]);long d_flow = Long.parseLong(fields[8]);//将 string 参数解析为有符号十进制 long,字符串中的字符必须都是十进制数字。//封装成key-value ,通过context传输出去context.write(new Text(phoneNB),new FlowBean(phoneNB,u_flow,d_flow));}}
下面是针对 FlowBean 的封装:
package hadoop_llb_flowmr;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/*** Created by llb on 2018/6/18.*/
// Writable 直接这个就可以了,默认按照key排序,这里WritableComparable
// 指定按照流量进行排序的实现。最后面成员函数就要加compare方法。
public class FlowBean implements WritableComparable<FlowBean>{private String phoneNB;private long up_flow;//上行流量private long d_flow;//下行流量private long s_flow;//总流量,为什么写成私有的呢?防止在reduce中被串改?//在反序列化时,反射机制需要调用空参构造函数,所以显示定义了一个空参构造函数public FlowBean(){}
// 写一个有参构造函数,为了初始化方便public FlowBean(String phoneNB,long up_flow,long d_flow){this.phoneNB = phoneNB;this.up_flow = up_flow;this.d_flow = d_flow;this.s_flow = up_flow+d_flow;}//现在写write函数,将对象数据序列化到流中public void write(DataOutput out) throws IOException{out.writeUTF(phoneNB); //将电话号码编码为UTF的字符串格式,传输到流中out.writeLong(up_flow);out.writeLong(d_flow);out.writeLong(s_flow);}//从数据流中取出反序列化对象的数据,注意要和刚刚序列化的顺序一致public void readFields(DataInput in) throws IOException {phoneNB = in.readUTF();up_flow = in.readLong();d_flow = in.readLong();s_flow = in.readLong();}
// 因为成员变量是私有变量,而后面的reduce中要用,所以定义两个方法,获取成员变量的值public long getUp_flow(){return up_flow;}public long getD_flow(){return d_flow;}
// 自定的数据类型,必须实现序列化和反序列化的方法/**我们是自定的一个数据类型,想要在最后的结果中显示为:手机号(key) 上行流量 下行流量 总流量(value)但是我们的contex是不知道怎么排版这个的,所以要单独重写toString方法,指定输出内容的显示格式。*/public String toString(){return " "+up_flow+"\t"+d_flow+"\t"+s_flow;}//因为要按照总流量进行排序,所以要对比较器进行重写public int compareTo(FlowBean o){return s_flow > o.s_flow ? -1:1; //倒排序}
}
下面是reduce程序:
package hadoop_llb_flowmr;
import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*** Created by llb on 2018/6/18.*/
public class flowSumReduce extends Reducer<Text,FlowBean,Text,FlowBean>{//框架每传递一组数据<1387788654(phoneNB),{flowbean,flowbean,flowbean,flowbean.....}>// 调用一次我们的reduce方法//reduce中的业务逻辑就是遍历values,然后进行累加求和再输出protected void reduce(Text key,Iterable<FlowBean> values,Context context)throws IOException, InterruptedException {long up_flow_counter = 0;long d_flow_counter = 0;//遍历对流量求和for (FlowBean bean : values){up_flow_counter += bean.getUp_flow();d_flow_counter += bean.getD_flow();}//最后将结果保存下来,这里面用了bean中的toString方法,指定保存格式context.write(key,new FlowBean(key.toString(),up_flow_counter,d_flow_counter));}
}
下面是主类程序(Runner):
package hadoop_llb_flowmr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*** Created by llb on 2018/6/18.*/
//下面是job提交的规范流程:通过hadoop的tool工具
public class flowSumRunner extends Configured implements Tool{//将wordcount中的main的内容写到run中,并返回成功与否的标志public int run(String[] args) throws Exception{//读取配置文件,并创建一个工作IDConfiguration conf = new Configuration();Job job = Job.getInstance();//将map和reduce类封装类jar包job.setJarByClass(flowSumMap.class);job.setJarByClass(flowSumReduce.class);job.setJarByClass(flowSumRunner.class);//设定输入输出格式,见上一篇wordcount说明job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//输入和输出地址通过参数进行指定,可以多次复用FileInputFormat.setInputPaths(job,new Path(args[0]));FileOutputFormat.setOutputPath(job,new Path(args[1]));//提交给yarn资源调度器,并返回状态值return job.waitForCompletion(true)?0:1;}public static void main(String [] args) throws Exception {// ToolRunner 是框架里面的,这是调用的标准写法。int res = ToolRunner.run(new Configuration(),new flowSumRunner(),args);System.exit(res); // 正常就正常退出,否则就异常退出。}
}
然后是将这个工程打包上传hadoop,交给集群运行。Intellij生成jar包: https://jingyan.baidu.com/album/c275f6ba0bbb65e33d7567cb.html?picindex=2 。
还有自定义按照什么排序,后面有时间再添加。
下面是shuffle:
首先补充下map task 并发:
切片是逻辑概念,一个split触发一个map task进程。因为split是文件的逻辑切片,所以当一个文件很大时,对应的block也很大,会一个block触发一个map task;若block较小,则多个block触发一个map task。这也从侧面说明hadoop对于大文件的高并发。
shuffle:
我感觉就是不同map和reduce之间的执行过程。放个总图上来:
输入一个split,启动一个map task,执行map程序,得到 key - value 结果,然后存放到缓存区(本机),当缓存不够时存到磁盘。在这过程中要对数据进行合并和排序,最后合并到一个分区文件。其他的结合图中的内容和下面的博客来看。
https://blog.csdn.net/clerk0324/article/details/52461135
Map之所以慢就慢在 很多时候缓存放不了,需要不断的放到磁盘上。现在的spark框架不用放在磁盘上,所以能实时计算,但是内存始终有限,没有hadoop处理的数据量大。
Shuffle整个过程都是由 MRAppMaster 进行监控调度的,因为yarn不懂mapreduce程序。
分区信息主要是告诉 每个part对应结果文件 的偏移量和具体的内容,好拼接起来。
输入数据可能来自于各个客服端,所以为了统一格式,有了InputFormat(内部有从文件系统读,也有从数据库读)。为了写到HDFS文件上,有写了一个OutputFormat 组件。读入文件后,切分为split,然后通过RR形成key-value传给map。
任务:
查找每个单词在哪个文件出现,并统计查询出现出现几次。
思路:
后面有时间再补充这个程序。
Hadoop学习_mapreduce提交方式+实现简单流量统计程序(有注释)+shuffle相关推荐
- Hadoop学习(source方式安装篇)
Hadoop学习(source方式安装篇) 1.为什么要进行源码编译 网上说Hadoop官网没有提供32位的编译好的Hadoop,这其实是假的,自从2.5版本以后,官方已经提供了64位的编译好的Had ...
- Mapreduce的序列化和流量统计程序开发
一.Hadoop数据序列化的数据类型 Java数据类型 => Hadoop数据类型 int IntWritable float FloatWritable long LongWritable d ...
- python表单提交的两种方式_Flask框架学习笔记之表单基础介绍与表单提交方式
本文实例讲述了Flask框架学习笔记之表单基础介绍与表单提交方式.分享给大家供大家参考,具体如下: 表单介绍 表单是HTML页面中负责数据采集功能的部件.由表单标签,表单域和表单按钮组成.通过表单,将 ...
- centos7.6查看什么进程跑的流量_Spark的提交方式有哪些?有什么区别?
spark的提交方式总体来说有两种,分别是standalone模式和yarn模式. 这两种模式又分别有两种提交方式,分别是: standalone下的client提交方式.(客户端提交) standa ...
- Hadoop学习笔记—4.初识MapReduce
一.神马是高大上的MapReduce MapReduce是Google的一项重要技术,它首先是一个 编程模型 ,用以进行大数据量的计算.对于大 数据量的计算,通常采用的处理手法就是并行计算.但对许多开 ...
- Hadoop学习(二)——MapReduce\Yarn架构
其他更多java基础文章: java基础学习(目录) 学习资料 理解Hadoop YARN架构 本文先讲MapReduce 1.x的框架.再讲MapReduce 1.x升级改进后MapReduce 2 ...
- Hadoop学习之MapReduce
Hadoop学习之MapReduce 目录 Hadoop学习之MapReduce 1 MapReduce简介 1.1 什么是MapReduce 1.2 MapReduce的作用 1.3 MapRedu ...
- Hadoop学习教程(MapReduce)(四)
MapReduce 1.MapReduce概述 1.1.MapReduce定义 1.2.MapReduce优缺点 1.2.1.MapReduce优点 1.2.2.MapReduce缺点 1.3.Map ...
- Hadoop学习笔记(1) ——菜鸟入门
Hadoop学习笔记(1) --菜鸟入门 Hadoop是什么?先问一下百度吧: [百度百科]一个分布式系统基础架构,由Apache基金会所开发.用户能够在不了解分布式底层细节的情况下.开发分布式 ...
最新文章
- ssh服务常见问题及其解决办法
- 单片机c语言程序设计实训报告,(整理)单片机C语言程序设计实训100例.doc
- 向预训练进一步:掩码式非自回归模型训练
- 文献记录(part68)--K- 近邻分类器鲁棒性验证:从约束放松法到随机平滑法
- Android之AsyncTask异步任务详解总结
- C/C++:各类型变量占用字节
- qt如何把父窗口的变量传给子窗口_父窗口和iframe子窗口之间相互传递参数和调用函数或方法...
- Spring MVC 无XML配置入门示例
- 最全的Python进度条展示程序方案
- RSA的JavaScript程序
- 致远OA_0day批量植Cknife马一步到位
- 深度有趣 | 18 二次元头像生成
- ln火线零线_插座怎么接LN线
- word中安装Zotero插件
- java面试(1)如何防止恶意攻击短信验证码接口
- extern关键字作用
- 黑苹果安装各种问题解决办法
- python 百度人脸 sdk_深更半夜实现python百度api人脸识别
- 正则表达式在线测试网站推荐
- “Python小屋”免费资源汇总(截至2018年11月28日)
热门文章
- 一.mysql数据库保存微信用户名报错
- H2数据库修改用户名密码
- threeJS-Helper13-SkeletonHelper(骨骼显示助手)
- vivado高层次综合(high-level synthesis,HLS)学习日记
- python常用处理字符串函数的详细分析(全)
- 实现PC视频播放最强画质教程( Potplayer播放器+MADVR插件)【转】
- Win10系统无法查看CalcExpress帮助文件(.hlp)的解决办法
- Android 5.0 Termux 实现对米家设备的控制
- 【LOJ】【树形DP】2485 「CEOI2017」Chase
- 数字科技企业研发实力榜 TOP 50 ,华为千亿研发排第一