目录

  • Map Join
    • 使用场景
    • 优点
    • 具体办法:采用 DistributedCache
  • Map Join 案例
    • 需求
    • 需求分析
  • 源码

Map Join

使用场景

Map Join 适用于一张表十分小、一张表很大的场景。

优点

思考:在 Reduce 端处理过多的表,非常容易产生数据倾斜。怎么办?
在 Map 端缓存多张表,提前处理业务逻辑,这样增加 Map 端业务,减少 Reduce 端数
据的压力,尽可能的减少数据倾斜。

具体办法:采用 DistributedCache

  1. 在 Mapper 的 setup 阶段,将文件读取到缓存集合中;
  2. 在 Driver 驱动类中加载缓存。

缓存普通文件到 Task 运行节点:

job.addCacheFile(new URI("file:///e:/cache/pd.txt"));

如果是集群运行,需要设置 HDFS 路径:

job.addCacheFile(new URI("hdfs://hadoop102:8020/cache/pd.txt"));

Map Join 案例

需求

:同Reduce Join案例;

需求分析

MapJoin 适用于关联表中有小表的情形。

Map端表合并案例分析(Distributedcache)

源码

MapJoinMapper类

package com.xiaobai.mapreduce.mapjoin;import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;public class MapJoinMapper extends Mapper<LongWritable, Text,Text, NullWritable> {private HashMap<String, String> pdMap = new HashMap<>();private Text outK = new Text();@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//获取缓存的文件,并把文件内容封装到集合 pd.txtURI[] cacheFiles = context.getCacheFiles();FileSystem fs = FileSystem.get(context.getConfiguration());FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));//从流中读取数据BufferedReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));String line;while(StringUtils.isNotEmpty(line = reader.readLine())){//切割String[] fields = line.split("\t");//赋值pdMap.put(fields[0],fields[1]);}//关流IOUtils.closeStream(reader);}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//处理order.txtString line = value.toString();String[] fields = line.split("\t");//获取pidString pname = pdMap.get(fields[1]);//获取订单id和订单数量//封装outK.set(fields[0] + "\t" + pname + "\t" + fields[2]);context.write(outK,NullWritable.get());}
}

MapJoinDriver类

package com.xiaobai.mapreduce.mapjoin;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;public class MapJoinDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {//1. 获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);//2. 设置加载jar包路径job.setJarByClass(MapJoinDriver.class);//3. 关联mapperjob.setMapperClass(MapJoinMapper.class);//4.设置Map输出kv类型job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(NullWritable.class);//5.设置最终输出kv类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);//加载缓存数据job.addCacheFile(new URI("/Users/jane/Desktop/test/JoinTest/pd.txt"));//Map端join的逻辑不需要reduce阶段,设置reduceTask数量为0job.setNumReduceTasks(0);//6. 设置输入输出路径FileInputFormat.setInputPaths(job,new Path("/Users/jane/Desktop/test/JoinTest"));FileOutputFormat.setOutputPath(job,new Path("/Users/jane/Desktop/hadoop/MapJoinTestOutput"));//7. 提交boolean b = job.waitForCompletion(true);System.exit(b ? 0 : 1);}
}

hadoop--Map Join相关推荐

  1. Map Join介绍及案例

    Map Join介绍及案例 Map Join介绍 1. 使用场景 2. 优点 3. 实现方法 Map Join案例 1. 需求 (1)需求说明 (2)文件 2.案例分析 (1)需求分析 (2)输入数据 ...

  2. Hadoop之Join、计数器、数据清洗概述

    Hadoop之Join.计数器.数据清洗概述 目录 Reduce join Map join 计数器应用 数据清洗(ETL) 1. Reduce join 原理 Map端的主要工作:为来自不同表(文件 ...

  3. MapReduce之Map join操作

    MapReduce之Map join操作(分布式缓存) 文章目录 MapReduce之Map join操作(分布式缓存) 案例结合 利用MapReduce中的setup方法与DistributedCa ...

  4. MR实现reduce join和map join及hive的执行计划

    一.涵盖 MapReduce InputFormat RecordReader 切片:block=input split 1.1 File- Text- NLine- DB- Mapper setup ...

  5. 关于hive中Map join 时大表left join小表的问题

    在hive中,(启用Map join时) 大表left join小表,加载从右向左,所以小表会加载进内存,存储成map键值对,通过大表驱动小表,来进行join,即大表中的join字段作为key 来获取 ...

  6. Hadoop Map/Reduce教程

    Hadoop Map/Reduce教程 目的     先决条件     概述     输入与输出     例子:WordCount v1.0         源代码         用法        ...

  7. 一步一步跟我学习hadoop(5)----hadoop Map/Reduce教程(2)

    Map/Reduce用户界面 本节为用户採用框架要面对的各个环节提供了具体的描写叙述,旨在与帮助用户对实现.配置和调优进行具体的设置.然而,开发时候还是要相应着API进行相关操作. 首先我们须要了解M ...

  8. Hive的Map Join与Common Join

    笼统的说,Hive中的Join可分为Common Join(Reduce阶段完成join)和Map Join(Map阶段完成join). 一.Hive Common Join 如果不指定MapJoin ...

  9. eclipse的plugins导入hadoop-eclipse-plugin-2.6.0.jar后Preference下没有hadoop Map/Reduce的解决方法

    参考文章:eclipse下的plugins导入hadoop-eclipse-plugin-2.7.1.jar,Preference下没有hadoop Map/Reduce的解决方法 这种现象一般是由于 ...

  10. Hive中的map join、left semi join和sort merge bucket join

    map join map join是将join双方比较小的表直接分发到各个 map进程的内存中,在map进程中进行join操作,这样就不用进行reduce步骤,从而提高了速度. 如果不指定mapjoi ...

最新文章

  1. Python编写Hive UDF
  2. linux sw状态,linux 下查看性能状态命令
  3. Makefile选项CFLAGS LDFLAGS LIBS
  4. finalize方法作用
  5. python面试题之用列表解析式选出1-100中的奇数
  6. java中fackeditor_ckeditor高级定制之发文模板
  7. windows服务器远程端口,查看和修改Windows服务器远程桌面的默认端口
  8. [rtsp]海康IPC监控摄像头远程外网监控配置(DDNS)
  9. 国土档案管理信息系统【辅助说明】
  10. VC安装产生eula.1028.txt等文件的问题
  11. INI文件解析、遍历
  12. windows和android双系统平板,平板电脑双系统和安卓单系统,哪个好用?
  13. 能ping通百度,但是上不了网的解决方法
  14. 火狐浏览器This address is restricted.端口问题
  15. 好用的fuzz字典以及fuzz字典生成工具
  16. python中的坐标表示方法_Python使用Matplotlib模块时坐标轴标题中文及各种特殊符号显示方法...
  17. oracle中业务组添加,oracle 11g增加业务profile
  18. 汽车java歌曲_车载音乐推荐 50首适合开车听的歌曲 2019车载歌曲 开车必备100首...
  19. 目标检测算法——YOLOV7——网络结构
  20. 天津推出社保金融IC卡

热门文章

  1. (数据库系统概论|王珊)第十章数据库恢复技术-第四、五、六、七节:数据库恢复技术和数据库镜像
  2. C++设计模式-Flyweight享元模式
  3. Python paho-mqtt消息队列
  4. 转载-----Java Longest Palindromic Substring(最长回文字符串)
  5. R6饮料AK赛(NOIP模拟赛)/省选专练HDU 5713 K个联通块
  6. Scrum 冲刺博客集合
  7. SQL server插入数据后,获取自增长字段的值
  8. PHP调用WebService接口
  9. Nhibernate3.3.3 GA使用初探
  10. FreeRTOS列表