Redis Source Code Analysis: PSYNC Synchronization
Source code analysis of Redis master-slave synchronization
(1) Slave flow analysis
(2) Master flow analysis
Slave analysis
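Once Redis has started, replicationCron is called once per second. It is driven by serverCron, the timer event that Redis runs in its main event loop (not a separate background thread), so the function is called whether the instance is standalone, a master, or a slave.
Let us first look at how replicationCron behaves on a slave.
On a slave, its job is what you would expect: keep the connection to the master alive, perform the handshake, and receive first the master's existing data and then its incremental data.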
/* Replication cron function, called 1 time per second. */
void replicationCron(void) {
    static long long replication_cron_loops = 0;

    /* A series of timeout checks; ignore them for now. */
    /* Non blocking connection timeout? */
    if (server.masterhost &&
        (server.repl_state == REPL_STATE_CONNECTING ||
         slaveIsInHandshakeState()) &&
        (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout)
    {
        serverLog(LL_WARNING,"Timeout connecting to the MASTER...");
        cancelReplicationHandshake();
    }
    ......
    /* As a slave, connectWithMaster creates the socket to the master,
     * registers syncWithMaster as its callback, and sets the replication
     * state to REPL_STATE_CONNECTING. */
    /* Check if we should connect to a MASTER */
    if (server.repl_state == REPL_STATE_CONNECT) {
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
            server.masterhost, server.masterport);
        if (connectWithMaster() == C_OK) {
            serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
        }
    }

    /* Send ACK to master from time to time.
     * Note that we do not send periodic acks to masters that don't
     * support PSYNC and replication offsets. */
    if (server.masterhost && server.master &&
        !(server.master->flags & CLIENT_PRE_PSYNC))
        replicationSendAck();
    ....
}
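The "once per second" cadence can be sketched as follows. This is a minimal model, assuming a cron tick that runs hz times per second and a counter of ticks so far; Redis's server.c has a macro named run_with_period that serves this purpose, while should_run and count_runs below are hypothetical names used only for illustration.

```c
#include <assert.h>

/* Hypothetical, simplified model: a cron tick runs `hz` times per second and
 * `cronloops` counts the ticks so far. Returns 1 when a job that should fire
 * every `period_ms` milliseconds is due on this tick. */
static int should_run(long long cronloops, int hz, int period_ms) {
    assert(hz > 0 && hz <= 1000 && period_ms > 0);
    int tick_ms = 1000 / hz;
    /* A period shorter than one tick simply runs on every tick. */
    if (period_ms <= tick_ms) return 1;
    return cronloops % (period_ms / tick_ms) == 0;
}

/* Counts how often a 1000 ms job fires during `ticks` consecutive ticks. */
static int count_runs(int hz, long long ticks) {
    int runs = 0;
    for (long long i = 0; i < ticks; i++)
        if (should_run(i, hz, 1000)) runs++;
    return runs;
}
```

With hz set to 10, the job is due on ticks 0, 10, 20 and so on, which is how a function such as replicationCron ends up running once per second even though the cron tick itself runs more often.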
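syncWithMaster is the handshake function a slave uses once it has connected to the master.
The code of syncWithMaster is not reproduced in full here; its logic is as follows: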
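1. Send PING.
2. Read the PONG reply; on success, continue with step 3.
3. If the master is configured with requirepass, the slave must be configured with masterauth and has to send AUTH. If no AUTH is needed, skip to step 5.
4. Read the AUTH reply; if it is correct, continue.
5. Send the slave's port (REPLCONF listening-port), which tells the master which port the slave is listening on.
6. Check the reply to step 5.
7. Same as step 5, except that the slave's IP address is sent (REPLCONF ip-address). As far as the current Redis source shows, the master does not use the port and IP from steps 5 and 7 to open a connection to the slave; it only records them for reporting, for example in INFO replication.
8. Check the reply to step 7.
9. The slave sends its replication capabilities (REPLCONF capa) so that it and the master agree on a synchronization method. The latest version supports eof and psync2; psync stands for partial resynchronization.
10. Send "PSYNC <psync_replid> <psync_offset>" to the master (for a full synchronization this is "PSYNC ? -1").
11. If the master replies "+FULLRESYNC <replid> <offset>", a full resynchronization follows; if it replies "+CONTINUE <replid>", a partial resynchronization follows.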
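The decision in step 11 can be sketched as a small reply classifier. This is an illustration under stated assumptions, not the parsing code in syncWithMaster: the type psync_reply and the function classify_psync_reply are hypothetical, and the sketch assumes a 40-character replication ID and a reply line with the trailing CRLF already stripped.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define REPLID_LEN 40 /* assumed length of a replication ID */

typedef enum { REPLY_FULLRESYNC, REPLY_CONTINUE, REPLY_OTHER } psync_reply_kind;

typedef struct {
    psync_reply_kind kind;
    char replid[REPLID_LEN + 1]; /* empty string if the reply carried none */
    long long offset;            /* only meaningful for REPLY_FULLRESYNC */
} psync_reply;

/* Hypothetical helper: classifies one reply line to PSYNC. */
static psync_reply classify_psync_reply(const char *line) {
    psync_reply r = { REPLY_OTHER, "", -1 };
    assert(line != NULL);

    if (strncmp(line, "+FULLRESYNC ", 12) == 0) {
        const char *p = line + 12;
        long long off;
        /* Expect exactly: <40-char replid> <space> <offset>. */
        if (strlen(p) > REPLID_LEN && p[REPLID_LEN] == ' ' &&
            sscanf(p + REPLID_LEN + 1, "%lld", &off) == 1)
        {
            memcpy(r.replid, p, REPLID_LEN);
            r.replid[REPLID_LEN] = '\0';
            r.offset = off;
            r.kind = REPLY_FULLRESYNC;
        }
    } else if (strncmp(line, "+CONTINUE", 9) == 0 &&
               (line[9] == '\0' || line[9] == ' '))
    {
        r.kind = REPLY_CONTINUE;
        /* A new replication ID may follow; keep it only if it has the
         * expected length. */
        if (line[9] == ' ' && strlen(line + 10) == REPLID_LEN)
            memcpy(r.replid, line + 10, REPLID_LEN + 1);
    }
    return r;
}
```

A caller would start receiving the RDB payload on REPLY_FULLRESYNC, resume the command stream on REPLY_CONTINUE, and treat anything else as an error or as a master that does not support PSYNC.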
Once the handshake has completed and a full resynchronization is required, the read event of the fd connected to the master is switched to readSyncBulkPayload:
void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
    ......
    /* Setup the non blocking download of the bulk file. */
    if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL)
            == AE_ERR)
    {
        serverLog(LL_WARNING,
            "Can't create readable event for SYNC: %s (fd=%d)",
            strerror(errno),fd);
        goto error;
    }
}
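readSyncBulkPayload reads the whole payload and then calls replicationCreateMasterClient, which switches the callback of the fd connected to the master to readQueryFromClient. From then on, this connection receives the commands the master sends.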
void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
    char buf[4096];
    ssize_t nread, readlen, nwritten;
    off_t left;
    UNUSED(el);
    UNUSED(privdata);
    UNUSED(mask);

    /* Static vars used to hold the EOF mark, and the last bytes received
     * from the server: when they match, we reached the end of the transfer. */
    static char eofmark[CONFIG_RUN_ID_SIZE];
    static char lastbytes[CONFIG_RUN_ID_SIZE];
    static int usemark = 0;

    /* If repl_transfer_size == -1 we still have to read the bulk length
     * from the master reply. */
    /* On the first read, repl_transfer_size is -1.
     * The first thing the master transfers is the RDB length header, which
     * comes in two forms: $<len> or $EOF:<40 bytes delimiter>. With the
     * former the slave simply reads that many bytes. With the latter the
     * master sends the 40-byte delimiter after the RDB file to mark its end. */
    if (server.repl_transfer_size == -1) {
        if (syncReadLine(fd,buf,1024,server.repl_syncio_timeout*1000) == -1) {
            serverLog(LL_WARNING,
                "I/O error reading bulk count from MASTER: %s",
                strerror(errno));
            goto error;
        }

        if (buf[0] == '-') {
            serverLog(LL_WARNING,
                "MASTER aborted replication with an error: %s",
                buf+1);
            goto error;
        } else if (buf[0] == '\0') {
            /* At this stage just a newline works as a PING in order to take
             * the connection live. So we refresh our last interaction
             * timestamp. */
            server.repl_transfer_lastio = server.unixtime;
            return;
        } else if (buf[0] != '$') {
            serverLog(LL_WARNING,"Bad protocol from MASTER, the first byte is not '$' (we received '%s'), are you sure the host and port are right?", buf);
            goto error;
        }

        /* There are two possible forms for the bulk payload. One is the
         * usual $<count> bulk format. The other is used for diskless transfers
         * when the master does not know beforehand the size of the file to
         * transfer. In the latter case, the following format is used:
         *
         * $EOF:<40 bytes delimiter>
         *
         * At the end of the file the announced delimiter is transmitted. The
         * delimiter is long and random enough that the probability of a
         * collision with the actual file content can be ignored. */
        if (strncmp(buf+1,"EOF:",4) == 0 && strlen(buf+5) >= CONFIG_RUN_ID_SIZE) {
            usemark = 1;
            memcpy(eofmark,buf+5,CONFIG_RUN_ID_SIZE);
            memset(lastbytes,0,CONFIG_RUN_ID_SIZE);
            /* Set any repl_transfer_size to avoid entering this code path
             * at the next call. */
            server.repl_transfer_size = 0;
            serverLog(LL_NOTICE,
                "MASTER <-> REPLICA sync: receiving streamed RDB from master");
        } else {
            usemark = 0;
            server.repl_transfer_size = strtol(buf+1,NULL,10);
            serverLog(LL_NOTICE,
                "MASTER <-> REPLICA sync: receiving %lld bytes from master",
                (long long) server.repl_transfer_size);
        }
        return;
    }

    /* From here on we are reading the file itself.
     * usemark: the payload is delimited by the EOF mark
     * repl_transfer_size: total size of the file
     * repl_transfer_read: number of bytes read so far */
    /* Read bulk data */
    if (usemark) {
        readlen = sizeof(buf);
    } else {
        left = server.repl_transfer_size - server.repl_transfer_read;
        readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf);
    }

    nread = read(fd,buf,readlen);
    if (nread <= 0) {
        serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
            (nread == -1) ? strerror(errno) : "connection lost");
        cancelReplicationHandshake();
        return;
    }
    server.stat_net_input_bytes += nread;

    /* When a mark is used, we want to detect EOF asap in order to avoid
     * writing the EOF mark into the file... */
    int eof_reached = 0;

    if (usemark) {
        /* Update the last bytes array, and check if it matches our delimiter.*/
        if (nread >= CONFIG_RUN_ID_SIZE) {
            memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE);
        } else {
            int rem = CONFIG_RUN_ID_SIZE-nread;
            memmove(lastbytes,lastbytes+nread,rem);
            memcpy(lastbytes+rem,buf,nread);
        }
        if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1;
    }

    server.repl_transfer_lastio = server.unixtime;
    if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) {
        serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s",
            (nwritten == -1) ? strerror(errno) : "short write");
        goto error;
    }
    server.repl_transfer_read += nread;

    /* Delete the last 40 bytes from the file if we reached EOF. */
    if (usemark && eof_reached) {
        if (ftruncate(server.repl_transfer_fd,
            server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1)
        {
            serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno));
            goto error;
        }
    }

    /* Sync data on disk from time to time, otherwise at the end of the transfer
     * we may suffer a big delay as the memory buffers are copied into the
     * actual disk. */
    if (server.repl_transfer_read >=
        server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC)
    {
        off_t sync_size = server.repl_transfer_read -
                          server.repl_transfer_last_fsync_off;
        rdb_fsync_range(server.repl_transfer_fd,
            server.repl_transfer_last_fsync_off, sync_size);
        server.repl_transfer_last_fsync_off += sync_size;
    }

    /* Check if the transfer is now complete */
    if (!usemark) {
        if (server.repl_transfer_read == server.repl_transfer_size)
            eof_reached = 1;
    }

    /* Once the whole payload has been read, load the RDB. */
    if (eof_reached) {
        int aof_is_enabled = server.aof_state != AOF_OFF;

        /* Ensure background save doesn't overwrite synced data */
        if (server.rdb_child_pid != -1) {
            serverLog(LL_NOTICE,
                "Replica is about to load the RDB file received from the "
                "master, but there is a pending RDB child running. "
                "Killing process %ld and removing its temp file to avoid "
                "any race",
                (long) server.rdb_child_pid);
            kill(server.rdb_child_pid,SIGUSR1);
            rdbRemoveTempFile(server.rdb_child_pid);
        }

        if (rename(server.repl_transfer_tmpfile,server.rdb_filename) == -1) {
            serverLog(LL_WARNING,"Failed trying to rename the temp DB into dump.rdb in MASTER <-> REPLICA synchronization: %s", strerror(errno));
            cancelReplicationHandshake();
            return;
        }
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data");
        /* We need to stop any AOFRW fork before flushing and parsing
         * RDB, otherwise we'll create a copy-on-write disaster. */
        if(aof_is_enabled) stopAppendOnly();
        signalFlushedDb(-1);
        emptyDb(
            -1,
            server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS,
            replicationEmptyDbCallback);
        /* Before loading the DB into memory we need to delete the readable
         * handler, otherwise it will get called recursively since
         * rdbLoad() will call the event loop to process events from time to
         * time for non blocking loading. */
        aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE);
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
        rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
        if (rdbLoad(server.rdb_filename,&rsi) != C_OK) {
            serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
            cancelReplicationHandshake();
            /* Re-enable the AOF if we disabled it earlier, in order to restore
             * the original configuration. */
            if (aof_is_enabled) restartAOFAfterSYNC();
            return;
        }
        /* Final setup of the connected slave <- master link */
        zfree(server.repl_transfer_tmpfile);
        close(server.repl_transfer_fd);
        replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db);
        server.repl_state = REPL_STATE_CONNECTED;
        server.repl_down_since = 0;
        /* After a full resynchronization we use the replication ID and
         * offset of the master. The secondary ID / offset are cleared since
         * we are starting a new history. */
        memcpy(server.replid,server.master->replid,sizeof(server.replid));
        server.master_repl_offset = server.master->reploff;
        clearReplicationId2();
        /* Let's create the replication backlog if needed. Slaves need to
         * accumulate the backlog regardless of the fact they have sub-slaves
         * or not, in order to behave correctly if they are promoted to
         * masters after a failover. */
        if (server.repl_backlog == NULL) createReplicationBacklog();

        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
        /* Restart the AOF subsystem now that we finished the sync. This
         * will trigger an AOF rewrite, and when done will start appending
         * to the new file. */
        if (aof_is_enabled) restartAOFAfterSYNC();
    }
    return;

error:
    cancelReplicationHandshake();
    return;
}
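The two subtle parts of the listing above, recognizing the header form and detecting the end of a streamed payload, can be isolated in a small standalone sketch. It mirrors the lastbytes logic of readSyncBulkPayload but is not the Redis code: bulk_state, parse_bulk_header and feed_chunk are hypothetical names, and MARK_LEN assumes the 40-byte delimiter described in the comments.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define MARK_LEN 40 /* assumed delimiter length, as in "$EOF:<40 bytes>" */

typedef struct {
    int usemark;              /* 1 when the payload ends with a delimiter */
    long long size;           /* announced size when usemark == 0 */
    char eofmark[MARK_LEN];   /* delimiter announced in the header */
    char lastbytes[MARK_LEN]; /* sliding window over the last bytes seen */
} bulk_state;

/* Parses "$<count>" or "$EOF:<40 bytes>". Returns 0 on success and -1 if
 * the line does not start with '$'. */
static int parse_bulk_header(bulk_state *st, const char *line) {
    assert(st != NULL && line != NULL);
    if (line[0] != '$') return -1;
    memset(st, 0, sizeof(*st));
    if (strncmp(line + 1, "EOF:", 4) == 0 && strlen(line + 5) >= MARK_LEN) {
        st->usemark = 1;
        memcpy(st->eofmark, line + 5, MARK_LEN);
    } else {
        st->size = strtoll(line + 1, NULL, 10);
    }
    return 0;
}

/* Feeds one received chunk into the window. Returns 1 once the last
 * MARK_LEN bytes of the stream equal the announced delimiter. */
static int feed_chunk(bulk_state *st, const char *buf, size_t n) {
    assert(st != NULL && buf != NULL);
    if (n >= MARK_LEN) {
        /* The chunk alone covers the window: keep its tail. */
        memcpy(st->lastbytes, buf + n - MARK_LEN, MARK_LEN);
    } else {
        /* Shift the window left by n bytes and append the chunk. */
        size_t rem = MARK_LEN - n;
        memmove(st->lastbytes, st->lastbytes + n, rem);
        memcpy(st->lastbytes + rem, buf, n);
    }
    return memcmp(st->lastbytes, st->eofmark, MARK_LEN) == 0;
}
```

The window is what makes detection work when the delimiter is split across two reads: no single chunk contains all 40 bytes, yet the last 40 bytes of the stream still match after the second chunk arrives.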
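To summarize Redis's psync mode:
(1) The slave connects to the master and then performs the handshake.
(2) The slave sends sync/psync to the master.
(3) The master decides whether to push a full or an incremental data set and replies to the slave. In full mode the master goes on to send the RDB file; in incremental mode it sends commands.
(4) In full mode, after pushing the RDB file the master also pushes, as commands, the data written during the window in which the RDB file was being transferred plus everything written afterwards. This is effectively the incremental part.
(5) The incremental part is sent through call->propagate->replicationFeedSlaves->addReply. addReply is normally used to send a response after a client request has been processed; here it is reused to synchronize data to the slave. addReply only writes into Redis's own output buffer, though. The data actually goes out on the next iteration of the main loop, via beforeSleep->handleClientsWithPendingWrites.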
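The split described in point (5), queue now and write later, can be modelled in a few lines. This is a toy sketch, not Redis's client structure: fake_client, queue_reply and flush_pending are hypothetical names, and a plain array stands in for the socket.

```c
#include <assert.h>
#include <string.h>

#define OUT_CAP 256

typedef struct {
    char buf[OUT_CAP];  /* pending output, not yet written */
    size_t pending;
    char wire[OUT_CAP]; /* stands in for the socket in this sketch */
    size_t sent;
} fake_client;

/* Plays the role of addReply: only appends to the output buffer.
 * Returns 0 on success, -1 if the data does not fit. */
static int queue_reply(fake_client *c, const char *data) {
    assert(c != NULL && data != NULL);
    size_t n = strlen(data);
    if (n > OUT_CAP - c->pending) return -1;
    memcpy(c->buf + c->pending, data, n);
    c->pending += n;
    return 0;
}

/* Plays the role of the flush done before the event loop sleeps: moves
 * pending bytes to the wire. Returns the number of bytes written. */
static size_t flush_pending(fake_client *c) {
    assert(c != NULL);
    size_t n = c->pending;
    if (n > OUT_CAP - c->sent) n = OUT_CAP - c->sent;
    memcpy(c->wire + c->sent, c->buf, n);
    c->sent += n;
    /* Keep whatever could not be written for the next flush. */
    memmove(c->buf, c->buf + n, c->pending - n);
    c->pending -= n;
    return n;
}
```

Several commands queued within one loop iteration therefore leave in a single flush, which is why nothing reaches the slave at the moment addReply is called.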
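Master flow analysis
Having seen the slave flow, the master flow is easy to guess.
1. The master handles the psync command and decides between full and incremental synchronization.
2. For incremental synchronization the path is masterTryPartialResynchronization->addReplyReplicationBacklog.
In fact, whether the master has AOF or RDB persistence enabled does not affect synchronization.
As for the master's backlog logic: once the backlog exists, every write operation on the master triggers feedReplicationBacklog, whose purpose is to write the data into the master's own buffer and record a few offsets.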
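What the buffer is for
(1) A full RDB synchronization takes time. The data written before the RDB file has been fully processed must be retained so that it can be transferred as incremental data.
(2) After a full synchronization, the slave and master switch to incremental synchronization. If the network drops at that point, an unconditional full synchronization after the slave reconnects is clearly unacceptable, so the master has to keep part of the data. If that part happens to contain everything the slave missed during the outage, an incremental synchronization is performed.
Redis's backlog is a circular buffer, so it cannot overflow. But if the master writes more than one full lap of data during an outage, nothing can be done: a full lap of writes necessarily overwrites some old data, which amounts to losing it, so incremental synchronization is no longer possible and a full synchronization is required.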
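The meaning of a few offsets:
(1) repl_backlog_idx: the position at which the next data will be written. As the code below shows, once repl_backlog_idx completes a lap it is reset to 0 and writing starts again from 0.
(2) repl_backlog_size: the total length of the buffer, set in the configuration file; repl_backlog is the buffer allocated with this size.
(3) master_repl_offset: a counter that starts at 0 and grows by the length of every chunk fed into the backlog.
(4) repl_backlog_histlen: the number of valid bytes currently held in the backlog; it also starts at 0 and is capped at repl_backlog_size.
There is no need to decode these values from their names or from the source, which gets fairly involved. Their core purpose is simple: they let the master tell whether it still holds the commands starting at the offset a slave sends in psync.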
void feedReplicationBacklog(void *ptr, size_t len) {
    unsigned char *p = ptr;

    server.master_repl_offset += len;

    /* This is a circular buffer, so write as much data we can at every
     * iteration and rewind the "idx" index if we reach the limit. */
    while(len) {
        size_t thislen = server.repl_backlog_size - server.repl_backlog_idx;
        if (thislen > len) thislen = len;
        memcpy(server.repl_backlog+server.repl_backlog_idx,p,thislen);
        server.repl_backlog_idx += thislen;
        if (server.repl_backlog_idx == server.repl_backlog_size)
            server.repl_backlog_idx = 0;
        len -= thislen;
        p += thislen;
        server.repl_backlog_histlen += thislen;
    }
    if (server.repl_backlog_histlen > server.repl_backlog_size)
        server.repl_backlog_histlen = server.repl_backlog_size;
    /* Set the offset of the first byte we have in the backlog. */
    server.repl_backlog_off = server.master_repl_offset -
                              server.repl_backlog_histlen + 1;
}
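To see how these offsets answer the question "does the master still hold the data from this offset onward?", here is a standalone sketch of the same circular buffer with a deliberately tiny size. The struct backlog and the functions backlog_init, backlog_feed, can_continue and backlog_read_from are hypothetical names; the feed logic follows the listing above, while the range check and the read-out are a simplified model of what the master does when it handles psync, not a copy of masterTryPartialResynchronization or addReplyReplicationBacklog.

```c
#include <assert.h>
#include <string.h>

#define BACKLOG_SIZE 16 /* tiny on purpose, to make wrap-around visible */

typedef struct {
    char buf[BACKLOG_SIZE];
    size_t idx;           /* next write position, wraps to 0 */
    size_t histlen;       /* valid bytes held, at most BACKLOG_SIZE */
    long long master_off; /* total bytes ever fed */
    long long first_off;  /* replication offset of the oldest byte held */
} backlog;

static void backlog_init(backlog *b) {
    memset(b, 0, sizeof(*b));
    b->first_off = 1; /* the first byte ever fed has offset 1 */
}

/* Same shape as feedReplicationBacklog, on a local struct. */
static void backlog_feed(backlog *b, const char *p, size_t len) {
    b->master_off += (long long)len;
    while (len) {
        size_t thislen = BACKLOG_SIZE - b->idx;
        if (thislen > len) thislen = len;
        memcpy(b->buf + b->idx, p, thislen);
        b->idx += thislen;
        if (b->idx == BACKLOG_SIZE) b->idx = 0;
        len -= thislen;
        p += thislen;
        b->histlen += thislen;
    }
    if (b->histlen > BACKLOG_SIZE) b->histlen = BACKLOG_SIZE;
    b->first_off = b->master_off - (long long)b->histlen + 1;
}

/* 1 if a replica asking to resume at psync_offset can be served from the
 * backlog, 0 if the data was overwritten or the offset is in the future. */
static int can_continue(const backlog *b, long long psync_offset) {
    return psync_offset >= b->first_off &&
           psync_offset <= b->first_off + (long long)b->histlen;
}

/* Copies everything from psync_offset to the end of the backlog into out
 * (which must hold BACKLOG_SIZE bytes). Returns the number of bytes. */
static size_t backlog_read_from(const backlog *b, long long psync_offset,
                                char *out) {
    assert(can_continue(b, psync_offset));
    size_t skip = (size_t)(psync_offset - b->first_off);
    /* Position of the oldest byte, then advance past the skipped ones. */
    size_t j = (b->idx + (BACKLOG_SIZE - b->histlen)) % BACKLOG_SIZE;
    j = (j + skip) % BACKLOG_SIZE;
    size_t len = b->histlen - skip;
    for (size_t i = 0; i < len; i++)
        out[i] = b->buf[(j + i) % BACKLOG_SIZE];
    return len;
}
```

Feeding 10 bytes and then 10 more into this 16-byte buffer overwrites the first 4 bytes, so the oldest offset still held becomes 5. A slave asking to resume at offset 4 has to fall back to a full synchronization, while one asking for offset 15 receives the last 6 bytes.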