pg_walsender
2024-04-08 03:07:36
pg_walsender
交互接口
全局变量
/* global state */
extern PGDLLIMPORT bool am_walsender; // 是否是walsender进程
extern PGDLLIMPORT bool am_cascading_walsender; // 是否是级联walsender
extern PGDLLIMPORT bool am_db_walsender; // 是否连接到数据库
extern PGDLLIMPORT bool wake_wal_senders; /* user-settable parameters */
extern PGDLLIMPORT int max_wal_senders; // 最大walsender进程数
extern PGDLLIMPORT int wal_sender_timeout; // wal消息发送超时时间
extern PGDLLIMPORT bool log_replication_commands;
am_walsender和am_db_walsender
解析启动参数replication的值进行赋值,如果replication的值是database或者true就设置这两个值为true。
if (strcmp(valptr, "database") == 0){am_walsender = true;am_db_walsender = true;}else if (!parse_bool(valptr, &am_walsender)) {}
am_cascading_walsender
am_cascading_walsender在初始化walSnd的时候赋值。
am_cascading_walsender = RecoveryInProgress();
值呢主要来自于全局的LocalRecoveryInProgress,LocalRecoveryInProgress=false时就是false,否则的话就从xlogctl->SharedRecoveryState取值。
LocalRecoveryInProgress = (xlogctl->SharedRecoveryState != RECOVERY_STATE_DONE);
对外接口
extern void InitWalSender(void);
extern bool exec_replication_command(const char *query_string);
extern void WalSndErrorCleanup(void);
extern void WalSndResourceCleanup(bool isCommit);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
extern void WalSndWakeup(void);
extern void WalSndInitStopping(void);
extern void WalSndWaitStopping(void);
extern void HandleWalSndInitStopping(void);
extern void WalSndRqstFileReload(void);
InitWalSender
初始化一个walSnd。当am_walsender为true的时候,porstgres启动的时候就会初始化一个walSnd。
#mermaid-svg-Zy5TKudX6rw1yUqP {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-Zy5TKudX6rw1yUqP .error-icon{fill:#552222;}#mermaid-svg-Zy5TKudX6rw1yUqP .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-Zy5TKudX6rw1yUqP .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-Zy5TKudX6rw1yUqP .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-Zy5TKudX6rw1yUqP .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-Zy5TKudX6rw1yUqP .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-Zy5TKudX6rw1yUqP .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-Zy5TKudX6rw1yUqP .marker{fill:#333333;stroke:#333333;}#mermaid-svg-Zy5TKudX6rw1yUqP .marker.cross{stroke:#333333;}#mermaid-svg-Zy5TKudX6rw1yUqP svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-Zy5TKudX6rw1yUqP .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-Zy5TKudX6rw1yUqP .cluster-label text{fill:#333;}#mermaid-svg-Zy5TKudX6rw1yUqP .cluster-label span{color:#333;}#mermaid-svg-Zy5TKudX6rw1yUqP .label text,#mermaid-svg-Zy5TKudX6rw1yUqP span{fill:#333;color:#333;}#mermaid-svg-Zy5TKudX6rw1yUqP .node rect,#mermaid-svg-Zy5TKudX6rw1yUqP .node circle,#mermaid-svg-Zy5TKudX6rw1yUqP .node ellipse,#mermaid-svg-Zy5TKudX6rw1yUqP .node polygon,#mermaid-svg-Zy5TKudX6rw1yUqP .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-Zy5TKudX6rw1yUqP .node .label{text-align:center;}#mermaid-svg-Zy5TKudX6rw1yUqP .node.clickable{cursor:pointer;}#mermaid-svg-Zy5TKudX6rw1yUqP .arrowheadPath{fill:#333333;}#mermaid-svg-Zy5TKudX6rw1yUqP .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-Zy5TKudX6rw1yUqP .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-Zy5TKudX6rw1yUqP .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-Zy5TKudX6rw1yUqP .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-Zy5TKudX6rw1yUqP .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-Zy5TKudX6rw1yUqP .cluster text{fill:#333;}#mermaid-svg-Zy5TKudX6rw1yUqP .cluster span{color:#333;}#mermaid-svg-Zy5TKudX6rw1yUqP div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-Zy5TKudX6rw1yUqP :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}初始化walSndInitWalSenderInitWalSenderSlotMarkPostmasterChildWalSenderSendPostmasterSignalMemoryContextAllocZero初始化slot的时候会将全局的WalSndCtl的walsnds初始化,walsnds是一个变长数组,会根据max_wal_senders进行内存分配和初始化。每个创建的walSnd都会保存到全局的WalSndCtl的数组中。
根据walSnd的pid是否为0来判断是否需要初始化,每个初始化的walSnd的状态为WALSNDSTATE_STARTUP。
对内接口
extern void WalSndSetState(WalSndState state);/** Internal functions for parsing the replication grammar, in repl_gram.y and* repl_scanner.l*/
extern int replication_yyparse(void);
extern int replication_yylex(void);
extern void replication_yyerror(const char *str) pg_attribute_noreturn();
extern void replication_scanner_init(const char *query_string);
extern void replication_scanner_finish(void);
extern bool replication_scanner_is_replication_command(void);
WalSndSetState
用来更改walSnd的状态。
数据模型
- walsender状态
typedef enum WalSndState
{WALSNDSTATE_STARTUP = 0,WALSNDSTATE_BACKUP,WALSNDSTATE_CATCHUP,WALSNDSTATE_STREAMING,WALSNDSTATE_STOPPING
} WalSndState;
- walsender 结构
一个进程对应一个walSnd结构。
typedef struct WalSnd
{pid_t pid; /* this walsender's PID, or 0 if not active */WalSndState state; /* this walsender's state */XLogRecPtr sentPtr; /* WAL has been sent up to this point */bool needreload; /* does currently-open file need to be* reloaded? *//** The xlog locations that have been written, flushed, and applied by* standby-side. These may be invalid if the standby-side has not offered* values yet.*/XLogRecPtr write;XLogRecPtr flush;XLogRecPtr apply;/* Measured lag times, or -1 for unknown/none. */TimeOffset writeLag;TimeOffset flushLag;TimeOffset applyLag;/** The priority order of the standby managed by this WALSender, as listed* in synchronous_standby_names, or 0 if not-listed.*/int sync_standby_priority;/* Protects shared variables shown above. */slock_t mutex;/** Pointer to the walsender's latch. Used by backends to wake up this* walsender when it has work to do. NULL if the walsender isn't active.*/Latch *latch;/** Timestamp of the last message received from standby.*/TimestampTz replyTime;
} WalSnd;
- WalSndCtlData
typedef struct
{/** Synchronous replication queue with one queue per request type.* Protected by SyncRepLock.*/SHM_QUEUE SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];/** Current location of the head of the queue. All waiters should have a* waitLSN that follows this value. Protected by SyncRepLock.*/XLogRecPtr lsn[NUM_SYNC_REP_WAIT_MODE];/** Are any sync standbys defined? Waiting backends can't reload the* config file safely, so checkpointer updates this value as needed.* Protected by SyncRepLock.*/bool sync_standbys_defined;WalSnd walsnds[FLEXIBLE_ARRAY_MEMBER];
} WalSndCtlData;
- NodeTag
typedef enum NodeTag {...../** TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)*/T_IdentifySystemCmd,T_BaseBackupCmd,T_CreateReplicationSlotCmd,T_DropReplicationSlotCmd,T_ReadReplicationSlotCmd,T_StartReplicationCmd,T_TimeLineHistoryCmd,......
} NodeTag;
XLogReaderState
typedef uint64 XLogRecPtr; struct XLogReaderState {XLogReaderRoutine routine;XLogRecPtr ReadRecPtr; /* start of last record read */XLogRecPtr EndRecPtr; /* end+1 of last record read */ }
XLogReaderRoutine
typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,XLogRecPtr targetPagePtr,int reqLen,XLogRecPtr targetRecPtr,char *readBuf); typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,XLogSegNo nextSegNo,TimeLineID *tli_p); typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader); typedef struct XLogReaderRoutine {XLogPageReadCB page_read;WALSegmentOpenCB segment_open;WALSegmentCloseCB segment_close; } XLogReaderRoutine;
XLogRecoveryCtlData
typedef struct XLogRecoveryCtlData {/** SharedHotStandbyActive indicates if we allow hot standby queries to be* run. Protected by info_lck.*/bool SharedHotStandbyActive;/** SharedPromoteIsTriggered indicates if a standby promotion has been* triggered. Protected by info_lck.*/bool SharedPromoteIsTriggered;Latch recoveryWakeupLatch;/** Last record successfully replayed.*/XLogRecPtr lastReplayedReadRecPtr; /* start position */XLogRecPtr lastReplayedEndRecPtr; /* end+1 position */TimeLineID lastReplayedTLI; /* timeline *//** When we're currently replaying a record, ie. in a redo function,* replayEndRecPtr points to the end+1 of the record being replayed,* otherwise it's equal to lastReplayedEndRecPtr.*/XLogRecPtr replayEndRecPtr;TimeLineID replayEndTLI;/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */TimestampTz recoveryLastXTime;/** timestamp of when we started replaying the current chunk of WAL data,* only relevant for replication or archive recovery*/TimestampTz currentChunkStartTime;/* Recovery pause state */RecoveryPauseState recoveryPauseState;ConditionVariable recoveryNotPausedCV;slock_t info_lck; /* locks shared variables shown above */ } XLogRecoveryCtlData;
数据发送
数据通过socket接口进行发送,最终数据出口为操作系统提供的socket接口的send函数。
typedef struct
{void (*comm_reset) (void);int (*flush) (void);int (*flush_if_writable) (void);bool (*is_send_pending) (void);int (*putmessage) (char msgtype, const char *s, size_t len);void (*putmessage_noblock) (char msgtype, const char *s, size_t len);
} PQcommMethods;
static const PQcommMethods PqCommSocketMethods = {socket_comm_reset,socket_flush,socket_flush_if_writable,socket_is_send_pending,socket_putmessage,socket_putmessage_noblock
};
其执行流程如下:
#mermaid-svg-dyXMajItu0toA8rR {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-dyXMajItu0toA8rR .error-icon{fill:#552222;}#mermaid-svg-dyXMajItu0toA8rR .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-dyXMajItu0toA8rR .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-dyXMajItu0toA8rR .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-dyXMajItu0toA8rR .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-dyXMajItu0toA8rR .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-dyXMajItu0toA8rR .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-dyXMajItu0toA8rR .marker{fill:#333333;stroke:#333333;}#mermaid-svg-dyXMajItu0toA8rR .marker.cross{stroke:#333333;}#mermaid-svg-dyXMajItu0toA8rR svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-dyXMajItu0toA8rR .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-dyXMajItu0toA8rR .cluster-label text{fill:#333;}#mermaid-svg-dyXMajItu0toA8rR .cluster-label span{color:#333;}#mermaid-svg-dyXMajItu0toA8rR .label text,#mermaid-svg-dyXMajItu0toA8rR span{fill:#333;color:#333;}#mermaid-svg-dyXMajItu0toA8rR .node rect,#mermaid-svg-dyXMajItu0toA8rR .node circle,#mermaid-svg-dyXMajItu0toA8rR .node ellipse,#mermaid-svg-dyXMajItu0toA8rR .node polygon,#mermaid-svg-dyXMajItu0toA8rR .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-dyXMajItu0toA8rR .node .label{text-align:center;}#mermaid-svg-dyXMajItu0toA8rR .node.clickable{cursor:pointer;}#mermaid-svg-dyXMajItu0toA8rR .arrowheadPath{fill:#333333;}#mermaid-svg-dyXMajItu0toA8rR .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-dyXMajItu0toA8rR .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-dyXMajItu0toA8rR .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-dyXMajItu0toA8rR .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-dyXMajItu0toA8rR .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-dyXMajItu0toA8rR .cluster text{fill:#333;}#mermaid-svg-dyXMajItu0toA8rR .cluster span{color:#333;}#mermaid-svg-dyXMajItu0toA8rR div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-dyXMajItu0toA8rR :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
XLogSendPhysical
pq_putmessage_noblock
socket_putmessage_noblock
socket_putmessage
internal_putbytes
internal_flush
secure_write
secure_raw_write
send
pg_walsender相关推荐
最新文章
- 25CSS3中的3D转换
- apache prefork和worker
- 双击“本地连接”打不开无反应的解决方法
- 现代化自定制 - 页面上的自定制
- ubuntu安装java的rpm_ubuntu安装jdk-6u45-linux-x64-rpm.bin
- 多节点 devstack 部署
- Git教程——回到从前 (reset)
- 阿里云iot平台实现MQTT通信(mqtt.fx接入iot平台及测试)
- 12种常见贴片焊接工具
- vue和ele结合使用form表单时:rules=“formValidate“的使用(ele的表单校验)
- 中国联通沃商店校园大使招募书
- python爬取简历模板_python爬取简历模板
- 深入理解Android
- struct结构体里能放函数吗?
- httpd的MPM工作模式
- python安装绘图库matplotlib_python绘图库Matplotlib的安装
- 利用pypdf2 安装包 基于 python 制作的PDF 文档合并脚本
- Qt实现mqtt客户端和mqtt服务器搭建
- element vue 上传模板_Vue Element UI upload 组件上传文件之后 file list 依旧是空数组
- 地球上20张最惊人的照片_地球上30个惊人的自然景点