Postgresql源码(83)执行器的结果接收系统——DestReceiver
相关
《Postgresql源码(76)执行器专用元组格式TupleTableSlot》
《Postgresql源码(82)SPI模块拆解分析一:执行简单SQL获取结果》
《Postgresql源码(83)执行器的结果接收系统——DestReceiver》
0 总结
- 执行器进入前会配置_DestReceiver(一套接口)
- 执行器在内部跑一遍计划,产生一条tts(参考《Postgresql源码(76)执行器专用元组格式TupleTableSlot》),执行器调用接口函数receiveSlot,按当前接口定义输出到指定位置。
- 例如
- 【SELECT xxx】receiveSlot为printtup:把tts按列解析后拿到数据,按客户端服务端协议拼装包内容,调用libpq返回给客户端。
- 【COPY TO 文件】receiveSlot为copy_dest_receive:把tts按列解析后拿到数据,按copy语法提供的分隔符组装,fwrite到文件中。
- 【SPI】中receiveSlot为spi_printtup:把tts转换为HeapTuple格式,保存到SPI结果全局变量中。
1 概要
执行器的工作包括:work、get result,之前work的内容已经介绍过了,这里分析下执行器如何拿到执行结果。
- 执行器会在多种场景下工作,例如:
- SPI调用。
- 常规客户端服务端的调用。
- standalone backend调用(没有postmaster)。
- 系统内部调用。
- 对于上述场景,执行器的调用者有较大的差异,结果集无法使用一套函数返回。所以执行器设计了一套拿结果的函数钩子(接口),调用者需要将结果集的获取函数配置到接口上,执行器在执行中会把结果通过接口函数调入相应模块中,完成调用者所需的结果集构造,例如:
- SPI的结果需要存放到执行的全局变量结构中。
- 常规客户端服务端调用需要将结果用Libpq返回客户端。
- standalone backend调用需要将结果打印到stdout。
- 系统内部调用不需要返回结果。
PG的结果接收器提供了四个接口:
- receiveSlot:输入执行器产生的tts,按指定格式输出
- rStartup:初始化结果接收器
- rShutdown:停止结果接收器
- rDestroy:清理动态申请中间变量
struct _DestReceiver
{/* Called for each tuple to be output: */bool (*receiveSlot) (TupleTableSlot *slot,DestReceiver *self);/* Per-executor-run initialization and shutdown: */void (*rStartup) (DestReceiver *self,int operation,TupleDesc typeinfo);void (*rShutdown) (DestReceiver *self);/* Destroy the receiver object itself (if dynamically allocated) */void (*rDestroy) (DestReceiver *self);/* CommandDest code for this receiver */CommandDest mydest;/* Private fields might appear beyond this point... */
};
2 场景
第一组:正常客户端连接场景【DestRemote】
这一组函数接口由printtup_create_DR配置:
DestReceiver *
printtup_create_DR(CommandDest dest)
{DR_printtup *self = (DR_printtup *) palloc0(sizeof(DR_printtup));self->pub.receiveSlot = printtup; /* might get changed later */self->pub.rStartup = printtup_startup;self->pub.rShutdown = printtup_shutdown;self->pub.rDestroy = printtup_destroy;self->pub.mydest = dest;/** Send T message automatically if DestRemote, but not if* DestRemoteExecute*/self->sendDescrip = (dest == DestRemote);self->attrinfo = NULL;self->nattrs = 0;self->myinfo = NULL;self->buf.data = NULL;self->tmpcontext = NULL;return (DestReceiver *) self;
}
注意:这里的数据结构DR_printtup把DestReceiver有包装了 一层:
typedef struct
{DestReceiver pub; /* publicly-known function pointers */Portal portal; /* the Portal we are printing from */bool sendDescrip; /* send RowDescription at startup? */TupleDesc attrinfo; /* The attr info we are set up for */int nattrs;PrinttupAttrInfo *myinfo; /* Cached info about each attr */StringInfoData buf; /* output buffer (*not* in tmpcontext) */MemoryContext tmpcontext; /* Memory context for per-row workspace */
} DR_printtup;
来看下这几个函数的工作位置和流程,例如:
select s::int, left(random()::text,4) l from generate_series(1,2) s;-- 输出:两行、两列s | l
---+------1 | 0.552 | 0.28
1 rStartup = printtup_startup:申请上下文,发送行描述符
位置
#0 printtup_startup (self=0x106dd50, operation=1, typeinfo=0x100d190) at printtup.c:113
#1 0x0000000000733ddf in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:350
#2 0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3 0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#4 0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50, qc=0x7ffd6b492890) at pquery.c:765
#5 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s\n;") at postgres.c:1213
#6 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#7 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#8 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#9 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#10 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#11 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程
static void
printtup_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{DR_printtup *myState = (DR_printtup *) self;Portal portal = myState->portal;/** Create I/O buffer to be used for all messages. This cannot be inside* tmpcontext, since we want to re-use it across rows.*/initStringInfo(&myState->buf);
申请"printtup"上下文,存放所有输出数据,
myState->tmpcontext = AllocSetContextCreate(CurrentMemoryContext,"printtup",ALLOCSET_DEFAULT_SIZES);
启动时就需要发送行描述符:
if (myState->sendDescrip)SendRowDescriptionMessage(&myState->buf,typeinfo,FetchPortalTargetList(portal),portal->formats);
}
SendRowDescriptionMessage发送行描述符,入参:
List *targetlistTargetEntry: {xpr = {type = T_TargetEntry}, expr = 0x1086570, resno = 1, resname = 0xf5af88 "s", ressortgroupref = 0, resorigtbl = 0, resorigcol = 0, resjunk = false}TargetEntry: {xpr = {type = T_TargetEntry}, expr = 0x1086868, resno = 2, resname = 0xf5b5a0 "l", ressortgroupref = 0, resorigtbl = 0, resorigcol = 0, resjunk = false}typeinfo{natts = 2, tdtypeid = 2249, tdtypmod = -1, tdrefcount = -1, constr = 0x0, attrs = 0x100d1a8}
流程:拼接输出串
void
SendRowDescriptionMessage(StringInfo buf, TupleDesc typeinfo,List *targetlist, int16 *formats)
{int natts = typeinfo->natts;int i;ListCell *tlist_item = list_head(targetlist);/* tuple descriptor message type */pq_beginmessage_reuse(buf, 'T');/* # of attrs in tuples */pq_sendint16(buf, natts);/** Preallocate memory for the entire message to be sent. That allows to* use the significantly faster inline pqformat.h functions and to avoid* reallocations.** Have to overestimate the size of the column-names, to account for* character set overhead.*/enlargeStringInfo(buf, (NAMEDATALEN * MAX_CONVERSION_GROWTH /* attname */+ sizeof(Oid) /* resorigtbl */+ sizeof(AttrNumber) /* resorigcol */+ sizeof(Oid) /* atttypid */+ sizeof(int16) /* attlen */+ sizeof(int32) /* attypmod */+ sizeof(int16) /* format */) * natts);for (i = 0; i < natts; ++i){Form_pg_attribute att = TupleDescAttr(typeinfo, i);Oid atttypid = att->atttypid;int32 atttypmod = att->atttypmod;Oid resorigtbl;AttrNumber resorigcol;int16 format;/** If column is a domain, send the base type and typmod instead.* Lookup before sending any ints, for efficiency.*/atttypid = getBaseTypeAndTypmod(atttypid, &atttypmod);/* Do we have a non-resjunk tlist item? */while (tlist_item &&((TargetEntry *) lfirst(tlist_item))->resjunk)tlist_item = lnext(targetlist, tlist_item);if (tlist_item){TargetEntry *tle = (TargetEntry *) lfirst(tlist_item);resorigtbl = tle->resorigtbl;resorigcol = tle->resorigcol;tlist_item = lnext(targetlist, tlist_item);}else{/* No info available, so send zeroes */resorigtbl = 0;resorigcol = 0;}if (formats)format = formats[i];elseformat = 0;pq_writestring(buf, NameStr(att->attname));pq_writeint32(buf, resorigtbl);pq_writeint16(buf, resorigcol);pq_writeint32(buf, atttypid);pq_writeint16(buf, att->attlen);pq_writeint32(buf, atttypmod);pq_writeint16(buf, format);}pq_endmessage_reuse(buf);
}
pq_endmessage_reuse
socket_putmessage(char msgtype, const char *s, size_t len)(gdb) p s
$18 = 0x10724e0 ""
(gdb) p len
$19 = 42
(gdb) x/32bx 0x10724e0
0x10724e0: 0x00 0x02 0x73(s) 0x00 0x00 0x00 0x00 0x00
0x10724e8: 0x00 0x00 0x00 0x00 0x00 0x17 0x00 0x04
0x10724f0: 0xff 0xff 0xff 0xff 0x00 0x00 0x6c(l) 0x00
0x10724f8: 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00
0x1072500: 0x00 0x19 0xff 0xff 0xff 0xff 0xff 0xff
0x1072508: 0x00 0x00
2 receiveSlot = printtup
位置
#0 printtup (slot=0x100d2a8, self=0x106dd50) at printtup.c:303
#1 0x0000000000736102 in ExecutePlan (estate=0x100b630, planstate=0x100b868, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0,direction=ForwardScanDirection, dest=0x106dd50, execute_once=true) at execMain.c:1582
#2 0x0000000000733e7d in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:361
#3 0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#4 0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#5 0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50,qc=0x7ffd6b492890) at pquery.c:765
#6 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1213
#7 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#8 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#9 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#10 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#11 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#12 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程
printtup输入为tts包装的元组,和上面初始化后的DestReceiver。
static bool
printtup(TupleTableSlot *slot, DestReceiver *self)printtup_prepare_info // 拼接DR_printtup中的信息,准备发送MemoryContextSwitchTo // 切换到"printtup"pq_beginmessage_reuse // 调用libpq开始发数据pq_sendint16......pq_endmessage_reuse
3 rShutdown = printtup_shutdown
位置
#0 printtup_shutdown (self=0x106dd50) at printtup.c:388
#1 0x0000000000733e98 in standard_ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:376
#2 0x0000000000733ca8 in ExecutorRun (queryDesc=0xf7d910, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3 0x000000000097ccd5 in PortalRunSelect (portal=0xff5050, forward=true, count=0, dest=0x106dd50) at pquery.c:921
#4 0x000000000097c994 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0x106dd50, altdest=0x106dd50, qc=0x7ffd6b492890) at pquery.c:765
#5 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1213
#6 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#7 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#8 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#9 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#10 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#11 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程:清理工作
static void
printtup_shutdown(DestReceiver *self)
{DR_printtup *myState = (DR_printtup *) self;if (myState->myinfo)pfree(myState->myinfo);myState->myinfo = NULL;myState->attrinfo = NULL;if (myState->buf.data)pfree(myState->buf.data);myState->buf.data = NULL;if (myState->tmpcontext)MemoryContextDelete(myState->tmpcontext);myState->tmpcontext = NULL;
}
4 rDestroy = printtup_destroy
位置
#0 printtup_destroy (self=0x106dd50) at printtup.c:412
#1 0x0000000000976650 in exec_simple_query (query_string=0xf5a4f0 "select s::int, left(random()::text,4) l from generate_series(1,2) s ;") at postgres.c:1221
#2 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#3 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#4 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#5 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#6 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#7 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程:清理动态申请的:外层数据结构DR_printtup
static void
printtup_destroy(DestReceiver *self)
{pfree(self);
}
第二组:COPY数据场景【DestCopyOut】
这一组函数接口由CreateCopyDestReceiver配置:
DestReceiver *
CreateCopyDestReceiver(void)
{DR_copy *self = (DR_copy *) palloc(sizeof(DR_copy));self->pub.receiveSlot = copy_dest_receive;self->pub.rStartup = copy_dest_startup;self->pub.rShutdown = copy_dest_shutdown;self->pub.rDestroy = copy_dest_destroy;self->pub.mydest = DestCopyOut;self->cstate = NULL; /* will be set later */self->processed = 0;return (DestReceiver *) self;
}
注意copy也给DestReceiver包了一层:DR_copy
typedef struct
{DestReceiver pub; /* publicly-known function pointers */CopyToState cstate; /* CopyToStateData for the command */uint64 processed; /* # of tuples processed */
} DR_copy;
来看下这几个函数的工作位置和流程,例如:
copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';-- 输出:两行两列
1 0.74
2 0.09
1 rStartup = copy_dest_startup
空
2 receiveSlot = copy_dest_receive
位置
#0 copy_dest_receive (slot=0x1048538, self=0x10861c0) at copyto.c:1259
#1 0x0000000000736102 in ExecutePlan (estate=0x10468c0, planstate=0x1046af8, use_parallel_mode=false, operation=CMD_SELECT, sendTuples=true, numberTuples=0, direction=ForwardScanDirection, dest=0x10861c0, execute_once=true) at execMain.c:1582
#2 0x0000000000733e7d in standard_ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:361
#3 0x0000000000733ca8 in ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#4 0x0000000000685e87 in DoCopyTo (cstate=0xf7da60) at copyto.c:905
#5 0x000000000067bdfa in DoCopy (pstate=0xf7d910, stmt=0xf5bb88, stmt_location=0, stmt_len=86, processed=0x7ffd6b4924e8) at copy.c:309
#6 0x000000000097ec16 in standard_ProcessUtility (pstmt=0xf5bef8, queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:739
#7 0x000000000097e69b in ProcessUtility (pstmt=0xf5bef8, queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:527
#8 0x000000000097d297 in PortalRunUtility (portal=0xff5050, pstmt=0xf5bef8, isTopLevel=true, setHoldSnapshot=false, dest=0xf5bfe8, qc=0x7ffd6b492890)at pquery.c:1155
#9 0x000000000097d4fb in PortalRunMulti (portal=0xff5050, isTopLevel=true, setHoldSnapshot=false, dest=0xf5bfe8, altdest=0xf5bfe8, qc=0x7ffd6b492890) at pquery.c:1312
#10 0x000000000097ca27 in PortalRun (portal=0xff5050, count=9223372036854775807, isTopLevel=true, run_once=true, dest=0xf5bfe8, altdest=0xf5bfe8, qc=0x7ffd6b492890) at pquery.c:788
#11 0x000000000097663b in exec_simple_query (query_string=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';")at postgres.c:1213
#12 0x000000000097ab59 in PostgresMain (argc=1, argv=0x7ffd6b492b20, dbname=0xf83cc0 "postgres", username=0xf83c98 "mingjiegao") at postgres.c:4494
#13 0x00000000008b6d4e in BackendRun (port=0xf7ba90) at postmaster.c:4530
#14 0x00000000008b66cd in BackendStartup (port=0xf7ba90) at postmaster.c:4252
#15 0x00000000008b2b45 in ServerLoop () at postmaster.c:1745
#16 0x00000000008b2417 in PostmasterMain (argc=1, argv=0xf540e0) at postmaster.c:1417
#17 0x00000000007b4c93 in main (argc=1, argv=0xf540e0) at main.c:209
流程
- 同样也是拿到tts和DestReceiver
- 由CopyOneRowTo调用CopySendEndOfRow调用fwrite写入文件
static bool
copy_dest_receive(TupleTableSlot *slot, DestReceiver *self)
{DR_copy *myState = (DR_copy *) self;CopyToState cstate = myState->cstate;/* Send the data */CopyOneRowTo(cstate, slot);/* Increment the number of processed tuples, and report the progress */pgstat_progress_update_param(PROGRESS_COPY_TUPLES_PROCESSED,++myState->processed);return true;
}
3 rShutdown = copy_dest_shutdown
位置
#0 copy_dest_shutdown (self=0x10861c0) at copyto.c:1279
#1 0x0000000000733e98 in standard_ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:376
#2 0x0000000000733ca8 in ExecutorRun (queryDesc=0x1086218, direction=ForwardScanDirection, count=0, execute_once=true) at execMain.c:305
#3 0x0000000000685e87 in DoCopyTo (cstate=0xf7da60) at copyto.c:905
#4 0x000000000067bdfa in DoCopy (pstate=0xf7d910, stmt=0xf5bb88, stmt_location=0, stmt_len=86, processed=0x7ffd6b4924e8) at copy.c:309
#5 0x000000000097ec16 in standard_ProcessUtility (pstmt=0xf5bef8, queryString=0xf5a4f0 "copy (select s::int, left(random()::text,4) l from generate_series(1,2) s) to '/tmp/a';", readOnlyTree=false, context=PROCESS_UTILITY_TOPLEVEL, params=0x0, queryEnv=0x0, dest=0xf5bfe8, qc=0x7ffd6b492890) at utility.c:739
无操作
4 rDestroy = copy_dest_destroy
不执行,因为DestReceiver外面包的数据结构DR_copy没有什么需要释放的。
第三组:SPI获取数据场景【DestSPI】
这一组函数接口由CreateDestReceiver分发函数直接配置,注意前面两种都是走CreateDestReceiver入口进入自己的配置函数,但是SPI不同,直接在CreateDestReceiver里面配置:
DestReceiver *
CreateDestReceiver(CommandDest dest)
{/** It's ok to cast the constness away as any modification of the none* receiver would be a bug (which gets easier to catch this way).*/switch (dest){case DestRemote:case DestRemoteExecute:return printtup_create_DR(dest);case DestRemoteSimple:return unconstify(DestReceiver *, &printsimpleDR);case DestNone:return unconstify(DestReceiver *, &donothingDR);case DestDebug:return unconstify(DestReceiver *, &debugtupDR);// 这里配置 <-<-<-----------------------------case DestSPI:return unconstify(DestReceiver *, &spi_printtupDR);case DestTuplestore:return CreateTuplestoreDestReceiver();case DestIntoRel:return CreateIntoRelDestReceiver(NULL);case DestCopyOut:return CreateCopyDestReceiver();case DestSQLFunction:return CreateSQLFunctionDestReceiver();case DestTransientRel:return CreateTransientRelDestReceiver(InvalidOid);case DestTupleQueue:return CreateTupleQueueDestReceiver(NULL);}/* should never get here */pg_unreachable();
}
spi_printtupDR带四个函数:
static const DestReceiver spi_printtupDR = {spi_printtup, spi_dest_startup, donothingCleanup, donothingCleanup,DestSPI
};
SPI的结果不是直接返回给客户端的!SPI有自己的三个全局变量来指向结果集,SPI的接口函数会从全局变量中取值,组织后返回给客户端。(使用全局变量当接口的设计很差!)
uint64 SPI_processed = 0; // 行数
SPITupleTable *SPI_tuptable = NULL; // 数据
int SPI_result = 0; // 执行结果
例子
直接执行:```c
cat << EOF > spitest.c
#include "postgres.h"
#include "executor/spi.h"
#include "utils/builtins.h"
#include "fmgr.h"PG_MODULE_MAGIC;
PG_FUNCTION_INFO_V1(sptest1);Datum
sptest1(PG_FUNCTION_ARGS)
{char *sql10 = "select s::int, left(random()::text,4) l from generate_series(1,10) s";int ret;int proc;SPI_connect();ret = SPI_exec(sql10, 0);proc = SPI_processed;if (ret > 0 && SPI_tuptable != NULL){SPITupleTable *tuptable = SPI_tuptable;TupleDesc tupdesc = tuptable->tupdesc;char buf[8192];uint64 j;for (j = 0; j < tuptable->numvals; j++){HeapTuple tuple = tuptable->vals[j];int i;for (i = 1, buf[0] = 0; i <= tupdesc->natts; i++)snprintf(buf + strlen(buf), sizeof(buf) - strlen(buf), " %4s%4s",SPI_getvalue(tuple, tupdesc, i),(i == tupdesc->natts) ? " " : " |");elog(INFO, "%s", buf);}}SPI_finish();return (proc);
}
EOFgcc -O0 -Wall -I /home/mingjiegao/dev/src/postgres/src/include -g -shared -fpic -o spitest.so spitest.c
psql执行:
postgres=# select sptest1();
INFO: 1 | 0.10
INFO: 2 | 0.18
INFO: 3 | 0.01
INFO: 4 | 0.78
INFO: 5 | 0.60
INFO: 6 | 0.76
INFO: 7 | 0.18
INFO: 8 | 0.86
INFO: 9 | 0.19
INFO: 10 | 0.99 sptest1
---------10
(1 row)
1 rStartup = spi_dest_startup
位置
sptest1SPI_execSPI_execute_SPI_execute_plan_SPI_pqueryExecutorRunstandard_ExecutorRunspi_dest_startup
流程
void
spi_dest_startup(DestReceiver *self, int operation, TupleDesc typeinfo)
{SPITupleTable *tuptable;MemoryContext oldcxt;MemoryContext tuptabcxt;if (_SPI_current == NULL)elog(ERROR, "spi_dest_startup called while not connected to SPI");if (_SPI_current->tuptable != NULL)elog(ERROR, "improper call to spi_dest_startup");
- 从"ExecutorState"切换到"SPI Proc"
- 创建"SPI TupTable",切换到"SPI TupTable"
oldcxt = _SPI_procmem(); /* switch to procedure memory context */tuptabcxt = AllocSetContextCreate(CurrentMemoryContext,"SPI TupTable",ALLOCSET_DEFAULT_SIZES);MemoryContextSwitchTo(tuptabcxt);
在"SPI TupTable"中申请SPITupleTable
结构,由_SPI_current->tuptable记录:
SPITupleTable结构中有:
TupleDesc tupdesc;
HeapTuple *vals;
uint64 numvals;
记录结果集数据。
_SPI_current->tuptable = tuptable = (SPITupleTable *)palloc0(sizeof(SPITupleTable));tuptable->tuptabcxt = tuptabcxt;tuptable->subid = GetCurrentSubTransactionId();/** The tuptable is now valid enough to be freed by AtEOSubXact_SPI, so put* it onto the SPI context's tuptables list. This will ensure it's not* leaked even in the unlikely event the following few lines fail.*/
- _SPI_connection中保存了
slist_head tuptables;
所有活跃的tuptable链表。 - 申请128个HeapTupleData指针位置保存结果数据。
slist_push_head(&_SPI_current->tuptables, &tuptable->next);/* set up initial allocations */tuptable->alloced = 128;tuptable->vals = (HeapTuple *) palloc(tuptable->alloced * sizeof(HeapTuple));tuptable->numvals = 0;tuptable->tupdesc = CreateTupleDescCopy(typeinfo);MemoryContextSwitchTo(oldcxt);
}
2 receiveSlot = spi_printtup
位置
sptest1SPI_execSPI_execute_SPI_execute_plan_SPI_pqueryExecutorRunstandard_ExecutorRunExecutePlanspi_printtup
流程
bool
spi_printtup(TupleTableSlot *slot, DestReceiver *self)
{SPITupleTable *tuptable;MemoryContext oldcxt;if (_SPI_current == NULL)elog(ERROR, "spi_printtup called while not connected to SPI");
tuptable还没赋值的状态:
{tupdesc = 0x107ea90, vals = 0x107e678, numvals = 0, alloced = 128, tuptabcxt = 0x107e500, next = {next = 0x0}, subid = 1}
tuptable = _SPI_current->tuptable;if (tuptable == NULL)elog(ERROR, "improper call to spi_printtup");
- 切到"SPI TupTable"
- 分配的128个位置不够用了?不够在申请256个。
oldcxt = MemoryContextSwitchTo(tuptable->tuptabcxt);if (tuptable->numvals >= tuptable->alloced){/* Double the size of the pointer array */uint64 newalloced = tuptable->alloced * 2;tuptable->vals = (HeapTuple *) repalloc_huge(tuptable->vals,newalloced * sizeof(HeapTuple));tuptable->alloced = newalloced;}
- 调用tts接口函数ExecCopySlotHeapTuple做元组拷贝,这里实际使用的是tts_virtual_copy_heap_tuple,参考《Postgresql源码(76)执行器专用元组格式TupleTableSlot》。
- ExecCopySlotHeapTuple输入tts输出标准存储格式HeapTuple。
tuptable->vals[tuptable->numvals] = ExecCopySlotHeapTuple(slot);(tuptable->numvals)++;MemoryContextSwitchTo(oldcxt);return true;
}
3 rShutdown = donothingCleanup
无
4 rDestroy = donothingCleanup
无
Postgresql源码(83)执行器的结果接收系统——DestReceiver相关推荐
- Postgresql源码(82)SPI模块拆解分析一:执行简单SQL获取结果
相关 <Postgresql源码(76)执行器专用元组格式TupleTableSlot> <Postgresql源码(82)SPI模块拆解分析一:执行简单SQL获取结果> &l ...
- PostgreSQL源码分析
PostgreSQL源码结构 PostgreSQL的使用形态 PostgreSQL采用C/S(客户机/服务器)模式结构.应用层通过INET或者Unix Socket利用既定的协议与数据库服务器进行通信 ...
- java ipc pgsql_[转]PostgreSQL源码结构
PostgreSQL采用C/S(客户机/服务器)模式结构.应用层通过INET或者Unix Socket利用既定的协议与数据库服务器进行通信. 另外,还有一种'Standalone Backend'使用 ...
- PostgreSQL源码结构
PostgreSQL的使用形态 PostgreSQL采用C/S(客户机/服务器)模式结构.应用层通过INET或者Unix Socket利用既定的协议与数据库服务器进行通信. 另外,还有一种'Stand ...
- PostgreSQL源码学习(1)--PG13代码结构
PostgreSQL源码学习(1)–PG13代码结构 PostgreSQL代码结构 Bootstrap:用于支持Bootstrap运行模式,该模式主要用来创建初始的模板数据库. Main:主程序模块, ...
- Postgresql源码(85)查询执行——表达式解析器分析(select 1+1如何执行)
相关 <Postgresql源码(61)查询执行--最外层Portal模块> <Postgresql源码(62)查询执行--子模块ProcessUtility> <Pos ...
- Postgresql源码(106)Generic Plan与Custom Plan的区别(以分区表为例)
相关: <Postgresql源码(105)分区表剪枝代码分析> <Postgresql源码(106)Generic Plan与Custom Plan的区别(以分区表为例)> ...
- postgreSQL源码分析——索引的建立与使用——Hash索引(3)
2021SC@SDUSC 上一篇博客讲了关于Hash索引创建与插入的相关函数,这一篇博客讲述关于溢出页的操作函数以及Hash表的扩展相关的函数. 目录 溢出页的分配和回收 _hash_addovflp ...
- postgreSQL源码分析——索引的建立与使用——各种索引类型的管理和操作(2)
2021SC@SDUSC 目录 上层操作函数 index_open index_beginscan() index_create() indexcmd.c 下层接口函数 IndexScanDescDa ...
最新文章
- jquery.min.map 404 (Not Found)出错的原因及解决办法
- python画椭圆-python opencv圆、椭圆与任意多边形的绘制实例详解
- Redis在Windows上编译
- 大朗机器人餐厅在哪里_东莞餐厅惊现机器人服务员 平均每个10万元
- Java接口long类型精度丢失,解决前后端交互Long类型精度丢失问题
- html上传文件与后台处理,HTML加一般处理程序实现文件上传
- OpenShift 4 - 利用 File Integrity Operator 实现对集群节点进行入侵检测
- win10文件同步到服务器失败,win10系统同步时间同步失败的解决方法
- lisp 设计盘形齿轮铣刀_齿轮是怎么来的——图解6种齿轮加工工艺
- 《计算传播学导论》读书笔记:第五章 网络传播与传播网络
- 《嵌入式C语言自我修养》书评
- Re-ID Driven Localization Refinement for Person Search
- 动态内存的申请和非动态内存的申请_公安交管新举措咋解读?非营运七座车6年免检,70岁可申请驾照...
- 机器学习“调音师”:如何及何时重新调校ML
- 速卖通关键词挖掘工具_利用SEO工具挖掘同行竞争对手关键词数据快速布局网站词库...
- 国潮来袭 农产品变身国潮三大方式
- CANoe简易教程1
- 安装完QQ必须要删除掉的几个恐怖文件
- 测试之Testin云测——标准兼容测试百度apk并生成相关测试报告
- linux调试器——gdb
热门文章
- 项目管理:如何制作项目进度计划表?
- 腾讯游戏王者荣耀扫码登录源码
- 交叉编译移植 FFMPEG X264 XVID 到 hi3531
- 【元胞自动机】元胞自动机生命游戏【含Matlab源码 655期】
- MyBatisPlus查询时报错,Unknow column ‘id‘ in ‘field list‘,怎么解决?
- PDF转OFD ~java实现
- IOS学习之UISwitch控件两种使用方法和监听
- 会考flash中文字变形为三角形_高中会考flash
- CloudCompare点云配准
- Js报错:Maximum call stack size exceeded