MapReduce数据连接
对于不同文件里的数据,有时候有相应关系,须要进行连接(join),获得一个新的文件以便进行分析。比方有两个输入文件a.txt,b.txt,当中的数据格式分别例如以下
1 a
2 b
3 c
4 d
1 good
2 bad
3 ok
4 hello
须要将其连接成一个新的例如以下的文件:
a good
b bad
c ok
d hello
处理步骤能够分成两步:
1.map阶段,将两个输入文件里的数据进行打散,例如以下:
1 a
1 good
2 b
2 bad
3 c
3 ok
4 d
4 hello
2.reduce阶段,进行数据的连接操作,此处数据较简单,仅仅要推断map结果的value的长度是不是1就决定是新的键还是值。
package cn.zhf.hadoop;import java.io.IOException;
import java.util.Iterator;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;public class SingleJoin extends Configured implements Tool{public static void main(String[] args) throws Exception {Tool tool = new SingleJoin();ToolRunner.run(tool, args);print(tool);}@Overridepublic int run(String[] arg0) throws Exception {Configuration conf = getConf();Job job = new Job();job.setJarByClass(getClass());FileSystem fs = FileSystem.get(conf);fs.delete(new Path("out"),true);FileInputFormat.addInputPath(job, new Path("a.txt"));FileInputFormat.addInputPath(job, new Path("b.txt"));FileOutputFormat.setOutputPath(job,new Path("out"));job.setMapperClass(JoinMapper.class);job.setReducerClass(JoinReducer.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);job.waitForCompletion(true);return 0;}public static class JoinMapper extends Mapper<LongWritable,Text,Text,Text>{public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException{String[] str = value.toString().split(" ");context.write(new Text(str[0]), new Text(str[1]));}}public static class JoinReducer extends Reducer<Text,Text,Text,Text>{public void reduce(Text key,Iterable<Text> values,Context context) throws IOException, InterruptedException{Iterator<Text> iterator = values.iterator();Text keyy = new Text();Text valuee = new Text();while(iterator.hasNext()){Text temp = iterator.next();if(temp.toString().length() == 1){keyy.set(temp);valuee.set(iterator.next());}else{valuee.set(temp);keyy.set(iterator.next());}}context.write(keyy, valuee);}}public static void print(Tool tool) throws IOException{FileSystem fs = FileSystem.get(tool.getConf());Path path = new Path("out/part-r-00000");FSDataInputStream fsin = fs.open(path);int length = 0;byte[] buff = new byte[128];while((length = fsin.read(buff,0,128)) != -1)System.out.println(new String(buff,0,length));}
}
reference:《MapReduce2.0源代码分析及编程实践》
MapReduce数据连接相关推荐
- MapReduce算法–了解数据连接第二部分
自从我上一次发布以来已经有一段时间了,就像我上一次大休息一样,我正在Coursera上一些课程. 这次是Scala中的函数式编程 原理和反应式编程原理 . 我发现它们都是不错的课程,如果有时间的话,建 ...
- 华为系统移动数据连接到服务器,怎样设置手机数据连接到服务器配置
怎样设置手机数据连接到服务器配置 内容精选 换一换 GaussDB(for MySQL)全兼容MySQL协议,因此,连接GaussDB(for MySQL) 实例有普通连接和SSL连接.其中,SSL连 ...
- Hive数据连接与函数(2)
1 数据连接 内连接:保留左表和右表连接成功的数据信息,连接未成功则不保留该数据 select * from hive_day03.orders as o join users u on o.user ...
- 数据连接linux网络编程之TCP/IP基础(四):TCP连接的建立和断开、滑动窗口
在写这篇文章之前,xxx已经写过了几篇关于改数据连接主题的文章,想要了解的朋友可以去翻一下之前的文章 一.TCP段格式: TCP的段格式如下图所示 源端口号与目标端口号 源端口号和目标端口号,加上IP ...
- R语言data.table进行滚动数据连接,滚动连接通常用于分析涉及时间的数据(例如商业销售活动和对应的广告投放的安排之之间的关系)实战:实战和动画说明滚动数据连接的形式及方法
R语言data.table进行滚动数据连接,滚动连接通常用于分析涉及时间的数据(例如商业销售活动和对应的广告投放的安排之之间的关系)实战:实战和动画说明滚动数据连接的形式及方法 目录
- R语言data.table进行滚动数据连接,滚动联接通常用于分析涉及时间的数据实战(动画说明滚动数据连接的形式):rolling joins data.table in R
R语言data.table进行滚动数据连接,滚动联接通常用于分析涉及时间的数据实战(动画说明滚动数据连接的形式):rolling joins data.table in R 目录
- pyspark dataframe数据连接(join)、转化为pandas dataframe、基于多个字段删除冗余数据
pyspark dataframe数据连接(join).转化为pandas dataframe.基于多个字段删除冗余数据 目录 pyspark dataframe数据连接(join).转化为panda ...
- Python使用pandas读取两个或者多个excel文件(xlsx)并进行数据连接(join)合并两个或者多个excel的信息
Python使用pandas读取两个或者多个excel文件(xlsx)并进行数据连接(join)合并两个或者多个excel的信息 目录
- 10秒一部电影,全球首个5G数据连接完成
作者:李赓 概要:2017年10月17日.也就是上周二,高通在香港高调宣布--其面向移动终端的5G调制解调器芯片组,骁龙X50 5G调制解调器芯片组完成了全球首个5G连接,同时实现了千兆级速率并在28 ...
最新文章
- 驰骋工作流引擎设计系列07 线性流程节点运动(发送)设计
- ppt转换成pdf转换器免费版
- sdcard不可执行.
- 取得select框的text
- 洛谷 P2324 [SCOI2005]骑士精神 解题报告
- P1463-[POI2002][HAOI2007]反素数【约数,数论】
- java 控制台五子棋_java控制台五子棋
- mysql_数据备份和迁移(Windows)
- linux怎么执行frida脚本,Frida使用和Hook代码整理
- 一个很好的电磁学科普视频里融入自我的意识流“翻译”
- psp模拟器完美字库_安卓PSP模拟器评测:讨鬼传
- rapidxml使用
- ie7/8卸载工具 降级到IE6
- 计算机专业色弱限制,体检标准变成建议 色盲色弱能报高校计算机专业
- 查询每门课程被选修的学生数
- 任何情况下请通过正规渠道变更信用卡额度
- 离开腾讯首创业,贾佳亚谈人工智能 2.0 革命,技术究竟该如何变革?
- centos7 xfs分区重调整
- 网络文件共享服务(三):NFS
- 如何在同一个局域网内实现简单的两台Linux主机互相连接
热门文章
- hdu1526 二分匹配+ floyd
- 【Android Gradle 插件】ProductFlavor 配置 ( ProductFlavor#manifestPlaceholders 清单文件占位符配置 )
- 【数字信号处理】相关系数 ( 相关系数概念解析 | 信号能量常数 | 共轭序列 | 序列在相同时刻的相关性 )
- 【Android 逆向】Android 系统文件分析 ( Android 系统 root 环境准备 | 查看 Android 根目录信息 )
- 【错误记录】Windows 系统 bat 脚本报错 ( 使用 pause 拦截窗口自动关闭 | 方便查看错误 )
- 【OkHttp】OkHttp 上传图片 ( 获取 SD 卡动态权限 | 跳转到相册界面选择图片 | 使用 OkHttp 上传图片文件 )
- 【Android 插件化】“ 插桩式 “ 插件化框架 ( 注入上下文的使用 )
- 【数据挖掘】神经网络 后向传播算法( 向后传播误差 | 输出层误差公式 | 隐藏层误差公式 | 单元连接权值更新公式 | 单元偏置更新公式 | 反向传播 | 损失函数 | 误差平方和 | 交叉熵 )
- linux -- ./configure --prefix 命令
- git push 操作代码回退