pg_walsender

交互接口

全局变量

/* global state */
extern PGDLLIMPORT bool am_walsender; // 是否是walsender进程
extern PGDLLIMPORT bool am_cascading_walsender; // 是否是级联walsender
extern PGDLLIMPORT bool am_db_walsender; // 是否连接到数据库
extern PGDLLIMPORT bool wake_wal_senders; /* user-settable parameters */
extern PGDLLIMPORT int max_wal_senders; // 最大walsender进程数
extern PGDLLIMPORT int wal_sender_timeout; // wal消息发送超时时间
extern PGDLLIMPORT bool log_replication_commands;
  • am_walsender和am_db_walsender

    根据启动参数replication的值进行赋值:值为database时,am_walsender和am_db_walsender都设置为true;否则按布尔值解析,解析结果只赋给am_walsender(例如值为true时仅am_walsender为true,am_db_walsender保持不变)。

                  if (strcmp(valptr, "database") == 0){am_walsender = true;am_db_walsender = true;}else if (!parse_bool(valptr, &am_walsender)) {}
    
  • am_cascading_walsender

    am_cascading_walsender在初始化walSnd的时候赋值。

    am_cascading_walsender = RecoveryInProgress();
    

    该值是RecoveryInProgress()的返回结果,主要取决于全局变量LocalRecoveryInProgress:LocalRecoveryInProgress为false时直接返回false,否则根据xlogctl->SharedRecoveryState重新计算。

    LocalRecoveryInProgress = (xlogctl->SharedRecoveryState != RECOVERY_STATE_DONE);
    

对外接口

extern void InitWalSender(void);
extern bool exec_replication_command(const char *query_string);
extern void WalSndErrorCleanup(void);
extern void WalSndResourceCleanup(bool isCommit);
extern void WalSndSignals(void);
extern Size WalSndShmemSize(void);
extern void WalSndShmemInit(void);
extern void WalSndWakeup(void);
extern void WalSndInitStopping(void);
extern void WalSndWaitStopping(void);
extern void HandleWalSndInitStopping(void);
extern void WalSndRqstFileReload(void);
  • InitWalSender

    初始化一个walSnd。当am_walsender为true的时候,postgres启动的时候就会初始化一个walSnd。

    InitWalSender的调用流程图(原mermaid图形在提取时丢失,下面仅保留图中的节点名称):

    初始化walSnd
    InitWalSender
    InitWalSenderSlot
    MarkPostmasterChildWalSender
    SendPostmasterSignal
    MemoryContextAllocZero

    初始化slot的时候会将全局的WalSndCtl的walsnds初始化,walsnds是一个变长数组,会根据max_wal_senders进行内存分配和初始化。每个创建的walSnd都会保存到全局的WalSndCtl的数组中。

    根据walSnd的pid是否为0来判断是否需要初始化,每个初始化的walSnd的状态为WALSNDSTATE_STARTUP。

对内接口

extern void WalSndSetState(WalSndState state);/** Internal functions for parsing the replication grammar, in repl_gram.y and* repl_scanner.l*/
extern int  replication_yyparse(void);
extern int  replication_yylex(void);
extern void replication_yyerror(const char *str) pg_attribute_noreturn();
extern void replication_scanner_init(const char *query_string);
extern void replication_scanner_finish(void);
extern bool replication_scanner_is_replication_command(void);
  • WalSndSetState

    用来更改walSnd的状态。

数据模型

  • walsender状态
typedef enum WalSndState
{WALSNDSTATE_STARTUP = 0,WALSNDSTATE_BACKUP,WALSNDSTATE_CATCHUP,WALSNDSTATE_STREAMING,WALSNDSTATE_STOPPING
} WalSndState;
  • walsender 结构

一个进程对应一个walSnd结构。

typedef struct WalSnd
{pid_t      pid;            /* this walsender's PID, or 0 if not active */WalSndState state;           /* this walsender's state */XLogRecPtr sentPtr;        /* WAL has been sent up to this point */bool        needreload;     /* does currently-open file need to be* reloaded? *//** The xlog locations that have been written, flushed, and applied by* standby-side. These may be invalid if the standby-side has not offered* values yet.*/XLogRecPtr write;XLogRecPtr    flush;XLogRecPtr    apply;/* Measured lag times, or -1 for unknown/none. */TimeOffset   writeLag;TimeOffset flushLag;TimeOffset applyLag;/** The priority order of the standby managed by this WALSender, as listed* in synchronous_standby_names, or 0 if not-listed.*/int         sync_standby_priority;/* Protects shared variables shown above. */slock_t       mutex;/** Pointer to the walsender's latch. Used by backends to wake up this* walsender when it has work to do. NULL if the walsender isn't active.*/Latch       *latch;/** Timestamp of the last message received from standby.*/TimestampTz replyTime;
} WalSnd;
  • WalSndCtlData
typedef struct
{/** Synchronous replication queue with one queue per request type.* Protected by SyncRepLock.*/SHM_QUEUE   SyncRepQueue[NUM_SYNC_REP_WAIT_MODE];/** Current location of the head of the queue. All waiters should have a* waitLSN that follows this value. Protected by SyncRepLock.*/XLogRecPtr   lsn[NUM_SYNC_REP_WAIT_MODE];/** Are any sync standbys defined?  Waiting backends can't reload the* config file safely, so checkpointer updates this value as needed.* Protected by SyncRepLock.*/bool      sync_standbys_defined;WalSnd        walsnds[FLEXIBLE_ARRAY_MEMBER];
} WalSndCtlData;
  • NodeTag
typedef enum NodeTag {...../** TAGS FOR REPLICATION GRAMMAR PARSE NODES (replnodes.h)*/T_IdentifySystemCmd,T_BaseBackupCmd,T_CreateReplicationSlotCmd,T_DropReplicationSlotCmd,T_ReadReplicationSlotCmd,T_StartReplicationCmd,T_TimeLineHistoryCmd,......
} NodeTag;
  • XLogReaderState

    typedef uint64 XLogRecPtr;
    struct XLogReaderState
    {XLogReaderRoutine routine;XLogRecPtr   ReadRecPtr;     /* start of last record read */XLogRecPtr   EndRecPtr;      /* end+1 of last record read */
    }
    
    • XLogReaderRoutine

      typedef int (*XLogPageReadCB) (XLogReaderState *xlogreader,XLogRecPtr targetPagePtr,int reqLen,XLogRecPtr targetRecPtr,char *readBuf);
      typedef void (*WALSegmentOpenCB) (XLogReaderState *xlogreader,XLogSegNo nextSegNo,TimeLineID *tli_p);
      typedef void (*WALSegmentCloseCB) (XLogReaderState *xlogreader);
      typedef struct XLogReaderRoutine
      {XLogPageReadCB page_read;WALSegmentOpenCB segment_open;WALSegmentCloseCB segment_close;
      } XLogReaderRoutine;
      
  • XLogRecoveryCtlData

    typedef struct XLogRecoveryCtlData
    {/** SharedHotStandbyActive indicates if we allow hot standby queries to be* run.  Protected by info_lck.*/bool     SharedHotStandbyActive;/** SharedPromoteIsTriggered indicates if a standby promotion has been* triggered.  Protected by info_lck.*/bool     SharedPromoteIsTriggered;Latch      recoveryWakeupLatch;/** Last record successfully replayed.*/XLogRecPtr  lastReplayedReadRecPtr; /* start position */XLogRecPtr  lastReplayedEndRecPtr;  /* end+1 position */TimeLineID lastReplayedTLI;    /* timeline *//** When we're currently replaying a record, ie. in a redo function,* replayEndRecPtr points to the end+1 of the record being replayed,* otherwise it's equal to lastReplayedEndRecPtr.*/XLogRecPtr    replayEndRecPtr;TimeLineID  replayEndTLI;/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */TimestampTz recoveryLastXTime;/** timestamp of when we started replaying the current chunk of WAL data,* only relevant for replication or archive recovery*/TimestampTz currentChunkStartTime;/* Recovery pause state */RecoveryPauseState recoveryPauseState;ConditionVariable recoveryNotPausedCV;slock_t        info_lck;       /* locks shared variables shown above */
    } XLogRecoveryCtlData;
    

数据发送

数据通过socket接口进行发送,最终数据出口为操作系统提供的socket接口的send函数。

typedef struct
{void       (*comm_reset) (void);int            (*flush) (void);int         (*flush_if_writable) (void);bool        (*is_send_pending) (void);int           (*putmessage) (char msgtype, const char *s, size_t len);void        (*putmessage_noblock) (char msgtype, const char *s, size_t len);
} PQcommMethods;
static const PQcommMethods PqCommSocketMethods = {socket_comm_reset,socket_flush,socket_flush_if_writable,socket_is_send_pending,socket_putmessage,socket_putmessage_noblock
};

其执行流程如下:

数据发送的调用流程图(原mermaid图形在提取时丢失,下面仅保留图中的节点名称):

XLogSendPhysical
pq_putmessage_noblock
socket_putmessage_noblock
socket_putmessage
internal_putbytes
internal_flush
secure_write
secure_raw_write
send

pg_walsender相关推荐

最新文章

  1. 25CSS3中的3D转换
  2. apache prefork和worker
  3. 双击“本地连接”打不开无反应的解决方法
  4. 现代化自定制 - 页面上的自定制
  5. ubuntu安装java的rpm_ubuntu安装jdk-6u45-linux-x64-rpm.bin
  6. 多节点 devstack 部署
  7. Git教程——回到从前 (reset)
  8. 阿里云iot平台实现MQTT通信(mqtt.fx接入iot平台及测试)
  9. 12种常见贴片焊接工具
  10. vue和ele结合使用form表单时:rules=“formValidate“的使用(ele的表单校验)
  11. 中国联通沃商店校园大使招募书
  12. python爬取简历模板_python爬取简历模板
  13. 深入理解Android
  14. struct结构体里能放函数吗?
  15. httpd的MPM工作模式
  16. python安装绘图库matplotlib_python绘图库Matplotlib的安装
  17. 利用pypdf2 安装包 基于 python 制作的PDF 文档合并脚本
  18. Qt实现mqtt客户端和mqtt服务器搭建
  19. element vue 上传模板_Vue Element UI upload 组件上传文件之后 file list 依旧是空数组
  20. 地球上20张最惊人的照片_地球上30个惊人的自然景点

热门文章

  1. Phaser3之 anims
  2. 今年 NFT 爆火,如何快速入行?(艺术家完整指南)
  3. bldc不同载波频率_有刷CD电机好用还是无刷BLDC电机好用?该选那个?
  4. android 屏幕上倒数,桌面时间倒数
  5. Naive UI使用useDialog、useMessage、useNotification、useLoadingBar,超实用
  6. python相关注册登录方式
  7. 魔法java_力量与魔法java
  8. 使用Python获取股票的报表数据
  9. python元组:格式化字符串
  10. 导出excel,sheetName名字乱码