前言

本源码分析假设读者已经了解 CockroachDB 架构和各层设计要点。

理论知识见 CockRoachDB Documentation.

TxnCoordSender

TxnCoordSender 是 Coordinator 向事务涉及的各个 Rangeleaseholder 分发工作的关键数据结构。

此处借用官方文档中的事务提交过程图说明 TxnCoordSender 的定位:

TxnCoordSender 位于 Transaction Coordinator 中,它根据 key 将各个操作派发到相应 Rangeleaseholder

TxnCoordSender 完成的工作

TxnCoordSender 用于:

  1. 处理事务状态。当一个事务开启时,TxnCoordSender 开始异步地发送心跳到事务的 tx record,指示事务应该被保活。如果心跳停止,会将 tx record 转移到 aborted 状态;
  2. 在事务过程中,追踪每个写入的 keykey 的范围;
  3. 当事务提交(commit)或中止(abort)时清除积累的 write intent。由于同一个事务执行的所有操作都需要经过同一个 TxnCoordSender 来说明其 write intent,这优化了清理过程。

当簿记设置完成后,请求交给分发层(Distribution layer)的 DistSender

下文将从源码层面分析以上工作是如何完成的。

TxnCoordSender 实现的接口

TxnCoordSender 实现了 SenderTxnSender 接口。

Sender 抽象了发送方法 Send

// crdb 在 client 侧及 server 侧均有 Sender 接口的实现类型。
// 一些实现类型如 client.Txn, kv.TxnCoordSender, storage.Node,
// storage.Store, storage.Replica.
type Sender interface {// Send 方法发送一个batch用于估算,将返回一个响应或错误。// 调用者保有BatchRequest的所有权,方法返回后,被调用方不应持有它。// 当请求到达传输模块,有另一个限制(尤其是当传输层通讯的节点是本地,这时没有gRPC marshal 和 unmarshal):// 被调用者必须把BatchRequest内的所有东西当成已读的。这时客户端模块得以保留传递指针给其内部的权利。Send(context.Context, roachpb.BatchRequest) (*roachpb.BatchResponse, *roachpb.Error)
}

TxnSender 则定义了发送事务请求时调用的接口。值得粗略看一下每个方法完成了哪些工作。

type TxnSender interface {Sender// AnchorOnSystemConfigRange 确保 txn record 会被创建在 system config Range.// 因为某些提交只有在该 Range 的 EndTxn 计算后才会触发。AnchorOnSystemConfigRange() error// GetLeafTxnInputState 取回初始化 LeafTxn 所必须的输入状态。GetLeafTxnInputState(context.Context, TxnStatusOpt) (roachpb.LeafTxnInputState, error)// GetLeafTxnFinalState 取回 LeafTxn 的最终状态。该状态信息用于更新 RootTxn。GetLeafTxnFinalState(context.Context, TxnStatusOpt) (roachpb.LeafTxnFinalState, error)// UpdateRootWithLeafFinalState 用 LeafTxn 的状态更新 RootTxn 的状态。UpdateRootWithLeafFinalState(context.Context, *roachpb.LeafTxnFinalState)// SetUserPriority 设置事务的优先级。SetUserPriority(roachpb.UserPriority) error// SetDebugName 设置事务的 debug 名称。SetDebugName(name string)// String 返回以 string 表示的事务。String() string// TxnStatus 输出事务状态。TxnStatus() roachpb.TransactionStatus// CreateSavepoint 创建一个 savepoint。// 只有在 RootTxn 调用时才是合法的。CreateSavepoint(context.Context) (SavepointToken, error)// RollbackToSavepoint 回滚到给定的 savepoint。// 只有在 RootTxn 调用时才是合法的。RollbackToSavepoint(context.Context, SavepointToken) error// ReleaseSavepoint 释放给定的 savepoint。// 只有在 RootTxn 调用时才是合法的。ReleaseSavepoint(context.Context, SavepointToken) error// SetFixedTimestamp 让事务在固定时间戳下运行。// Timestamp 和 ReadTimestamp 被设为 ts,不存在时钟不确定性,事务的截止时间被设为 ts// 事务不能被推到一个不同的时间戳。SetFixedTimestamp(ctx context.Context, ts hlc.Timestamp)// ManualRestart 提高事务的 epoch,也可以提高事务的时间戳和优先级。// 被 SQL 层试用,当得知一个事务不能够提交时尽早重启它。ManualRestart(context.Context, roachpb.UserPriority, hlc.Timestamp)// UpdateStateOnRemoteRetryableErr 更新事务 txn,作为执行该事务的请求时遇到错误的响应。UpdateStateOnRemoteRetryableErr(context.Context, *roachpb.Error) *roachpb.Error// DisablePipelining 让 TxnSender 不流水线发送请求。DisablePipelining() error// ReadTimestamp 返回事务当前的读时间戳。// 一个事务在提交前,时间戳能被内部向前推,所以返回的时间戳不一定是提交的时间戳。ReadTimestamp() hlc.Timestamp// CommitTimestamp 返回事务的开始时间戳。该方法总是返回同样的值。// 第一次调用该方法会固定开始时间戳,并防止发送方自动 push 事务的时间戳。// 即,调用该方法会增加返回重试错误给 client 的几率。CommitTimestamp() hlc.Timestamp// CommitTimestampFixed 检查时间戳是否被固定。CommitTimestampFixed() bool// ProvisionalCommitTimestamp 返回事务的临时提交时间戳。该时间戳可能被推后。ProvisionalCommitTimestamp() hlc.Timestamp// RequiredFrontier 返回事务只读操作时读操作的最大时间戳。RequiredFrontier() hlc.Timestamp// IsSerializablePushAndRefreshNotPossible 在事务可串行化、时间戳被推后、更新读操作持续时间后事务不可能成功时返回true。// 用于之后检测事务是否会得到一个可重试 error。IsSerializablePushAndRefreshNotPossible() bool// Active 返回该事务是否已经执行了一些命令。Active() bool// Epoch 返回该事务的 epoch。Epoch() enginepb.TxnEpoch// PrepareRetryableError 生成一个伴随该事务荷载的 TransactionRetryWithProtoRefreshError。PrepareRetryableError(ctx context.Context, msg string) error// TestingCloneTxn 返回事务当前 proto 的副本,用于测试。TestingCloneTxn() *roachpb.Transaction// Step 在当前事务创建一个 sequencing point。// sequencing point 为后续只读操作建立一个基线:在下一个 sequencing point 之前,// 只读操作在创建快照时观察数据,并忽略此后执行的写入操作。Step(context.Context) error// ConfigureStepping 设置 sequencing point 的行为。ConfigureStepping(ctx context.Context, mode SteppingMode) (prevMode SteppingMode)// GetSteppingMode 伴随 ConfigureStepping,用于测试。GetSteppingMode(ctx context.Context) (curMode SteppingMode)// ManualRefresh 试图更新一个事务的读时间戳到它的临时提交时间戳。// 有人可能这样做的原因是,确保一个事务可以提交,而不用经历第二次 push。ManualRefresh(ctx context.Context) error// DeferCommitWait 延迟一个事务的 commit-wait 操作,将 commit-wait 的责任从 TxnSender 交给该方法的调用者。// 返回一个函数,如果事务执行成功,调用者最终必须运行它。// 如果没有调用该函数,会违反一致性。因果依赖的事务可能不能观测到该事务的写操作。DeferCommitWait(ctx context.Context) func(context.Context) error
}

TxnCoordSenderTxnSender 的实现。

  • TxnCoordSender 实际将命令发送到 DistSender
  • TxnCoordSender 代表 client 跟踪一个事务的状态,并在必要时给事务发送心跳;
  • TxnCoordSender 不是单例,TxnCoordSenderFactory 为每一个事务创建一个 TxnCoordSender

TxnCoordSender 为一个事务做了大量统计和追踪。包括:

  • 审计一个事务中涉及的所有键,用于管理事务的状态;
  • 打包一个事务中的所有 K/V 操作到一个 BatchRequest 中,并把它送到 DistSender

TxnCoordSenderSend 方法

Send 如何将请求传递给 DistSender

这里分析 Send 的代码逻辑,及其最终请求 Send 是怎么到达 DistSender 的。

TxnCoordSenderSend 方法实际将 BatchRequest 交由分发层的 DistSender,由 DistSender 将请求转发给对应的 leaseholder。下面看看其代码是如何实现的。

TxnCoordSenderSend 方法开始:

func (tc *TxnCoordSender) Send(...// 调用了 tc.interceptorStack 的第一个 item 的 SendLocked 方法将 BatchRequest 发出。br, pErr := tc.interceptorStack[0].SendLocked(ctx, ba)...
}

那么 interceptorStack 到底是什么?其在 TxnCoordSender 结构体中定义如下:

   // 可插拔的请求拦截器,可以在收发请求时对 BatchRequest 进行转化(transform)。// 此 interceptorStack 实际指向 interceptorAlloc 的 arr 数组,// 而 txnInterceptor 各元素实际是一个指向各拦截器的指针,即紧随其后的 txnHeartbeater, txnSeqNumAllocator...// 每个 interceptor 都嵌入在 interceptorAlloc 结构体中,// 所以整个 interceptorStack 空间都在栈上分配,而不用堆分配。interceptorStack []txnInterceptorinterceptorAlloc struct {arr [6]txnInterceptor// 以下是各拦截器,都嵌入到 interceptorAlloc 中。txnHeartbeatertxnSeqNumAllocatortxnPipelinertxnSpanRefreshertxnCommittertxnMetricRecordertxnLockGatekeeper // 不在 interceptorStack 的 slice 中。}

由上述布局可以看出,各 interceptor(以下称拦截器)随 interceptorAlloc 结构体一并分配空间,即,它们都保存在 interceptorAlloc.arr 中。以 newRootTxnCoordSender 为例,我们看它是怎么初始化的。

func newRootTxnCoordSender(tcf *TxnCoordSenderFactory, txn *roachpb.Transaction, pri roachpb.UserPriority,
) kv.TxnSender {txn.AssertInitialized(context.TODO())if txn.Status != roachpb.PENDING {log.Fatalf(context.TODO(), "unexpected non-pending txn in RootTransactionalSender: %s", txn)}if txn.Sequence != 0 {log.Fatalf(context.TODO(), "cannot initialize root txn with seq != 0: %s", txn)}tcs := &TxnCoordSender{typ:                   kv.RootTxn,TxnCoordSenderFactory: tcf,}tcs.mu.txnState = txnPendingtcs.mu.userPriority = pri// 创建一个拦截器栈,因为空间已经在 TxnCoordSender 结构体中预分配,// 这里只是对其进行初始化并将它们连接起来。然后它在栈底加入一个 txnLockGatekeeper,// 来使该栈与 TxnCoordSender 的 wrapped sender 连接。//// 对 txnHeartbeater 进行初始化。// 这些初始化参数来自 TxnCoordSenderFactory.tcs.interceptorAlloc.txnHeartbeater.init(tcf.AmbientContext,tcs.stopper,tcs.clock,&tcs.metrics,tcs.heartbeatInterval,&tcs.interceptorAlloc.txnLockGatekeeper,&tcs.mu.Mutex,&tcs.mu.txn,)tcs.interceptorAlloc.txnCommitter = txnCommitter{st:      tcf.st,stopper: tcs.stopper,mu:      &tcs.mu.Mutex,}tcs.interceptorAlloc.txnMetricRecorder = txnMetricRecorder{metrics: &tcs.metrics,clock:   tcs.clock,txn:     &tcs.mu.txn,}// 初始化各拦截器,主要是根据 TxnCoordSenderFactory 中的参数对 tcs.tcs.interceptorAlloc 中的各拦截器进行传参。tcs.initCommonInterceptors(tcf, txn, kv.RootTxn)// 在 tcs.interceptorAlloc 的 arr 数组中引用各拦截器。tcs.interceptorAlloc.arr = [...]txnInterceptor{&tcs.interceptorAlloc.txnHeartbeater,// Various interceptors below rely on sequence number allocation,// so the sequence number allocator is near the top of the stack.&tcs.interceptorAlloc.txnSeqNumAllocator,// The pipeliner sits above the span refresher because it will// never generate transaction retry errors that could be avoided// with a refresh.&tcs.interceptorAlloc.txnPipeliner,// The span refresher may resend entire batches to avoid transaction// retries. Because of that, we need to be careful which interceptors// sit below it in the stack.&tcs.interceptorAlloc.txnSpanRefresher,// The committer sits beneath the span refresher so that any// retryable errors that it generates have a chance of being// "refreshed away" without the need for a txn restart. Because the// span refresher can re-issue batches, it needs to be careful about// what parts of the batch it mutates. Any mutation needs to be// idempotent and should avoid writing to memory when not changing// it to avoid looking like a data race.&tcs.interceptorAlloc.txnCommitter,// The metrics recorder sits at the bottom of the stack so that it// can observe all transformations performed by other interceptors.&tcs.interceptorAlloc.txnMetricRecorder,}// 将 tcs.interceptorStack 指向该拦截器数组 arr。tcs.interceptorStack = tcs.interceptorAlloc.arr[:]// 将各拦截器连接起来。tcs.connectInterceptors()tcs.mu.txn.Update(txn)return tcs
}

各拦截器是如何连接的:

func (tc *TxnCoordSender) connectInterceptors() {for i, reqInt := range tc.interceptorStack {if i < len(tc.interceptorStack)-1 {// 下标为 i 的拦截器将其成员变量 wrapped 设置为下标为 i+1 的拦截器reqInt.setWrapped(tc.interceptorStack[i+1])} else {// 末尾的拦截器将 wrapped 设置为最终出口 txnLockGatekeeper.reqInt.setWrapped(&tc.interceptorAlloc.txnLockGatekeeper)}}
}

通过上述分析,各个拦截器一层 wrap 一层,TxnCoordSenderSend 方法首先通过:

tc.interceptorStack[0].SendLocked(ctx, ba)

调用了最外层的 interceptor,点进某个 interceptor 实现的 SendLocked 方法实现,其包含语句:

return h.wrapped.SendLocked(ctx, ba)

继续调用了里层 interceptorSendLocked 方法。
上述 connectInterceptors 代码中将最后一个拦截器连接到了 txnLockGatekeeper,说明 txnLockGatekeeper 是请求的最终出口。

txnLockGatekeeperinitCommonInterceptors 初始化拦截器时初始化:

   tc.interceptorAlloc.txnLockGatekeeper = txnLockGatekeeper{wrapped:                 tc.wrapped,mu:                      &tc.mu.Mutex,allowConcurrentRequests: typ == kv.LeafTxn,}

txnLockGatekeeper 也 wrap 了一个 Sender,他来自 TxnCoordSender 指向的 Factory

// TxnCoordSenderFactory implements client.TxnSenderFactory.
type TxnCoordSenderFactory struct {...wrapped                kv.Sender  // 我在这...
}

为了找到 wrapped 的来源,继续跟踪其初始化,它在 NewTxnCoordSenderFactory 函数被赋值:

// NewTxnCoordSenderFactory creates a new TxnCoordSenderFactory. The
// factory creates new instances of TxnCoordSenders.
func NewTxnCoordSenderFactory(cfg TxnCoordSenderFactoryConfig, wrapped kv.Sender,    // 传入了 wrapped.
) *TxnCoordSenderFactory {...// 将 wrapped 传给了新创建的工厂。
}

src/github.com/cockroachdb/cockroach/pkg/server/server.goNewServer 函数调用了上述 NewTxnCoordSenderFactory 函数:

// NewServer creates a Server from a server.Config.
func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {...tcsFactory := kvcoord.NewTxnCoordSenderFactory(txnCoordSenderFactoryCfg, distSender)   // TxnCoordSenderFactory 的 wrapped 是一个 DistSender....
}

至此,可以确认 TxnCoordSender 最终将 BatchRequest 交由分发层的 DistSender 进行转发。

总结一下, TxnCoordSender 首先将请求传给拦截器栈栈顶的拦截器,请求从栈顶一路走到位于栈底的 txnLockGatekeepertxnLockGatekeeper 将请求传给更下层的 DistSender.

RootTxnLeafTxn

上述分析设计了 newRootTxnCoordSender 函数,除此以外还有 newLeafTxnCoordSender 函数,两种事务 RootTxnLeafTxn 的区别可以在src/github.com/cockroachdb/cockroach/pkg/kv/sender.go 中找到。

const (_ TxnType = iota// RootTxn specifies this sender is the root transaction, and is// responsible for aggregating all transactional state and// finalizing the transaction. The root txn is responsible for// heartbeating the transaction record.RootTxn// LeafTxn specifies this sender is for one of potentially many// distributed client transactions. The state from this transaction// must be propagated back to the root transaction and used to// augment its state before the transaction can be finalized. Leaf// transactions do not heartbeat the transaction record.//// Note: As leaves don't perform heartbeats, the transaction might// be cleaned up while this leaf is executing an operation. We rely// on the cleanup process poisoning the AbortSpans for all intents// so that reads performed through a leaf txn don't miss writes// previously performed by the transaction (at least not until the// expiration of the GC period / abort span entry timeout).LeafTxn
)
  • RootTxn 说明发送方是一个根事务,负责收集所有事务状态、最后确定事务,并负责给 txn record 发送心跳;
  • LeafTxn 说明发送方属于众多分布的事务之一。在该事务结束前,必须将其状态传回根事务,并用于增加它的状态。因为叶事务不发送心跳,使得我们可以在该叶事务执行操作时清理它。我们依赖清理过程为所有 intent 毒害 AbortSpans ,从而一个叶节点上的读操作不会错过一个该事务之前执行的写操作(至少在 GC 期过期前/中止 span entry 超时前不会发生)。

TxnCoordSender 各拦截器作用浅析

txnInterceptor 接口
type txnInterceptor interface {lockedSender// setWrapped 设置拦截器包装的 lockedSender.setWrapped(wrapped lockedSender)// populateLeafInputState 为一个 LeafTxn 填充给出的荷载。populateLeafInputState(*roachpb.LeafTxnInputState)// populateLeafFinalState populates the final payload// for a LeafTxn to bring back into a RootTxn.// populateLeafFinalState 为一个 LeafTxn 填充最终荷载,以取回到 RootTxn 中。populateLeafFinalState(*roachpb.LeafTxnFinalState)// importLeafFinalState 根据 LeafTxn 更新拦截器内部状态。importLeafFinalState(context.Context, *roachpb.LeafTxnFinalState)// epochBumpedLocked 在事务 epoch 增加时重置拦截器。epochBumpedLocked()// createSavepointLocked is used to populate a savepoint with all the state// that needs to be restored on a rollback.// createSavepointLocked 用回滚所需要的所有状态填充一个 savepoint.createSavepointLocked(context.Context, *savepoint)// rollbackToSavepointLocked 恢复到之前 createSavepointLocked 保存的的状态。rollbackToSavepointLocked(context.Context, savepoint)// closeLocked 关闭拦截器。在事务提交/中止时, TxnCoordSender 关闭会调用此方法。closeLocked()
}
txnHeartbeater

txnHeartbeater 负责事务的心跳。

txnSeqNumAllocator

txnSeqNumAllocator 负责为 batch 里面的每个请求分配序列号。

序列号的作用:

  1. 为事务中对同一 key 的一系列读/写操作施加顺序;
  2. 唯一标识写操作,因为每个写操作都有一个新的序列号,(txn_id, txn_epoch, seq)可以在集群中唯一标识一个写操作。这个唯一标识可以用于查找写操作的 intent。在事务写操作流水线中,用 QueryIntent 请求查找 intent
  3. 用于辨别一个 batch 是否含有一个事务的所有写操作,详见 BatchRequest.IsCompleteTransaction
  4. 用于在重放和重发中提供幂等性,因为 MVCC 层可以辨别序列号,并确保在某一序列号的读操作无视大于其序列号的写操作。类似地,如果一个相同序列号的 intent 出现,写操作将成为空操作;如果一个序列号的 intent 还没出现,但一个更大的序列号的 intent 出现了,将返回一个错误。类似地,如果出现一个序列号相同的 intent,但它的值与我们重新计算的不同,将返回一个错误。
txnPipeliner

txnPipeliner异步共识(Asynchronous Consensus) 管道化(Pipelining)事务写操作。txnPipeliner 通过 Raft 追踪所有异步发起的写操作,并通过首先证明异步写操作成功,确保所有干扰请求链接到它们。
txnPipeliner 同样确保当一个事务提交时,已提出(Proposed)但尚未证实成功的写操作会先被检查,再考虑提交事务。这些异步的写操作被称为“in-flight writes”(悬空写),证明这些悬空写已经成功的过程叫“proving” the write。当所有写都被证明已结束后,可以认为它们是“稳定”的。

链接到正在进行的异步写入非常重要,原因有两个:

  1. 发送到 Raft 的请求不一定成功。一个事务在提交前必须检查所有异步写操作成功;
  2. txnPipeliner 下的运输层不为同一个事务中的多个并发请求提供足够强的顺序保证。
txnSpanRefresher

txnSpanRefresher 在一个事务收到 序列化重试错误(Serializable Retry Error) 时收集 read span,如果所有 span 可以更新到当前事务时间戳,它可以用这些 read span 来避免事务重试。

只有一个 write intent 可以存在一个 key 中(单 write intent),在解决该 write intent 之前不允许时间戳更高的读操作对其进行读取,所以一个事务可以自由地将其 intent 移到更高的时间戳。实际上,同步重写这些 intent 并不是必须的,因为解决 intent 时会在必要时将 intent 的时间戳提高。所以,“将 write intent 移到一个更高的时间戳”可以通过在更高的时间戳提交这些 intent 来隐式地完成。然而在写操作提高时间戳后,其时间戳可能盖过了其它事务的某些读操作,这意味着一个事务不能盲操作提高其读操作的时间戳,因为其他事务的写操作可能让它们无效。事务需要获取悲观写锁和乐观读锁。

txnSpanRefresher 负责检测一个事务何时会想要将其临时提交时间戳(provisional commit timestamp)向前推。并确定这样做对于其读操作是否安全。当一个拦截器决定试图向前移动事务的时间戳,它首先“更新”(refresh)它的每个读操作,这个更新步骤重新访问了事务访问过的所有 key span,并检查在最初时间戳和当前时间戳之间是否有写操作出现。如果任一读操作会产出不同的结果,refresh 失败并且事务被强迫完全重启。如果所有读操作都产出同样的值,事务无需重启,只需更新其临时提交时间戳即可。

txnCommitter

txnCommitter 关联提交和回滚事务。它拦截 EndTxn 请求并协调它们的执行。

  • 如果它们是单独的,配置好地址后直接将它们发送;
  • 如果它们是不需要的则忽略它们;
  • 如果它们是更大的请求集合的一部分,协调它们并发执行。
txnMetricRecorder

txnMetricRecorder 负责更新事务的行为和结果的相关指标。

TxnCoordSender 发送心跳

在研究心跳发送源码前,有必要先理解一下 txn record.

Transaction record

为了追踪一个事务的执行,将一个名为 transaction record 的值写到 key-value store 中。一个事务中所有的 write intent 都指向同一个 txn record,这让所有事务都可以检查它遇到的某一个 write intent 的状态。

txn record 总是写到事务中读写的第一个 key 所在的 Rangetxn record 在以下事件之一发生后创建:

  • 写操作提交;
  • TxnCoordSender 发送心跳到 txn record
  • 一个操作强制事务中止。

txn record 有以下状态:

  • PENDING,表明 write intent 所属的事务仍在进行中;
  • COMMITTED,当 txn record 已提交,可以将所有 write intent 视为已提交的 value;
  • STAGING,用于使能并行提交(Parallel Commit)功能,事务是否已经提交取决于事务的 write intent 是否都已提交;
  • ABORTED,事务中止,其 value 应该被丢弃;
  • Record does not exist,当一个事务遇到一个 write intenttxn record 不存在,它用该 write intent 的时间戳来决定如何继续:如果 write intent 的时间戳在本事务的 liveness threshold 内,本事务将 write intent 所属事务视为 PENDING,否则将 write intent 所属事务视为 ABORTED

txn record 在事务提交后仍然存在,直到其所有 write intent 都被转化成 MVCC values.

txn record 结构体的定义如下:

// TransactionRecord message 包含一个 Transaction message 的字段的子集,
// 可以将它当成一个对 Transaction 字段的掩码,取出的字段需要持久化到一个 txn record.
//
// 消息类型与持久化的 Transaction proto 有线兼容,
// 但避免了持久化 txn record 不需要的字段。
// 它声明了一个 txn record 应该持久化哪些字段。
type TransactionRecord struct {// See comments on Transaction proto.enginepb.TxnMeta `protobuf:"bytes,1,opt,name=meta,proto3,embedded=meta" json:"meta"`Status           TransactionStatus             `protobuf:"varint,4,opt,name=status,proto3,enum=cockroach.roachpb.TransactionStatus" json:"status,omitempty"`LastHeartbeat    hlc.Timestamp                 `protobuf:"bytes,5,opt,name=last_heartbeat,json=lastHeartbeat,proto3" json:"last_heartbeat"`LockSpans        []Span                        `protobuf:"bytes,11,rep,name=lock_spans,json=lockSpans,proto3" json:"lock_spans"`InFlightWrites   []SequencedWrite              `protobuf:"bytes,17,rep,name=in_flight_writes,json=inFlightWrites,proto3" json:"in_flight_writes"`IgnoredSeqNums   []enginepb.IgnoredSeqNumRange `protobuf:"bytes,18,rep,name=ignored_seqnums,json=ignoredSeqnums,proto3" json:"ignored_seqnums"`
}

由上述定义可以看到,一个 TransactionRecord 包含事务元数据、事务状态、最近一次心跳的时间戳、一组 Range,还未执行完的写请求、忽略的序列号。

什么是心跳?

心跳即发送方每隔一个固定的时间间隔就发送一个消息给接收方,这个消息包含了该事务的相关信息。

心跳有什么用?

Dealing with transactions’ state. After a transaction is started, TxnCoordSender starts asynchronously sending heartbeat messages to that transaction’s transaction record, which signals that it should be kept alive. If the TxnCoordSender 's heartbeating stops, the transaction record is moved to the ABORTED status.

心跳主要用于簿记一个事务的状态。

当一个事务开始时,TxnCoordSender 异步地开始发送心跳包给事务的 txn record,表明该事务应该被保活。如果该心跳停止,txn record 会被转移到 ABORTED 状态。

具体地,心跳的作用如下:

  1. 对于处于 STAGING 状态的事务,当一个与之冲突的事务遇到它时,需要检查其是否仍在接收心跳,如果正在接收心跳,说明事务仍在活跃,当前事务应该等待;
  2. 如果在 liveness threshold 内事务的 txn record 没有接收到心跳,说明事务过期,将其状态设为 ABORTED

简言之,协调者(coordinator)周期性地向处于 PENDINGtxn record 发送心跳,向竞争(contending)的事务保证该事务仍在取得进展。

心跳发送流程

在上文构建拦截器栈时,提到以下代码:

// 在 tcs.interceptorAlloc 的 arr 数组中引用各拦截器。
tcs.interceptorAlloc.arr = [...]txnInterceptor{&tcs.interceptorAlloc.txnHeartbeater,    // 这是负责心跳工作的拦截器。...
}

TxnCoordSender 调用了 txnHeartbeaterSendLocked 方法,其实现如下:

// SendLocked is part of the txnInterceptor interface.
func (h *txnHeartbeater) SendLocked(ctx context.Context, ba roachpb.BatchRequest,
) (*roachpb.BatchResponse, *roachpb.Error) {etArg, hasET := ba.GetArg(roachpb.EndTxn)firstLockingIndex, pErr := firstLockingIndex(&ba)if pErr != nil {return nil, pErr}if firstLockingIndex != -1 {// Set txn key based on the key of the first transactional write if not// already set. If it is already set, make sure we keep the anchor key// the same.if len(h.mu.txn.Key) == 0 {anchor := ba.Requests[firstLockingIndex].GetInner().Header().Keyh.mu.txn.Key = anchor// Put the anchor also in the ba's copy of the txn, since this batch// was prepared before we had an anchor.ba.Txn.Key = anchor}// 如果心跳循环还没开启,且此 BatchRequest 不打算提交/中止事务,则开启心跳循环。if !h.mu.loopStarted {if !hasET {if err := h.startHeartbeatLoopLocked(ctx); err != nil {return nil, roachpb.NewError(err)}}}}if hasET {et := etArg.(*roachpb.EndTxnRequest)// Set the EndTxn request's TxnHeartbeating flag. Set to true if// a hearbeat loop was started which indicates that transaction has// a transaction record.et.TxnHeartbeating = h.mu.loopStarted// Preemptively stop the heartbeat loop in case of transaction abort.// In case of transaction commit we don't want to do this because commit// could fail with retryable error and transaction would be restarted// with the next epoch.if !et.Commit {h.cancelHeartbeatLoopLocked()}}// Forward the batch through the wrapped lockedSender.return h.wrapped.SendLocked(ctx, ba)
}

上述代码调用了 startHeartbeatLoopLocked 方法开启心跳循环,其实现如下:

// startHeartbeatLoopLocked starts a heartbeat loop in a different goroutine.
func (h *txnHeartbeater) startHeartbeatLoopLocked(ctx context.Context) error {if h.mu.loopStarted {log.Fatal(ctx, "attempting to start a second heartbeat loop")}log.VEventf(ctx, 2, "coordinator spawns heartbeat loop")h.mu.loopStarted = true// NB: we can't do this in init() because the txn isn't populated yet then// (it's zero).h.AmbientContext.AddLogTag("txn-hb", h.mu.txn.Short())// Create a new context so that the heartbeat loop doesn't inherit the// caller's cancelation.// We want the loop to run in a span linked to the current one, though, so we// put our span in the new context and expect RunAsyncTask to fork it// immediately.hbCtx := h.AnnotateCtx(context.Background())hbCtx = tracing.ContextWithSpan(hbCtx, tracing.SpanFromContext(ctx))hbCtx, h.mu.loopCancel = context.WithCancel(hbCtx)return h.stopper.RunAsyncTask(hbCtx, "kv.TxnCoordSender: heartbeat loop", h.heartbeatLoop) // 实际开启了一个新的 goroutine 运行 heartbeatLoop 方法发送心跳。
}

heartbeatLoop 代码如下:

// heartbeatLoop 周期性地发送心跳到 txn record,在事务中止或者提交后停止。
func (h *txnHeartbeater) heartbeatLoop(ctx context.Context) {// 方法返回时取消该心跳。defer func() {h.mu.Lock()h.cancelHeartbeatLoopLocked()h.mu.Unlock()}()var tickChan <-chan time.Time{ticker := time.NewTicker(h.loopInterval)tickChan = ticker.Cdefer ticker.Stop()}// Loop with ticker for periodic heartbeats.for {select {case <-tickChan:if !h.heartbeat(ctx) {// 心跳发现一个达到结束状态的事务,停止发送心跳。return}case <-ctx.Done():// Transaction finished normally.returncase <-h.stopper.ShouldQuiesce():return}}
}

heartbeatLoop 实际调用 heartbeat 发送心跳给 txn record。其代码如下:

// heartbeat 发送心跳到 txn record.
// 当应该继续发送心跳时返回 true,否则事务不再处于 PENDING 状态,不需要继续发送心跳,返回 false。
func (h *txnHeartbeater) heartbeat(ctx context.Context) bool {// 调用该方法并没有一直持有锁,在 wrapper.Send() 里,// 栈底部的 interceptor 会解锁,直到其收到回复再次加锁。h.mu.Lock()defer h.mu.Unlock()// The heartbeat loop might have raced with the cancelation of the heartbeat.if ctx.Err() != nil {return false}if h.mu.txn.Status != roachpb.PENDING {if h.mu.txn.Status == roachpb.COMMITTED {// 事务提交了心跳却没有停止,直接 panic.log.Fatalf(ctx, "txn committed but heartbeat loop hasn't been signaled to stop: %s", h.mu.txn)}// 如果事务已经中止,没必要继续发心跳,client 需要发送一个 rollback.return false}// 拷贝事务并将其放到心跳中。txn := h.mu.txn.Clone()if txn.Key == nil {log.Fatalf(ctx, "attempting to heartbeat txn without anchor key: %v", txn)}// 构造一个心跳 BatchRequest.ba := roachpb.BatchRequest{}ba.Txn = txn // 将事务放到 BatchRequest 请求中。ba.Add(&roachpb.HeartbeatTxnRequest{   // 往 BatchRequest 添加一个心跳请求。RequestHeader: roachpb.RequestHeader{// 心跳请求头包含了事务的 key。// 该 key 是锚定事务的键。通常是事务读取/写入的第一个键,// 这决定了集群中哪个 Range 会持有事务的 txn record.Key: txn.Key, },Now: h.clock.Now(),})// 直接用 gatekeeper interceptor 发送心跳。// h.gatekeeper 的注释提到了为何要这样做:// 直接用 gatekeeper 发送会绕过所有其他拦截器,// 心跳不需要这些功能:我们不需要发送心跳的时候获取序列号或检查 intent。// gatekeeper 为 lockedSender 类型,其实际是一个分发层的 DistSender.log.VEvent(ctx, 2, "heartbeat")br, pErr := h.gatekeeper.SendLocked(ctx, ba)// 如果事务的状态不再是 PENDING,无视心跳请求的响应并返回 false。if h.mu.txn.Status != roachpb.PENDING {return false}var respTxn *roachpb.Transactionif pErr != nil {log.VEventf(ctx, 2, "heartbeat failed: %s", pErr)// 需要处理没有事务 proto 的 TransactionAbortedErrorif _, ok := pErr.GetDetail().(*roachpb.TransactionAbortedError); ok {// 有可能事务已经提交,但其 record 被 GC 回收了。这时 abort 不会有任何影响,// 因为所有 intent 都解决了。 需要弄清楚的是,我们没有将该错误告诉 client,// 这会导致 client 最终得到一个事务提交的结果,或者一个含糊的结果。h.abortTxnAsyncLocked(ctx)h.mu.finalObservedStatus = roachpb.ABORTEDreturn false}respTxn = pErr.GetTxn()} else {respTxn = br.Txn    // 从响应中取出事务}// 如果响应中的事务已经到达最终状态,拆解心跳循环并返回。if respTxn != nil && respTxn.Status.IsFinalized() {switch respTxn.Status {case roachpb.COMMITTED:// 事务已提交,什么也不用做。case roachpb.ABORTED:// 中止事务,回滚其 txn record 以清理 write intent.h.abortTxnAsyncLocked(ctx)}h.mu.finalObservedStatus = respTxn.Statusreturn false}return true
}

心跳的接收

接收心跳的函数位于 src/github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/cmd_heartbeat_txn.go,其代码如下:

// HeartbeatTxn 在接收到协调者的心跳后,更新事务的状态和心跳时间戳,并返回更新后的事务。
func HeartbeatTxn(ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {args := cArgs.Args.(*roachpb.HeartbeatTxnRequest)h := cArgs.Headerreply := resp.(*roachpb.HeartbeatTxnResponse)// 执行一些健全性检查,检查头部和请求中的事务兼容。if err := VerifyTransaction(h, args, roachpb.PENDING, roachpb.STAGING); err != nil {return result.Result{}, err}if args.Now.IsEmpty() {return result.Result{}, fmt.Errorf("now not specified for heartbeat")}key := keys.TransactionKey(h.Txn.Key, h.Txn.ID)var txn roachpb.Transaction// 根据键拿到值放到 txn 中。if ok, err := storage.MVCCGetProto(ctx, readWriter, key, hlc.Timestamp{}, &txn, storage.MVCCGetOptions{},); err != nil {return result.Result{}, err} else if !ok {// 没找到 txn record,创建一个,将txn指向 h.Txn.txn = *h.Txn// 验证是否能够安全地创建 txn record.if err := CanCreateTxnRecord(ctx, cArgs.EvalCtx, &txn); err != nil {return result.Result{}, err}}if !txn.Status.IsFinalized() {// 更新 LastHeartbeat 的时间戳。txn.LastHeartbeat.Forward(args.Now)// 根据 txn 获取 txn record.txnRecord := txn.AsRecord()// 以 key 为键保存该 txn record.if err := storage.MVCCPutProto(ctx, readWriter, cArgs.Stats, key, hlc.Timestamp{}, nil, &txnRecord); err != nil {return result.Result{}, err}}reply.Txn = &txn  // 以此事务进行回复。return result.Result{}, nil
}

总结如下:

经过 RootTxnCoordSender 发送 BatchRequest 请求会经过 txnHeartbeater 拦截器,从而开始发送心跳。

  • 接收端收到心跳后,会对事务进行验证,创建 txn record、更新 txn record 的时间戳,并以此事务作为回复。
  • 发送端收到回复后,检查是否有错误以及事务的状态,并决定对事务进行提交、中止还是继续发送心跳。

心跳如何通知其它竞争事务

心跳到达接收方后,会作为一个 TransactionRecord 持久化到 K/V store 中。每次接收到心跳,都会更新其 LastHeartbeat 时间戳。上文提到,竞争事务会检查该时间戳,以下从代码层面分析该检查动作。

TransactionRecord 可以通过 AsTransaction 方法转化为一个 Transaction

// AsTransaction returns a Transaction object containing populated fields for
// state in the transaction record and empty fields for state omitted from the
// transaction record.
func (tr *TransactionRecord) AsTransaction() Transaction {var t Transactiont.TxnMeta = tr.TxnMetat.Status = tr.Statust.LastHeartbeat = tr.LastHeartbeatt.LockSpans = tr.LockSpanst.InFlightWrites = tr.InFlightWritest.IgnoredSeqNums = tr.IgnoredSeqNumsreturn t
}

Transaction 又有 LastActive 方法,用于获取 client 最近一次活跃的时间:

// LastActive returns the last timestamp at which client activity definitely
// occurred, i.e. the maximum of ReadTimestamp and LastHeartbeat.
func (t Transaction) LastActive() hlc.Timestamp {ts := t.LastHeartbeatif !t.ReadTimestamp.Synthetic {   // 如果读时间戳不是来自 HLC 时钟。ts.Forward(t.ReadTimestamp) // 取 ReadTimestamp 和 LastHeartbeat 较大者。}return ts
}

TxnExpiration 函数用于获取一个事务过期的时间戳:

// TxnExpiration computes the timestamp after which the transaction will be
// considered expired.
func TxnExpiration(txn *roachpb.Transaction) hlc.Timestamp {return txn.LastActive().Add(TxnLivenessThreshold.Nanoseconds(), 0) // 最近活跃时间 + liveness threhold
}

IsExpired 调用了该函数检查一个事务是否过期:

// IsExpired is true if the given transaction is expired.
func IsExpired(now hlc.Timestamp, txn *roachpb.Transaction) bool {return TxnExpiration(txn).Less(now)
}

PushTxn 在解决并发事务冲突时用到了 IsExpired 函数检查事务是否过期:

// PushTxn resolves conflicts between concurrent txns (or between
// a non-transactional reader or writer and a txn) in several ways,
// depending on the statuses and priorities of the conflicting
// transactions.
func PushTxn(ctx context.Context, readWriter storage.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
) (result.Result, error) {...switch {case txnwait.IsExpired(cArgs.EvalCtx.Clock().Now(), &reply.PusheeTxn):reason = "pushee is expired"// When cleaning up, actually clean up (as opposed to simply pushing// the garbage in the path of future writers).pushType = roachpb.PUSH_ABORTpusherWins = truecase pushType == roachpb.PUSH_TOUCH:......// Determine what to do with the pushee, based on the push type.switch pushType {case roachpb.PUSH_ABORT:// If aborting the transaction, set the new status.reply.PusheeTxn.Status = roachpb.ABORTED // 将事务的状态设为 ABORTED.// If the transaction record was already present, forward the timestamp// to accommodate AbortSpan GC. See method comment for details.if ok {reply.PusheeTxn.WriteTimestamp.Forward(reply.PusheeTxn.LastActive())}case roachpb.PUSH_TIMESTAMP:...
}

至此,竞争事务检查与其冲突事务的最近活跃时间,如果已经超出了 liveness threhold,则直接将其状态设为 ABORTED.

CockroachDB 分布式事务源码分析之 TxnCoordSender相关推荐

  1. springboot 事务_原创002 | 搭上SpringBoot事务源码分析专车

    前言 如果这是你第二次看到师长,说明你在觊觎我的美色! 点赞+关注再看,养成习惯 没别的意思,就是需要你的窥屏^_^ 专车介绍 该趟专车是开往Spring Boot事务源码分析的专车 专车问题 为什么 ...

  2. springboot事务回滚源码_002 | 搭上SpringBoot事务源码分析专车

    发车啦,发车啦,上车要求: 点击左上方的"java进阶架构师"进入页面 选择右上角的"置顶公众号"上车 专车介绍 该趟专车是开往Spring Boot事务源码分 ...

  3. Spring事务源码分析责任链事务链事务不生效

    文章目录 前言 带着问题分析源码 事务源码分析 寻找Spring事务源码类 TransactionInterceptor调用栈 分析Spring AOP责任链 分析TransactionInterce ...

  4. Kafka#4:存储设计 分布式设计 源码分析

    https://sites.google.com/a/mammatustech.com/mammatusmain/kafka-architecture/4-kafka-detailed-archite ...

  5. Spring源码分析-Spring事务源码分析

    导语      在配置Spring事务管理的时候会用到一个类TransactionInterceptor,从下面的类关系图中可以看到TransactionInterceptor继承了MethodInt ...

  6. [转]php与memcached服务器交互的分布式实现源码分析[memcache版]

    原文链接:http://www.cnblogs.com/luckcs/articles/2619846.html 前段时间,因为一个项目的关系,研究了php通过调用memcache和memcached ...

  7. JuiceFS分布式文件系统源码分析(Java层)

    文章目录 01 引言 02 JuiceFS Hadoop Java API 2.1 如何使用? 2.2 入口 2.2.1 getFileSystem方法 2.2.2 小结 2.3 JuiceFS源码 ...

  8. 基于后端开发Redisson实现分布式锁源码分析解读

    一.分布式锁的概念和使用场景 分布式锁是控制分布式系统之间同步访问共享资源的一种方式. 在分布式系统中,常常需要协调他们的动作.如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问 ...

  9. spring事务源码分析结合mybatis源码(二)

    让我们继续上篇,分析下如果有第二个调用进入的过程. 代码部分主要是下面这个: if (isExistingTransaction(transaction)) {return handleExistin ...

最新文章

  1. Unity3D Adam Demo的学习与研究
  2. 第二种PHP协议,PHP多种形式,第二种使用来自First的数据
  3. MySQL数据库开启root用户远程登录
  4. android pd px sp 转换
  5. hosts文件分发其他机器
  6. jdbc:initialize-database标签的研究
  7. Android Studio — Could not determine java version from ‘11.0.8‘. The project uses Gradle version wh
  8. c语言课程设计报告书通讯录,C语言课程设计学生通讯录管理系统设计
  9. 解决Cannot find module ‘./index.module.scss‘ or its corresponding type declarations.ts(2307)
  10. 计算机c盘要满了电脑会卡吗,电脑卡就一定是C盘装太满吗?
  11. Airbnb房源信息爬取(二)——获取房源信息
  12. H5多点触控原理以及对多点触控的追踪
  13. 国内各大企业邮箱,选择看重哪几个方面?
  14. linux中 halt shutdown
  15. 为什么叫区块链存储?兼谈IPFSFilecoin, Chia
  16. C语言实现植物大战僵尸自动收集阳光(三) 解决收集不全与收集奖杯卡死的问题
  17. Bootstrap抽样和Monte Carlo思想
  18. java-不死神兔百钱百鸡
  19. Ti IMGLIB库简介
  20. PID算法(三)串级PID

热门文章

  1. 计算机系统中位和字节表示的含义,计算机存储单位 位、字节、字、KB、MB 分别是什么含义...
  2. 在Linux中ipcs命令,Linux下ipcs指令的用法详解。
  3. 分布式环境下的服务器时钟同步问题解决办法
  4. 并行编程的几种常见框架总结
  5. win10缩放导致html布局混乱,技术编辑为你解说win10系统chrome在dpi缩放下导致界面放大怎么处理...
  6. 音频笔记-AudioTrack
  7. 【转】window7下Word 2007报“Microsoft office word已停止工作“
  8. 人脸识别--翔云API
  9. 用selenium对svg标签的定位方法
  10. 深度好文:最详细的卷积神经网络入门教程