修改gh-ost源码实现两表在线高速复制
一、问题起源
笔者所在的公司需要对核心业务表 tb_doc 进行表分区,目前该表的记录数为190,522,155。
由于该表没有分区,新增分区需要创建影子表,然后从原表导入数据,最后修改表名。
二、处理结果
我们在生产环境对tb_doc按照hash算法分区,分区数量1024。操作耗时9小时22分,累计迁移190,522,155条数据。
三、方案选型
关键步骤是导入数据的过程,我们经过一番筛选,初步选定kettle、gh-ost两种技术方案。
3.1、测试环境的服务器配置
Dell R720
Intel(R) Xeon(R) CPU E5-2620 v2 @ 2.10GHz (24核)
内存256G
4块SEAGATE ST3600057SS组成RAID10 磁盘空间1.089 TB
3.2 数据表结构
测试记录数:19,891,000
-- Source table: ~19.9M test rows, unpartitioned.
-- Fix: the original statement had a trailing space inside the identifier
-- (`tb_doc `) — MySQL rejects identifiers that end with a space.
CREATE TABLE `tb_doc` (
  `ID` varchar(40) NOT NULL,
  `LASTMODIFIED` datetime DEFAULT NULL,
  `FORMNAME` varchar(40) DEFAULT NULL,
  `OWNER` varchar(40) DEFAULT NULL,
  `PARENT` varchar(40) DEFAULT NULL,
  `LASTMODIFIER` varchar(40) DEFAULT NULL,
  PRIMARY KEY (`ID`),
  KEY `t_document_formname` (`FORMNAME`),
  KEY `idx_t_document_parent` (`PARENT`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Shadow table: schema identical to tb_doc, plus LINEAR KEY partitioning
-- over the primary key. 1024 partitions, matching the production rollout.
CREATE TABLE `tb_doc_partition` (
`ID` varchar(40) NOT NULL,
`LASTMODIFIED` datetime DEFAULT NULL,
`FORMNAME` varchar(40) DEFAULT NULL,
`OWNER` varchar(40) DEFAULT NULL,
`PARENT` varchar(40) DEFAULT NULL,
`LASTMODIFIER` varchar(40) DEFAULT NULL,
PRIMARY KEY (`ID`),
KEY `t_document_formname` (`FORMNAME`),
KEY `idx_t_document_parent` (`PARENT`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8
-- LINEAR KEY hashing gives faster pruning math than plain KEY hashing,
-- at the cost of a less even row distribution.
PARTITION BY LINEAR KEY( ID)
PARTITIONS 1024;
3.3 Kettle测试
执行方式:分成四个任务,每个任务5个线程
耗费小时:10.57时
Kettle四个任务的CPU占用分别为87.4%、63.9%、47.4%、42.1%;
MySQL 的CPU占用为256.3%。
3.4 gh-ost测试
块大小10
耗费小时:8.57时
mysql cpu 11.6%
gh-ost cpu 4.6%
四、gh-ost发现的一些不足
- 相比pt-osc缺少--check-interval参数;目前这个值写死是1s(见相关issue)。作者认为默认1s已经足够小,当前可以接受;
- --conf选项设置的my.cnf文件中不支持设置prompt=[\h]\u@\d\r:\m:\s>(见相关issue),作者已经修复;
- 缺少增加unique index时的检查;如果增加unique index可能会丢失数据(pt-osc也存在这个问题)。作者认为不需要修复,问题的解决难度也比较大,DBA在使用前需要注意;
- 相比pt-osc缺少--check-replication-filters,即缺少对从库是否配置了复制过滤规则(replication filters)的检查;
- 相比pt-osc缺少--recursion-method=processlist,需要通过参数进行设置(见相关issue)
五、修改gh-ost增加两表复制
main.go
// Command-line flags added for the two-table copy feature.
flag.StringVar(&migrationContext.TableDst, "table-dst", "", "MySQL Copy Table Dest - Added By chengqian")
flag.BoolVar(&migrationContext.AllowTableDataCopy, "allow-table-data-copy", false, "Allow Copy Table From Source to Dest - Added By chengqian")
flag.BoolVar(&migrationContext.SkipTrigerCheck, "skip-trigger-check", false, "Skip Trigger Check - Added By chengqian ")
flag.BoolVar(&migrationContext.AllowSharedUniqueKeysLike, "allow-sharedUniqueKey-like", false, "allow sharedUniqueKey like - Added By chengqian")
// In copy mode a destination table is mandatory; otherwise --alter is.
if migrationContext.AllowTableDataCopy {
if migrationContext.TableDst == "" {
log.Fatalf("--table-dst Copy Table Dest must not be empty")
}
}else{
if migrationContext.AlterStatement == "" {
log.Fatalf("--alter must be provided and statement must not be empty")
}
}
migrator.go (5 matches)
Migrate()函数
// Skip validation of the --alter statement: in two-table copy mode no
// ALTER is involved, so there is nothing to parse or validate.
if this.migrationContext.AllowTableDataCopy{
log.Debugf("Allow Table Data Copy: %s", this.migrationContext.TableDst)
}else{
if err := this.parser.ParseAlterStatement(this.migrationContext.AlterStatement); err != nil {
return err
}
// Validate the parsed ALTER statement before proceeding.
if err := this.validateStatement(); err != nil {
return err
}
}
...
// New function inspectOriginalAndCopyDstTables: in two-table copy mode,
// inspect the original table together with the user-supplied destination
// table instead of the auto-created ghost table.
if this.migrationContext.AllowTableDataCopy{
if err := this.inspector.inspectOriginalAndCopyDstTables(); err != nil {
return err
}
}else {
if err := this.inspector.inspectOriginalAndGhostTables(); err != nil {
return err
}
}
applier.go (4 matches)
// ApplyIterationInsertQuery copies one chunk of rows from the original table
// into the migration destination. The destination is the user-supplied copy
// table when --allow-table-data-copy is in effect, otherwise the regular
// ghost table; the INSERT is identical in both modes, so the table name is
// the only mode-dependent input (the original duplicated the entire body per
// mode, which this version deduplicates).
// Returns the chunk size used, the number of rows affected, and the elapsed
// duration.
func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
	startTime := time.Now()
	chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)

	// The only difference between the two copy modes is the destination table.
	destTableName := this.migrationContext.GetGhostTableName()
	if this.migrationContext.AllowTableDataCopy {
		destTableName = this.migrationContext.TableDst
	}
	query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
		this.migrationContext.DatabaseName,
		this.migrationContext.OriginalTableName,
		destTableName,
		this.migrationContext.SharedColumns.Names(),
		this.migrationContext.MappedSharedColumns.Names(),
		this.migrationContext.UniqueKey.Name,
		&this.migrationContext.UniqueKey.Columns,
		this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
		this.migrationContext.MigrationIterationRangeMaxValues.AbstractValues(),
		this.migrationContext.GetIteration() == 0,
		this.migrationContext.IsTransactionalTable(),
	)
	if err != nil {
		return chunkSize, rowsAffected, duration, err
	}

	sqlResult, err := func() (gosql.Result, error) {
		tx, err := this.db.Begin()
		if err != nil {
			return nil, err
		}
		// Pin the applier's time zone and force strict SQL mode so copied
		// rows are written exactly as read.
		sessionQuery := fmt.Sprintf(`SET
SESSION time_zone = '%s',
sql_mode = CONCAT(@@session.sql_mode, ',STRICT_ALL_TABLES')
`, this.migrationContext.ApplierTimeZone)
		if _, err := tx.Exec(sessionQuery); err != nil {
			return nil, err
		}
		result, err := tx.Exec(query, explodedArgs...)
		if err != nil {
			return nil, err
		}
		if err := tx.Commit(); err != nil {
			return nil, err
		}
		return result, nil
	}()
	if err != nil {
		return chunkSize, rowsAffected, duration, err
	}
	rowsAffected, _ = sqlResult.RowsAffected()
	duration = time.Since(startTime)
	log.Debugf(
		"Issued INSERT on range: [%s]..[%s]; iteration: %d; chunk-size: %d",
		this.migrationContext.MigrationIterationRangeMinValues,
		this.migrationContext.MigrationIterationRangeMaxValues,
		this.migrationContext.GetIteration(),
		chunkSize)
	return chunkSize, rowsAffected, duration, nil
}
// SwapTablesQuickAndBumpy issues a two-step swap table operation:
// - rename original table to _old
// - rename ghost table (or copy-destination table in two-table copy mode)
//   to original
// There is a point in time in between where the table does not exist.
func (this *Applier) SwapTablesQuickAndBumpy() error {
	query := fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`,
		sql.EscapeName(this.migrationContext.DatabaseName),
		sql.EscapeName(this.migrationContext.OriginalTableName),
		sql.EscapeName(this.migrationContext.GetOldTableName()),
	)
	log.Infof("Renaming original table")
	this.migrationContext.RenameTablesStartTime = time.Now()
	if _, err := sqlutils.ExecNoPrepare(this.singletonDB, query); err != nil {
		return err
	}
	log.Infof("Original Table `%s` renamed `%s`",
		this.migrationContext.OriginalTableName,
		this.migrationContext.GetOldTableName())

	// The table taking the original's place differs by mode; everything
	// else in the second rename is identical, so choose the name once.
	replacementTableName := this.migrationContext.GetGhostTableName()
	tableKind := "ghost"
	if this.migrationContext.AllowTableDataCopy {
		replacementTableName = this.migrationContext.GetCopyDstTableName()
		tableKind = "copy dest"
	}
	query = fmt.Sprintf(`alter /* gh-ost */ table %s.%s rename %s`,
		sql.EscapeName(this.migrationContext.DatabaseName),
		sql.EscapeName(replacementTableName),
		sql.EscapeName(this.migrationContext.OriginalTableName),
	)
	log.Infof("Renaming %s table `%s` to `%s`", tableKind, replacementTableName, this.migrationContext.OriginalTableName)
	if _, err := sqlutils.ExecNoPrepare(this.db, query); err != nil {
		return err
	}
	this.migrationContext.RenameTablesEndTime = time.Now()
	log.Infof("Tables renamed")
	return nil
}
// AtomicCutoverRename issues the single atomic RENAME that concludes the
// cut-over: the original table moves aside to the _old name and, in the same
// statement, the ghost table (or the copy-destination table when
// --allow-table-data-copy is in effect) takes its place. The statement is
// expected to block until the cut-over lock held elsewhere is released.
// sessionIdChan receives this transaction's connection id (or -1 on bail-out);
// tablesRenamed receives the rename outcome.
func (this *Applier) AtomicCutoverRename(sessionIdChan chan int64, tablesRenamed chan<- error) error {
	tx, err := this.db.Begin()
	if err != nil {
		return err
	}
	defer func() {
		tx.Rollback()
		// Unblock any goroutines waiting on these channels even when we
		// return early with an error.
		sessionIdChan <- -1
		tablesRenamed <- fmt.Errorf("Unexpected error in AtomicCutoverRename(), injected to release blocking channel reads")
	}()
	var sessionId int64
	if err := tx.QueryRow(`select connection_id()`).Scan(&sessionId); err != nil {
		return err
	}
	sessionIdChan <- sessionId
	log.Infof("Setting RENAME timeout as %d seconds", this.migrationContext.CutOverLockTimeoutSeconds)
	query := fmt.Sprintf(`set session lock_wait_timeout:=%d`, this.migrationContext.CutOverLockTimeoutSeconds)
	if _, err := tx.Exec(query); err != nil {
		return err
	}

	// The two modes differ only in which table is renamed onto the original
	// name; the rest of the RENAME statement is identical.
	replacementTableName := this.migrationContext.GetGhostTableName()
	if this.migrationContext.AllowTableDataCopy {
		replacementTableName = this.migrationContext.GetCopyDstTableName()
	}
	query = fmt.Sprintf(`rename /* gh-ost */ table %s.%s to %s.%s, %s.%s to %s.%s`,
		sql.EscapeName(this.migrationContext.DatabaseName),
		sql.EscapeName(this.migrationContext.OriginalTableName),
		sql.EscapeName(this.migrationContext.DatabaseName),
		sql.EscapeName(this.migrationContext.GetOldTableName()),
		sql.EscapeName(this.migrationContext.DatabaseName),
		sql.EscapeName(replacementTableName),
		sql.EscapeName(this.migrationContext.DatabaseName),
		sql.EscapeName(this.migrationContext.OriginalTableName),
	)
	log.Infof("Issuing and expecting this to block: %s", query)
	if _, err := tx.Exec(query); err != nil {
		tablesRenamed <- err
		return log.Errore(err)
	}
	tablesRenamed <- nil
	log.Infof("Tables renamed")
	return nil
}
表分区
// getSharedUniqueKeys returns the intersection of two given sets of unique
// keys, comparing by column list rather than by key name — the ALTER may be
// renaming the key itself. When --allow-sharedUniqueKey-like is set, the
// relaxed comparison (EqualsByNamesNew) is used instead of strict equality.
func (this *Inspector) getSharedUniqueKeys(originalUniqueKeys, ghostUniqueKeys [](*sql.UniqueKey)) (uniqueKeys [](*sql.UniqueKey), err error) {
	for _, sourceKey := range originalUniqueKeys {
		for _, targetKey := range ghostUniqueKeys {
			log.Debugf("getSharedUniqueKeys UniqueKey columns compare: original -> %+v,copy dest -> %+v",
				sourceKey.Columns.Names(), targetKey.Columns.Names())
			// Pick the comparison strategy once, then apply it uniformly.
			var matched bool
			if this.migrationContext.AllowSharedUniqueKeysLike {
				matched = sourceKey.Columns.EqualsByNamesNew(&targetKey.Columns)
			} else {
				matched = sourceKey.Columns.EqualsByNames(&targetKey.Columns)
			}
			if matched {
				uniqueKeys = append(uniqueKeys, sourceKey)
			}
		}
	}
	return uniqueKeys, nil
}
// buildDMLEventQuery translates a binlog DML event on the original table into
// the equivalent statement against the migration target table. The target is
// the user-supplied copy-destination table in two-table copy mode, otherwise
// the ghost table; since the generated queries are identical apart from that
// name, the mode check is hoisted and the switch is written once (the
// original duplicated the entire switch per mode).
// Returns the query, its bind arguments, and the rows delta used for
// row-count bookkeeping (+1 insert, -1 delete, 0 update).
func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (query string, args []interface{}, rowsDelta int64, err error) {
	targetTableName := this.migrationContext.GetGhostTableName()
	if this.migrationContext.AllowTableDataCopy {
		targetTableName = this.migrationContext.TableDst
	}
	switch dmlEvent.DML {
	case binlog.DeleteDML:
		{
			query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, targetTableName, this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues())
			return query, uniqueKeyArgs, -1, err
		}
	case binlog.InsertDML:
		{
			query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, targetTableName, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues())
			return query, sharedArgs, 1, err
		}
	case binlog.UpdateDML:
		{
			query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, targetTableName, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues())
			args = append(args, sharedArgs...)
			args = append(args, uniqueKeyArgs...)
			return query, args, 0, err
		}
	}
	return "", args, 0, fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML)
}
// EqualsByNamesNew reports whether two column lists match under the relaxed
// ("like") comparison used by --allow-sharedUniqueKey-like: lists of equal
// length must match exactly; lists of different lengths are considered
// matching when they share the same leading column.
// Fixes: removed a leftover debug fmt.Printf to stdout, and guarded against
// an index-out-of-range panic when either list is empty.
func (this *ColumnList) EqualsByNamesNew(other *ColumnList) bool {
	thisNames := this.Names()
	otherNames := other.Names()
	if len(thisNames) == len(otherNames) {
		return reflect.DeepEqual(thisNames, otherNames)
	}
	// Different lengths: comparing leading columns requires both non-empty.
	if len(thisNames) == 0 || len(otherNames) == 0 {
		return false
	}
	return thisNames[0] == otherNames[0]
}