注:以下内容来源于互联用,用于个人读书笔记。

mapreduce提交方式

MR程序的几种提交运行模式:

本地模型运行
1/在windows的eclipse里面直接运行main方法,就会将job提交给本地执行器localjobrunner执行
—-输入输出数据可以放在本地路径下(c:/wc/srcdata/)
—-输入输出数据也可以放在hdfs中(hdfs://weekend110:9000/wc/srcdata)

2/在linux的eclipse里面直接运行main方法,但是不要添加yarn相关的配置,也会提交给localjobrunner执行(即本地服务器)
—-输入输出数据可以放在本地路径下(/home/hadoop/wc/srcdata/)
—-输入输出数据也可以放在hdfs中(hdfs://weekend110:9000/wc/srcdata)

集群模式运行
1/将工程打成jar包,上传到服务器,然后用hadoop命令提交 hadoop jar wc.jar cn.itcast.hadoop.mr.wordcount.WCRunner
2/在linux的eclipse中直接运行main方法,也可以提交到集群中去运行,但是,必须采取以下措施:
—-在工程src目录下加入 mapred-site.xml 和 yarn-site.xml
—-将工程打成jar包(wc.jar),同时在main方法中添加一个conf的配置参数 conf.set(“mapreduce.job.jar”,”wc.jar”);

   上面的图片就是在Linux的IDE中运行集群模式的两种配置方法(任选其一)。 
3/在windows的eclipse中直接运行main方法,也可以提交给集群中运行,但是因为平台不兼容,需要做很多的设置修改(不推荐)
—-要在windows中存放一份hadoop的安装包(解压好的)
—-要将其中的lib和bin目录替换成根据你的windows版本重新编译出的文件
—-再要配置系统环境变量 HADOOP_HOME 和 PATH
—-修改YarnRunner这个类的源码
补充:https://blog.csdn.net/u010171031/article/details/53024516 Hadoop intellij windows本地环境配置。

实现简单流量统计程序

数据格式:

1363157985066   13726230503 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157995052   13826544101 5C-0E-8B-C7-F1-E0:CMCC  120.197.40.4            4   0   264 0   200
1363157991076   13926435656 20-10-7A-28-CC-0A:CMCC  120.196.100.99          2   4   132 1512    200
1363154400022   13926251106 5C-0E-8B-8B-B1-50:CMCC  120.197.40.4            4   0   240 0   200
1363157993044   18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99  iface.qiyi.com  视频网站    15  12  1527    2106    200
1363157995074   84138413    5C-0E-8B-8C-E8-20:7DaysInn  120.197.40.4    122.72.52.12        20  16  4116    1432    200
1363157993055   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200
1363157995033   15920133257 5C-0E-8B-C7-BA-20:CMCC  120.197.40.4    sug.so.360.cn   信息安全    20  20  3156    2936    200
1363157983019   13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82          4   0   240 0   200
1363157984041   13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4    s19.cnzz.com    站点统计    24  9   6960    690 200
1363157973098   15013685858 5C-0E-8B-C7-F7-90:CMCC  120.197.40.4    rank.ie.sogou.com   搜索引擎    28  27  3659    3538    200
1363157986029   15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99  www.umeng.com   站点统计    3   3   1938    180 200
1363157992093   13560439658 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          15  9   918 4938    200
1363157986041   13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4            3   3   180 180 200
1363157984040   13602846565 5C-0E-8B-8B-B6-00:CMCC  120.197.40.4    2052.flash2-http.qq.com 综合门户    15  12  1938    2910    200
1363157995093   13922314466 00-FD-07-A2-EC-BA:CMCC  120.196.100.82  img.qfc.cn      12  12  3008    3720    200
1363157982040   13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99  y0.ifengimg.com 综合门户    57  102 7335    110349  200
1363157986072   18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99  input.shouji.sogou.com  搜索引擎    21  18  9531    2412    200
1363157990043   13925057413 00-1F-64-E1-E6-9A:CMCC  120.196.100.55  t3.baidu.com    搜索引擎    69  63  11058   48243   200
1363157988072   13760778710 00-FD-07-A4-7B-08:CMCC  120.196.100.82          2   2   120 120 200
1363157985066   13726238888 00-FD-07-A4-72-B8:CMCC  120.196.100.82  i02.c.aliimg.com        24  27  2481    24681   200
1363157993055   13560436666 C4-17-FE-BA-DE-D9:CMCC  120.196.100.99          18  15  1116    954 200

任务是:
  对每一个用户(手机号,第二列)统计他的上行流量(第7列),下行流量(第8列)和总的流量(前两个加起来)。
  
大体思路:
  map阶段用手机号做key,上行流量和下行流量作为value传出去供reduce汇总。在这过程有一个问题是原来统计每一个key的次数,直接使用封装好的 LongWrite 类,现在我们这个value必须包含上行流量和下行流量,所以我们必须学着像 LongWrite 类一样去构造我们需要的数据类型类。同时,这个类产生的数据对象还要符合hadoop中序列化传输的要求(什么是序列化;怎么写这个数据类型的类)。
  
什么是序列化:
  在计算机内存中,数据是分块存储的,但是在网络中进行传输是用 流 串行传输的(如下图)。接收端必须反序列化,读取对应的内容。

  注意:JDK自带的序列化机制,会将原来的继承信息一起传递过去,可以复原。但hadoop没有传递继承结构(因为hadoop主要用于大数据量的存储和计算,不需要对象之间的继承结构,所以不需要再序列化的传输过程中带上继承信息)。

如何写这个数据类型的类:
  第一次接触,肯定不知道相关概念,只有去参照hadoop已经写好的(如 LongWrite 是对数据类型Long的封装)(这是一种学习方式,没事可以多读读源码)。下面是LongWrite 的源码。

package org.apache.hadoop.io;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;@Public
@Stable
public class LongWritable implements WritableComparable<LongWritable> {private long value;public LongWritable() {}public LongWritable(long value) {this.set(value);}public void set(long value) {this.value = value;}public long get() {return this.value;}public void readFields(DataInput in) throws IOException {this.value = in.readLong();}public void write(DataOutput out) throws IOException {out.writeLong(this.value);}public boolean equals(Object o) {if (!(o instanceof LongWritable)) {return false;} else {LongWritable other = (LongWritable)o;return this.value == other.value;}}public int hashCode() {return (int)this.value;}public int compareTo(LongWritable o) {long thisValue = this.value;long thatValue = o.value;return thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1);}public String toString() {return Long.toString(this.value);}static {WritableComparator.define(LongWritable.class, new LongWritable.Comparator());}public static class DecreasingComparator extends LongWritable.Comparator {public DecreasingComparator() {}public int compare(WritableComparable a, WritableComparable b) {return super.compare(b, a);}public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {return super.compare(b2, s2, l2, b1, s1, l1);}}public static class Comparator extends WritableComparator {public Comparator() {super(LongWritable.class);}public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {long thisValue = readLong(b1, s1);long thatValue = readLong(b2, s2);return thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1);}}
}

  这里面最重要的是定义数据类型(你要传输什么样子的数据),即成员变量;两个构造函数(无参数是反序列化机制过程要用,有参数的是序列化时对成员变量赋值);序列化函数:write(DataOutput out) ,通过java的DataOutput 将成员变量送到网络中传给其他节点;反序列函数:readFields(DataOutput in) ,也是通过java的机制从流中读取数据给成员变量赋值。
下面是flowSum的map程序:

package hadoop_llb_flowmr;
import java.io.IOException;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/*** Created by llb on 2018/6/18.**/
/**wordcount中是: extends Mapper<LongWritable, Text, Text, LongWritable>*      表示的是:1.输入:偏移量(数字,key)和该行内容(Text)-> <key,value>;*                2.输出:统计name(字符串,Text)和在本行中的统计值(如次数,value) -> <key,value>;*                其中,输入是自动传入的,一行一行的;输出是通过context将<key,value> 发送出去的。*            在这过程中,key和value的类型可以指定,也可以自己写,但是自己写要注意符合序列化的机制。* 因为要统计每一个用户的上行流量,下行流量,以及总的流量和(每个用户多条记录),而其他一般的* 基础类型一次只传一个类目,如只传上行流量,现在我要一次传上行和下行,所以要封装一个自己的数据类型,* 将每条记录中的手机号,上行流量和下行流量封装到一个类的对象中,一起当成是value,即本例中的FlowBean。* FlowBean 是我们自定义的一种数据类型,要在hadoop的各个节点之间传输,应该遵循hadoop的序列化机制* 就必须实现hadoop相应的序列化接口*/
public class flowSumMap extends Mapper<LongWritable,Text,Text,FlowBean> {  //指定输入输出的数据类型//拿到日志中的一行数据,切分各个字段,抽取出我们需要的字段:手机号,上行流量,下行流量,然后封装成kv发送出去protected void map(LongWritable key,Text value,Context context)throws IOException,InterruptedException{//拿一行数据,在内部都是用的原始数据类型String line = value.toString();//切分成各个字段String[] fields = StringUtils.split(line,"\t");//拿到需要的字段String phoneNB = fields[1];long u_flow = Long.parseLong(fields[7]);long d_flow = Long.parseLong(fields[8]);//将 string 参数解析为有符号十进制 long,字符串中的字符必须都是十进制数字。//封装成key-value ,通过context传输出去context.write(new Text(phoneNB),new FlowBean(phoneNB,u_flow,d_flow));}}

下面是针对 FlowBean 的封装:

package hadoop_llb_flowmr;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
/*** Created by llb on 2018/6/18.*/
// Writable 直接这个就可以了,默认按照key排序,这里WritableComparable
// 指定按照流量进行排序的实现。最后面成员函数就要加compare方法。
public class FlowBean implements WritableComparable<FlowBean>{private String phoneNB;private long up_flow;//上行流量private long d_flow;//下行流量private long s_flow;//总流量,为什么写成私有的呢?防止在reduce中被串改?//在反序列化时,反射机制需要调用空参构造函数,所以显示定义了一个空参构造函数public FlowBean(){}
//    写一个有参构造函数,为了初始化方便public FlowBean(String phoneNB,long up_flow,long d_flow){this.phoneNB = phoneNB;this.up_flow = up_flow;this.d_flow = d_flow;this.s_flow = up_flow+d_flow;}//现在写write函数,将对象数据序列化到流中public void write(DataOutput out) throws IOException{out.writeUTF(phoneNB); //将电话号码编码为UTF的字符串格式,传输到流中out.writeLong(up_flow);out.writeLong(d_flow);out.writeLong(s_flow);}//从数据流中取出反序列化对象的数据,注意要和刚刚序列化的顺序一致public void readFields(DataInput in) throws IOException {phoneNB = in.readUTF();up_flow = in.readLong();d_flow = in.readLong();s_flow = in.readLong();}
//    因为成员变量是私有变量,而后面的reduce中要用,所以定义两个方法,获取成员变量的值public long getUp_flow(){return up_flow;}public long getD_flow(){return d_flow;}
//    自定的数据类型,必须实现序列化和反序列化的方法/**我们是自定的一个数据类型,想要在最后的结果中显示为:手机号(key) 上行流量 下行流量 总流量(value)但是我们的contex是不知道怎么排版这个的,所以要单独重写toString方法,指定输出内容的显示格式。*/public String toString(){return " "+up_flow+"\t"+d_flow+"\t"+s_flow;}//因为要按照总流量进行排序,所以要对比较器进行重写public int compareTo(FlowBean o){return s_flow > o.s_flow ? -1:1; //倒排序}
}

下面是reduce程序:

package hadoop_llb_flowmr;
import java.io.IOException;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/*** Created by llb on 2018/6/18.*/
public class flowSumReduce extends Reducer<Text,FlowBean,Text,FlowBean>{//框架每传递一组数据<1387788654(phoneNB),{flowbean,flowbean,flowbean,flowbean.....}>//    调用一次我们的reduce方法//reduce中的业务逻辑就是遍历values,然后进行累加求和再输出protected void reduce(Text key,Iterable<FlowBean> values,Context context)throws IOException, InterruptedException {long up_flow_counter = 0;long d_flow_counter = 0;//遍历对流量求和for (FlowBean bean : values){up_flow_counter += bean.getUp_flow();d_flow_counter += bean.getD_flow();}//最后将结果保存下来,这里面用了bean中的toString方法,指定保存格式context.write(key,new FlowBean(key.toString(),up_flow_counter,d_flow_counter));}
}

下面是主类程序(Runner):

package hadoop_llb_flowmr;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/*** Created by llb on 2018/6/18.*/
//下面是job提交的规范流程:通过hadoop的tool工具
public class flowSumRunner extends Configured implements Tool{//将wordcount中的main的内容写到run中,并返回成功与否的标志public int run(String[] args) throws Exception{//读取配置文件,并创建一个工作IDConfiguration conf = new Configuration();Job job = Job.getInstance();//将map和reduce类封装类jar包job.setJarByClass(flowSumMap.class);job.setJarByClass(flowSumReduce.class);job.setJarByClass(flowSumRunner.class);//设定输入输出格式,见上一篇wordcount说明job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(FlowBean.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(FlowBean.class);//输入和输出地址通过参数进行指定,可以多次复用FileInputFormat.setInputPaths(job,new Path(args[0]));FileOutputFormat.setOutputPath(job,new Path(args[1]));//提交给yarn资源调度器,并返回状态值return job.waitForCompletion(true)?0:1;}public static void main(String [] args) throws Exception {// ToolRunner 是框架里面的,这是调用的标准写法。int res = ToolRunner.run(new Configuration(),new flowSumRunner(),args);System.exit(res); // 正常就正常退出,否则就异常退出。}
}

然后是将这个工程打包上传hadoop,交给集群运行。Intellij生成jar包: https://jingyan.baidu.com/album/c275f6ba0bbb65e33d7567cb.html?picindex=2 。

还有自定义按照什么排序,后面有时间再添加。
下面是shuffle:
首先补充下map task 并发:

  切片是逻辑概念,一个split触发一个map task进程。因为split是文件的逻辑切片,所以当一个文件很大时,对应的block也很大,会一个block触发一个map task;若block较小,则多个block触发一个map task。这也从侧面说明hadoop对于大文件的高并发。
shuffle:
  我感觉就是不同map和reduce之间的执行过程。放个总图上来:

  输入一个split,启动一个map task,执行map程序,得到 key - value 结果,然后存放到缓存区(本机),当缓存不够时存到磁盘。在这过程中要对数据进行合并和排序,最后合并到一个分区文件。其他的结合图中的内容和下面的博客来看。
https://blog.csdn.net/clerk0324/article/details/52461135

  Map之所以慢就慢在 很多时候缓存放不了,需要不断的放到磁盘上。现在的spark框架不用放在磁盘上,所以能实时计算,但是内存始终有限,没有hadoop处理的数据量大。
  Shuffle整个过程都是由 MRAppMaster 进行监控调度的,因为yarn不懂mapreduce程序。
  
  分区信息主要是告诉 每个part对应结果文件 的偏移量和具体的内容,好拼接起来。
  
  输入数据可能来自于各个客服端,所以为了统一格式,有了InputFormat(内部有从文件系统读,也有从数据库读)。为了写到HDFS文件上,有写了一个OutputFormat 组件。读入文件后,切分为split,然后通过RR形成key-value传给map。
任务:
  查找每个单词在哪个文件出现,并统计查询出现出现几次。
   
  思路:
   
  后面有时间再补充这个程序。

Hadoop学习_mapreduce提交方式+实现简单流量统计程序(有注释)+shuffle相关推荐

  1. Hadoop学习(source方式安装篇)

    Hadoop学习(source方式安装篇) 1.为什么要进行源码编译 网上说Hadoop官网没有提供32位的编译好的Hadoop,这其实是假的,自从2.5版本以后,官方已经提供了64位的编译好的Had ...

  2. Mapreduce的序列化和流量统计程序开发

    一.Hadoop数据序列化的数据类型 Java数据类型 => Hadoop数据类型 int IntWritable float FloatWritable long LongWritable d ...

  3. python表单提交的两种方式_Flask框架学习笔记之表单基础介绍与表单提交方式

    本文实例讲述了Flask框架学习笔记之表单基础介绍与表单提交方式.分享给大家供大家参考,具体如下: 表单介绍 表单是HTML页面中负责数据采集功能的部件.由表单标签,表单域和表单按钮组成.通过表单,将 ...

  4. centos7.6查看什么进程跑的流量_Spark的提交方式有哪些?有什么区别?

    spark的提交方式总体来说有两种,分别是standalone模式和yarn模式. 这两种模式又分别有两种提交方式,分别是: standalone下的client提交方式.(客户端提交) standa ...

  5. Hadoop学习笔记—4.初识MapReduce

    一.神马是高大上的MapReduce MapReduce是Google的一项重要技术,它首先是一个 编程模型 ,用以进行大数据量的计算.对于大 数据量的计算,通常采用的处理手法就是并行计算.但对许多开 ...

  6. Hadoop学习(二)——MapReduce\Yarn架构

    其他更多java基础文章: java基础学习(目录) 学习资料 理解Hadoop YARN架构 本文先讲MapReduce 1.x的框架.再讲MapReduce 1.x升级改进后MapReduce 2 ...

  7. Hadoop学习之MapReduce

    Hadoop学习之MapReduce 目录 Hadoop学习之MapReduce 1 MapReduce简介 1.1 什么是MapReduce 1.2 MapReduce的作用 1.3 MapRedu ...

  8. Hadoop学习教程(MapReduce)(四)

    MapReduce 1.MapReduce概述 1.1.MapReduce定义 1.2.MapReduce优缺点 1.2.1.MapReduce优点 1.2.2.MapReduce缺点 1.3.Map ...

  9. Hadoop学习笔记(1) ——菜鸟入门

     Hadoop学习笔记(1) --菜鸟入门 Hadoop是什么?先问一下百度吧: [百度百科]一个分布式系统基础架构,由Apache基金会所开发.用户能够在不了解分布式底层细节的情况下.开发分布式 ...

最新文章

  1. ssh服务常见问题及其解决办法
  2. 单片机c语言程序设计实训报告,(整理)单片机C语言程序设计实训100例.doc
  3. 向预训练进一步:掩码式非自回归模型训练
  4. 文献记录(part68)--K- 近邻分类器鲁棒性验证:从约束放松法到随机平滑法
  5. Android之AsyncTask异步任务详解总结
  6. C/C++:各类型变量占用字节
  7. qt如何把父窗口的变量传给子窗口_父窗口和iframe子窗口之间相互传递参数和调用函数或方法...
  8. Spring MVC 无XML配置入门示例
  9. 最全的Python进度条展示程序方案
  10. RSA的JavaScript程序
  11. 致远OA_0day批量植Cknife马一步到位
  12. 深度有趣 | 18 二次元头像生成
  13. ln火线零线_插座怎么接LN线
  14. word中安装Zotero插件
  15. java面试(1)如何防止恶意攻击短信验证码接口
  16. extern关键字作用
  17. 黑苹果安装各种问题解决办法
  18. python 百度人脸 sdk_深更半夜实现python百度api人脸识别
  19. 正则表达式在线测试网站推荐
  20. “Python小屋”免费资源汇总(截至2018年11月28日)

热门文章

  1. 一.mysql数据库保存微信用户名报错
  2. H2数据库修改用户名密码
  3. threeJS-Helper13-SkeletonHelper(骨骼显示助手)
  4. vivado高层次综合(high-level synthesis,HLS)学习日记
  5. python常用处理字符串函数的详细分析(全)
  6. 实现PC视频播放最强画质教程( Potplayer播放器+MADVR插件)【转】
  7. Win10系统无法查看CalcExpress帮助文件(.hlp)的解决办法
  8. Android 5.0 Termux 实现对米家设备的控制
  9. 【LOJ】【树形DP】2485 「CEOI2017」Chase
  10. 数字科技企业研发实力榜 TOP 50 ,华为千亿研发排第一