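This section briefly looks at how pg_basebackup, PostgreSQL's backup tool, backs up WAL data. BaseBackup, the function that performs the actual backup, starts the WAL streamer through StartLogStreamer->LogStreamerMain, and the main work of the streamer is done by ReceiveXlogStream.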

I. Data Structures

logstreamer_param
Parameters passed to the WAL data streamer.


typedef struct
{
    PGconn     *bgconn;         /* background connection */
    XLogRecPtr  startptr;       /* start position */
    char        xlog[MAXPGPATH];    /* directory or tarfile depending on mode */
    char       *sysidentifier;  /* system identifier */
    int         timeline;       /* timeline */
} logstreamer_param;

StreamCtl
Global parameters used when receiving the xlog stream.


/*
 * Global parameters when receiving xlog stream. For details about the individual fields,
 * see the function comment for ReceiveXlogStream().
 */
typedef struct StreamCtl
{
    XLogRecPtr  startpos;       /* Start position for streaming */
    TimeLineID  timeline;       /* Timeline to stream data from */
    char       *sysidentifier;  /* Validate this system identifier and
                                 * timeline */
    int         standby_message_timeout;    /* Send status messages this often */
    bool        synchronous;    /* Flush immediately WAL data on write */
    bool        mark_done;      /* Mark segment as done in generated archive */
    bool        do_sync;        /* Flush to disk to ensure consistent state of
                                 * data */

    stream_stop_callback stream_stop;   /* Stop streaming when returns true */

    pgsocket    stop_socket;    /* if valid, watch for input on this socket
                                 * and check stream_stop() when there is any */

    WalWriteMethod *walmethod;  /* How to write the WAL */
    char       *partial_suffix; /* Suffix appended to partially received files */
    char       *replication_slot;   /* Replication slot to use, or NULL */
} StreamCtl;

II. Source Code Walkthrough

LogStreamerMain
Main function of the WAL streamer; it runs in the child process created by fork().


static int
LogStreamerMain(logstreamer_param *param)
{
    StreamCtl   stream;         /* global parameters for receiving the xlog stream */

    in_log_streamer = true;

    /* initialize the StreamCtl struct */
    MemSet(&stream, 0, sizeof(stream));
    stream.startpos = param->startptr;
    stream.timeline = param->timeline;
    stream.sysidentifier = param->sysidentifier;
    stream.stream_stop = reached_end_position;
#ifndef WIN32
    stream.stop_socket = bgpipe[0];
#else
    stream.stop_socket = PGINVALID_SOCKET;
#endif
    stream.standby_message_timeout = standby_message_timeout;
    stream.synchronous = false;
    stream.do_sync = do_sync;
    stream.mark_done = true;
    stream.partial_suffix = NULL;
    stream.replication_slot = replication_slot;

    if (format == 'p')
        stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
    else
        stream.walmethod = CreateWalTarMethod(param->xlog, compresslevel, do_sync);

    /* receive the WAL stream */
    if (!ReceiveXlogStream(param->bgconn, &stream))

        /*
         * Any errors will already have been reported in the function process,
         * but we need to tell the parent that we didn't shutdown in a nice
         * way.
         */
        return 1;

    if (!stream.walmethod->finish())
    {
        fprintf(stderr,
                _("%s: could not finish writing WAL files: %s\n"),
                progname, strerror(errno));
        return 1;
    }

    /* close the connection */
    PQfinish(param->bgconn);

    /* plain format used the directory method, tar format the tar method */
    if (format == 'p')
        FreeWalDirectoryMethod();
    else
        FreeWalTarMethod();
    /* free the memory */
    pg_free(stream.walmethod);

    return 0;
}
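In LogStreamerMain the stop decision is delegated to reached_end_position, and on non-Windows builds bgpipe[0] is passed as stop_socket so the child can react as soon as the parent writes to the pipe. The sketch below shows, under stated assumptions, the kind of check such a callback makes; it is not pg_basebackup's implementation. It assumes the end position arrives as a text LSN in the "%X/%X" form seen in the log output (for example 0/5A0000F8), and parse_lsn, reached_end and end_position are hypothetical names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;            /* 64-bit WAL position */
#define InvalidXLogRecPtr ((XLogRecPtr) 0)

/* Hypothetical: end of the backup's WAL, 0 until the parent reports it. */
static XLogRecPtr end_position = InvalidXLogRecPtr;

/*
 * Parse a textual LSN "hi/lo" (two hex numbers) into a 64-bit position.
 * Returns false if the text is not in that form.
 */
static bool
parse_lsn(const char *text, XLogRecPtr *out)
{
    unsigned int hi;
    unsigned int lo;

    assert(text != NULL && out != NULL);
    if (sscanf(text, "%X/%X", &hi, &lo) != 2)
        return false;
    *out = ((XLogRecPtr) hi << 32) | lo;
    return true;
}

/*
 * Hypothetical stop callback with the (position, timeline, segment_finished)
 * shape used by ReceiveXlogStream: keep streaming until the end position is
 * known and the received WAL has reached it.
 */
static bool
reached_end(XLogRecPtr segendpos, uint32_t timeline, bool segment_finished)
{
    (void) timeline;            /* unused in this sketch */
    (void) segment_finished;

    if (end_position == InvalidXLogRecPtr)
        return false;           /* end not reported yet: keep streaming */
    return segendpos >= end_position;
}
```

Returning false while the end position is still unknown is what keeps streaming going until the parent reports where the backup ended.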

ReceiveXlogStream
Receives the log stream starting at the specified position.


/*
 * Receive a log stream starting at the specified position.
 *
 * Individual parameters are passed through the StreamCtl structure.
 *
 * If sysidentifier is specified, validate that both the system
 * identifier and the timeline matches the specified ones
 * (by sending an extra IDENTIFY_SYSTEM command)
 *
 * All received segments will be written to the directory
 * specified by basedir. This will also fetch any missing timeline history
 * files.
 *
 * The stream_stop callback will be called every time data
 * is received, and whenever a segment is completed. If it returns
 * true, the streaming will stop and the function
 * return. As long as it returns false, streaming will continue
 * indefinitely.
 *
 * If stream_stop() checks for external input, stop_socket should be set to
 * the FD it checks.  This will allow such input to be detected promptly
 * rather than after standby_message_timeout (which might be indefinite).
 * Note that signals will interrupt waits for input as well, but that is
 * race-y since a signal received while busy won't interrupt the wait.
 *
 * standby_message_timeout controls how often we send a message
 * back to the master letting it know our progress, in milliseconds.
 * Zero means no messages are sent.
 * This message will only contain the write location, and never
 * flush or replay.
 *
 * If 'partial_suffix' is not NULL, files are initially created with the
 * given suffix, and the suffix is removed once the file is finished. That
 * allows you to tell the difference between partial and completed files,
 * so that you can continue later where you left.
 *
 * If 'synchronous' is true, the received WAL is flushed as soon as written,
 * otherwise only when the WAL file is closed.
 *
 * Note: The WAL location *must* be at a log segment start!
 */
bool
ReceiveXlogStream(PGconn *conn, StreamCtl *stream)
{
    char        query[128];
    char        slotcmd[128];
    PGresult   *res;
    XLogRecPtr  stoppos;

    /*
     * The caller should've checked the server version already, but doesn't do
     * any harm to check it here too.
     */
    if (!CheckServerVersionForStreaming(conn))
        return false;

    /*
     * Decide whether we want to report the flush position. If we report the
     * flush position, the primary will know what WAL we'll possibly
     * re-request, and it can then remove older WAL safely. We must always do
     * that when we are using slots.
     *
     * Reporting the flush position makes one eligible as a synchronous
     * replica. People shouldn't include generic names in
     * synchronous_standby_names, but we've protected them against it so far,
     * so let's continue to do so unless specifically requested.
     */
    if (stream->replication_slot != NULL)
    {
        /* a slot is in use: always report the flush position */
        reportFlushPosition = true;
        sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
    }
    else
    {
        if (stream->synchronous)
            reportFlushPosition = true;     /* synchronous */
        else
            reportFlushPosition = false;    /* asynchronous */
        slotcmd[0] = 0;                     /* empty string: no SLOT clause */
    }

    if (stream->sysidentifier != NULL)
    {
        /* Validate system identifier hasn't changed */
        res = PQexec(conn, "IDENTIFY_SYSTEM");
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
            fprintf(stderr,
                    _("%s: could not send replication command \"%s\": %s"),
                    progname, "IDENTIFY_SYSTEM", PQerrorMessage(conn));
            PQclear(res);
            return false;
        }
        if (PQntuples(res) != 1 || PQnfields(res) < 3)
        {
            fprintf(stderr,
                    _("%s: could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields\n"),
                    progname, PQntuples(res), PQnfields(res), 1, 3);
            PQclear(res);
            return false;
        }
        if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
        {
            fprintf(stderr,
                    _("%s: system identifier does not match between base backup and streaming connection\n"),
                    progname);
            PQclear(res);
            return false;
        }
        if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
        {
            fprintf(stderr,
                    _("%s: starting timeline %u is not present in the server\n"),
                    progname, stream->timeline);
            PQclear(res);
            return false;
        }
        PQclear(res);
    }

    /*
     * initialize flush position to starting point, it's the caller's
     * responsibility that that's sane.
     */
    lastFlushPosition = stream->startpos;

    while (1)
    {
        /*
         * Fetch the timeline history file for this timeline, if we don't have
         * it already. When streaming log to tar, this will always return
         * false, as we are never streaming into an existing file and
         * therefore there can be no pre-existing timeline history file.
         */
        if (!existsTimeLineHistoryFile(stream))
        {
            /* no history file yet: send the TIMELINE_HISTORY command */
            snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
            res = PQexec(conn, query);
            if (PQresultStatus(res) != PGRES_TUPLES_OK)
            {
                /* FIXME: we might send it ok, but get an error */
                fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                        progname, "TIMELINE_HISTORY", PQresultErrorMessage(res));
                PQclear(res);
                return false;
            }

            /*
             * The response to TIMELINE_HISTORY is a single row result set
             * with two fields: filename and content
             */
            if (PQnfields(res) != 2 || PQntuples(res) != 1)
            {
                fprintf(stderr,
                        _("%s: unexpected response to TIMELINE_HISTORY command: got %d rows and %d fields, expected %d rows and %d fields\n"),
                        progname, PQntuples(res), PQnfields(res), 1, 2);
            }

            /* Write the history file to disk */
            writeTimeLineHistoryFile(stream,
                                     PQgetvalue(res, 0, 0),
                                     PQgetvalue(res, 0, 1));

            PQclear(res);
        }

        /*
         * Before we start streaming from the requested location, check if the
         * callback tells us to stop here.
         */
        if (stream->stream_stop(stream->startpos, stream->timeline, false))
            return true;

        /* Initiate the replication stream at specified location */
        snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
                 slotcmd,
                 (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
                 stream->timeline);
        res = PQexec(conn, query);
        if (PQresultStatus(res) != PGRES_COPY_BOTH)
        {
            fprintf(stderr, _("%s: could not send replication command \"%s\": %s"),
                    progname, "START_REPLICATION", PQresultErrorMessage(res));
            PQclear(res);
            return false;
        }
        PQclear(res);

        /* Stream the WAL */
        res = HandleCopyStream(conn, stream, &stoppos);
        if (res == NULL)
            goto error;

        /*
         * Streaming finished.
         *
         * There are two possible reasons for that: a controlled shutdown, or
         * we reached the end of the current timeline. In case of
         * end-of-timeline, the server sends a result set after Copy has
         * finished, containing information about the next timeline. Read
         * that, and restart streaming from the next timeline. In case of
         * controlled shutdown, stop here.
         */
        if (PQresultStatus(res) == PGRES_TUPLES_OK)
        {
            /*
             * End-of-timeline. Read the next timeline's ID and starting
             * position. Usually, the starting position will match the end of
             * the previous timeline, but there are corner cases like if the
             * server had sent us half of a WAL record, when it was promoted.
             * The new timeline will begin at the end of the last complete
             * record in that case, overlapping the partial WAL record on the
             * old timeline.
             */
            uint32      newtimeline;
            bool        parsed;

            /* read the result set sent after the COPY finished */
            parsed = ReadEndOfStreamingResult(res, &stream->startpos, &newtimeline);
            PQclear(res);
            if (!parsed)
                goto error;

            /* Sanity check the values the server gave us */
            if (newtimeline <= stream->timeline)
            {
                /* the next timeline must be greater than the current one */
                fprintf(stderr,
                        _("%s: server reported unexpected next timeline %u, following timeline %u\n"),
                        progname, newtimeline, stream->timeline);
                goto error;
            }
            if (stream->startpos > stoppos)
            {
                /* the next timeline must not begin after the point where streaming stopped */
                fprintf(stderr,
                        _("%s: server stopped streaming timeline %u at %X/%X, but reported next timeline %u to begin at %X/%X\n"),
                        progname,
                        stream->timeline, (uint32) (stoppos >> 32), (uint32) stoppos,
                        newtimeline, (uint32) (stream->startpos >> 32), (uint32) stream->startpos);
                goto error;
            }

            /* Read the final result, which should be CommandComplete. */
            res = PQgetResult(conn);
            if (PQresultStatus(res) != PGRES_COMMAND_OK)
            {
                fprintf(stderr,
                        _("%s: unexpected termination of replication stream: %s"),
                        progname, PQresultErrorMessage(res));
                PQclear(res);
                goto error;
            }
            PQclear(res);

            /*
             * Loop back to start streaming from the new timeline. Always
             * start streaming at the beginning of a segment.
             */
            stream->timeline = newtimeline;
            stream->startpos = stream->startpos -
                XLogSegmentOffset(stream->startpos, WalSegSz);
            continue;
        }
        else if (PQresultStatus(res) == PGRES_COMMAND_OK)
        {
            PQclear(res);

            /*
             * End of replication (ie. controlled shut down of the server).
             *
             * Check if the callback thinks it's OK to stop here. If not,
             * complain.
             */
            if (stream->stream_stop(stoppos, stream->timeline, false))
                return true;
            else
            {
                fprintf(stderr, _("%s: replication stream was terminated before stop point\n"),
                        progname);
                goto error;
            }
        }
        else
        {
            /* Server returned an error. */
            fprintf(stderr,
                    _("%s: unexpected termination of replication stream: %s"),
                    progname, PQresultErrorMessage(res));
            PQclear(res);
            goto error;
        }
    }

error:
    if (walfile != NULL && stream->walmethod->close(walfile, CLOSE_NO_RENAME) != 0)
        fprintf(stderr, _("%s: could not close file \"%s\": %s\n"),
                progname, current_walfile_name, stream->walmethod->getlasterror());
    walfile = NULL;
    return false;
}
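Two details of ReceiveXlogStream can be checked in isolation: START_REPLICATION is formatted from the 64-bit start position as two 32-bit hex halves, and after a timeline switch the start position is rounded down with XLogSegmentOffset. The following is a minimal sketch of both, assuming a power-of-two segment size (16 MB by default); segment_offset, segment_start and build_start_replication are hypothetical helpers, not PostgreSQL functions.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;

/* Offset of a WAL position inside its segment. The mask trick is only
 * valid because the segment size is a power of two. */
static uint64_t
segment_offset(XLogRecPtr pos, uint64_t seg_size)
{
    assert(seg_size > 0 && (seg_size & (seg_size - 1)) == 0);
    return pos & (seg_size - 1);
}

/* Round a position down to the start of its segment. */
static XLogRecPtr
segment_start(XLogRecPtr pos, uint64_t seg_size)
{
    return pos - segment_offset(pos, seg_size);
}

/* Build a START_REPLICATION command in buf; slot may be NULL.
 * Returns the snprintf result so the caller can detect truncation. */
static int
build_start_replication(char *buf, size_t len, const char *slot,
                        XLogRecPtr startpos, uint32_t timeline)
{
    char        slotcmd[128] = "";

    if (slot != NULL)
        snprintf(slotcmd, sizeof(slotcmd), "SLOT \"%s\" ", slot);
    return snprintf(buf, len, "START_REPLICATION %s%X/%X TIMELINE %u",
                    slotcmd,
                    (unsigned int) (startpos >> 32),   /* high 32 bits */
                    (unsigned int) startpos,           /* low 32 bits */
                    (unsigned int) timeline);
}
```

With the values from the trace below (startptr 1509949440, timeline 16, slot pg_basebackup_1604) this formats as START_REPLICATION SLOT "pg_basebackup_1604" 0/5A000000 TIMELINE 16.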
/*
 * The main loop of ReceiveXlogStream. Handles the COPY stream after
 * initiating streaming with the START_REPLICATION command.
 *
 * If the COPY ends (not necessarily successfully) due a message from the
 * server, returns a PGresult and sets *stoppos to the last byte written.
 * On any other sort of error, returns NULL.
 */
static PGresult *
HandleCopyStream(PGconn *conn, StreamCtl *stream,
                 XLogRecPtr *stoppos)
{
    char       *copybuf = NULL;
    TimestampTz last_status = -1;
    XLogRecPtr  blockpos = stream->startpos;

    still_sending = true;

    while (1)
    {
        int         r;
        TimestampTz now;
        long        sleeptime;

        /*
         * Check if we should continue streaming, or abort at this point.
         */
        if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
            goto error;

        now = feGetCurrentTimestamp();

        /*
         * If synchronous option is true, issue sync command as soon as there
         * are WAL data which has not been flushed yet.
         */
        if (stream->synchronous && lastFlushPosition < blockpos && walfile != NULL)
        {
            if (stream->walmethod->sync(walfile) != 0)
            {
                fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
                        progname, current_walfile_name, stream->walmethod->getlasterror());
                goto error;
            }
            lastFlushPosition = blockpos;

            /*
             * Send feedback so that the server sees the latest WAL locations
             * immediately.
             */
            if (!sendFeedback(conn, blockpos, now, false))
                goto error;
            last_status = now;
        }

        /*
         * Potentially send a status message to the master
         */
        if (still_sending && stream->standby_message_timeout > 0 &&
            feTimestampDifferenceExceeds(last_status, now,
                                         stream->standby_message_timeout))
        {
            /* Time to send feedback! */
            if (!sendFeedback(conn, blockpos, now, false))
                goto error;
            last_status = now;
        }

        /*
         * Calculate how long send/receive loops should sleep
         */
        sleeptime = CalculateCopyStreamSleeptime(now, stream->standby_message_timeout,
                                                 last_status);

        /* receive the next CopyData message from the stream */
        r = CopyStreamReceive(conn, sleeptime, stream->stop_socket, &copybuf);
        while (r != 0)
        {
            if (r == -1)
                goto error;     /* error */
            if (r == -2)
            {
                /* the server ended the COPY (or an error occurred) */
                PGresult   *res = HandleEndOfCopyStream(conn, stream, copybuf, blockpos, stoppos);

                if (res == NULL)
                    goto error;
                else
                    return res;
            }

            /* Check the message type. */
            if (copybuf[0] == 'k')
            {
                if (!ProcessKeepaliveMsg(conn, stream, copybuf, r, blockpos,
                                         &last_status))
                    goto error;
            }
            else if (copybuf[0] == 'w')
            {
                if (!ProcessXLogDataMsg(conn, stream, copybuf, r, &blockpos))
                    goto error;

                /*
                 * Check if we should continue streaming, or abort at this
                 * point.
                 */
                if (!CheckCopyStreamStop(conn, stream, blockpos, stoppos))
                    goto error;
            }
            else
            {
                fprintf(stderr, _("%s: unrecognized streaming header: \"%c\"\n"),
                        progname, copybuf[0]);
                goto error;
            }

            /*
             * Process the received data, and any subsequent data we can read
             * without blocking.
             */
            r = CopyStreamReceive(conn, 0, stream->stop_socket, &copybuf);
        }
    }

error:
    if (copybuf != NULL)
        PQfreemem(copybuf);
    return NULL;
}
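HandleCopyStream dispatches on the first byte of each CopyData message: 'w' carries WAL data and 'k' is a keepalive, while sendFeedback answers with an 'r' standby status update. As a sketch based on the streaming replication protocol's layout (one type byte followed by 64-bit big-endian integers), the helpers below decode 'w' and 'k' and encode 'r'. They are hypothetical stand-ins for ProcessXLogDataMsg, ProcessKeepaliveMsg and sendFeedback, which additionally write WAL, honour reply requests and track state; the example passes 0 as the apply position, in line with the comment that replay is never reported.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Read/write 64-bit integers in network (big-endian) byte order. */
static uint64_t
get_be64(const unsigned char *p)
{
    uint64_t    v = 0;

    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

static void
put_be64(unsigned char *p, uint64_t v)
{
    for (int i = 7; i >= 0; i--)
    {
        p[i] = (unsigned char) (v & 0xFF);
        v >>= 8;
    }
}

/* 'w' XLogData: type + dataStart + walEnd + sendTime = 25-byte header,
 * followed by the WAL bytes themselves. */
static bool
parse_xlogdata(const unsigned char *msg, size_t len, uint64_t *datastart,
               const unsigned char **data, size_t *datalen)
{
    const size_t hdr = 1 + 8 + 8 + 8;

    if (len < hdr || msg[0] != 'w')
        return false;
    *datastart = get_be64(msg + 1);     /* WAL position of first data byte */
    *data = msg + hdr;
    *datalen = len - hdr;
    return true;
}

/* 'k' keepalive: type + walEnd + sendTime + replyRequested = 18 bytes. */
static bool
parse_keepalive(const unsigned char *msg, size_t len, uint64_t *walend,
                bool *reply_requested)
{
    if (len < 1 + 8 + 8 + 1 || msg[0] != 'k')
        return false;
    *walend = get_be64(msg + 1);
    *reply_requested = msg[17] != 0;
    return true;
}

/* 'r' status update: type + write + flush + apply + sendTime +
 * replyRequested = 34 bytes; buf must hold at least 34 bytes. */
static size_t
build_status_update(unsigned char *buf, uint64_t write, uint64_t flush,
                    uint64_t apply, int64_t now, bool reply_requested)
{
    assert(buf != NULL);
    buf[0] = 'r';
    put_be64(buf + 1, write);
    put_be64(buf + 9, flush);
    put_be64(buf + 17, apply);
    put_be64(buf + 25, (uint64_t) now);
    buf[33] = reply_requested ? 1 : 0;
    return 34;
}
```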
/*
 * Check if we should continue streaming, or abort at this point.
 */
static bool
CheckCopyStreamStop(PGconn *conn, StreamCtl *stream, XLogRecPtr blockpos,
                    XLogRecPtr *stoppos)
{
    if (still_sending && stream->stream_stop(blockpos, stream->timeline, false))
    {
        if (!close_walfile(stream, blockpos))
        {
            /* Potential error message is written by close_walfile */
            return false;
        }
        if (PQputCopyEnd(conn, NULL) <= 0 || PQflush(conn))
        {
            fprintf(stderr, _("%s: could not send copy-end packet: %s"),
                    progname, PQerrorMessage(conn));
            return false;
        }
        still_sending = false;
    }

    return true;
}
/*
 * Receive CopyData message available from XLOG stream, blocking for
 * maximum of 'timeout' ms.
 *
 * If data was received, returns the length of the data. *buffer is set to
 * point to a buffer holding the received message. The buffer is only valid
 * until the next CopyStreamReceive call.
 *
 * Returns 0 if no data was available within timeout, or if wait was
 * interrupted by signal or stop_socket input.
 * -1 on error. -2 if the server ended the COPY.
 */
static int
CopyStreamReceive(PGconn *conn, long timeout, pgsocket stop_socket,
                  char **buffer)
{
    char       *copybuf = NULL;
    int         rawlen;

    if (*buffer != NULL)
        PQfreemem(*buffer);
    *buffer = NULL;

    /* Try to receive a CopyData message */
    rawlen = PQgetCopyData(conn, &copybuf, 1);
    if (rawlen == 0)
    {
        int         ret;

        /*
         * No data available.  Wait for some to appear, but not longer than
         * the specified timeout, so that we can ping the server.  Also stop
         * waiting if input appears on stop_socket.
         */
        ret = CopyStreamPoll(conn, timeout, stop_socket);
        if (ret <= 0)
            return ret;

        /* Now there is actually data on the socket */
        if (PQconsumeInput(conn) == 0)
        {
            fprintf(stderr,
                    _("%s: could not receive data from WAL stream: %s"),
                    progname, PQerrorMessage(conn));
            return -1;
        }

        /* Now that we've consumed some input, try again */
        rawlen = PQgetCopyData(conn, &copybuf, 1);
        if (rawlen == 0)
            return 0;
    }
    if (rawlen == -1)           /* end-of-streaming or error */
        return -2;
    if (rawlen == -2)
    {
        fprintf(stderr, _("%s: could not read COPY data: %s"),
                progname, PQerrorMessage(conn));
        return -1;
    }

    /* Return received messages to caller */
    *buffer = copybuf;
    return rawlen;
}
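CopyStreamReceive relies on CopyStreamPoll, which is not listed here, to wait on both the connection socket and stop_socket. A POSIX select() version of that wait might look like the sketch below; it is an assumption about the technique, not a copy of PostgreSQL's code. wait_for_input and signal_stop are hypothetical names, and the return values follow the convention documented for CopyStreamReceive: 1 when the connection is readable, 0 on timeout, signal or stop input, -1 on error.

```c
#define _POSIX_C_SOURCE 200112L

#include <assert.h>
#include <errno.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>

/*
 * Wait until conn_fd is readable, stop_fd (if >= 0) is readable, or
 * timeout_ms elapses (a negative timeout waits forever).
 */
static int
wait_for_input(int conn_fd, int stop_fd, long timeout_ms)
{
    fd_set      readfds;
    struct timeval tv;
    struct timeval *tvp = NULL;
    int         maxfd = conn_fd;
    int         ret;

    assert(conn_fd >= 0);
    FD_ZERO(&readfds);
    FD_SET(conn_fd, &readfds);
    if (stop_fd >= 0)
    {
        FD_SET(stop_fd, &readfds);
        if (stop_fd > maxfd)
            maxfd = stop_fd;
    }
    if (timeout_ms >= 0)
    {
        tv.tv_sec = timeout_ms / 1000;
        tv.tv_usec = (timeout_ms % 1000) * 1000;
        tvp = &tv;
    }

    ret = select(maxfd + 1, &readfds, NULL, NULL, tvp);
    if (ret < 0)
        return errno == EINTR ? 0 : -1;     /* a signal just ends the wait */
    if (ret == 0 || (stop_fd >= 0 && FD_ISSET(stop_fd, &readfds)))
        return 0;                           /* timeout, or asked to stop */
    return 1;                               /* connection has data */
}

/* Hypothetical parent-side helper: wake the streamer by writing one byte
 * to the pipe whose read end was handed over as stop_fd. */
static int
signal_stop(int write_fd)
{
    return write(write_fd, "x", 1) == 1 ? 0 : -1;
}
```

Checking stop_fd before reporting connection data mirrors the idea that stop input should be noticed promptly rather than after standby_message_timeout.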

III. Tracing and Analysis

Backup command


pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v

Start gdb and follow the forked child process


[xdb@localhost ~]$ gdb pg_basebackup
GNU gdb (GDB) Red Hat Enterprise Linux 7.6.1-100.el7
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law.  Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-redhat-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>...
Reading symbols from /appdb/xdb/pg11.2/bin/pg_basebackup...done.
(gdb) set args -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
(gdb) set follow-fork-mode child
(gdb) b LogStreamerMain
Breakpoint 1 at 0x403c51: file pg_basebackup.c, line 490.
(gdb) r
Starting program: /appdb/xdb/pg11.2/bin/pg_basebackup -h 192.168.26.25 -U replicator -p 5432 -D /data/backup -P -Xs -R -v
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
Password:
pg_basebackup: initiating base backup, waiting for checkpoint to complete
pg_basebackup: checkpoint completed
pg_basebackup: write-ahead log start point: 0/5A000028 on timeline 16
pg_basebackup: starting background WAL receiver
pg_basebackup: created temporary replication slot "pg_basebackup_1604"
[New process 2036]
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib64/libthread_db.so.1".
[Switching to Thread 0x7ffff7fe7840 (LWP 2036)]
Breakpoint 1, LogStreamerMain (param=0x629db0) at pg_basebackup.c:490
490     in_log_streamer = true;
305153/305153 kB (100%), 1/1 tablespace
pg_basebackup: write-ahead log end point: 0/5A0000F8
pg_basebackup: waiting for background process to finish streaming ...
(gdb)

Input parameters


(gdb) n
492     MemSet(&stream, 0, sizeof(stream));
(gdb) p *param
$1 = {bgconn = 0x62a280, startptr = 1509949440, xlog = "/data/backup/pg_wal", '\000' <repeats 1004 times>, sysidentifier = 0x61f1a0 "6666964067616600474", timeline = 16}
(gdb)
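A quick worked check on these values, assuming the default 16 MB (2^24-byte) WAL segment size, which this trace does not print: startptr = 1509949440 is 0x5A000000, i.e. LSN 0/5A000000, and that is exactly the start of the segment containing the reported start point 0/5A000028. The streamer therefore appears to begin at the segment boundary, as the ReceiveXlogStream comment requires.

```latex
\begin{aligned}
\texttt{startptr} &= 1509949440 = \texttt{0x5A000000} = 90 \cdot 2^{24} \\
\text{hi} &= \lfloor 1509949440 / 2^{32} \rfloor = 0, \qquad
\text{lo} = 1509949440 \bmod 2^{32} = \texttt{0x5A000000}
  \;\Rightarrow\; \text{LSN } 0/5A000000 \\
\text{offset}(\texttt{0x5A000028}) &= \texttt{0x5A000028} \bmod 2^{24} = \texttt{0x28} \\
\texttt{0x5A000028} - \texttt{0x28} &= \texttt{0x5A000000} = \texttt{startptr}
\end{aligned}
```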

Populate the StreamCtl struct


(gdb) n
493     stream.startpos = param->startptr;
(gdb)
494     stream.timeline = param->timeline;
(gdb)
495     stream.sysidentifier = param->sysidentifier;
(gdb)
496     stream.stream_stop = reached_end_position;
(gdb)
498     stream.stop_socket = bgpipe[0];
(gdb)
502     stream.standby_message_timeout = standby_message_timeout;
(gdb)
503     stream.synchronous = false;
(gdb)
504     stream.do_sync = do_sync;
(gdb)
505     stream.mark_done = true;
(gdb)
506     stream.partial_suffix = NULL;
(gdb)
507     stream.replication_slot = replication_slot;
(gdb)
509     if (format == 'p')
(gdb)
510         stream.walmethod = CreateWalDirectoryMethod(param->xlog, 0, do_sync);
(gdb)

Step into ReceiveXlogStream


(gdb)
514     if (!ReceiveXlogStream(param->bgconn, &stream))
(gdb) step
ReceiveXlogStream (conn=0x62a280, stream=0x7fffffffda30) at receivelog.c:458
458     if (!CheckServerVersionForStreaming(conn))
(gdb)
(gdb) n
472     if (stream->replication_slot != NULL)
(gdb) p *stream
$2 = {startpos = 1509949440, timeline = 16, sysidentifier = 0x61f1a0 "6666964067616600474", standby_message_timeout = 10000, synchronous = false, mark_done = true, do_sync = true, stream_stop = 0x403953 <reached_end_position>, stop_socket = 8, walmethod = 0x632b10, partial_suffix = 0x0, replication_slot = 0x62a1e0 "pg_basebackup_1604"}
(gdb)

Check the system identifier and timeline


(gdb) n
474         reportFlushPosition = true;
(gdb)
475         sprintf(slotcmd, "SLOT \"%s\" ", stream->replication_slot);
(gdb)
486     if (stream->sysidentifier != NULL)
(gdb)
489         res = PQexec(conn, "IDENTIFY_SYSTEM");
(gdb)
490         if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb)
498         if (PQntuples(res) != 1 || PQnfields(res) < 3)
(gdb)
506         if (strcmp(stream->sysidentifier, PQgetvalue(res, 0, 0)) != 0)
(gdb) p PQgetvalue(res, 0, 0)
$3 = 0x633500 "6666964067616600474"
(gdb) n
514         if (stream->timeline > atoi(PQgetvalue(res, 0, 1)))
(gdb)
522         PQclear(res);
(gdb) p PQgetvalue(res, 0, 1)
$4 = 0x633514 "16"
(gdb)

No timeline history file exists yet, so it is fetched and written


(gdb) n
529     lastFlushPosition = stream->startpos;
(gdb)
539         if (!existsTimeLineHistoryFile(stream))
(gdb)
541             snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", stream->timeline);
(gdb)
542             res = PQexec(conn, query);
(gdb)
543             if (PQresultStatus(res) != PGRES_TUPLES_OK)
(gdb)
556             if (PQnfields(res) != 2 || PQntuples(res) != 1)
(gdb)
564             writeTimeLineHistoryFile(stream,
(gdb)
568             PQclear(res);
(gdb)
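The history file written here, and the WAL segment files streamed afterwards, are named from the timeline ID and the WAL position. The sketch below reproduces that naming scheme, assuming 8-hex-digit fields and a segment size that divides 4 GB; history_file_name and wal_file_name are hypothetical helpers. For timeline 16 and start position 0/5A000000 with 16 MB segments they give 00000010.history and 00000010000000000000005A.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

typedef uint64_t XLogRecPtr;

/* Timeline history file name: the timeline ID as 8 hex digits. */
static void
history_file_name(char *buf, size_t len, uint32_t tli)
{
    snprintf(buf, len, "%08X.history", (unsigned int) tli);
}

/*
 * WAL segment file name: timeline, then the segment number split into a
 * high part and a low part (segments per 4 GB), each as 8 hex digits.
 */
static void
wal_file_name(char *buf, size_t len, uint32_t tli, XLogRecPtr pos,
              uint64_t seg_size)
{
    uint64_t    segno;
    uint64_t    segs_per_id;

    assert(seg_size > 0 && UINT64_C(0x100000000) % seg_size == 0);
    segno = pos / seg_size;
    segs_per_id = UINT64_C(0x100000000) / seg_size;
    snprintf(buf, len, "%08X%08X%08X", (unsigned int) tli,
             (unsigned int) (segno / segs_per_id),
             (unsigned int) (segno % segs_per_id));
}
```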

Issue START_REPLICATION to start streaming


(gdb)
575         if (stream->stream_stop(stream->startpos, stream->timeline, false))
(gdb) n
579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb)
581                  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
(gdb)
579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb)
581                  (uint32) (stream->startpos >> 32), (uint32) stream->startpos,
(gdb)
579         snprintf(query, sizeof(query), "START_REPLICATION %s%X/%X TIMELINE %u",
(gdb)
583         res = PQexec(conn, query);
(gdb)
584         if (PQresultStatus(res) != PGRES_COPY_BOTH)
(gdb)
591         PQclear(res);
(gdb)

Stream the WAL, check the result, and return


595         if (res == NULL)
(gdb) p *res
$5 = {ntups = 0, numAttributes = 0, attDescs = 0x0, tuples = 0x0, tupArrSize = 0, numParameters = 0, paramDescs = 0x0, resultStatus = PGRES_COMMAND_OK, cmdStatus = "START_STREAMING\000\000\000\000\000\270\027u\367\377\177\000\000P/c\000\000\000\000\000CT\000\000\001", '\000' <repeats 19 times>, "\200\000\000", binary = 0, noticeHooks = {noticeRec = 0x7ffff7b9eaa4 <defaultNoticeReceiver>, noticeRecArg = 0x0, noticeProc = 0x7ffff7b9eaf9 <defaultNoticeProcessor>, noticeProcArg = 0x0}, events = 0x0, nEvents = 0, client_encoding = 0, errMsg = 0x0, errFields = 0x0, errQuery = 0x0, null_field = "", curBlock = 0x0, curOffset = 0, spaceLeft = 0}
(gdb) n
608         if (PQresultStatus(res) == PGRES_TUPLES_OK)
(gdb)
666         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
(gdb)
668             PQclear(res);
(gdb)
676             if (stream->stream_stop(stoppos, stream->timeline, false))
(gdb)
677                 return true;
(gdb)
702 }
(gdb)
LogStreamerMain (param=0x629db0) at pg_basebackup.c:523
523     if (!stream.walmethod->finish())
(gdb)

DONE!

IV. References

PG Source Code

Source: ITPUB Blog, http://blog.itpub.net/6906/viewspace-2638760/. Please credit the source when reposting; otherwise legal action may be taken.

Original title: PostgreSQL Source Code Walkthrough (152) - PG Tools #4 (ReceiveXlogStream)