go mysql driver事务,Go 数据库事务的源码分析
假设 mobile_applications 表的字段只有两个 app_name, other_info,均为 varchar(256),先上一段简单的业务逻辑代码
package main
import (
"database/sql"
_ "github.com/go-sql-driver/mysql"
)
func main() {
db, _ := sql.Open("mysql",
"root:123456@tcp(127.0.0.1:3306)/xg?charset=utf8&parseTime=true&loc=Local")
// tx1
tx, _ := db.Begin()
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第一个应用", "第一个应用的信息")
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第二个应用", "第二个应用的信息")
_ = tx.Commit()
// tx2
tx, _ = db.Begin()
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第三个应用", "第三个应用的信息")
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第四个应用", "第四个应用的信息")
//_ = tx.Commit()
// tx3
tx, _ = db.Begin()
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第五个应用", "第五个应用的信息")
_, _ = tx.Exec("INSERT INTO mobile_applications (app_name, other_info) VALUES (?, ?)",
"第六个应用", "第六个应用的信息")
_ = tx.Rollback()
}
执行结果:
mysql> select * from mobile_applications;
+-----------------+--------------------------+------------------+---------------+
| app_name | other_info | xxx_unrecognized | xxx_sizecache |
+-----------------+--------------------------+------------------+---------------+
| 第一个应用 | 第一个应用的信息 | NULL | NULL |
| 第二个应用 | 第二个应用的信息 | NULL | NULL |
+-----------------+--------------------------+------------------+---------------+
2 rows in set (0.00 sec)
显然,只有 tx1 最终落库,tx2 和 tx3 都没有落库。这个符合我们对事务的理解,即只有 commit 的操作才会最终落库。
这里我们首先要理解,对于计算机而言,什么是事务?数据库系统概念里对“事务”的定义相信很多人耳熟能详,但是对于我们运行的程序而言,事务实际上分为两层,第一层是内存中的上下文,第二层是DBMS控制的我们常说的“事务”。底层事务实际上是通过上下文来定义和操作的,中间隔了层 driver,屏蔽了具体的细节。
以 go 中的 事务为例,是通过一个 struct 来定义的。
type Tx struct {
db *DB
// closemu prevents the transaction from closing while there
// is an active query. It is held for read during queries
// and exclusively during close.
closemu sync.RWMutex
// dc is owned exclusively until Commit or Rollback, at which point
// it's returned with putConn.
dc *driverConn
txi driver.Tx
// releaseConn is called once the Tx is closed to release
// any held driverConn back to the pool.
releaseConn func(error)
// done transitions from 0 to 1 exactly once, on Commit
// or Rollback. once done, all operations fail with
// ErrTxDone.
// Use atomic operations on value when checking value.
done int32
// All Stmts prepared for this transaction. These will be closed after the
// transaction has been committed or rolled back.
stmts struct {
sync.Mutex
v []*Stmt
}
// cancel is called after done transitions from 0 to 1.
cancel func()
// ctx lives for the life of the transaction.
ctx context.Context
}
其中核心是:
driverConn
go 里面的数据库连接,封装了 driver 里的数据库连接。
driver.Tx
定义了 commit 和 rollback 两个方法。
一个事务,实际上就是所有 CRUD 操作都在同一个数据库连接里,调用 Begin 会通过该连接,经有 driver 执行特定 DBMS 的事务开启指令,比如 mysql 的 driver 就是
func (mc *mysqlConn) begin(readOnly bool) (driver.Tx, error) {
if mc.closed.IsSet() {
errLog.Print(ErrInvalidConn)
return nil, driver.ErrBadConn
}
var q string
if readOnly {
q = "START TRANSACTION READ ONLY"
} else {
q = "START TRANSACTION"
}
err := mc.exec(q)
if err == nil {
return &mysqlTx{mc}, err
}
return nil, mc.markBadConn(err)
}
后续操作就都在一个事务里了。上层可以通过 Begin 方法返回的 Commit/Rollback 方法来提交或回滚这个事务。那么,go 是如何实现对数据库事务的支持的呢?我们从入口代码一步一步来看。
db.Beigin
driver.go 里定义了 ErrBadConn: driver 抛出的错误,表示底层连接不可用或已中断,上层应该重新用一个连接
sql.go 里定义了两个常量,
cachedOrNewConn:
如果连接池里有 idle 状态的连接,直接返回;如果连接池里的连接数已经达到 MaxOpenCons 定义的数量,则阻塞等待,直到有一个连接 idel;否则,创建新的连接,加入到连接池,然后返回。
alwaysNewConn:
强制使用新的连接而不是从接池里里复用
// Begin() 方法是带参数版本的一个默认版本
func (db *DB) Begin() (*Tx, error) {
return db.BeginTx(context.Background(), nil)
}
// 可以设置 Contxt 和 事务配置项
func (db *DB) BeginTx(ctx context.Context, opts *TxOptions) (*Tx, error) {
var tx *Tx
var err error
// 如果遇到 ErrBadConn 默认会重试 2 次
for i := 0; i < maxBadConnRetries; i++ {
tx, err = db.begin(ctx, opts, cachedOrNewConn)
if err != driver.ErrBadConn {
break
}
}
if err == driver.ErrBadConn {
return db.begin(ctx, opts, alwaysNewConn)
}
return tx, err
}
func (db *DB) begin(ctx context.Context, opts *TxOptions, strategy connReuseStrategy) (tx *Tx, err error) {
dc, err := db.conn(ctx, strategy)
if err != nil {
return nil, err
}
return db.beginDC(ctx, dc, dc.releaseConn, opts)
}
外部调用最终进入私有方法 begin 里,begin 主要完成两个操作,一是获取一个数据库连接,二是创建事务的上下文,即开启事务。下面我们一一来看。
db.conn
这个方法很长,核心业务逻辑主要分为以下几个部分:
检查
首先检查 db 是否关闭了,再检查是否 context 过期了,若任一为是都会直接返回错误
db.mu.Lock() // 临界区的结束点不同情况不相同
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
// Check if the context is expired.
select {
default:
case
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime // 后面逻辑使用
cachedOrNewConn 且 存在 idle 的连接
// db.freeConn 是一个 slice,存储了 idle 的连接
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
// 移走第一个
copy(db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock() // 释放锁了
// 判断连接是否过期了
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
// lastErr 字段的意义没看懂,好像在返回一个连接之前都要检查这个错误有没有设置
// Lock around reading lastErr to ensure the session resetter finished.
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
没有空闲连接了,或者强制使用新连接。
连接池达到最大了
连接池达到最大了,就必须阻塞,这里用了一个 channel,类似于 Java 的条件队列
// Make the connRequest channel. It's buffered so that the
// connectionOpener doesn't block while waiting for the req to be read.
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()
waitStart := time.Now()
// Timeout the connection request with the context.
select {
// Context 结束了
case
// Remove the connection request and ensure no value has been sent
// on it after removing.
db.mu.Lock()
delete(db.connRequests, reqKey)
db.mu.Unlock()
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
select {
default:
// 已经发出去了,且创建成功了,那么这个连接就需要加入连接池
// 但是不清楚为什么这里没有验证了
case ret, ok :=
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
// 收到返回
case ret, ok :=
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
// channel 是被关闭了的
if !ok {
return nil, errDBClosed
}
// 超时了
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
// Lock around reading lastErr to ensure the session resetter finished.
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
连接池还没有达到最大
直接新建连接
db.numOpen++ // optimistically
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen-- // correct for earlier optimism
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
putConnDBLocked
前面提到,当连接池已满且没有 idel 连接的时候,是通过注册了一个 channel 来异步接收 free 连接的通知的。维护所有 channel 的是
connReqeusts map[uint64] chan connRequest
type connRequest struct {
conn *driverConn
err error
}
使用完一个连接后,需要“归还”连接给 DB。DB 的逻辑是:
如果存在 connRequest,会将 dc 直接交给它
否则,放入到连接池里,标记为 idel,并启动清理线程,关闭那些超时的连接
func (db *DB) putConnDBLocked(dc *driverConn, err error) bool {
if db.closed {
return false
}
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete(db.connRequests, reqKey) // Remove from pending requests.
if err == nil {
dc.inUse = true
}
req
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
return false
}
db.beginDC
前面的操作完成了连接的获取(创建/释放),下面就要启动一个事务。
// beginDC starts a transaction. The provided dc must be valid and ready to use.
func (db *DB) beginDC(ctx context.Context, dc *driverConn, release func(error), opts *TxOptions) (tx *Tx, err error) {
var txi driver.Tx
withLock(dc, func() { // 就是个包装方法,加锁,执行函数,放锁
txi, err = ctxDriverBegin(ctx, opts, dc.ci) // 调用 driver 开启事务
})
if err != nil {
release(err)
return nil, err
}
// 后面的主要是暴露上下文,注册资源清理回调
// Schedule the transaction to rollback when the context is cancelled.
// The cancel function in Tx will be called after done is set to true.
ctx, cancel := context.WithCancel(ctx)
tx = &Tx{
db: db,
dc: dc,
releaseConn: release,
txi: txi,
cancel: cancel,
ctx: ctx,
}
go tx.awaitDone()
return tx, nil
}
// awaitDone blocks until the context in Tx is canceled and rolls back
// the transaction if it's not already done.
func (tx *Tx) awaitDone() {
// 如果在事务提交/回滚前,就结束阻塞,说明 Context结束了,那就要执行资源清理
// 关必并从连接池里删除这个连接,来保证事务已经关闭、资源得到释放
// 对于已经提交/回滚的事务,这个方法不会由任何影响
tx.rollback(true)
}
go mysql driver事务,Go 数据库事务的源码分析相关推荐
- 数据库中间件 MyCAT源码分析 —— XA分布式事务
title: MyCAT 源码分析 -- XA分布式事务 date: 2017-07-15 tags: categories: MyCAT permalink: MyCAT/xa-distribute ...
- 数据库中间件 Sharding-JDBC 源码分析 —— SQL 执行
摘要: 原创出处 http://www.iocoder.cn/Sharding-JDBC/sql-execute/ 「芋道源码」欢迎转载,保留摘要,谢谢! 本文主要基于 Sharding-JDBC 1 ...
- 数据库中间件 MyCAT源码分析:【单库单表】插入【推荐阅读】
???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...
- 数据库中间件MyCAT源码分析:调试环境搭建
???关注微信公众号:[芋艿的后端小屋]有福利: RocketMQ / MyCAT / Sharding-JDBC 所有源码分析文章列表 RocketMQ / MyCAT / Sharding-JDB ...
- Spring事务源码分析责任链事务链事务不生效
文章目录 前言 带着问题分析源码 事务源码分析 寻找Spring事务源码类 TransactionInterceptor调用栈 分析Spring AOP责任链 分析TransactionInterce ...
- mysql dba系统学习-数据库事务详解
mysql dba系统学习-数据库事务详解 上个星期去面试数据库管理员的工作,笔试通过之后就是直接的面试,他问了我一个问题,叫我介绍哈数据库的事务的看法和理解,但是不知所错的没有章法的乱答一气,唉唉, ...
- Spring的AOP和IOC是什么?使用场景有哪些?Spring事务与数据库事务,传播行为,数据库隔离级别
Spring的AOP和IOC是什么?使用场景有哪些?Spring事务与数据库事务,传播行为,数据库隔离级别 AOP:面向切面编程. 即在一个功能模块中新增其他功能,比方说你要下楼取个快递,你同事对你说 ...
- channelfuture怎么拿到数据_SpringBoot2.x系列教程66--Spring Boot整合分布式事务之数据库事务回顾
SpringBoot2.x系列教程66--Spring Boot整合分布式事务之数据库事务回顾 作者:一一哥 本节主要内容 一. 事务出现的原因 转账是生活中常见的操作,比如从A账户转账100元到B账 ...
- getprivateprofilestring读不到数据_SpringBoot2.x系列教程66--Spring Boot整合分布式事务之数据库事务回顾
SpringBoot2.x系列教程66--Spring Boot整合分布式事务之数据库事务回顾 作者:一一哥 本节主要内容 一. 事务出现的原因 转账是生活中常见的操作,比如从A账户转账100元到B账 ...
- Spring中的事务及数据库事务的关系
Spring中的事务及数据库事务的关系 一.MySQL中的事务 如果对MySQL中的事务不了解的话,请先看 基于MySQL 8.0 对事务的深度理解 二.Spring中的事务 Spring管理事务的方 ...
最新文章
- AtCoder AGC002E Candy Piles (博弈论)
- video thumbnails
- ERROR 1044 (42000): Access denied for user ''@'localhost' to database
- 动态规划C语言实现之最长公共子序列(LCS)
- 小米平板android版本,小米平板2发布:分Android和Wind 10两个版本
- 计算机软考程序员客观题,历年计算机软考程序员部分选择题真题重点
- Jmeter(十七)Logic Controllers 之 Interleave Controller
- HashKey TokenGazer | 去中心化身份(DID)研究报告
- win7无线网络突然不能用了(或打开无线按钮灰色)
- Java将多张图片合并保存到同一页PDF中
- java 合并两个有序链表
- Rplidar A1利用Qt图形库进行周围环境的轮廓抓取
- 三星android5 root包,三星N9200刷国行系统(含五件套root权限recovery SuperSU)
- 移动端——less(学会less,这一篇就够了)
- Flex富文本编辑器
- 同网段能互通,跨网段不通
- OAuth 2.0 的四种认证模式
- 利用docker+雨巷云打造私有网盘之安装mysql5.6(1)
- [COGS 2583]南极科考旅行
- 微信公众号没办法直接获取用户的手机号
热门文章
- activiti 源码笔记之startProcess
- Oracle中“行转列”的实现方式
- vue模块单独封装html,在vue中怎么定义自定义组件?
- springboot 集成 jwt+oauth+springsecurity 实现单点登录,feign远程调用,eruka注册中心,seata分布式事务配置
- 字节码指令之加载与存储指令
- redis分布式锁+事务+AOP一起使用注意点
- Spring的注入方式中,官方推荐哪种方式
- CyclicBarrier源码解读
- 将不确定变成确定~Uri文本文件不用浏览器自动打开,而是下载到本地
- 用 JavaScript 实现内存位翻转漏洞