相关
《Postgresql源码(76)执行器专用元组格式TupleTableSlot》
《Postgresql源码(82)SPI模块拆解分析一:执行简单SQL获取结果》
《Postgresql源码(83)执行器的结果接收系统——DestReceiver》

0 总结

  • 执行器进入前会配置_DestReceiver(一套接口)
  • 执行器在内部跑一遍计划,产生一条tts(参考《Postgresql源码(76)执行器专用元组格式TupleTableSlot》),执行器调用接口函数receiveSlot,按当前接口定义输出到指定位置。
  • 例如
    • 【SELECT xxx】receiveSlot为printtup:把tts按列解析后拿到数据,按客户端服务端协议拼装包内容,调用libpq返回给客户端。
    • 【COPY TO 文件】receiveSlot为copy_dest_receive:把tts按列解析后拿到数据,按copy语法提供的分隔符组装,fwrite到文件中。
    • 【SPI】中receiveSlot为spi_printtup:把tts转换为HeapTuple格式,保存到SPI结果全局变量中。

1 概要

执行器的工作包括:work、get result,之前work的内容已经介绍过了,这里分析下执行器如何拿到执行结果。

  • 执行器会在多种场景下工作,例如:

    • SPI调用。
    • 常规客户端服务端的调用。
    • standalone backend调用(没有postmaster)。
    • 系统内部调用。
  • 对于上述场景,执行器的调用者有较大的差异,结果集无法使用一套函数返回。所以执行器设计了一套拿结果的函数钩子(接口),调用者需要将结果集的获取函数配置到接口上,执行器在执行中会把结果通过接口函数调入相应模块中,完成调用者所需的结果集构造,例如:
    • SPI的结果需要存放到执行的全局变量结构中。
    • 常规客户端服务端调用需要将结果用Libpq返回客户端。
    • standalone backend调用需要将结果打印到stdout。
    • 系统内部调用不需要返回结果。

PG的结果接收器提供了四个接口:

  • receiveSlot:输入执行器产生的tts,按指定格式输出
  • rStartup:初始化结果接收器
  • rShutdown:停止结果接收器
  • rDestroy:清理动态申请中间变量
struct _DestReceiver
{/* Called for each tuple to be output: */bool      (*receiveSlot) (TupleTableSlot *slot,DestReceiver *self);/* Per-executor-run initialization and shutdown: */void        (*rStartup) (DestReceiver *self,int operation,TupleDesc typeinfo);void      (*rShutdown) (DestReceiver *self);/* Destroy the receiver object itself (if dynamically allocated) */void       (*rDestroy) (DestReceiver *self);/* CommandDest code for this receiver */CommandDest mydest;/* Private fields might appear beyond this point... */
};

2 场景

第一组:正常客户端连接场景【DestRemote】

这一组函数接口由printtup_create_DR配置:

DestReceiver *
printtup_create_DR(CommandDest dest)
{DR_printtup *self = (DR_printtup *) palloc0(sizeof(DR_printtup));self->pub.receiveSlot = printtup;    /* might get changed later */self->pub.rStartup = printtup_startup;self->pub.rShutdown = printtup_shutdown;self->pub.rDestroy = printtup_destroy;self->pub.mydest = dest;/** Send T message automatically if DestRemote, but not if* DestRemoteExecute*/self->sendDescrip = (dest == DestRemote);self->attrinfo = NULL;self->nattrs = 0;self->myinfo = NULL;self->buf.data = NULL;self->tmpcontext = NULL;return (DestReceiver *) self;
}

注意:这里的数据结构DR_printtup把DestReceiver有包装了 一层:

typedef struct
{DestReceiver pub;          /* publicly-known function pointers */Portal        portal;         /* the Portal we are printing from */bool       sendDescrip;    /* send RowDescription at startup? */TupleDesc  attrinfo;       /* The attr info we are set up for */int            nattrs;PrinttupAttrInfo *myinfo;    /* Cached info about each attr */StringInfoData buf;            /* output buffer (*not* in tmpcontext) */MemoryContext tmpcontext;  /* Memory context for per-row workspace */
} DR_printtup;

来看下这几个函数的工作位置和流程,例如:

select s::int, left(random()::text,4) l from generate_series(1,2) s;-- 输出:两行、两列s |  l
---+------1 | 0.552 | 0.28

1 rStartup = printtup_startup:申请上下文,发送行描述符

位置

#0  printtup_startup (self=0x106dd50, operation=1, typeinfo=0x100d190) at printtup.c:113
#1  0x0000000000733ddf in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:350
#2  0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3  0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#4  0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50, qc=0x7ffd6b492890) at pquery.c:765
#5  0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s\n;") at postgres.c:1213
#6  0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#7  0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#8  0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#9  0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#10 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#11 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209

流程

static void
printtup_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{DR_printtup *myState = (DR_printtup *) self;Portal        portal = myState->portal;/** Create I/O buffer to be used for all messages.  This cannot be inside* tmpcontext, since we want to re-use it across rows.*/initStringInfo(&myState->buf);

申请"printtup"上下文,存放所有输出数据,

 myState->tmpcontext = AllocSetContextCreate(CurrentMemoryContext,"printtup",ALLOCSET_DEFAULT_SIZES);

启动时就需要发送行描述符:

 if (myState->sendDescrip)SendRowDescriptionMessage(&myState->buf,typeinfo,FetchPortalTargetList(portal),portal->formats);
}

SendRowDescriptionMessage发送行描述符,入参:

List *targetlistTargetEntry: {xpr = {type = T_TargetEntry}, expr = 0x1086570, resno = 1, resname = 0xf5af88 "s", ressortgroupref = 0, resorigtbl = 0, resorigcol = 0, resjunk = false}TargetEntry: {xpr = {type = T_TargetEntry}, expr = 0x1086868, resno = 2, resname = 0xf5b5a0 "l", ressortgroupref = 0, resorigtbl = 0, resorigcol = 0, resjunk = false}typeinfo{natts = 2, tdtypeid = 2249, tdtypmod = -1, tdrefcount = -1, constr = 0x0, attrs = 0x100d1a8}

流程:拼接输出串

void
SendRowDescriptionMessage(StringInfo buf, TupleDesc typeinfo,List *targetlist, int16 *formats)
{int            natts = typeinfo->natts;int         i;ListCell   *tlist_item = list_head(targetlist);/* tuple descriptor message type */pq_beginmessage_reuse(buf, 'T');/* # of attrs in tuples */pq_sendint16(buf, natts);/** Preallocate memory for the entire message to be sent. That allows to* use the significantly faster inline pqformat.h functions and to avoid* reallocations.** Have to overestimate the size of the column-names, to account for* character set overhead.*/enlargeStringInfo(buf, (NAMEDATALEN * MAX_CONVERSION_GROWTH /* attname */+ sizeof(Oid) /* resorigtbl */+ sizeof(AttrNumber)   /* resorigcol */+ sizeof(Oid)  /* atttypid */+ sizeof(int16) /* attlen */+ sizeof(int32) /* attypmod */+ sizeof(int16) /* format */) * natts);for (i = 0; i < natts; ++i){Form_pg_attribute att = TupleDescAttr(typeinfo, i);Oid         atttypid = att->atttypid;int32      atttypmod = att->atttypmod;Oid          resorigtbl;AttrNumber   resorigcol;int16        format;/** If column is a domain, send the base type and typmod instead.* Lookup before sending any ints, for efficiency.*/atttypid = getBaseTypeAndTypmod(atttypid, &atttypmod);/* Do we have a non-resjunk tlist item? */while (tlist_item &&((TargetEntry *) lfirst(tlist_item))->resjunk)tlist_item = lnext(targetlist, tlist_item);if (tlist_item){TargetEntry *tle = (TargetEntry *) lfirst(tlist_item);resorigtbl = tle->resorigtbl;resorigcol = tle->resorigcol;tlist_item = lnext(targetlist, tlist_item);}else{/* No info available, so send zeroes */resorigtbl = 0;resorigcol = 0;}if (formats)format = formats[i];elseformat = 0;pq_writestring(buf, NameStr(att->attname));pq_writeint32(buf, resorigtbl);pq_writeint16(buf, resorigcol);pq_writeint32(buf, atttypid);pq_writeint16(buf, att->attlen);pq_writeint32(buf, atttypmod);pq_writeint16(buf, format);}pq_endmessage_reuse(buf);
}

pq_endmessage_reuse

socket_putmessage(char msgtype, const char *s, size_t len)(gdb) p s
$18 = 0x10724e0 ""
(gdb) p len
$19 = 42
(gdb) x/32bx 0x10724e0
0x10724e0:      0x00    0x02    0x73(s) 0x00    0x00    0x00    0x00    0x00
0x10724e8:      0x00    0x00    0x00    0x00    0x00    0x17    0x00    0x04
0x10724f0:      0xff    0xff    0xff    0xff    0x00    0x00    0x6c(l) 0x00
0x10724f8:      0x00    0x00    0x00    0x00    0x00    0x00    0x00    0x00
0x1072500:      0x00    0x19    0xff    0xff    0xff    0xff    0xff    0xff
0x1072508:      0x00    0x00

2 receiveSlot = printtup

位置

#0  printtup (slot=0x100d2a8, self=0x106dd50) at printtup.c:303
#1  0x0000000000736102 in ExecutePlan (estate=0x100b630, planstate=0x100b868, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0,direction=ForwardScanDirection, dest=0x106dd50, execute_once=true) at execMain.c:1582
#2  0x0000000000733e7d in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:361
#3  0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#4  0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#5  0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50,qc=0x7ffd6b492890) at pquery.c:765
#6  0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1213
#7  0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#8  0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#9  0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#10 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#11 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#12 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209

流程

printtup输入为tts包装的元组,和上面初始化后的DestReceiver。

static bool
printtup(TupleTableSlot *slot, DestReceiver *self)printtup_prepare_info    // 拼接DR_printtup中的信息,准备发送MemoryContextSwitchTo    // 切换到"printtup"pq_beginmessage_reuse    // 调用libpq开始发数据pq_sendint16......pq_endmessage_reuse

3 rShutdown = printtup_shutdown

位置

#0  printtup_shutdown (self=0x106dd50) at printtup.c:388
#1  0x0000000000733e98 in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:376
#2  0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3  0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#4  0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50, qc=0x7ffd6b492890) at pquery.c:765
#5  0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1213
#6  0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#7  0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#8  0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#9  0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#10 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#11 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209

流程:清理工作

static void
printtup_shutdown(DestReceiver *self)
{DR_printtup *myState = (DR_printtup *) self;if (myState->myinfo)pfree(myState->myinfo);myState->myinfo = NULL;myState->attrinfo = NULL;if (myState->buf.data)pfree(myState->buf.data);myState->buf.data = NULL;if (myState->tmpcontext)MemoryContextDelete(myState->tmpcontext);myState->tmpcontext = NULL;
}

4 rDestroy = printtup_destroy

位置

#0  printtup_destroy (self=0x106dd50) at printtup.c:412
#1  0x0000000000976650 in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1221
#2  0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#3  0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#4  0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#5  0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#6  0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#7  0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209

流程:清理动态申请的:外层数据结构DR_printtup

static void
printtup_destroy(DestReceiver *self)
{pfree(self);
}

第二组:COPY数据场景【DestCopyOut】

这一组函数接口由CreateCopyDestReceiver配置:

DestReceiver *
CreateCopyDestReceiver(void)
{DR_copy    *self = (DR_copy *) palloc(sizeof(DR_copy));self->pub.receiveSlot = copy_dest_receive;self->pub.rStartup = copy_dest_startup;self->pub.rShutdown = copy_dest_shutdown;self->pub.rDestroy = copy_dest_destroy;self->pub.mydest = DestCopyOut;self->cstate = NULL;       /* will be set later */self->processed = 0;return (DestReceiver *) self;
}

注意copy也给DestReceiver包了一层:DR_copy

typedef struct
{DestReceiver pub;          /* publicly-known function pointers */CopyToState cstate;           /* CopyToStateData for the command */uint64     processed;      /* # of tuples processed */
} DR_copy;

来看下这几个函数的工作位置和流程,例如:

copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';-- 输出:两行两列
1       0.74
2       0.09

1 rStartup = copy_dest_startup

2 receiveSlot = copy_dest_receive

位置

#0  copy_dest_receive (slot=0x1048538, self=0x10861c0) at copyto.c:1259
#1  0x0000000000736102 in ExecutePlan (estate=0x10468c0, planstate=0x1046af8, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0,  direction=ForwardScanDirection, dest=0x10861c0, execute_once=true) at execMain.c:1582
#2  0x0000000000733e7d in standard_ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:361
#3  0x0000000000733ca8 in ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#4  0x0000000000685e87 in DoCopyTo (cstate=0xf7da60) at copyto.c:905
#5  0x000000000067bdfa in DoCopy (pstate=0xf7d910, stmt=0xf5bb88, stmt_location=0, stmt_len=86, processed=0x7ffd6b4924e8) at copy.c:309
#6  0x000000000097ec16 in standard_ProcessUtility (pstmt=0xf5bef8,  queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:739
#7  0x000000000097e69b in ProcessUtility (pstmt=0xf5bef8, queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:527
#8  0x000000000097d297 in PortalRunUtility (portal=0xff5050, pstmt=0xf5bef8, isTopLevel=true, setHoldSnapshot=false, dest=0xf5bfe8, qc=0x7ffd6b492890)at pquery.c:1155
#9  0x000000000097d4fb in PortalRunMulti (portal=0xff5050, isTopLevel=true, setHoldSnapshot=false, dest=0xf5bfe8, altdest=0xf5bfe8, qc=0x7ffd6b492890) at pquery.c:1312
#10 0x000000000097ca27 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0xf5bfe8, altdest=0xf5bfe8, qc=0x7ffd6b492890) at pquery.c:788
#11 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';")at postgres.c:1213
#12 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#13 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#14 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#15 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#16 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#17 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209

流程

  • 同样也是拿到tts和DestReceiver
  • 由CopyOneRowTo调用CopySendEndOfRow调用fwrite写入文件
static bool
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{DR_copy    *myState = (DR_copy *) self;CopyToState cstate = myState->cstate;/* Send the data */CopyOneRowTo(cstate, slot);/* Increment the number of processed tuples, and report the progress */pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,++myState->processed);return true;
}

3 rShutdown = copy_dest_shutdown

位置

#0  copy_dest_shutdown (self=0x10861c0) at copyto.c:1279
#1  0x0000000000733e98 in standard_ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:376
#2  0x0000000000733ca8 in ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3  0x0000000000685e87 in DoCopyTo (cstate=0xf7da60) at copyto.c:905
#4  0x000000000067bdfa in DoCopy (pstate=0xf7d910, stmt=0xf5bb88, stmt_location=0, stmt_len=86, processed=0x7ffd6b4924e8) at copy.c:309
#5  0x000000000097ec16 in standard_ProcessUtility (pstmt=0xf5bef8,  queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false,  context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:739

无操作

4 rDestroy = copy_dest_destroy

不执行,因为DestReceiver外面包的数据结构DR_copy没有什么需要释放的。

第三组:SPI获取数据场景【DestSPI】

这一组函数接口由CreateDestReceiver分发函数直接配置,注意前面两种都是走CreateDestReceiver入口进入自己的配置函数,但是SPI不同,直接在CreateDestReceiver里面配置:

DestReceiver *
CreateDestReceiver(CommandDest dest)
{/** It's ok to cast the constness away as any modification of the none* receiver would be a bug (which gets easier to catch this way).*/switch (dest){case DestRemote:case DestRemoteExecute:return printtup_create_DR(dest);case DestRemoteSimple:return unconstify(DestReceiver *, &printsimpleDR);case DestNone:return unconstify(DestReceiver *, &donothingDR);case DestDebug:return unconstify(DestReceiver *, &debugtupDR);// 这里配置 <-<-<-----------------------------case DestSPI:return unconstify(DestReceiver *, &spi_printtupDR);case DestTuplestore:return CreateTuplestoreDestReceiver();case DestIntoRel:return CreateIntoRelDestReceiver(NULL);case DestCopyOut:return CreateCopyDestReceiver();case DestSQLFunction:return CreateSQLFunctionDestReceiver();case DestTransientRel:return CreateTransientRelDestReceiver(InvalidOid);case DestTupleQueue:return CreateTupleQueueDestReceiver(NULL);}/* should never get here */pg_unreachable();
}

spi_printtupDR带四个函数:

static const DestReceiver spi_printtupDR = {spi_printtup, spi_dest_startup, donothingCleanup, donothingCleanup,DestSPI
};

SPI的结果不是直接返回给客户端的!SPI有自己的三个全局变量来指向结果集,SPI的接口函数会从全局变量中取值,组织后返回给客户端。(使用全局变量当接口的设计很差!)

uint64           SPI_processed = 0;     // 行数
SPITupleTable    *SPI_tuptable = NULL;  // 数据
int              SPI_result = 0;        // 执行结果

例子

直接执行:```c
cat << EOF > spitest.c
#include "postgres.h"
#include "executor/spi.h"
#include "utils/builtins.h"
#include "fmgr.h"PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(sptest1);Datum
sptest1(PG_FUNCTION_ARGS)
{char *sql10 = "select s::int, left(random()::text,4) l from generate_series(1,10) s";int ret;int proc;SPI_connect();ret = SPI_exec(sql10, 0);proc = SPI_processed;if (ret > 0 && SPI_tuptable != NULL){SPITupleTable *tuptable = SPI_tuptable;TupleDesc tupdesc = tuptable->tupdesc;char buf[8192];uint64 j;for (j = 0; j < tuptable->numvals; j++){HeapTuple tuple = tuptable->vals[j];int i;for (i = 1, buf[0] = 0; i <= tupdesc->natts; i++)snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), " %4s%4s",SPI_getvalue(tuple, tupdesc, i),(i == tupdesc->natts) ? " " : " |");elog(INFO, "%s", buf);}}SPI_finish();return (proc);
}
EOFgcc -O0 -Wall -I /home/mingjiegao/dev/src/postgres/src/include -g -shared -fpic -o spitest.so spitest.c

psql执行:

postgres=# select sptest1();
INFO:      1   | 0.10
INFO:      2   | 0.18
INFO:      3   | 0.01
INFO:      4   | 0.78
INFO:      5   | 0.60
INFO:      6   | 0.76
INFO:      7   | 0.18
INFO:      8   | 0.86
INFO:      9   | 0.19
INFO:     10   | 0.99    sptest1
---------10
(1 row)

1 rStartup = spi_dest_startup

位置

sptest1SPI_execSPI_execute_SPI_execute_plan_SPI_pqueryExecutorRunstandard_ExecutorRunspi_dest_startup

流程

void
spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{SPITupleTable *tuptable;MemoryContext oldcxt;MemoryContext tuptabcxt;if (_SPI_current == NULL)elog(ERROR, "spi_dest_startup called while not connected to SPI");if (_SPI_current->tuptable != NULL)elog(ERROR, "improper call to spi_dest_startup");
  • 从"ExecutorState"切换到"SPI Proc"
  • 创建"SPI TupTable",切换到"SPI TupTable"
 oldcxt = _SPI_procmem();   /* switch to procedure memory context */tuptabcxt = AllocSetContextCreate(CurrentMemoryContext,"SPI TupTable",ALLOCSET_DEFAULT_SIZES);MemoryContextSwitchTo(tuptabcxt);

在"SPI TupTable"中申请SPITupleTable结构,由_SPI_current->tuptable记录:

SPITupleTable结构中有:

  • TupleDesc tupdesc;
  • HeapTuple *vals;
  • uint64 numvals;

记录结果集数据。

 _SPI_current->tuptable = tuptable = (SPITupleTable *)palloc0(sizeof(SPITupleTable));tuptable->tuptabcxt = tuptabcxt;tuptable->subid = GetCurrentSubTransactionId();/** The tuptable is now valid enough to be freed by AtEOSubXact_SPI, so put* it onto the SPI context's tuptables list.  This will ensure it's not* leaked even in the unlikely event the following few lines fail.*/
  • _SPI_connection中保存了slist_head tuptables;所有活跃的tuptable链表。
  • 申请128个HeapTupleData指针位置保存结果数据。
 slist_push_head(&_SPI_current->tuptables, &tuptable->next);/* set up initial allocations */tuptable->alloced = 128;tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple));tuptable->numvals = 0;tuptable->tupdesc = CreateTupleDescCopy(typeinfo);MemoryContextSwitchTo(oldcxt);
}

2 receiveSlot = spi_printtup

位置

sptest1SPI_execSPI_execute_SPI_execute_plan_SPI_pqueryExecutorRunstandard_ExecutorRunExecutePlanspi_printtup

流程

bool
spi_printtup(TupleTableSlot *slot, DestReceiver *self)
{SPITupleTable *tuptable;MemoryContext oldcxt;if (_SPI_current == NULL)elog(ERROR, "spi_printtup called while not connected to SPI");

tuptable还没赋值的状态:
{tupdesc = 0x107ea90, vals = 0x107e678, numvals = 0, alloced = 128, tuptabcxt = 0x107e500, next = {next = 0x0}, subid = 1}

 tuptable = _SPI_current->tuptable;if (tuptable == NULL)elog(ERROR, "improper call to spi_printtup");
  • 切到"SPI TupTable"
  • 分配的128个位置不够用了?不够在申请256个。
 oldcxt = MemoryContextSwitchTo(tuptable->tuptabcxt);if (tuptable->numvals >= tuptable->alloced){/* Double the size of the pointer array */uint64      newalloced = tuptable->alloced * 2;tuptable->vals = (HeapTuple *) repalloc_huge(tuptable->vals,newalloced * sizeof(HeapTuple));tuptable->alloced = newalloced;}
  • 调用tts接口函数ExecCopySlotHeapTuple做元组拷贝,这里实际使用的是tts_virtual_copy_heap_tuple,参考《Postgresql源码(76)执行器专用元组格式TupleTableSlot》。
  • ExecCopySlotHeapTuple输入tts输出标准存储格式HeapTuple。
 tuptable->vals[tuptable->numvals] = ExecCopySlotHeapTuple(slot);(tuptable->numvals)++;MemoryContextSwitchTo(oldcxt);return true;
}

3 rShutdown = donothingCleanup

4 rDestroy = donothingCleanup

Postgresql源码(83)执行器的结果接收系统——DestReceiver相关推荐

  1. Postgresql源码(82)SPI模块拆解分析一:执行简单SQL获取结果

    相关 <Postgresql源码(76)执行器专用元组格式TupleTableSlot> <Postgresql源码(82)SPI模块拆解分析一:执行简单SQL获取结果> &l ...

  2. PostgreSQL源码分析

    PostgreSQL源码结构 PostgreSQL的使用形态 PostgreSQL采用C/S(客户机/服务器)模式结构.应用层通过INET或者Unix Socket利用既定的协议与数据库服务器进行通信 ...

  3. java ipc pgsql_[转]PostgreSQL源码结构

    PostgreSQL采用C/S(客户机/服务器)模式结构.应用层通过INET或者Unix Socket利用既定的协议与数据库服务器进行通信. 另外,还有一种'Standalone Backend'使用 ...

  4. PostgreSQL源码结构

    PostgreSQL的使用形态 PostgreSQL采用C/S(客户机/服务器)模式结构.应用层通过INET或者Unix Socket利用既定的协议与数据库服务器进行通信. 另外,还有一种'Stand ...

  5. PostgreSQL源码学习(1)--PG13代码结构

    PostgreSQL源码学习(1)–PG13代码结构 PostgreSQL代码结构 Bootstrap:用于支持Bootstrap运行模式,该模式主要用来创建初始的模板数据库. Main:主程序模块, ...

  6. Postgresql源码(85)查询执行——表达式解析器分析(select 1+1如何执行)

    相关 <Postgresql源码(61)查询执行--最外层Portal模块> <Postgresql源码(62)查询执行--子模块ProcessUtility> <Pos ...

  7. Postgresql源码(106)Generic Plan与Custom Plan的区别(以分区表为例)

    相关: <Postgresql源码(105)分区表剪枝代码分析> <Postgresql源码(106)Generic Plan与Custom Plan的区别(以分区表为例)> ...

  8. postgreSQL源码分析——索引的建立与使用——Hash索引(3)

    2021SC@SDUSC 上一篇博客讲了关于Hash索引创建与插入的相关函数,这一篇博客讲述关于溢出页的操作函数以及Hash表的扩展相关的函数. 目录 溢出页的分配和回收 _hash_addovflp ...

  9. postgreSQL源码分析——索引的建立与使用——各种索引类型的管理和操作(2)

    2021SC@SDUSC 目录 上层操作函数 index_open index_beginscan() index_create() indexcmd.c 下层接口函数 IndexScanDescDa ...

最新文章

  1. jquery.min.map 404 (Not Found)出错的原因及解决办法
  2. python画椭圆-python opencv圆、椭圆与任意多边形的绘制实例详解
  3. Redis在Windows上编译
  4. 大朗机器人餐厅在哪里_东莞餐厅惊现机器人服务员 平均每个10万元
  5. Java接口long类型精度丢失,解决前后端交互Long类型精度丢失问题
  6. html上传文件与后台处理,HTML加一般处理程序实现文件上传
  7. OpenShift 4 - 利用 File Integrity Operator 实现对集群节点进行入侵检测
  8. win10文件同步到服务器失败,win10系统同步时间同步失败的解决方法
  9. lisp 设计盘形齿轮铣刀_齿轮是怎么来的——图解6种齿轮加工工艺
  10. 《计算传播学导论》读书笔记:第五章 网络传播与传播网络
  11. 《嵌入式C语言自我修养》书评
  12. Re-ID Driven Localization Refinement for Person Search
  13. 动态内存的申请和非动态内存的申请_公安交管新举措咋解读?非营运七座车6年免检,70岁可申请驾照...
  14. 机器学习“调音师”:如何及何时重新调校ML
  15. 速卖通关键词挖掘工具_利用SEO工具挖掘同行竞争对手关键词数据快速布局网站词库...
  16. 国潮来袭 农产品变身国潮三大方式
  17. CANoe简易教程1
  18. 安装完QQ必须要删除掉的几个恐怖文件
  19. 测试之Testin云测——标准兼容测试百度apk并生成相关测试报告
  20. linux调试器——gdb

热门文章

  1. 项目管理:如何制作项目进度计划表?
  2. 腾讯游戏王者荣耀扫码登录源码
  3. 交叉编译移植 FFMPEG X264 XVID 到 hi3531
  4. 【元胞自动机】元胞自动机生命游戏【含Matlab源码 655期】
  5. MyBatisPlus查询时报错,Unknow column ‘id‘ in ‘field list‘,怎么解决?
  6. PDF转OFD ~java实现
  7. IOS学习之UISwitch控件两种使用方法和监听
  8. 会考flash中文字变形为三角形_高中会考flash
  9. CloudCompare点云配准
  10. Js报错:Maximum call stack size exceeded