一、mapjoin

1.Mapper类

package com.css.mapjoin;import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;// 思路:商品表加载到内存中  然后数据在map端输出前  进行替换
public class CacheMapper extends Mapper<LongWritable, Text, Text, NullWritable>{HashMap<String, String> pdMap = new HashMap<>();// 1.商品表加载到内存@Overrideprotected void setup(Context context)throws IOException {// 加载缓存文件BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "UTF-8"));String line;while (StringUtils.isNotEmpty(line = br.readLine())) {// 切分String[] fields = line.split("\t");// 缓存pdMap.put(fields[0], fields[1]);}br.close();}// 2.map传输@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {// 获取数据String line = value.toString();// 切割String[] fields = line.split("\t");// 获取订单中商品idString pid = fields[1];// 根据订单商品id获取商品名String pName = pdMap.get(pid);// 拼接数据line = line + "\t" + pName;// 输出context.write(new Text(line), NullWritable.get());}
}

2.Driver类

package com.css.mapjoin;import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class CacheDriver {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {// 1.获取job信息Configuration conf = new Configuration();Job job = Job.getInstance(conf);// 2.获取jar包job.setJarByClass(CacheDriver.class);// 3.获取自定义的mapper与reducer类job.setMapperClass(CacheMapper.class);// 4.设置reduce输出的数据类型(最终的数据类型)job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);// 5.设置输入存在的路径与处理后的结果路径FileInputFormat.setInputPaths(job, new Path("c:/table1029/in"));FileOutputFormat.setOutputPath(job, new Path("c:/table1029/out"));// 6.加载缓存商品数据job.addCacheFile(new URI("file:///c:/inputcache/pd.txt"));// 7.设置一下reducetask的数量job.setNumReduceTasks(0);// 8.提交任务boolean rs = job.waitForCompletion(true);System.out.println(rs ? 0 : 1);}
}

3.输入文件

(1)order.txt
201801    01    1
201802    02    2
201803    03    3
201804    01    4
201805    02    5
201806    03    6(2)pd.txt
01    苹果
02    华为
03    小米

4.输出文件part-m-00000

201801    01    1    苹果
201802    02    2    华为
201803    03    3    小米
201804    01    4    苹果
201805    02    5    华为
201806    03    6    小米

二、reducejoin

1.Mapper类

package com.css.reducejoin;import java.io.IOException;import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;public class TableMapper extends Mapper<LongWritable, Text, Text, TableBean>{@Overrideprotected void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {TableBean v = new TableBean();Text k = new Text();// 区分两张表FileSplit inputSplit = (FileSplit) context.getInputSplit();String name = inputSplit.getPath().getName();// 获取数据String line = value.toString();// 区分  此时是订单表if (name.contains("order.txt")) {// 切分字段String[] fields = line.split("\t");// 封装对象v.setOrder_id(fields[0]);v.setPid(fields[1]);v.setAmount(Integer.parseInt(fields[2]));v.setpName("");v.setFlag("0");// 设置k 商品id作为kk.set(fields[1]);}else { // 此时为商品表// 切分字段String[] fields = line.split("\t");// 封装对象v.setOrder_id("");v.setPid(fields[0]);v.setAmount(0);v.setpName(fields[1]);v.setFlag("1");// 设置k 商品id作为kk.set(fields[0]);}context.write(k, v);}
}

2.Reducer类

package com.css.reducejoin;import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;public class TableReducer extends Reducer<Text, TableBean, TableBean, NullWritable>{@Overrideprotected void reduce(Text key, Iterable<TableBean> values,Context context) throws IOException, InterruptedException {// 创建集合  存放订单数据ArrayList<TableBean> orderBean = new ArrayList<TableBean>();// 商品存储TableBean pdBean = new TableBean(); // 把pd商品中商品名  拷贝到orderBeanfor (TableBean v : values) {if ("0".equals(v.getFlag())) { // 订单表// 1.创建一个临时变量  拷贝数据TableBean tableBean = new TableBean();// 2.拷贝try {BeanUtils.copyProperties(tableBean, v);} catch (IllegalAccessException | InvocationTargetException e) {e.printStackTrace();}orderBean.add(tableBean);}else {try {BeanUtils.copyProperties(pdBean, v);} catch (IllegalAccessException | InvocationTargetException e) {e.printStackTrace();}}}// 拼接表for (TableBean tableBean : orderBean) {// 加入商品名
            tableBean.setpName(pdBean.getpName());context.write(tableBean, NullWritable.get());}}
}

3.封装类

package com.css.reducejoin;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;public class TableBean implements Writable{// 封装对应字段private String order_id; //订单idprivate String pid; // 产品idprivate int amount; // 产品数量private String pName; // 产品名称private String flag; // 判断是订单表还是商品表public TableBean() {super();}public String getOrder_id() {return order_id;}public void setOrder_id(String order_id) {this.order_id = order_id;}public String getPid() {return pid;}public void setPid(String pid) {this.pid = pid;}public int getAmount() {return amount;}public void setAmount(int amount) {this.amount = amount;}public String getpName() {return pName;}public void setpName(String pName) {this.pName = pName;}public String getFlag() {return flag;}public void setFlag(String flag) {this.flag = flag;}@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(order_id);out.writeUTF(pid);out.writeInt(amount);out.writeUTF(pName);out.writeUTF(flag);}@Overridepublic void readFields(DataInput in) throws IOException {order_id = in.readUTF();pid = in.readUTF();amount = in.readInt();pName = in.readUTF();flag = in.readUTF();}@Overridepublic String toString() {return order_id + "\t" + pName + "\t" + amount;}
}

4.Driver类

package com.css.reducejoin;import java.io.IOException;
import java.net.URISyntaxException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class TableDriver {public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration();Job job = Job.getInstance(conf);job.setJarByClass(TableDriver.class);job.setMapperClass(TableMapper.class);job.setReducerClass(TableReducer.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(TableBean.class);job.setOutputKeyClass(TableBean.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path("c:/reduce1029/in"));FileOutputFormat.setOutputPath(job, new Path("c:/reduce1029/out"));boolean rs = job.waitForCompletion(true);System.out.println(rs ? 0 : 1);}
}

5.输入文件

(1)order.txt
201801    01    1
201802    02    2
201803    03    3
201804    01    4
201805    02    5
201806    03    6(2)pd.txt
01    苹果
02    华为
03    小米

6.输出文件part-r-00000

201804    苹果    4
201801    苹果    1
201805    华为    5
201802    华为    2
201806    小米    6
201803    小米    3

转载于:https://www.cnblogs.com/areyouready/p/9904984.html

mapjoin与reducejoin相关推荐

  1. 经典大数据面试题及解析

    经典大数据面试题及解析 1.下列哪个属性是hdfs-site.xml中的配置? A.dfs.replication B.fs.defaultFS C.mapreduce.framework.name ...

  2. 使用MapReduce实现join操作

    2019独角兽企业重金招聘Python工程师标准>>> 在关系型数据库中,要实现join操作是非常方便的,通过sql定义的join原语就可以实现.在hdfs存储的海量数据中,要实现j ...

  3. map端join和reduce端join的区别

    MapReduce Join MapJoin和ReduceJoin区别及优化 maptask处理后写到本地,如果再到reduce,又涉及到网络的拷贝. map端join最大优势,可以提前过滤不需要的数 ...

  4. Hadoop实例学习(十三) Join应用

    目录 Reduce Join 解决的问题 实例 编写TableBean 类 编写TableMapper类 编写TableDriver类 Map Join 案例 编写MapJoinMapper类 编写M ...

  5. 面试题--精选Hadoop选择题

    精选Hadoop选择题 1.下列哪个属性是hdfs-site.xml中的配置? A.dfs.replication  B.fs.defaultFS C.mapreduce.framework.name ...

  6. Spark 学习【二】

    Spark Core [05-07] 两个demo 很多复杂的业务拆分开都是变种的wc: 分组 ==> 变种WC ==> 数据补齐 将不同组的数据按规则合并在一起 split(" ...

  7. Hive3入门至精通(基础、部署、理论、SQL、函数、运算以及性能优化)15-28章

    Hive3入门至精通(基础.部署.理论.SQL.函数.运算以及性能优化)15-28章 [Hive3入门至精通(基础.部署.理论.SQL.函数.运算以及性能优化)1-14章](https://blog. ...

  8. MR案例:Reduce-Join

    问题描述:两种类型输入文件:address(地址)和company(公司)进行一对多的关联查询,得到地址名(例如:Beijing)与公司名(例如:Beijing JD.Beijing Red Star ...

  9. Hive旺旺讨论(关于mapjoin)

    2019独角兽企业重金招聘Python工程师标准>>> shaomn (2013-08-01 13:39:06): select/*+ mapjoin(a)*/ a.* from s ...

最新文章

  1. mysql client dev_ubuntu下mysql安装(server、client、dev),开启、停止和重启,及常见错误...
  2. ecshop 搜索热词推荐_拼多多搜索推广实操——如何选择正确的关键词实现高投产!...
  3. Windows服务器上怎样开放指定端口
  4. IDEA第一个mybatis程序 mybatis增删查改操作 mybatis的map模糊查询
  5. 实例变量和静态变量(或类变量static)
  6. 揭秘阿里云 RTS SDK 是如何实现直播降低延迟和卡顿
  7. SpringBoot集成Kafka集群并实现接收_发送消息操作_以及常见错误_亲测---Kafka工作笔记005
  8. 微信公众号关闭iOS端虚拟支付业务;苹果「Apple 登录」存安全漏洞;谷歌推迟发布Android 11 Beta| 极客头条...
  9. java二叉树的遍历,递归与非递归方法
  10. 科学研究设计一:什么是科学
  11. 淘宝店铺运营,店铺访客增加但是浏览量减少这是为什么,应该怎样解决?
  12. [深入理解Android卷二 全文-第四章]深入理解PackageManagerService
  13. icns文件怎么打开_Mac快速生成icns图标文件 | kTWO-个人博客
  14. 做了7年新媒体人,现在才懂的精细化运营增粉变现秘诀!黎想
  15. 二叉树的中序遍历 递归与非递归
  16. 安装和删除Ubuntu双系统
  17. 在Visio中实现任意两点之间的连线
  18. GridBagLayout和GridBagConstraints
  19. 2019校招内推拼多多面试总结
  20. 绘制圆形,方法一(Canvas,drawOval)

热门文章

  1. Django学习:用RGF创建一个简单的网页框架!(实用性感觉很强!!)
  2. 软件测试——黑盒测试基本方法
  3. Artstudio Pro Mac版 绘图与图片编辑软件
  4. misc 压缩包的那些操作(1)
  5. 离散数学之第30题赵钱孙李周派谁去学习Python
  6. 2021雅虎百度指数批量查询软件【多线程】
  7. python学得好牢饭_英雄联盟LPL比赛数据可视化详细教程,可视化的魅力,你值得拥有!!!...
  8. wx超强随机短视频程序源码视频打赏自带视频+支付接口+源码自适应/安装教程
  9. 游戏鼠标的dpi测试软件,普通鼠标怎么调dpi_怎么测鼠标dpi-win7之家
  10. linux里/是主分区,Linux主分区和扩展分区及逻辑分区的最大区别