对于不同文件里的数据,有时候有相应关系,须要进行连接(join),获得一个新的文件以便进行分析。比方有两个输入文件a.txt,b.txt,当中的数据格式分别例如以下

1 a
2 b
3 c
4 d
1 good
2 bad
3 ok
4 hello

须要将其连接成一个新的例如以下的文件:

a good
b bad
c ok
d hello

处理步骤能够分成两步:

1.map阶段,将两个输入文件里的数据进行打散,例如以下:

1 a
1 good
2 b
2 bad
3 c
3 ok
4 d
4 hello

2.reduce阶段,进行数据的连接操作,此处数据较简单,仅仅要推断map结果的value的长度是不是1就决定是新的键还是值。

package cn.zhf.hadoop;import java.io.IOException;
import java.util.Iterator;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class SingleJoin extends Configured implements Tool{public static void main(String[] args) throws Exception {Tool tool = new SingleJoin();ToolRunner.run(tool, args);print(tool);}@Overridepublic int run(String[] arg0) throws Exception {Configuration conf = getConf();Job job = new Job();job.setJarByClass(getClass());FileSystem fs = FileSystem.get(conf);fs.delete(new Path("out"),true);FileInputFormat.addInputPath(job, new Path("a.txt"));FileInputFormat.addInputPath(job, new Path("b.txt"));FileOutputFormat.setOutputPath(job,new Path("out"));job.setMapperClass(JoinMapper.class);job.setReducerClass(JoinReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.waitForCompletion(true);return 0;}public static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{String[] str = value.toString().split(" ");context.write(new Text(str[0]), new Text(str[1]));}}public static class JoinReducer extends Reducer<Text,Text,Text,Text>{public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{Iterator<Text> iterator = values.iterator();Text keyy = new Text();Text valuee = new Text();while(iterator.hasNext()){Text temp = iterator.next();if(temp.toString().length() == 1){keyy.set(temp);valuee.set(iterator.next());}else{valuee.set(temp);keyy.set(iterator.next());}}context.write(keyy, valuee);}}public static void print(Tool tool) throws IOException{FileSystem fs = FileSystem.get(tool.getConf());Path path = new Path("out/part-r-00000");FSDataInputStream fsin = fs.open(path);int length = 0;byte[] buff = new byte[128];while((length = fsin.read(buff,0,128)) != -1)System.out.println(new String(buff,0,length));}
}

reference:《MapReduce2.0源代码分析及编程实践》

MapReduce数据连接相关推荐

  1. MapReduce算法–了解数据连接第二部分

    自从我上一次发布以来已经有一段时间了,就像我上一次大休息一样,我正在Coursera上一些课程. 这次是Scala中的函数式编程 原理和反应式编程原理 . 我发现它们都是不错的课程,如果有时间的话,建 ...

  2. 华为系统移动数据连接到服务器,怎样设置手机数据连接到服务器配置

    怎样设置手机数据连接到服务器配置 内容精选 换一换 GaussDB(for MySQL)全兼容MySQL协议,因此,连接GaussDB(for MySQL) 实例有普通连接和SSL连接.其中,SSL连 ...

  3. Hive数据连接与函数(2)

    1 数据连接 内连接:保留左表和右表连接成功的数据信息,连接未成功则不保留该数据 select * from hive_day03.orders as o join users u on o.user ...

  4. 数据连接linux网络编程之TCP/IP基础(四):TCP连接的建立和断开、滑动窗口

    在写这篇文章之前,xxx已经写过了几篇关于改数据连接主题的文章,想要了解的朋友可以去翻一下之前的文章 一.TCP段格式: TCP的段格式如下图所示 源端口号与目标端口号 源端口号和目标端口号,加上IP ...

  5. R语言data.table进行滚动数据连接,滚动连接通常用于分析涉及时间的数据(例如商业销售活动和对应的广告投放的安排之之间的关系)实战:实战和动画说明滚动数据连接的形式及方法

    R语言data.table进行滚动数据连接,滚动连接通常用于分析涉及时间的数据(例如商业销售活动和对应的广告投放的安排之之间的关系)实战:实战和动画说明滚动数据连接的形式及方法 目录

  6. R语言data.table进行滚动数据连接,滚动联接通常用于分析涉及时间的数据实战(动画说明滚动数据连接的形式):rolling joins data.table in R

    R语言data.table进行滚动数据连接,滚动联接通常用于分析涉及时间的数据实战(动画说明滚动数据连接的形式):rolling joins data.table in R 目录

  7. pyspark dataframe数据连接(join)、转化为pandas dataframe、基于多个字段删除冗余数据

    pyspark dataframe数据连接(join).转化为pandas dataframe.基于多个字段删除冗余数据 目录 pyspark dataframe数据连接(join).转化为panda ...

  8. Python使用pandas读取两个或者多个excel文件(xlsx)并进行数据连接(join)合并两个或者多个excel的信息

    Python使用pandas读取两个或者多个excel文件(xlsx)并进行数据连接(join)合并两个或者多个excel的信息 目录

  9. 10秒一部电影,全球首个5G数据连接完成

    作者:李赓 概要:2017年10月17日.也就是上周二,高通在香港高调宣布--其面向移动终端的5G调制解调器芯片组,骁龙X50 5G调制解调器芯片组完成了全球首个5G连接,同时实现了千兆级速率并在28 ...

最新文章

  1. 驰骋工作流引擎设计系列07 线性流程节点运动(发送)设计
  2. ppt转换成pdf转换器免费版
  3. sdcard不可执行.
  4. 取得select框的text
  5. 洛谷 P2324 [SCOI2005]骑士精神 解题报告
  6. P1463-[POI2002][HAOI2007]反素数【约数,数论】
  7. java 控制台五子棋_java控制台五子棋
  8. mysql_数据备份和迁移(Windows)
  9. linux怎么执行frida脚本,Frida使用和Hook代码整理
  10. 一个很好的电磁学科普视频里融入自我的意识流“翻译”
  11. psp模拟器完美字库_安卓PSP模拟器评测:讨鬼传
  12. rapidxml使用
  13. ie7/8卸载工具 降级到IE6
  14. 计算机专业色弱限制,体检标准变成建议 色盲色弱能报高校计算机专业
  15. 查询每门课程被选修的学生数
  16. 任何情况下请通过正规渠道变更信用卡额度
  17. 离开腾讯首创业,贾佳亚谈人工智能 2.0 革命,技术究竟该如何变革?
  18. centos7 xfs分区重调整
  19. 网络文件共享服务(三):NFS
  20. 如何在同一个局域网内实现简单的两台Linux主机互相连接

热门文章

  1. hdu1526 二分匹配+ floyd
  2. 【Android Gradle 插件】ProductFlavor 配置 ( ProductFlavor#manifestPlaceholders 清单文件占位符配置 )
  3. 【数字信号处理】相关系数 ( 相关系数概念解析 | 信号能量常数 | 共轭序列 | 序列在相同时刻的相关性 )
  4. 【Android 逆向】Android 系统文件分析 ( Android 系统 root 环境准备 | 查看 Android 根目录信息 )
  5. 【错误记录】Windows 系统 bat 脚本报错 ( 使用 pause 拦截窗口自动关闭 | 方便查看错误 )
  6. 【OkHttp】OkHttp 上传图片 ( 获取 SD 卡动态权限 | 跳转到相册界面选择图片 | 使用 OkHttp 上传图片文件 )
  7. 【Android 插件化】“ 插桩式 “ 插件化框架 ( 注入上下文的使用 )
  8. 【数据挖掘】神经网络 后向传播算法( 向后传播误差 | 输出层误差公式 | 隐藏层误差公式 | 单元连接权值更新公式 | 单元偏置更新公式 | 反向传播 | 损失函数 | 误差平方和 | 交叉熵 )
  9. linux -- ./configure --prefix 命令
  10. git push 操作代码回退