一  前言

在很多时候,我们可能需要处理的不是一个单独的文件,而是几个有关联的文件,比如账户信息和订单信息=>

账户信息:customerIdname address telephone

订单信息:orderIdcustomerId price productName

我们很可能就需要用到这2个文件,并且他们的关系通过用户id进行关联或者join.

两个文件的关联点作为key,后面的字段作为value。

二 Map-Reduce join的三种方式

2.1 Reduce端Join: Join的操作在Reduce执行

2.2 Map端Join:

两个待连接的表,其中一个非常大,一个非常小,可以将小表直接存放于内存,DistributedCache实现。

2.3 半连接 SemiJoin: map端join和reduce端的join结合

三 Reduce端 Join

3.1场景:对于那种需要连接的文件大小差不多,且不需要过滤无效数据;这种情况效率比较低,特别是文件都很大的时候,还要经历shuffle过程。

3.2实现步骤:

#一般情况下,我们需要自定义key的类型,提供区别两种数据的一个标记

public class ReduceSideJoin extends Configured implements Tool {

public static class JoinWritable implements Writable {

private String tag;

private String data;

public JoinWritable() {

}

public JoinWritable(String tag, String data) {

this.tag = tag;

this.data = data;

}

public void write(DataOutput out) throws IOException {

out.writeUTF(getTag());

out.writeUTF(getData());

}

public void readFields(DataInput in) throws IOException {

this.setTag(in.readUTF());

this.setData(in.readUTF());

}

public void set(String tag, String data) {

this.setTag(tag);

this.setData(data);

}

@Override

public int hashCode() {

final int prime = 31;

int result = 1;

result = prime * result + ((data == null) ? 0 : data.hashCode());

result = prime * result + ((tag == null) ? 0 : tag.hashCode());

return result;

}

@Override

public boolean equals(Object obj) {

if (this == obj)

return true;

if (obj == null)

return false;

if (getClass() != obj.getClass())

return false;

JoinWritable other = (JoinWritable) obj;

if (data == null) {

if (other.data != null)

return false;

} else if (!data.equals(other.data))

return false;

if (tag == null) {

if (other.tag != null)

return false;

} else if (!tag.equals(other.tag))

return false;

return true;

}

@Override

public String toString() {

return tag+","+data;

}

public String getTag() {

return tag;

}

public void setTag(String tag) {

this.tag = tag;

}

public String getData() {

return data;

}

public void setData(String data) {

this.data = data;

}

}

public static class JoinMapper extends Mapper<LongWritable, Text, LongWritable,JoinWritable> {

private LongWritable outputKey = new LongWritable();

private JoinWritable outputValue = new JoinWritable();

@Override

protected void map(LongWritable key, Text value,

Mapper<LongWritable, Text,LongWritable, JoinWritable>.Context context)

throws IOException, InterruptedException {

String[] fields = value.toString().split(",");

if (ArrayUtils.isEmpty(fields)) {

return;

}

if (fields.length != 3 && fields.length != 4) {

return;

}

long cid = Long.valueOf(fields[0]);

outputKey.set(cid);

String name = fields[1];

if (fields.length == 3) {

String phone = fields[2];

outputValue.set("customer", name+","+phone);

}

if (fields.length == 4) {

String price = fields[2];

String date = fields[3];

outputValue.set("order", name+","+price+","+date);

}

context.write(outputKey, outputValue);

}

}

public static class JoinReduce extends Reducer<LongWritable, JoinWritable,NullWritable, Text>{

private Text outputValue = new Text();

@Override

protected void reduce(LongWritable key, Iterable<JoinWritable> values,

Reducer<LongWritable, JoinWritable,NullWritable, Text>.Context context)

throws IOException, InterruptedException {

String customerInfo = null;

List<String> orderList = new ArrayList<String>();

for (JoinWritable value : values) {

if ("customer".equals(value.getTag())) {

customerInfo = value.getData();

} else if ("order".equals(value.getTag())){

orderList.add(value.getData());

}

}

for (String order : orderList) {

outputValue.set(key.get()+","+customerInfo+","+order);

context.write(NullWritable.get(), outputValue);

}

}

}

public int run(String[] args) throws Exception {

if (ArrayUtils.isEmpty(args) || args.length < 2) {

System.exit(2);

}

Configuration conf = this.getConf();

Job job = Job.getInstance(conf, this.getClass().getSimpleName());

job.setJarByClass(ReduceSideJoin.class);

Path in = new Path(args[0]);

FileInputFormat.addInputPath(job, in);

Path out = new Path(args[1]);

FileOutputFormat.setOutputPath(job, out);

job.setMapperClass(JoinMapper.class);

job.setMapOutputKeyClass(LongWritable.class);

job.setMapOutputValueClass(JoinWritable.class);

job.setReducerClass(JoinReduce.class);

job.setOutputKeyClass(NullWritable.class);

job.setOutputValueClass(Text.class);

return job.waitForCompletion(Boolean.TRUE) ? 0 : 1;

}

public static void main(String[] args) throws Exception {

int num = new Random().nextInt(1000);

if (args == null || args.length == 0) {

args = new String[]{

"hdfs://hdfs-cluster/user/hadoop/input/join",

"hdfs://hdfs-cluster/user/hadoop/output/join"+num

};

}

Configuration conf = new Configuration();

int code = ToolRunner.run(conf, new ReduceSideJoin(), args);

System.exit(code);

}

}

四 Map端Join

4.1场景

如果需要连接的文件,一个比较大,一个比较小。这时候我们是比较适合在Map 端做join操作的。因为我们知道在Reduce端做join操作效率是比较低下的。

在Map阶段join会在数据达到map函数之前取出来,放在内存里的,以便task运行的时候可以去取,所以必须保证数据量不是很大,否则内存会撑不住。

我们要取文件需要通过DistributedCache来实现的,它会把我们上传的文件分发到各个节点一份,从而保证每个Mapper在工作的时候都能从本地缓存目录去取出这个文件。

4.2实现

#setup 阶段去读取文件,然后放入内存

#map阶段进行数据合并

#在提交job的时候,需要将这个文件路径放入分布式缓存

public class MapSideJoin extends Configured implements Tool{

public static class CustomerInfo {

private long cid;

private String name;

private String phone;

public CustomerInfo() {

}

public CustomerInfo(long cid, String name, String phone) {

this.cid = cid;

this.name = name;

this.phone = phone;

}

public long getCid() {

return cid;

}

public String getName() {

return name;

}

public String getPhone() {

return phone;

}

}

public static class MapSideJoinMapper extendsMapper<LongWritable, Text, LongWritable, Text> {

private Map<Long, CustomerInfo> customerInfos = new HashMap<Long, CustomerInfo>();

private LongWritable outputKey = new LongWritable();

private Text outputValue = new Text();

@Override

protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable,Text>.Context context)

throws IOException, InterruptedException {

if (value == null) {

return;

}

String[] fields = value.toString().split(",");

if (ArrayUtils.isEmpty(fields) || fields.length < 4) {

return;

}

if (customerInfos.size() == 0) {

return;

}

try {

long cid = Long.valueOf(fields[0]);

CustomerInfo cInfo = customerInfos.get(cid);

if (cInfo == null) {

return;

}

StringBuilder builder = new StringBuilder();

builder.append(cid).append("=>").append(cInfo.getName()).append("\t").append(cInfo.getPhone())

.append("\t").append(fields[1]).append("\t").append(fields[2]).append("\t").append(fields[3]);

outputKey.set(cid);

outputValue.set(builder.toString());

context.write(outputKey, outputValue);

} catch (NumberFormatException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

@Override

protected void setup(Mapper<LongWritable, Text, LongWritable,Text>.Context context)

throws IOException, InterruptedException {

Configuration conf = context.getConfiguration();

// 获取小表中的缓存文件的URI

URI[] localCacheFiles = context.getCacheFiles();

if (ArrayUtils.isEmpty(localCacheFiles)) {

return;

}

// 通过URI构造HDFS 路径

Path path = new Path(localCacheFiles[0]);

// 获取文件系统

FileSystem fs = FileSystem.get(conf);

// 打开文件

FSDataInputStream in = fs.open(path);

if (in == null) {

return;

}

InputStreamReader isr = new InputStreamReader(in);

BufferedReader br = new BufferedReader(isr);

String line = null;

String[] fields = null;

CustomerInfo info = null;

while ((line = br.readLine()) != null) {

fields = line.split(",");

if (ArrayUtils.isEmpty(fields) || fields.length != 3) {

continue;

}

info = new CustomerInfo(Long.valueOf(fields[0]), fields[1], fields[2]);

customerInfos.put(Long.valueOf(fields[0]), info);

}

if (br != null) {

br.close();

}

if (isr != null) {

isr.close();

}

}

}

public static class MapSideJoinReducer extendsReducer<LongWritable, Text, NullWritable, Text> {

@Override

protected void reduce(LongWritable key, Iterable<Text> values,

Reducer<LongWritable, Text,NullWritable, Text>.Context context) throws IOException, InterruptedException {

for (Text value : values) {

context.write(NullWritable.get(), value);

}

}

}

public int run(String[] args) throws Exception {

if (ArrayUtils.isEmpty(args) || args.length < 3) {

System.exit(3);

}

Configuration conf = this.getConf();

Job job = Job.getInstance(conf, this.getClass().getSimpleName());

job.setJarByClass(MapSideJoin.class);

Path in = new Path(args[0]);

FileInputFormat.addInputPath(job, in);

Path out = new Path(args[1]);

FileOutputFormat.setOutputPath(job, out);

Path cacheURL = new Path(args[2]);

job.addCacheFile(cacheURL.toUri());

job.setMapperClass(MapSideJoinMapper.class);

job.setMapOutputKeyClass(LongWritable.class);

job.setMapOutputValueClass(Text.class);

job.setReducerClass(MapSideJoinReducer.class);

job.setOutputKeyClass(NullWritable.class);

job.setOutputValueClass(Text.class);

return job.waitForCompletion(Boolean.TRUE) ? 0 : 1;

}

public static void main(String[] args) throws Exception {

int num = new Random().nextInt(1000);

if (args == null || args.length == 0) {

args = new String[]{

"hdfs://hdfs-cluster/user/hadoop/input/join",

"hdfs://hdfs-cluster/user/hadoop/output/join"+num,

"hdfs://hdfs-cluster/user/hadoop/cache/customers.csv"

};

}

Configuration conf = new Configuration();

int code = ToolRunner.run(conf, new MapSideJoin(), args);

System.exit(code);

}

}

五 Semi-Join(半连接)

如果我们需要过滤很多无效数据,但是在Reduce阶段才来过滤,显然并不是一件很好的方法,因为我们让这些数据经历了shuffle过程,这会涉及到很多磁盘I/O 和网络I/O操作。

如果我们在Map阶段就把这些数据过滤了,那么这部分数据就不会经历shuffle阶段,从而有助于性能提升。

5.1实现

利用DistributedCache将小表分发到各个节点上,在Map过程的setup()函数里,读取缓存里的文件,只将小表的连接键存储在hashSet中。

在map()函数执行时,对每一条数据进行判断,如果这条数据的连接键为空或者在hashSet里不存在,那么则认为这条数据无效,使条数据也不参与reduce的过程。

需要注意的是小表的key是不是很大,否则内存不够用。

public class SemiJoin extends Configured implements Tool {

public static class JoinWritable implements Writable {

private String tag;

private String data;

public JoinWritable() {

}

public JoinWritable(String tag, String data) {

this.tag = tag;

this.data = data;

}

public void write(DataOutput out) throws IOException {

out.writeUTF(getTag());

out.writeUTF(getData());

}

public void readFields(DataInput in) throws IOException {

this.setTag(in.readUTF());

this.setData(in.readUTF());

}

public void set(String tag, String data) {

this.setTag(tag);

this.setData(data);

}

@Override

public int hashCode() {

final int prime = 31;

int result = 1;

result = prime * result + ((data == null) ? 0 : data.hashCode());

result = prime * result + ((tag == null) ? 0 : tag.hashCode());

return result;

}

@Override

public boolean equals(Object obj) {

if (this == obj)

return true;

if (obj == null)

return false;

if (getClass() != obj.getClass())

return false;

JoinWritable other = (JoinWritable) obj;

if (data == null) {

if (other.data != null)

return false;

} else if (!data.equals(other.data))

return false;

if (tag == null) {

if (other.tag != null)

return false;

} else if (!tag.equals(other.tag))

return false;

return true;

}

@Override

public String toString() {

return tag + "," + data;

}

public String getTag() {

return tag;

}

public void setTag(String tag) {

this.tag = tag;

}

public String getData() {

return data;

}

public void setData(String data) {

this.data = data;

}

}

public static class SemiJoinMapper extendsMapper<LongWritable, Text, LongWritable, JoinWritable> {

private Set<Long> keySet = new HashSet<Long>();

private LongWritable outputKey = new LongWritable();

private JoinWritable outputValue = new JoinWritable();

@Override

protected void map(LongWritable key, Text value,

Mapper<LongWritable, Text,LongWritable, JoinWritable>.Context context)

throws IOException, InterruptedException {

if (value == null) {

return;

}

String[] fields = value.toString().split(",");

if (ArrayUtils.isEmpty(fields) || fields.length < 3) {

return;

}

try {

long cid = Long.valueOf(fields[0]);

if (!keySet.contains(cid)) {

return;

}

outputKey.set(cid);

String name = fields[1];

if (fields.length == 3) {

String phone = fields[2];

outputValue.set("customer", name + "\t" + phone);

}

if (fields.length == 4) {

String price = fields[2];

String date = fields[3];

outputValue.set("order", name + "\t" + price + "\t" + date);

}

context.write(outputKey, outputValue);

} catch (NumberFormatException e) {

// TODO Auto-generated catch block

e.printStackTrace();

}

}

@Override

protected void setup(Mapper<LongWritable, Text, LongWritable,JoinWritable>.Context context)

throws IOException, InterruptedException {

Configuration conf = context.getConfiguration();

// 获取小表中的缓存文件的URI

URI[] localCacheFiles = context.getCacheFiles();

if (ArrayUtils.isEmpty(localCacheFiles)) {

return;

}

// 通过URI构造HDFS 路径

Path path = new Path(localCacheFiles[0]);

// 获取文件系统

FileSystem fs = FileSystem.get(conf);

// 打开文件

FSDataInputStream in = fs.open(path);

if (in == null) {

return;

}

InputStreamReaderisr = new InputStreamReader(in);

BufferedReader br = new BufferedReader(isr);

String line = null;

String[] fields = null;

while ((line = br.readLine()) != null) {

fields = line.split(",");

if (ArrayUtils.isEmpty(fields) || fields.length != 3) {

continue;

}

keySet.add(Long.valueOf(fields[0]));

}

if (br != null) {

br.close();

}

if (isr != null) {

isr.close();

}

}

}

public static class SemiJoinReducer extendsReducer<LongWritable, JoinWritable, NullWritable, Text> {

private Text outputValue = new Text();

@Override

protected void reduce(LongWritable key, Iterable<JoinWritable> values,

Reducer<LongWritable,JoinWritable, NullWritable, Text>.Context context)

throws IOException, InterruptedException {

List<String> orderList = new ArrayList<String>();

String customerInfo = null;

for (JoinWritable value : values) {

if ("customer".equals(value.getTag())) {

customerInfo = value.getData();

} else if ("order".equals(value.getTag())) {

orderList.add(value.getData());

}

}

for (String order : orderList) {

outputValue.set(key.get() + "\t" + customerInfo + "\t" + order);

context.write(NullWritable.get(), outputValue);

}

}

}

public int run(String[] args) throws Exception {

if (ArrayUtils.isEmpty(args) || args.length < 3) {

System.exit(3);

}

Configuration conf = this.getConf();

Job job = Job.getInstance(conf, this.getClass().getSimpleName());

job.setJarByClass(SemiJoin.class);

Path in = new Path(args[0]);

FileInputFormat.addInputPath(job, in);

Path out = new Path(args[1]);

FileOutputFormat.setOutputPath(job, out);

Path cacheURL = new Path(args[2]);

job.addCacheFile(cacheURL.toUri());

job.setMapperClass(SemiJoinMapper.class);

job.setMapOutputKeyClass(LongWritable.class);

job.setMapOutputValueClass(JoinWritable.class);

job.setReducerClass(SemiJoinReducer.class);

job.setOutputKeyClass(NullWritable.class);

job.setOutputValueClass(Text.class);

return job.waitForCompletion(Boolean.TRUE) ? 0 : 1;

}

public static void main(String[] args) throws Exception {

int num = new Random().nextInt(1000);

if (args == null || args.length == 0) {

args = new String[] { "hdfs://hdfs-cluster/user/hadoop/input/join",

"hdfs://hdfs-cluster/user/hadoop/output/join" + num,

"hdfs://hdfs-cluster/user/hadoop/cache/customers.csv" };

}

Configuration conf = new Configuration();

int code = ToolRunner.run(conf, new SemiJoin(), args);

System.exit(code);

}

}

MapReduce之join操作相关推荐

  1. MapReduce实现join操作

    前阵子把MapReduce实现join操作的算法设想清楚了,但一直没有在代码层面落地.今天终于费了些功夫把整个流程走了一遭,期间经历了诸多麻烦并最终得以将其一一搞定,再次深切体会到,什么叫从计算模型到 ...

  2. 使用MapReduce实现join操作

    2019独角兽企业重金招聘Python工程师标准>>> 在关系型数据库中,要实现join操作是非常方便的,通过sql定义的join原语就可以实现.在hdfs存储的海量数据中,要实现j ...

  3. [MapReduce_add_4] MapReduce 的 join 操作

    0. 说明 Map 端 join && Reduce 端 join 1. Map 端 join Map 端 join:大表+小表 => 将小表加入到内存,迭代大表每一行,与之进行 ...

  4. MapReduce之Map join操作

    MapReduce之Map join操作(分布式缓存) 文章目录 MapReduce之Map join操作(分布式缓存) 案例结合 利用MapReduce中的setup方法与DistributedCa ...

  5. 5、HIVE DML操作、load数据、update、Delete、Merge、where语句、基于分区的查询、HAVING子句、LIMIT子句、Group By语法、Hive 的Join操作等

    目录: 4.2.1 Load文件数据到表中 4.2.2查询的数据插入到表中 4.2.3将Hive查询的结果存到本地Linux的文件系统目录中 4.2.4通过SQL语句的方式插入数据 4.2.5 UPD ...

  6. Hive是如何让MapReduce实现SQL操作的?

    learn from 从0开始学大数据(极客时间) 1. MapReduce 实现 SQL 的原理 SELECT pageid, age, count(1) FROM pv_users GROUP B ...

  7. Flink学习笔记:Operators之CoGroup及Join操作

    本文为<Flink大数据项目实战>学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程: Flink大数据项目实战:http://t.cn/EJtKhaz ...

  8. shell中join链接多个域_shell 如何实现两个表的join操作

    shell 如何实现两个表的join操作 今天研究的一个问题是:在Shell 脚本中如何实现两个表的 join 操作,这里说的两个表示的其实是 两个文件,但是文件是列表的形式,有固定的分割符号,即就相 ...

  9. 离线轻量级大数据平台Spark之JavaRDD关联join操作

    对两个RDD进行关联操作,如: 1)文件post_data.txt包含:post_id\title\content 2)文件train.txt包含:dev_id\post_id\praise\time ...

最新文章

  1. 《每天学点博弈论全集》-读书总结
  2. 【前端3】jquary:页面加载,选择器,隔行换色,Dom,全选,动画,遍历,广告/抽奖,表单校验插件
  3. E - Escape from the Island(最短路+dp)
  4. apso matlab,APSO算法指导
  5. Myeclipse学习总结(12)——Eclipse/MyEclipse实用技巧再回顾
  6. XSS注入,js脚本注入后台
  7. 在浏览器上运行Qt应用 emscripten-qt
  8. 台式电脑备用电源_台式电源哪家稳?华硕 TUF GAMING六年质保,坚如磐石_电脑电源...
  9. Trucksim车辆动力学模型
  10. 九宫格游戏(java实现)
  11. 竞品分析文档撰写总结
  12. 亿网文交孟建州艺术品该怎么鉴别,代码分析
  13. 26个英语字母表及字母音标
  14. 关于单链表结构体定义结点时 LNode *LinkList的理解
  15. Atlas 200 DK登录密码 制卡安装DDK和lib
  16. unity更优秀的跳跃手感(简单物理小知识)
  17. ffmpeg 有声视频合成背景音乐(合成多声音/合成多音轨)
  18. 软件测试 | 测试开发 | Git实战(四)| Git分支管理实操,搞定在线合并和本地合并
  19. 题2:找出落单的那个数
  20. matlab傅里叶变换 绘图

热门文章

  1. textview 背景变形_今日推荐:温州洞头-背景墙框石塑线条
  2. 电脑显示器闪屏_时尚超薄可升降:华硕新品家用护眼显示器MZ27AQL
  3. Mysql orangepi_SSH远程登录香橙派Orange Pi Zero2开发板的操作方法
  4. 同步服务器客户端位置,服务器和客户端信息同步方式
  5. 3-31Pytorch与auto-variabletensor
  6. mysql 驱动说明_mysql_jdbc连接说明
  7. c#值get、 set用法(包含自动转换的说明)
  8. java获取教务系统成绩,Java httpClient 正方教务管理系统模拟登陆,爬取学生成绩和培养计划...
  9. 时序模型预测结果:DM检验含义与python实现
  10. matplotlib绘制三维折线图