Modifying the gh-ost Source Code for High-Speed Online Copying Between Two Tables
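1. Background
The author's company needed to partition its core business table tb_doc, which currently holds 190,522,155 rows.
Because the table was created without partitions, adding them means creating a shadow table, importing the data from the original table, and finally renaming the tables.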
2. Result
In production we partitioned tb_doc with a hash-based scheme into 1024 partitions. The operation took 9 hours 22 minutes and migrated 190,522,155 rows in total.
3. Choosing a solution
The critical step is the data import. After some screening we shortlisted two candidate tools: Kettle and gh-ost.
3.1 Test server configuration
Dell R720
Intel(R) Xeon(R) CPU E5-2620 v2 @ 2.10GHz (24 cores)
256 GB memory
4 x SEAGATE ST3600057SS in RAID 10, 1.089 TB of disk space
3.2 Table structure
Test row count: 19,891,000
CREATE TABLE `tb_doc` (
  `ID` varchar(40) NOT NULL,
  `LASTMODIFIED` datetime DEFAULT NULL,
  `FORMNAME` varchar(40) DEFAULT NULL,
  `OWNER` varchar(40) DEFAULT NULL,
  `PARENT` varchar(40) DEFAULT NULL,
  `LASTMODIFIER` varchar(40) DEFAULT NULL,
  PRIMARY KEY (`ID`),
  KEY `t_document_formname` (`FORMNAME`),
  KEY `idx_t_document_parent` (`PARENT`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
CREATE TABLE `tb_doc_partition` (
  `ID` varchar(40) NOT NULL,
  `LASTMODIFIED` datetime DEFAULT NULL,
  `FORMNAME` varchar(40) DEFAULT NULL,
  `OWNER` varchar(40) DEFAULT NULL,
  `PARENT` varchar(40) DEFAULT NULL,
  `LASTMODIFIER` varchar(40) DEFAULT NULL,
  PRIMARY KEY (`ID`),
  KEY `t_document_formname` (`FORMNAME`),
  KEY `idx_t_document_parent` (`PARENT`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
PARTITION BY LINEAR KEY (ID)
PARTITIONS 1024;
3.3 Kettle test
Execution mode: four jobs, five threads each
Elapsed time: 10.57 hours
CPU usage of the four Kettle jobs: 87.4%, 63.9%, 47.4%, and 42.1%
MySQL CPU usage: 256.3%
3.4 gh-ost test
Chunk size: 10
Elapsed time: 8.57 hours
MySQL CPU usage: 11.6%
gh-ost CPU usage: 4.6%
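4. Shortcomings found in gh-ost
  1. Unlike pt-osc, there is no --check-interval option; the value is currently hard-coded to 1s (issue link). The default of 1s is small enough, and the gh-ost author considers the current behavior acceptable.
  2. The my.cnf file passed via --conf does not support the setting prompt=[\h]\u@\d\r:\m:\s> (issue link). The gh-ost author has already fixed this.
  3. There is no check when adding a unique index, so adding one can lose data (pt-osc has the same problem). The gh-ost author does not intend to fix this because the problem is fairly hard to solve; DBAs need to be aware of it before use.
  4. Unlike pt-osc, there is no --check-replication-filters option, so there appears to be no check for whether the table actually exists on the replicas.
  5. Unlike pt-osc, there is no --recursion-method=processlist; this has to be configured through parameters (issue link).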
5. Modifying gh-ost to add two-table copying
main.go
       flag.StringVar(&migrationContext.TableDst, "table-dst", "", "MySQL Copy Table Dest - Added By chengqian")
       flag.BoolVar(&migrationContext.AllowTableDataCopy, "allow-table-data-copy", false, "Allow Copy Table From Source to Dest - Added By chengqian")
       flag.BoolVar(&migrationContext.SkipTrigerCheck, "skip-trigger-check", false, "Skip Trigger Check - Added By chengqian ")
       flag.BoolVar(&migrationContext.AllowSharedUniqueKeysLike, "allow-sharedUniqueKey-like", false, "allow sharedUniqueKey like - Added By chengqian")
       // In copy mode the destination table is mandatory; otherwise --alter is required as before.
       if migrationContext.AllowTableDataCopy {
              if migrationContext.TableDst == "" {
                     log.Fatalf("--table-dst Copy Table Dest must not be empty")
              }
       } else {
              if migrationContext.AlterStatement == "" {
                     log.Fatalf("--alter must be provided and statement must not be empty")
              }
       }
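The rule enforced above (copy mode needs --table-dst, normal mode needs --alter) can be exercised in isolation. The following is a minimal, self-contained sketch that uses only the standard flag package; the options struct and parseOptions are hypothetical names, and errors are returned instead of calling log.Fatalf so the rule can be tested.

```go
package main

import (
	"errors"
	"flag"
	"fmt"
)

// options holds only the flags relevant to this sketch.
type options struct {
	alter     string
	tableDst  string
	allowCopy bool
}

// parseOptions parses args and applies the same rule as the patch:
// copy mode needs --table-dst, normal mode needs --alter.
func parseOptions(args []string) (*options, error) {
	opts := &options{}
	fs := flag.NewFlagSet("gh-ost-sketch", flag.ContinueOnError)
	fs.StringVar(&opts.alter, "alter", "", "alter statement")
	fs.StringVar(&opts.tableDst, "table-dst", "", "copy destination table")
	fs.BoolVar(&opts.allowCopy, "allow-table-data-copy", false, "copy source table into destination")
	if err := fs.Parse(args); err != nil {
		return nil, err
	}
	if opts.allowCopy && opts.tableDst == "" {
		return nil, errors.New("--table-dst must not be empty")
	}
	if !opts.allowCopy && opts.alter == "" {
		return nil, errors.New("--alter must be provided")
	}
	return opts, nil
}

func main() {
	// Copy mode without a destination is rejected.
	_, err := parseOptions([]string{"--allow-table-data-copy"})
	fmt.Println(err)
	// Copy mode with a destination is accepted, and no --alter is needed.
	opts, err := parseOptions([]string{"--allow-table-data-copy", "--table-dst=tb_doc_partition"})
	fmt.Println(opts.tableDst, err)
}
```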
migrator.go
Migrate() function
       // In copy mode, skip parsing and validating the --alter statement
       if this.migrationContext.AllowTableDataCopy {
              log.Debugf("Allow Table Data Copy: %s", this.migrationContext.TableDst)
       } else {
              if err := this.parser.ParseAlterStatement(this.migrationContext.AlterStatement); err != nil {
                     return err
              }
              if err := this.validateStatement(); err != nil {
                     return err
              }
       }
       ...
       // New function: inspectOriginalAndCopyDstTables
       if this.migrationContext.AllowTableDataCopy {
              if err := this.inspector.inspectOriginalAndCopyDstTables(); err != nil {
                     return err
              }
       } else {
              if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
                     return err
              }
       }
applier.go
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
       startTime := time.Now()
       chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)
       
       // Copy mode: insert into the user-supplied destination table instead of the ghost table
       if this.migrationContext.AllowTableDataCopy {
              query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
                     this.migrationContext.DatabaseName,
                     this.migrationContext.OriginalTableName,
                     this.migrationContext.TableDst,
                     this.migrationContext.SharedColumns.Names(),
                     this.migrationContext.MappedSharedColumns.Names(),
                     this.migrationContext.UniqueKey.Name,
                     &this.migrationContext.UniqueKey.Columns,
                     this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
                     this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
                     this.migrationContext.GetIteration() == 0,
                     this.migrationContext.IsTransactionalTable(),
              )
              if err != nil {
                     return chunkSize, rowsAffected, duration, err
              }
              sqlResult, err := func() (gosql.Result, error) {
                     tx, err := this.db.Begin()
                     if err != nil {
                           return nil, err
                     }
                     sessionQuery := fmt.Sprintf(`SET
                           SESSION time_zone = '%s',
                           sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
                           `, this.migrationContext.ApplierTimeZone)
                     if _, err := tx.Exec(sessionQuery); err != nil {
                           return nil, err
                     }
                     result, err := tx.Exec(query, explodedArgs...)
                     if err != nil {
                           return nil, err
                     }
                     if err := tx.Commit(); err != nil {
                           return nil, err
                     }
                     return result, nil
              }()
       
              if err != nil {
                     return chunkSize, rowsAffected, duration, err
              }
              rowsAffected, _ = sqlResult.RowsAffected()
              duration = time.Since(startTime)
              log.Debugf(
                     "Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
                     this.migrationContext.MigrationIterationRangeMinValues,
                     this.migrationContext.MigrationIterationRangeMaxValues,
                     this.migrationContext.GetIteration(),
                     chunkSize)
              return chunkSize, rowsAffected, duration, nil
       } else {
              query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
                     this.migrationContext.DatabaseName,
                     this.migrationContext.OriginalTableName,
                     this.migrationContext.GetGhostTableName(),
                     this.migrationContext.SharedColumns.Names(),
                     this.migrationContext.MappedSharedColumns.Names(),
                     this.migrationContext.UniqueKey.Name,
                     &this.migrationContext.UniqueKey.Columns,
                     this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
                     this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
                     this.migrationContext.GetIteration() == 0,
                     this.migrationContext.IsTransactionalTable(),
              )
              if err != nil {
                     return chunkSize, rowsAffected, duration, err
              }
              sqlResult, err := func() (gosql.Result, error) {
                     tx, err := this.db.Begin()
                     if err != nil {
                           return nil, err
                     }
                     sessionQuery := fmt.Sprintf(`SET
                           SESSION time_zone = '%s',
                           sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
                           `, this.migrationContext.ApplierTimeZone)
                     if _, err := tx.Exec(sessionQuery); err != nil {
                           return nil, err
                     }
                     result, err := tx.Exec(query, explodedArgs...)
                     if err != nil {
                           return nil, err
                     }
                     if err := tx.Commit(); err != nil {
                           return nil, err
                     }
                     return result, nil
              }()
       
              if err != nil {
                     return chunkSize, rowsAffected, duration, err
              }
              rowsAffected, _ = sqlResult.RowsAffected()
              duration = time.Since(startTime)
              log.Debugf(
                     "Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
                     this.migrationContext.MigrationIterationRangeMinValues,
                     this.migrationContext.MigrationIterationRangeMaxValues,
                     this.migrationContext.GetIteration(),
                     chunkSize)
              return chunkSize, rowsAffected, duration, nil
       }
}
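The two branches of ApplyIterationInsertQuery differ only in the name of the table being written to. One possible way to avoid the duplication, shown here as a sketch rather than as part of the actual patch, is to resolve the destination table name once. The cut-down migrationContext type and the method names ghostTableName and destinationTableName are hypothetical; the _<table>_gho pattern follows gh-ost's usual ghost table naming.

```go
package main

import "fmt"

// migrationContext is a cut-down stand-in for gh-ost's context type.
type migrationContext struct {
	OriginalTableName  string
	TableDst           string
	AllowTableDataCopy bool
}

// ghostTableName mirrors gh-ost's _<table>_gho naming convention.
func (c *migrationContext) ghostTableName() string {
	return fmt.Sprintf("_%s_gho", c.OriginalTableName)
}

// destinationTableName returns the table that copied rows and replayed
// binlog events should be written to, depending on the mode.
func (c *migrationContext) destinationTableName() string {
	if c.AllowTableDataCopy {
		return c.TableDst
	}
	return c.ghostTableName()
}

func main() {
	c := &migrationContext{
		OriginalTableName:  "tb_doc",
		TableDst:           "tb_doc_partition",
		AllowTableDataCopy: true,
	}
	fmt.Println(c.destinationTableName()) // tb_doc_partition
	c.AllowTableDataCopy = false
	fmt.Println(c.destinationTableName()) // _tb_doc_gho
}
```

With such a helper, the query builder call would need only one code path that passes the resolved name as the destination argument.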
// SwapTablesQuickAndBumpy issues a two-step swap table operation:
// - rename original table to _old
// - rename ghost table to original
// There is a point in time in between where the table does not exist.
func (this *Applier) SwapTablesQuickAndBumpy() error {
       query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`,
              sql.EscapeName(this.migrationContext.DatabaseName),
              sql.EscapeName(this.migrationContext.OriginalTableName),
              sql.EscapeName(this.migrationContext.GetOldTableName()),
       )
       log.Infof("Renaming original table")
       this.migrationContext.RenameTablesStartTime = time.Now()
       if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
              return err
       }
       log.Infof("Original Table `%s` renamed `%s`",
              this.migrationContext.OriginalTableName,
              this.migrationContext.GetOldTableName())
       
       if this.migrationContext.AllowTableDataCopy {
              query = fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`,
                     sql.EscapeName(this.migrationContext.DatabaseName),
                     sql.EscapeName(this.migrationContext.GetCopyDstTableName()),
                     sql.EscapeName(this.migrationContext.OriginalTableName),
              )
              log.Infof("Renaming copy dest table `%s` to `%s`", this.migrationContext.GetCopyDstTableName(), this.migrationContext.OriginalTableName)
       } else {
              query = fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`,
                     sql.EscapeName(this.migrationContext.DatabaseName),
                     sql.EscapeName(this.migrationContext.GetGhostTableName()),
                     sql.EscapeName(this.migrationContext.OriginalTableName),
              )
              log.Infof("Renaming ghost table `%s` to `%s`", this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableName)
       }
       
       if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
              return err
       }
       this.migrationContext.RenameTablesEndTime = time.Now()
       log.Infof("Tables renamed")
       return nil
}
// AtomicCutoverRename
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
       tx, err := this.db.Begin()
       if err != nil {
              return err
       }
       defer func() {
              tx.Rollback()
              sessionIdChan <- -1
              tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
       }()
       var sessionId int64
       if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
              return err
       }
       sessionIdChan <- sessionId
       log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
       query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
       if _, err := tx.Exec(query); err != nil {
              return err
       }
       
       // Copy mode: swap in the copy destination table instead of the ghost table
       if this.migrationContext.AllowTableDataCopy {
              query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
                     sql.EscapeName(this.migrationContext.DatabaseName),
                     sql.EscapeName(this.migrationContext.OriginalTableName),
                     sql.EscapeName(this.migrationContext.DatabaseName),
                     sql.EscapeName(this.migrationContext.GetOldTableName()),
                     sql.EscapeName(this.migrationContext.DatabaseName),
                     sql.EscapeName(this.migrationContext.GetCopyDstTableName()),
                     sql.EscapeName(this.migrationContext.DatabaseName),
                     sql.EscapeName(this.migrationContext.OriginalTableName),
              )
       } else {
              query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
                     sql.EscapeName(this.migrationContext.DatabaseName),
                     sql.EscapeName(this.migrationContext.OriginalTableName),
                     sql.EscapeName(this.migrationContext.DatabaseName),
                     sql.EscapeName(this.migrationContext.GetOldTableName()),
                     sql.EscapeName(this.migrationContext.DatabaseName),
                     sql.EscapeName(this.migrationContext.GetGhostTableName()),
                     sql.EscapeName(this.migrationContext.DatabaseName),
                     sql.EscapeName(this.migrationContext.OriginalTableName),
              )
       }
       log.Infof("Issuing and expecting this to block: %s", query)
       if _, err := tx.Exec(query); err != nil {
              tablesRenamed <- err
              return log.Errore(err)
       }
       tablesRenamed <- nil
       log.Infof("Tables renamed")
       return nil
}
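A single RENAME TABLE statement that lists both renames is applied by MySQL atomically, which is why this path, unlike SwapTablesQuickAndBumpy, has no moment at which the original table name does not exist. The sketch below builds such a statement for the copy-mode case. escapeName and buildAtomicRename are hypothetical helpers written for this example (they are not gh-ost functions), and the database name test is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// escapeName is a hypothetical helper that backtick-quotes an identifier,
// doubling any backticks it contains.
func escapeName(name string) string {
	return "`" + strings.ReplaceAll(name, "`", "``") + "`"
}

// buildAtomicRename builds one RENAME TABLE statement that moves the original
// table aside and the replacement table into its place.
func buildAtomicRename(db, original, old, replacement string) string {
	return fmt.Sprintf("rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s",
		escapeName(db), escapeName(original),
		escapeName(db), escapeName(old),
		escapeName(db), escapeName(replacement),
		escapeName(db), escapeName(original),
	)
}

func main() {
	// tb_doc -> _tb_doc_del, tb_doc_partition -> tb_doc, in one statement.
	fmt.Println(buildAtomicRename("test", "tb_doc", "_tb_doc_del", "tb_doc_partition"))
}
```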
Table partitioning
// getSharedUniqueKeys returns the intersection of two given unique keys,
// testing by list of columns
func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [](*sql.UniqueKey)) (uniqueKeys [](*sql.UniqueKey), err error) {
       // We actually do NOT rely on key name, just on the set of columns. This is because maybe
       // the ALTER is on the name itself...
       for _, originalUniqueKey := range originalUniqueKeys {
              for _, ghostUniqueKey := range ghostUniqueKeys {
                     log.Debugf("getSharedUniqueKeys UniqueKey columns compare: original -> %+v,copy dest -> %+v",
                           originalUniqueKey.Columns.Names(), ghostUniqueKey.Columns.Names())
                     if this.migrationContext.AllowSharedUniqueKeysLike {
                           if originalUniqueKey.Columns.EqualsByNamesNew(&ghostUniqueKey.Columns) {
                                  uniqueKeys = append(uniqueKeys, originalUniqueKey)
                           }
                     } else {
                           if originalUniqueKey.Columns.EqualsByNames(&ghostUniqueKey.Columns) {
                                  uniqueKeys = append(uniqueKeys, originalUniqueKey)
                           }
                     }
              }
       }
       return uniqueKeys, nil
}
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, rowsDelta int64, err error) {
       // Copy mode: replay binlog events against the copy destination table
       if this.migrationContext.AllowTableDataCopy {
              switch dmlEvent.DML {
              case binlog.DeleteDML:
                     {
                           query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.TableDst, this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
                           return query, uniqueKeyArgs, -1, err
                     }
              case binlog.InsertDML:
                     {
                           query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.TableDst, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
                           return query, sharedArgs, 1, err
                     }
              case binlog.UpdateDML:
                     {
                           query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.TableDst, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
                           args = append(args, sharedArgs...)
                           args = append(args, uniqueKeyArgs...)
                           return query, args, 0, err
                     }
              }
       } else {
              switch dmlEvent.DML {
              case binlog.DeleteDML:
                     {
                           query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
                           return query, uniqueKeyArgs, -1, err
                     }
              case binlog.InsertDML:
                     {
                           query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
                           return query, sharedArgs, 1, err
                     }
              case binlog.UpdateDML:
                     {
                           query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
                           args = append(args, sharedArgs...)
                           args = append(args, uniqueKeyArgs...)
                           return query, args, 0, err
                     }
              }
       }
       return "", args, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
}
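// New index comparison: keys of equal length must match column for column;
// otherwise only the leading column is compared.
func (this *ColumnList) EqualsByNamesNew(other *ColumnList) bool {
       thisNames, otherNames := this.Names(), other.Names()
       if len(thisNames) == len(otherNames) {
              return reflect.DeepEqual(thisNames, otherNames)
       }
       // Guard against an empty column list, which would otherwise panic on index 0
       if len(thisNames) == 0 || len(otherNames) == 0 {
              return false
       }
       fmt.Printf("Cur 0 -> `%s`, Other 0 -> `%s`\n", thisNames[0], otherNames[0])
       return thisNames[0] == otherNames[0]
}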
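To make the relaxed matching concrete, here is a standalone sketch of the same idea on plain string slices. The motivation is our reading of the patch: MySQL requires every unique key on a partitioned table to include the partitioning columns, so the destination table's key may carry extra columns compared with the source. The function name equalsByNamesRelaxed is hypothetical, and unlike the method above it treats empty lists as never matching.

```go
package main

import "fmt"

// equalsByNamesRelaxed reports whether two key column lists are compatible:
// equal length means identical names in order; otherwise only the leading
// column has to match. Empty lists never match.
func equalsByNamesRelaxed(a, b []string) bool {
	if len(a) == 0 || len(b) == 0 {
		return false
	}
	if len(a) == len(b) {
		for i := range a {
			if a[i] != b[i] {
				return false
			}
		}
		return true
	}
	return a[0] == b[0]
}

func main() {
	fmt.Println(equalsByNamesRelaxed([]string{"ID"}, []string{"ID"}))                 // true
	fmt.Println(equalsByNamesRelaxed([]string{"ID"}, []string{"ID", "LASTMODIFIED"})) // true
	fmt.Println(equalsByNamesRelaxed([]string{"ID"}, []string{"PARENT", "ID"}))       // false
}
```

Note that matching on the leading column alone is a weak guarantee: the shared column must still identify rows uniquely in the source table for the chunked copy and the binlog replay to stay correct.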
