Hadoop详解(三)——MapReduce原理和执行过程,远程Debug,Writable序列化接口,MapReduce程序编写
MapReduce概述
MapReduce的老大是JobTracker 小弟叫TaskTracker相当于小组长 执行具体任务的是Map任务和reduce任务
在Hadoop 0.23版本之后 JobTracker—>ResourceManager(RM) TaskTracker—>NodeManager(NM)
RM和NM只存在于Hadoop 2.0之后的版本中 JobTracker和TaskTracker只存在Hadoop1.0以下 除了0.23版本
有了Yarn之后 完全转为RM和NM YARN不仅仅可以运行MapReduce
MapReduce的大致流程
(2).JobClient通过RPC和JobTracker进行通信,返回一个存放jar包的地址(HDFS)和jobId
(3).client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
(4).开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
(5).JobTracker进行初始化任务
(6).读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
(7).TaskTracker通过心跳机制领取任务(任务的描述信息)
(8).下载所需的jar,配置文件等
(9).TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
(10).将结果写入到HDFS当中
MapReduce原理
同时它还进行任务的监控,如果检测到某执行任务的机器宕机了 JobTracker会将该任务进行转移
InputSplit 输入切片 一个InputSplit对应一个Mapper
Mapper的任务执行完成后 Mapper的输出会作为Reducer的输入进行运算,Reducer任务完成后会将结果输出到HDFS中。
通过记录文件偏移量将任务逻辑切分成多个任务切片(split)
每个TaskSplit对应一个Mapper
Mapper和Reducer和输入输出都是以<key,value>的形式存在的
Shuffle (****)非常重要 主要完成Mapper输出的排序和分组 进行合并
框架已经完成了分区排序和分组,如果想实现MapReduce模型,只需重写Map方法和Reduce方法实现具体的业务逻辑即可。
MapReduce相关的类简介:
相对于大量的小文件来说,hadoop更合适处理少量的大文件。
CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。
◆ KeyValueTextInputFormat
当输入数据的每一行是两列,并用tab分离的形式的时候,KeyValueTextInputformat处理这种格式的文件非常适合。
◆ NLineInputformat NLineInputformat可以控制在每个split中数据的行数。
◆ SequenceFileInputformat
当输入文件格式是sequencefile的时候,要使用SequenceFileInputformat作为输入。
(六) 自定义输入格式
2)重写里面的getSplits(JobContext context)方法。
3)重写createRecordReader(InputSplit split,TaskAttemptContext context)方法。
(七) Hadoop的输出
默认的输出格式,key和value中间值用tab隔开的。
◆ SequenceFileOutputformat
将key和value以sequencefile格式输出。
◆ SequenceFileAsOutputFormat
将key和value以原始二进制的格式输出。
◆ MapFileOutputFormat
将key和value写入MapFile中。由于MapFile中的key是有序的,所以写入的时候必须保证记录是按key值顺序写入的。
◆ MultipleOutputFormat
默认情况下一个reducer会产生一个输出,但是有些时候我们想一个reducer产生多个输出,MultipleOutputFormat和MultipleOutputs可以实现这个功能。
Hadoop序列化
序列化概念
Hadoop序列化特点
② 存储空间大。递归地输出类的超类描述直到不再有超类。序列化图对象,反序列化时为每个对象新建一个实例。相反。Writable对象可以重用。
③ 扩展性差。而Writable方便用户自定义
Hadoop有两套序列化机制:一是自己的序列化机制,二是Google提供的Protobuf(protocol buffer)
Hadoop 框架中标已经使用自己的序列化机制对一些类型进行了包装
String —>Text Long—>LongWritable
Hadoop序列化的作用
Writable接口
package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {void write(DataOutput out) throws IOException;void readFields(DataInput in) throws IOException;
}
③ MR任意的key必须实现WritableComparable接口
package org.apache.hadoop.io;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface WritableComparable<T> extends Writable, Comparable<T> {
}
WordCount测试和编写
测试Hadoop自带的wordCount程序
测试步骤1:首先启动HDFS 再启动Yarn

测试步骤2:新建一个words文档,内容如下:

测试步骤3:将words上传到HDFS

测试步骤4:进入到/cloud/hadoop-2.7.4/share/hadoop/mapreduce 使用示例jar进行测试
[root@hadoop1 mapreduce]# hadoop jar hadoop-mapreduce-examples-2.7.4.jar wordcount /word.txt /wcount 其中/word.txt 是输入内容 /wcount是输出内容

测试步骤5:查看输出内容
分析Wordcount执行过程
根据执行过程编写WordCount
package liuxun.test.hadoop.mr;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;public class WCMapper extends Mapper<LongWritable, Text, Text, LongWritable> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 接收数据String line = value.toString();// 切分数据String[] words = line.split(" ");// 循环for (String w : words) {// 出现一次,记一个1context.write(new Text(w), new LongWritable(1));}}}
② 编写Reducer类WCReducer
package liuxun.test.hadoop.mr;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable> {@Overrideprotected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)throws IOException, InterruptedException {// 接收数据Text k3 = k2; // 这里仍将接收的key作为输出的key// 定义一个计数器long counter = 0;// 循环迭代v2sfor (LongWritable i : v2s) {counter+= i.get();}// 输出context.write(k3, new LongWritable(counter));}
}
③ 编写主程序组装Mapper和Reducer
package liuxun.test.hadoop.mr;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/*** * @author liuxun* 1.分析具体的业务逻辑,确定输入和输出数据的样式* 2.自定义一个类继承自org.apache.hadoop.mapreduce.Mapper类,重写map方法,实现具体业务逻辑,将新的key-Value输出* 3.自定义一个类继承自org.apache.hadoop.mapreduce.Reducer类,重写reduce方法,实现具体业务逻辑,将新的key-Value输出* 4.将自定义的mapper和reducer通过job对象组装起来*/
public class WordCount {public static void main(String[] args) throws Exception {// 构建一个Job对象Job job = Job.getInstance(new Configuration());// 注意:一定要将main方法所在的类设置进去job.setJarByClass(WordCount.class);// 设置Mapper相关属性job.setMapperClass(WCMapper.class);job.setMapOutputKeyClass(Text.class); //设置Map的输出参数key的类型job.setMapOutputValueClass(LongWritable.class); //设置Map输出参数Value的类型FileInputFormat.setInputPaths(job, new Path(args[0])); //设置输入的路径// 设置Reducer相关属性job.setReducerClass(WCReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(LongWritable.class);FileOutputFormat.setOutputPath(job, new Path(args[1]));// 提交任务// 参数 表示在执行任务的过程中是否打印进程信息job.waitForCompletion(true);}
}
程序打包并指定主方法,然后将jar包上传至Linux主机,使用命令进行测试
hello 5
jetty 1
kitty 1
tom 2
world 1
注意事项:
org.apache.hadoop.mapreduce.lib.input 是最新的接口
如果导出的是JAR file 运行时hadoop jar <jar_name>
在涉及到Hadoop中Mapper和Reducer自己的逻辑代码进行打包时 选择JAR file
Maven开发自定义Bean实现MR
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
日志描述如下:
<dependencies> <dependency><groupId>org.apache.maven.plugins</groupId><artifactId>maven-resources-plugin</artifactId><version>2.4.3</version></dependency>
</dependencies>
② HDFS程序需要引入如下依赖
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.8.2</version><scope>test</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.4.1</version></dependency>
</dependencies>
③mr程序需要引入依赖:
<dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.8.2</version><scope>test</scope></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.4.1</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.4.1</version></dependency>
</dependencies>
开始程序的编写
package liuxun.hadoop.mr.dc;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;public class DataBean implements Writable {private String tel;private long upPayLoad;private long downPayLoad;private long totalPayLoad;public DataBean() {}public DataBean(String tel, long upPayLoad, long downPayLoad) {this.tel = tel;this.upPayLoad = upPayLoad;this.downPayLoad = downPayLoad;this.totalPayLoad = upPayLoad + downPayLoad;}@Overridepublic String toString() {return this.upPayLoad + "\t" + this.downPayLoad + "\t" + this.totalPayLoad;}public void readFields(DataInput in) throws IOException {this.tel = in.readUTF();this.upPayLoad = in.readLong();this.downPayLoad = in.readLong();this.totalPayLoad = in.readLong();}// 注意两点:写入的顺序和写入的类型public void write(DataOutput out) throws IOException {out.writeUTF(tel);out.writeLong(upPayLoad);out.writeLong(downPayLoad);out.writeLong(totalPayLoad);}public String getTel() {return tel;}public void setTel(String tel) {this.tel = tel;}public long getUpPayLoad() {return upPayLoad;}public void setUpPayLoad(long upPayLoad) {this.upPayLoad = upPayLoad;}public long getDownPayLoad() {return downPayLoad;}public void setDownPayLoad(long downPayLoad) {this.downPayLoad = downPayLoad;}public long getTotalPayLoad() {return totalPayLoad;}public void setTotalPayLoad(long totalPayLoad) {this.totalPayLoad = totalPayLoad;}}
二、编写MapReduce程序 使用job进行组装
package liuxun.hadoop.mr.dc;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class DataCount {public static class DCMapper extends Mapper<LongWritable, Text, Text, DataBean>{@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {//accept String line = value.toString();//splitString[] fields = line.split("\t");String tel = fields[1];long up = Long.parseLong(fields[8]);long down = Long.parseLong(fields[9]); DataBean bean = new DataBean(tel, up, down);//sendcontext.write(new Text(tel), bean);}}public static class DCReducer extends Reducer<Text, DataBean, Text, DataBean>{@Overrideprotected void reduce(Text key, Iterable<DataBean> values, Context context)throws IOException, InterruptedException {long up_sum = 0;long down_sum = 0;for(DataBean bean : values){up_sum += bean.getUpPayLoad();down_sum += bean.getDownPayLoad();}DataBean bean = new DataBean("", up_sum, down_sum);context.write(key, bean);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(DataCount.class);job.setMapperClass(DCMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(DataBean.class);FileInputFormat.setInputPaths(job, new Path(args[0]));job.setReducerClass(DCReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(DataBean.class);FileOutputFormat.setOutputPath(job, new Path(args[1]));job.waitForCompletion(true);}}
打包上传测试运行 最后查看运行结果
Hadoop远程Debug
注意:
如果没有打包在Linux上运行,而是在Eclipse上调试运行是按本地模式启动的 只会启动一个Mapper和一个Reducer 看不到真正的集群效果
JPDA 简介
Sun Microsystem 的 Java Platform Debugger Architecture (JPDA) 技术是一个多层架构,使您能够在各种环境中轻松调试 Java 应用程序。JPDA 由两个接口(分别是 JVM Tool Interface 和 JDI)、一个协议(Java Debug Wire Protocol)和两个用于合并它们的软件组件(后端和前端)组成。它的设计目的是让调试人员在任何环境中都可以进行调试。
更详细的介绍,可以参考使用 Eclipse 远程调试 Java 应用程序
JDWP 设置
JVM本身就支持远程调试,Eclipse也支持JDWP,只需要在各模块的JVM启动时加载以下参数:
dt_socket表示使用套接字传输。
address=8000
JVM在8000端口上监听请求,这个设定为一个不冲突的端口即可。
server=y
y表示启动的JVM是被调试者。如果为n,则表示启动的JVM是调试器。
suspend=y
y表示启动的JVM会暂停等待,直到调试器连接上才继续执行。suspend=n,则JVM不会暂停等待。
需要在$HADOOP_HOME/etc/hadoop/hadoop-env.sh文件的最后添加需要debug的进程
#远程调试namenode
export HADOOP_NAMENODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=8888,server=y,suspend=y"
#远程调试datanode
export HADOOP_DATANODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=9888,server=y,suspend=y"
#远程调试RM
export YARN_RESOURCEMANAGER_OPTS="-agentlib:jdwp=transport=dt_socket,address=10888,server=y,suspend=y"
#远程调试NM
export YARN_NODEMANAGER_OPTS="-agentlib:jdwp=transport=dt_socket,address=10888,server=y,suspend=y"
hadoop-daemon.sh start namenode
hadoop-daemon.sh start datanode
测试案例:测试NameNode和DataNode
编辑$HADOOP_HOME/etc/hadoop/hadoop-env.sh 添加如下配置
#远程调试namenode
export HADOOP_NAMENODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=8888,server=y,suspend=y"
#远程调试datanode
export HADOOP_DATANODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=9888,server=y,suspend=y"
单个启动:
然后关联源码:再使用command+o 查找Main方法

注意:Debug时如果不打断点 直接就运行了 是不会停住的
接下来断点调试DataNode
在main方法中打断点

以下步骤上同

然后apply , debug 就OK了

远程调试一般在分布式上用的最多。
例如在Eclipse调试时 普通调试只能调试客户端的代码,当通过RPC协议与云主机上的Server进行通讯时,即使导入全部的源码 也无法走到服务器端的程序,因为当与服务器交互时走的是服务器上的程序。只有通过远程调试才可进入Server端程序进行查看。
远程调试技巧:
客户端与服务端进行交互一般都是通过某种协议进行通讯,例如Hadoop是通过RPC通讯的。共同点是服务端和实现了协议接口,客户端通过协议接口代理服务端的程序,调用协议中的方法,进行通讯。
首先找到通讯的协议接口 查找协议接口的实现类(协议接口的实现是服务器端实现的)
在需要调试的部分查找客户端调用的是协议接口中的哪个方法,选中右键查看其方法的实现
然后打上断点调试即可。
Hadoop详解(三)——MapReduce原理和执行过程,远程Debug,Writable序列化接口,MapReduce程序编写相关推荐
- (29.1)【CSRF详解】CSRF原理、利用过程、分类、举例、工具……
目录 CSRF 一.简介: 二.CSRF与XSS的区别: XSS CSRF 三.原理: 四.基本流程: 五.危害: 六.分类: 站外攻击: 站内攻击: 七.举例: Get提交方法: Post提交方法: ...
- MapReduce 原理及执行过程
(1)MapReduce 是一种分布式计算框架,由 Google 提出,主要用于搜索领域,以解决海量数据的计算问题.(分布式存储和分布式计算发轫于 Google 这样的公司是不足为奇的) (2)Map ...
- Hadoop详解以及历史版本介绍
Hadoop详解 Hadoop的介绍以及发展历史 Hadoop之父Doug Cutting Hadoop最早起源于lucene下的Nutch.Nutch的设计目标是构建一个大型的全网搜索引擎,包括网页 ...
- 初级游戏外挂编程详解 windows运行原理+游戏辅助编程 游戏外挂编程
@TOC初级游戏外挂编程详解 windows运行原理+游戏辅助编程 游戏外挂编程 [1]什么是windows API Windows API 中文翻译过来就是windows应用程序接口(Applica ...
- P2P技术详解(三):P2P技术之STUN、TURN、ICE详解
本文是<P2P理论详解>系列文章中的第2篇,总目录如下: <P2P技术详解(一):NAT详解--详细原理.P2P简介> <P2P技术详解(二):P2P中的NAT穿越(打洞 ...
- ArrayList 扩容详解,扩容原理
ArrayList 扩容详解,扩容原理 ArrayList是基于数组实现的,是一个动态数组,其容量能自动增长. ArrayList不是线程安全的,只能用在单线程环境下. 实现了Serializable ...
- Hadoop详解(十二):Yarn资源调度策略
在Yarn中有三种调度器可以选择:FIFO Scheduler ,Capacity Scheduler,Fair Scheduler. FIFO Scheduler FIFO Scheduler把应用 ...
- Android init.rc文件解析过程详解(三)
Android init.rc文件解析过程详解(三) 三.相关结构体 1.listnode listnode结构体用于建立双向链表,这种结构广泛用于kernel代码中, android源代码中定义了l ...
- linux 进程间通信 dbus-glib【实例】详解三 数据类型和dteeth(类型签名type域)(层级结构:服务Service --> Node(对象、object) 等 )(附代码)
linux 进程间通信 dbus-glib[实例]详解一(附代码)(d-feet工具使用) linux 进程间通信 dbus-glib[实例]详解二(上) 消息和消息总线(附代码) linux 进程间 ...
最新文章
- 寒假每日一题(入门组)【week5 完结】
- python封装方法有几种_Python打包exe文件方法汇总【4种】
- python学习笔记(字典)
- aws 弹性三剑客_AWS和弹性:超越用户需求
- mongodb索引使用
- 学习python3(一)
- 快速傅里叶变换(蝶形算法c++源代码)
- Tiles的使用,遗漏和总结
- 【Grafana】通过阿里云日志服务监控Nginx访问日志显示统计信息
- 涉密计算机病毒库升级管理,涉密计算机管理制度.doc
- 听java技术讲座心得体会_听讲座心得体会范文3篇
- 时态的重建--适合理工直男的钟平老师逻辑英语学习笔记
- 2教务管理系统 / 选课管理web
- 常用python编程软件-现在编程软件有哪些?常用是哪一种?
- 在单片机C语言中const是什么意思
- C/C++超市收银系统
- 51单片机存储器原理
- 15.7数据库(7):MySQL创建校园数据库
- python之pygal学习
- 业精于勤毁于嬉,行成于思毁于随