Impala 源码分析-FE
Impala 源代码目录结构
SQL 解析
Impala 的 SQL 解析与执行计划生成由 impala-frontend(Java)完成。客户端通过 Beeswax 接口(impalad 默认监听 21000 端口)调用 BeeswaxService.query() 提交请求,impalad 端的处理入口是
void ImpalaServer::query(QueryHandle& query_handle, const Query& query) 这个函数(ImpalaServer.h)。
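为了直观起见,先给出一个通过 Thrift 直接调用 BeeswaxService.query() 提交查询的 Java 客户端极简示意。它依赖 beeswax.thrift 生成的 Java 类,这里假设生成类位于 com.cloudera.beeswax.api 包下(实际包名以 thrift 文件中的 namespace 为准,传输层配置也可能因部署而异),仅用于说明请求是如何到达 impalad 的 21000 端口、进而落到 ImpalaServer::query() 的,并非 Impala 自带代码:

import com.cloudera.beeswax.api.BeeswaxService;  // 假设的生成包名,以实际 namespace 为准
import com.cloudera.beeswax.api.Query;
import com.cloudera.beeswax.api.QueryHandle;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;

public class BeeswaxQuerySketch {
  public static void main(String[] args) throws Exception {
    // 连接 impalad 的 Beeswax 端口(默认 21000)
    TTransport transport = new TSocket("impalad-host", 21000);
    transport.open();
    BeeswaxService.Client client =
        new BeeswaxService.Client(new TBinaryProtocol(transport));

    // Query 的字段与正文中的 query.query / query.hadoop_user 对应
    Query q = new Query();
    q.query = "select count(*) from t";
    q.hadoop_user = "impala";

    QueryHandle handle = client.query(q);  // 服务端的处理入口即 ImpalaServer::query()
    System.out.println("query handle: " + handle);
    transport.close();
  }
}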
void ImpalaServer::query(QueryHandle& query_handle, const Query& query) {
  VLOG_QUERY << "query(): query=" << query.query;
  ScopedSessionState session_handle(this);
  shared_ptr<SessionState> session;
  RAISE_IF_ERROR(  // 为当前连接返回唯一标识,标记会话为使用中并保存
      session_handle.WithSession(ThriftServer::GetThreadConnectionId(), &session),
      SQLSTATE_GENERAL_ERROR);
  TQueryCtx query_ctx;
  // 将 Query 转化为 TQueryCtx
  // raise general error for request conversion error;
  RAISE_IF_ERROR(QueryToTQueryContext(query, &query_ctx), SQLSTATE_GENERAL_ERROR);

  // raise Syntax error or access violation; it's likely to be syntax/analysis error
  // TODO: that may not be true; fix this
  shared_ptr<QueryExecState> exec_state;
  // 开始异步执行查询,内部调用 ImpalaServer::Execute() 函数
  // 将 TQueryCtx 转换为 QueryExecState,注册并调用 Coordinator::Execute()
  RAISE_IF_ERROR(Execute(&query_ctx, session, &exec_state),
      SQLSTATE_SYNTAX_ERROR_OR_ACCESS_VIOLATION);

  exec_state->UpdateQueryState(QueryState::RUNNING);
  // start thread to wait for results to become available, which will allow
  // us to advance query state to FINISHED or EXCEPTION
  exec_state->WaitAsync();
  // Once the query is running do a final check for session closure and add it to the
  // set of in-flight queries.
  Status status = SetQueryInflight(session, exec_state);
  if (!status.ok()) {
    UnregisterQuery(exec_state->query_id(), false, &status);
    RaiseBeeswaxException(status.GetDetail(), SQLSTATE_GENERAL_ERROR);
  }
  TUniqueIdToQueryHandle(exec_state->query_id(), &query_handle);
}
其中 QueryToTQueryContext(query, &query_ctx) 将 Query 转换为 TQueryCtx。具体代码实现如下:
(ImpalaServer.h)
Status ImpalaServer::QueryToTQueryContext(const Query& query,
    TQueryCtx* query_ctx) {
  query_ctx->request.stmt = query.query;
  VLOG_QUERY << "query: " << ThriftDebugString(query);
  {
    shared_ptr<SessionState> session;
    const TUniqueId& session_id = ThriftServer::GetThreadConnectionId();
    RETURN_IF_ERROR(GetSessionState(session_id, &session));
    DCHECK(session != NULL);
    {
      // The session is created when the client connects. Depending on the underlying
      // transport, the username may be known at that time. If the username hasn't been
      // set yet, set it now.
      lock_guard<mutex> l(session->lock);
      if (session->connected_user.empty()) session->connected_user = query.hadoop_user;
      query_ctx->request.query_options = session->default_query_options;
    }
    // 构建该 SessionState 的 Thrift 表示用于序列化到 frontend
    session->ToThrift(session_id, &query_ctx->session);
  }

  // Override default query options with Query.Configuration
  if (query.__isset.configuration) {
    BOOST_FOREACH(const string& option, query.configuration) {
      RETURN_IF_ERROR(ParseQueryOptions(option, &query_ctx->request.query_options));
    }
    VLOG_QUERY << "TClientRequest.queryOptions: "
               << ThriftDebugString(query_ctx->request.query_options);
  }

  return Status::OK();
}
内部调用 ImpalaServer::Execute()
(ImpalaServer.h)
函数将 TQueryCtx 转换为 TExecRequest,具体逻辑通过调用 ImpalaServer::ExecuteInternal() 实现。代码如下:
Status ImpalaServer::Execute(TQueryCtx* query_ctx,
    shared_ptr<SessionState> session_state,
    shared_ptr<QueryExecState>* exec_state) {
  PrepareQueryContext(query_ctx);
  bool registered_exec_state;
  ImpaladMetrics::IMPALA_SERVER_NUM_QUERIES->Increment(1L);

  // Redact the SQL stmt and update the query context
  string stmt = replace_all_copy(query_ctx->request.stmt, "\n", " ");
  Redact(&stmt);
  query_ctx->request.__set_redacted_stmt((const string) stmt);
  // ExecuteInternal() 实现具体逻辑;若出错且查询已注册,由这里负责取消注册
  Status status = ExecuteInternal(*query_ctx, session_state, &registered_exec_state,
      exec_state);
  if (!status.ok() && registered_exec_state) {
    UnregisterQuery((*exec_state)->query_id(), false, &status);
  }
  return status;
}
上面的函数调用 ImpalaServer::ExecuteInternal()
(ImpalaServer.h)
在这个函数里通过 JNI 接口调用 frontend.createExecRequest() 生成 TExecRequest,具体代码如下:
Status ImpalaServer::ExecuteInternal(
    const TQueryCtx& query_ctx,
    shared_ptr<SessionState> session_state,
    bool* registered_exec_state,
    shared_ptr<QueryExecState>* exec_state) {
  DCHECK(session_state != NULL);
  *registered_exec_state = false;
  if (IsOffline()) {
    return Status("This Impala server is offline. Please retry your query later.");
  }
  exec_state->reset(new QueryExecState(query_ctx, exec_env_, exec_env_->frontend(),
      this, session_state));

  (*exec_state)->query_events()->MarkEvent("Start execution");

  TExecRequest result;
  {
    // Keep a lock on exec_state so that registration and setting
    // result_metadata are atomic.
    //
    // Note: this acquires the exec_state lock *before* the
    // query_exec_state_map_ lock. This is the opposite of
    // GetQueryExecState(..., true), and therefore looks like a
    // candidate for deadlock. The reason this works here is that
    // GetQueryExecState cannot find exec_state (under the exec state
    // map lock) and take it's lock until RegisterQuery has
    // finished. By that point, the exec state map lock will have been
    // given up, so the classic deadlock interleaving is not possible.
    lock_guard<mutex> l(*(*exec_state)->lock());

    // register exec state as early as possible so that queries that
    // take a long time to plan show up, and to handle incoming status
    // reports before execution starts.
    RETURN_IF_ERROR(RegisterQuery(session_state, *exec_state));
    *registered_exec_state = true;

    RETURN_IF_ERROR((*exec_state)->UpdateQueryStatus(
        // 通过 JNI 接口调用 frontend.createExecRequest() 生成 TExecRequest
        exec_env_->frontend()->GetExecRequest(query_ctx, &result)));
    (*exec_state)->query_events()->MarkEvent("Planning finished");
    (*exec_state)->summary_profile()->AddEventSequence(
        result.timeline.name, result.timeline);
    if (result.__isset.result_set_metadata) {
      (*exec_state)->set_result_metadata(result.result_set_metadata);
    }
  }
  VLOG(2) << "Execution request: " << ThriftDebugString(result);

  // start execution of query; also starts fragment status reports
  RETURN_IF_ERROR((*exec_state)->Exec(&result));
  if (result.stmt_type == TStmtType::DDL) {
    Status status = UpdateCatalogMetrics();
    if (!status.ok()) {
      VLOG_QUERY << "Couldn't update catalog metrics: " << status.GetDetail();
    }
  }

  if ((*exec_state)->coord() != NULL) {
    const unordered_set<TNetworkAddress>& unique_hosts =
        (*exec_state)->schedule()->unique_hosts();
    if (!unique_hosts.empty()) {
      lock_guard<mutex> l(query_locations_lock_);
      BOOST_FOREACH(const TNetworkAddress& port, unique_hosts) {
        query_locations_[port].insert((*exec_state)->query_id());
      }
    }
  }
  return Status::OK();
}
Frontend::GetExecRequest()
(Frontend.h)
通过 JNI 接口调用 frontend.createExecRequest() 生成 TExecRequest。具体实现代码如下:
Status Frontend::GetExecRequest(
    const TQueryCtx& query_ctx, TExecRequest* result) {
  return JniUtil::CallJniMethod(fe_, create_exec_request_id_, query_ctx, result);
}
JniUtil::CallJniMethod()
(jni-util.h)
的具体实现代码如下:
/// Utility methods to avoid repeating lots of the JNI call boilerplate. It seems these
/// must be defined in the header to compile properly.
template <typename T>
static Status CallJniMethod(const jobject& obj, const jmethodID& method, const T& arg) {
  JNIEnv* jni_env = getJNIEnv();
  jbyteArray request_bytes;
  JniLocalFrame jni_frame;
  RETURN_IF_ERROR(jni_frame.push(jni_env));
  RETURN_IF_ERROR(SerializeThriftMsg(jni_env, &arg, &request_bytes));
  jni_env->CallObjectMethod(obj, method, request_bytes);
  RETURN_ERROR_IF_EXC(jni_env);
  return Status::OK();
}
至此,将通过 Thrift 转到 Java Frontend 生成执行计划树。
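CallJniMethod() 做的事情本质上是:把 Thrift 结构序列化成字节数组,经 JNI 传给 Java 端,Java 端再反序列化;带返回值的重载(如 GetExecRequest 传入 &result 的那个)还会把 Java 返回的字节数组反序列化回 Thrift 结构(此处是根据调用方式做的推断)。下面用 libthrift 的 TSerializer/TDeserializer 给出 Java 侧这一"序列化/反序列化"模式的极简示意。ThriftJniBridgeSketch 是本文虚构的工具类,只为说明模式,并非 Impala 源码;真实代码中具体的 Thrift 类型(TQueryCtx、TExecRequest 等)由 thrift 文件生成:

import org.apache.thrift.TBase;
import org.apache.thrift.TDeserializer;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import org.apache.thrift.protocol.TBinaryProtocol;

// 示意:C++ 端 SerializeThriftMsg() 与 Java 端反序列化的对应写法(本文虚构的辅助类)
public final class ThriftJniBridgeSketch {
  private ThriftJniBridgeSketch() {}

  // C++ 侧把 Thrift 结构序列化成字节数组,再经 JNI 作为 byte[] 传入 Java
  public static <T extends TBase<?, ?>> byte[] toBytes(T msg) throws TException {
    return new TSerializer(new TBinaryProtocol.Factory()).serialize(msg);
  }

  // Java 侧(frontend)把字节数组还原为 Thrift 对象;处理完后再按同样方式序列化返回
  public static <T extends TBase<?, ?>> void fromBytes(T target, byte[] bytes)
      throws TException {
    new TDeserializer(new TBinaryProtocol.Factory()).deserialize(target, bytes);
  }
}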
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
(Frontend.java)
是最重要的方法,它根据提供的 TQueryCtx 创建 TExecRequest。具体代码(分析部分)如下:
/**
 * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
 */
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
    throws ImpalaException {
  // Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
  AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
  EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
  timeline.markEvent("Analysis finished");
  ...
}
首先通过调用 analyzeStmt()
(Frontend.java)
方法分析提交的 SQL 语句。analyzeStmt() 的具体实现代码如下:
/**
 * Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
 */
private AnalysisContext.AnalysisResult analyzeStmt(TQueryCtx queryCtx)
    throws AnalysisException, InternalException, AuthorizationException {
  AnalysisContext analysisCtx = new AnalysisContext(dsqldCatalog_, queryCtx,
      authzConfig_);
  LOG.debug("analyze query " + queryCtx.request.stmt);

  // 循环分析直到出现以下某种情形:
  // 1) 分析成功完成
  // 2) 由于缺失表分析失败并抛出 AnalysisException 异常
  // 3) 分析失败并抛出 AuthorizationException 异常
  try {
    while (true) {
      try {
        // 通过调用 AnalysisContext.analyze() 实现具体的分析逻辑
        analysisCtx.analyze(queryCtx.request.stmt);
        Preconditions.checkState(analysisCtx.getAnalyzer().getMissingTbls().isEmpty());
        return analysisCtx.getAnalysisResult();
      } catch (AnalysisException e) {
        Set<TableName> missingTbls = analysisCtx.getAnalyzer().getMissingTbls();
        // Only re-throw the AnalysisException if there were no missing tables.
        if (missingTbls.isEmpty()) throw e;

        // Some tables/views were missing, request and wait for them to load.
        if (!requestTblLoadAndWait(missingTbls, MISSING_TBL_LOAD_WAIT_TIMEOUT_MS)) {
          LOG.info(String.format("Missing tables were not received in %dms. Load " +
              "request will be retried.", MISSING_TBL_LOAD_WAIT_TIMEOUT_MS));
        }
      }
    }
  } finally {
    // Authorize all accesses.
    // AuthorizationExceptions must take precedence over any AnalysisException
    // that has been thrown, so perform the authorization first.
    analysisCtx.getAnalyzer().authorize(getAuthzChecker());
  }
}
AnalysisContext.AnalysisResult 中的 Analyzer 对象是一个存放这条 SQL 所涉及的所有信息
(包含 Table、conjunct、slot、slotRefMap、eqJoinConjuncts 等)的知识库,所有跟这条
SQL 有关的东西都会存到 Analyzer 对象里面。该类的定义可以查看
Analyzer.java
AnalysisContext.analyze()
(AnalysisContext.java)
的具体实现代码如下:
/**
 * Parse and analyze 'stmt'. If 'stmt' is a nested query (i.e. query that
 * contains subqueries), it is also rewritten by performing subquery unnesting.
 * The transformed stmt is then re-analyzed in a new analysis context.
 */
public void analyze(String stmt) throws AnalysisException {
  Analyzer analyzer = new Analyzer(catalog_, queryCtx_, authzConfig_);
  analyze(stmt, analyzer);
}
上面的 analyze() 函数通过调用同名的重载函数 analyze(String stmt, Analyzer analyzer)
(AnalysisContext.java)
实现具体的分析,代码如下:
/**
 * Parse and analyze 'stmt' using a specified Analyzer.
 */
public void analyze(String stmt, Analyzer analyzer) throws AnalysisException {
  SqlScanner input = new SqlScanner(new StringReader(stmt));
  SqlParser parser = new SqlParser(input);
  try {
    analysisResult_ = new AnalysisResult();
    analysisResult_.analyzer_ = analyzer;
    if (analysisResult_.analyzer_ == null) {
      analysisResult_.analyzer_ = new Analyzer(catalog_, queryCtx_, authzConfig_);
    }
    analysisResult_.stmt_ = (StatementBase) parser.parse().value;
    if (analysisResult_.stmt_ == null) return;

    // For CTAS (Create Table As Select), we copy the create statement
    // in case we have to create a new CTAS statement after a query rewrite.
    if (analysisResult_.stmt_ instanceof CreateTableAsSelectStmt) {
      analysisResult_.tmpCreateTableStmt_ =
          ((CreateTableAsSelectStmt) analysisResult_.stmt_).getCreateStmt().clone();
    }

    analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
    boolean isExplain = analysisResult_.isExplainStmt();

    // Check if we need to rewrite the statement.
    if (analysisResult_.requiresRewrite()) {
      StatementBase rewrittenStmt = StmtRewriter.rewrite(analysisResult_);
      // Re-analyze the rewritten statement.
      Preconditions.checkNotNull(rewrittenStmt);
      analysisResult_ = new AnalysisResult();
      analysisResult_.analyzer_ = new Analyzer(catalog_, queryCtx_, authzConfig_);
      analysisResult_.stmt_ = rewrittenStmt;
      analysisResult_.stmt_.analyze(analysisResult_.analyzer_);
      LOG.trace("rewrittenStmt: " + rewrittenStmt.toSql());
      if (isExplain) analysisResult_.stmt_.setIsExplain();
    }
  } catch (AnalysisException e) {
    // Don't wrap AnalysisExceptions in another AnalysisException
    throw e;
  } catch (Exception e) {
    throw new AnalysisException(parser.getErrorMsg(stmt), e);
  }
}
上面的函数通过调用 SqlScanner 和 SqlParser 类实现具体的分析。可以查看
sql-scanner.flex
和
sql-parser.y
分析 SQL 语句的大概流程如下:
- 处理这个 SQL 所涉及到的 Table(即 TableRefs),这些 Table 是在 from 从句中提取出来的(包含关键字 from、join、on/using)。注意 JOIN 操作以及 on/using 条件是存储在参与 JOIN 操作的右边那张表的 TableRef 中并在那里分析的。依次 analyze() 每个 TableRef,向 Analyzer 注册 registerBaseTableRef(填充 TupleDescriptor)。如果对应的 TableRef 涉及到 JOIN 操作,还要 analyzeJoin();analyzeJoin() 时会向 Analyzer registerConjunct(),填充 Analyzer 的一些成员变量:conjuncts、tuplePredicates(TupleId 与 conjunct 的映射)、slotPredicates(SlotId 与 conjunct 的映射)、eqJoinConjuncts。
- 处理 select 从句(包含关键字 select 以及 MAX()、AVG() 等聚集函数):分析这个 SQL 都 select 了哪几项,每一项都是个 Expr 类型的子类对象,把这几项填入 resultExprs 数组和 colLabels。然后把 resultExprs 里面的 Expr 都递归 analyze 一下,一直分析到树的最底层,向 Analyzer 注册 SlotRef 等(本列表之后给出一个递归 analyze 表达式树的极简示意)。
- 分析 where 从句(关键字 where):首先递归 analyze 从句中 Expr 组成的树,然后向 Analyzer registerConjunct(),填充 Analyzer 的一些成员变量(同第 1 步,此外还要填充 whereClauseConjuncts)。
- 处理 sort 相关信息(关键字 order by):先解析 aliases 和 ordinals,然后从 order by 后面的从句中提取 Expr 填入 orderingExprs,接着递归 analyze 从句中 Expr 组成的树,最后创建 SortInfo 对象。
- 处理 aggregation 相关信息(关键字 group by、having、avg、max 等):首先递归分析 group by 从句里的 Expr;如果有 having 从句,就像 where 从句一样,先 analyze having 从句中 Expr 组成的树,再向 Analyzer registerConjunct() 等。
- 处理 InlineView。
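下面用一段极简的示意代码说明上文"递归 analyze 表达式树、把叶子上的列引用(SlotRef)和条件(conjunct)注册进 Analyzer"这一模式。其中 Expr、SlotRef、EqPredicate、Analyzer、AnalyzeDemo 都是为说明而虚构的最小类,远比 Impala 中的同名类简单,仅示意递归注册的过程:

import java.util.ArrayList;
import java.util.List;

// 极简示意(非 Impala 真实类):递归 analyze 表达式树,叶子 SlotRef 向 Analyzer 注册自己
abstract class Expr {
  final List<Expr> children = new ArrayList<>();

  // 自顶向下递归:先分析自身,再分析孩子,直到叶子节点
  final void analyze(Analyzer analyzer) {
    analyzeSelf(analyzer);
    for (Expr child : children) child.analyze(analyzer);
  }

  abstract void analyzeSelf(Analyzer analyzer);
}

class SlotRef extends Expr {
  final String column;
  SlotRef(String column) { this.column = column; }

  @Override
  void analyzeSelf(Analyzer analyzer) {
    analyzer.registerSlotRef(this);  // 叶子:把列引用登记进 Analyzer 这个"知识库"
  }
}

class EqPredicate extends Expr {
  EqPredicate(Expr lhs, Expr rhs) { children.add(lhs); children.add(rhs); }

  @Override
  void analyzeSelf(Analyzer analyzer) {
    analyzer.registerConjunct(this);  // 类比 registerConjunct():登记一个等值条件
  }
}

class Analyzer {
  final List<SlotRef> slotRefs = new ArrayList<>();
  final List<Expr> conjuncts = new ArrayList<>();
  void registerSlotRef(SlotRef ref) { slotRefs.add(ref); }
  void registerConjunct(Expr conjunct) { conjuncts.add(conjunct); }
}

class AnalyzeDemo {
  public static void main(String[] args) {
    // 类比 where t1.id = t2.id:递归分析后,Analyzer 中登记了 1 个 conjunct、2 个 SlotRef
    Analyzer analyzer = new Analyzer();
    new EqPredicate(new SlotRef("t1.id"), new SlotRef("t2.id")).analyze(analyzer);
    System.out.println(analyzer.conjuncts.size() + " conjunct, "
        + analyzer.slotRefs.size() + " slotRefs");
  }
}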
至此,词法分析和语法分析都完成了,回到 frontend.createExecRequest()
(Frontend.java)
函数,开始填充 TExecRequest 内的成员变量。代码如下(部分):
/**
 * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
 */
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
    throws ImpalaException {
  // Analyzes the SQL statement included in queryCtx and returns the AnalysisResult.
  AnalysisContext.AnalysisResult analysisResult = analyzeStmt(queryCtx);
  EventSequence timeline = analysisResult.getAnalyzer().getTimeline();
  timeline.markEvent("Analysis finished");

  // 开始填充 TExecRequest
  Preconditions.checkNotNull(analysisResult.getStmt());
  TExecRequest result = new TExecRequest();
  result.setQuery_options(queryCtx.request.getQuery_options());
  result.setAccess_events(analysisResult.getAccessEvents());
  result.analysis_warnings = analysisResult.getAnalyzer().getWarnings();

  if (analysisResult.isCatalogOp()) {
    result.stmt_type = TStmtType.DDL;
    createCatalogOpRequest(analysisResult, result);
    String jsonLineageGraph = analysisResult.getJsonLineageGraph();
    if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
      result.catalog_op_request.setLineage_graph(jsonLineageGraph);
    }
    // All DDL operations except for CTAS are done with analysis at this point.
    if (!analysisResult.isCreateTableAsSelectStmt()) return result;
  } else if (analysisResult.isLoadDataStmt()) {
    result.stmt_type = TStmtType.LOAD;
    result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
        new TColumn("summary", Type.STRING.toThrift()))));
    result.setLoad_data_request(analysisResult.getLoadDataStmt().toThrift());
    return result;
  } else if (analysisResult.isSetStmt()) {
    result.stmt_type = TStmtType.SET;
    result.setResult_set_metadata(new TResultSetMetadata(Arrays.asList(
        new TColumn("option", Type.STRING.toThrift()),
        new TColumn("value", Type.STRING.toThrift()))));
    result.setSet_query_option_request(analysisResult.getSetStmt().toThrift());
    return result;
  }
  ...
}
如果是 DDL 命令(use, show tables, show databases, describe),那么调用 createCatalogOpRequest()。
如果是 Load Data 或者 Set 语句,就设置相应的结果集 metadata,并把语句转换为 Thrift 请求后直接返回。
执行计划生成
另外一种情况就是 Query 或者 DML 命令,那么就得创建和填充 TQueryExecRequest 了。该部分代码如下:
/**
 * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
 */
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
    throws ImpalaException {
  ...
  // 如果是 Query、DML 或 CTAS 语句,创建 TQueryExecRequest
  Preconditions.checkState(analysisResult.isQueryStmt() || analysisResult.isDmlStmt()
      || analysisResult.isCreateTableAsSelectStmt());

  TQueryExecRequest queryExecRequest = new TQueryExecRequest();
  // create plan
  LOG.debug("create plan");
  Planner planner = new Planner(analysisResult, queryCtx);
  // 根据 SQL 语法树生成执行计划(PlanNode 和 PlanFragment):
  // 用 Planner 把 SQL 解析出的语法树转换成 Plan fragments,后者能在各个 backend 被执行。
  ArrayList<PlanFragment> fragments = planner.createPlan();

  List<ScanNode> scanNodes = Lists.newArrayList();
  // 建立 queryExecRequest.fragments 中 fragment 到它索引的映射;
  // queryExecRequest.dest_fragment_idx 需要这些映射
  Map<PlanFragment, Integer> fragmentIdx = Maps.newHashMap();

  for (int fragmentId = 0; fragmentId < fragments.size(); ++fragmentId) {
    PlanFragment fragment = fragments.get(fragmentId);
    Preconditions.checkNotNull(fragment.getPlanRoot());
    fragment.getPlanRoot().collect(Predicates.instanceOf(ScanNode.class), scanNodes);
    fragmentIdx.put(fragment, fragmentId);
  }
  ...
}
上面的 createPlan() 函数是 frontend 最重要的函数:根据 SQL 解析的结果和 client 传入的 query options,
生成执行计划。执行计划是用 PlanFragment 的数组表示的,最后会序列化到 TQueryExecRequest.fragments
然后传给 backend 的 coordinator 去调度执行。现在让我们来看看 createPlan()
(Planner.java)
的具体实现:
/**
 * Returns a list of plan fragments for executing an analyzed parse tree.
 * May return a single-node or distributed executable plan.
 */
public ArrayList<PlanFragment> createPlan() throws ImpalaException {
  SingleNodePlanner singleNodePlanner = new SingleNodePlanner(ctx_);
  DistributedPlanner distributedPlanner = new DistributedPlanner(ctx_);
  // 首先生成单节点执行计划树 SingleNodePlan
  PlanNode singleNodePlan = singleNodePlanner.createSingleNodePlan();
  ctx_.getRootAnalyzer().getTimeline().markEvent("Single node plan created");
  ArrayList<PlanFragment> fragments = null;

  // Determine the maximum number of rows processed by any node in the plan tree
  MaxRowsProcessedVisitor visitor = new MaxRowsProcessedVisitor();
  singleNodePlan.accept(visitor);
  long maxRowsProcessed = visitor.get() == -1 ? Long.MAX_VALUE : visitor.get();
  boolean isSmallQuery =
      maxRowsProcessed < ctx_.getQueryOptions().exec_single_node_rows_threshold;
  if (isSmallQuery) {
    // Execute on a single node and disable codegen for small results
    ctx_.getQueryOptions().setNum_nodes(1);
    ctx_.getQueryOptions().setDisable_codegen(true);
    if (maxRowsProcessed < ctx_.getQueryOptions().batch_size ||
        maxRowsProcessed < 1024 && ctx_.getQueryOptions().batch_size == 0) {
      // Only one scanner thread for small queries
      ctx_.getQueryOptions().setNum_scanner_threads(1);
    }
  }

  if (ctx_.isSingleNodeExec()) { // 如果是单节点执行
    // 创建包含整个单节点计划树的片段
    fragments = Lists.newArrayList(new PlanFragment(
        ctx_.getNextFragmentId(), singleNodePlan, DataPartition.UNPARTITIONED));
  } else { // 分布式执行计划树
    // create distributed plan
    fragments = distributedPlanner.createPlanFragments(singleNodePlan);
  }
  // 最后一个 Fragment 是根 fragment
  PlanFragment rootFragment = fragments.get(fragments.size() - 1);
  if (ctx_.isInsertOrCtas()) {
    InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
    if (!ctx_.isSingleNodeExec()) {
      // repartition on partition keys
      rootFragment = distributedPlanner.createInsertFragment(
          rootFragment, insertStmt, ctx_.getRootAnalyzer(), fragments);
    }
    // set up table sink for root fragment
    rootFragment.setSink(insertStmt.createDataSink());
  }

  ColumnLineageGraph graph = ctx_.getRootAnalyzer().getColumnLineageGraph();
  List<Expr> resultExprs = null;
  Table targetTable = null;
  if (ctx_.isInsertOrCtas()) {
    InsertStmt insertStmt = ctx_.getAnalysisResult().getInsertStmt();
    resultExprs = insertStmt.getResultExprs();
    targetTable = insertStmt.getTargetTable();
    graph.addTargetColumnLabels(targetTable);
  } else {
    resultExprs = ctx_.getQueryStmt().getResultExprs();
    graph.addTargetColumnLabels(ctx_.getQueryStmt().getColLabels());
  }
  resultExprs = Expr.substituteList(resultExprs,
      rootFragment.getPlanRoot().getOutputSmap(), ctx_.getRootAnalyzer(), true);
  rootFragment.setOutputExprs(resultExprs);
  LOG.debug("desctbl: " + ctx_.getRootAnalyzer().getDescTbl().debugString());
  LOG.debug("resultexprs: " + Expr.debugString(rootFragment.getOutputExprs()));
  LOG.debug("finalize plan fragments");
  for (PlanFragment fragment: fragments) {
    fragment.finalize(ctx_.getRootAnalyzer());
  }

  Collections.reverse(fragments);
  ctx_.getRootAnalyzer().getTimeline().markEvent("Distributed plan created");

  if (RuntimeEnv.INSTANCE.computeLineage() || RuntimeEnv.INSTANCE.isTestEnv()) {
    // Compute the column lineage graph
    if (ctx_.isInsertOrCtas()) {
      Preconditions.checkNotNull(targetTable);
      List<Expr> exprs = Lists.newArrayList();
      if (targetTable instanceof HBaseTable) {
        exprs.addAll(resultExprs);
      } else {
        exprs.addAll(ctx_.getAnalysisResult().getInsertStmt().getPartitionKeyExprs());
        exprs.addAll(resultExprs.subList(0,
            targetTable.getNonClusteringColumns().size()));
      }
      graph.computeLineageGraph(exprs, ctx_.getRootAnalyzer());
    } else {
      graph.computeLineageGraph(resultExprs, ctx_.getRootAnalyzer());
    }
    LOG.trace("lineage: " + graph.debugString());
    ctx_.getRootAnalyzer().getTimeline().markEvent("Lineage info computed");
  }

  return fragments;
}
createPlan 包括 createSingleNodePlan 和 createPlanFragments 两个主要部分:前者生成单节点计划树,所有片段只能在一个节点(coordinator)上执行;后者生成分布式执行计划树,片段可以分配到不同的节点上运行。我们先来看看 SingleNodePlanner.createSingleNodePlan()
(SingleNodePlanner.java)
该方法根据 Planner Context 中已分析的语法树创建单节点执行计划树并返回其根节点。计划过程递归处理语法树,分为两个阶段。自上而下(top-down)处理每个查询语句时:
- materialize the slots required for evaluating expressions of that statement
- migrate conjuncts from parent blocks into inline views and union operands

自下而上(bottom-up)为每个查询语句生成计划树时:
- perform join-order optimization when generating the plan of the FROM clause of a select statement; requires that all materialized slots are known for an accurate estimate of row sizes needed for cost-based join ordering
- assign conjuncts that can be evaluated at that node and compute the stats of that node (cardinality, etc.)
- apply combined expression substitution map of child plan nodes; if a plan node re-maps its input, set a substitution map to be applied by parents
具体代码如下:
/**
 * Generates and returns the root of the single-node plan for the analyzed parse tree
 * in the planner context.
 */
public PlanNode createSingleNodePlan() throws ImpalaException {
  QueryStmt queryStmt = ctx_.getQueryStmt();
  // Use the stmt's analyzer which is not necessarily the root analyzer
  // to detect empty result sets.
  Analyzer analyzer = queryStmt.getAnalyzer();
  analyzer.computeEquivClasses();
  analyzer.getTimeline().markEvent("Equivalence classes computed");

  // Mark slots referenced by output exprs as materialized, prior to generating the
  // plan tree.
  // We need to mark the result exprs of the topmost select block as materialized, so
  // that PlanNode.init() can compute the final mem layout of materialized tuples
  // (the byte size of tuples is needed for cost computations).
  // TODO: instead of materializing everything produced by the plan root, derive
  // referenced slots from destination fragment and add a materialization node
  // if not all output is needed by destination fragment
  // TODO 2: should the materialization decision be cost-based?
  if (queryStmt.getBaseTblResultExprs() != null) {
    analyzer.materializeSlots(queryStmt.getBaseTblResultExprs());
  }

  LOG.trace("desctbl: " + analyzer.getDescTbl().debugString());
  PlanNode singleNodePlan = createQueryPlan(queryStmt, analyzer,
      ctx_.getQueryOptions().isDisable_outermost_topn());
  Preconditions.checkNotNull(singleNodePlan);
  return singleNodePlan;
}
上面的函数通过调用私有的 createQueryPlan()
(SingleNodePlanner.java)
函数实现。该函数为单节点执行创建计划树。为查询语句中的
Select/Project/Join/Union [All]/Group by/Having/Order by
生成 PlanNode。具体实现代码如下:
/**
 * Create plan tree for single-node execution. Generates PlanNodes for the
 * Select/Project/Join/Union [All]/Group by/Having/Order by clauses of the query stmt.
 */
private PlanNode createQueryPlan(QueryStmt stmt, Analyzer analyzer, boolean disableTopN)
    throws ImpalaException {
  // Analyzer 检测结果集是否为空,如果是的话直接返回空节点
  if (analyzer.hasEmptyResultSet()) return createEmptyNode(stmt, analyzer);

  PlanNode root;
  if (stmt instanceof SelectStmt) { // 如果是 select 语句
    SelectStmt selectStmt = (SelectStmt) stmt;
    // 创建 SelectPlan
    root = createSelectPlan(selectStmt, analyzer);

    // insert possible AnalyticEvalNode before SortNode
    if (((SelectStmt) stmt).getAnalyticInfo() != null) {
      AnalyticInfo analyticInfo = selectStmt.getAnalyticInfo();
      ArrayList<TupleId> stmtTupleIds = Lists.newArrayList();
      stmt.getMaterializedTupleIds(stmtTupleIds);
      AnalyticPlanner analyticPlanner =
          new AnalyticPlanner(stmtTupleIds, analyticInfo, analyzer, ctx_);
      List<Expr> inputPartitionExprs = Lists.newArrayList();
      AggregateInfo aggInfo = selectStmt.getAggInfo();
      root = analyticPlanner.createSingleNodePlan(root,
          aggInfo != null ? aggInfo.getGroupingExprs() : null, inputPartitionExprs);
      if (aggInfo != null && !inputPartitionExprs.isEmpty()) {
        // analytic computation will benefit from a partition on inputPartitionExprs
        aggInfo.setPartitionExprs(inputPartitionExprs);
      }
    }
  } else { // 否则,创建 UnionPlan
    Preconditions.checkState(stmt instanceof UnionStmt);
    root = createUnionPlan((UnionStmt) stmt, analyzer);
  }

  // 如果 sort 元组没有已物化的槽,避免添加 sort node
  boolean sortHasMaterializedSlots = false;
  if (stmt.evaluateOrderBy()) {
    for (SlotDescriptor sortSlotDesc:
        stmt.getSortInfo().getSortTupleDescriptor().getSlots()) {
      if (sortSlotDesc.isMaterialized()) {
        sortHasMaterializedSlots = true;
        break;
      }
    }
  }

  if (stmt.evaluateOrderBy() && sortHasMaterializedSlots) {
    long limit = stmt.getLimit();
    // TODO: External sort could be used for very large limits
    // not just unlimited order-by
    boolean useTopN = stmt.hasLimit() && !disableTopN;
    // 创建 sort node
    root = new SortNode(ctx_.getNextNodeId(), root, stmt.getSortInfo(),
        useTopN, stmt.getOffset());
    Preconditions.checkState(root.hasValidStats());
    root.setLimit(limit);
    root.init(analyzer);
  } else {
    root.setLimit(stmt.getLimit());
    root.computeStats(analyzer);
  }

  return root;
}
SingleNodePlanner.createSelectPlan()
(SingleNodePlanner.java)
函数创建实现 select 查询语句块中
Select/Project/Join/Group by/Having 等从句的 PlanNode 树。具体实现代码如下:
/**
 * Create tree of PlanNodes that implements the Select/Project/Join/Group by/Having
 * of the selectStmt query block.
 */
private PlanNode createSelectPlan(SelectStmt selectStmt, Analyzer analyzer)
    throws ImpalaException {
  // no from clause -> materialize the select's exprs with a UnionNode
  // 如果 select 语句没有引用任何 table,创建 ConstantSelectPlan
  if (selectStmt.getTableRefs().isEmpty()) {
    return createConstantSelectPlan(selectStmt, analyzer);
  }

  // Slot materialization:
  // We need to mark all slots as materialized that are needed during the execution
  // of selectStmt, and we need to do that prior to creating plans for the TableRefs
  // (because createTableRefNode() might end up calling computeMemLayout() on one or
  // more TupleDescriptors, at which point all referenced slots need to be marked).
  //
  // For non-join predicates, slots are marked as follows:
  // - for base table scan predicates, this is done directly by ScanNode.init(), which
  //   can do a better job because it doesn't need to materialize slots that are only
  //   referenced for partition pruning, for instance
  // - for inline views, non-join predicates are pushed down, at which point the
  //   process repeats itself.
  selectStmt.materializeRequiredSlots(analyzer);

  ArrayList<TupleId> rowTuples = Lists.newArrayList();
  // collect output tuples of subtrees
  for (TableRef tblRef: selectStmt.getTableRefs()) {
    rowTuples.addAll(tblRef.getMaterializedTupleIds());
  }

  // 如果 select 语句中的 select、project、join 部分返回空结果集,
  // 用 EmptySetNode 创建满足 select 语句的 AggregationPlan。
  // Make sure the slots of the aggregation exprs and the tuples that they reference
  // are materialized (see IMPALA-1960).
  if (analyzer.hasEmptySpjResultSet()) {
    PlanNode emptySetNode = new EmptySetNode(ctx_.getNextNodeId(), rowTuples);
    emptySetNode.init(analyzer);
    emptySetNode.setOutputSmap(selectStmt.getBaseTblSmap());
    return createAggregationPlan(selectStmt, analyzer, emptySetNode);
  }

  // 为 table refs 创建 Plan;这里使用 list 而不是 map 是为了保证生成 join plan
  // 时遍历 TableRefs 有一个确定的顺序
  List<Pair<TableRef, PlanNode>> refPlans = Lists.newArrayList();
  for (TableRef ref: selectStmt.getTableRefs()) {
    PlanNode plan = createTableRefNode(analyzer, ref);
    Preconditions.checkState(plan != null);
    refPlans.add(new Pair(ref, plan));
  }
  // save state of conjunct assignment; needed for join plan generation
  for (Pair<TableRef, PlanNode> entry: refPlans) {
    entry.second.setAssignedConjuncts(analyzer.getAssignedConjuncts());
  }

  PlanNode root = null;
  // 如果有足够的统计数据(例如参与 join 的各个 table 的大小),创建开销最小的 JoinPlan
  if (!selectStmt.getSelectList().isStraightJoin()) {
    Set<ExprId> assignedConjuncts = analyzer.getAssignedConjuncts();
    root = createCheapestJoinPlan(analyzer, refPlans);
    if (root == null) analyzer.setAssignedConjuncts(assignedConjuncts);
  }
  // 否则,根据 from 从句中 table 的顺序创建 JoinPlan
  if (selectStmt.getSelectList().isStraightJoin() || root == null) {
    // we didn't have enough stats to do a cost-based join plan, or the STRAIGHT_JOIN
    // keyword was in the select list: use the FROM clause order instead
    root = createFromClauseJoinPlan(analyzer, refPlans);
    Preconditions.checkNotNull(root);
  }

  // 如果有聚集操作,创建 AggregationPlan
  if (selectStmt.getAggInfo() != null) {
    root = createAggregationPlan(selectStmt, analyzer, root);
  }

  // All the conjuncts_ should be assigned at this point.
  // TODO: Re-enable this check here and/or elswehere.
  //Preconditions.checkState(!analyzer.hasUnassignedConjuncts());
  return root;
}
上面函数中调用的主要私有方法有:
createTableRefNode()、createCheapestJoinPlan()、 createFromClauseJoinPlan()、 createAggregationPlan(),各个函数的具体实现如下:
createTableRefNode()
/**
 * Create a tree of PlanNodes for the given tblRef, which can be a BaseTableRef,
 * CollectionTableRef or an InlineViewRef.
 */
private PlanNode createTableRefNode(Analyzer analyzer, TableRef tblRef)
    throws ImpalaException {
  if (tblRef instanceof BaseTableRef || tblRef instanceof CollectionTableRef) {
    // 创建 ScanNode
    return createScanNode(analyzer, tblRef);
  } else if (tblRef instanceof InlineViewRef) {
    // 创建 InlineViewPlan
    return createInlineViewPlan(analyzer, (InlineViewRef) tblRef);
  }
  throw new InternalException(
      "Unknown TableRef node: " + tblRef.getClass().getSimpleName());
}
createCheapestJoinPlan()
/**
 * 返回物化 join refPlans 中所有 TblRefs 开销最小的 plan,
 * 假设 refPlans 中的顺序和查询中的原始顺序相同。
 * For this plan:
 * - the plan is executable, ie, all non-cross joins have equi-join predicates
 * - the leftmost scan is over the largest of the inputs for which we can still
 *   construct an executable plan(最左边的是最大的表)
 * - all rhs's (right hand side) are in decreasing order of selectiveness (percentage
 *   of rows they eliminate)
 * - outer/cross/semi joins: rhs serialized size is < lhs serialized size(右边的表比左边的小);
 *   enforced via join inversion, if necessary(必要时通过 join 反转实现)
 * Returns null if we can't create an executable plan.
 */
private PlanNode createCheapestJoinPlan(
    Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
    throws ImpalaException {
  LOG.trace("createCheapestJoinPlan");
  if (refPlans.size() == 1) return refPlans.get(0).second;

  // collect eligible candidates for the leftmost input; list contains
  // (plan, materialized size)
  ArrayList<Pair<TableRef, Long>> candidates = Lists.newArrayList();
  for (Pair<TableRef, PlanNode> entry: refPlans) {
    TableRef ref = entry.first;
    JoinOperator joinOp = ref.getJoinOp();

    // The rhs table of an outer/semi join can appear as the left-most input if we
    // invert the lhs/rhs and the join op. However, we may only consider this inversion
    // for the very first join in refPlans, otherwise we could reorder tables/joins
    // across outer/semi joins which is generally incorrect. The null-aware
    // left anti-join operator is never considered for inversion because we can't
    // execute the null-aware right anti-join efficiently.
    // TODO: Allow the rhs of any cross join as the leftmost table. This needs careful
    // consideration of the joinOps that result from such a re-ordering (IMPALA-1281).
    if (((joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) &&
        ref != refPlans.get(1).first) || joinOp.isNullAwareLeftAntiJoin()) {
      // ref cannot appear as the leftmost input
      continue;
    }

    PlanNode plan = entry.second;
    if (plan.getCardinality() == -1) {
      // use 0 for the size to avoid it becoming the leftmost input
      // TODO: Consider raw size of scanned partitions in the absence of stats.
      candidates.add(new Pair(ref, new Long(0)));
      LOG.trace("candidate " + ref.getUniqueAlias() + ": 0");
      continue;
    }
    Preconditions.checkNotNull(ref.getDesc());
    long materializedSize =
        (long) Math.ceil(plan.getAvgRowSize() * (double) plan.getCardinality());
    candidates.add(new Pair(ref, new Long(materializedSize)));
    LOG.trace("candidate " + ref.getUniqueAlias() + ": " + Long.toString(materializedSize));
  }
  if (candidates.isEmpty()) return null;

  // order candidates by descending materialized size; we want to minimize the memory
  // consumption of the materialized hash tables required for the join sequence
  Collections.sort(candidates,
      new Comparator<Pair<TableRef, Long>>() {
        public int compare(Pair<TableRef, Long> a, Pair<TableRef, Long> b) {
          long diff = b.second - a.second;
          return (diff < 0 ? -1 : (diff > 0 ? 1 : 0));
        }
      });

  // 依次以按大小排序后的候选表作为最左输入,尝试创建 JoinPlan
  for (Pair<TableRef, Long> candidate: candidates) {
    PlanNode result = createJoinPlan(analyzer, candidate.first, refPlans);
    if (result != null) return result;
  }
  return null;
}
createFromClauseJoinPlan()
/**
 * 返回按照 from 从句顺序构建的 JoinPlan
 */
private PlanNode createFromClauseJoinPlan(
    Analyzer analyzer, List<Pair<TableRef, PlanNode>> refPlans)
    throws ImpalaException {
  // create left-deep sequence of binary hash joins; assign node ids as we go along
  Preconditions.checkState(!refPlans.isEmpty());
  PlanNode root = refPlans.get(0).second;
  for (int i = 1; i < refPlans.size(); ++i) {
    TableRef innerRef = refPlans.get(i).first;
    PlanNode innerPlan = refPlans.get(i).second;
    root = createJoinNode(analyzer, root, innerPlan, null, innerRef);
    root.setId(ctx_.getNextNodeId());
  }
  return root;
}
createAggregationPlan()
/**
 * Returns a new AggregationNode that materializes the aggregation of the given stmt.
 * Assigns conjuncts from the Having clause to the returned node.
 */
private PlanNode createAggregationPlan(SelectStmt selectStmt, Analyzer analyzer,
    PlanNode root) throws InternalException {
  Preconditions.checkState(selectStmt.getAggInfo() != null);
  // add aggregation, if required
  AggregateInfo aggInfo = selectStmt.getAggInfo();
  root = new AggregationNode(ctx_.getNextNodeId(), root, aggInfo);
  root.init(analyzer);
  Preconditions.checkState(root.hasValidStats());
  // if we're computing DISTINCT agg fns, the analyzer already created the
  // 2nd phase agginfo
  if (aggInfo.isDistinctAgg()) {
    ((AggregationNode) root).unsetNeedsFinalize();
    // The output of the 1st phase agg is the 1st phase intermediate.
    ((AggregationNode) root).setIntermediateTuple();
    root = new AggregationNode(ctx_.getNextNodeId(), root,
        aggInfo.getSecondPhaseDistinctAggInfo());
    root.init(analyzer);
    Preconditions.checkState(root.hasValidStats());
  }
  // add Having clause
  root.assignConjuncts(analyzer);
  return root;
}
上面的 createCheapestJoinPlan() 和 createFromClauseJoinPlan()
方法调用了 createJoinNode() 和 createJoinPlan() 两个方法。它们的具体实现如下:
createJoinNode()
/**
 * 创建 join outer 和 inner 的 node。两者其中之一可能是一个根据 table ref 创建的 plan,
 * 但不能同时都是;与之对应的 outerRef/innerRef 恰好有一个不为空。
 */
private PlanNode createJoinNode(
    Analyzer analyzer, PlanNode outer, PlanNode inner, TableRef outerRef,
    TableRef innerRef) throws ImpalaException {
  Preconditions.checkState(innerRef != null ^ outerRef != null);
  TableRef tblRef = (innerRef != null) ? innerRef : outerRef;

  List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
  List<Expr> eqJoinPredicates = Lists.newArrayList();
  // get eq join predicates for the TableRefs' ids (not the PlanNodes' ids, which
  // are materialized)
  if (innerRef != null) {
    getHashLookupJoinConjuncts(
        analyzer, outer.getTblRefIds(), innerRef, eqJoinConjuncts, eqJoinPredicates);
    // Outer joins should only use On-clause predicates as eqJoinConjuncts.
    if (!innerRef.getJoinOp().isOuterJoin()) {
      analyzer.createEquivConjuncts(outer.getTblRefIds(), innerRef.getId(),
          eqJoinConjuncts);
    }
  } else {
    getHashLookupJoinConjuncts(
        analyzer, inner.getTblRefIds(), outerRef, eqJoinConjuncts, eqJoinPredicates);
    // Outer joins should only use On-clause predicates as eqJoinConjuncts.
    if (!outerRef.getJoinOp().isOuterJoin()) {
      analyzer.createEquivConjuncts(inner.getTblRefIds(), outerRef.getId(),
          eqJoinConjuncts);
    }
    // Reverse the lhs/rhs of the join conjuncts.
    for (BinaryPredicate eqJoinConjunct: eqJoinConjuncts) {
      Expr swapTmp = eqJoinConjunct.getChild(0);
      eqJoinConjunct.setChild(0, eqJoinConjunct.getChild(1));
      eqJoinConjunct.setChild(1, swapTmp);
    }
  }

  // 处理隐式交叉 join
  if (eqJoinConjuncts.isEmpty()) {
    // Since our only implementation of semi and outer joins is hash-based, and we do
    // not re-order semi and outer joins, we must have eqJoinConjuncts here to execute
    // this query.
    // TODO: Revisit when we add more semi/join implementations. Pick up and pass in
    // the otherJoinConjuncts.
    if (tblRef.getJoinOp().isOuterJoin() ||
        tblRef.getJoinOp().isSemiJoin()) {
      throw new NotImplementedException(
          String.format("%s join with '%s' without equi-join " +
              "conjuncts is not supported.",
              tblRef.getJoinOp().isOuterJoin() ? "Outer" : "Semi",
              innerRef.getUniqueAlias()));
    }
    CrossJoinNode result =
        new CrossJoinNode(outer, inner, tblRef, Collections.<Expr>emptyList());
    result.init(analyzer);
    return result;
  }

  // 处理显式交叉 join
  if (tblRef.getJoinOp() == JoinOperator.CROSS_JOIN) {
    tblRef.setJoinOp(JoinOperator.INNER_JOIN);
  }

  analyzer.markConjunctsAssigned(eqJoinPredicates);

  List<Expr> otherJoinConjuncts = Lists.newArrayList();
  if (tblRef.getJoinOp().isOuterJoin()) { // 外连接
    // Also assign conjuncts from On clause. All remaining unassigned conjuncts
    // that can be evaluated by this join are assigned in createSelectPlan().
    otherJoinConjuncts = analyzer.getUnassignedOjConjuncts(tblRef);
  } else if (tblRef.getJoinOp().isSemiJoin()) { // 半连接
    // Unassigned conjuncts bound by the invisible tuple id of a semi join must have
    // come from the join's On-clause, and therefore, must be added to the other join
    // conjuncts to produce correct results.
    otherJoinConjuncts =
        analyzer.getUnassignedConjuncts(tblRef.getAllTupleIds(), false);
    if (tblRef.getJoinOp().isNullAwareLeftAntiJoin()) { // 对空值敏感的左反连接
      boolean hasNullMatchingEqOperator = false;
      // Keep only the null-matching eq conjunct in the eqJoinConjuncts and move
      // all the others in otherJoinConjuncts. The BE relies on this
      // separation for correct execution of the null-aware left anti join.
      Iterator<BinaryPredicate> it = eqJoinConjuncts.iterator();
      while (it.hasNext()) {
        BinaryPredicate conjunct = it.next();
        if (!conjunct.isNullMatchingEq()) {
          otherJoinConjuncts.add(conjunct);
          it.remove();
        } else {
          // Only one null-matching eq conjunct is allowed
          Preconditions.checkState(!hasNullMatchingEqOperator);
          hasNullMatchingEqOperator = true;
        }
      }
      Preconditions.checkState(hasNullMatchingEqOperator);
    }
  }
  analyzer.markConjunctsAssigned(otherJoinConjuncts);

  HashJoinNode result =
      new HashJoinNode(outer, inner, tblRef, eqJoinConjuncts, otherJoinConjuncts);
  result.init(analyzer);
  return result;
}
createJoinPlan()
/**
 * Returns a plan with leftmostRef's plan as its leftmost input; the joins
 * are in decreasing order of selectiveness (percentage of rows they eliminate).
 * The leftmostRef's join will be inverted if it is an outer/semi/cross join.
 */
private PlanNode createJoinPlan(
    Analyzer analyzer, TableRef leftmostRef, List<Pair<TableRef, PlanNode>> refPlans)
    throws ImpalaException {

  LOG.trace("createJoinPlan: " + leftmostRef.getUniqueAlias());
  // 尚未参与 join 的 table ref
  List<Pair<TableRef, PlanNode>> remainingRefs = Lists.newArrayList();
  PlanNode root = null;  // root of accumulated join plan
  for (Pair<TableRef, PlanNode> entry: refPlans) {
    if (entry.first == leftmostRef) {
      root = entry.second;
    } else {
      remainingRefs.add(entry);
    }
  }
  Preconditions.checkNotNull(root);
  // 已经参与 join 的 refs;joinedRefs 和 remainingRefs 的并集就是所有 table refs
  Set<TableRef> joinedRefs = Sets.newHashSet();
  joinedRefs.add(leftmostRef);

  // 如果最左边的 TblRef 是 outer/semi/cross join,反转该 join
  boolean planHasInvertedJoin = false;
  if (leftmostRef.getJoinOp().isOuterJoin()
      || leftmostRef.getJoinOp().isSemiJoin()
      || leftmostRef.getJoinOp().isCrossJoin()) {
    // TODO: Revisit the interaction of join inversion here and the analysis state
    // that is changed in analyzer.invertOuterJoin(). Changing the analysis state
    // should not be necessary because the semantics of an inverted outer join do
    // not change.
    leftmostRef.invertJoin(refPlans, analyzer);
    planHasInvertedJoin = true;
  }

  long numOps = 0;
  int i = 0;
  while (!remainingRefs.isEmpty()) {
    // Join 链中的每一步都最小化中间结果行数,从而最小化 hash table 查找次数
    PlanNode newRoot = null;
    Pair<TableRef, PlanNode> minEntry = null;
    for (Pair<TableRef, PlanNode> entry: remainingRefs) {
      TableRef ref = entry.first;
      LOG.trace(Integer.toString(i) + " considering ref " + ref.getUniqueAlias());

      // Determine whether we can or must consider this join at this point in the plan.
      // Place outer/semi joins at a fixed position in the plan tree (IMPALA-860),
      // s.t. all the tables appearing to the left/right of an outer/semi join in
      // the original query still remain to the left/right after join ordering. This
      // prevents join re-ordering across outer/semi joins which is generally wrong.
      // The checks below relies on remainingRefs being in the order as they originally
      // appeared in the query.
      JoinOperator joinOp = ref.getJoinOp();
      if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
        List<TupleId> currentTids = Lists.newArrayList(root.getTblRefIds());
        currentTids.add(ref.getId());
        // Place outer/semi joins at a fixed position in the plan tree. We know that
        // the join resulting from 'ref' must become the new root if the current
        // root materializes exactly those tuple ids corresponding to TableRefs
        // appearing to the left of 'ref' in the original query.
        List<TupleId> tableRefTupleIds = ref.getAllTupleIds();
        if (!currentTids.containsAll(tableRefTupleIds) ||
            !tableRefTupleIds.containsAll(currentTids)) {
          // Do not consider the remaining table refs to prevent incorrect re-ordering
          // of tables across outer/semi/anti joins.
          break;
        }
      } else if (ref.getJoinOp().isCrossJoin()) {
        if (!joinedRefs.contains(ref.getLeftTblRef())) continue;
      }

      PlanNode rhsPlan = entry.second;
      analyzer.setAssignedConjuncts(root.getAssignedConjuncts());

      boolean invertJoin = false;
      if (joinOp.isOuterJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
        // Invert the join if doing so reduces the size of build-side hash table
        // (may also reduce network costs depending on the join strategy).
        // Only consider this optimization if both the lhs/rhs cardinalities are known.
        // The null-aware left anti-join operator is never considered for inversion
        // because we can't execute the null-aware right anti-join efficiently.
        long lhsCard = root.getCardinality();
        long rhsCard = rhsPlan.getCardinality();
        if (lhsCard != -1 && rhsCard != -1 &&
            lhsCard * root.getAvgRowSize() < rhsCard * rhsPlan.getAvgRowSize() &&
            !joinOp.isNullAwareLeftAntiJoin()) {
          invertJoin = true;
        }
      }
      PlanNode candidate = null;
      if (invertJoin) {
        ref.setJoinOp(ref.getJoinOp().invert());
        candidate = createJoinNode(analyzer, rhsPlan, root, ref, null);
        planHasInvertedJoin = true;
      } else {
        candidate = createJoinNode(analyzer, root, rhsPlan, null, ref);
      }
      if (candidate == null) continue;
      LOG.trace("cardinality=" + Long.toString(candidate.getCardinality()));

      // Use 'candidate' as the new root; don't consider any other table refs at this
      // position in the plan.
      if (joinOp.isOuterJoin() || joinOp.isSemiJoin()) {
        newRoot = candidate;
        minEntry = entry;
        break;
      }

      // 优先选择 Hash Join 而不是 Cross Join, due to limited costing infrastructure
      if (newRoot == null
          || (candidate.getClass().equals(newRoot.getClass())
              && candidate.getCardinality() < newRoot.getCardinality())
          || (candidate instanceof HashJoinNode && newRoot instanceof CrossJoinNode)) {
        newRoot = candidate;
        minEntry = entry;
      }
    }
    if (newRoot == null) {
      // Currently, it should not be possible to invert a join for a plan that turns
      // out to be non-executable because (1) the joins we consider for inversion are
      // barriers in the join order, and (2) the caller of this function only considers
      // other leftmost table refs if a plan turns out to be non-executable.
      // TODO: This preconditions check will need to be changed to undo the in-place
      // modifications made to table refs for join inversion, if the caller decides to
      // explore more leftmost table refs.
      Preconditions.checkState(!planHasInvertedJoin);
      return null;
    }

    // we need to insert every rhs row into the hash table and then look up
    // every lhs row
    long lhsCardinality = root.getCardinality();
    long rhsCardinality = minEntry.second.getCardinality();
    numOps += lhsCardinality + rhsCardinality;
    LOG.debug(Integer.toString(i) + " chose " + minEntry.first.getUniqueAlias()
        + " #lhs=" + Long.toString(lhsCardinality)
        + " #rhs=" + Long.toString(rhsCardinality)
        + " #ops=" + Long.toString(numOps));
    remainingRefs.remove(minEntry);
    joinedRefs.add(minEntry.first);
    root = newRoot;
    // assign id_ after running through the possible choices in order to end up
    // with a dense sequence of node ids
    root.setId(ctx_.getNextNodeId());
    analyzer.setAssignedConjuncts(root.getAssignedConjuncts());
    ++i;
  }

  return root;
}
至此我们已经大概介绍了 createSingleNodePlan 的过程。
现在让我们回到 createPlan() 函数,来看看创建分布式执行计划树,即 createPlanFragments 过程。
DistributedPlanner.createPlanFragments()
(Planner.java)
方法为单点计划树生成多个片段。具体代码如下:
/**
 * 根据一些执行选项为单节点计划树创建多个片段。
 * 片段通过 list 返回,list 中位置 i 的片段只能消费位置 j(j > i)的片段的输出。
 *
 * TODO: 考虑计划片段中的数据分片;尤其是要比 createQueryPlan() 更加注重协调
 * 聚集操作中的 hash partitioning 以及分析计算中的 hash partitioning。
 * (只有在相同 select 块中同时进行聚集和分析计算时才需要这种协调)
 */
public ArrayList<PlanFragment> createPlanFragments(
    PlanNode singleNodePlan) throws ImpalaException {
  Preconditions.checkState(!ctx_.isSingleNodeExec());
  AnalysisContext.AnalysisResult analysisResult = ctx_.getAnalysisResult();
  QueryStmt queryStmt = ctx_.getQueryStmt();
  ArrayList<PlanFragment> fragments = Lists.newArrayList();
  // 对于 insert 或 CTAS,除非有 limit 限制,否则保持根片段 partitioned;
  // 其他情况下,把所有结果合并到一个单独的 coordinator fragment,以便传回客户端
  boolean isPartitioned = false;
  if ((analysisResult.isInsertStmt() || analysisResult.isCreateTableAsSelectStmt())
      && !singleNodePlan.hasLimit()) {
    Preconditions.checkState(!queryStmt.hasOffset());
    isPartitioned = true;
  }
  LOG.debug("create plan fragments");
  long perNodeMemLimit = ctx_.getQueryOptions().mem_limit;
  LOG.debug("memlimit=" + Long.toString(perNodeMemLimit));
  // 调用私有的同名方法
  createPlanFragments(singleNodePlan, isPartitioned, perNodeMemLimit, fragments);
  return fragments;
}
上面的方法调用私有成员方法 DistributedPlanner.createPlanFragments()
(DistributedPlanner.java)
该方法返回生成 root 结果的 fragments。具体代码如下:
/**
 * Returns the fragments that produce the result of 'root'; recursively creates
 * all input fragments of the returned fragment.
 * If a new fragment is created, it is appended to 'fragments', so that a fragment
 * always appears before any fragment that needs its output.
 * If 'isPartitioned' is false, the returned fragment is unpartitioned;
 * otherwise it may be partitioned, depending on whether its inputs are partitioned;
 * the partition function is derived from the inputs.
 */
private PlanFragment createPlanFragments(
    PlanNode root, boolean isPartitioned,
    long perNodeMemLimit, ArrayList<PlanFragment> fragments)
    throws InternalException, NotImplementedException {
  ArrayList<PlanFragment> childFragments = Lists.newArrayList();
  for (PlanNode child: root.getChildren()) {
    // allow child fragments to be partitioned, unless they contain a limit clause
    // (a result set constrained by a limit needs to be computed centrally);
    // merge later if needed
    boolean childIsPartitioned = !child.hasLimit();
    // recursive call; the PlanFragments created for each child are added to childFragments
    childFragments.add(
        createPlanFragments(
          child, childIsPartitioned, perNodeMemLimit, fragments));
  }

  // create a different kind of fragment depending on the node type of 'root'
  PlanFragment result = null;
  if (root instanceof ScanNode) {
    result = createScanFragment(root);
    fragments.add(result);
  } else if (root instanceof HashJoinNode) {
    Preconditions.checkState(childFragments.size() == 2);
    result = createHashJoinFragment(
        (HashJoinNode) root, childFragments.get(1), childFragments.get(0),
        perNodeMemLimit, fragments);
  } else if (root instanceof CrossJoinNode) {
    Preconditions.checkState(childFragments.size() == 2);
    result = createCrossJoinFragment(
        (CrossJoinNode) root, childFragments.get(1), childFragments.get(0),
        perNodeMemLimit, fragments);
  } else if (root instanceof SelectNode) {
    result = createSelectNodeFragment((SelectNode) root, childFragments);
  } else if (root instanceof UnionNode) {
    result = createUnionNodeFragment((UnionNode) root, childFragments, fragments);
  } else if (root instanceof AggregationNode) {
    result = createAggregationFragment(
        (AggregationNode) root, childFragments.get(0), fragments);
  } else if (root instanceof SortNode) {
    if (((SortNode) root).isAnalyticSort()) {
      // don't parallelize this like a regular SortNode
      result = createAnalyticFragment(
          (SortNode) root, childFragments.get(0), fragments);
    } else {
      result = createOrderByFragment(
          (SortNode) root, childFragments.get(0), fragments);
    }
  } else if (root instanceof AnalyticEvalNode) {
    result = createAnalyticFragment(root, childFragments.get(0), fragments);
  } else if (root instanceof EmptySetNode) {
    result = new PlanFragment(
        ctx_.getNextFragmentId(), root, DataPartition.UNPARTITIONED);
  } else {
    throw new InternalException(
        "Cannot create plan fragment for this node type: " + root.getExplainString());
  }
  // move 'result' to end, it depends on all of its children
  fragments.remove(result);
  fragments.add(result);
  // if the caller asked for an unpartitioned result but 'result' is partitioned,
  // a merge fragment has to be created on top of it
  if (!isPartitioned && result.isPartitioned()) {
    result = createMergeFragment(result);
    fragments.add(result);
  }

  return result;
}
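One detail in the method above that is easy to miss is the fragments.remove(result); fragments.add(result); pair. It maintains the invariant that a fragment always appears after every fragment that produces its input, which matters because some of the helpers reuse a child's fragment as the result, so the result may already sit at an earlier position in the list. The following self-contained toy model (hypothetical code, not part of Impala) shows the same trick:

import java.util.ArrayList;
import java.util.List;

// Toy model of the list invariant maintained by "fragments.remove(result);
// fragments.add(result);": every fragment must appear after all fragments that
// produce its input. The remove() matters because, for example, the hash-join case
// reuses the probe-side child fragment as the result fragment.
public class FragmentOrderDemo {
  public static void main(String[] args) {
    List<String> fragments = new ArrayList<>();

    // Child fragments are appended first (by the recursive calls).
    fragments.add("probe-side scan fragment");   // will also host the join node
    fragments.add("build-side scan fragment");

    // The join reuses the probe-side fragment as its own result fragment, so 'result'
    // is already in the list at index 0 ...
    String result = fragments.get(0);

    // ... and has to be moved to the end, after everything whose output it consumes.
    fragments.remove(result);
    fragments.add(result);

    // Prints: [build-side scan fragment, probe-side scan fragment]
    System.out.println(fragments);
  }
}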
The method above calls a number of create*Fragment() private member methods. Their concrete implementations can be found in the source file:
DistributedPlanner.java
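As an example of the kind of decision these helpers encapsulate, createHashJoinFragment has to choose between broadcasting the build side to every node that scans probe-side data and hash-partitioning both sides. The sketch below is a deliberately simplified, hypothetical illustration of that trade-off; Impala's real cost model also uses cardinality and row-size estimates, honors join hints, and has to cope with missing statistics.

// Simplified, hypothetical sketch (not Impala's actual formula) of the decision made by
// createHashJoinFragment: broadcast the build side, or hash-partition both sides?
public class JoinStrategyDemo {
  // Broadcast cost: the whole build side is shipped to every node holding probe data.
  // Partitioned cost: each row of both sides crosses the network exactly once.
  static boolean useBroadcastJoin(long probeBytes, long buildBytes, int numProbeNodes) {
    long broadcastCost = buildBytes * numProbeNodes;
    long partitionCost = probeBytes + buildBytes;
    return broadcastCost <= partitionCost;
  }

  public static void main(String[] args) {
    // A small dimension table joined on 20 nodes: broadcasting wins.
    System.out.println(useBroadcastJoin(10_000_000_000L, 50_000_000L, 20));    // true
    // Two large fact tables: partitioning both sides is cheaper.
    System.out.println(useBroadcastJoin(10_000_000_000L, 8_000_000_000L, 20)); // false
  }
}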
All of these member methods return PlanFragment instances; for the details of that class, see the source file:
PlanFragment.java
This concludes our overview of the createPlanFragments process.
Having covered createSingleNodePlan and createPlanFragments, the two most important parts of createPlan, we are done with createPlan itself. Let us now return to frontend.createExecRequest() and look at what remains. The rest of frontend.createExecRequest() is as follows:
/**
 * Create a populated TExecRequest corresponding to the supplied TQueryCtx.
 */
public TExecRequest createExecRequest(TQueryCtx queryCtx, StringBuilder explainString)
    throws ImpalaException {
  ...

  // set the destination of each fragment
  for (int i = 1; i < fragments.size(); ++i) {
    PlanFragment dest = fragments.get(i).getDestFragment();
    Integer idx = fragmentIdx.get(dest);
    Preconditions.checkState(idx != null);
    queryExecRequest.addToDest_fragment_idx(idx.intValue());
  }

  // set the scan ranges/locations of the scan nodes.
  // Also assemble the list of table names missing stats for assembling a warning message.
  LOG.debug("get scan range locations");
  Set<TTableName> tablesMissingStats = Sets.newTreeSet();
  for (ScanNode scanNode: scanNodes) {
    queryExecRequest.putToPer_node_scan_ranges(
        scanNode.getId().asInt(),
        scanNode.getScanRangeLocations());
    if (scanNode.isTableMissingStats()) {
      tablesMissingStats.add(scanNode.getTupleDesc().getTableName().toThrift());
    }
  }
  // set the host list
  queryExecRequest.setHost_list(analysisResult.getAnalyzer().getHostIndex().getList());
  for (TTableName tableName: tablesMissingStats) {
    queryCtx.addToTables_missing_stats(tableName);
  }

  // Optionally disable spilling in the backend. Allow spilling if there are plan hints
  // or if all tables have stats.
  if (queryCtx.request.query_options.isDisable_unsafe_spills()
      && !tablesMissingStats.isEmpty()
      && !analysisResult.getAnalyzer().hasPlanHints()) {
    queryCtx.setDisable_spilling(true);
  }

  // compute resource requirements; the cost estimates of the scan nodes depend on them
  try {
    planner.computeResourceReqs(fragments, true, queryExecRequest);
  } catch (Exception e) {
    // turn the exception into a warning so that the query can still be executed
    LOG.error("Failed to compute resource requirements for query\n" +
        queryCtx.request.getStmt(), e);
  }

  // at this point everything of the fragments is set, serialize them to Thrift
  for (PlanFragment fragment: fragments) {
    TPlanFragment thriftFragment = fragment.toThrift();
    queryExecRequest.addToFragments(thriftFragment);
  }

  // Use VERBOSE by default for all non-explain statements.
  TExplainLevel explainLevel = TExplainLevel.VERBOSE;
  // Use the query option for explain stmts and tests (e.g., planner tests).
  if (analysisResult.isExplainStmt() || RuntimeEnv.INSTANCE.isTestEnv()) {
    explainLevel = queryCtx.request.query_options.getExplain_level();
  }

  // Global query parameters to be set in each TPlanExecRequest.
  queryExecRequest.setQuery_ctx(queryCtx);

  explainString.append(
      planner.getExplainString(fragments, queryExecRequest, explainLevel));
  queryExecRequest.setQuery_plan(explainString.toString());
  queryExecRequest.setDesc_tbl(analysisResult.getAnalyzer().getDescTbl().toThrift());

  String jsonLineageGraph = analysisResult.getJsonLineageGraph();
  if (jsonLineageGraph != null && !jsonLineageGraph.isEmpty()) {
    queryExecRequest.setLineage_graph(jsonLineageGraph);
  }

  if (analysisResult.isExplainStmt()) {
    // Return the EXPLAIN request
    createExplainRequest(explainString.toString(), result);
    return result;
  }

  result.setQuery_exec_request(queryExecRequest);

  if (analysisResult.isQueryStmt()) {
    // fill in the result set metadata
    LOG.debug("create result set metadata");
    result.stmt_type = TStmtType.QUERY;
    result.query_exec_request.stmt_type = result.stmt_type;
    TResultSetMetadata metadata = new TResultSetMetadata();
    QueryStmt queryStmt = analysisResult.getQueryStmt();
    int colCnt = queryStmt.getColLabels().size();
    for (int i = 0; i < colCnt; ++i) {
      TColumn colDesc = new TColumn();
      colDesc.columnName = queryStmt.getColLabels().get(i);
      colDesc.columnType = queryStmt.getResultExprs().get(i).getType().toThrift();
      metadata.addToColumns(colDesc);
    }
    result.setResult_set_metadata(metadata);
  } else {
    Preconditions.checkState(analysisResult.isInsertStmt() ||
        analysisResult.isCreateTableAsSelectStmt());

    // For CTAS the overall TExecRequest statement type is DDL, but the
    // query_exec_request should be DML
    result.stmt_type =
        analysisResult.isCreateTableAsSelectStmt() ? TStmtType.DDL : TStmtType.DML;
    result.query_exec_request.stmt_type = TStmtType.DML;

    // create the finalization params of the insert stmt
    InsertStmt insertStmt = analysisResult.getInsertStmt();
    if (insertStmt.getTargetTable() instanceof HdfsTable) {
      TFinalizeParams finalizeParams = new TFinalizeParams();
      finalizeParams.setIs_overwrite(insertStmt.isOverwrite());
      finalizeParams.setTable_name(insertStmt.getTargetTableName().getTbl());
      finalizeParams.setTable_id(insertStmt.getTargetTable().getId().asInt());
      String db = insertStmt.getTargetTableName().getDb();
      finalizeParams.setTable_db(db == null ? queryCtx.session.database : db);
      HdfsTable hdfsTable = (HdfsTable) insertStmt.getTargetTable();
      finalizeParams.setHdfs_base_dir(hdfsTable.getHdfsBaseDir());
      finalizeParams.setStaging_dir(
          hdfsTable.getHdfsBaseDir() + "/_impala_insert_staging");
      queryExecRequest.setFinalize_params(finalizeParams);
    }
  }

  validateTableIds(analysisResult.getAnalyzer(), result);

  timeline.markEvent("Planning finished");
  result.setTimeline(analysisResult.getAnalyzer().getTimeline().toThrift());
  return result;
}
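Before moving on, note the dest_fragment_idx loop near the top of this method: for every fragment except the root at index 0, it records the list index of the fragment that consumes its output, which is how the backend later wires senders to receivers. A small, self-contained illustration of that mapping (hypothetical code, not Impala's):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mini model of the dest_fragment_idx computation: fragment 0 is the root
// (it has no destination); every other fragment records the list index of the fragment
// that consumes its output.
public class DestFragmentIdxDemo {
  static class Fragment {
    final String name;
    Fragment dest;                       // the fragment this one sends its output to
    Fragment(String name) { this.name = name; }
  }

  public static void main(String[] args) {
    Fragment root = new Fragment("coordinator fragment");
    Fragment join = new Fragment("join fragment");
    Fragment scan = new Fragment("scan fragment");
    join.dest = root;
    scan.dest = join;

    // The root fragment sits at index 0 and has no destination.
    List<Fragment> fragments = Arrays.asList(root, join, scan);

    Map<Fragment, Integer> fragmentIdx = new HashMap<>();
    for (int i = 0; i < fragments.size(); ++i) fragmentIdx.put(fragments.get(i), i);

    List<Integer> destFragmentIdx = new ArrayList<>();
    for (int i = 1; i < fragments.size(); ++i) {
      destFragmentIdx.add(fragmentIdx.get(fragments.get(i).dest));
    }
    System.out.println(destFragmentIdx);  // [0, 1]: join -> root, scan -> join
  }
}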
At this point the FE is done: it returns a TExecRequest object to the backend for execution.
Since the author has only recently started working with Impala, this analysis may contain mistakes; questions and suggestions are welcome.
Reposted from: https://www.cnblogs.com/qiumingcheng/p/6201264.html