Redis master-slave synchronization: source code analysis

(1) Slave-side flow
(2) Master-side flow

Slave-side analysis

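Once Redis starts, replicationCron is called once per second. It is invoked from serverCron, which runs as a timer event in the main event loop (not as a separate background thread), so it runs whether the instance is standalone, a master, or a slave.
Let us first look at what replicationCron does when the instance is a slave.
As a slave, its job is easy to guess: keep the connection to the master alive, perform the handshake, and receive first the master's existing data (full sync) and then its incremental updates.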

/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
    static long long replication_cron_loops = 0;

    /* A series of timeout checks; skipped for now. */
    /* Non blocking connection timeout? */
    if (server.masterhost &&
        (server.repl_state == REPL_STATE_CONNECTING ||
         slaveIsInHandshakeState()) &&
         (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
        cancelReplicationHandshake();
    }

    ......

    /* As a slave, connectWithMaster creates the socket to the master,
     * registers syncWithMaster as its event callback, and sets the
     * replication state to REPL_STATE_CONNECTING. */
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK) {
            serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
        }
    }

    /* Send ACK to master from time to time.
     * Note that we do not send periodic acks to masters that don't
     * support PSYNC and replication offsets. */
    if (server.masterhost && server.master &&
        !(server.master->flags & CLIENT_PRE_PSYNC))
        replicationSendAck();

    ....
}
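
The two checks in the excerpt above (abort a stalled connection or handshake, then reconnect if the state says so) can be modelled as a tiny state machine driven once per tick. This is a toy sketch, not Redis code: the toy_* names and the collapsed set of states are hypothetical, and real Redis has many more handshake sub-states.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

/* Hypothetical, collapsed replica states; real Redis has many more
 * handshake sub-states (REPL_STATE_RECEIVE_PONG, REPL_STATE_SEND_AUTH, ...). */
typedef enum {
    TOY_REPL_NONE,       /* not a replica */
    TOY_REPL_CONNECT,    /* must (re)connect to the master */
    TOY_REPL_CONNECTING, /* non-blocking connect in progress */
    TOY_REPL_HANDSHAKE,  /* PING/AUTH/REPLCONF/PSYNC exchange */
    TOY_REPL_CONNECTED   /* replication link established */
} toy_repl_state;

typedef struct {
    bool has_master;      /* plays the role of server.masterhost != NULL */
    toy_repl_state state; /* plays the role of server.repl_state */
    time_t last_io;       /* plays the role of server.repl_transfer_lastio */
    time_t timeout;       /* plays the role of server.repl_timeout */
} toy_replica;

/* Hypothetical stand-in for connectWithMaster(): starts a connect attempt. */
static void toy_connect(toy_replica *r, time_t now) {
    r->state = TOY_REPL_CONNECTING;
    r->last_io = now;
}

/* Called once per tick: first abort a stalled connect/handshake (like
 * cancelReplicationHandshake(), which moves the state back to CONNECT),
 * then start a new connection if the state asks for one. */
void toy_replication_cron(toy_replica *r, time_t now) {
    assert(r != NULL);
    if (r->has_master &&
        (r->state == TOY_REPL_CONNECTING || r->state == TOY_REPL_HANDSHAKE) &&
        now - r->last_io > r->timeout) {
        r->state = TOY_REPL_CONNECT;
    }
    if (r->state == TOY_REPL_CONNECT) toy_connect(r, now);
}
```

Because the timeout check runs before the connect check, a timed-out handshake is retried within the same tick, which matches the order of the two blocks in the excerpt.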

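syncWithMaster is the function that performs the slave's handshake with the master.
Rather than listing its full code, here is its logic:
1. Send PING.
2. Read the PONG; on success continue with step 3.
3. If the master has requirepass set, the slave must have masterauth configured, so it sends AUTH. If no AUTH is needed, skip to step 5.
4. Read the AUTH result and continue if it succeeded.
5. Send the slave's listening port (REPLCONF listening-port). This is the port the slave itself listens on; the master never uses it to connect back to the slave.
6. Check the result of step 5.
7. Same as step 5, but sends the slave's IP address (REPLCONF ip-address, only when slave-announce-ip is configured). The master only records the values from steps 5 and 7 for reporting, e.g. in the INFO replication output that Sentinel uses to discover replicas; the replication stream itself does not depend on them.
8. Check the result of step 7.
9. Send the slave's replication capabilities (REPLCONF capa ...) so that both sides agree on the sync method; recent versions support eof (EOF-delimited, diskless transfer) and psync2. PSYNC stands for partial resynchronization.
10. Send "PSYNC <replid> <offset>" to the master, where replid and offset come from the cached master (offset = cached reploff + 1). Without a cached master, i.e. when a full sync is needed, the slave sends "PSYNC ? -1".
11. If the master replies "+FULLRESYNC <replid> <offset>", a full resync follows; if it replies "+CONTINUE" (a psync2 master may append its new replid), a partial resync follows.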
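
Every handshake step is an ordinary Redis command written in the RESP multibulk format. As a sketch of step 10, assuming a hypothetical helper build_psync() whose parameters stand in for the cached master's replid and reploff (the real code goes through sendSynchronousCommand() instead):

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Encode argv as a RESP multibulk request ("*<argc>" then "$<len>" + arg).
 * Returns the number of bytes written, or -1 if buf is too small. */
int resp_encode(char *buf, size_t cap, int argc, const char **argv) {
    size_t used;
    int n = snprintf(buf, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    used = (size_t)n;
    for (int i = 0; i < argc; i++) {
        n = snprintf(buf + used, cap - used, "$%zu\r\n%s\r\n",
                     strlen(argv[i]), argv[i]);
        if (n < 0 || (size_t)n >= cap - used) return -1;
        used += (size_t)n;
    }
    return (int)used;
}

/* Hypothetical helper: with a cached master, ask to continue from
 * cached_reploff + 1; without one, send "PSYNC ? -1" (full sync). */
int build_psync(char *buf, size_t cap, const char *cached_replid,
                long long cached_reploff) {
    char offbuf[32];
    const char *argv[3] = {"PSYNC", "?", "-1"};
    if (cached_replid != NULL) {
        snprintf(offbuf, sizeof(offbuf), "%lld", cached_reploff + 1);
        argv[1] = cached_replid;
        argv[2] = offbuf;
    }
    return resp_encode(buf, cap, 3, argv);
}
```

For example, build_psync(buf, sizeof(buf), NULL, 0) produces "*3\r\n$5\r\nPSYNC\r\n$1\r\n?\r\n$2\r\n-1\r\n".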

Once the handshake has completed and the master has answered +FULLRESYNC, the read event handler on the fd connected to the master becomes readSyncBulkPayload.

void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    ......
    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }
}
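
Step 11 boils down to classifying a single reply line. A minimal sketch with a hypothetical parse_psync_reply() helper, assuming the 40-character replication ID Redis uses: on a full resync the slave installs readSyncBulkPayload as in the excerpt above, while on +CONTINUE it resumes its cached master without any bulk transfer.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

typedef enum { PSYNC_REPLY_FULL, PSYNC_REPLY_CONTINUE, PSYNC_REPLY_ERROR } psync_reply;

/* Hypothetical helper: classify the master's one-line reply to PSYNC.
 * For "+FULLRESYNC <replid> <offset>" copy the 40-char replid into
 * replid_out (at least 41 bytes) and parse the offset. */
psync_reply parse_psync_reply(const char *line, char *replid_out,
                              long long *offset_out) {
    if (strncmp(line, "+FULLRESYNC ", 12) == 0) {
        const char *id = line + 12;
        const char *sp = strchr(id, ' ');
        if (sp == NULL || sp - id != 40) return PSYNC_REPLY_ERROR;
        memcpy(replid_out, id, 40);
        replid_out[40] = '\0';
        *offset_out = strtoll(sp + 1, NULL, 10);
        return PSYNC_REPLY_FULL;
    }
    /* "+CONTINUE" alone, or "+CONTINUE <new replid>" from a psync2 master */
    if (strncmp(line, "+CONTINUE", 9) == 0) return PSYNC_REPLY_CONTINUE;
    return PSYNC_REPLY_ERROR;  /* "-ERR ...", "-NOMASTERLINK ...", etc. */
}
```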

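readSyncBulkPayload reads the entire RDB payload, loads it into memory, and then calls replicationCreateMasterClient, which turns the master connection into a client whose fd read callback is readQueryFromClient.
From then on, this connection receives the commands sent by the master.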
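
The first line readSyncBulkPayload reads is a bulk header that tells the slave how to detect the end of the RDB payload. Pulled out into a standalone helper (parse_bulk_header() and bulk_header are hypothetical names, not Redis APIs), that branch looks roughly like this:

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define RUN_ID_SIZE 40 /* same value as CONFIG_RUN_ID_SIZE in Redis */

typedef struct {
    int usemark;               /* 1: EOF-delimited (diskless) transfer */
    long long size;            /* payload length when usemark == 0 */
    char eofmark[RUN_ID_SIZE]; /* delimiter when usemark == 1 */
} bulk_header;

/* Returns 0 on success, 1 for a bare newline (keepalive, read again),
 * -1 for an error reply or a protocol error. */
int parse_bulk_header(const char *line, bulk_header *h) {
    if (line[0] == '\0') return 1;
    if (line[0] != '$') return -1;
    if (strncmp(line + 1, "EOF:", 4) == 0 && strlen(line + 5) >= RUN_ID_SIZE) {
        h->usemark = 1;
        h->size = 0;
        memcpy(h->eofmark, line + 5, RUN_ID_SIZE);
        return 0;
    }
    h->usemark = 0;
    h->size = strtoll(line + 1, NULL, 10);
    return 0;
}
```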

void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
    char buf[4096];
    ssize_t nread, readlen, nwritten;
    off_t left;
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    /* Static vars used to hold the EOF mark, and the last bytes received
     * form the server: when they match, we reached the end of the transfer. */
    static char eofmark[CONFIG_RUN_ID_SIZE];
    static char lastbytes[CONFIG_RUN_ID_SIZE];
    static int usemark = 0;

    /* If repl_transfer_size == -1 we still have to read the bulk length
     * from the master reply. */
    /* On the first call repl_transfer_size is -1.
     * The first thing the master sends is the length of the RDB payload,
     * in one of two formats: "$<len>" or "$EOF:<40 bytes delimiter>".
     * With the former the slave simply reads <len> bytes; with the latter
     * the master sends the 40-byte delimiter after the RDB data to signal
     * that the transfer is over. */
    if (server.repl_transfer_size == -1) {
        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,
                "I/O error reading bulk count from MASTER: %s",
                strerror(errno));
            goto error;
        }

        if (buf[0] == '-') {
            serverLog(LL_WARNING,
                "MASTER aborted replication with an error: %s",
                buf+1);
            goto error;
        } else if (buf[0] == '\0') {
            /* At this stage just a newline works as a PING in order to take
             * the connection live. So we refresh our last interaction
             * timestamp. */
            server.repl_transfer_lastio = server.unixtime;
            return;
        } else if (buf[0] != '$') {
            serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
            goto error;
        }

        /* There are two possible forms for the bulk payload. One is the
         * usual $<count> bulk format. The other is used for diskless transfers
         * when the master does not know beforehand the size of the file to
         * transfer. In the latter case, the following format is used:
         *
         * $EOF:<40 bytes delimiter>
         *
         * At the end of the file the announced delimiter is transmitted. The
         * delimiter is long and random enough that the probability of a
         * collision with the actual file content can be ignored. */
        if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
            usemark = 1;
            memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
            memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
            /* Set any repl_transfer_size to avoid entering this code path
             * at the next call. */
            server.repl_transfer_size = 0;
            serverLog(LL_NOTICE,
                "MASTER <-> REPLICA sync: receiving streamed RDB from master");
        } else {
            usemark = 0;
            server.repl_transfer_size = strtol(buf+1,NULL,10);
            serverLog(LL_NOTICE,
                "MASTER <-> REPLICA sync: receiving %lld bytes from master",
                (long long) server.repl_transfer_size);
        }
        return;
    }

    /* From here on we are reading the file itself.
     * usemark: the end is detected via the EOF delimiter
     * repl_transfer_size: total file size
     * repl_transfer_read: bytes read so far */
    /* Read bulk data */
    if (usemark) {
        readlen = sizeof(buf);
    } else {
        left = server.repl_transfer_size - server.repl_transfer_read;
        readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
    }

    nread = read(fd,buf,readlen);
    if (nread <= 0) {
        serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
            (nread == -1) ? strerror(errno) : "connection lost");
        cancelReplicationHandshake();
        return;
    }
    server.stat_net_input_bytes += nread;

    /* When a mark is used, we want to detect EOF asap in order to avoid
     * writing the EOF mark into the file... */
    int eof_reached = 0;

    if (usemark) {
        /* Update the last bytes array, and check if it matches our delimiter.*/
        if (nread >= CONFIG_RUN_ID_SIZE) {
            memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
        } else {
            int rem = CONFIG_RUN_ID_SIZE-nread;
            memmove(lastbytes,lastbytes+nread,rem);
            memcpy(lastbytes+rem,buf,nread);
        }
        if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
    }

    server.repl_transfer_lastio = server.unixtime;
    if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
        serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s",
            (nwritten == -1) ? strerror(errno) : "short write");
        goto error;
    }
    server.repl_transfer_read += nread;

    /* Delete the last 40 bytes from the file if we reached EOF. */
    if (usemark && eof_reached) {
        if (ftruncate(server.repl_transfer_fd,
            server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
        {
            serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
            goto error;
        }
    }

    /* Sync data on disk from time to time, otherwise at the end of the transfer
     * we may suffer a big delay as the memory buffers are copied into the
     * actual disk. */
    if (server.repl_transfer_read >=
        server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
    {
        off_t sync_size = server.repl_transfer_read -
                          server.repl_transfer_last_fsync_off;
        rdb_fsync_range(server.repl_transfer_fd,
            server.repl_transfer_last_fsync_off, sync_size);
        server.repl_transfer_last_fsync_off += sync_size;
    }

    /* Check if the transfer is now complete */
    if (!usemark) {
        if (server.repl_transfer_read == server.repl_transfer_size)
            eof_reached = 1;
    }

    /* Once the whole file has been received, load the RDB. */
    if (eof_reached) {
        int aof_is_enabled = server.aof_state != AOF_OFF;

        /* Ensure background save doesn't overwrite synced data */
        if (server.rdb_child_pid != -1) {
            serverLog(LL_NOTICE,
                "Replica is about to load the RDB file received from the "
                "master, but there is a pending RDB child running. "
                "Killing process %ld and removing its temp file to avoid "
                "any race",
                    (long) server.rdb_child_pid);
            kill(server.rdb_child_pid,SIGUSR1);
            rdbRemoveTempFile(server.rdb_child_pid);
        }

        if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
            serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> REPLICA synchronization: %s", strerror(errno));
            cancelReplicationHandshake();
            return;
        }
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
        /* We need to stop any AOFRW fork before flusing and parsing
         * RDB, otherwise we'll create a copy-on-write disaster. */
        if(aof_is_enabled) stopAppendOnly();
        signalFlushedDb(-1);
        emptyDb(
            -1,
            server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
            replicationEmptyDbCallback);
        /* Before loading the DB into memory we need to delete the readable
         * handler, otherwise it will get called recursively since
         * rdbLoad() will call the event loop to process events from time to
         * time for non blocking loading. */
        aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
        rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
        if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
            serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
            cancelReplicationHandshake();
            /* Re-enable the AOF if we disabled it earlier, in order to restore
             * the original configuration. */
            if (aof_is_enabled) restartAOFAfterSYNC();
            return;
        }
        /* Final setup of the connected slave <- master link */
        zfree(server.repl_transfer_tmpfile);
        close(server.repl_transfer_fd);
        replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
        server.repl_state = REPL_STATE_CONNECTED;
        server.repl_down_since = 0;
        /* After a full resynchroniziation we use the replication ID and
         * offset of the master. The secondary ID / offset are cleared since
         * we are starting a new history. */
        memcpy(server.replid,server.master->replid,sizeof(server.replid));
        server.master_repl_offset = server.master->reploff;
        clearReplicationId2();
        /* Let's create the replication backlog if needed. Slaves need to
         * accumulate the backlog regardless of the fact they have sub-slaves
         * or not, in order to behave correctly if they are promoted to
         * masters after a failover. */
        if (server.repl_backlog == NULL) createReplicationBacklog();

        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
        /* Restart the AOF subsystem now that we finished the sync. This
         * will trigger an AOF rewrite, and when done will start appending
         * to the new file. */
        if (aof_is_enabled) restartAOFAfterSYNC();
    }
    return;

error:
    cancelReplicationHandshake();
    return;
}
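
The least obvious part of this function is the lastbytes window used in EOF mode: the 40-byte delimiter may be split across several read() calls, so the slave keeps the last 40 bytes it has seen and compares them with the mark after every read. A standalone sketch of just that window (eof_detector, eof_init and eof_feed are hypothetical names):

```c
#include <assert.h>
#include <string.h>

#define MARK_SIZE 40 /* CONFIG_RUN_ID_SIZE in Redis */

/* The delimiter plus a sliding window over the last MARK_SIZE bytes,
 * mirroring the static eofmark[] / lastbytes[] arrays above. */
typedef struct {
    char eofmark[MARK_SIZE];
    char lastbytes[MARK_SIZE];
} eof_detector;

void eof_init(eof_detector *d, const char *mark) {
    memcpy(d->eofmark, mark, MARK_SIZE);
    memset(d->lastbytes, 0, MARK_SIZE);
}

/* Feed one read() chunk; returns 1 once the stream so far ends with the mark. */
int eof_feed(eof_detector *d, const char *buf, size_t nread) {
    if (nread >= MARK_SIZE) {
        /* the chunk alone fills the window: keep its last MARK_SIZE bytes */
        memcpy(d->lastbytes, buf + nread - MARK_SIZE, MARK_SIZE);
    } else {
        /* shift the window left by nread and append the new bytes */
        size_t rem = MARK_SIZE - nread;
        memmove(d->lastbytes, d->lastbytes + nread, rem);
        memcpy(d->lastbytes + rem, buf, nread);
    }
    return memcmp(d->lastbytes, d->eofmark, MARK_SIZE) == 0;
}
```

As in the original, by the time the mark is detected it has already been written to the file, which is why the code then truncates the last CONFIG_RUN_ID_SIZE bytes with ftruncate().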

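To summarize, Redis's PSYNC-based replication works like this:
(1) The slave connects to the master and performs the handshake.
(2) The slave sends SYNC/PSYNC to the master.
(3) The master decides between a full and a partial resync and replies to the slave. For a full resync, the master triggers a BGSAVE (or, when possible, attaches to one already in progress) and sends the RDB file once it is ready; for a partial resync, it sends the missing commands from its backlog.
(4) For a full resync, once the RDB file has been sent, the master pushes the writes that happened during the RDB window plus all later writes as commands; this is the incremental part.
(5) Incremental data is sent via call->propagate->replicationFeedSlaves->addReply. addReply is normally used to send the response after a client's request has been processed; here it is reused to push replication data to the slave. addReply only appends to Redis's own output buffer; the data actually goes out on the next iteration of the main loop, when beforeSleep->handleClientsWithPendingWrites writes it to the socket.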
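
The deferred write in point (5) can be modelled in a few lines. This is a toy sketch with hypothetical toy_* names: toy_add_reply() only appends to a per-client buffer and marks the client as pending, and nothing leaves the process until toy_before_sleep() flushes the pending clients. The real replicationFeedSlaves() also feeds the backlog and may emit a SELECT first, which the sketch leaves out.

```c
#include <assert.h>
#include <string.h>

#define TOY_BUF_CAP 256

/* Hypothetical per-replica client with an output buffer. */
typedef struct {
    char buf[TOY_BUF_CAP];
    size_t len;     /* bytes waiting in the output buffer */
    int pending;    /* like being queued on server.clients_pending_write */
    size_t sent;    /* bytes "written to the socket" so far */
} toy_client;

/* Like addReply: append only, no socket I/O. */
void toy_add_reply(toy_client *c, const char *data, size_t len) {
    assert(c->len + len <= TOY_BUF_CAP);
    memcpy(c->buf + c->len, data, len);
    c->len += len;
    c->pending = 1;
}

/* Like replicationFeedSlaves: the same bytes go to every replica. */
void toy_feed_replicas(toy_client *reps, int n, const char *cmd, size_t len) {
    for (int i = 0; i < n; i++) toy_add_reply(&reps[i], cmd, len);
}

/* Like handleClientsWithPendingWrites in beforeSleep: flush pending clients
 * (here we pretend every write() sends the whole buffer). */
void toy_before_sleep(toy_client *reps, int n) {
    for (int i = 0; i < n; i++) {
        if (!reps[i].pending) continue;
        reps[i].sent += reps[i].len;
        reps[i].len = 0;
        reps[i].pending = 0;
    }
}
```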

Master-side flow analysis

Having seen the slave side, the master side is largely predictable.

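1. The master handles the PSYNC command and decides between a full and a partial resync.
2. For a partial resync the path is masterTryPartialResynchronization->addReplyReplicationBacklog: the master replies +CONTINUE and then sends the backlog contents starting at the requested offset. If a partial resync is not possible, it falls back to a full resync.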
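
The decision in step 1 comes down to two conditions checked in masterTryPartialResynchronization: the replication ID must belong to the master's history, and the requested offset must still be in the backlog. A simplified sketch; toy_master and can_partial_resync() are hypothetical names, and the real function also sends the reply and handles the error paths.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical snapshot of the server.* fields the real check consults. */
typedef struct {
    char replid[41], replid2[41];
    long long second_replid_offset;
    bool has_backlog;
    long long backlog_off, backlog_histlen;
} toy_master;

/* true: "PSYNC <replid> <offset>" can be served with +CONTINUE;
 * false: the master falls back to +FULLRESYNC. */
bool can_partial_resync(const toy_master *m, const char *replid,
                        long long offset) {
    /* Same history: the current replid, or the previous one (replid2)
     * as long as the offset is not past the point where it ended. */
    bool same_history = strcmp(replid, m->replid) == 0 ||
        (strcmp(replid, m->replid2) == 0 && offset <= m->second_replid_offset);
    if (!same_history || !m->has_backlog) return false;
    /* The requested offset must still be covered by the backlog. */
    return offset >= m->backlog_off &&
           offset <= m->backlog_off + m->backlog_histlen;
}
```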

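In fact, whether the master has AOF or RDB persistence enabled does not affect replication: a full resync still generates an RDB payload for the slave regardless of the persistence settings.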

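On the master, every write that is propagated to slaves also goes through feedReplicationBacklog (as long as a backlog exists), which copies the data into the master's own backlog buffer and updates a few offsets.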
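The buffering serves two purposes:
(1) A full sync via RDB takes time, and the writes that arrive before the RDB file has been delivered must be kept and sent afterwards as incremental data. Strictly speaking, in the code this window is held in each slave client's output buffer (replicationFeedSlaves appends to it) rather than in the backlog.
(2) After the full sync the slave continues with incremental sync. If the network then drops, an unconditional full sync on every reconnect would be unacceptable, so the master keeps a window of recent data. If that window still contains everything the slave missed while disconnected, a partial resync is performed.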

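The Redis backlog is a circular buffer, so it never overflows. But if, during a disconnection, the master writes more than one full lap of the buffer, nothing can be done: a full lap necessarily overwrites some old data, which is effectively lost, so a partial resync is no longer possible and a full resync is required.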
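
To see the lap-overwrite effect concretely, here is a toy copy of the feedReplicationBacklog loop with an 8-byte buffer (toy_backlog and toy_feed are hypothetical names; the real size comes from repl-backlog-size, which defaults to 1mb). Feeding 10 bytes overwrites the first two, so the offset of the oldest byte moves to 3 and a slave asking for offset 1 or 2 can no longer be served partially.

```c
#include <assert.h>
#include <string.h>

#define BACKLOG_SIZE 8 /* deliberately tiny for illustration */

/* Hypothetical toy copy of the backlog fields. */
typedef struct {
    char buf[BACKLOG_SIZE];
    long long master_repl_offset; /* total bytes ever fed */
    long long idx;                /* next write position in buf */
    long long histlen;            /* valid bytes, capped at BACKLOG_SIZE */
    long long off;                /* offset of the oldest byte still held */
} toy_backlog;

/* Same loop structure as feedReplicationBacklog. */
void toy_feed(toy_backlog *b, const char *p, size_t len) {
    b->master_repl_offset += (long long)len;
    while (len) {
        size_t thislen = (size_t)(BACKLOG_SIZE - b->idx);
        if (thislen > len) thislen = len;
        memcpy(b->buf + b->idx, p, thislen);
        b->idx += (long long)thislen;
        if (b->idx == BACKLOG_SIZE) b->idx = 0; /* wrap around */
        len -= thislen;
        p += thislen;
        b->histlen += (long long)thislen;
    }
    if (b->histlen > BACKLOG_SIZE) b->histlen = BACKLOG_SIZE;
    b->off = b->master_repl_offset - b->histlen + 1;
}
```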

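The offsets involved:
(1) repl_backlog_idx: where the next write starts. As the code below shows, once it reaches the end of the buffer it is reset to 0 and writing continues from the beginning.
(2) repl_backlog_size: the total buffer length, set in the configuration file (repl-backlog-size); repl_backlog is allocated with this size.
(3) master_repl_offset: a counter that starts at 0 and grows by the length of every piece of replication data.
(4) repl_backlog_histlen: also starts at 0; the amount of valid data in the buffer, capped at repl_backlog_size.
(5) repl_backlog_off: the replication offset of the first byte still held in the backlog, i.e. master_repl_offset - repl_backlog_histlen + 1.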

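These values are easy to get lost in if read too literally, but their purpose is simple: they let the master tell whether it still holds the command stream starting at the offset a slave sends with PSYNC. In masterTryPartialResynchronization that is the case when repl_backlog_off <= psync_offset <= repl_backlog_off + repl_backlog_histlen.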
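
For a partial resync, addReplyReplicationBacklog has to turn the requested offset into a position in the ring. A self-contained sketch of that index arithmetic on a toy backlog (all toy_* names are hypothetical): the oldest byte sits at (idx + size - histlen) % size, and the requested offset lies skip = offset - repl_backlog_off bytes after it.

```c
#include <assert.h>
#include <string.h>

#define BACKLOG_SIZE 8 /* deliberately tiny for illustration */

typedef struct {
    char buf[BACKLOG_SIZE];
    long long master_repl_offset, idx, histlen, off;
} toy_backlog;

/* Same loop as feedReplicationBacklog. */
void toy_feed(toy_backlog *b, const char *p, size_t len) {
    b->master_repl_offset += (long long)len;
    while (len) {
        size_t thislen = (size_t)(BACKLOG_SIZE - b->idx);
        if (thislen > len) thislen = len;
        memcpy(b->buf + b->idx, p, thislen);
        b->idx += (long long)thislen;
        if (b->idx == BACKLOG_SIZE) b->idx = 0;
        len -= thislen;
        p += thislen;
        b->histlen += (long long)thislen;
    }
    if (b->histlen > BACKLOG_SIZE) b->histlen = BACKLOG_SIZE;
    b->off = b->master_repl_offset - b->histlen + 1;
}

/* Copy everything from replication offset `offset` to the end of the
 * history into out. Returns bytes copied, or -1 if not in the backlog. */
long long toy_read_from(const toy_backlog *b, long long offset, char *out) {
    assert(out != NULL);
    if (offset < b->off || offset > b->off + b->histlen) return -1;
    long long skip = offset - b->off;
    /* index of the oldest byte, then advance by skip */
    long long j = (b->idx + (BACKLOG_SIZE - b->histlen)) % BACKLOG_SIZE;
    j = (j + skip) % BACKLOG_SIZE;
    long long len = b->histlen - skip, copied = 0;
    while (len) {
        long long thislen = (BACKLOG_SIZE - j) < len ? (BACKLOG_SIZE - j) : len;
        memcpy(out + copied, b->buf + j, (size_t)thislen);
        copied += thislen;
        len -= thislen;
        j = 0; /* any remainder continues from the start of the ring */
    }
    return copied;
}
```

After feeding "abcdefghij", the ring holds "ijcdefgh" with repl_backlog_off = 3, and reading from offset 5 yields "efghij": the tail of the ring first, then the wrapped-around head.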

void feedReplicationBacklog(void *ptr, size_t len) {
    unsigned char *p = ptr;

    server.master_repl_offset += len;

    /* This is a circular buffer, so write as much data we can at every
     * iteration and rewind the "idx" index if we reach the limit. */
    while(len) {
        size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
        if (thislen > len) thislen = len;
        memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
        server.repl_backlog_idx += thislen;
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;
        len -= thislen;
        p += thislen;
        server.repl_backlog_histlen += thislen;
    }
    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;
    /* Set the offset of the first byte we have in the backlog. */
    server.repl_backlog_off = server.master_repl_offset -
                              server.repl_backlog_histlen + 1;
}
