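This section focuses on CLOG read and write operations, that is, how a transaction's status is recorded and later fetched for visibility checks. For background, see the earlier posts:
1 postgres CLOG source code analysis-1
2 postgres source code analysis: implementation of the Slru buffer pool-2
3 postgres source code analysis: implementation of the Slru buffer pool-1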

1 Overview

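  In Postgres, every transaction that is not read-only is assigned a transaction ID (xid). An xid behaves much like a start timestamp, but the order in which transactions commit is not known in advance. Under highly concurrent workloads, how does a transaction decide which tuples in a table are visible to it and which are not? PG's answer is to store xmin and xmax in every tuple and to decide visibility by comparing the transaction's own xid with the tuple's xmin and xmax, together with its snapshot.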

typedef struct HeapTupleFields
{
    TransactionId t_xmin;       /* inserting xact ID */
    TransactionId t_xmax;       /* deleting or locking xact ID */

    union
    {
        CommandId   t_cid;      /* inserting or deleting command ID, or both */
        TransactionId t_xvac;   /* old-style VACUUM FULL xact ID */
    }           t_field3;
} HeapTupleFields;

typedef struct DatumTupleFields
{
    int32       datum_len_;     /* varlena header (do not touch directly!) */
    int32       datum_typmod;   /* -1, or identifier of a record type */
    Oid         datum_typeid;   /* composite type OID, or RECORDOID */

    /*
     * datum_typeid cannot be a domain over composite, only plain composite,
     * even if the datum is meant as a value of a domain-over-composite type.
     * This is in line with the general principle that CoerceToDomain does not
     * change the physical representation of the base type value.
     *
     * Note: field ordering is chosen with thought that Oid might someday
     * widen to 64 bits.
     */
} DatumTupleFields;

typedef struct ItemPointerData
{
    BlockIdData ip_blkid;       /* block number */
    OffsetNumber ip_posid;      /* index of this tuple's entry in the ItemIdData array */
}

/* If compiler understands packed and aligned pragmas, use those */
#if defined(pg_attribute_packed) && defined(pg_attribute_aligned)
            pg_attribute_packed()
            pg_attribute_aligned(2)
#endif
ItemPointerData;

typedef ItemPointerData *ItemPointer;
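
To make the xmin/xmax idea concrete, here is a deliberately simplified sketch in C. It is not PostgreSQL's visibility code: it ignores snapshots, the reader's own xid, command IDs, hint bits, and row locks stored in xmax. ToyTuple, ToyXactStatus, toy_lookup_status and toy_tuple_is_visible are hypothetical names, and the status table is made up; in the real system the status would come from a CLOG lookup.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

typedef uint32_t TransactionId;
#define InvalidTransactionId ((TransactionId) 0)

/* Hypothetical status codes for this sketch only */
typedef enum { TOY_IN_PROGRESS, TOY_COMMITTED, TOY_ABORTED } ToyXactStatus;

/* Hypothetical tuple header carrying only the two xids */
typedef struct ToyTuple
{
    TransactionId t_xmin;       /* inserting transaction */
    TransactionId t_xmax;       /* deleting transaction, or invalid */
} ToyTuple;

/* Made-up status table standing in for a CLOG lookup */
static ToyXactStatus
toy_lookup_status(TransactionId xid)
{
    switch (xid)
    {
        case 100:
            return TOY_COMMITTED;
        case 101:
            return TOY_ABORTED;
        default:
            return TOY_IN_PROGRESS;
    }
}

/* Visible if the inserter committed and no committed deleter exists */
static bool
toy_tuple_is_visible(const ToyTuple *tup)
{
    assert(tup != NULL);

    if (toy_lookup_status(tup->t_xmin) != TOY_COMMITTED)
        return false;           /* insert never became durable */
    if (tup->t_xmax == InvalidTransactionId)
        return true;            /* nobody deleted it */
    return toy_lookup_status(tup->t_xmax) != TOY_COMMITTED;
}
```

With this table, a tuple inserted by xid 100 and never deleted is visible, while the same tuple with t_xmax = 100 is not. The point of the sketch is only that each check needs the status of some other transaction, which is what the CLOG provides.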

2 CLOG Write Operations

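  This operation is implemented in TransactionIdSetTreeStatus, which sets the status of a transaction and its subtransactions. It proceeds as follows:
1) Determine the CLOG page that holds the parent transaction;
2) Count how many subtransactions share that page with the parent, then call TransactionIdSetPageStatus to set the statuses. When a commit spans more than one page, the order is:
a: first mark the subtransactions that are not on the parent's page as TRANSACTION_STATUS_SUB_COMMITTED;
b: then mark the subtransactions on the parent's page as TRANSACTION_STATUS_SUB_COMMITTED;
c: next set the parent transaction to TRANSACTION_STATUS_COMMITTED;
d: update the subtransactions on the parent's page to TRANSACTION_STATUS_COMMITTED;
e: finally update the subtransactions from step a to TRANSACTION_STATUS_COMMITTED.
If everything fits on one page, a single TransactionIdSetPageStatus call handles the parent and all subtransactions. For an abort, the intermediate SUB_COMMITTED marking is skipped.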

The source code is as follows:

void
TransactionIdSetTreeStatus(TransactionId xid, int nsubxids,
                           TransactionId *subxids, XidStatus status, XLogRecPtr lsn)
{
    int         pageno = TransactionIdToPage(xid); /* get page of parent */
    int         i;

    Assert(status == TRANSACTION_STATUS_COMMITTED ||
           status == TRANSACTION_STATUS_ABORTED);

    /*
     * See how many subxids, if any, are on the same page as the parent, if
     * any.
     */
    for (i = 0; i < nsubxids; i++)
    {
        if (TransactionIdToPage(subxids[i]) != pageno)
            break;
    }

    /*
     * Do all items fit on a single page?
     */
    if (i == nsubxids)
    {
        /*
         * Set the parent and all subtransactions in a single call
         */
        TransactionIdSetPageStatus(xid, nsubxids, subxids, status, lsn,
                                   pageno, true);
    }
    else
    {
        int         nsubxids_on_first_page = i;

        /*
         * If this is a commit then we care about doing this correctly (i.e.
         * using the subcommitted intermediate status).  By here, we know
         * we're updating more than one page of clog, so we must mark entries
         * that are *not* on the first page so that they show as subcommitted
         * before we then return to update the status to fully committed.
         *
         * To avoid touching the first page twice, skip marking subcommitted
         * for the subxids on that first page.
         */
        if (status == TRANSACTION_STATUS_COMMITTED)
            set_status_by_pages(nsubxids - nsubxids_on_first_page,
                                subxids + nsubxids_on_first_page,
                                TRANSACTION_STATUS_SUB_COMMITTED, lsn);

        /*
         * Now set the parent and subtransactions on same page as the parent,
         * if any
         */
        pageno = TransactionIdToPage(xid);
        TransactionIdSetPageStatus(xid, nsubxids_on_first_page, subxids, status,
                                   lsn, pageno, false);

        /*
         * Now work through the rest of the subxids one clog page at a time,
         * starting from the second page onwards, like we did above.
         */
        set_status_by_pages(nsubxids - nsubxids_on_first_page,
                            subxids + nsubxids_on_first_page,
                            status, lsn);
    }
}
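
The listing calls set_status_by_pages without showing it. The following is a minimal sketch of the underlying idea, namely counting the subxids on the parent's page and then splitting the remainder into per-page runs. It is an illustration, not the PostgreSQL function: PageBatch, count_on_parent_page and group_by_page are hypothetical names, the block size is assumed to be the default 8192 bytes, and the sketch only records the batches instead of updating any CLOG page.

```c
#include <assert.h>
#include <stdint.h>

typedef uint32_t TransactionId;

#define BLCKSZ 8192                     /* assumed default block size */
#define CLOG_XACTS_PER_BYTE 4           /* 2 status bits per transaction */
#define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
#define TransactionIdToPage(xid) ((int) ((xid) / (TransactionId) CLOG_XACTS_PER_PAGE))

/* Hypothetical record of one per-page batch of subxids */
typedef struct PageBatch
{
    int         pageno;         /* CLOG page shared by the batch */
    int         first;          /* index of the first subxid in the batch */
    int         count;          /* number of subxids in the batch */
} PageBatch;

/* Number of leading subxids that live on the same page as the parent */
static int
count_on_parent_page(TransactionId xid, const TransactionId *subxids, int n)
{
    int         pageno = TransactionIdToPage(xid);
    int         i;

    for (i = 0; i < n; i++)
    {
        if (TransactionIdToPage(subxids[i]) != pageno)
            break;
    }
    return i;
}

/* Split subxids[0..n) into runs that share a page; returns the run count */
static int
group_by_page(const TransactionId *subxids, int n, PageBatch *out, int max_out)
{
    int         nbatches = 0;
    int         i = 0;

    while (i < n)
    {
        int         pageno = TransactionIdToPage(subxids[i]);
        int         first = i;

        /* advance while the subxids stay on the current page */
        while (i < n && TransactionIdToPage(subxids[i]) == pageno)
            i++;

        assert(nbatches < max_out);
        out[nbatches].pageno = pageno;
        out[nbatches].first = first;
        out[nbatches].count = i - first;
        nbatches++;
    }
    return nbatches;
}
```

With 32768 transactions per page, a parent xid of 32765 and subxids {32766, 32767, 32768, 32769, 65536} give two subxids on the parent's page (page 0), and the remaining three split into one batch of two on page 1 and one batch of one on page 2. Each batch corresponds to one per-page status update.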

2.1 TransactionIdSetPageStatus execution flow

/*
 * Record the final state of transaction entries in the commit log for all
 * entries on a single page.  Atomic only on this page.
 */
static void
TransactionIdSetPageStatus(TransactionId xid, int nsubxids,
                           TransactionId *subxids, XidStatus status,
                           XLogRecPtr lsn, int pageno,
                           bool all_xact_same_page)
{
    /* Can't use group update when PGPROC overflows. */
    StaticAssertStmt(THRESHOLD_SUBTRANS_CLOG_OPT <= PGPROC_MAX_CACHED_SUBXIDS,
                     "group clog threshold less than PGPROC cached subxids");

    /*
     * When there is contention on XactSLRULock, we try to group multiple
     * updates; a single leader process will perform transaction status
     * updates for multiple backends so that the number of times XactSLRULock
     * needs to be acquired is reduced.
     *
     * For this optimization to be safe, the XID and subxids in MyProc must be
     * the same as the ones for which we're setting the status.  Check that
     * this is the case.
     *
     * For this optimization to be efficient, we shouldn't have too many
     * sub-XIDs and all of the XIDs for which we're adjusting clog should be
     * on the same page.  Check those conditions, too.
     */
    /*
     * Author's note: the group path exists to avoid repeated contention on
     * XactSLRULock. It applies only when nsubxids <=
     * THRESHOLD_SUBTRANS_CLOG_OPT (5), the parent and all subtransactions are
     * on the same page, and xid == MyProc->xid.
     */
    if (all_xact_same_page && xid == MyProc->xid &&
        nsubxids <= THRESHOLD_SUBTRANS_CLOG_OPT &&
        nsubxids == MyProc->subxidStatus.count &&
        memcmp(subxids, MyProc->subxids.xids,
               nsubxids * sizeof(TransactionId)) == 0)
    {
        /*
         * If we can immediately acquire XactSLRULock, we update the status of
         * our own XID and release the lock.  If not, try use group XID
         * update.  If that doesn't work out, fall back to waiting for the
         * lock to perform an update for this transaction only.
         */
        /* Author's note: lock obtained at once, so update this page's entries right away */
        if (LWLockConditionalAcquire(XactSLRULock, LW_EXCLUSIVE))
        {
            /* Got the lock without waiting!  Do the update. */
            TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
                                               lsn, pageno);
            LWLockRelease(XactSLRULock);
            return;
        }
        /* Author's note: lock not available, so try the group update */
        else if (TransactionGroupUpdateXidStatus(xid, status, lsn, pageno))
        {
            /* Group update mechanism has done the work. */
            return;
        }

        /* Fall through only if update isn't done yet. */
    }

    /* Group update not applicable, or couldn't accept this page number. */
    LWLockAcquire(XactSLRULock, LW_EXCLUSIVE);
    TransactionIdSetPageStatusInternal(xid, nsubxids, subxids, status,
                                       lsn, pageno);
    LWLockRelease(XactSLRULock);
}
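
The eligibility test for the group update path can be isolated as a pure predicate, which makes the conditions easier to read. This is a sketch under assumptions: ToyProc is a hypothetical stand-in for the relevant fields of MyProc, the array size 64 is assumed to match PGPROC_MAX_CACHED_SUBXIDS, and group_update_eligible is not a PostgreSQL function.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t TransactionId;

#define THRESHOLD_SUBTRANS_CLOG_OPT 5   /* value quoted in the listing's note */
#define TOY_MAX_CACHED_SUBXIDS 64       /* assumed cache size for this sketch */

/* Hypothetical stand-in for the fields of MyProc that the check reads */
typedef struct ToyProc
{
    TransactionId xid;
    int         nsubxids;
    TransactionId subxids[TOY_MAX_CACHED_SUBXIDS];
} ToyProc;

/*
 * True when the caller's update may use the group path: everything on one
 * page, few subxids, and the xid/subxids match what the process advertises.
 */
static bool
group_update_eligible(const ToyProc *me, bool all_xact_same_page,
                      TransactionId xid, int nsubxids,
                      const TransactionId *subxids)
{
    assert(me != NULL && subxids != NULL && nsubxids >= 0);

    return all_xact_same_page &&
        xid == me->xid &&
        nsubxids <= THRESHOLD_SUBTRANS_CLOG_OPT &&
        nsubxids == me->nsubxids &&
        memcmp(subxids, me->subxids,
               (size_t) nsubxids * sizeof(TransactionId)) == 0;
}
```

The subxid comparison matters because a group leader updates the CLOG on behalf of other backends using the xids they advertise in shared memory; if the caller's list differed from that advertised list, the leader would set the wrong entries.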

2.2 TransactionIdSetPageStatusInternal execution flow

static void
TransactionIdSetPageStatusInternal(TransactionId xid, int nsubxids,
                                   TransactionId *subxids, XidStatus status,
                                   XLogRecPtr lsn, int pageno)
{
    int         slotno;
    int         i;

    Assert(status == TRANSACTION_STATUS_COMMITTED ||
           status == TRANSACTION_STATUS_ABORTED ||
           (status == TRANSACTION_STATUS_SUB_COMMITTED && !TransactionIdIsValid(xid)));
    /* Author's note: the caller must already hold XactSLRULock exclusively */
    Assert(LWLockHeldByMeInMode(XactSLRULock, LW_EXCLUSIVE));

    /*
     * If we're doing an async commit (ie, lsn is valid), then we must wait
     * for any active write on the page slot to complete.  Otherwise our
     * update could reach disk in that write, which will not do since we
     * mustn't let it reach disk until we've done the appropriate WAL flush.
     * But when lsn is invalid, it's OK to scribble on a page while it is
     * write-busy, since we don't care if the update reaches disk sooner than
     * we think.
     */
    /* Author's note: load the page with this page number into a slot of the SLRU buffer pool */
    slotno = SimpleLruReadPage(XactCtl, pageno, XLogRecPtrIsInvalid(lsn), xid);

    /*
     * Set the main transaction id, if any.
     *
     * If we update more than one xid on this page while it is being written
     * out, we might find that some of the bits go to disk and others don't.
     * If we are updating commits on the page with the top-level xid that
     * could break atomicity, so we subcommit the subxids first before we mark
     * the top-level commit.
     */
    if (TransactionIdIsValid(xid))
    {
        /* Author's note: first mark the subtransactions TRANSACTION_STATUS_SUB_COMMITTED */
        /* Subtransactions first, if needed ... */
        if (status == TRANSACTION_STATUS_COMMITTED)
        {
            for (i = 0; i < nsubxids; i++)
            {
                Assert(XactCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
                TransactionIdSetStatusBit(subxids[i],
                                          TRANSACTION_STATUS_SUB_COMMITTED,
                                          lsn, slotno);
            }
        }

        /* Author's note: then set the parent transaction to its final status */
        /* ... then the main transaction */
        TransactionIdSetStatusBit(xid, status, lsn, slotno);
    }

    /* Author's note: finally move the subtransactions to the final status */
    /* Set the subtransactions */
    for (i = 0; i < nsubxids; i++)
    {
        Assert(XactCtl->shared->page_number[slotno] == TransactionIdToPage(subxids[i]));
        TransactionIdSetStatusBit(subxids[i], status, lsn, slotno);
    }

    XactCtl->shared->page_dirty[slotno] = true;
}

2.3 TransactionIdSetStatusBit execution flow

  This function sets the status of a single transaction ID.

/*
 * Sets the commit status of a single transaction.
 *
 * Must be called with XactSLRULock held
 */
static void
TransactionIdSetStatusBit(TransactionId xid, XidStatus status, XLogRecPtr lsn, int slotno)
{
    int         byteno = TransactionIdToByte(xid);  /* byte offset within the page */
    int         bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;  /* bit shift within the byte */
    char       *byteptr;
    char        byteval;
    char        curval;

    /* address of the target byte inside the slot's page buffer */
    byteptr = XactCtl->shared->page_buffer[slotno] + byteno;
    curval = (*byteptr >> bshift) & CLOG_XACT_BITMASK;

    /*
     * When replaying transactions during recovery we still need to perform
     * the two phases of subcommit and then commit. However, some transactions
     * are already correctly marked, so we just treat those as a no-op which
     * allows us to keep the following Assert as restrictive as possible.
     */
    /*
     * Author's note: replay during recovery still runs the subcommit and
     * commit phases; if the entry is already marked committed, nothing needs
     * to be done.
     */
    if (InRecovery && status == TRANSACTION_STATUS_SUB_COMMITTED &&
        curval == TRANSACTION_STATUS_COMMITTED)
        return;

    /*
     * Current state change should be from 0 or subcommitted to target state
     * or we should already be there when replaying changes during recovery.
     */
    Assert(curval == 0 ||
           (curval == TRANSACTION_STATUS_SUB_COMMITTED &&
            status != TRANSACTION_STATUS_IN_PROGRESS) ||
           curval == status);

    /* note this assumes exclusive access to the clog page */
    /* Author's note: clear the entry's two bits, then OR in the new status */
    byteval = *byteptr;
    byteval &= ~(((1 << CLOG_BITS_PER_XACT) - 1) << bshift);
    byteval |= (status << bshift);
    *byteptr = byteval;

    /*
     * Update the group LSN if the transaction completion LSN is higher.
     *
     * Note: lsn will be invalid when supplied during InRecovery processing,
     * so we don't need to do anything special to avoid LSN updates during
     * recovery. After recovery completes the next clog change will set the
     * LSN correctly.
     */
    if (!XLogRecPtrIsInvalid(lsn))
    {
        int         lsnindex = GetLSNIndex(slotno, xid);

        if (XactCtl->shared->group_lsn[lsnindex] < lsn)
            XactCtl->shared->group_lsn[lsnindex] = lsn;
    }
}
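
The bit arithmetic in TransactionIdSetStatusBit is easier to follow on a standalone buffer. The sketch below assumes the default 8192-byte block size and the usual CLOG layout of 2 bits per transaction, so 4 transactions per byte and 32768 per page. The helpers xid_to_page, xid_to_byte, xid_to_bshift, toy_set_status and toy_get_status are hypothetical names for this illustration; there is no locking, no LSN tracking, and no check of the allowed state transitions.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

typedef uint32_t TransactionId;

#define BLCKSZ 8192                     /* assumed default block size */
#define CLOG_BITS_PER_XACT 2
#define CLOG_XACTS_PER_BYTE 4
#define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
#define CLOG_XACT_BITMASK ((1 << CLOG_BITS_PER_XACT) - 1)

#define TRANSACTION_STATUS_IN_PROGRESS   0x00
#define TRANSACTION_STATUS_COMMITTED     0x01
#define TRANSACTION_STATUS_ABORTED       0x02
#define TRANSACTION_STATUS_SUB_COMMITTED 0x03

/* Which CLOG page holds this xid */
static int
xid_to_page(TransactionId xid)
{
    return (int) (xid / CLOG_XACTS_PER_PAGE);
}

/* Byte offset of the xid's entry within its page */
static int
xid_to_byte(TransactionId xid)
{
    return (int) ((xid % CLOG_XACTS_PER_PAGE) / CLOG_XACTS_PER_BYTE);
}

/* Bit shift of the xid's 2-bit entry within its byte */
static int
xid_to_bshift(TransactionId xid)
{
    return (int) (xid % CLOG_XACTS_PER_BYTE) * CLOG_BITS_PER_XACT;
}

/* Overwrite the 2-bit status of xid in a page-sized buffer */
static void
toy_set_status(unsigned char *page, TransactionId xid, int status)
{
    unsigned char *byteptr = page + xid_to_byte(xid);
    int         bshift = xid_to_bshift(xid);
    unsigned char byteval = *byteptr;

    assert(status >= 0 && status <= CLOG_XACT_BITMASK);

    byteval &= ~(CLOG_XACT_BITMASK << bshift);  /* clear the old entry */
    byteval |= (status << bshift);              /* store the new one */
    *byteptr = byteval;
}

/* Read the 2-bit status of xid from a page-sized buffer */
static int
toy_get_status(const unsigned char *page, TransactionId xid)
{
    return (page[xid_to_byte(xid)] >> xid_to_bshift(xid)) & CLOG_XACT_BITMASK;
}
```

For example, xid 100001 falls on page 3, byte 424, with a bit shift of 2. Marking 100001 as committed and 100002 as aborted in a zeroed page leaves byte 424 equal to 0x24, since the two entries occupy bits 2-3 and 4-5 of the same byte.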

  The figure below sketches the overall flow of a CLOG write.

3 CLOG Read Operations

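  This operation is implemented in TransactionIdGetStatus, which looks up a transaction's status by xid and also returns the group LSN that covers that transaction. It proceeds as follows: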

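1) From the xid, compute where its status lives in the CLOG: the page number, the byte offset within the page, and the bit shift within the byte;
2) Call SimpleLruReadPage_ReadOnly to bring that page into a slot of the SLRU buffer pool:
a: acquire the CLOG buffer pool lock (XactSLRULock) in shared mode; if the page is already loaded, return its slot number directly, otherwise release the shared lock;
b: acquire the lock in exclusive mode and call SimpleLruReadPage to load the CLOG page into a slot
[the slot is chosen inside that function by SlruSelectLRUPage];
3) Read the transaction status at the computed offset and fetch the group LSN;
4) Release XactSLRULock.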

XidStatus
TransactionIdGetStatus(TransactionId xid, XLogRecPtr *lsn)
{
    int         pageno = TransactionIdToPage(xid);
    int         byteno = TransactionIdToByte(xid);
    int         bshift = TransactionIdToBIndex(xid) * CLOG_BITS_PER_XACT;
    int         slotno;
    int         lsnindex;
    char       *byteptr;
    XidStatus   status;

    /* lock is acquired by SimpleLruReadPage_ReadOnly */

    slotno = SimpleLruReadPage_ReadOnly(XactCtl, pageno, xid);
    byteptr = XactCtl->shared->page_buffer[slotno] + byteno;

    status = (*byteptr >> bshift) & CLOG_XACT_BITMASK;

    lsnindex = GetLSNIndex(slotno, xid);
    *lsn = XactCtl->shared->group_lsn[lsnindex];

    LWLockRelease(XactSLRULock);

    return status;
}
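
Step 2 of the read path (look for the page in the pool, otherwise load it into a victim slot) can be illustrated with a toy, single-threaded buffer pool. This is a sketch of the general least-recently-used idea only, not the SLRU implementation: ToyPool, toy_pool_init, toy_find and toy_read_page are hypothetical names, the pool has just four slots, there are no locks or disk I/O, and the comments merely mark where the real code would hold the shared or exclusive lock.

```c
#include <assert.h>

#define TOY_NUM_SLOTS 4
#define TOY_EMPTY (-1)

/* Hypothetical miniature buffer pool: one page number per slot */
typedef struct ToyPool
{
    int         page_number[TOY_NUM_SLOTS];
    unsigned long lru_count[TOY_NUM_SLOTS];     /* last-use tick per slot */
    unsigned long cur_lru;                      /* global use counter */
    int         loads;                          /* simulated page loads */
} ToyPool;

static void
toy_pool_init(ToyPool *p)
{
    for (int i = 0; i < TOY_NUM_SLOTS; i++)
    {
        p->page_number[i] = TOY_EMPTY;
        p->lru_count[i] = 0;
    }
    p->cur_lru = 0;
    p->loads = 0;
}

/* Return the slot holding pageno, or -1 if it is not in the pool */
static int
toy_find(const ToyPool *p, int pageno)
{
    for (int i = 0; i < TOY_NUM_SLOTS; i++)
    {
        if (p->page_number[i] == pageno)
            return i;
    }
    return -1;
}

/* Return a slot containing pageno, loading it into the LRU slot on a miss */
static int
toy_read_page(ToyPool *p, int pageno)
{
    int         slot;

    assert(pageno >= 0);

    /* fast path: the real code does this lookup under the shared lock */
    slot = toy_find(p, pageno);
    if (slot < 0)
    {
        /* slow path: the real code switches to the exclusive lock here */
        slot = 0;
        for (int i = 1; i < TOY_NUM_SLOTS; i++)
        {
            /* empty slots have tick 0, so they are picked first */
            if (p->lru_count[i] < p->lru_count[slot])
                slot = i;
        }
        p->page_number[slot] = pageno;  /* stands in for reading from disk */
        p->loads++;
    }

    p->lru_count[slot] = ++p->cur_lru;  /* mark the slot as recently used */
    return slot;
}
```

Reading the same page twice costs a single load. Once all four slots are full, the next miss replaces whichever page was touched longest ago, which is why a frequently checked CLOG page tends to stay resident.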
