假设 mobile_applications 表的字段只有两个 app_name, other_info,均为 varchar(256),先上一段简单的业务逻辑代码

package main

import (

"database/sql"

_ "github.com/go-sql-driver/mysql"

)

func main() {

db, _ := sql.Open("mysql",

"root:123456@tcp(127.0.0.1:3306)/xg?charset=utf8&parseTime=true&loc=Local")

// tx1

tx, _ := db.Begin()

_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",

"第一个应用", "第一个应用的信息")

_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",

"第二个应用", "第二个应用的信息")

_ = tx.Commit()

// tx2

tx, _ = db.Begin()

_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",

"第三个应用", "第三个应用的信息")

_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",

"第四个应用", "第四个应用的信息")

//_ = tx.Commit()

// tx3

tx, _ = db.Begin()

_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",

"第五个应用", "第五个应用的信息")

_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",

"第六个应用", "第六个应用的信息")

_ = tx.Rollback()

}

执行结果:

mysql> select * from mobile_applications;

+-----------------+--------------------------+------------------+---------------+

| app_name | other_info | xxx_unrecognized | xxx_sizecache |

+-----------------+--------------------------+------------------+---------------+

| 第一个应用 | 第一个应用的信息 | NULL | NULL |

| 第二个应用 | 第二个应用的信息 | NULL | NULL |

+-----------------+--------------------------+------------------+---------------+

2 rows in set (0.00 sec)

显然,只有 tx1 最终落库,tx2 和 tx3 都没有落库。这个符合我们对事务的理解,即只有 commit 的操作才会最终落库。

这里我们首先要理解,对于计算机而言,什么是事务?数据库系统概念里对“事务”的定义相信很多人耳熟能详,但是对于我们运行的程序而言,事务实际上分为两层,第一层是内存中的上下文,第二层是DBMS控制的我们常说的“事务”。底层事务实际上是通过上下文来定义和操作的,中间隔了层 driver,屏蔽了具体的细节。

以 go 中的 事务为例,是通过一个 struct 来定义的。

type Tx struct {

db *DB

// closemu prevents the transaction from closing while there

// is an active query. It is held for read during queries

// and exclusively during close.

closemu sync.RWMutex

// dc is owned exclusively until Commit or Rollback, at which point

// it's returned with putConn.

dc *driverConn

txi driver.Tx

// releaseConn is called once the Tx is closed to release

// any held driverConn back to the pool.

releaseConn func(error)

// done transitions from 0 to 1 exactly once, on Commit

// or Rollback. once done, all operations fail with

// ErrTxDone.

// Use atomic operations on value when checking value.

done int32

// All Stmts prepared for this transaction. These will be closed after the

// transaction has been committed or rolled back.

stmts struct {

sync.Mutex

v []*Stmt

}

// cancel is called after done transitions from 0 to 1.

cancel func()

// ctx lives for the life of the transaction.

ctx context.Context

}

其中核心是:

driverConn

go 里面的数据库连接,封装了 driver 里的数据库连接。

driver.Tx

定义了 commit 和 rollback 两个方法。

一个事务,实际上就是所有 CRUD 操作都在同一个数据库连接里,调用 Begin 会通过该连接,经有 driver 执行特定 DBMS 的事务开启指令,比如 mysql 的 driver 就是

func (mc *mysqlConn) begin(readOnly bool) (driver.Tx, error) {

if mc.closed.IsSet() {

errLog.Print(ErrInvalidConn)

return nil, driver.ErrBadConn

}

var q string

if readOnly {

q = "START TRANSACTION READ ONLY"

} else {

q = "START TRANSACTION"

}

err := mc.exec(q)

if err == nil {

return &mysqlTx{mc}, err

}

return nil, mc.markBadConn(err)

}

后续操作就都在一个事务里了。上层可以通过 Begin 方法返回的 Commit/Rollback 方法来提交或回滚这个事务。那么,go 是如何实现对数据库事务的支持的呢?我们从入口代码一步一步来看。

db.Beigin

driver.go 里定义了 ErrBadConn: driver 抛出的错误,表示底层连接不可用或已中断,上层应该重新用一个连接

sql.go 里定义了两个常量,

cachedOrNewConn:

如果连接池里有 idle 状态的连接,直接返回;如果连接池里的连接数已经达到 MaxOpenCons 定义的数量,则阻塞等待,直到有一个连接 idel;否则,创建新的连接,加入到连接池,然后返回。

alwaysNewConn:

强制使用新的连接而不是从接池里里复用

// Begin() 方法是带参数版本的一个默认版本

func (db *DB) Begin() (*Tx, error) {

return db.BeginTx(context.Background(), nil)

}

// 可以设置 Contxt 和 事务配置项

func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {

var tx *Tx

var err error

// 如果遇到 ErrBadConn 默认会重试 2 次

for i := 0; i < maxBadConnRetries; i++ {

tx, err = db.begin(ctx, opts, cachedOrNewConn)

if err != driver.ErrBadConn {

break

}

}

if err == driver.ErrBadConn {

return db.begin(ctx, opts, alwaysNewConn)

}

return tx, err

}

func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {

dc, err := db.conn(ctx, strategy)

if err != nil {

return nil, err

}

return db.beginDC(ctx, dc, dc.releaseConn, opts)

}

外部调用最终进入私有方法 begin 里,begin 主要完成两个操作,一是获取一个数据库连接,二是创建事务的上下文,即开启事务。下面我们一一来看。

db.conn

这个方法很长,核心业务逻辑主要分为以下几个部分:

检查

首先检查 db 是否关闭了,再检查是否 context 过期了,若任一为是都会直接返回错误

db.mu.Lock() // 临界区的结束点不同情况不相同

if db.closed {

db.mu.Unlock()

return nil, errDBClosed

}

// Check if the context is expired.

select {

default:

case

db.mu.Unlock()

return nil, ctx.Err()

}

lifetime := db.maxLifetime // 后面逻辑使用

cachedOrNewConn 且 存在 idle 的连接

// db.freeConn 是一个 slice,存储了 idle 的连接

numFree := len(db.freeConn)

if strategy == cachedOrNewConn && numFree > 0 {

conn := db.freeConn[0]

// 移走第一个

copy(db.freeConn, db.freeConn[1:])

db.freeConn = db.freeConn[:numFree-1]

conn.inUse = true

db.mu.Unlock() // 释放锁了

// 判断连接是否过期了

if conn.expired(lifetime) {

conn.Close()

return nil, driver.ErrBadConn

}

// lastErr 字段的意义没看懂,好像在返回一个连接之前都要检查这个错误有没有设置

// Lock around reading lastErr to ensure the session resetter finished.

conn.Lock()

err := conn.lastErr

conn.Unlock()

if err == driver.ErrBadConn {

conn.Close()

return nil, driver.ErrBadConn

}

return conn, nil

}

没有空闲连接了,或者强制使用新连接。

连接池达到最大了

连接池达到最大了,就必须阻塞,这里用了一个 channel,类似于 Java 的条件队列

// Make the connRequest channel. It's buffered so that the

// connectionOpener doesn't block while waiting for the req to be read.

req := make(chan connRequest, 1)

reqKey := db.nextRequestKeyLocked()

db.connRequests[reqKey] = req

db.waitCount++

db.mu.Unlock()

waitStart := time.Now()

// Timeout the connection request with the context.

select {

// Context 结束了

case

// Remove the connection request and ensure no value has been sent

// on it after removing.

db.mu.Lock()

delete(db.connRequests, reqKey)

db.mu.Unlock()

atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

select {

default:

// 已经发出去了,且创建成功了,那么这个连接就需要加入连接池

// 但是不清楚为什么这里没有验证了

case ret, ok :=

if ok && ret.conn != nil {

db.putConn(ret.conn, ret.err, false)

}

}

return nil, ctx.Err()

// 收到返回

case ret, ok :=

atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))

// channel 是被关闭了的

if !ok {

return nil, errDBClosed

}

// 超时了

if ret.err == nil && ret.conn.expired(lifetime) {

ret.conn.Close()

return nil, driver.ErrBadConn

}

if ret.conn == nil {

return nil, ret.err

}

// Lock around reading lastErr to ensure the session resetter finished.

ret.conn.Lock()

err := ret.conn.lastErr

ret.conn.Unlock()

if err == driver.ErrBadConn {

ret.conn.Close()

return nil, driver.ErrBadConn

}

return ret.conn, ret.err

}

连接池还没有达到最大

直接新建连接

db.numOpen++ // optimistically

db.mu.Unlock()

ci, err := db.connector.Connect(ctx)

if err != nil {

db.mu.Lock()

db.numOpen-- // correct for earlier optimism

db.maybeOpenNewConnections()

db.mu.Unlock()

return nil, err

}

db.mu.Lock()

dc := &driverConn{

db: db,

createdAt: nowFunc(),

ci: ci,

inUse: true,

}

db.addDepLocked(dc, dc)

db.mu.Unlock()

return dc, nil

putConnDBLocked

前面提到,当连接池已满且没有 idel 连接的时候,是通过注册了一个 channel 来异步接收 free 连接的通知的。维护所有 channel 的是

connReqeusts map[uint64] chan connRequest

type connRequest struct {

conn *driverConn

err error

}

使用完一个连接后,需要“归还”连接给 DB。DB 的逻辑是:

如果存在 connRequest,会将 dc 直接交给它

否则,放入到连接池里,标记为 idel,并启动清理线程,关闭那些超时的连接

func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {

if db.closed {

return false

}

if db.maxOpen > 0 && db.numOpen > db.maxOpen {

return false

}

if c := len(db.connRequests); c > 0 {

var req chan connRequest

var reqKey uint64

for reqKey, req = range db.connRequests {

break

}

delete(db.connRequests, reqKey) // Remove from pending requests.

if err == nil {

dc.inUse = true

}

req

conn: dc,

err: err,

}

return true

} else if err == nil && !db.closed {

if db.maxIdleConnsLocked() > len(db.freeConn) {

db.freeConn = append(db.freeConn, dc)

db.startCleanerLocked()

return true

}

db.maxIdleClosed++

}

return false

}

db.beginDC

前面的操作完成了连接的获取(创建/释放),下面就要启动一个事务。

// beginDC starts a transaction. The provided dc must be valid and ready to use.

func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {

var txi driver.Tx

withLock(dc, func() { // 就是个包装方法,加锁,执行函数,放锁

txi, err = ctxDriverBegin(ctx, opts, dc.ci) // 调用 driver 开启事务

})

if err != nil {

release(err)

return nil, err

}

// 后面的主要是暴露上下文,注册资源清理回调

// Schedule the transaction to rollback when the context is cancelled.

// The cancel function in Tx will be called after done is set to true.

ctx, cancel := context.WithCancel(ctx)

tx = &Tx{

db: db,

dc: dc,

releaseConn: release,

txi: txi,

cancel: cancel,

ctx: ctx,

}

go tx.awaitDone()

return tx, nil

}

// awaitDone blocks until the context in Tx is canceled and rolls back

// the transaction if it's not already done.

func (tx *Tx) awaitDone() {

// 如果在事务提交/回滚前,就结束阻塞,说明 Context结束了,那就要执行资源清理

// 关必并从连接池里删除这个连接,来保证事务已经关闭、资源得到释放

// 对于已经提交/回滚的事务,这个方法不会由任何影响

tx.rollback(true)

}

go mysql driver事务,Go 数据库事务的源码分析相关推荐

  1. 数据库中间件 MyCAT源码分析 —— XA分布式事务

    title: MyCAT 源码分析 -- XA分布式事务 date: 2017-07-15 tags: categories: MyCAT permalink: MyCAT/xa-distribute ...

  2. 数据库中间件 Sharding-JDBC 源码分析 —— SQL 执行

    摘要: 原创出处 http://www.iocoder.cn/Sharding-JDBC/sql-execute/ 「芋道源码」欢迎转载,保留摘要,谢谢! 本文主要基于 Sharding-JDBC 1 ...

  3. 数据库中间件 MyCAT源码分析:【单库单表】插入【推荐阅读】

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  4. 数据库中间件MyCAT源码分析:调试环境搭建

    ???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...

  5. Spring事务源码分析责任链事务链事务不生效

    文章目录 前言 带着问题分析源码 事务源码分析 寻找Spring事务源码类 TransactionInterceptor调用栈 分析Spring AOP责任链 分析TransactionInterce ...

  6. mysql dba系统学习-数据库事务详解

    mysql dba系统学习-数据库事务详解 上个星期去面试数据库管理员的工作,笔试通过之后就是直接的面试,他问了我一个问题,叫我介绍哈数据库的事务的看法和理解,但是不知所错的没有章法的乱答一气,唉唉, ...

  7. Spring的AOP和IOC是什么?使用场景有哪些?Spring事务与数据库事务,传播行为,数据库隔离级别

    Spring的AOP和IOC是什么?使用场景有哪些?Spring事务与数据库事务,传播行为,数据库隔离级别 AOP:面向切面编程. 即在一个功能模块中新增其他功能,比方说你要下楼取个快递,你同事对你说 ...

  8. channelfuture怎么拿到数据_SpringBoot2.x系列教程66--Spring Boot整合分布式事务之数据库事务回顾

    SpringBoot2.x系列教程66--Spring Boot整合分布式事务之数据库事务回顾 作者:一一哥 本节主要内容 一. 事务出现的原因 转账是生活中常见的操作,比如从A账户转账100元到B账 ...

  9. getprivateprofilestring读不到数据_SpringBoot2.x系列教程66--Spring Boot整合分布式事务之数据库事务回顾

    SpringBoot2.x系列教程66--Spring Boot整合分布式事务之数据库事务回顾 作者:一一哥 本节主要内容 一. 事务出现的原因 转账是生活中常见的操作,比如从A账户转账100元到B账 ...

  10. Spring中的事务及数据库事务的关系

    Spring中的事务及数据库事务的关系 一.MySQL中的事务 如果对MySQL中的事务不了解的话,请先看 基于MySQL 8.0 对事务的深度理解 二.Spring中的事务 Spring管理事务的方 ...

最新文章

  1. AtCoder AGC002E Candy Piles (博弈论)
  2. video thumbnails
  3. ERROR 1044 (42000): Access denied for user ''@'localhost' to database
  4. 动态规划C语言实现之最长公共子序列(LCS)
  5. 小米平板android版本,小米平板2发布:分Android和Wind 10两个版本
  6. 计算机软考程序员客观题,历年计算机软考程序员部分选择题真题重点
  7. Jmeter(十七)Logic Controllers 之 Interleave Controller
  8. HashKey TokenGazer | 去中心化身份(DID)研究报告
  9. win7无线网络突然不能用了(或打开无线按钮灰色)
  10. Java将多张图片合并保存到同一页PDF中
  11. java 合并两个有序链表
  12. Rplidar A1利用Qt图形库进行周围环境的轮廓抓取
  13. 三星android5 root包,三星N9200刷国行系统(含五件套root权限recovery SuperSU)
  14. 移动端——less(学会less,这一篇就够了)
  15. Flex富文本编辑器
  16. 同网段能互通,跨网段不通
  17. OAuth 2.0 的四种认证模式
  18. 利用docker+雨巷云打造私有网盘之安装mysql5.6(1)
  19. [COGS 2583]南极科考旅行
  20. 微信公众号没办法直接获取用户的手机号

热门文章

  1. activiti 源码笔记之startProcess
  2. Oracle中“行转列”的实现方式
  3. vue模块单独封装html,在vue中怎么定义自定义组件?
  4. springboot 集成 jwt+oauth+springsecurity 实现单点登录,feign远程调用,eruka注册中心,seata分布式事务配置
  5. 字节码指令之加载与存储指令
  6. redis分布式锁+事务+AOP一起使用注意点
  7. Spring的注入方式中,官方推荐哪种方式
  8. CyclicBarrier源码解读
  9. 将不确定变成确定~Uri文本文件不用浏览器自动打开,而是下载到本地
  10. 用 JavaScript 实现内存位翻转漏洞