1.解析Partition

Map的结果,会通过partition分发到Reducer上,Reducer做完Reduce操作后,通过OutputFormat,进行输出,下面我们就来分析参与这个过程的类。

Mapper的结果,可能送到Combiner做合并,Combiner在系统中并没有自己的基类,而是用Reducer作为Combiner的基类,他们对外的功能是一样的,只是使用的位置和使用时的上下文不太一样而已。Mapper最终处理的键值对<key, value>,是需要送到Reducer去合并的,合并的时候,有相同key的键/值对会送到同一个Reducer那。哪个key到哪个Reducer的分配过程,是由Partitioner规定的。它只有一个方法,

[java] view plaincopyprint?
  1. getPartition(Text key, Text value, int numPartitions)

输入是Map的结果对<key, value>和Reducer的数目,输出则是分配的Reducer(整数编号)。就是指定Mappr输出的键值对到哪一个reducer上去。系统缺省的Partitioner是HashPartitioner,它以key的Hash值对Reducer的数目取模,得到对应的Reducer。这样保证如果有相同的key值,肯定被分配到同一个reducre上。如果有N个reducer,编号就为0,1,2,3……(N-1)

Reducer是所有用户定制Reducer类的基类,和Mapper类似,它也有setup,reduce,cleanup和run方法,其中setup和cleanup含义和Mapper相同,reduce是真正合并Mapper结果的地方,它的输入是key和这个key对应的所有value的一个迭代器,同时还包括Reducer的上下文。系统中定义了两个非常简单的Reducer,IntSumReducer和LongSumReducer,分别用于对整形/长整型的value求和。

Reduce的结果,通过Reducer.Context的方法collect输出到文件中,和输入类似,Hadoop引入了OutputFormat。OutputFormat依赖两个辅助接口:RecordWriter和OutputCommitter,来处理输出。RecordWriter提供了write方法,用于输出<key, value>和close方法,用于关闭对应的输出。OutputCommitter提供了一系列方法,用户通过实现这些方法,可以定制OutputFormat生存期某些阶段需要的特殊操作。我们在TaskInputOutputContext中讨论过这些方法(明显,TaskInputOutputContext是OutputFormat和Reducer间的桥梁)。OutputFormat和RecordWriter分别对应着InputFormat和RecordReader,系统提供了空输出NullOutputFormat(什么结果都不输出,NullOutputFormat.RecordWriter只是示例,系统中没有定义),LazyOutputFormat(没在类图中出现,不分析),FilterOutputFormat(不分析)和基于文件FileOutputFormat的SequenceFileOutputFormat和TextOutputFormat输出。

基于文件的输出FileOutputFormat利用了一些配置项配合工作,包括:
mapred.output.compress:是否压缩;
mapred.output.compression.codec:压缩方法;
mapred.output.dir:输出路径;
mapred.work.output.dir:输出工作路径。
FileOutputFormat还依赖于FileOutputCommitter,通过FileOutputCommitter提供一些和Job,Task相关的临时文件管理功能。如FileOutputCommitter的setupJob,会在输出路径下创建一个名为_temporary的临时目录,cleanupJob则会删除这个目录。
SequenceFileOutputFormat输出和TextOutputFormat输出分别对应输入的SequenceFileInputFormat和TextInputFormat。

2.代码实例

[java] view plaincopyprint?
  1. package org.apache.hadoop.examples;
  2. import java.io.IOException;
  3. import java.util.*;
  4. import org.apache.hadoop.fs.Path;
  5. import org.apache.hadoop.conf.*;
  6. import org.apache.hadoop.io.*;
  7. import org.apache.hadoop.mapred.*;
  8. import org.apache.hadoop.util.*;
  9. /**
  10. * 输入文本,以tab间隔
  11. * kaka    1       28
  12. * hua     0       26
  13. * chao    1
  14. * tao     1       22
  15. * mao     0       29      22
  16. * */
  17. //Partitioner函数的使用
  18. public class MyPartitioner {
  19. // Map函数
  20. public static class MyMap extends MapReduceBase implements
  21. Mapper<LongWritable, Text, Text, Text> {
  22. public void map(LongWritable key, Text value,
  23. OutputCollector<Text, Text> output, Reporter reporter)
  24. throws IOException {
  25. String[] arr_value = value.toString().split("\t");
  26. //测试输出
  27. //          for(int i=0;i<arr_value.length;i++)
  28. //          {
  29. //              System.out.print(arr_value[i]+"\t");
  30. //          }
  31. //          System.out.print(arr_value.length);
  32. //          System.out.println();
  33. Text word1 = new Text();
  34. Text word2 = new Text();
  35. if (arr_value.length > 3) {
  36. word1.set("long");
  37. word2.set(value);
  38. } else if (arr_value.length < 3) {
  39. word1.set("short");
  40. word2.set(value);
  41. } else {
  42. word1.set("right");
  43. word2.set(value);
  44. }
  45. output.collect(word1, word2);
  46. }
  47. }
  48. public static class MyReduce extends MapReduceBase implements
  49. Reducer<Text, Text, Text, Text> {
  50. public void reduce(Text key, Iterator<Text> values,
  51. OutputCollector<Text, Text> output, Reporter reporter)
  52. throws IOException {
  53. int sum = 0;
  54. System.out.println(key);
  55. while (values.hasNext()) {
  56. output.collect(key, new Text(values.next().getBytes()));
  57. }
  58. }
  59. }
  60. // 接口Partitioner继承JobConfigurable,所以这里有两个override方法
  61. public static class MyPartitionerPar implements Partitioner<Text, Text> {
  62. /**
  63. * getPartition()方法的
  64. * 输入参数:键/值对<key,value>与reducer数量numPartitions
  65. * 输出参数:分配的Reducer编号,这里是result
  66. * */
  67. @Override
  68. public int getPartition(Text key, Text value, int numPartitions) {
  69. // TODO Auto-generated method stub
  70. int result = 0;
  71. System.out.println("numPartitions--" + numPartitions);
  72. if (key.toString().equals("long")) {
  73. result = 0 % numPartitions;
  74. } else if (key.toString().equals("short")) {
  75. result = 1 % numPartitions;
  76. } else if (key.toString().equals("right")) {
  77. result = 2 % numPartitions;
  78. }
  79. System.out.println("result--" + result);
  80. return result;
  81. }
  82. @Override
  83. public void configure(JobConf arg0)
  84. {
  85. // TODO Auto-generated method stub
  86. }
  87. }
  88. //输入参数:/home/hadoop/input/PartitionerExample /home/hadoop/output/Partitioner
  89. public static void main(String[] args) throws Exception {
  90. JobConf conf = new JobConf(MyPartitioner.class);
  91. conf.setJobName("MyPartitioner");
  92. //控制reducer数量,因为要分3个区,所以这里设定了3个reducer
  93. conf.setNumReduceTasks(3);
  94. conf.setMapOutputKeyClass(Text.class);
  95. conf.setMapOutputValueClass(Text.class);
  96. //设定分区类
  97. conf.setPartitionerClass(MyPartitionerPar.class);
  98. conf.setOutputKeyClass(Text.class);
  99. conf.setOutputValueClass(Text.class);
  100. //设定mapper和reducer类
  101. conf.setMapperClass(MyMap.class);
  102. conf.setReducerClass(MyReduce.class);
  103. conf.setInputFormat(TextInputFormat.class);
  104. conf.setOutputFormat(TextOutputFormat.class);
  105. FileInputFormat.setInputPaths(conf, new Path(args[0]));
  106. FileOutputFormat.setOutputPath(conf, new Path(args[1]));
  107. JobClient.runJob(conf);
  108. }
  109. }
本文转自xwdreamer博客园博客,原文链接:http://www.cnblogs.com/xwdreamer/archive/2011/10/27/2296943.html,如需转载请自行联系原作者

Hadoop中Partition解析相关推荐

  1. 7、大数据中常见的文件存储格式以及hadoop中支持的压缩算法

    Hadoop系列文章目录 1.hadoop3.1.4简单介绍及部署.简单验证 2.HDFS操作 - shell客户端 3.HDFS的使用(读写.上传.下载.遍历.查找文件.整个目录拷贝.只拷贝文件.列 ...

  2. Hadoop常见错误解析

    1:Shuffle Error: Exceeded MAX_FAILED_UNIQUE_FETCHES; bailing-out  Answer: 程序里面需要打开多个文件,进行分析,系统一般默认数量 ...

  3. 考究Hadoop中split的计算方法

    Hadoop中block块大小和split切片大小会影响到MapReduce程序在运行过程中的效率.map的个数.在本文中,以经典入门案例WordCount为例,通过debug的方式跟踪源代码,来分析 ...

  4. Hadoop中mapreduce作业日志是如何生成的

    摘要:本篇博客介绍了hadoop中mapreduce类型的作业日志是如何生成的.主要介绍日志生成的几个关键过程,不涉及过多细节性的内容. 本文分享自华为云社区<hadoop中mapreduce作 ...

  5. 浅析 Hadoop 中的数据倾斜

    最近几次被问到关于数据倾斜的问题,这里找了些资料也结合一些自己的理解. 在并行计算中我们总希望分配的每一个task 都能以差不多的粒度来切分并且完成时间相差不大,但是集群中可能硬件不同,应用的类型不同 ...

  6. kafka partition java,kafka中partition数量与消费者对应关系以及Java实践

    kafka中partition数量与消费者对应关系以及Java实践 kafka中partition数量与消费者对应关系以及Java实践 kafka是由Apache软件基金会开发的一个开源流处理平台.k ...

  7. Zookeeper 在Hadoop中的应用

    Zookeeper 简单介绍 Zookeeper 分布式服务框架是 Apache Hadoop 的一个子项目.它主要是用来解决分布式应用中常常遇到的一些数据管理问题,如:统一命名服务.状态同步服务.集 ...

  8. 数据蒋堂 | Hadoop中理论与工程的错位

    作者:蒋步星 来源:数据蒋堂 校对:林亦霖 本文共1400字,建议阅读6分钟. 本文分析了在Hadoop的设计和实现中的理论问题和工程问题. Hadoop是当前重要的大数据计算平台,它试图摒弃传统数据 ...

  9. hadoop中的序列化与Writable类

    本文地址:http://www.cnblogs.com/archimedes/p/hadoop-writable-class.html,转载请注明源地址. hadoop中自带的org.apache.h ...

最新文章

  1. 崛起于Springboot2.X之Mybatis-全注解方式操作Mysql(4)
  2. pandas比较两个dataframe特定数据列的数值是否相同并给出差值:使用np.where函数
  3. 英国最新报告:40% AI公司其实没用任何AI技术
  4. 欢迎参加“城市大脑与应急管理”专家研讨会
  5. php_yaf 安装
  6. tomcat 部署脚本
  7. php数组遍历相同的元素覆盖_php获取数组中重复数据的两种方法
  8. Docker及K8S使用碎碎记
  9. 计算机主机内有哪些部件常用的,智慧职教: 计算机系统由什么组成?计算机主机内有哪些部件?常用的计算机外设有哪些...
  10. 人脸识别资源推荐:20款人脸检测/识别的API、库和软件
  11. Linux下辅助DNS的搭建以及远程和加密更新
  12. 作为一个生鲜电商自媒体
  13. python时间模块 dir(time)_python sys,os,time模块的使用(包括时间格式的各种转换)...
  14. 创建maven工程时总是带有后缀名Maven Webapp解决办法
  15. DDA数值微分法详解
  16. Java线程状态与方法关系
  17. IPhone4S自定义铃声
  18. Android audio 三 AudioRecord 分析下
  19. 苹果手机支持鸿蒙,除了苹果,这四款华为也能三年不卡,还能升级鸿蒙
  20. Vue+DataV+Echarts组件创建炫酷科技大屏~(注释多多)

热门文章

  1. ElasticSearch 未授权访问记录(端口:9200)
  2. module ‘urllib‘ has no attribute ‘unquote‘(url解码)
  3. 虚拟机Ubuntu蓝屏闪屏解决方法
  4. C/C++:uint64_t 转为char*
  5. linux开机启动详细流程图
  6. 20155207实验2 Windows口令破解
  7. ASP.Net WebForm温故知新学习笔记:二、ViewState与UpdatePanel探秘
  8. 洛谷 P3183 [HAOI2016]食物链
  9. [转载]oracle常用经典SQL查询
  10. 重命名数据库解决“无法用排他锁锁定该数据库”