MapReduce实现join操作
数据准备
- create table if not exists m_ys_lab_jointest_a (
- id bigint,
- name string
- )
- row format delimited
- fields terminated by '9'
- lines terminated by '10'
- stored as textfile;
id name 1 北京 2 天津 3 河北 4 山西 5 内蒙古 6 辽宁 7 吉林 8 黑龙江 |
- create table if not exists m_ys_lab_jointest_b (
- id bigint,
- statyear bigint,
- num bigint
- )
- row format delimited
- fields terminated by '9'
- lines terminated by '10'
- stored as textfile;
数据:
id statyear num 1 2010 1962 1 2011 2019 2 2010 1299 2 2011 1355 4 2010 3574 4 2011 3593 9 2010 2303 9 2011 2347 |
我们的目的是,以id为key做join操作,得到以下表:
id name statyear num 1 北京 2011 2019 1 北京 2010 1962 2 天津 2011 1355 2 天津 2010 1299 4 山西 2011 3593 4 山西 2010 3574 |
计算模型
代码
- import java.io.IOException;
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.Vector;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.Writable;
- import org.apache.hadoop.mapred.FileSplit;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.MapReduceBase;
- import org.apache.hadoop.mapred.Mapper;
- import org.apache.hadoop.mapred.OutputCollector;
- import org.apache.hadoop.mapred.RecordWriter;
- import org.apache.hadoop.mapred.Reducer;
- import org.apache.hadoop.mapred.Reporter;
- /**
- * MapReduce实现Join操作
- */
- public class MapRedJoin {
- public static final String DELIMITER = "\u0009"; // 字段分隔符
- // map过程
- public static class MapClass extends MapReduceBase implements
- Mapper<LongWritable, Text, Text, Text> {
- public void configure(JobConf job) {
- super.configure(job);
- }
- public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
- Reporter reporter) throws IOException, ClassCastException {
- // 获取输入文件的全路径和名称
- String filePath = ((FileSplit)reporter.getInputSplit()).getPath().toString();
- // 获取记录字符串
- String line = value.toString();
- // 抛弃空记录
- if (line == null || line.equals("")) return;
- // 处理来自表A的记录
- if (filePath.contains("m_ys_lab_jointest_a")) {
- String[] values = line.split(DELIMITER); // 按分隔符分割出字段
- if (values.length < 2) return;
- String id = values[0]; // id
- String name = values[1]; // name
- output.collect(new Text(id), new Text("a#"+name));
- }
- // 处理来自表B的记录
- else if (filePath.contains("m_ys_lab_jointest_b")) {
- String[] values = line.split(DELIMITER); // 按分隔符分割出字段
- if (values.length < 3) return;
- String id = values[0]; // id
- String statyear = values[1]; // statyear
- String num = values[2]; //num
- output.collect(new Text(id), new Text("b#"+statyear+DELIMITER+num));
- }
- }
- }
- // reduce过程
- public static class Reduce extends MapReduceBase
- implements Reducer<Text, Text, Text, Text> {
- public void reduce(Text key, Iterator<Text> values,
- OutputCollector<Text, Text> output, Reporter reporter)
- throws IOException {
- Vector<String> vecA = new Vector<String>(); // 存放来自表A的值
- Vector<String> vecB = new Vector<String>(); // 存放来自表B的值
- while (values.hasNext()) {
- String value = values.next().toString();
- if (value.startsWith("a#")) {
- vecA.add(value.substring(2));
- } else if (value.startsWith("b#")) {
- vecB.add(value.substring(2));
- }
- }
- int sizeA = vecA.size();
- int sizeB = vecB.size();
- // 遍历两个向量
- int i, j;
- for (i = 0; i < sizeA; i ++) {
- for (j = 0; j < sizeB; j ++) {
- output.collect(key, new Text(vecA.get(i) + DELIMITER +vecB.get(j)));
- }
- }
- }
- }
- protected void configJob(JobConf conf) {
- conf.setMapOutputKeyClass(Text.class);
- conf.setMapOutputValueClass(Text.class);
- conf.setOutputKeyClass(Text.class);
- conf.setOutputValueClass(Text.class);
- conf.setOutputFormat(ReportOutFormat.class);
- }
- }
技术细节
MapReduce实现join操作相关推荐
- 使用MapReduce实现join操作
2019独角兽企业重金招聘Python工程师标准>>> 在关系型数据库中,要实现join操作是非常方便的,通过sql定义的join原语就可以实现.在hdfs存储的海量数据中,要实现j ...
- MapReduce之join操作
一 前言 在很多时候,我们可能需要处理的不是一个单独的文件,而是几个有关联的文件,比如账户信息和订单信息=> 账户信息:customerIdname address telephone 订单信 ...
- [MapReduce_add_4] MapReduce 的 join 操作
0. 说明 Map 端 join && Reduce 端 join 1. Map 端 join Map 端 join:大表+小表 => 将小表加入到内存,迭代大表每一行,与之进行 ...
- MapReduce之Map join操作
MapReduce之Map join操作(分布式缓存) 文章目录 MapReduce之Map join操作(分布式缓存) 案例结合 利用MapReduce中的setup方法与DistributedCa ...
- 5、HIVE DML操作、load数据、update、Delete、Merge、where语句、基于分区的查询、HAVING子句、LIMIT子句、Group By语法、Hive 的Join操作等
目录: 4.2.1 Load文件数据到表中 4.2.2查询的数据插入到表中 4.2.3将Hive查询的结果存到本地Linux的文件系统目录中 4.2.4通过SQL语句的方式插入数据 4.2.5 UPD ...
- Hive是如何让MapReduce实现SQL操作的?
learn from 从0开始学大数据(极客时间) 1. MapReduce 实现 SQL 的原理 SELECT pageid, age, count(1) FROM pv_users GROUP B ...
- Flink学习笔记:Operators之CoGroup及Join操作
本文为<Flink大数据项目实战>学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程: Flink大数据项目实战:http://t.cn/EJtKhaz ...
- shell中join链接多个域_shell 如何实现两个表的join操作
shell 如何实现两个表的join操作 今天研究的一个问题是:在Shell 脚本中如何实现两个表的 join 操作,这里说的两个表示的其实是 两个文件,但是文件是列表的形式,有固定的分割符号,即就相 ...
- 离线轻量级大数据平台Spark之JavaRDD关联join操作
对两个RDD进行关联操作,如: 1)文件post_data.txt包含:post_id\title\content 2)文件train.txt包含:dev_id\post_id\praise\time ...
最新文章
- 高效管理 GitHub Star,用这几个插件就能做到!
- 在Eclipse中运行hadoop程序
- linux redis安装报错,Linux安装Redis实现过程及报错解决方案
- 验证gpu版pytorch是否可用
- C#中实现空间的登录加密密码脚本里的方法
- Linux下Mail命令收集
- django-xadmin出现Models aren't loaded yet错误
- 设计模式是什么鬼(原型)
- LINUX如何让内存FREE变大,(转)Linux中显示空闲内存空间的free命令的基本用法
- 2011年度IT博客大赛 “博”乐大行动(已结束)
- qq拼音输入法下载|qq拼音输入法纯净版下载
- MTK平台Camera驱动流程分析
- 平面坐标转大地坐标(经纬度)
- python 3des加密_python3使用3des加密
- 九章算术 五:《商功》
- 【Python】python中[-1]、[:-1]、[::-1]、[n::-1]使用方法;random.choice()
- (原创)巧用通道作颜色网
- 数组的下标为什么从0开始而不是从1开始
- 【前端应该掌握的一些知识点】
- 用Cocos Creator 模拟书本翻页效果
热门文章
- box-sizing详解
- java.lang.ClassNotFoundException: org.springframework.web.servlet.DispatcherServlet
- 学计算机对显卡要求大吗,【5人回答】学AE的电脑配置要求高吗?需要什么样的电脑配置-3D溜溜网...
- 一元流量显示服务器繁忙,流量充不进去老退款
- mysql b-a全局索引_MySQL中B+树索引的使用
- andriod 多个Activity之间共享数据
- 深度学习和目标检测系列教程 16-300:通过全球小麦数据集训练第一个yolov5模型
- 7.04 -2018-长沙机场笔试总结
- 黑马Go语言与区块链学习笔记
- 正视长尾挑战!颜水成、冯佳时团队发布首篇《深度长尾学习》综述