【Executor: executing optimizable statements】

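After the query compiler processes an optimizable statement, it produces a query plan tree; statements of this kind are handled by the executor (Executor). The module exposes three interfaces: ExecutorStart, ExecutorRun and ExecutorEnd. Its input is a QueryDesc, the data structure that contains the query plan tree, and its output is the related execution information or result data. To execute a plan tree, construct a QueryDesc that contains it and call ExecutorStart, ExecutorRun and ExecutorEnd in that order.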

/* ----------------------------------------------------------------
 *        ExecutorStart
 *
 *        This routine must be called at the beginning of any execution of any
 *        query plan
 *
 * Takes a QueryDesc previously created by CreateQueryDesc (which is separate
 * only because some places use QueryDescs for utility commands).  The tupDesc
 * field of the QueryDesc is filled in to describe the tuples that will be
 * returned, and the internal fields (estate and planstate) are set up.
 *
 * eflags contains flag bits as described in executor.h.
 *
 * NB: the CurrentMemoryContext when this is called will become the parent
 * of the per-query context used for this Executor invocation.
 *
 * We provide a function hook variable that lets loadable plugins
 * get control when ExecutorStart is called.  Such a plugin would
 * normally call standard_ExecutorStart().
 *
 * ----------------------------------------------------------------
 */
void
ExecutorStart(QueryDesc *queryDesc, int eflags)
{
    if (ExecutorStart_hook)
        (*ExecutorStart_hook) (queryDesc, eflags);
    else
        standard_ExecutorStart(queryDesc, eflags);
}

/* ----------------------------------------------------------------
 *        ExecutorRun
 *
 *        This is the main routine of the executor module. It accepts
 *        the query descriptor from the traffic cop and executes the
 *        query plan.
 *
 *        ExecutorStart must have been called already.
 *
 *        If direction is NoMovementScanDirection then nothing is done
 *        except to start up/shut down the destination.  Otherwise,
 *        we retrieve up to 'count' tuples in the specified direction.
 *
 *        Note: count = 0 is interpreted as no portal limit, i.e., run to
 *        completion.  Also note that the count limit is only applied to
 *        retrieved tuples, not for instance to those inserted/updated/deleted
 *        by a ModifyTable plan node.
 *
 *        There is no return value, but output tuples (if any) are sent to
 *        the destination receiver specified in the QueryDesc; and the number
 *        of tuples processed at the top level can be found in
 *        estate->es_processed.
 *
 *        We provide a function hook variable that lets loadable plugins
 *        get control when ExecutorRun is called.  Such a plugin would
 *        normally call standard_ExecutorRun().
 *
 * ----------------------------------------------------------------
 */
void
ExecutorRun(QueryDesc *queryDesc,
            ScanDirection direction, uint64 count,
            bool execute_once)
{
    if (ExecutorRun_hook)
        (*ExecutorRun_hook) (queryDesc, direction, count, execute_once);
    else
        standard_ExecutorRun(queryDesc, direction, count, execute_once);
}

/* ----------------------------------------------------------------
 *        ExecutorEnd
 *
 *        This routine must be called at the end of execution of any
 *        query plan
 *
 *        We provide a function hook variable that lets loadable plugins
 *        get control when ExecutorEnd is called.  Such a plugin would
 *        normally call standard_ExecutorEnd().
 *
 * ----------------------------------------------------------------
 */
void
ExecutorEnd(QueryDesc *queryDesc)
{
    if (ExecutorEnd_hook)
        (*ExecutorEnd_hook) (queryDesc);
    else
        standard_ExecutorEnd(queryDesc);
}
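
The three entry points above share one pattern: if a hook variable is set, control goes to the plugin, otherwise to the standard_ routine. The following is only a minimal stand-alone sketch of that pattern and of the start/run/end call order. It does not use the PostgreSQL headers; DemoQueryDesc, demo_start_hook and every other demo_ name are hypothetical stand-ins invented for illustration.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for QueryDesc; the real struct carries the plan tree. */
typedef struct DemoQueryDesc
{
    int started;    /* set by the start phase */
    int rows;       /* rows "produced" by the run phase */
    int ended;      /* set by the end phase */
} DemoQueryDesc;

typedef void (*demo_start_hook_type) (DemoQueryDesc *qd, int eflags);

/* NULL means no plugin is installed, in the spirit of ExecutorStart_hook. */
demo_start_hook_type demo_start_hook = NULL;

/* Counts how often the example plugin below was entered. */
int demo_hook_calls = 0;

void
demo_standard_start(DemoQueryDesc *qd, int eflags)
{
    (void) eflags;              /* unused in this sketch */
    qd->started = 1;
}

/* Same shape as ExecutorStart: prefer the hook, else the standard routine. */
void
demo_start(DemoQueryDesc *qd, int eflags)
{
    if (demo_start_hook)
        (*demo_start_hook) (qd, eflags);
    else
        demo_standard_start(qd, eflags);
}

void
demo_run(DemoQueryDesc *qd, int count)
{
    assert(qd->started);        /* the start phase must come first */
    qd->rows += count;
}

void
demo_end(DemoQueryDesc *qd)
{
    qd->ended = 1;
}

/* Example plugin: does its own bookkeeping, then chains to the standard routine. */
void
demo_counting_hook(DemoQueryDesc *qd, int eflags)
{
    demo_hook_calls++;
    demo_standard_start(qd, eflags);
}

/* Drives the three phases in order and returns the number of rows produced. */
int
demo_execute(int count)
{
    DemoQueryDesc qd = {0, 0, 0};

    demo_start(&qd, 0);
    demo_run(&qd, count);
    demo_end(&qd);
    return qd.rows;
}
```

Assigning demo_counting_hook to demo_start_hook before calling demo_execute routes the start phase through the plugin, which then chains to the standard routine, as the comments in the listings above recommend.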

Example of the node relationships in an execution plan:

        gather
          ^
          |
        limit
          ^
          |
         sort
          ^
          |
         join
          ^^
         /  \
     join   scan
     ^^
    /  \
scan    scan

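Upper-level functions use the three interface functions ExecInitNode, ExecProcNode and ExecEndNode to initialize, execute and clean up nodes in a uniform way. Each of the three calls the initialization, execution or cleanup routine that matches the actual type of the node being processed.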
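
As a rough illustration of this uniform interface, the sketch below dispatches on a node tag for a toy two-node plan (a limit over a scan). It is not TBase code: DemoNode, DemoNodeTag and the demo_ functions are hypothetical, and a plain switch is used for brevity, whereas the ExecProcNode listing later in this article dispatches through a function pointer.

```c
#include <assert.h>
#include <stddef.h>

typedef enum DemoNodeTag { DEMO_T_SCAN, DEMO_T_LIMIT } DemoNodeTag;

/* Hypothetical node: one struct serves both node types to keep the sketch short. */
typedef struct DemoNode
{
    DemoNodeTag      tag;
    struct DemoNode *child;       /* plays the role of lefttree */
    int              next;        /* scan: next value to emit */
    int              max;         /* scan: last value; limit: row limit */
    int              emitted;     /* limit: rows returned so far */
    int              initialized;
} DemoNode;

/* Recursively initialize the tree, choosing the routine by node type. */
void
demo_init_node(DemoNode *node)
{
    if (node == NULL)
        return;
    switch (node->tag)
    {
        case DEMO_T_SCAN:
            node->next = 1;
            break;
        case DEMO_T_LIMIT:
            node->emitted = 0;
            demo_init_node(node->child);
            break;
    }
    node->initialized = 1;
}

/* Returns 1 and stores one value in *out, or 0 when the node is exhausted. */
int
demo_proc_node(DemoNode *node, int *out)
{
    assert(node != NULL && node->initialized);
    switch (node->tag)
    {
        case DEMO_T_SCAN:
            if (node->next > node->max)
                return 0;
            *out = node->next++;
            return 1;
        case DEMO_T_LIMIT:
            if (node->emitted >= node->max)
                return 0;
            if (!demo_proc_node(node->child, out))
                return 0;
            node->emitted++;
            return 1;
    }
    return 0;
}

/* Recursively clean up the tree. */
void
demo_end_node(DemoNode *node)
{
    if (node == NULL)
        return;
    switch (node->tag)
    {
        case DEMO_T_SCAN:
            break;
        case DEMO_T_LIMIT:
            demo_end_node(node->child);
            break;
    }
    node->initialized = 0;
}
```

The caller only ever touches the root node; each level pulls from its child on demand, which is why a limit of 2 stops a scan that could have produced 5 values.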

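During executor initialization, ExecutorStart builds the executor's global state (estate) and the plan-node execution state (planstate) from the query plan tree. While the plan tree executes, the executor uses planstate to record each plan node's execution state and data, and uses the es_tupleTable field of the global state to pass result tuples between nodes. The cleanup function ExecutorEnd releases both the global state and the plan-node execution state.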

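PostgreSQL defines one state node for each kind of plan node. All state nodes inherit from PlanState, which holds a pointer to the associated plan node (plan), a pointer to the executor's global state (state), the projection information (targetlist), the selection conditions (qual), and pointers to the left and right child state nodes (lefttree, righttree).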

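Three interface functions are provided for invoking the executor: ExecutorStart, ExecutorRun and ExecutorEnd. To process a query plan, calling these three functions in order completes the executor's entire run.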

ExecutorStart => standard_ExecutorStart => InitPlan => ExecInitNode

【ExecInitNode】

/* ------------------------------------------------------------------------
 *        ExecInitNode
 *
 *        Recursively initializes all the nodes in the plan tree rooted
 *        at 'node'.
 *
 *        Inputs:
 *          'node' is the current node of the plan produced by the query planner
 *          'estate' is the shared execution state for the plan tree
 *          'eflags' is a bitwise OR of flag bits described in executor.h
 *
 *        Returns a PlanState node corresponding to the given Plan node.
 * ------------------------------------------------------------------------
 */
PlanState *
ExecInitNode(Plan *node, EState *estate, int eflags)
{
......
#ifdef PGXC
        case T_RemoteQuery:
            result = (PlanState *) ExecInitRemoteQuery((RemoteQuery *) node,
                                                       estate, eflags);
            break;
#endif
#ifdef XCP
        case T_RemoteSubplan:
            result = (PlanState *) ExecInitRemoteSubplan((RemoteSubplan *) node,
                                                         estate, eflags);
......
}

For distributed nodes, initialization of the remote query and the remote subplan is done mainly by the two functions ExecInitRemoteQuery and ExecInitRemoteSubplan.

\src\backend\executor\execProcNode.c

/*****************************************************************************
 *
 * Simplified versions of ExecInitRemoteQuery, ExecRemoteQuery and
 * ExecEndRemoteQuery: in XCP they are only used to execute simple queries.
 *
 *****************************************************************************/
RemoteQueryState *
ExecInitRemoteQuery(RemoteQuery *node, EState *estate, int eflags)
{
    RemoteQueryState   *remotestate;
    ResponseCombiner   *combiner;

    remotestate = makeNode(RemoteQueryState);
    remotestate->eflags = eflags;
    combiner = (ResponseCombiner *) remotestate;
    InitResponseCombiner(combiner, 0, node->combine_type);
    combiner->ss.ps.plan = (Plan *) node;
    combiner->ss.ps.state = estate;
    combiner->ss.ps.ExecProcNode = ExecRemoteQuery;

    combiner->ss.ps.qual = NULL;

    combiner->request_type = REQUEST_TYPE_QUERY;

......

}

/*
 * Execute step of PGXC plan.
 * The step specifies a command to be executed on specified nodes.
 * On first invocation connections to the data nodes are initialized and
 * command is executed. Further, as well as within subsequent invocations,
 * responses are received until step is completed or there is a tuple to emit.
 * If there is a tuple it is returned, otherwise returned NULL. The NULL result
 * from the function indicates completed step.
 * The function returns at most one tuple per invocation.
 */
TupleTableSlot *
ExecRemoteQuery(PlanState *pstate)

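ExecRemoteQuery calls pgxc_node_receive to read data from the distributed nodes. pgxc_node_receive relies on the Linux poll() system call to wait on the connection sockets before reading the incoming network data: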

/*
 * Wait while at least one of specified connections has data available and read
 * the data into the buffer
 *
 * Returning state code
 *        DNStatus_OK      = 0,
 *        DNStatus_ERR     = 1,
 *        DNStatus_EXPIRED = 2,
 *        DNStatus_BUTTY
 */
#ifdef __TBASE__
int
pgxc_node_receive(const int conn_count,
                  PGXCNodeHandle ** connections, struct timeval * timeout)

#else
bool
pgxc_node_receive(const int conn_count,
                  PGXCNodeHandle ** connections, struct timeval * timeout)
#endif

{// #lizard forgives
#ifndef __TBASE__
#define ERROR_OCCURED        true
#define NO_ERROR_OCCURED    false
#endif

    int        i,
            sockets_to_poll,
            poll_val;
    bool    is_msg_buffered;
    long     timeout_ms;
    struct    pollfd pool_fd[conn_count];

    /* sockets to be polled index */
    sockets_to_poll = 0;

    is_msg_buffered = false;
    for (i = 0; i < conn_count; i++)
    {
        /* If connection has a buffered message */
        if (HAS_MESSAGE_BUFFERED(connections[i]))
        {
            is_msg_buffered = true;
            break;
        }
    }

......

    /* read data */
    for (i = 0; i < conn_count; i++)
    {
        PGXCNodeHandle *conn = connections[i];

        if( pool_fd[i].fd == -1 )
            continue;

        if ( pool_fd[i].fd == conn->sock )
        {
            if( pool_fd[i].revents & POLLIN )
            {
                int    read_status = pgxc_node_read_data(conn, true);
                if ( read_status == EOF || read_status < 0 )
                {
                    /* Can not read - no more actions, just discard connection */
                    PGXCNodeSetConnectionState(conn,
                            DN_CONNECTION_STATE_ERROR_FATAL);
                    add_error_message(conn, "unexpected EOF on datanode connection.");
                    elog(LOG, "unexpected EOF on node:%s pid:%d, read_status:%d, EOF:%d", conn->nodename, conn->backend_pid, read_status, EOF);

#ifdef __TBASE__
                    return DNStatus_ERR;
#else
                    return ERROR_OCCURED;
#endif
                }
            }
            else if (
                    (pool_fd[i].revents & POLLERR) ||
                    (pool_fd[i].revents & POLLHUP) ||
                    (pool_fd[i].revents & POLLNVAL)
                    )
            {

                PGXCNodeSetConnectionState(connections[i],
                        DN_CONNECTION_STATE_ERROR_FATAL);
                add_error_message(conn, "unexpected network error on datanode connection");
                elog(LOG, "unexpected EOF on datanode:%s pid:%d with event %d", conn->nodename, conn->backend_pid, pool_fd[i].revents);
                /* Should we check/read from the other connections before returning? */
#ifdef __TBASE__
                return DNStatus_ERR;            
#else
                return ERROR_OCCURED;
#endif
            }
        }
    }
#ifdef __TBASE__
    return DNStatus_OK;            
#else
    return NO_ERROR_OCCURED;
#endif
}
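
The core of the listing above is the poll() pattern: fill a pollfd array with the sockets of interest, wait, then inspect revents for POLLIN or for the error bits. A minimal POSIX sketch of that pattern follows. It is not the TBase implementation; demo_wait_readable, its return codes and the fixed limit of 16 descriptors are assumptions made for the example.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/*
 * Wait up to timeout_ms for one of the nfds descriptors to become readable.
 * Returns the index of the first readable descriptor, -1 on timeout,
 * or -2 if poll() fails or a descriptor reports an error/hangup before
 * any readable descriptor is found.
 */
int
demo_wait_readable(const int *fds, int nfds, int timeout_ms)
{
    struct pollfd pfds[16];
    int           i;
    int           rc;

    assert(nfds > 0 && nfds <= 16);

    for (i = 0; i < nfds; i++)
    {
        pfds[i].fd = fds[i];
        pfds[i].events = POLLIN;    /* only interested in readable data */
        pfds[i].revents = 0;
    }

    rc = poll(pfds, (nfds_t) nfds, timeout_ms);
    if (rc < 0)
        return -2;                  /* poll() itself failed */
    if (rc == 0)
        return -1;                  /* nothing became ready in time */

    for (i = 0; i < nfds; i++)
    {
        if (pfds[i].revents & POLLIN)
            return i;
        if (pfds[i].revents & (POLLERR | POLLHUP | POLLNVAL))
            return -2;
    }
    return -1;
}
```

The sketch can be exercised with two pipes in place of datanode sockets: with nothing written it times out, and after a byte is written to the second pipe it reports index 1.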

pgxc_node_read_data reads and parses the network data that was received:

/*
 * Read up incoming messages from the PGXC node connection
 */
int
pgxc_node_read_data(PGXCNodeHandle *conn, bool close_if_error)

RemoteSubplanState *
ExecInitRemoteSubplan(RemoteSubplan *node, EState *estate, int eflags)
{// #lizard forgives
    RemoteStmt            rstmt;
    RemoteSubplanState *remotestate;
    ResponseCombiner   *combiner;
    CombineType            combineType;
    struct rusage        start_r;
    struct timeval        start_t;
#ifdef _MIGRATE_
    Oid                 groupid = InvalidOid;
    Oid                    reloid  = InvalidOid;
    ListCell *table;
#endif

    if (log_remotesubplan_stats)
        ResetUsageCommon(&start_r, &start_t);

#ifdef __AUDIT__
    memset((void *)(&rstmt), 0, sizeof(RemoteStmt));
#endif

    remotestate = makeNode(RemoteSubplanState);
    combiner = (ResponseCombiner *) remotestate;
    /*
     * We do not need to combine row counts if we will receive intermediate
     * results or if we won't return row count.
     */

    if (IS_PGXC_DATANODE || estate->es_plannedstmt->commandType == CMD_SELECT)
    {
        combineType = COMBINE_TYPE_NONE;
        remotestate->execOnAll = node->execOnAll;
    }
    else
    {
        if (node->execOnAll)
            combineType = COMBINE_TYPE_SUM;
        else
            combineType = COMBINE_TYPE_SAME;
        /*
         * If we are updating replicated table we should run plan on all nodes.
         * We are choosing single node only to read
         */

        remotestate->execOnAll = true;
    }
    remotestate->execNodes = list_copy(node->nodeList);
    InitResponseCombiner(combiner, 0, combineType);
    combiner->ss.ps.plan = (Plan *) node;
    combiner->ss.ps.state = estate;
    combiner->ss.ps.ExecProcNode = ExecRemoteSubplan;

......

}

pgxc_node_send_execute sends the EXECUTE message down to the Datanode:

/*
 * Send EXECUTE message down to the Datanode
 */
int
pgxc_node_send_execute(PGXCNodeHandle * handle, const char *portal, int fetch)

{
    /* portal name size (allow NULL) */
    int            pnameLen = portal ? strlen(portal) + 1 : 1;

    /* size + pnameLen + fetchLen */
    int            msgLen = 4 + pnameLen + 4;

    /* msgType + msgLen */
    if (ensure_out_buffer_capacity(handle->outEnd + 1 + msgLen, handle) != 0)
    {
        add_error_message(handle, "out of memory");
        return EOF;
    }

    handle->outBuffer[handle->outEnd++] = 'E';
    /* size */
    msgLen = htonl(msgLen);
    memcpy(handle->outBuffer + handle->outEnd, &msgLen, 4);
    handle->outEnd += 4;
    /* portal name */
    if (portal)
    {
        memcpy(handle->outBuffer + handle->outEnd, portal, pnameLen);
        handle->outEnd += pnameLen;
    }
    else
        handle->outBuffer[handle->outEnd++] = '\0';

    /* fetch */
    fetch = htonl(fetch);
    memcpy(handle->outBuffer + handle->outEnd, &fetch, 4);
    handle->outEnd += 4;

    PGXCNodeSetConnectionState(handle, DN_CONNECTION_STATE_QUERY);

    handle->in_extended_query = true;
    return 0;
}
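
Read from the function above, the message layout is: one type byte 'E', a 4-byte length in network byte order that covers itself plus the payload, the NUL-terminated portal name, and a 4-byte fetch count in network byte order. The sketch below encodes the same layout into a caller-supplied buffer so the byte positions can be checked in isolation. demo_build_execute and its buffer-capacity convention are hypothetical and are not part of TBase.

```c
#include <assert.h>
#include <arpa/inet.h>
#include <stdint.h>
#include <string.h>

/*
 * Encode an Execute-style message into buf.
 * Returns the number of bytes written, or 0 if buf is too small.
 */
size_t
demo_build_execute(unsigned char *buf, size_t cap,
                   const char *portal, uint32_t fetch)
{
    /* portal name size including the terminating NUL (NULL means empty name) */
    size_t   pname_len = portal ? strlen(portal) + 1 : 1;
    /* length field + portal name + fetch count */
    uint32_t msg_len = (uint32_t) (4 + pname_len + 4);
    /* the type byte is not counted in the length field */
    size_t   total = 1 + (size_t) msg_len;
    size_t   off = 0;
    uint32_t n;

    assert(buf != NULL);
    if (cap < total)
        return 0;

    buf[off++] = 'E';

    n = htonl(msg_len);
    memcpy(buf + off, &n, 4);
    off += 4;

    if (portal)
    {
        memcpy(buf + off, portal, pname_len);
        off += pname_len;
    }
    else
        buf[off++] = '\0';

    n = htonl(fetch);
    memcpy(buf + off, &n, 4);
    off += 4;

    return off;
}
```

For a portal named "p1" and a fetch size of 100, the length field is 4 + 3 + 4 = 11 and the whole message is 12 bytes long.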

TupleTableSlot *
ExecRemoteSubplan(PlanState *pstate)
{// #lizard forgives
    RemoteSubplanState *node = castNode(RemoteSubplanState, pstate);
    ResponseCombiner *combiner = (ResponseCombiner *) node;
    RemoteSubplan  *plan = (RemoteSubplan *) combiner->ss.ps.plan;
    EState           *estate = combiner->ss.ps.state;
    TupleTableSlot *resultslot = combiner->ss.ps.ps_ResultTupleSlot;
    struct rusage    start_r;
    struct timeval        start_t;
#ifdef __TBASE__
    int count = 0;
#endif
#ifdef __TBASE__
    if ((node->eflags & EXEC_FLAG_EXPLAIN_ONLY) != 0)
        return NULL;
    
    if (!node->local_exec && (!node->finish_init) && (!(node->eflags & EXEC_FLAG_SUBPLAN)))
    {
        if(node->execNodes)
        {
            ExecFinishInitRemoteSubplan(node);
        }
        else
        {
            return NULL;
        }
    }
#endif
    /* 
     * We allow combiner->conn_count == 0 after node initialization
     * if we figured out that current node won't receive any result
     * because of distributionRestrict is set by planner.
     * But we should distinguish this case from others, when conn_count is 0.
     * That is possible if local execution is chosen or data are buffered 
     * at the coordinator or data are exhausted and node was reset.
     * in last two cases connections are saved to cursor_connections and we
     * can check their presence.  
     */

    if (!node->local_exec && combiner->conn_count == 0 && 
            combiner->cursor_count == 0)
        return NULL;

    if (log_remotesubplan_stats)
        ResetUsageCommon(&start_r, &start_t);

primary_mode_phase_two:

    if (!node->bound)
    {
        int fetch = 0;
        int paramlen = 0;
        int epqctxlen = 0;
        char *paramdata = NULL;
        char *epqctxdata = NULL;
        
        /*
         * Conditions when we want to execute query on the primary node first:
         * Coordinator running replicated ModifyTable on multiple nodes
         */

        bool primary_mode = combiner->probing_primary ||
                (IS_PGXC_COORDINATOR &&
                 combiner->combine_type == COMBINE_TYPE_SAME &&
                 OidIsValid(primary_data_node) &&
                 combiner->conn_count > 1 && !g_UseDataPump);
        char cursor[NAMEDATALEN];

        if (plan->cursor)
        {
            fetch = PGXLRemoteFetchSize;
            if (plan->unique)
                snprintf(cursor, NAMEDATALEN, "%s_"INT64_FORMAT, plan->cursor, plan->unique);
            else
                strncpy(cursor, plan->cursor, NAMEDATALEN);
        }
        else
            cursor[0] = '\0';

......

        /*
         * The subplan being rescanned, need to restore connections and
         * re-bind the portal
         */

        if (combiner->cursor)
        {
            int i;

            /*
             * On second phase of primary mode connections are properly set,
             * so do not copy.
             */

            if (!combiner->probing_primary)
            {
                combiner->conn_count = combiner->cursor_count;
                memcpy(combiner->connections, combiner->cursor_connections,
                            combiner->cursor_count * sizeof(PGXCNodeHandle *));
            }

            for (i = 0; i < combiner->conn_count; i++)
            {
                PGXCNodeHandle *conn = combiner->connections[i];

                CHECK_OWNERSHIP(conn, combiner);

                /* close previous cursor only on phase 1 */
                if (!primary_mode || !combiner->probing_primary)
                    pgxc_node_send_close(conn, false, combiner->cursor);

                /*
                 * If we now should probe primary, skip execution on non-primary
                 * nodes
                 */
                if (primary_mode && !combiner->probing_primary &&
                        conn->nodeoid != primary_data_node)
                    continue;

                /* rebind */
                pgxc_node_send_bind(conn, combiner->cursor, combiner->cursor,
                                    paramlen, paramdata, epqctxlen, epqctxdata);

                if (enable_statistic)
                {
                    elog(LOG, "Bind Message:pid:%d,remote_pid:%d,remote_ip:%s,remote_port:%d,fd:%d,cursor:%s",
                              MyProcPid, conn->backend_pid, conn->nodehost, conn->nodeport, conn->sock, cursor);
                }
                /* execute */
                pgxc_node_send_execute(conn, combiner->cursor, fetch);
                /* submit */
                if (pgxc_node_send_flush(conn))
                {
                    combiner->conn_count = 0;
                    pfree(combiner->connections);
                    ereport(ERROR,
                            (errcode(ERRCODE_INTERNAL_ERROR),
                             errmsg("Failed to send command to data nodes")));
                }

                /*
                 * There could be only one primary node, but can not leave the
                 * loop now, because we need to close cursors.
                 */
                if (primary_mode && !combiner->probing_primary)
                {
                    combiner->current_conn = i;
                }
            }
        }

......

    if (combiner->tuplesortstate)
    {
        if (tuplesort_gettupleslot((Tuplesortstate *) combiner->tuplesortstate,
                                   true, true, resultslot, NULL))
        {
            if (log_remotesubplan_stats)
                ShowUsageCommon("ExecRemoteSubplan", &start_r, &start_t);
            return resultslot;
        }
    }
    else
    {
        TupleTableSlot *slot = FetchTuple(combiner);
        if (!TupIsNull(slot))
        {
            if (log_remotesubplan_stats)
                ShowUsageCommon("ExecRemoteSubplan", &start_r, &start_t);
            return slot;
        }
        else if (combiner->probing_primary)
            /* phase1 is successfully completed, run on other nodes */
            goto primary_mode_phase_two;

    }
    if (combiner->errorMessage)
        pgxc_node_report_error(combiner);

......

}

/*
 * FetchTuple
 *
 *        Get next tuple from one of the datanode connections.
 * The connections should be in combiner->connections, if "local" dummy
 * connection presents it should be the last active connection in the array.
 *        If combiner is set up to perform merge sort function returns tuple from
 * connection defined by combiner->current_conn, or NULL slot if no more tuple
 * are available from the connection. Otherwise it returns tuple from any
 * connection or NULL slot if no more available connections.
 *        Function looks into combiner->rowBuffer before accessing connection
 * and return a tuple from there if found.
 *        Function may wait while more data arrive from the data nodes. If there
 * is a locally executed subplan function advance it and buffer resulting rows
 * instead of waiting.
 */
TupleTableSlot *
FetchTuple(ResponseCombiner *combiner)

  

ExecutorRun call order: ExecutorRun => standard_ExecutorRun => ExecutePlan

【ExecutePlan】

/* ----------------------------------------------------------------
 *        ExecutePlan
 *
 *        Processes the query plan until we have retrieved 'numberTuples' tuples,
 *        moving in the specified direction.
 *
 *        Runs to completion if numberTuples is 0
 *
 * Note: the ctid attribute is a 'junk' attribute that is removed before the
 * user can see it
 * ----------------------------------------------------------------
 */
static void
ExecutePlan(EState *estate,
            PlanState *planstate,
            bool use_parallel_mode,
            CmdType operation,
            bool sendTuples,
            uint64 numberTuples,
            ScanDirection direction,
            DestReceiver *dest,
            bool execute_once)

【ExecProcNode】

/* ----------------------------------------------------------------
 *        ExecProcNode
 *
 *        Execute the given node to return a(nother) tuple.
 * ----------------------------------------------------------------
 */
#ifndef FRONTEND
static inline TupleTableSlot *
ExecProcNode(PlanState *node)
{
    if (node->chgParam != NULL) /* something changed? */
        ExecReScan(node);        /* let ReScan handle this */

    return node->ExecProcNode(node);
}
#endif
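
ExecProcNode does two things: it rescans the node first if its parameters changed, and it then calls the per-node routine stored in the state node. The toy model below mimics that behaviour in isolation; DemoState, its params_changed flag (a stand-in for chgParam != NULL) and the demo_ functions are all hypothetical.

```c
#include <assert.h>
#include <stddef.h>

typedef struct DemoState DemoState;

struct DemoState
{
    int (*proc) (DemoState *self);  /* plays the role of PlanState.ExecProcNode */
    int params_changed;             /* stand-in for chgParam != NULL */
    int pos;                        /* current position in the output */
    int rescans;                    /* how many times the node was reset */
};

/* Per-node routine: returns 1, 2, 3, ... on successive calls. */
int
demo_count_proc(DemoState *s)
{
    return ++s->pos;
}

/* Reset the node so that its output starts over. */
void
demo_rescan(DemoState *s)
{
    s->pos = 0;
    s->params_changed = 0;
    s->rescans++;
}

/* Same shape as ExecProcNode: rescan if needed, then call through the pointer. */
int
demo_exec_proc(DemoState *s)
{
    assert(s != NULL && s->proc != NULL);
    if (s->params_changed)
        demo_rescan(s);
    return s->proc(s);
}
```

Setting params_changed between two calls makes the next call start again from 1, which is the effect ExecReScan has on a real plan node's output.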

/*
 * ExecReScan
 *        Reset a plan node so that its output can be re-scanned.
 *
 * Note that if the plan node has parameters that have changed value,
 * the output might be different from last time.
 */
void
ExecReScan(PlanState *node)

{

......

    /* And do node-type-specific processing */
    switch (nodeTag(node))
    {
        case T_ResultState:
            ExecReScanResult((ResultState *) node);
            break;

        case T_ProjectSetState:
            ExecReScanProjectSet((ProjectSetState *) node);
            break;

        case T_ModifyTableState:
            ExecReScanModifyTable((ModifyTableState *) node);
            break;

        case T_AppendState:
            ExecReScanAppend((AppendState *) node);
            break;

        case T_MergeAppendState:
            ExecReScanMergeAppend((MergeAppendState *) node);
            break;

        case T_RecursiveUnionState:
            ExecReScanRecursiveUnion((RecursiveUnionState *) node);
            break;

        case T_BitmapAndState:
            ExecReScanBitmapAnd((BitmapAndState *) node);
            break;

......

#ifdef PGXC
        case T_RemoteSubplanState:
            ExecReScanRemoteSubplan((RemoteSubplanState *) node);
            break;
        case T_RemoteQueryState:
            ExecReScanRemoteQuery((RemoteQueryState *) node);
            break;
#endif

......

}

【ExecReScanRemoteSubplan】

void
ExecReScanRemoteSubplan(RemoteSubplanState *node)
{
    ResponseCombiner *combiner = (ResponseCombiner *)node;

    /*
     * If we haven't queried remote nodes yet, just return. If outerplan'
     * chgParam is not NULL then it will be re-scanned by ExecProcNode,
     * else - no reason to re-scan it at all.
     */
    if (!node->bound)
        return;

    /*
     * If we execute locally rescan local copy of the plan
     */
    if (outerPlanState(node))
        ExecReScan(outerPlanState(node));

    /*
     * Consume any possible pending input
     */
    pgxc_connections_cleanup(combiner);

    /* misc cleanup */
    combiner->command_complete_count = 0;
    combiner->description_count = 0;

    /*
     * Force query is re-bound with new parameters
     */
    node->bound = false;
#ifdef __TBASE__
    node->eflags &= ~(EXEC_FLAG_DISCONN);
#endif
}

【ExecReScanRemoteQuery】

/* ----------------------------------------------------------------
 *        ExecReScanRemoteQuery
 * ----------------------------------------------------------------
 */
void
ExecReScanRemoteQuery(RemoteQueryState *node)
{
    ResponseCombiner *combiner = (ResponseCombiner *)node;

    /*
     * If we haven't queried remote nodes yet, just return. If outerplan'
     * chgParam is not NULL then it will be re-scanned by ExecProcNode,
     * else - no reason to re-scan it at all.
     */
    if (!node->query_Done)
        return;

    /*
     * If we execute locally rescan local copy of the plan
     */
    if (outerPlanState(node))
        ExecReScan(outerPlanState(node));

    /*
     * Consume any possible pending input
     */
    pgxc_connections_cleanup(combiner);

    /* misc cleanup */
    combiner->command_complete_count = 0;
    combiner->description_count = 0;

    /*
     * Force query is re-bound with new parameters
     */
    node->query_Done = false;

}

For distributed nodes, cleanup is done mainly by the two functions below, ExecEndRemoteQuery and ExecEndRemoteSubplan.

\src\backend\executor\execProcNode.c

【ExecEndNode】

/* ----------------------------------------------------------------
 *        ExecEndNode
 *
 *        Recursively cleans up all the nodes in the plan rooted
 *        at 'node'.
 *
 *        After this operation, the query plan will not be able to be
 *        processed any further.  This should be called only after
 *        the query plan has been fully executed.
 * ----------------------------------------------------------------
 */
void
ExecEndNode(PlanState *node)
{
......
#ifdef PGXC
        case T_RemoteQueryState:
            ExecEndRemoteQuery((RemoteQueryState *) node);
            break;
#endif
#ifdef XCP
        case T_RemoteSubplanState:
            ExecEndRemoteSubplan((RemoteSubplanState *) node);
            break;
......
}

【ExecEndRemoteSubplan】

void
ExecEndRemoteSubplan(RemoteSubplanState *node)
{// #lizard forgives
    int32             count    = 0;
    ResponseCombiner *combiner = (ResponseCombiner *)node;
    RemoteSubplan    *plan = (RemoteSubplan *) combiner->ss.ps.plan;
    int i;
    struct rusage    start_r;
    struct timeval        start_t;

    if (log_remotesubplan_stats)
        ResetUsageCommon(&start_r, &start_t);

    if (outerPlanState(node))
        ExecEndNode(outerPlanState(node));
    if (node->locator)
        freeLocator(node->locator);

    /*
     * Consume any possible pending input
     */
    if (node->bound)
    {
        pgxc_connections_cleanup(combiner);
    }

    /*
     * Update coordinator statistics
     */
    if (IS_PGXC_COORDINATOR)
    {
        EState *estate = combiner->ss.ps.state;
......
}
