本节继续介绍PostgreSQL的后台进程walsender,重点介绍的是调用栈中的exec_replication_command和StartReplication函数.
调用栈如下:


(gdb) bt
#0  0x00007fb6e6390903 in __epoll_wait_nocancel () from /lib64/libc.so.6
#1  0x000000000088e668 in WaitEventSetWaitBlock (set=0x10ac808, cur_timeout=29999, occurred_events=0x7ffd634441b0, nevents=1) at latch.c:1048
#2  0x000000000088e543 in WaitEventSetWait (set=0x10ac808, timeout=29999, occurred_events=0x7ffd634441b0, nevents=1, wait_event_info=83886092) at latch.c:1000
#3  0x000000000088dcec in WaitLatchOrSocket (latch=0x7fb6dcbfc4d4, wakeEvents=27, sock=10, timeout=29999, wait_event_info=83886092) at latch.c:385
#4  0x000000000085405b in WalSndLoop (send_data=0x8547fe <XLogSendPhysical>) at walsender.c:2229
#5  0x0000000000851c93 in StartReplication (cmd=0x10ab750) at walsender.c:684
#6  0x00000000008532f0 in exec_replication_command (cmd_string=0x101dd78 "START_REPLICATION 0/5D000000 TIMELINE 16")at walsender.c:1539
#7  0x00000000008c0170 in PostgresMain (argc=1, argv=0x1049cb8, dbname=0x1049ba8 "", username=0x1049b80 "replicator")at postgres.c:4178
#8  0x000000000081e06c in BackendRun (port=0x103fb50) at postmaster.c:4361
#9  0x000000000081d7df in BackendStartup (port=0x103fb50) at postmaster.c:4033
#10 0x0000000000819bd9 in ServerLoop () at postmaster.c:1706
#11 0x000000000081948f in PostmasterMain (argc=1, argv=0x1018a50) at postmaster.c:1379
#12 0x0000000000742931 in main (argc=1, argv=0x1018a50) at main.c:228

一、数据结构

StringInfo
StringInfoData结构体保存关于扩展字符串的相关信息.


/*-------------------------* StringInfoData holds information about an extensible string.* StringInfoData结构体保存关于扩展字符串的相关信息.*      data    is the current buffer for the string (allocated with palloc).*      data    通过palloc分配的字符串缓存*      len     is the current string length.  There is guaranteed to be*              a terminating '\0' at data[len], although this is not very*              useful when the string holds binary data rather than text.*      len     是当前字符串的长度.保证以ASCII 0(\0)结束(data[len] = '\0').*              虽然如果存储的是二进制数据而不是文本时不太好使.*      maxlen  is the allocated size in bytes of 'data', i.e. the maximum*              string size (including the terminating '\0' char) that we can*              currently store in 'data' without having to reallocate*              more space.  We must always have maxlen > len.*      maxlen  以字节为单位已分配的'data'的大小,限定了最大的字符串大小(包括结尾的ASCII 0)*              小于此尺寸的数据可以直接存储而无需重新分配.*      cursor  is initialized to zero by makeStringInfo or initStringInfo,*              but is not otherwise touched by the stringinfo.c routines.*              Some routines use it to scan through a StringInfo.*      cursor  通过makeStringInfo或initStringInfo初始化为0,但不受stringinfo.c例程的影响.*              某些例程使用该字段扫描StringInfo*-------------------------*/
typedef struct StringInfoData
{char       *data;int         len;int         maxlen;int         cursor;
} StringInfoData;
typedef StringInfoData *StringInfo;

二、源码解读

exec_replication_command
exec_replication_command执行复制命令,如cmd_string被识别为WalSender命令,返回T,否则返回F.
其主要逻辑如下:
1.执行相关初始化和校验
2.切换内存上下文
3.初始化复制扫描器
4.执行事务相关的判断或校验
5.初始化输入输出消息
6.根据命令类型执行相应的命令
6.1命令类型为T_StartReplicationCmd,调用StartReplication


/** Execute an incoming replication command.* 执行复制命令.** Returns true if the cmd_string was recognized as WalSender command, false* if not.* 如cmd_string被识别为WalSender命令,返回T,否则返回F*/
bool
exec_replication_command(const char *cmd_string)
{int         parse_rc;Node       *cmd_node;MemoryContext cmd_context;MemoryContext old_context;/** If WAL sender has been told that shutdown is getting close, switch its* status accordingly to handle the next replication commands correctly.* 如果WAL sender已被通知关闭,切换状态以应对接下来的复制命令.*/if (got_STOPPING)WalSndSetState(WALSNDSTATE_STOPPING);/** Throw error if in stopping mode.  We need prevent commands that could* generate WAL while the shutdown checkpoint is being written.  To be* safe, we just prohibit all new commands.* 如在stopping模式,则抛出错误.* 我们需要在shutdown checkpoint写入期间禁止命令的产生.* 安全期间,禁止所有新的命令.*/if (MyWalSnd->state == WALSNDSTATE_STOPPING)ereport(ERROR,(errmsg("cannot execute new commands while WAL sender is in stopping mode")));/** CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next* command arrives. Clean up the old stuff if there's anything.* CREATE_REPLICATION_SLOT ... LOGICAL 导出快照直至下个命令到达.* 如存在,则清理旧的stuff.* */SnapBuildClearExportedSnapshot();//检查中断CHECK_FOR_INTERRUPTS();//命令上下文cmd_context = AllocSetContextCreate(CurrentMemoryContext,"Replication command context",ALLOCSET_DEFAULT_SIZES);old_context = MemoryContextSwitchTo(cmd_context);//初始化复制扫描器replication_scanner_init(cmd_string);parse_rc = replication_yyparse();if (parse_rc != 0)ereport(ERROR,(errcode(ERRCODE_SYNTAX_ERROR),(errmsg_internal("replication command parser returned %d",parse_rc))));cmd_node = replication_parse_result;/** Log replication command if log_replication_commands is enabled. Even* when it's disabled, log the command with DEBUG1 level for backward* compatibility. Note that SQL commands are not logged here, and will be* logged later if log_statement is enabled.* 如log_replication_commands启用,则记录复制命令在日志中.* 就算该选项被禁止,通过DEBUG1级别记录日志.* 注意SQL命令不在这里记录,在log_statement启用的情况下在后续进行记录.* */if (cmd_node->type != T_SQLCmd)ereport(log_replication_commands ? LOG : DEBUG1,(errmsg("received replication command: %s", cmd_string)));/** CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot. If it was* called outside of transaction the snapshot should be cleared here.* CREATE_REPLICATION_SLOT ... LOGICAL导出快照.* 该命令如果在事务的外层被调用,那么快照应在这里清除.*/if (!IsTransactionBlock())SnapBuildClearExportedSnapshot();/** For aborted transactions, don't allow anything except pure SQL, the* exec_simple_query() will handle it correctly.* 对于废弃的事务,除了纯SQL外不允许其他命令,exec_simple_query()函数可以正确处理这种情况.*/if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))ereport(ERROR,(errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),errmsg("current transaction is aborted, ""commands ignored until end of transaction block")));CHECK_FOR_INTERRUPTS();/** Allocate buffers that will be used for each outgoing and incoming* message.  We do this just once per command to reduce palloc overhead.* 为消息I/O分配缓存.* 每个命令执行一次以减少palloc的负载.*/initStringInfo(&output_message);initStringInfo(&reply_message);initStringInfo(&tmpbuf);/* Report to pgstat that this process is running *///向pgstat报告该进程正在运行.pgstat_report_activity(STATE_RUNNING, NULL);//根据命令类型执行相应的命令switch (cmd_node->type){case T_IdentifySystemCmd://识别系统IdentifySystem();break;case T_BaseBackupCmd://BASE_BACKUPPreventInTransactionBlock(true, "BASE_BACKUP");SendBaseBackup((BaseBackupCmd *) cmd_node);break;case T_CreateReplicationSlotCmd://创建复制slotCreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);break;case T_DropReplicationSlotCmd://删除复制slotDropReplicationSlot((DropReplicationSlotCmd *) cmd_node);break;case T_StartReplicationCmd://START_REPLICATION{StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;PreventInTransactionBlock(true, "START_REPLICATION");if (cmd->kind == REPLICATION_KIND_PHYSICAL)StartReplication(cmd);elseStartLogicalReplication(cmd);break;}case T_TimeLineHistoryCmd://构造时间线历史 TIMELINE_HISTORYPreventInTransactionBlock(true, "TIMELINE_HISTORY");SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);break;case T_VariableShowStmt://{DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);VariableShowStmt *n = (VariableShowStmt *) cmd_node;GetPGVariable(n->name, dest);}break;case T_SQLCmd://SQL命令if (MyDatabaseId == InvalidOid)ereport(ERROR,(errmsg("cannot execute SQL commands in WAL sender for physical replication")));/* Report to pgstat that this process is now idle */pgstat_report_activity(STATE_IDLE, NULL);/* Tell the caller that this wasn't a WalSender command. */return false;default://其他命令elog(ERROR, "unrecognized replication command node tag: %u",cmd_node->type);}/* done *///执行完毕,回到原来的内存上下文中MemoryContextSwitchTo(old_context);MemoryContextDelete(cmd_context);/* Send CommandComplete message *///命令结束EndCommand("SELECT", DestRemote);/* Report to pgstat that this process is now idle *///报告状态pgstat_report_activity(STATE_IDLE, NULL);return true;
}

StartReplication
StartReplication处理START_REPLICATION命令.
其主要逻辑如下:
1.执行相关初始化和校验
2.选择时间线
3.进入COPY模式
3.1设置状态
3.2发送CopyBothResponse消息,启动streaming
3.3初始化相关变量,如共享内存状态等
3.4进入主循环(WalSndLoop)


/** Handle START_REPLICATION command.* 处理START_REPLICATION命令** At the moment, this never returns, but an ereport(ERROR) will take us back* to the main loop.* 该函数不会返回,但ereport(ERROR)调用可以回到主循环*/
static void
StartReplication(StartReplicationCmd *cmd)
{StringInfoData buf;XLogRecPtr  FlushPtr;if (ThisTimeLineID == 0)//时间线校验ereport(ERROR,(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));/** We assume here that we're logging enough information in the WAL for* log-shipping, since this is checked in PostmasterMain().* 在这里,由于在PostmasterMain()假定已为log-shipping记录了足够多的信息** NOTE: wal_level can only change at shutdown, so in most cases it is* difficult for there to be WAL data that we can still see that was* written at wal_level='minimal'.* 注意:wal_level只能在shutdown的情况下进行修改,*   因此在大多数情况下,很难看到在wal_level='minimal'的情况下的WAL数据.*/if (cmd->slotname){ReplicationSlotAcquire(cmd->slotname, true);//#define SlotIsLogical ( slot ) (slot->data.database != InvalidOid)if (SlotIsLogical(MyReplicationSlot))ereport(ERROR,(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),(errmsg("cannot use a logical replication slot for physical replication"))));}/** Select the timeline. If it was given explicitly by the client, use* that. Otherwise use the timeline of the last replayed record, which is* kept in ThisTimeLineID.* 选择时间线.* 如果通过客户端明确给出,则使用该值.* 否则的话,使用最后重放记录的时间线,在ThisTimeLineID中保存.*/if (am_cascading_walsender){/* this also updates ThisTimeLineID *///这也会更新ThisTimeLineID变量FlushPtr = GetStandbyFlushRecPtr();}elseFlushPtr = GetFlushRecPtr();if (cmd->timeline != 0){XLogRecPtr  switchpoint;sendTimeLine = cmd->timeline;if (sendTimeLine == ThisTimeLineID){sendTimeLineIsHistoric = false;sendTimeLineValidUpto = InvalidXLogRecPtr;}else{List       *timeLineHistory;sendTimeLineIsHistoric = true;/** Check that the timeline the client requested exists, and the* requested start location is on that timeline.* 检查客户端请求的时间线是否存在,请求的开始位置是否在该时间线上.*/timeLineHistory = readTimeLineHistory(ThisTimeLineID);switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,&sendTimeLineNextTLI);list_free_deep(timeLineHistory);/** Found the requested timeline in the history. Check that* requested startpoint is on that timeline in our history.* 通过历史文件找到请求的时间线.* 在历史中检查请求的开始点是否在时间线上.** This is quite loose on purpose. We only check that we didn't* fork off the requested timeline before the switchpoint. We* don't check that we switched *to* it before the requested* starting point. This is because the client can legitimately* request to start replication from the beginning of the WAL* segment that contains switchpoint, but on the new timeline, so* that it doesn't end up with a partial segment. If you ask for* too old a starting point, you'll get an error later when we* fail to find the requested WAL segment in pg_wal.* 这是有意为之.我们只检查在切换点之前没有fork off的请求的时间线.* 我们不会检查在请求的开始点之前的时间线.* 这是因为客户端可以合法地请求从包含交换点的WAL端的开始处进行复制,*   在新的时间线上如此执行,以避免出现由于部分segment的问题导致出错.* 如果客户端请求一个较旧的开始点,在pg_wal中无法找到请求的WAL段时会报错.** XXX: we could be more strict here and only allow a startpoint* that's older than the switchpoint, if it's still in the same* WAL segment.* XXX: 我们可以更严格,如果仍然在同一个WAL segment中,那么可以只允许比切换点旧的开始点*/if (!XLogRecPtrIsInvalid(switchpoint) &&switchpoint < cmd->startpoint){ereport(ERROR,(errmsg("requested starting point %X/%X on timeline %u is not in this server's history",(uint32) (cmd->startpoint >> 32),(uint32) (cmd->startpoint),cmd->timeline),errdetail("This server's history forked from timeline %u at %X/%X.",cmd->timeline,(uint32) (switchpoint >> 32),(uint32) (switchpoint))));}sendTimeLineValidUpto = switchpoint;}}else{sendTimeLine = ThisTimeLineID;sendTimeLineValidUpto = InvalidXLogRecPtr;sendTimeLineIsHistoric = false;}streamingDoneSending = streamingDoneReceiving = false;/* If there is nothing to stream, don't even enter COPY mode *///如果没有任何东西需要stream,不需要启动COPY命令if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto){/** When we first start replication the standby will be behind the* primary. For some applications, for example synchronous* replication, it is important to have a clear state for this initial* catchup mode, so we can trigger actions when we change streaming* state later. We may stay in this state for a long time, which is* exactly why we want to be able to monitor whether or not we are* still here.* 在首次启动复制时,standby节点会落后于master节点.* 对于某些应用,比如同步复制,对于这种初始的catchup模式有一个干净的状态是十分重要的,*   因此在改变streaming状态时我们可以触发相关的动作.* 我们可以处于这种状态很长时间,这正是我们希望有能力监控我们是否仍在这里的原因.*///设置状态WalSndSetState(WALSNDSTATE_CATCHUP);/* Send a CopyBothResponse message, and start streaming *///发送CopyBothResponse消息,启动streamingpq_beginmessage(&buf, 'W');//W->COPY命令?pq_sendbyte(&buf, 0);pq_sendint16(&buf, 0);pq_endmessage(&buf);pq_flush();/** Don't allow a request to stream from a future point in WAL that* hasn't been flushed to disk in this server yet.* 不允许请求该服务器上一个尚未刷入到磁盘上的WAL未来位置.*/if (FlushPtr < cmd->startpoint){ereport(ERROR,(errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",(uint32) (cmd->startpoint >> 32),(uint32) (cmd->startpoint),(uint32) (FlushPtr >> 32),(uint32) (FlushPtr))));}/* Start streaming from the requested point *///从请求点开始streamingsentPtr = cmd->startpoint;/* Initialize shared memory status, too *///初始化共享内存状态SpinLockAcquire(&MyWalSnd->mutex);MyWalSnd->sentPtr = sentPtr;SpinLockRelease(&MyWalSnd->mutex);SyncRepInitConfig();/* Main loop of walsender *///walsender主循环,开始复制,激活复制replication_active = true;//主循环WalSndLoop(XLogSendPhysical);//完结后设置为非活动状态replication_active = false;if (got_STOPPING)proc_exit(0);//退出//设置状态WalSndSetState(WALSNDSTATE_STARTUP);Assert(streamingDoneSending && streamingDoneReceiving);}if (cmd->slotname)ReplicationSlotRelease();/** Copy is finished now. Send a single-row result set indicating the next* timeline.* Copy命令已完结.发送单行结果集以提升下一个timeline*/if (sendTimeLineIsHistoric){char        startpos_str[8 + 1 + 8 + 1];DestReceiver *dest;TupOutputState *tstate;TupleDesc   tupdesc;Datum       values[2];bool        nulls[2];snprintf(startpos_str, sizeof(startpos_str), "%X/%X",(uint32) (sendTimeLineValidUpto >> 32),(uint32) sendTimeLineValidUpto);dest = CreateDestReceiver(DestRemoteSimple);MemSet(nulls, false, sizeof(nulls));/** Need a tuple descriptor representing two columns. int8 may seem* like a surprising data type for this, but in theory int4 would not* be wide enough for this, as TimeLineID is unsigned.*/tupdesc = CreateTemplateTupleDesc(2);TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",INT8OID, -1, 0);TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",TEXTOID, -1, 0);/* prepare for projection of tuple */tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);values[1] = CStringGetTextDatum(startpos_str);/* send it to dest */do_tup_output(tstate, values, nulls);end_tup_output(tstate);}/* Send CommandComplete message */pq_puttextmessage('C', "START_STREAMING");
}

三、跟踪分析

在主节点上用gdb跟踪postmaster,在PostgresMain上设置断点后启动standby节点,进入断点


[xdb@localhost ~]$ ps -ef|grep postgres
xdb       1339     1  2 14:45 pts/0    00:00:00 /appdb/xdb/pg11.2/bin/postgres
[xdb@localhost ~]$ gdb -p 1339
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
...
(gdb) set follow-fork-mode child
(gdb) b exec_replication_command
Breakpoint 1 at 0x852fd2: file walsender.c, line 1438.
(gdb) c
Continuing.
[New process 1356]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7f5df9d2d8c0 (LWP 1356)]
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "IDENTIFY_SYSTEM") at walsender.c:1438
1438        if (got_STOPPING)
(gdb)

第一个命令是IDENTIFY_SYSTEM,第二个命令才是需要跟踪的对象START_REPLICATION


(gdb) c
Continuing.
Breakpoint 1, exec_replication_command (cmd_string=0x1d66d78 "START_REPLICATION 0/5D000000 TIMELINE 16") at walsender.c:1438
1438        if (got_STOPPING)
(gdb)

1.执行相关初始化和校验


(gdb) n
1446        if (MyWalSnd->state == WALSNDSTATE_STOPPING)
(gdb)
1454        SnapBuildClearExportedSnapshot();
(gdb) p *MyWalSnd
$1 = {pid = 1356, state = WALSNDSTATE_STARTUP, sentPtr = 0, needreload = false, write = 0, flush = 0, apply = 0, writeLag = -1, flushLag = -1, applyLag = -1, mutex = 0 '\000', latch = 0x7f5dee92c4d4, sync_standby_priority = 0}
(gdb) n
1456        CHECK_FOR_INTERRUPTS();
(gdb)

2.切换内存上下文


(gdb)
1458        cmd_context = AllocSetContextCreate(CurrentMemoryContext,
(gdb)
1461        old_context = MemoryContextSwitchTo(cmd_context);
(gdb)

3.初始化复制扫描器


(gdb)
1463        replication_scanner_init(cmd_string);
(gdb) n
1464        parse_rc = replication_yyparse();
(gdb)
1465        if (parse_rc != 0)
(gdb) p parse_rc
$3 = 0
(gdb)
(gdb) n
1471        cmd_node = replication_parse_result;
(gdb)
(gdb)
1479        if (cmd_node->type != T_SQLCmd)
(gdb) n
1480            ereport(log_replication_commands ? LOG : DEBUG1,
(gdb) p cmd_node
$4 = (Node *) 0x1df4710
(gdb) p *cmd_node
$5 = {type = T_StartReplicationCmd}
(gdb)

4.执行事务相关的判断或校验


(gdb) n
1487        if (!IsTransactionBlock())
(gdb)
1488            SnapBuildClearExportedSnapshot();
(gdb)
1494        if (IsAbortedTransactionBlockState() && !IsA(cmd_node, SQLCmd))
(gdb)
1500        CHECK_FOR_INTERRUPTS();
(gdb)

5.初始化输入输出消息


(gdb)
1506        initStringInfo(&output_message);
(gdb)
1507        initStringInfo(&reply_message);
(gdb)
1508        initStringInfo(&tmpbuf);
(gdb)
1511        pgstat_report_activity(STATE_RUNNING, NULL);

6.根据命令类型执行相应的命令
6.1命令类型为T_StartReplicationCmd,调用StartReplication


(gdb) n
1513        switch (cmd_node->type)
(gdb)
1534                    StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
(gdb)
1536                    PreventInTransactionBlock(true, "START_REPLICATION");
(gdb)
1538                    if (cmd->kind == REPLICATION_KIND_PHYSICAL)
(gdb)
1539                        StartReplication(cmd);

进入StartReplication


1539                        StartReplication(cmd);
(gdb) step
StartReplication (cmd=0x1df4710) at walsender.c:532
532     if (ThisTimeLineID == 0)
(gdb)

1.执行相关初始化和校验


(gdb) n
546     if (cmd->slotname)
(gdb)
560     if (am_cascading_walsender)
(gdb)

2.选择时间线


(gdb) n
568     if (cmd->timeline != 0)
(gdb)
572         sendTimeLine = cmd->timeline;
(gdb)
573         if (sendTimeLine == ThisTimeLineID)
(gdb)
575             sendTimeLineIsHistoric = false;
(gdb) p FlushPtr
$9 = 1560397696
(gdb) n
576             sendTimeLineValidUpto = InvalidXLogRecPtr;
(gdb)
634     streamingDoneSending = streamingDoneReceiving = false;
(gdb) p sendTimeLine
$10 = 16
(gdb) p ThisTimeLineID
$11 = 16
(gdb) p *cmd
$12 = {type = T_StartReplicationCmd, kind = REPLICATION_KIND_PHYSICAL, slotname = 0x0, timeline = 16, startpoint = 1560281088, options = 0x0}
(gdb)

3.进入COPY模式


(gdb) n
637     if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
(gdb)

3.1设置状态


648         WalSndSetState(WALSNDSTATE_CATCHUP);
(gdb) p sendTimeLineValidUpto
$13 = 0
(gdb) p cmd->startpoint
$14 = 1560281088
(gdb)

3.2发送CopyBothResponse消息,启动streaming


(gdb) n
651         pq_beginmessage(&buf, 'W');
(gdb)
652         pq_sendbyte(&buf, 0);
(gdb)
653         pq_sendint16(&buf, 0);
(gdb)
654         pq_endmessage(&buf);
(gdb) p buf
$15 = {data = 0x1df53b0 "", len = 3, maxlen = 1024, cursor = 87}
(gdb) p buf->data
$16 = 0x1df53b0 ""
(gdb) x/hb buf->data
0x1df53b0:  0
(gdb) x/32hb buf->data
0x1df53b0:  0   0   0   127 127 127 127 127
0x1df53b8:  127 127 127 127 127 127 127 127
0x1df53c0:  127 127 127 127 127 127 127 127
0x1df53c8:  127 127 127 127 127 127 127 127
(gdb)

3.3初始化相关变量,如共享内存状态等


(gdb) n
655         pq_flush();
(gdb)
661         if (FlushPtr < cmd->startpoint)
(gdb) p FlushPtr
$17 = 1560397696
(gdb) p cmd->startpoint
$18 = 1560281088
(gdb) n
672         sentPtr = cmd->startpoint;
(gdb)
675         SpinLockAcquire(&MyWalSnd->mutex);
(gdb)
676         MyWalSnd->sentPtr = sentPtr;
(gdb)
677         SpinLockRelease(&MyWalSnd->mutex);
(gdb)
679         SyncRepInitConfig();
(gdb)
682         replication_active = true;

3.4进入主循环(WalSndLoop)


(gdb)
684         WalSndLoop(XLogSendPhysical);
(gdb)

DONE!

四、参考资料

PG Source Code

来自 “ ITPUB博客 ” ,链接:http://blog.itpub.net/6906/viewspace-2639084/,如需转载,请注明出处,否则将追究法律责任。

转载于:http://blog.itpub.net/6906/viewspace-2639084/

PostgreSQL 源码解读(154)- 后台进程#6(walsender#2)相关推荐

  1. PostgreSQL 源码解读(153)- 后台进程#5(walsender#1)

    本节简单介绍了PostgreSQL的后台进程walsender,该进程实质上是streaming replication环境中master节点上普通的backend进程,在standby节点启动时,s ...

  2. PostgreSQL 源码解读(156)- 后台进程#8(walsender#4)

    上节介绍了PostgreSQL的后台进程walsender中的函数WalSndLoop->WaitLatchOrSocket->WaitEventSetWait->WaitEvent ...

  3. PostgreSQL 源码解读(155)- 后台进程#7(walsender#3)

    本节继续介绍PostgreSQL的后台进程walsender,重点介绍的是调用栈中的函数WalSndLoop->WaitLatchOrSocket->WaitEventSetWait-&g ...

  4. PostgreSQL 源码解读(212)- 后台进程#11(checkpointer-SyncOneBuffer)

    本节介绍了checkpoint中用于刷一个脏page的函数:SyncOneBuffer,该函数在syncing期间处理一个buffer. 一.数据结构 宏定义 checkpoints request ...

  5. PostgreSQL 源码解读(147)- Storage Manager#3(fsm_search函数)

    本节简单介绍了PostgreSQL在执行插入过程中与存储相关的函数RecordAndGetPageWithFreeSpace->fsm_search,该函数搜索FSM,找到有足够空闲空间(min ...

  6. PostgreSQL 源码解读(160)- 查询#80(如何实现表达式解析)

    本节介绍了PostgreSQL如何解析查询语句中的表达式列并计算得出该列的值.表达式列是指除关系定义中的系统列/定义列之外的其他投影列.比如: testdb=# create table t_expr ...

  7. PostgreSQL 源码解读(203)- 查询#116(类型转换实现)

    本节简单介绍了PostgreSQL中的类型转换的具体实现. 解析表达式,涉及不同数据类型时: 1.如有相应类型的Operator定义(pg_operator),则尝试进行类型转换,否则报错; 2.如有 ...

  8. PostgreSQL 源码解读(31)- 查询语句#16(查询优化-表达式预处理#1)

    本节简单介绍了PG查询优化对表达式预处理中连接Var(RTE中的Var,其中RTE_KIND=RTE_JOIN)溯源的过程.处理逻辑在主函数subquery_planner中通过调用flatten_j ...

  9. PostgreSQL 源码解读(35)- 查询语句#20(查询优化-简化Having和Grou...

    本节简单介绍了PG查询优化中对Having和Group By子句的简化处理. 一.基本概念 简化Having语句 把Having中的约束条件,如满足可以提升到Where条件中的,则移动到Where子句 ...

最新文章

  1. Android 相对布局别自己快遗忘的属性layout_alignRight,layout_alignBottom,layout_alignTop,layout_alignLeft
  2. Redis 初次尝试
  3. 设计模式:面向对象的设计原则下(ISP、DIP、KISS、YAGNI、DRY、LOD)
  4. 栈、队列(链表实现)
  5. HP LINUX打印机驱动安装步骤
  6. 图像任意角度旋转和翻转(C#)
  7. 网页版聊天服务器,网页版在线聊天室
  8. OBLOG Accesss 转SQL Server 常见问题及解决方法
  9. java打印出 锟斤拷_一段java代码带你认识锟斤拷
  10. WiFi功耗管理(一)(概述)
  11. 【设计模式】—-(12)代理模式(结构型)
  12. 利用cobaltstrike加sqlmap拿下一个网站并提权
  13. MySQL中的极限值
  14. Javascript中删除数组中重复出现的元素
  15. 计算机计算编码知识题库,计算机基础知识复习题库
  16. ubuntu 下 电驴下载及配置
  17. 反幂法matlab程序,数值分析幂法和反幂法.doc
  18. 【基础知识】ActiveMQ基本原理
  19. 6000字长文,详解黑色产业链的「攻守道」
  20. Nginx配置SSL证书时——nginx:[emerg]unknowndirectivessl错误

热门文章

  1. 《鬼谷子本经阴符七术》2灵龟养志法 (原文)
  2. mysql导出建库语句_mysql建库建表,导出表结构
  3. [js高手之路] html5 canvas动画教程 - 匀速运动
  4. 移动端 viewport
  5. PPT中这8个隐藏技巧-掌握了马上让你幸福感满满
  6. 小括号在c语言的作用,c语言小括号的用法有哪些用处.docx
  7. 02采集器Categraf
  8. 两个比尔:情商奇才与智商天才
  9. Nightmare Ⅱ(双向BFS)
  10. 【编程之美】如何将代码写得像诗一样优雅?