转载自:http://zengzhaozheng.blog.51cto.com/8219051/1392961

1、在Reudce端进行连接。
在Reudce端进行连接是MapReduce框架进行表之间join操作最为常见的模式,其具体的实现原理如下:
Map端的主要工作:为来自不同表(文件)的key/value对打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。
reduce端的主要工作:在reduce端以连接字段作为key的分组已经完成,我们只需要在每一个分组当中将那些来源于不同文件的记录(在map阶段已经打标志)分开,最后进行笛卡尔只就ok了。原理非常简单,下面来看一个实例:
(1)自定义一个value返回类型:

package com.mr.reduceSizeJoin;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
public class CombineValues implements WritableComparable{   //private static final Logger logger = LoggerFactory.getLogger(CombineValues.class);   private Text joinKey;//链接关键字   private Text flag;//文件来源标志   private Text secondPart;//除了链接键外的其他部分   public void setJoinKey(Text joinKey) {   this.joinKey = joinKey;   }   public void setFlag(Text flag) {   this.flag = flag;   }   public void setSecondPart(Text secondPart) {   this.secondPart = secondPart;   }   public Text getFlag() {   return flag;   }   public Text getSecondPart() {   return secondPart;   }   public Text getJoinKey() {   return joinKey;   }   public CombineValues() {   this.joinKey =  new Text();   this.flag = new Text();   this.secondPart = new Text();   }@Override public void write(DataOutput out) throws IOException {   this.joinKey.write(out);   this.flag.write(out);   this.secondPart.write(out);   }   @Override public void readFields(DataInput in) throws IOException {   this.joinKey.readFields(in);   this.flag.readFields(in);   this.secondPart.readFields(in);   }   @Override public int compareTo(CombineValues o) {   return this.joinKey.compareTo(o.getJoinKey());   }   @Override public String toString() {   // TODO Auto-generated method stub   return "[flag="+this.flag.toString()+",joinKey="+this.joinKey.toString()+",secondPart="+this.secondPart.toString()+"]";   }
}

View Code

(2)map、reduce主体代码

package com.mr.reduceSizeJoin;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author zengzhaozheng
* 用途说明:
* reudce side join中的left outer join
* 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
* table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
* tb_dim_city.dat文件内容,分隔符为"|":
* id     name  orderid  city_code  is_show
* 0       其他        9999     9999         0
* 1       长春        1        901          1
* 2       吉林        2        902          1
* 3       四平        3        903          1
* 4       松原        4        904          1
* 5       通化        5        905          1
* 6       辽源        6        906          1
* 7       白城        7        907          1
* 8       白山        8        908          1
* 9       延吉        9        909          1
* -------------------------风骚的分割线-------------------------------
* table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
* tb_user_profiles.dat文件内容,分隔符为"|":
* userID   network     flow    cityID
* 1           2G       123      1
* 2           3G       333      2
* 3           3G       555      1
* 4           2G       777      3
* 5           3G       666      4
*
* -------------------------风骚的分割线-------------------------------
*  结果:
*  1   长春  1   901 1   1   2G  123
*  1   长春  1   901 1   3   3G  555
*  2   吉林  2   902 1   2   3G  333
*  3   四平  3   903 1   4   2G  777
*  4   松原  4   904 1   5   3G  666
*/
public class ReduceSideJoin_LeftOuterJoin extends Configured implements Tool{   private static final Logger logger = LoggerFactory.getLogger(ReduceSideJoin_LeftOuterJoin.class);   public static class LeftOutJoinMapper extends Mapper {   private CombineValues combineValues = new CombineValues();   private Text flag = new Text();   private Text joinKey = new Text();   private Text secondPart = new Text();   @Override protected void map(Object key, Text value, Context context)   throws IOException, InterruptedException {   //获得文件输入路径   String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   //数据来自tb_dim_city.dat文件,标志即为"0"   if(pathName.endsWith("tb_dim_city.dat")){   String[] valueItems = value.toString().split("\\|");   //过滤格式错误的记录   if(valueItems.length != 5){   return;   }   flag.set("0");   joinKey.set(valueItems[0]);   secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   combineValues.setFlag(flag);   combineValues.setJoinKey(joinKey);   combineValues.setSecondPart(secondPart);   context.write(combineValues.getJoinKey(), combineValues);}//数据来自于tb_user_profiles.dat,标志即为"1"   else if(pathName.endsWith("tb_user_profiles.dat")){   String[] valueItems = value.toString().split("\\|");   //过滤格式错误的记录   if(valueItems.length != 4){   return;   }   flag.set("1");   joinKey.set(valueItems[3]);   secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   combineValues.setFlag(flag);   combineValues.setJoinKey(joinKey);   combineValues.setSecondPart(secondPart);   context.write(combineValues.getJoinKey(), combineValues);   }   }   }   public static class LeftOutJoinReducer extends Reducer {   //存储一个分组中的左表信息   private ArrayList leftTable = new ArrayList();   //存储一个分组中的右表信息   private ArrayList rightTable = new ArrayList();   private Text secondPar = null;   private Text output = new Text();   /**   * 一个分组调用一次reduce函数   */ @Override protected void reduce(Text key, Iterable value, Context context)   throws IOException, InterruptedException {   leftTable.clear();   rightTable.clear();   /**   * 将分组中的元素按照文件分别进行存放   * 这种方法要注意的问题:   * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,   * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最   * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。   */ for(CombineValues cv : value){   secondPar = new Text(cv.getSecondPart().toString());   //左表tb_dim_city   if("0".equals(cv.getFlag().toString().trim())){   leftTable.add(secondPar);   }   //右表tb_user_profiles   else if("1".equals(cv.getFlag().toString().trim())){   rightTable.add(secondPar);   }   }   logger.info("tb_dim_city:"+leftTable.toString());   logger.info("tb_user_profiles:"+rightTable.toString());   for(Text leftPart : leftTable){   for(Text rightPart : rightTable){   output.set(leftPart+ "\t" + rightPart);   context.write(key, output);   }   }   }   }   @Override public int run(String[] args) throws Exception {   Configuration conf=getConf(); //获得配置文件对象   Job job=new Job(conf,"LeftOutJoinMR");   job.setJarByClass(ReduceSideJoin_LeftOuterJoin.class);FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径   FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径job.setMapperClass(LeftOutJoinMapper.class);   job.setReducerClass(LeftOutJoinReducer.class);job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式   job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格格式//设置map的输出key和value类型   job.setMapOutputKeyClass(Text.class);   job.setMapOutputValueClass(CombineValues.class);//设置reduce的输出key和value类型   job.setOutputKeyClass(Text.class);   job.setOutputValueClass(Text.class);   job.waitForCompletion(true);   return job.isSuccessful()?0:1;   }   public static void main(String[] args) throws IOException,   ClassNotFoundException, InterruptedException {   try {   int returnCode =  ToolRunner.run(new ReduceSideJoin_LeftOuterJoin(),args);   System.exit(returnCode);   } catch (Exception e) {   // TODO Auto-generated catch block
            logger.error(e.getMessage());   }   }
}

View Code

其中具体的分析以及数据的输出输入请看代码中的注释已经写得比较清楚了,这里主要分析一下reduce join的一些不足。之所以会存在reduce join这种方式,我们可以很明显的看出原:因为整体数据被分割了,每个map task只处理一部分数据而不能够获取到所有需要的join字段,因此我们需要在讲join key作为reduce端的分组将所有join key相同的记录集中起来进行处理,所以reduce join这种方式就出现了。这种方式的缺点很明显就是会造成map和reduce端也就是shuffle阶段出现大量的数据传输,效率很低.

2、在Map端进行连接。
使用场景:一张表十分小、一张表很大。
用法:在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表进行join key / value解释分割放到内存中(可以放大Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。
直接上代码,比较简单:

package com.mr.mapSideJoin;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author zengzhaozheng
*
* 用途说明:
* Map side join中的left outer join
* 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
* table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show),
* 假设tb_dim_city文件记录数很少,tb_dim_city.dat文件内容,分隔符为"|":
* id     name  orderid  city_code  is_show
* 0       其他        9999     9999         0
* 1       长春        1        901          1
* 2       吉林        2        902          1
* 3       四平        3        903          1
* 4       松原        4        904          1
* 5       通化        5        905          1
* 6       辽源        6        906          1
* 7       白城        7        907          1
* 8       白山        8        908          1
* 9       延吉        9        909          1
* -------------------------风骚的分割线-------------------------------
* table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
* tb_user_profiles.dat文件内容,分隔符为"|":
* userID   network     flow    cityID
* 1           2G       123      1
* 2           3G       333      2
* 3           3G       555      1
* 4           2G       777      3
* 5           3G       666      4
* -------------------------风骚的分割线-------------------------------
*  结果:
*  1   长春  1   901 1   1   2G  123
*  1   长春  1   901 1   3   3G  555
*  2   吉林  2   902 1   2   3G  333
*  3   四平  3   903 1   4   2G  777
*  4   松原  4   904 1   5   3G  666
*/
public class MapSideJoinMain extends Configured implements Tool{   private static final Logger logger = LoggerFactory.getLogger(MapSideJoinMain.class);   public static class LeftOutJoinMapper extends Mapper {private HashMap city_info = new HashMap();   private Text outPutKey = new Text();   private Text outPutValue = new Text();   private String mapInputStr = null;   private String mapInputSpit[] = null;   private String city_secondPart = null;   /**   * 此方法在每个task开始之前执行,这里主要用作从DistributedCache   * 中取到tb_dim_city文件,并将里边记录取出放到内存中。   */ @Override protected void setup(Context context)   throws IOException, InterruptedException {   BufferedReader br = null;   //获得当前作业的DistributedCache相关文件   Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   String cityInfo = null;   for(Path p : distributePaths){   if(p.toString().endsWith("tb_dim_city.dat")){   //读缓存文件,并放到mem中   br = new BufferedReader(new FileReader(p.toString()));   while(null!=(cityInfo=br.readLine())){   String[] cityPart = cityInfo.split("\\|",5);   if(cityPart.length ==5){   city_info.put(cityPart[0], cityPart[1]+"\t"+cityPart[2]+"\t"+cityPart[3]+"\t"+cityPart[4]);   }   }   }   }   }/**   * Map端的实现相当简单,直接判断tb_user_profiles.dat中的   * cityID是否存在我的map中就ok了,这样就可以实现Map Join了   */ @Override protected void map(Object key, Text value, Context context)   throws IOException, InterruptedException {   //排掉空行   if(value == null || value.toString().equals("")){   return;   }   mapInputStr = value.toString();   mapInputSpit = mapInputStr.split("\\|",4);   //过滤非法记录   if(mapInputSpit.length != 4){   return;   }   //判断链接字段是否在map中存在   city_secondPart = city_info.get(mapInputSpit[3]);   if(city_secondPart != null){   this.outPutKey.set(mapInputSpit[3]);   this.outPutValue.set(city_secondPart+"\t"+mapInputSpit[0]+"\t"+mapInputSpit[1]+"\t"+mapInputSpit[2]);   context.write(outPutKey, outPutValue);   }   }   }   @Override public int run(String[] args) throws Exception {   Configuration conf=getConf(); //获得配置文件对象   DistributedCache.addCacheFile(new Path(args[1]).toUri(), conf);//为该job添加缓存文件   Job job=new Job(conf,"MapJoinMR");   job.setNumReduceTasks(0);FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径   FileOutputFormat.setOutputPath(job, new Path(args[2])); //设置reduce输出文件路径
job.setJarByClass(MapSideJoinMain.class);   job.setMapperClass(LeftOutJoinMapper.class);job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式   job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式//设置map的输出key和value类型   job.setMapOutputKeyClass(Text.class);//设置reduce的输出key和value类型   job.setOutputKeyClass(Text.class);   job.setOutputValueClass(Text.class);   job.waitForCompletion(true);   return job.isSuccessful()?0:1;   }   public static void main(String[] args) throws IOException,   ClassNotFoundException, InterruptedException {   try {   int returnCode =  ToolRunner.run(new MapSideJoinMain(),args);   System.exit(returnCode);   } catch (Exception e) {   // TODO Auto-generated catch block
            logger.error(e.getMessage());   }   }
}

View Code

这里说说DistributedCache。DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外,关于DistributedCache和作业的关系,比如权限、存储路径区分、public和private等属性,接下来有用再整理研究一下写一篇blog,这里就不详细说了。
另外还有一种比较变态的Map Join方式,就是结合HBase来做Map Join操作。这种方式完全可以突破内存的控制,使你毫无忌惮的使用Map Join,而且效率也非常不错。
3、SemiJoin。
SemiJoin就是所谓的半连接,其实仔细一看就是reduce join的一个变种,就是在map端过滤掉一些数据,在网络中只传输参与连接的数据不参与连接的数据不必在网络中进行传输,从而减少了shuffle的网络传输量,使整体效率得到提高,其他思想和reduce join是一模一样的。说得更加接地气一点就是将小表中参与join的key单独抽出来通过DistributedCach分发到相关节点,然后将其取出放到内存中(可以放到HashSet中),在map阶段扫描连接表,将join key不在内存HashSet中的记录过滤掉,让那些参与join的记录通过shuffle传输到reduce端进行join操作,其他的和reduce join都是一样的。看代码:

package com.mr.SemiJoin;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author zengzhaozheng
*
* 用途说明:
* reudce side join中的left outer join
* 左连接,两个文件分别代表2个表,连接字段table1的id字段和table2的cityID字段
* table1(左表):tb_dim_city(id int,name string,orderid int,city_code,is_show)
* tb_dim_city.dat文件内容,分隔符为"|":
* id     name  orderid  city_code  is_show
* 0       其他        9999     9999         0
* 1       长春        1        901          1
* 2       吉林        2        902          1
* 3       四平        3        903          1
* 4       松原        4        904          1
* 5       通化        5        905          1
* 6       辽源        6        906          1
* 7       白城        7        907          1
* 8       白山        8        908          1
* 9       延吉        9        909          1
* -------------------------风骚的分割线-------------------------------
* table2(右表):tb_user_profiles(userID int,userName string,network string,double flow,cityID int)
* tb_user_profiles.dat文件内容,分隔符为"|":
* userID   network     flow    cityID
* 1           2G       123      1
* 2           3G       333      2
* 3           3G       555      1
* 4           2G       777      3
* 5           3G       666      4
* -------------------------风骚的分割线-------------------------------
* joinKey.dat内容:
* city_code
* 1
* 2
* 3
* 4
* -------------------------风骚的分割线-------------------------------
*  结果:
*  1   长春  1   901 1   1   2G  123
*  1   长春  1   901 1   3   3G  555
*  2   吉林  2   902 1   2   3G  333
*  3   四平  3   903 1   4   2G  777
*  4   松原  4   904 1   5   3G  666
*/
public class SemiJoin extends Configured implements Tool{   private static final Logger logger = LoggerFactory.getLogger(SemiJoin.class);   public static class SemiJoinMapper extends Mapper {   private CombineValues combineValues = new CombineValues();   private HashSet joinKeySet = new HashSet();   private Text flag = new Text();   private Text joinKey = new Text();   private Text secondPart = new Text();   /**   * 将参加join的key从DistributedCache取出放到内存中,以便在map端将要参加join的key过滤出来。b   */ @Override protected void setup(Context context)   throws IOException, InterruptedException {   BufferedReader br = null;   //获得当前作业的DistributedCache相关文件   Path[] distributePaths = DistributedCache.getLocalCacheFiles(context.getConfiguration());   String joinKeyStr = null;   for(Path p : distributePaths){   if(p.toString().endsWith("joinKey.dat")){   //读缓存文件,并放到mem中   br = new BufferedReader(new FileReader(p.toString()));   while(null!=(joinKeyStr=br.readLine())){   joinKeySet.add(joinKeyStr);   }   }   }   }   @Override protected void map(Object key, Text value, Context context)   throws IOException, InterruptedException {   //获得文件输入路径   String pathName = ((FileSplit) context.getInputSplit()).getPath().toString();   //数据来自tb_dim_city.dat文件,标志即为"0"   if(pathName.endsWith("tb_dim_city.dat")){   String[] valueItems = value.toString().split("\\|");   //过滤格式错误的记录   if(valueItems.length != 5){   return;   }   //过滤掉不需要参加join的记录   if(joinKeySet.contains(valueItems[0])){   flag.set("0");   joinKey.set(valueItems[0]);   secondPart.set(valueItems[1]+"\t"+valueItems[2]+"\t"+valueItems[3]+"\t"+valueItems[4]);   combineValues.setFlag(flag);   combineValues.setJoinKey(joinKey);   combineValues.setSecondPart(secondPart);   context.write(combineValues.getJoinKey(), combineValues);   }else{   return ;   }   }//数据来自于tb_user_profiles.dat,标志即为"1"   else if(pathName.endsWith("tb_user_profiles.dat")){   String[] valueItems = value.toString().split("\\|");   //过滤格式错误的记录   if(valueItems.length != 4){   return;   }   //过滤掉不需要参加join的记录   if(joinKeySet.contains(valueItems[3])){   flag.set("1");   joinKey.set(valueItems[3]);   secondPart.set(valueItems[0]+"\t"+valueItems[1]+"\t"+valueItems[2]);   combineValues.setFlag(flag);   combineValues.setJoinKey(joinKey);   combineValues.setSecondPart(secondPart);   context.write(combineValues.getJoinKey(), combineValues);   }else{   return ;   }   }   }   }   public static class SemiJoinReducer extends Reducer {   //存储一个分组中的左表信息   private ArrayList leftTable = new ArrayList();   //存储一个分组中的右表信息   private ArrayList rightTable = new ArrayList();   private Text secondPar = null;   private Text output = new Text();   /**   * 一个分组调用一次reduce函数   */ @Override protected void reduce(Text key, Iterable value, Context context)   throws IOException, InterruptedException {   leftTable.clear();   rightTable.clear();   /**   * 将分组中的元素按照文件分别进行存放   * 这种方法要注意的问题:   * 如果一个分组内的元素太多的话,可能会导致在reduce阶段出现OOM,   * 在处理分布式问题之前最好先了解数据的分布情况,根据不同的分布采取最   * 适当的处理方法,这样可以有效的防止导致OOM和数据过度倾斜问题。   */ for(CombineValues cv : value){   secondPar = new Text(cv.getSecondPart().toString());   //左表tb_dim_city   if("0".equals(cv.getFlag().toString().trim())){   leftTable.add(secondPar);   }   //右表tb_user_profiles   else if("1".equals(cv.getFlag().toString().trim())){   rightTable.add(secondPar);   }   }   logger.info("tb_dim_city:"+leftTable.toString());   logger.info("tb_user_profiles:"+rightTable.toString());   for(Text leftPart : leftTable){   for(Text rightPart : rightTable){   output.set(leftPart+ "\t" + rightPart);   context.write(key, output);   }   }   }   }   @Override public int run(String[] args) throws Exception {   Configuration conf=getConf(); //获得配置文件对象   DistributedCache.addCacheFile(new Path(args[2]).toUri(), conf);Job job=new Job(conf,"LeftOutJoinMR");   job.setJarByClass(SemiJoin.class);FileInputFormat.addInputPath(job, new Path(args[0])); //设置map输入文件路径   FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置reduce输出文件路径
job.setMapperClass(SemiJoinMapper.class);   job.setReducerClass(SemiJoinReducer.class);job.setInputFormatClass(TextInputFormat.class); //设置文件输入格式   job.setOutputFormatClass(TextOutputFormat.class);//使用默认的output格式//设置map的输出key和value类型   job.setMapOutputKeyClass(Text.class);   job.setMapOutputValueClass(CombineValues.class);//设置reduce的输出key和value类型   job.setOutputKeyClass(Text.class);   job.setOutputValueClass(Text.class);   job.waitForCompletion(true);   return job.isSuccessful()?0:1;   }   public static void main(String[] args) throws IOException,   ClassNotFoundException, InterruptedException {   try {   int returnCode =  ToolRunner.run(new SemiJoin(),args);   System.exit(returnCode);   } catch (Exception e) {   logger.error(e.getMessage());   }   }
}

View Code

这里还说说SemiJoin也是有一定的适用范围的,其抽取出来进行join的key是要放到内存中的,所以不能够太大,容易在Map端造成OOM。

总结
blog介绍了三种join方式。这三种join方式适用于不同的场景,其处理效率上的相差还是蛮大的,其中主要导致因素是网络传输。Map join效率最高,其次是SemiJoin,最低的是reduce join。另外,写分布式大数据处理程序的时最好要对整体要处理的数据分布情况作一个了解,这可以提高我们代码的效率,使数据的倾斜度降到最低,使我们的代码倾向性更好。

转载于:https://www.cnblogs.com/cssdongl/p/6018806.html

hadoop中MapReduce多种join实现实例分析相关推荐

  1. Hadoop中mapreduce作业日志是如何生成的

    摘要:本篇博客介绍了hadoop中mapreduce类型的作业日志是如何生成的.主要介绍日志生成的几个关键过程,不涉及过多细节性的内容. 本文分享自华为云社区<hadoop中mapreduce作 ...

  2. hadoop中MapReduce中压缩的使用及4种压缩格式的特征的比较

    在比较四中压缩方法之前,先来点干的,说一下在MapReduce的job中怎么使用压缩. MapReduce的压缩分为map端输出内容的压缩和reduce端输出的压缩,配置很简单,只要在作业的conf中 ...

  3. java 中数组与list_Java中List与数组相互转换实例分析

    这篇文章主要介绍了Java中List与数组相互转换的方法,实例分析了Java中List与数组相互转换中容易出现的问题与相关的解决方法,具有一定参考借鉴价值,需要的朋友可以参考下 本文实例分析了Java ...

  4. jsp action java_jsp中Action使用session方法实例分析

    本文实例分析了jsp中action使用session方法.分享给大家供大家参考.具体如下: 在struts2里,如果需要在action中使用session,可以通过下面两种方式得到 1.通过actio ...

  5. java中的递归算法_java递归算法实例分析

    递归算法设计的基本思想是: 对于一个复杂的问题,把原问题分解为若干个相对简单类同的子问题,继续下去直到子问题简单到能够直接求解,也就是说到了递推的出口,这样原问题就有递推得解. 在做递归算法的时候,一 ...

  6. python读取文本中的英文歌_Python 实例分析 - 获取MP3歌曲的Tag信息

    下面利用一个python的实例程序,来学习python.这个程序的目的就是分析出所有MP3文件的Tag信息并输出. 其中遇到一个问题,让我深切感受到python中依靠缩进来控制流程的缺点,不多说,看程 ...

  7. hadoop中mapreduce参数优化

    MapReduce重要配置参数 1.1 资源相关参数 //以下参数是在用户自己的mr应用程序中配置就可以生效 (1) mapreduce.map.memory.mb: 一个Map Task可使用的资源 ...

  8. 用Hadoop中MapReduce进行TopN排序

    数据格式: 10,3333,10,100 11,9321,1000,293 12,3881,701,20 13,6974,910,30 14,8888,11,39 订单ID 用户ID 资费 业务ID ...

  9. mapreduce将key相同的value结合在一起_个人理解Hadoop中MapReduce

    MapReduce 是什么? MapReduce是一种分布式离线计算框架 主要分为MapTask 和ReduceTask两部分 主要用于大规模的数据集(大于1T)的并行运算 首先我先讲解下MapRed ...

  10. 聊聊ajax,聊聊Ajax()中data()基本知识以及实例分析

    最近忙得不可开交啊,一个星期没出来透透气了,总算有点时间,来来来总结点东西:Ajax()中data参数类型.聊到data()方法,玩过ajax的童鞋肯定不会陌生了,data() 方法向被选元素附加数据 ...

最新文章

  1. 【深度学习】数形结合的图像处理(文末介绍了一种新型网络)
  2. lua学习笔记之数据文件及序列化
  3. python只能对列表进行切片_Python3:类型错误:列表索引必须是整数或切片,而不是s...
  4. ORACLE RAC+DG 硬件配置
  5. 汇编语言 masm常见报错原因
  6. 【锁相环系列1】锁相环的基本原理
  7. 态度决定你的人生高度(一个人能否成功,就看他的态度)
  8. 关于TensorFlow的MNIST数据集下载脚本input_data.py的坑
  9. java控制台五子棋游戏
  10. 鸿蒙系统的平板电脑,亓纪的想法 篇三百六十三:骁龙870+鸿蒙2.0!首款鸿蒙系统平板曝光,支持第二代M-Pencil...
  11. main:处理命令行选项
  12. 直线加速器,可以使用半圆转向串联,作弓形结构
  13. 免费PDF转换器注册码
  14. freemarker ftl java_FreeMarker学习1(Ftl)
  15. 百度云apkg手机文件怎么打开_下载不限速,这款网盘软件,轻松秒杀百度云!...
  16. 计算机用户文件夹加密,电脑怎么设置加密文件夹_给电脑文件夹设置密码的方法...
  17. 随着员工转为远程办公,Diligent在所有董事会管理平台中提供无缝视频会议接入,确保安全的虚拟董事会议
  18. java zip解压 中文_java解压ZIP 解决中文乱码 (GBK和UTF-8)
  19. ava.lang.IllegalStateException: Failed to introspect Class [xxxxxxxxImpl] from ClassLoader-Autowired
  20. 小猴吃桃matlab,小班美术优质课教案及教学反思《小猴吃桃》

热门文章

  1. ZOC for Mac(终端仿真器)
  2. 如何使用Airdrop将视频从Mac发送到iPhone?
  3. 如何在macOS中得到“另存为”快捷方式
  4. Rhinoceros技巧:有关曲线和曲面的分析
  5. 如何使用ExpressBurn Plus mac版刻录数据CD
  6. SQLPro Studio mac如何链接MYSQL?
  7. 怎样使用SQL Pro Studio管理所有数据库?
  8. Python之路第二天
  9. 2019年上海春运志愿服务启动 3000余名志愿者守护归乡路
  10. 复杂UI的组织-创建者模式-uitableview思想