一、主从概述

Redis 支持 Master-Slave(主从)模式,Redis Server 可以设置为另一个 Redis Server 的主机(从机),从机定期从主机拿数据。特殊的,一个从机同样可以设置为一个 Redis Server 的主机,这样一来 Master-Slave 的分布看起来就是一个有向无环图 DAG,如此形成 Redis Server 集群,无论是主机还是从机都是 Redis Server,都可以提供服务。

在配置后,主机可负责读写服务,从机只负责读。Redis 提高这种配置方式,为的是让其支持数据的弱一致性,即最终一致性。在业务中,选择强一致性还是弱一致性,应该取决于具体的业务需求,像微博,完全可以使用弱一致性模型;像淘宝,可以选用强一致性模型。

Redis 主从复制的实现主要在 replication.c 中。

这篇文章涉及较多的代码,但我已经尽量删繁就简,达到能说明问题本质。为了保留代码的原生性并让读者能够阅读原生代码的注释,剖析 Redis 的几篇文章都没有删除代码中的英文注释,并已加注释。


二、积压空间

在《深入剖析 Redis AOF 持久化策略》中,介绍了更新缓存的概念,举一个例子:客户端发来命令:set name Jhon,这一数据更新被记录为:*3\r\n$3\r\nSET\r\n$4\r\nname\r\n$3\r\nJhon\r\n,并存储在更新缓存中。

同样,在主从连接中,也有更新缓存的概念。只是两者的用途不一样,前者被写入本地,后者被写入从机,这里我们把它称为积压空间。

更新缓存存储在 server.repl_backlog,Redis 将其作为一个环形空间来处理,这样做节省了空间,避免内存再分配的情况。

struct redisServer {/* Replication (master) */// 最近一次使用(访问)的数据集int slaveseldb;                 /* Last SELECTed DB in replication output */// 全局的数据同步偏移量long long master_repl_offset;   /* Global replication offset */// 主从连接心跳频率int repl_ping_slave_period;     /* Master pings the slave every N seconds */// 积压空间指针char *repl_backlog;             /* Replication backlog for partial syncs */// 积压空间大小long long repl_backlog_size;    /* Backlog circular buffer size */// 积压空间中写入的新数据的大小long long repl_backlog_histlen; /* Backlog actual data length */// 下一次向积压空间写入数据的起始位置long long repl_backlog_idx;     /* Backlog circular buffer current offset */// 积压数据的起始位置,是一个宏观值long long repl_backlog_off;     /* Replication offset of first bytein the backlog buffer. */// 积压空间有效时间time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. */
}

积压空间中的数据变更记录是什么时候被写入的?在执行一个 Redis 命令的时候,如果存在数据的修改(写),那么就会把变更记录传播。Redis 源码中是这么实现的:call()->propagate()->replicationFeedSlaves()

注释:命令真正执行的地方在 call() 中,call() 如果发现数据被修改(dirty),则传播 propagrate(),replicationFeedSlaves() 将修改记录写入积压空间和所有已连接的从机。

这里可能会有疑问:为什么把数据添加入积压空间,又把数据分发给所有的从机?为什么不仅仅将数据分发给所有从机呢?

因为有一些从机会因特殊情况(???)与主机断开连接,注意从机断开前有暂存主机的状态信息,因此这些断开的从机就没有及时收到更新的数据。Redis 为了让断开的从机在下次连接后能够获取更新数据,将更新数据加入了积压空间。从 replicationFeedSlaves() 实现来看,在线的 Slave 能马上收到数据更新记录;因某些原因暂时断开连接的 Slave,需要从积压空间中找回断开期间的数据更新记录。如果断开的时间足够长,Master 会拒绝 Slave 的部分同步请求,从而 Slave 只能进行全同步。

下面是源码注释:

// call() 函数是执行命令的核心函数,真正执行命令的地方
/* Call() is the core of Redis execution of a command */
void call(redisClient *c, int flags) {....../* Call the command. */c->flags &= ~(REDIS_FORCE_AOF | REDIS_FORCE_REPL);redisOpArrayInit(&server.also_propagate);// 脏数据标记,数据是否被修改dirty = server.dirty;// 执行命令对应的函数c->cmd->proc(c);dirty = server.dirty - dirty;duration = ustime() - start;......// 将客户端请求的数据修改记录传播给 AOF 和从机/* Propagate the command into the AOF and replication link */if (flags & REDIS_CALL_PROPAGATE) {int flags = REDIS_PROPAGATE_NONE;// 强制主从复制if (c->flags & REDIS_FORCE_REPL) flags |= REDIS_PROPAGATE_REPL;// 强制 AOF 持久化if (c->flags & REDIS_FORCE_AOF) flags |= REDIS_PROPAGATE_AOF;// 数据被修改if (dirty)flags |= (REDIS_PROPAGATE_REPL | REDIS_PROPAGATE_AOF);// 传播数据修改记录if (flags != REDIS_PROPAGATE_NONE)propagate(c->cmd, c->db->id, c->argv, c->argc, flags);}......
}// 向 AOF 和从机发布数据更新
/* Propagate the specified command (in the context of the specified database id)* to AOF and Slaves.** flags are an xor between:* + REDIS_PROPAGATE_NONE (no propagation of command at all)* + REDIS_PROPAGATE_AOF (propagate into the AOF file if is enabled)* + REDIS_PROPAGATE_REPL (propagate into the replication link)*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,int flags)
{// AOF 策略需要打开,且设置 AOF 传播标记,将更新发布给本地文件if (server.aof_state != REDIS_AOF_OFF && flags & REDIS_PROPAGATE_AOF)feedAppendOnlyFile(cmd, dbid, argv, argc);// 设置了从机传播标记,将更新发布给从机if (flags & REDIS_PROPAGATE_REPL)replicationFeedSlaves(server.slaves,dbid,argv,argc);
}// 向积压空间和从机发送数据
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {listNode *ln;listIter li;int j, len;char llstr[REDIS_LONGSTR_SIZE];// 没有积压数据且没有从机,直接退出/* If there aren't slaves, and there is no backlog buffer to populate,* we can return ASAP. */if (server.repl_backlog == NULL && listLength(slaves) == 0) return;/* We can't have slaves attached and no backlog. */redisAssert(!(listLength(slaves) != 0 && server.repl_backlog == NULL));/* Send SELECT command to every slave if needed. */if (server.slaveseldb != dictid) {robj *selectcmd;// 小于等于 10 的可以用共享对象/* For a few DBs we have pre-computed SELECT command. */if (dictid >= 0 && dictid < REDIS_SHARED_SELECT_CMDS) {selectcmd = shared.select[dictid];} else {// 不能使用共享对象,生成 SELECT 命令对应的 redis 对象int dictid_len;dictid_len = ll2string(llstr, sizeof(llstr), dictid);selectcmd = createObject(REDIS_STRING,sdscatprintf(sdsempty(),"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",dictid_len, llstr));}// 这里可能会有疑问:为什么把数据添加入积压空间,又把数据分发给所有的从机?// 为什么不仅仅将数据分发给所有从机呢?// 因为有一些从机会因特殊情况(???)与主机断开连接,注意从机断开前有暂存// 主机的状态信息,因此这些断开的从机就没有及时收到更新的数据。redis 为了让// 断开的从机在下次连接后能够获取更新数据,将更新数据加入了积压空间。// 将 SELECT 命令对应的 redis 对象数据添加到积压空间/* Add the SELECT command into the backlog. */if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd);// 将数据分发所有的从机/* Send it to slaves. */listRewind(slaves, &li);while((ln = listNext(&li))) {redisClient *slave = ln->value;addReply(slave, selectcmd);}// 销毁对象if (dictid < 0 || dictid >= REDIS_SHARED_SELECT_CMDS)decrRefCount(selectcmd);}// 更新最近一次使用(访问)的数据集server.slaveseldb = dictid;// 将命令写入积压空间/* Write the command to the replication backlog if any. */if (server.repl_backlog) {char aux[REDIS_LONGSTR_SIZE+3];// 命令个数/* Add the multi bulk reply length. */aux[0] = '*';len = ll2string(aux + 1, sizeof(aux) - 1, argc);aux[len+1] = '\r';aux[len+2] = '\n';feedReplicationBacklog(aux, len + 3);// 逐个命令写入for (j = 0; j < argc; j++) {long objlen = stringObjectLen(argv[j]);/* We need to feed the buffer with the object as a bulk reply* not just as a plain string, so create the $..CRLF payload len* ad add the final CRLF */aux[0] = '$';len = ll2string(aux + 1, sizeof(aux) - 1, objlen);aux[len+1] = '\r';aux[len+2] = '\n';/* 每个命令格式如下:$3*3SET*4NAME*4Jhon*/// 命令长度feedReplicationBacklog(aux, len + 3);// 命令feedReplicationBacklogWithObject(argv[j]);// 换行feedReplicationBacklog(aux + len + 1, 2);}}// 立即给每一个从机发送命令/* Write the command to every slave. */listRewind(slaves, &li);while((ln = listNext(&li))) {redisClient *slave = ln->value;// 如果从机要求全同步,则不对此从机发送数据/* Don't feed slaves that are still waiting for BGSAVE to start */if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) continue;/* Feed slaves that are waiting for the initial SYNC (so these commands* are queued in the output buffer until the initial SYNC completes),* or are already in sync with the master. */// 向从机命令的长度/* Add the multi bulk length. */addReplyMultiBulkLen(slave, argc);// 向从机发送命令/* Finally any additional argument that was not stored inside the* static buffer if any (from j to argc). */for (j = 0; j < argc; j++)addReplyBulk(slave, argv[j]);}
}

三、主从数据同步机制概述

Redis 主从同步有两种方式(或者所两个阶段):全同步和部分同步。

主从刚刚连接的时候,进行全同步;全同步结束后,进行部分同步。当然,如果有需要,Slave 在任何时候都可以发起全同步。Redis 策略是,无论如何,首先会尝试进行部分同步,如不成功,要求从机进行全同步,并启动 BGSAVE……BGSAVE 结束后,传输 RDB 文件;如果成功,允许从机进行部分同步,并传输积压空间中的数据。

下面这幅图,总结了主从同步的机制:

如需设置 Slave,Master 需要向 Slave 发送 SLAVEOF hostname port,从机接收到后会自动连接主机,注册相应读写事件(syncWithMaster())。

// 修改主机
void slaveofCommand(redisClient *c) {if (!strcasecmp(c->argv[1]->ptr, "no") &&!strcasecmp(c->argv[2]->ptr, "one")) {// slaveof no one 断开主机连接if (server.masterhost) {replicationUnsetMaster();redisLog(REDIS_NOTICE, "MASTER MODE enabled (user request)");}} else {long port;if ((getLongFromObjectOrReply(c, c->argv[2], &port, NULL) != REDIS_OK))return;// 可能已经连接需要连接的主机/* Check if we are already attached to the specified slave */if (server.masterhost && !strcasecmp(server.masterhost,c->argv[1]->ptr)&& server.masterport == port) {redisLog(REDIS_NOTICE,"SLAVE OF would result into synchronization with the master we arealready connected with. No operation performed.");addReplySds(c, sdsnew("+OK Already connected to specified master\r\n"));return;}// 断开之前连接主机的连接,连接新的。 replicationSetMaster() 并不会真正连接主机,// 只是修改 struct server 中关于主机的设置。真正的主机连接在 replicationCron() 中完成/* There was no previous master or the user specified a different one,* we can continue. */replicationSetMaster(c->argv[1]->ptr, port);redisLog(REDIS_NOTICE, "SLAVE OF %s:%d enabled (user request)",server.masterhost, server.masterport);}addReply(c,shared.ok);
}// 设置新主机
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {sdsfree(server.masterhost);server.masterhost = sdsdup(ip);server.masterport = port;// 断开之前主机的连接if (server.master) freeClient(server.master);disconnectSlaves(); /* Force our slaves to resync with us as well. */// 取消缓存主机replicationDiscardCachedMaster(); /* Don't try a PSYNC. */// 释放积压空间freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */// cancelReplicationHandshake() 尝试断开数据传输和主机连接cancelReplicationHandshake();server.repl_state = REDIS_REPL_CONNECT;server.master_repl_offset = 0;
}// 管理主从连接的定时程序定时程序,每秒执行一次
// 在 serverCorn() 中调用
/* --------------------------- REPLICATION CRON  ----------------------------- *//* Replication cron funciton, called 1 time per second. */
void replicationCron(void) {......// 如果需要( REDIS_REPL_CONNECT),尝试连接主机,真正连接主机的操作在这里/* Check if we should connect to a MASTER */if (server.repl_state == REDIS_REPL_CONNECT) {redisLog(REDIS_NOTICE, "Connecting to MASTER %s:%d",server.masterhost, server.masterport);if (connectWithMaster() == REDIS_OK) {redisLog(REDIS_NOTICE, "MASTER <-> SLAVE sync started");}}......
}

四、全同步

接着自动发起 PSYNC 请求 Master 进行全同步。无论如何,Redis 首先会尝试部分同步,如果失败才尝试全同步。而刚刚建立连接的 Master-Slave 需要全同步。

从机连接主机后,会主动发起 PSYNC 命令,从机会提供 master_runid 和 offset,主机验证 master_runid 和 offset 是否有效?master_runid 相当于主机身份验证码,用来验证从机上一次连接的主机,offset 是全局积压空间数据的偏移量。

验证未通过则,则进行全同步:主机返回 +FULLRESYNC master_runid offset(从机接收并记录 master_runid 和 offset,并准备接收 RDB 文件)接着启动 BGSAVE 生成 RDB 文件,BGSAVE 结束后,向从机传输,从而完成全同步。

// 连接主机 connectWithMaster() 的时候,会被注册为回调函数
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {char tmpfile[256], *err;int dfd, maxtries = 5;int sockerr = 0, psync_result;socklen_t errlen = sizeof(sockerr);......// 这里尝试向主机请求部分同步,主机会回复以拒绝或接受请求。如果拒绝部分同步,// 会返回 +FULLRESYNC master_runid offset// 从机接收后准备进行全同步    psync_result = slaveTryPartialResynchronization(fd);if (psync_result == PSYNC_CONTINUE) {redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");return;}// 执行全同步/* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC* and the server.repl_master_runid and repl_master_initial_offset are* already populated. */// 未知结果,进行出错处理if (psync_result == PSYNC_NOT_SUPPORTED) {redisLog(REDIS_NOTICE,"Retrying with SYNC...");if (syncWrite(fd,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {redisLog(REDIS_WARNING,"I/O error writing to MASTER: %s",strerror(errno));goto error;}}// 为什么要尝试 5次???/* Prepare a suitable temp file for bulk transfer */while(maxtries--) {snprintf(tmpfile, 256, "temp-%d.%ld.rdb", (int)server.unixtime, (long int)getpid());dfd = open(tmpfile, O_CREAT|O_WRONLY|O_EXCL, 0644);if (dfd != -1) break;sleep(1);}if (dfd == -1) {redisLog(REDIS_WARNING,"Opening the temp file needed for MASTER <-> SLAVE synchronization: %s",strerror(errno));goto error;}// 注册读事件,回调函数 readSyncBulkPayload(), 准备读 RDB 文件/* Setup the non blocking download of the bulk file. */if (aeCreateFileEvent(server.el,fd, AE_READABLE, readSyncBulkPayload, NULL) == AE_ERR) {redisLog(REDIS_WARNING, "Can't create readable event for SYNC: %s (fd=%d)",strerror(errno), fd);goto error;}// 设置传输 RDB 文件数据的选项// 状态server.repl_state = REDIS_REPL_TRANSFER;// RDB 文件大小server.repl_transfer_size = -1;// 已经传输的大小server.repl_transfer_read = 0;// 上一次同步的偏移,为的是定时写入磁盘server.repl_transfer_last_fsync_off = 0;// 本地 RDB 文件套接字server.repl_transfer_fd = dfd;// 上一次同步 IO 时间server.repl_transfer_lastio = server.unixtime;// 临时文件名server.repl_transfer_tmpfile = zstrdup(tmpfile);return;error:close(fd);server.repl_transfer_s = -1;server.repl_state = REDIS_REPL_CONNECT;return;
}

全同步请求的数据是 RDB 数据文件和积压空间中的数据。关于 RDB 数据文件,请参看《深入剖析 Redis RDB 持久化策略》。如果没有后台持久化 BGSAVE 进程,那么 BGSVAE 会被触发,否则所有请求全同步的 Slave 都会被标记为等待 BGSAVE 结束。BGSAVE 结束后,Master 会马上向所有的从机发送 RDB 文件。

// 主机 SYNC 和 PSYNC 命令处理函数,会尝试进行部分同步和全同步
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {......// 主机尝试部分同步,失败的话向从机发送 +FULLRESYNC master_runid offset,接着启动 BGSAVE// 执行全同步:/* Full resynchronization. */server.stat_sync_full++;/* Here we need to check if there is a background saving operation* in progress, or if it is required to start one */if (server.rdb_child_pid != -1) {/*  存在 BGSAVE 后台进程。1.如果 master 现有所连接的所有从机 slaves 当中有存在 REDIS_REPL_WAIT_BGSAVE_END 的从机,那么将从机 c 设置为 REDIS_REPL_WAIT_BGSAVE_END;2.否则,设置为 REDIS_REPL_WAIT_BGSAVE_START*//* Ok a background save is in progress. Let's check if it is a good* one for replication, i.e. if there is another slave that is* registering differences since the server forked to save */redisClient *slave;listNode *ln;listIter li;// 检测是否已经有从机申请全同步listRewind(server.slaves, &li);while((ln = listNext(&li))) {slave = ln->value;if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) break;}if (ln) {// 存在状态为 REDIS_REPL_WAIT_BGSAVE_END 的从机 slave,// 就将此从机 c 状态设置为 REDIS_REPL_WAIT_BGSAVE_END,// 从而在 BGSAVE 进程结束后,可以发送 RDB 文件,// 同时将从机 slave 中的更新复制到此从机 c。/* Perfect, the server is already registering differences for* another slave. Set the right state, and copy the buffer. */// 将其他从机上的待回复的缓存复制到从机 ccopyClientOutputBuffer(c, slave);// 修改从机 c 状态为「等待 BGSAVE 进程结束」c->replstate = REDIS_REPL_WAIT_BGSAVE_END;redisLog(REDIS_NOTICE, "Waiting for end of BGSAVE for SYNC");} else {// 不存在状态为 REDIS_REPL_WAIT_BGSAVE_END 的从机,就将此从机 c 状态设置为// REDIS_REPL_WAIT_BGSAVE_START,即等待新的 BGSAVE 进程的开启。// 修改状态为「等待 BGSAVE 进程开始」/* No way, we need to wait for the next BGSAVE in order to* register differences */c->replstate = REDIS_REPL_WAIT_BGSAVE_START;redisLog(REDIS_NOTICE, "Waiting for next BGSAVE for SYNC");}} else {// 不存在 BGSAVE 后台进程,启动一个新的 BGSAVE 进程/* Ok we don't have a BGSAVE in progress, let's start one */redisLog(REDIS_NOTICE, "Starting BGSAVE for SYNC");if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {redisLog(REDIS_NOTICE, "Replication failed, can't BGSAVE");addReplyError(c, "Unable to perform background save");return;}// 将此从机 c 状态设置为 REDIS_REPL_WAIT_BGSAVE_END,从而在 BGSAVE 进程结束后,// 可以发送 RDB 文件,同时将从机 slave 中的更新复制到此从机 c。c->replstate = REDIS_REPL_WAIT_BGSAVE_END;// 清理脚本缓存???/* Flush the script cache for the new slave. */replicationScriptCacheFlush();}if (server.repl_disable_tcp_nodelay)anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */c->repldbfd = -1;c->flags |= REDIS_SLAVE;server.slaveseldb = -1; /* Force to re-emit the SELECT command. */listAddNodeTail(server.slaves,c);if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)createReplicationBacklog();return;
}// BGSAVE 结束后,会调用
/* A background saving child (BGSAVE) terminated its work. Handle this. */
void backgroundSaveDoneHandler(int exitcode, int bysignal) {// 其他操作......// 可能从机正在等待 BGSAVE 进程的终止/* Possibly there are slaves waiting for a BGSAVE in order to be served* (the first stage of SYNC is a bulk transfer of dump.rdb) */updateSlavesWaitingBgsave(exitcode == 0 ? REDIS_OK : REDIS_ERR);
}// 当 RDB 持久化(backgroundSaveDoneHandler())结束后,会调用此函数
// RDB 文件就绪,给所有的从机发送 RDB 文件
/* This function is called at the end of every background saving.
* The argument bgsaveerr is REDIS_OK if the background saving succeeded
* otherwise REDIS_ERR is passed to the function.
*
* The goal of this function is to handle slaves waiting for a successful
* background saving in order to perform non-blocking synchronization. */
void updateSlavesWaitingBgsave(int bgsaveerr) {listNode *ln;int startbgsave = 0;listIter li;listRewind(server.slaves,&li);while((ln = listNext(&li))) {redisClient *slave = ln->value;// 等待 BGSAVE 开始。调整状态为等待下一次 BGSAVE 进程的结束if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START) {startbgsave = 1;slave->replstate = REDIS_REPL_WAIT_BGSAVE_END;// 等待 BGSAVE 结束。准备向 slave 发送 RDB 文件} else if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_END) {struct redis_stat buf;// 如果 RDB 持久化失败, bgsaveerr 会被设置为 REDIS_ERRif (bgsaveerr != REDIS_OK) {freeClient(slave);redisLog(REDIS_WARNING, "SYNC failed. BGSAVE child returned an error");continue;}// 打开 RDB 文件if ((slave->repldbfd = open(server.rdb_filename, O_RDONLY)) == -1 ||redis_fstat(slave->repldbfd, &buf) == -1) {freeClient(slave);redisLog(REDIS_WARNING,"SYNC failed. Can't open/stat DB after BGSAVE: %s", strerror(errno));continue;}slave->repldboff = 0;slave->repldbsize = buf.st_size;slave->replstate = REDIS_REPL_SEND_BULK;// 如果之前有注册写事件,取消aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE);// 注册新的写事件,sendBulkToSlave() 传输 RDB 文件if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave,slave) == AE_ERR) {freeClient(slave);continue;}}}// startbgsave == REDIS_ERR 表示 BGSAVE 失败,再一次进行 BGSAVE 尝试if (startbgsave) {/* Since we are starting a new background save for one or more slaves,* we flush the Replication Script Cache to use EVAL to propagate every* new EVALSHA for the first time, since all the new slaves don't know* about previous scripts. */replicationScriptCacheFlush();if (rdbSaveBackground(server.rdb_filename) != REDIS_OK) {/* BGSAVE 可能 fork 失败,所有等待 BGSAVE 的从机都将结束连接。* 这是 redis 自我保护的措施,fork 失败很可能是内存紧张*/listIter li;listRewind(server.slaves,&li);redisLog(REDIS_WARNING, "SYNC failed. BGSAVE failed");while((ln = listNext(&li))) {redisClient *slave = ln->value;if (slave->replstate == REDIS_REPL_WAIT_BGSAVE_START)freeClient(slave);}}}
}

五、部分同步

如上所说,无论如何,Redis 首先会尝试部分同步。部分同步即把积压空间缓存的数据,即更新记录发送给从机。

从机连接主机后,会主动发起 PSYNC 命令,从机会提供 master_runid 和 offset,主机验证 master_runid 和 offset 是否有效?

验证通过则,进行部分同步:主机返回 +CONTINUE(从机接收后会注册积压数据接收事件),接着发送积压空间数据。

// 连接主机 connectWithMaster() 的时候,会被注册为回调函数
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {char tmpfile[256], *err;int dfd, maxtries = 5;int sockerr = 0, psync_result;socklen_t errlen = sizeof(sockerr);......// 尝试部分同步,主机允许进行部分同步会返回 +CONTINUE,从机接收后注册相应的事件/* Try a partial resynchonization. If we don't have a cached master* slaveTryPartialResynchronization() will at least try to use PSYNC* to start a full resynchronization so that we get the master run id* and the global offset, to try a partial resync at the next* reconnection attempt. */// 函数返回三种状态:// PSYNC_CONTINUE:表示会进行部分同步,在 slaveTryPartialResynchronization()// 中已经设置回调函数 readQueryFromClient()// PSYNC_FULLRESYNC:全同步,会下载 RDB 文件// PSYNC_NOT_SUPPORTED:未知psync_result = slaveTryPartialResynchronization(fd);if (psync_result == PSYNC_CONTINUE) {redisLog(REDIS_NOTICE,"MASTER <-> SLAVE sync: Master accepted a Partial Resynchronization.");return;}// 执行全同步......
}// 函数返回三种状态:
// PSYNC_CONTINUE:表示会进行部分同步,已经设置回调函数
// PSYNC_FULLRESYNC:全同步,会下载 RDB 文件
// PSYNC_NOT_SUPPORTED:未知
#define PSYNC_CONTINUE 0
#define PSYNC_FULLRESYNC 1
#define PSYNC_NOT_SUPPORTED 2
int slaveTryPartialResynchronization(int fd) {char *psync_runid;char psync_offset[32];sds reply;/* Initially set repl_master_initial_offset to -1 to mark the current* master run_id and offset as not valid. Later if we'll be able to do* a FULL resync using the PSYNC command we'll set the offset at the* right value, so that this information will be propagated to the* client structure representing the master into server.master. */server.repl_master_initial_offset = -1;if (server.cached_master) {// 缓存了上一次与主机连接的信息,可以尝试进行部分同步,减少数据传输psync_runid = server.cached_master->replrunid;snprintf(psync_offset, sizeof(psync_offset), "%lld",server.cached_master->reploff + 1);redisLog(REDIS_NOTICE,"Trying a partial resynchronization (request %s:%s).",psync_runid, psync_offset);} else {// 未缓存上一次与主机连接的信息,进行全同步// psync ? -1 可以获取主机的 master_runidredisLog(REDIS_NOTICE, "Partial resynchronization not possible (no cached master)");psync_runid = "?";memcpy(psync_offset, "-1", 3);}// 向主机发送命令,并接收回复/* Issue the PSYNC command */reply = sendSynchronousCommand(fd, "PSYNC", psync_runid, psync_offset, NULL);// 全同步if (!strncmp(reply, "+FULLRESYNC", 11)) {char *runid = NULL, *offset = NULL;/* FULL RESYNC, parse the reply in order to extract the run id* and the replication offset. */runid = strchr(reply, ' ');if (runid) {runid++;offset = strchr(runid, ' ');if (offset) offset++;}if (!runid || !offset || (offset-runid-1) != REDIS_RUN_ID_SIZE) {redisLog(REDIS_WARNING,"Master replied with wrong +FULLRESYNC syntax.");/* This is an unexpected condition, actually the +FULLRESYNC* reply means that the master supports PSYNC, but the reply* format seems wrong. To stay safe we blank the master* runid to make sure next PSYNCs will fail. */memset(server.repl_master_runid, 0, REDIS_RUN_ID_SIZE + 1);} else {// 拷贝 runidmemcpy(server.repl_master_runid, runid, offset-runid-1);server.repl_master_runid[REDIS_RUN_ID_SIZE] = '\0';server.repl_master_initial_offset = strtoll(offset,NULL,10);redisLog(REDIS_NOTICE, "Full resync from master: %s:%lld",server.repl_master_runid,server.repl_master_initial_offset);}/* We are going to full resync, discard the cached master structure. */replicationDiscardCachedMaster();sdsfree(reply);return PSYNC_FULLRESYNC;}// 部分同步if (!strncmp(reply, "+CONTINUE", 9)) {/* Partial resync was accepted, set the replication state accordingly */redisLog(REDIS_NOTICE, "Successful partial resynchronization with master.");sdsfree(reply);// 缓存主机替代现有主机,且为 PSYNC(部分同步) 做好准备creplicationResurrectCachedMaster(fd);return PSYNC_CONTINUE;}/* If we reach this point we receied either an error since the master does* not understand PSYNC, or an unexpected reply from the master.* Reply with PSYNC_NOT_SUPPORTED in both cases. */// 接收到主机发出的错误信息if (strncmp(reply, "-ERR", 4)) {/* If it's not an error, log the unexpected event. */redisLog(REDIS_WARNING, "Unexpected reply to PSYNC from master: %s", reply);} else {redisLog(REDIS_NOTICE,"Master does not support PSYNC or is in ""error state (reply: %s)", reply);}sdsfree(reply);replicationDiscardCachedMaster();return PSYNC_NOT_SUPPORTED;
}// 主机 SYNC 和 PSYNC 命令处理函数,会尝试进行部分同步和全同步
/* SYNC ad PSYNC command implemenation. */
void syncCommand(redisClient *c) {......// 主机尝试部分同步,允许则进行部分同步,会返回 +CONTINUE,接着发送积压空间/* Try a partial resynchronization if this is a PSYNC command.* If it fails, we continue with usual full resynchronization, however* when this happens masterTryPartialResynchronization() already* replied with:** +FULLRESYNC <runid> <offset>** So the slave knows the new runid and offset to try a PSYNC later* if the connection with the master is lost. */if (!strcasecmp(c->argv[0]->ptr, "psync")) {// 部分同步if (masterTryPartialResynchronization(c) == REDIS_OK) {server.stat_sync_partial_ok++;return; /* No full resync needed, return. */} else {// 部分同步失败,会进行全同步,这时会收到来自客户端的 runidchar *master_runid = c->argv[1]->ptr;/* Increment stats for failed PSYNCs, but only if the* runid is not "?", as this is used by slaves to force a full* resync on purpose when they are not albe to partially* resync. */if (master_runid[0] != '?')server.stat_sync_partial_err++;}} else {/* If a slave uses SYNC, we are dealing with an old implementation* of the replication protocol (like redis-cli --slave). Flag the client* so that we don't expect to receive REPLCONF ACK feedbacks. */c->flags |= REDIS_PRE_PSYNC_SLAVE;}// 执行全同步:......
}// 主机尝试是否能进行部分同步
/* This function handles the PSYNC command from the point of view of a
* master receiving a request for partial resynchronization.
*
* On success return REDIS_OK, otherwise REDIS_ERR is returned and we proceed
* with the usual full resync. */
int masterTryPartialResynchronization(redisClient *c) {long long psync_offset, psync_len;char *master_runid = c->argv[1]->ptr;char buf[128];int buflen;/* Is the runid of this master the same advertised by the wannabe slave* via PSYNC? If runid changed this master is a different instance and* there is no way to continue. */if (strcasecmp(master_runid, server.runid)) {// 当因为异常需要与主机断开连接的时候,从机会暂存主机的状态信息,以便// 下一次的部分同步。// 1)master_runid 是从机提供一个因缓存主机的 runid,// 2)server.runid 是本机(主机)的 runid。// 匹配失败,说明是本机(主机)不是从机缓存的主机,这时候不能进行部分同步,// 只能进行全同步// "?" 表示从机要求全同步// 什么时候从机会要求全同步???/* Run id "?" is used by slaves that want to force a full resync. */if (master_runid[0] != '?') {redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: ""Runid mismatch (Client asked for '%s', I'm '%s')",master_runid, server.runid);} else {redisLog(REDIS_NOTICE, "Full resync requested by slave.");}goto need_full_resync;}// 从参数中解析整数,整数是从机指定的偏移量/* We still have the data our slave is asking for? */if (getLongLongFromObjectOrReply(c, c->argv[2], &psync_offset, NULL) != REDIS_OK)goto need_full_resync;// 部分同步失败的情况if (!server.repl_backlog || /*不存在积压空间*/psync_offset < server.repl_backlog_off ||  /*psync_offset 太过小,即从机错过太多更新记录,安全起见,实行全同步*//*psync_offset 越界*/psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen))// 经检测,不满足部分同步的条件,转而进行全同步{redisLog(REDIS_NOTICE,"Unable to partial resync with the slave for lack ofbacklog (Slave request was: %lld).", psync_offset);if (psync_offset > server.master_repl_offset) {redisLog(REDIS_WARNING,"Warning: slave tried to PSYNC with an offset that is greaterthan the master replication offset.");}goto need_full_resync;}// 执行部分同步:// 1)标记客户端为从机// 2)通知从机准备接收数据。从机收到 +CONTINUE 会做好准备// 3)开发发送数据/* If we reached this point, we are able to perform a partial resync:* 1) Set client state to make it a slave.* 2) Inform the client we can continue with +CONTINUE* 3) Send the backlog data (from the offset to the end) to the slave. */// 将连接的客户端标记为从机c->flags |= REDIS_SLAVE;// 表示进行部分同步// #define REDIS_REPL_ONLINE 9 /* RDB file transmitted, sending just// updates. */c->replstate = REDIS_REPL_ONLINE;// 更新 ack 的时间c->repl_ack_time = server.unixtime;// 添加入从机链表listAddNodeTail(server.slaves, c);// 告诉从机可以进行部分同步,从机收到后会做相关的准备(注册回调函数)/* We can't use the connection buffers since they are used to accumulate* new commands at this stage. But we are sure the socket send buffer is* emtpy so this write will never fail actually. */buflen = snprintf(buf, sizeof(buf), "+CONTINUE\r\n");if (write(c->fd, buf, buflen) != buflen) {freeClientAsync(c);return REDIS_OK;}// 向从机写积压空间中的数据,积压空间存储有「更新缓存」psync_len = addReplyReplicationBacklog(c, psync_offset);redisLog(REDIS_NOTICE,"Partial resynchronization request accepted. Sending %lld bytes of backlogstarting from offset %lld.", psync_len, psync_offset);/* Note that we don't need to set the selected DB at server.slaveseldb* to -1 to force the master to emit SELECT, since the slave already* has this state from the previous connection with the master. */refreshGoodSlavesCount();return REDIS_OK; /* The caller can return, no full resync needed. */need_full_resync:......// 向从机发送 +FULLRESYNC runid repl_offset
}

六、暂缓主机

从机因为某些原因,譬如网络延迟(PING 超时,ACK 超时等),可能会断开与主机的连接。这时候,从机会尝试保存与主机连接的信息,譬如全局积压空间数据偏移量等,以便下一次的部分同步,并且从机会再一次尝试连接主机。注意一点,如果断开的时间足够长,部分同步肯定会失败的。

void freeClient(redisClient *c) {listNode *ln;/* If this is marked as current client unset it */if (server.current_client == c) server.current_client = NULL;// 如果此机为从机,已经连接主机,可能需要保存主机状态信息,以便进行 PSYNC/* If it is our master that's beging disconnected we should make sure* to cache the state to try a partial resynchronization later.** Note that before doing this we make sure that the client is not in* some unexpected state, by checking its flags. */if (server.master && c->flags & REDIS_MASTER) {redisLog(REDIS_WARNING,"Connection with master lost.");if (!(c->flags & (REDIS_CLOSE_AFTER_REPLY|REDIS_CLOSE_ASAP|REDIS_BLOCKED|REDIS_UNBLOCKED))){replicationCacheMaster(c);return;}}......
}// 为了实现部分同步,从机会保存主机的状态信息后才会断开主机的连接,主机状态信息
// 保存在 server.cached_master
// 会在 freeClient() 中调用,保存与主机连接的状态信息,以便进行 PSYNC
void replicationCacheMaster(redisClient *c) {listNode *ln;redisAssert(server.master != NULL && server.cached_master == NULL);redisLog(REDIS_NOTICE,"Caching the disconnected master state.");// 从客户端列表删除主机的信息/* Remove from the list of clients, we don't want this client to be* listed by CLIENT LIST or processed in any way by batch operations. */ln = listSearchKey(server.clients,c);redisAssert(ln != NULL);listDelNode(server.clients,ln);// 保存主机的状态信息/* Save the master. Server.master will be set to null later by* replicationHandleMasterDisconnection(). */server.cached_master = server.master;// 注销事件,关闭连接/* Remove the event handlers and close the socket. We'll later reuse* the socket of the new connection with the master during PSYNC. */aeDeleteFileEvent(server.el,c->fd,AE_READABLE);aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);close(c->fd);/* Set fd to -1 so that we can safely call freeClient(c) later. */c->fd = -1;// 修改连接的状态,设置 server.master = NULL/* Caching the master happens instead of the actual freeClient() call,* so make sure to adjust the replication state. This function will* also set server.master to NULL. */replicationHandleMasterDisconnection();
}

七、总结

简单来说,主从同步就是 RDB 文件的上传下载;主机有小部分的数据修改,就把修改记录传播给每个从机。这篇文章详述了 Redis 主从复制的内部协议和机制。接下来的几篇关于 Redis 的文章,主要是其内部数据结构。

转载于:https://blog.51cto.com/sofar/1413024

深入剖析Redis主从复制相关推荐

  1. [转载] 深入剖析 redis 主从复制

    转载自http://www.cnblogs.com/daoluanxiaozi/p/3724299.html 主从概述 redis 支持 master-slave(主从)模式,redis server ...

  2. Redis主从复制配置(原理剖析)

    文章目录 前言 一.Redis主从复制的作用 二.Redis主从复制环境配置 1.查看默认配置信息 2.配置一主二从的集群模式 2.1.拷贝配置文件 2.2.配置redis79.conf文件 2.3. ...

  3. Redis主从复制配置

    环境描述 Redis Master:192.168.1.100 6379(Ubuntu系统) Redis Slave1:192.168.1.101 6380(Ubuntu系统) Redis Slave ...

  4. 深入剖析Redis系列(七) - Redis数据结构之列表

    前言 列表(list)类型是用来存储多个 有序 的 字符串.在 Redis 中,可以对列表的 两端 进行 插入(push)和 弹出(pop)操作,还可以获取 指定范围 的 元素列表.获取 指定索引下标 ...

  5. redis主从复制故障转移

    Redis主从复制与故障切换 目录 目录1 一.概述1 二. 实验目的2 三.试验环境2 四. 说明2 五. 拓扑2 六. 实施步骤2 6.1.分别安装redis2.8.32 6.2.配置主从同步3 ...

  6. redis+主从复制+集群配置

    redis+主从复制+集群配置 redis是一个key-value存储系统.和memcached类似,不过redis支持的value类型更多,主要有:string(字符串).list(链表).set( ...

  7. cxgrid主从表 点+号展开_深入理解Redis主从复制

    一.背景 前面的文章中,我们介绍过Redis的持久化机制,它可以实现Redis实例数据的crash-safe.但是这里有一个问题,就是Redis其实还存在着单点故障问题,比如说Redis的硬盘坏掉了, ...

  8. 深入剖析Redis系列(三) - Redis集群模式搭建与原理详解

    前言 在 Redis 3.0 之前,使用 哨兵(sentinel)机制来监控各个节点之间的状态.Redis Cluster 是 Redis 的 分布式解决方案,在 3.0 版本正式推出,有效地解决了 ...

  9. Redis主从复制下的工作原理

    Redis主从复制下的工作原理 Redis主从复制的配置十分简单,它可以使从服务器是主服务器的完全拷贝.需要清楚Redis主从复制的几点重要内容: 1)Redis使用异步复制.但从Redis 2.8开 ...

  10. php连接redis 主从复制,redis怎么进行主从复制

    redis主从复制同步实现的过程 1.从服务发送一个sync同步命令给主服务要求全量同步 (推荐学习:Redis视频教程) 2.主服务接收到从服务的sync同步命令时,会fork一个子进程后台执行bg ...

最新文章

  1. [转贴]经济学人:Win7拉开新时代序幕 云计算群雄逐鹿
  2. 海量数据处理_国家重点研发计划“面向异构体系结构的高性能分布式数据处理技术与系统”简介...
  3. python 导包语法 import package as name 和 from package import name 的区别
  4. mysql 导入超时_sql数据库有1000M怎么导入mysql?导入超时怎么办?
  5. 上传图片的表单java代码_java模拟post方式提交表单实现图片上传(示例代码)
  6. #脚本实现宠物动作行为_短视频剧情创作方法有哪些?爆款短视频的标配,只需88个脚本模板...
  7. PC客户端(CS架构)如何实现抓包
  8. C#控制BarTender自动打印(方法一)
  9. pyserial的踩坑记录
  10. Qt5.5.1 VS2010中文乱码解决办法
  11. 阿里云ECS搭建在线IDE
  12. 软件开发人员如何记笔记
  13. HTML打造动漫人物,百度贴吧打造二次元清明祭 回顾离开的动漫人物
  14. 【2023校招刷题】第二期:数字IC笔试模拟题(2)详细解析版
  15. 计算机网络习题:应用层
  16. 德国计算机专业硕士费用,德国留学费用一览表2021
  17. Building wheel for TA-Lib (setup.py) ... error / ERROR: Failed building wheel for TA-Lib
  18. wsappx是什么进程,可以关掉吗
  19. jar包一键重启动的shall脚本(可自动判断当前服务是否运行中)
  20. 【python】批量按坐标裁剪图片、ImageJ批量修改图片格式

热门文章

  1. java nextDouble exception_java 控制台输入输出 nextdouble问题
  2. pb 修改数据窗口种指定字段位置_如何在PB数据窗口中修改数据设置数据窗口的更新属性...
  3. java事件轮询_用scala实现的nio事件轮询
  4. aqara (737) -(a俩)_绿米与万科合作?Aqara线下服务商500家,合作有保障
  5. ansible 通过加密码来批量管理主机及管理Windows主机
  6. [译] 在 Twitch 代码直播一年的总结
  7. android view
  8. 一个女孩为什么要努力
  9. 【转】程序员:如何写出杀手级简历
  10. Silverlight 2.5D RPG游戏技巧与特效处理:(五“.NET研究”)圣赞之HLSL渲染动画