order_detail.txt

item_id    item_type
sp001    type001
sp002    type002
sp003    type002

iteminfo.txt

item_id    item_type
sp001    type001
sp002    type002
sp003    type002

代码部分:

package squencefile;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;
import java.util.ArrayList;
import java.util.List;//多表关联
public class ReducerJoin {public static class MyMapper extends Mapper<LongWritable,Text,Text,Text>{//map处理逻辑//1、判断是哪个表//2、针对不同的表输出不同的数据@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//判断是哪个表文件String fileName=((FileSplit)context.getInputSplit()).getPath().getName();//切分每行数据String line = value.toString();String[] lineArr = line.split("\t");if("order_detail.txt".equals(fileName)){//订单明细<item_id,"1:order_id:amout>"context.write(new Text(lineArr[1]),new Text("1:"+lineArr[0]+":"+lineArr[2]));}else if("iteminfo.txt".equals(fileName)){//商品表<item_id,"2:item_type">context.write(new Text(lineArr[0]),new Text("2:"+lineArr[1]));}}}public static class MyReducer extends Reducer<Text,Text,Text,Text>{//1、将相同商品id的订单信息明细和商品信息进行拆分,拆分后存到响应的订单明细表和商品明细表中//2、将订单明细列表和商品列表进行嵌套遍历@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {//0、定义订单明细列表和商品信息列表List<String> orderDetailList=new ArrayList<>();List<String> itemInfoList=new ArrayList<>();for (Text tempVal:values){String tempValStr = tempVal.toString();String[] tempValArr=tempValStr.split(":");System.out.print(tempValStr);if ("1".equals((tempValArr[0]))){orderDetailList.add(tempValStr.substring(2));}else{itemInfoList.add(tempValArr[1]);}}for(String itemInfo:itemInfoList){for(String orderDetail:orderDetailList){context.write(key,new Text(itemInfo+":"+orderDetail));}}}}public static void main (String[] args ) throws IOException, ClassNotFoundException, InterruptedException {//创建一个job,也就是一个运行环境Configuration conf=new Configuration();//集群运行
//        conf.set("fs.defaultFS","hdfs://hadoop:8088");//本地运行Job job=Job.getInstance(conf,"reduce-join");//程序入口(打jar包)job.setJarByClass(ReducerJoin.class);//需要输入俩个文件:输入文件FileInputFormat.addInputPath(job,new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test5\\order_detail.txt"));FileInputFormat.addInputPath(job,new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test5\\iteminfo.txt"));//编写mapper处理逻辑job.setMapperClass(ReducerJoin.MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//shuffle流程//编写reduce处理逻辑job.setReducerClass(ReducerJoin.MyReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//输出文件FileOutputFormat.setOutputPath(job,new Path("F:\\filnk_package\\hadoop-2.10.1\\data\\test5\\out"));//运行job,需要放到Yarn上运行boolean result =job.waitForCompletion(true);System.out.print(result?1:0);}
}

数据倾斜如何处理:

注意:reduceJoin会产生数据倾斜,比如俩个task1和task2,task1处理的任务比task2处理的比较多,这样会导致性能很低,如何使得俩个task处理任务比较均衡。
方案:map输出的key添加随机数后缀,将生成的新的key分发到不同的reduce task上
sp001_8888
sp002_999
商品表中map输出需要扩容10000条,输出到各个reduce task上
reduce:将俩个表输出数据进行合并,将后缀删除

hadoop之MapReduce的案例(多表关联)相关推荐

  1. hadoop之mapreduce教程+案例学习(二)

    第3章 MapReduce框架原理 目录 第3章 MapReduce框架原理 3.1 InputFormat数据输入 3.1.1 切片与MapTask并行度决定机制 3.1.2 Job提交流程源码和切 ...

  2. hadoop之mapreduce教程+案例学习(一)

    第1章 MapReduce概述 目录 第1章 MapReduce概述 1.1 MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发"基于Hadoop的数据分析 ...

  3. MapReduce编程(五) 单表关联

    一.问题描述 下面给出一个child-parent的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的表格. 输入文件内容如下: child parent Steven Lucy Steven Jack ...

  4. hadoop之MapReduce的案例(排序、最大值)

    <?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven ...

  5. Hadoop入门(十七)Mapreduce的多表关联程序

    多表关联和单表关联类似,它也是通过对原始数据进行一定的处理,从其中挖掘出关心的信息 1 实例描述 输入是两个文件,一个代表工厂表,包含工厂名列和地址编号列:另一个代表地址表,包含地址名列和地址编号列. ...

  6. Hadoop入门(十六)Mapreduce的单表关联程序

    "单表关联"要求从给出的数据中寻找所关心的数据,它是对原始数据所包含信息的挖掘 1 实例描述 给出child-parent(孩子--父母)表,要求输出grandchild-gran ...

  7. Hadoop集群 MapReduce初级案例

    1.数据去重  "数据去重"主要是为了掌握和利用并行化思想来对数据进行有意义的筛选.统计大数据集上的数据种类个数.从网站日志中计算访问地等这些看似庞杂的任务都会涉及数据去重.下面就 ...

  8. MapReduce实例----单表关联

    1.源数据: Tom Lucy Tom Jack Jone Lucy Jone Jack Lucy Mary Lucy Ben Jack Alice Jack Jesse Terry Alice Te ...

  9. Hadoop编写MapReduce之入门案例WordCount(自带+编写java)

    目录标题 Hadoop自带案例WordCount运行 MapReduce可以很好地应用于各种计算问题 网页查看 集群上jar包的位置 程序的介绍 自己编写WordCount的project(MapRe ...

最新文章

  1. torch-toolbox
  2. csdn无人驾驶汽车_无人驾驶汽车100年历史
  3. 7PYX 网站代码下载
  4. 字符串转换到double数组
  5. 山西计算机应用基本技能,计算机应用基本技能 山西省对口高考题库
  6. 花一天时间踩了node npm的一个坑
  7. C++设计模式-Mediator中介者模式
  8. 《网站建设与网页设计从入门到精通Dreamweaver+Flash+Photoshop+HTML+CSS+JavaScript》——3.3 添加文本元素...
  9. Ubuntu 系统安装 MATLAB 2016b
  10. 个税服务器系统繁忙,2021个人所得税系统异常进不去怎么办?系统繁忙请稍后再试怎么回事...
  11. JAVA——电子商城三级分类目录查询-递归树形数据结构
  12. 循环时尚是消费者与电商平台的一场“双向奔赴”?
  13. idea 关于自动导包的设置
  14. No fallbackFactory instance of type class com.chongyou.system.api.factory.RemoteUserFallbackFactory
  15. ES High Level Rest Client 超时问题排查及解决
  16. 男人至少的道德底线(男女都该看)
  17. 版本管理工具Subversion Edge的备份恢复与数据迁移方案
  18. 根据采样频率计算音频时长
  19. php注册验证用户名已存在,php ajax注册验证用户名是否存在代码_PHP教程
  20. mount ntfs分区和配置xmms手记(转)

热门文章

  1. Python3基础3——List列表的增删改和内建函数的用法
  2. Go语言大神亲述:历七劫方可成为程序员!
  3. phpmemcache
  4. nodejs对文件进行分页
  5. C/C++程序设计注意事项 (二)
  6. 什么样的外链才是高质量的外链|网站优化
  7. 利用Session实现一次性验证码(多学一招)
  8. python在编程序网站_Python的用户登录接口编制以及实现流程图
  9. c++读取excel_Python读取并提取xlsx数据+去趋势和高通滤波与低通滤波
  10. python的内存机制_python中的内存机制