1. Topics covered

MapReduce
  InputFormat
    RecordReader
    Splitting: by default one block = one input split (1.1 is the split slop factor)
    FileInputFormat, TextInputFormat, NLineInputFormat, DBInputFormat, ...
  Mapper
    setup
    map (the business logic)
    cleanup
  Combiner
    A local, map-side Reducer
    Mind where it applies (safe for sums/counts, wrong for averages)
  Partitioner
    Distributes keys to reducers according to some rule
    Hash (the default)
    Custom (a minimal sketch follows this outline)
  Reducer
    setup
    reduce
    cleanup
  OutputFormat
    RecordWriter
    DBOutputFormat, TextOutputFormat, FileOutputFormat, ...
  Sorting
    Total order sort
    Per-partition sort
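The outline above only names a custom Partitioner; here is a minimal sketch of what one looks like. The class name and routing rule are illustrative, not from the original post.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Minimal custom Partitioner sketch: route keys to reducers by a rule of
// our own instead of the default hash partitioning.
public class DeptnoPartitioner extends Partitioner<IntWritable, Text> {
    @Override
    public int getPartition(IntWritable key, Text value, int numPartitions) {
        // Illustrative rule: send deptno 10 to reducer 0, everything else
        // hash-distributed over the remaining reducers.
        if (key.get() == 10) {
            return 0;
        }
        return 1 + (key.get() & Integer.MAX_VALUE) % (numPartitions - 1);
    }
}
// Wire it up in the driver: job.setPartitionerClass(DeptnoPartitioner.class);
// and job.setNumReduceTasks(n) with n >= 2.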

2. Implementing join in MapReduce and Hive

The join to implement:
select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;

Reduce/Shuffle/Common Join
The join operation is completed on the reduce side.

In MR terms: the two tables, emp and dept, are read by two map tasks. MR must shuffle, and the mapper's output key is the on condition (deptno), so records with the same deptno are sent to the same reducer. A flag field marks which table each record came from. For

select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;

the final output fields are empno, ename, deptno, dname, flag, which calls for a custom serializable class: Info. Watch out for dirty (malformed) data. Reading a file corresponds to a MapTask; a table shows up as a TableScan in the plan.
1) Read the data ==> Shuffle ==> join in the Reducer.
2) If one join key carries a disproportionate share of the data ==> skew: with 100M records of which 90M have deptno=10, a single ReduceTask ends up with nearly all the work.

Reduce join implementation:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.cc.pxj.wfy</groupId>
  <artifactId>phoneWcRuoZe</artifactId>
  <version>1.0-SNAPSHOT</version>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <hadoop.version>2.6.0-cdh5.16.2</hadoop.version>
  </properties>
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
  <dependencies>
    <!-- Hadoop dependency -->
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/junit/junit -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.11</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.17</version>
    </dependency>
  </dependencies>
  <build>
    <pluginManagement>
      <!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
package com.ccj.pxj.join;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/** Join output bean; the flag field records which table a record came from. */
public class Info implements Writable {
    private int empno;
    private String ename;
    private int deptno;
    private String dname;
    private int flag; // flag distinguishing which table the record came from

    public Info() {}

    public Info(int empno, String ename, int deptno, String dname) {
        this.empno = empno;
        this.ename = ename;
        this.deptno = deptno;
        this.dname = dname;
    }

    public Info(int empno, String ename, int deptno, String dname, int flag) {
        this(empno, ename, deptno, dname);
        this.flag = flag;
    }

    @Override
    public String toString() {
        return empno + "\t" + ename + "\t" + deptno + "\t" + dname;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(empno);
        out.writeUTF(ename);
        out.writeInt(deptno);
        out.writeUTF(dname);
        out.writeInt(flag);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.empno = in.readInt();
        this.ename = in.readUTF();
        this.deptno = in.readInt();
        this.dname = in.readUTF();
        this.flag = in.readInt();
    }

    public int getEmpno() { return empno; }
    public void setEmpno(int empno) { this.empno = empno; }
    public String getEname() { return ename; }
    public void setEname(String ename) { this.ename = ename; }
    public int getDeptno() { return deptno; }
    public void setDeptno(int deptno) { this.deptno = deptno; }
    public String getDname() { return dname; }
    public void setDname(String dname) { this.dname = dname; }
    public int getFlag() { return flag; }
    public void setFlag(int flag) { this.flag = flag; }
}
package com.ccj.pxj.join;
import com.ccj.pxj.phone.utils.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/** Reduce-side join driver: the join itself happens in the Reducer. */
public class ReduceJoinDriver {

    public static void main(String[] args) throws Exception {
        String input = "data/join";
        String output = "outreducejoin";
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);
        job.setJarByClass(ReduceJoinDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);
        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(Info.class);
        job.setOutputKeyClass(Info.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    /** The Mapper tells emp and dept records apart by the name of the input file. */
    public static class MyMapper extends Mapper<LongWritable, Text, IntWritable, Info> {
        String name; // name of the file this split came from

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            name = split.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split("\t");
            if (name.contains("emp")) { // emp records
                if (splits.length == 8) { // skip dirty rows that are missing fields
                    int empno = Integer.parseInt(splits[0].trim());
                    String ename = splits[1];
                    int deptno = Integer.parseInt(splits[7].trim());
                    Info info = new Info(empno, ename, deptno, "", 1); // 1: emp
                    context.write(new IntWritable(deptno), info);
                }
            } else { // dept records
                int deptno = Integer.parseInt(splits[0].trim());
                Info info = new Info(0, "", deptno, splits[1], 2); // 2: dept
                context.write(new IntWritable(deptno), info);
            }
        }
    }

    /** The actual join is done here: for each deptno, combine emp rows with the dept name. */
    public static class MyReducer extends Reducer<IntWritable, Info, Info, NullWritable> {
        @Override
        protected void reduce(IntWritable key, Iterable<Info> values, Context context) throws IOException, InterruptedException {
            List<Info> infos = new ArrayList<>();
            String dname = "";
            for (Info info : values) {
                if (info.getFlag() == 1) { // emp side: copy, since Hadoop reuses the value object
                    Info tmp = new Info();
                    tmp.setEmpno(info.getEmpno());
                    tmp.setEname(info.getEname());
                    tmp.setDeptno(info.getDeptno());
                    infos.add(tmp);
                } else if (info.getFlag() == 2) { // dept side
                    dname = info.getDname();
                }
            }
            for (Info bean : infos) {
                bean.setDname(dname);
                context.write(bean, NullWritable.get());
            }
        }
    }
}
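Both drivers call FileUtils.deleteOutput from com.ccj.pxj.phone.utils, a small helper the original post does not show. A minimal sketch of what it presumably does (remove an existing output directory so the job can be re-run):

package com.ccj.pxj.phone.utils;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FileUtils {
    /** Recursively delete the output path if it exists, avoiding FileAlreadyExistsException. */
    public static void deleteOutput(Configuration configuration, String output) throws Exception {
        FileSystem fs = FileSystem.get(configuration);
        Path path = new Path(output);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
    }
}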
Data:
dept:
10  ACCOUNTING  NEW YORK
20  RESEARCH    DALLAS
30  SALES   CHICAGO
40  OPERATIONS  BOSTON
emp:
7369    SMITH   CLERK   7902    1980-12-17  800.00      20
7499    ALLEN   SALESMAN    7698    1981-2-20   1600.00 300.00  30
7521    WARD    SALESMAN    7698    1981-2-22   1250.00 500.00  30
7566    JONES   MANAGER 7839    1981-4-2    2975.00     20
7654    MARTIN  SALESMAN    7698    1981-9-28   1250.00 1400.00 30
7698    BLAKE   MANAGER 7839    1981-5-1    2850.00     30
7782    CLARK   MANAGER 7839    1981-6-9    2450.00     10
7788    SCOTT   ANALYST 7566    1987-4-19   3000.00     20
7839    KING    PRESIDENT       1981-11-17  5000.00     10
7844    TURNER  SALESMAN    7698    1981-9-8    1500.00 0.00    30
7876    ADAMS   CLERK   7788    1987-5-23   1100.00     20
7900    JAMES   CLERK   7698    1981-12-3   950.00      30
7902    FORD    ANALYST 7566    1981-12-3   3000.00     20
7934    MILLER  CLERK   7782    1982-1-23   1300.00     10
8888    HIVE    PROGRAM 7839    1988-1-23   10300.00
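Note that the last emp row (8888 HIVE ...) has no deptno; it is the dirty record that the splits.length == 8 guard in the mappers filters out.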

Map join implementation:

(The Info bean is identical to the one in the reduce join example above, so it is not repeated here.)
package com.ccj.pxj.join;
import com.ccj.pxj.phone.utils.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** Map-side join driver: the small table (dept) is distributed to every mapper via the cache. */
public class MapJoinDriver {

    public static void main(String[] args) throws Exception {
        String input = "data/join/emp.txt";
        String output = "out";
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        FileUtils.deleteOutput(configuration, output);
        job.setJarByClass(MapJoinDriver.class);
        job.setMapperClass(MyMapper.class);
        job.setNumReduceTasks(0); // map-only job: no reduce needed
        job.addCacheFile(new URI("data/join/dept.txt")); // ship the small table to every mapper
        job.setOutputKeyClass(Info.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }

    /** Loads dept into memory in setup(), then probes it for every emp record in map(). */
    public static class MyMapper extends Mapper<LongWritable, Text, Info, NullWritable> {
        Map<String, String> cache = new HashMap<>();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            String path = context.getCacheFiles()[0].getPath().toString();
            BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
            String line;
            while (StringUtils.isNotEmpty(line = reader.readLine())) {
                String[] splits = line.split("\t");
                cache.put(splits[0], splits[1]); // deptno -> dname
            }
            IOUtils.closeStream(reader);
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] splits = value.toString().split("\t");
            if (splits.length == 8) { // skip dirty rows that are missing fields
                int empno = Integer.parseInt(splits[0].trim());
                String ename = splits[1];
                int deptno = Integer.parseInt(splits[7].trim());
                Info info = new Info(empno, ename, deptno, cache.get(deptno + ""));
                // Map-only job: write the joined record directly, matching the
                // Info/NullWritable output classes declared in the driver.
                context.write(info, NullWritable.get());
            }
        }
    }
}

Data generation script

package com.ccj.wfy.makedata;
import java.io.*;
import java.util.Random;
public class Mock {
    public static void main(String[] args) throws Exception {
        BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(new File("data/wc.txt"))));
        String[] a = {"pxj", "wfy", "ccj", "zmj", "xwc", "lzh", "zcl", "wlp", "wxc", "pk", "jpeson"};
        Random random = new Random();
        // Write 100 lines, each with three comma-separated random names.
        for (int i = 0; i < 100; i++) {
            writer.write(a[random.nextInt(a.length)]);
            writer.write(",");
            writer.write(a[random.nextInt(a.length)]);
            writer.write(",");
            writer.write(a[random.nextInt(a.length)]);
            writer.newLine();
        }
        writer.flush();
        writer.close();
    }
}

3. Execution plan

Create the tables:

CREATE TABLE `dept`(`deptno` int, `dname` string, `loc` string)
row format delimited
fields terminated by '\t';

CREATE TABLE `emp`(`empno` int, `ename` string, `job` string, `mgr` int, `hiredate` string, `sal` double, `comm` double, `deptno` int)
row format delimited
fields terminated by '\t';

hive (default)> load data local inpath '/home/pxj/datas/dept.txt' into table dept;
hive (default)> load data local inpath '/home/pxj/datas/emp.txt' into table emp;

First, disable automatic map-join conversion (property hive.auto.convert.join):
0: jdbc:hive2://pxj:10000> set hive.auto.convert.join=false
. . . . . . . . . . . . .> ;
0: jdbc:hive2://pxj:10000> explain select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;
INFO  : Compiling command(queryId=pxj_20200119010707_26c4bbd5-479b-49f1-8968-b08a1abc5056): explain select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:Explain, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=pxj_20200119010707_26c4bbd5-479b-49f1-8968-b08a1abc5056); Time taken: 0.243 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=pxj_20200119010707_26c4bbd5-479b-49f1-8968-b08a1abc5056): explain select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno
INFO  : Starting task [Stage-3:EXPLAIN] in serial mode
INFO  : Completed executing command(queryId=pxj_20200119010707_26c4bbd5-479b-49f1-8968-b08a1abc5056); Time taken: 0.042 seconds
INFO  : OK
+----------------------------------------------------+--+
|                      Explain                       |
+----------------------------------------------------+--+
| STAGE DEPENDENCIES:                                |
|   Stage-1 is a root stage                          |
|   Stage-0 depends on stages: Stage-1               |
|                                                    |
| STAGE PLANS:                                       |
|   Stage: Stage-1                                   |
|     Map Reduce                                     |
|       Map Operator Tree:                           |
|           TableScan                                |
|             alias: e                               |
|             Statistics: Num rows: 6 Data size: 700 Basic stats: COMPLETE Column stats: NONE |
|             Filter Operator                        |
|               predicate: deptno is not null (type: boolean) |
|               Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE |
|               Reduce Output Operator               |
|                 key expressions: deptno (type: int) |
|                 sort order: +                      |
|                 Map-reduce partition columns: deptno (type: int) |
|                 Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE |
|                 value expressions: empno (type: int), ename (type: string) |
|           TableScan                                |
|             alias: d                               |
|             Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE |
|             Filter Operator                        |
|               predicate: deptno is not null (type: boolean) |
|               Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE |
|               Reduce Output Operator               |
|                 key expressions: deptno (type: int) |
|                 sort order: +                      |
|                 Map-reduce partition columns: deptno (type: int) |
|                 Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE |
|                 value expressions: dname (type: string) |
|       Reduce Operator Tree:                        |
|         Join Operator                              |
|           condition map:                           |
|                Inner Join 0 to 1                   |
|           keys:                                    |
|             0 deptno (type: int)                   |
|             1 deptno (type: int)                   |
|           outputColumnNames: _col0, _col1, _col7, _col12 |
|           Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE |
|           Select Operator                          |
|             expressions: _col0 (type: int), _col1 (type: string), _col7 (type: int), _col12 (type: string) |
|             outputColumnNames: _col0, _col1, _col2, _col3 |
|             Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE |
|             File Output Operator                   |
|               compressed: false                    |
|               Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE |
|               table:                               |
|                   input format: org.apache.hadoop.mapred.TextInputFormat |
|                   output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
|                   serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|                                                    |
|   Stage: Stage-0                                   |
|     Fetch Operator                                 |
|       limit: -1                                    |
|       Processor Tree:                              |
|         ListSink                                   |
|                                                    |
+----------------------------------------------------+--+
59 rows selected (0.379 seconds)
Notes:

Execution plan: explain <sql>
Can one SQL statement complete in a single pass? Not necessarily:
a single Hive SQL statement may be split into multiple MR jobs.
A plan is made up of the ABSTRACT SYNTAX TREE (AST), STAGE DEPENDENCIES, and STAGE PLANS.
Running this query reports: number of mappers: 2; number of reducers: 1.

Data skew happens in the Shuffle.

Map join execution plan

0: jdbc:hive2://pxj:10000> set hive.auto.convert.join=true;
No rows affected (0.004 seconds)
0: jdbc:hive2://pxj:10000> explain select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno;
INFO  : Compiling command(queryId=pxj_20200119011010_f0ca7412-1b4e-41bf-9c1c-34c6d801acff): explain select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno
INFO  : Semantic Analysis Completed
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:Explain, type:string, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=pxj_20200119011010_f0ca7412-1b4e-41bf-9c1c-34c6d801acff); Time taken: 0.197 seconds
INFO  : Concurrency mode is disabled, not creating a lock manager
INFO  : Executing command(queryId=pxj_20200119011010_f0ca7412-1b4e-41bf-9c1c-34c6d801acff): explain select e.empno,e.ename,e.deptno,d.dname from emp e join dept d on e.deptno=d.deptno
INFO  : Starting task [Stage-5:EXPLAIN] in serial mode
INFO  : Completed executing command(queryId=pxj_20200119011010_f0ca7412-1b4e-41bf-9c1c-34c6d801acff); Time taken: 0.015 seconds
INFO  : OK
+----------------------------------------------------+--+
|                      Explain                       |
+----------------------------------------------------+--+
| STAGE DEPENDENCIES:                                |
|   Stage-4 is a root stage                          |
|   Stage-3 depends on stages: Stage-4               |
|   Stage-0 depends on stages: Stage-3               |
|                                                    |
| STAGE PLANS:                                       |
|   Stage: Stage-4                                   |
|     Map Reduce Local Work                          |
|       Alias -> Map Local Tables:                   |
|         d                                          |
|           Fetch Operator                           |
|             limit: -1                              |
|       Alias -> Map Local Operator Tree:            |
|         d                                          |
|           TableScan                                |
|             alias: d                               |
|             Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE |
|             Filter Operator                        |
|               predicate: deptno is not null (type: boolean) |
|               Statistics: Num rows: 1 Data size: 79 Basic stats: COMPLETE Column stats: NONE |
|               HashTable Sink Operator              |
|                 keys:                              |
|                   0 deptno (type: int)             |
|                   1 deptno (type: int)             |
|                                                    |
|   Stage: Stage-3                                   |
|     Map Reduce                                     |
|       Map Operator Tree:                           |
|           TableScan                                |
|             alias: e                               |
|             Statistics: Num rows: 6 Data size: 700 Basic stats: COMPLETE Column stats: NONE |
|             Filter Operator                        |
|               predicate: deptno is not null (type: boolean) |
|               Statistics: Num rows: 3 Data size: 350 Basic stats: COMPLETE Column stats: NONE |
|               Map Join Operator                    |
|                 condition map:                     |
|                      Inner Join 0 to 1             |
|                 keys:                              |
|                   0 deptno (type: int)             |
|                   1 deptno (type: int)             |
|                 outputColumnNames: _col0, _col1, _col7, _col12 |
|                 Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE |
|                 Select Operator                    |
|                   expressions: _col0 (type: int), _col1 (type: string), _col7 (type: int), _col12 (type: string) |
|                   outputColumnNames: _col0, _col1, _col2, _col3 |
|                   Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE |
|                   File Output Operator             |
|                     compressed: false              |
|                     Statistics: Num rows: 3 Data size: 385 Basic stats: COMPLETE Column stats: NONE |
|                     table:                         |
|                         input format: org.apache.hadoop.mapred.TextInputFormat |
|                         output format: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat |
|                         serde: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe |
|       Local Work:                                  |
|         Map Reduce Local Work                      |
|                                                    |
|   Stage: Stage-0                                   |
|     Fetch Operator                                 |
|       limit: -1                                    |
|       Processor Tree:                              |
|         ListSink                                   |
|                                                    |
+----------------------------------------------------+--+
62 rows selected (0.303 seconds)
Notes:

Map Join
The join operation is completed on the map side.
Is there a shuffle? Is there a reduce phase? No: neither, as the plan above shows.
Idea: load the small table into memory; for each record read from the big table, probe the in-memory data. A match means the record joins; a miss means the record is dropped (for an inner join).
Limitation: it cannot handle a large table on the in-memory side, so it only suits joining a big table with a small table.

MR is not well suited to scenarios with many chained rounds of work, since each round is a separate job with disk I/O in between.

DAG engines: RDD (Spark), Tez
Hive execution engines: MR, Tez, Spark

Analyzing data skew using WordCount as the example:
all records with the same key are sent to the same reducer ==> skew.
The fix for skew: scatter ("salt") the keys first, then aggregate (a code sketch follows this example):
pxj_8 100
pxj_1 200
pxj_2 300
pxj_4 200
==> Data that previously all landed on one reduce task has been scattered.
With the hot key's 90M records split 100 ways, the per-task pressure is far lower.

    pxj 800

Chaining two MR jobs:
First MR: append a random suffix to each key to scatter it
pxj_8 100
pxj_1 200
pxj_2 300
pxj_4 200
Second MR: strip the random suffix added by the first MR
pxj 100
pxj 200
pxj 300
pxj 200
The reducer of the second MR then does the final aggregation:
pxj 800
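A minimal two-job salting sketch for the chain described above, working on the comma-separated lines produced by the Mock script. The class names and the salt factor are illustrative, not from the original: job one salts each word with a random suffix and pre-aggregates; job two strips the suffix and produces the final counts.

import java.io.IOException;
import java.util.Random;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

public class SaltedWordCount {

    static final int SALTS = 100; // illustrative: spread each hot key over 100 buckets

    /** Job 1 mapper: emit "word_<random>" so one hot word spreads over many reducers. */
    public static class SaltMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        private final Random random = new Random();
        private static final IntWritable ONE = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            for (String word : value.toString().split(",")) {
                context.write(new Text(word + "_" + random.nextInt(SALTS)), ONE);
            }
        }
    }

    /** Job 1 reducer (also usable as a combiner): partial sums per salted key. */
    public static class SumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable v : values) sum += v.get();
            context.write(key, new IntWritable(sum));
        }
    }

    /** Job 2 mapper: read "word_<n>\t<count>" from job 1, strip the salt, re-emit. */
    public static class UnsaltMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String[] parts = value.toString().split("\t");
            String word = parts[0].substring(0, parts[0].lastIndexOf('_'));
            context.write(new Text(word), new IntWritable(Integer.parseInt(parts[1])));
        }
    }

    // Job 2 reuses SumReducer to produce the final per-word totals.
}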

The barrel principle: how much water a barrel holds is decided by its shortest stave.
MR: with 100 reduce tasks, the job's total runtime is measured from when the last task finishes;
a single slow task drags down the performance of the entire MR job.

Hadoop is not afraid of large data volumes; it is afraid of data skew.

This is the problem of data hotness / data hotspots.
Author: pxj (潘陈)
