Implementing reduce join and map join in MR, and Hive execution plans
1. Topics covered
MapReduce
InputFormat
RecordReader
Splits: block ≈ input split (a split may grow up to 1.1× the block size before a new split is cut)
File…
Text…
NLine…
DB…
Mapper
setup
map — the business logic
cleanup
Combiner
a map-side ("local") reducer
note the scenarios where it applies: the partial aggregation must not change the result (e.g. sum or count, but not average)
Partitioner
distributes map output keys to reducers according to some rule
Hash (the default HashPartitioner)
Custom (a sketch follows this outline)
Reducer
setup
reduce
cleanup
OutputFormat
RecordWriter
DB…
Text…
File…
Sorting
total (global) sort
per-partition sort
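As a hedged illustration of the "Custom" Partitioner item above (not from the original post; the class name and routing rule are invented for the sketch), a custom partitioner in the new MapReduce API looks roughly like this:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical rule: keys starting with 'p' get a dedicated partition, the rest are hashed.
public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (numPartitions <= 1) {
            return 0;
        }
        String k = key.toString();
        if (!k.isEmpty() && k.charAt(0) == 'p') {
            return 0; // dedicated partition for the hot prefix
        }
        // everything else is hashed into the remaining partitions
        return 1 + (k.hashCode() & Integer.MAX_VALUE) % (numPartitions - 1);
    }
}
// Wiring in the driver: job.setPartitionerClass(FirstLetterPartitioner.class); job.setNumReduceTasks(3);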
2. Implementing join with MapReduce and Hive
join
select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;
Reduce/Shuffle/Common Join
The join is completed on the reduce side.
MR view: the two tables, emp and dept, are read by two sets of MapTasks. MR then shuffles: the Mapper's output key is the join condition (deptno), so records with the same deptno are sent to the same reducer. A flag field marks which table each record came from.
select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;
Final output fields: empno, ename, deptno, dname, flag. A custom Writable class (Info) is needed for serialization, and malformed ("dirty") records are filtered out. A file is read by MapTasks; a table shows up in the plan as a TableScan.
1) Read the data ==> Shuffle ==> the join happens in Reduce.
2) If one join key carries far more data than the others ==> skew. For example, if 90 million out of 100 million rows have deptno=10, that single ReduceTask becomes the bottleneck.
Reduce join implementation
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.cc.pxj.wfy</groupId>
    <artifactId>phoneWcRuoZe</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
    </properties>
    <repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <dependencies>
        <!-- Hadoop dependency -->
        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.17</version>
        </dependency>
    </dependencies>
    <build>
        <pluginManagement>
            <!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
            <plugins>
                <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
                <plugin>
                    <artifactId>maven-clean-plugin</artifactId>
                    <version>3.1.0</version>
                </plugin>
                <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
                <plugin>
                    <artifactId>maven-resources-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.0</version>
                </plugin>
                <plugin>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.22.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>3.0.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-install-plugin</artifactId>
                    <version>2.5.2</version>
                </plugin>
                <plugin>
                    <artifactId>maven-deploy-plugin</artifactId>
                    <version>2.8.2</version>
                </plugin>
                <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
                <plugin>
                    <artifactId>maven-site-plugin</artifactId>
                    <version>3.7.1</version>
                </plugin>
                <plugin>
                    <artifactId>maven-project-info-reports-plugin</artifactId>
                    <version>3.0.0</version>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
</project>
package com.ccj.pxj.join;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/** Writable bean holding the joined emp/dept fields plus a flag marking the source table. */
public class Info implements Writable {
    private int empno;
    private String ename;
    private int deptno;
    private String dname;
    private int flag; // flag to tell which table the record came from

    public Info() {
    }

    public Info(int empno, String ename, int deptno, String dname) {
        this.empno = empno;
        this.ename = ename;
        this.deptno = deptno;
        this.dname = dname;
    }

    public Info(int empno, String ename, int deptno, String dname, int flag) {
        this(empno, ename, deptno, dname);
        this.flag = flag;
    }

    @Override
    public String toString() {
        return empno + "\t" + ename + "\t" + deptno + "\t" + dname;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(empno);
        out.writeUTF(ename);
        out.writeInt(deptno);
        out.writeUTF(dname);
        out.writeInt(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.deptno = in.readInt();
        this.dname = in.readUTF();
        this.flag = in.readInt();
    }

    public int getEmpno() { return empno; }
    public void setEmpno(int empno) { this.empno = empno; }
    public String getEname() { return ename; }
    public void setEname(String ename) { this.ename = ename; }
    public int getDeptno() { return deptno; }
    public void setDeptno(int deptno) { this.deptno = deptno; }
    public String getDname() { return dname; }
    public void setDname(String dname) { this.dname = dname; }
    public int getFlag() { return flag; }
    public void setFlag(int flag) { this.flag = flag; }
}
package com.ccj.pxj.join;
import com.ccj.pxj.phone.utils.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/** Reduce-side join driver: joins emp and dept on deptno in the reducer. */
public class ReduceJoinDriver {
    public static void main(String[] args) throws Exception {
        String input = "data/join";
        String output = "outreducejoin";
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);
        job.setJarByClass(ReduceJoinDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Info.class);
        job.setOutputKeyClass(Info.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    /**
     * The Mapper needs to know which table each record came from:
     * it checks the input file name in setup().
     */
    public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Info> {
        String name; // name of the file this split belongs to

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            name = split.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split("\t");
            if (name.contains("emp")) { // emp records
                if (splits.length == 8) { // skip dirty records
                    int empno = Integer.parseInt(splits[0].trim());
                    String ename = splits[1];
                    int deptno = Integer.parseInt(splits[7].trim());
                    Info info = new Info(empno, ename, deptno, "", 1); // 1: emp
                    context.write(new IntWritable(deptno), info);
                }
            } else { // dept records
                int deptno = Integer.parseInt(splits[0].trim());
                Info info = new Info(0, "", deptno, splits[1], 2); // 2: dept
                context.write(new IntWritable(deptno), info);
            }
        }
    }

    // The actual join is done here, in the reducer.
    public static class MyReducer extends Reducer<IntWritable, Info, Info, NullWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<Info> values, Context context) throws IOException, InterruptedException {
            List<Info> infos = new ArrayList<>();
            String dname = "";
            for (Info info : values) {
                if (info.getFlag() == 1) { // emp side: buffer the employee rows
                    Info tmp = new Info();
                    tmp.setEmpno(info.getEmpno());
                    tmp.setEname(info.getEname());
                    tmp.setDeptno(info.getDeptno());
                    infos.add(tmp);
                } else if (info.getFlag() == 2) { // dept side: remember the department name
                    dname = info.getDname();
                }
            }
            for (Info bean : infos) {
                bean.setDname(dname);
                context.write(bean, NullWritable.get());
            }
        }
    }
}
Data:
dept:
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
emp:
7369 SMITH CLERK 7902 1980-12-17 800.00 20
7499 ALLEN SALESMAN 7698 1981-2-20 1600.00 300.00 30
7521 WARD SALESMAN 7698 1981-2-22 1250.00 500.00 30
7566 JONES MANAGER 7839 1981-4-2 2975.00 20
7654 MARTIN SALESMAN 7698 1981-9-28 1250.00 1400.00 30
7698 BLAKE MANAGER 7839 1981-5-1 2850.00 30
7782 CLARK MANAGER 7839 1981-6-9 2450.00 10
7788 SCOTT ANALYST 7566 1987-4-19 3000.00 20
7839 KING PRESIDENT 1981-11-17 5000.00 10
7844 TURNER SALESMAN 7698 1981-9-8 1500.00 0.00 30
7876 ADAMS CLERK 7788 1987-5-23 1100.00 20
7900 JAMES CLERK 7698 1981-12-3 950.00 30
7902 FORD ANALYST 7566 1981-12-3 3000.00 20
7934 MILLER CLERK 7782 1982-1-23 1300.00 10
8888 HIVE PROGRAM 7839 1988-1-23 10300.00
Map join implementation
(The Info class is the same one shown in the reduce join example above.)
package com.ccj.pxj.join;
import com.ccj.pxj.phone.utils.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** Map-side join driver: the small dept table is shipped via the distributed cache and joined in the Mapper. */
public class MapJoinDriver {
    public static void main(String[] args) throws Exception {
        String input = "data/join/emp.txt";
        String output = "out";
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);
        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setNumReduceTasks(0); // map-only job: no reduce needed
        job.addCacheFile(new URI("data/join/dept.txt")); // ship the small table to every MapTask
        job.setOutputKeyClass(Info.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    /**
     * The small table is loaded into an in-memory map in setup();
     * each emp record is then matched against it in map().
     */
    public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Info> {
        Map<String, String> cache = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            String path = context.getCacheFiles()[0].getPath().toString();
            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
            String line;
            while (StringUtils.isNotEmpty(line = reader.readLine())) {
                String[] splits = line.split("\t");
                cache.put(splits[0], splits[1]); // deptno -> dname
            }
            IOUtils.closeStream(reader);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split("\t");
            if (splits.length == 8) { // skip dirty records
                int empno = Integer.parseInt(splits[0].trim());
                String ename = splits[1];
                int deptno = Integer.parseInt(splits[7].trim());
                Info info = new Info(empno, ename, deptno, cache.get(deptno + "")); // look up dname in memory
                context.write(new IntWritable(deptno), info);
            }
        }
    }
}
Data generation script (mock data for the WordCount skew example)
package com.ccj.wfy.makedata;
import java.io.*;
import java.util.Random;
public class Mock {
    public static void main(String[] args) throws Exception {
        BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(new File("data/wc.txt"))));
        String[] a = {"pxj", "wfy", "ccj", "zmj", "xwc", "lzh", "zcl", "wlp", "wxc", "pk", "jpeson"};
        for (int i = 0; i < 100; i++) {
            // write three random names per line, comma separated
            Random random = new Random();
            writer.write(a[random.nextInt(a.length)]);
            writer.write(",");
            writer.write(a[random.nextInt(a.length)]);
            writer.write(",");
            writer.write(a[random.nextInt(a.length)]);
            writer.newLine();
        }
        writer.flush();
        writer.close();
    }
}
3. Execution plans
Create the tables
CREATE TABLE `dept`(`deptno` int, `dname` string, `loc` string)
row format delimited
fields terminated by '\t';
CREATE TABLE `emp`(`empno` int, `ename` string, `job` string, `mgr` int, `hiredate` string, `sal` double, `comm` double, `deptno` int)
row format delimited
fields terminated by '\t';
hive (default)> load data local inpath '/home/pxj/datas/dept.txt' into table dept;
hive (default)> load data local inpath '/home/pxj/datas/emp.txt' into table emp;
First, disable automatic map-join conversion (the parameter is hive.auto.convert.join):
0: jdbc:hive2://pxj:10000> set hive.auto.convert.join=false
. . . . . . . . . . . . .> ;
0: jdbc:hive2://pxj:10000> explain select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;
INFO : Compiling command(queryId=pxj_20200119010707_26c4bbd5-479b-49f1-8968-b08a1abc5056): explain select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno
INFO : Semantic Analysis Completed
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:Explain, type:string, comment:null)], properties:null)
INFO : Completed compiling command(queryId=pxj_20200119010707_26c4bbd5-479b-49f1-8968-b08a1abc5056); Time taken: 0.243 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Executing command(queryId=pxj_20200119010707_26c4bbd5-479b-49f1-8968-b08a1abc5056): explain select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno
INFO : Starting task [Stage-3:EXPLAIN] in serial mode
INFO : Completed executing command(queryId=pxj_20200119010707_26c4bbd5-479b-49f1-8968-b08a1abc5056); Time taken: 0.042 seconds
INFO : OK
+----------------------------------------------------+--+
| Explain |
+----------------------------------------------------+--+
| STAGE DEPENDENCIES: |
| Stage-1 is a root stage |
| Stage-0 depends on stages: Stage-1 |
| |
| STAGE PLANS: |
| Stage: Stage-1 |
| Map Reduce |
| Map Operator Tree: |
| TableScan |
| alias: e |
| Statistics: Num rows: 6 Data size: 700 Basic stats: COMPLETE Column stats: NONE |
| Filter Operator |
| predicate: deptno is not null (type: boolean) |
| Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE |
| Reduce Output Operator |
| key expressions: deptno (type: int) |
| sort order: + |
| Map-reduce partition columns: deptno (type: int) |
| Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE |
| value expressions: empno (type: int), ename (type: string) |
| TableScan |
| alias: d |
| Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE |
| Filter Operator |
| predicate: deptno is not null (type: boolean) |
| Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE |
| Reduce Output Operator |
| key expressions: deptno (type: int) |
| sort order: + |
| Map-reduce partition columns: deptno (type: int) |
| Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE |
| value expressions: dname (type: string) |
| Reduce Operator Tree: |
| Join Operator |
| condition map: |
| Inner Join 0 to 1 |
| keys: |
| 0 deptno (type: int) |
| 1 deptno (type: int) |
| outputColumnNames: _col0, _col1, _col7, _col12 |
| Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE |
| Select Operator |
| expressions: _col0 (type: int), _col1 (type: string), _col7 (type: int), _col12 (type: string) |
| outputColumnNames: _col0, _col1, _col2, _col3 |
| Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE |
| File Output Operator |
| compressed: false |
| Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE |
| table: |
| input format: org.apache.hadoop.mapred.TextInputFormat |
| output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
| serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
| |
| Stage: Stage-0 |
| Fetch Operator |
| limit: -1 |
| Processor Tree: |
| ListSink |
| |
+----------------------------------------------------+--+
59 rows selected (0.379 seconds)
Notes:
Execution plan: explain <sql>
Can one SQL statement always be finished in a single job? Not necessarily:
a single Hive SQL query may be split into multiple MR jobs.
ABSTRACT SYNTAX TREE: AST
STAGE DEPENDENCIES
STAGE PLANS
number of mappers: 2; number of reducers: 1
Data skew happens at the Shuffle (Hive's built-in skew-handling settings are sketched below).
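As a hedged aside beyond the original notes: Hive also has built-in switches that try to mitigate skew automatically. The parameter names below are standard Hive settings; the value shown for hive.skewjoin.key is only its common default, not something measured in this environment.
set hive.groupby.skewindata=true;      -- split a skewed group-by into two MR jobs (partial aggregation first)
set hive.optimize.skewjoin=true;       -- handle skewed join keys with a follow-up job for the overflow rows
set hive.skewjoin.key=100000;          -- rows per key beyond which a join key is treated as skewed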
Map join execution plan
0: jdbc:hive2://pxj:10000> set hive.auto.convert.join=true;
No rows affected (0.004 seconds)
0: jdbc:hive2://pxj:10000> explain select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;
INFO : Compiling command(queryId=pxj_20200119011010_f0ca7412-1b4e-41bf-9c1c-34c6d801acff): explain select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno
INFO : Semantic Analysis Completed
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:Explain, type:string, comment:null)], properties:null)
INFO : Completed compiling command(queryId=pxj_20200119011010_f0ca7412-1b4e-41bf-9c1c-34c6d801acff); Time taken: 0.197 seconds
INFO : Concurrency mode is disabled, not creating a lock manager
INFO : Executing command(queryId=pxj_20200119011010_f0ca7412-1b4e-41bf-9c1c-34c6d801acff): explain select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno
INFO : Starting task [Stage-5:EXPLAIN] in serial mode
INFO : Completed executing command(queryId=pxj_20200119011010_f0ca7412-1b4e-41bf-9c1c-34c6d801acff); Time taken: 0.015 seconds
INFO : OK
+----------------------------------------------------+--+
| Explain |
+----------------------------------------------------+--+
| STAGE DEPENDENCIES: |
| Stage-4 is a root stage |
| Stage-3 depends on stages: Stage-4 |
| Stage-0 depends on stages: Stage-3 |
| |
| STAGE PLANS: |
| Stage: Stage-4 |
| Map Reduce Local Work |
| Alias -> Map Local Tables: |
| d |
| Fetch Operator |
| limit: -1 |
| Alias -> Map Local Operator Tree: |
| d |
| TableScan |
| alias: d |
| Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE |
| Filter Operator |
| predicate: deptno is not null (type: boolean) |
| Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE |
| HashTable Sink Operator |
| keys: |
| 0 deptno (type: int) |
| 1 deptno (type: int) |
| |
| Stage: Stage-3 |
| Map Reduce |
| Map Operator Tree: |
| TableScan |
| alias: e |
| Statistics: Num rows: 6 Data size: 700 Basic stats: COMPLETE Column stats: NONE |
| Filter Operator |
| predicate: deptno is not null (type: boolean) |
| Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE |
| Map Join Operator |
| condition map: |
| Inner Join 0 to 1 |
| keys: |
| 0 deptno (type: int) |
| 1 deptno (type: int) |
| outputColumnNames: _col0, _col1, _col7, _col12 |
| Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE |
| Select Operator |
| expressions: _col0 (type: int), _col1 (type: string), _col7 (type: int), _col12 (type: string) |
| outputColumnNames: _col0, _col1, _col2, _col3 |
| Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE |
| File Output Operator |
| compressed: false |
| Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE |
| table: |
| input format: org.apache.hadoop.mapred.TextInputFormat |
| output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
| serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
| Local Work: |
| Map Reduce Local Work |
| |
| Stage: Stage-0 |
| Fetch Operator |
| limit: -1 |
| Processor Tree: |
| ListSink |
| |
+----------------------------------------------------+--+
62 rows selected (0.303 seconds)
Notes:
Map Join
The join is completed on the map side.
Is there a Shuffle? Is there a Reduce? (No to both: the plan above has no reduce stage.)
Idea: load the small table into memory; while reading the big table, look up each record against the in-memory data. A match means the record joins; no match means it is dropped.
Limitation: it cannot handle two large tables, so it suits joining a big table with a small table (the relevant Hive settings are sketched after these notes).
MR is not well suited to workloads with many interactive/iterative rounds.
DAG / RDD / Tez
Hive execution engines: MR, Tez, Spark
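A hedged sketch of the knobs behind this behaviour (the parameter names and the MAPJOIN hint are standard Hive features; the size value shown is only the usual default, not taken from this cluster):
set hive.auto.convert.join=true;                 -- let Hive rewrite the join as a map join automatically
set hive.mapjoin.smalltable.filesize=25000000;   -- tables below roughly 25 MB count as "small"
-- older Hive versions also accept an explicit hint:
select /*+ MAPJOIN(d) */ e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;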
Analyzing data skew, using WordCount as an example
Records with the same key are sent to the same reducer ==> skew
Skew mitigation: scatter ("salt") the keys first, then process
pxj_8 100
pxj_1 200
pxj_2 300
pxj_4 200
==> data that previously all went to one ReduceTask has now been scattered across tasks
90 million rows spread over 100 salted keys: the pressure on any single task is far smaller
pxj 800
Chained MR jobs (a sketch follows the worked example below):
First MR job: append a random number to each key to scatter it
pxj_8 100
pxj_1 200
pxj_2 300
pxj_4 200
Second MR job: strip the random suffix added by the first job
pxj 100
pxj 200
pxj 300
pxj 200
The reducer of the second MR job performs the final aggregation
pxj 800
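A minimal sketch of this two-job chain, assuming word-count style input like the mock data above (comma-separated names); the class names, the salt range of 10, and the omitted driver wiring are choices made for the sketch, not taken from the original code:

import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class SaltedWordCount { // container class; the drivers for the two jobs are omitted for brevity

    // Job 1 mapper: salt the key so one hot word spreads over several reducers.
    public static class SaltMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final Random random = new Random();
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            for (String word : value.toString().split(",")) {
                // "pxj" becomes one of "pxj_0" .. "pxj_9"
                context.write(new Text(word + "_" + random.nextInt(10)), new IntWritable(1));
            }
        }
    }

    // Job 1 reducer (reused as Job 2 reducer): sums the counts per key.
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) {
                sum += v.get();
            }
            context.write(key, new IntWritable(sum)); // Job 1 emits e.g. ("pxj_3", 200)
        }
    }

    // Job 2 mapper: strip the salt so all partial sums for "pxj" meet in a single reducer.
    public static class UnsaltMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] kv = value.toString().split("\t");                   // Job 1 output line: "pxj_3<TAB>200"
            String original = kv[0].substring(0, kv[0].lastIndexOf('_')); // "pxj_3" -> "pxj"
            context.write(new Text(original), new IntWritable(Integer.parseInt(kv[1])));
        }
    }
}
// Job 2 runs UnsaltMapper + SumReducer over Job 1's output and produces the final totals, e.g. ("pxj", 800).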
Barrel principle: how much water a barrel holds is decided by its shortest stave.
MR: with 100 ReduceTasks, the total job runtime is determined by the last task to finish.
A single slow task drags down the performance of the whole MR job.
Hadoop is not afraid of large data volumes; it is afraid of data skew.
Data hot spots.
Author: pxj (潘陈)