Contents

1 Master-Slave Replication Mode

2 Sentinel Mode

3 Cluster Mode

4 References


1 Master-Slave Replication Mode

The master handles read and write operations, while replicas are responsible for data synchronization, applying the sync commands sent by the master. Walking through the Jedis client source (redis.clients.jedis.JedisClusterConnectionHandler#initializeSlotsCache -> redis.clients.jedis.JedisClusterInfoCache#discoverClusterNodesAndSlots), you can see that the client connects only to master nodes for read and write commands; the replicas only synchronize data and do not serve reads or writes by default. The server's cluster slots command does, however, return replica node information to the client, so a client can still choose to read from a replica (in cluster mode the client must first put that connection into read-only mode with the READONLY command).
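As a quick client-side illustration, here is a minimal sketch (the seed address 127.0.0.1:7000 is an assumption; any reachable cluster node works) showing that an application simply reads and writes through JedisCluster, which routes each command to the master that owns the key's slot:

import redis.clients.jedis.HostAndPort;
import redis.clients.jedis.JedisCluster;

public class ClusterReadWriteDemo {
    public static void main(String[] args) {
        // The client discovers the full topology from this seed node via CLUSTER SLOTS
        try (JedisCluster cluster = new JedisCluster(new HostAndPort("127.0.0.1", 7000))) {
            cluster.set("demo:key", "demo-value");       // sent to the master owning the slot
            System.out.println(cluster.get("demo:key")); // also served by that master
        }
    }
}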

The master-replica synchronization flow is shown in the figure below (image source: https://zhuanlan.zhihu.com/p/145186839).

When a replica starts up, it sends the SYNC command to the master (see the command table entry in the server's server.c: {"sync",syncCommand,1,"admin no-script",0,NULL,0,0,0,0,0,0},). After validating the request, the master starts a BGSAVE and synchronizes either through a disk file or directly over a socket (diskless). The BGSAVE path forks a child process via the fork() system call, and the child generates the snapshot while sharing memory with the parent copy-on-write (COW). The source:

// Entry point of the SYNC command
/* SYNC and PSYNC command implementation. */
void syncCommand(client *c) {
    // If this client is already a replica, the command is ignored
    /* ignore SYNC if already slave or in monitor mode */
    if (c->flags & CLIENT_SLAVE) return;

    // Sanity-check this node's own replication state
    /* Refuse SYNC requests if we are a slave but the link with our master
     * is not ok... */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
        addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
        return;
    }

    /* SYNC can't be issued when the server has pending data to send to
     * the client about already issued commands. We need a fresh reply
     * buffer registering the differences between the BGSAVE and the current
     * dataset, so that we can copy to other slaves if needed. */
    if (clientHasPendingReplies(c)) {
        addReplyError(c,"SYNC and PSYNC are invalid with pending output");
        return;
    }

    serverLog(LL_NOTICE,"Replica %s asks for synchronization",
        replicationGetSlaveName(c));

    /* Try a partial resynchronization if this is a PSYNC command.
     * If it fails, we continue with usual full resynchronization, however
     * when this happens masterTryPartialResynchronization() already
     * replied with:
     *
     * +FULLRESYNC <replid> <offset>
     *
     * So the slave knows the new replid and offset to try a PSYNC later
     * if the connection with the master is lost. */
    if (!strcasecmp(c->argv[0]->ptr,"psync")) {
        if (masterTryPartialResynchronization(c) == C_OK) {
            server.stat_sync_partial_ok++;
            return; /* No full resync needed, return. */
        } else {
            char *master_replid = c->argv[1]->ptr;

            /* Increment stats for failed PSYNCs, but only if the
             * replid is not "?", as this is used by slaves to force a full
             * resync on purpose when they are not able to partially
             * resync. */
            if (master_replid[0] != '?') server.stat_sync_partial_err++;
        }
    } else {
        /* If a slave uses SYNC, we are dealing with an old implementation
         * of the replication protocol (like redis-cli --slave). Flag the client
         * so that we don't expect to receive REPLCONF ACK feedbacks. */
        c->flags |= CLIENT_PRE_PSYNC;
    }

    /* Full resynchronization. */
    server.stat_sync_full++;

    /* Setup the slave as one waiting for BGSAVE to start. The following code
     * paths will change the state if we handle the slave differently. */
    c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
    if (server.repl_disable_tcp_nodelay)
        connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
    c->repldbfd = -1;
    c->flags |= CLIENT_SLAVE;
    listAddNodeTail(server.slaves,c);

    /* Create the replication backlog if needed. */
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
        /* When we create the backlog from scratch, we always use a new
         * replication ID and clear the ID2, since there is no valid
         * past history. */
        changeReplicationId();
        clearReplicationId2();
        createReplicationBacklog();
        serverLog(LL_NOTICE,"Replication backlog created, my new "
            "replication IDs are '%s' and '%s'",
            server.replid, server.replid2);
    }

    // Check the state of any in-progress BGSAVE
    /* CASE 1: BGSAVE is in progress, with disk target. */
    if (server.rdb_child_pid != -1 &&
        server.rdb_child_type == RDB_CHILD_TYPE_DISK)
    {
        /* Ok a background save is in progress. Let's check if it is a good
         * one for replication, i.e. if there is another slave that is
         * registering differences since the server forked to save. */
        client *slave;
        listNode *ln;
        listIter li;

        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            slave = ln->value;
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
        }
        /* To attach this slave, we check that it has at least all the
         * capabilities of the slave that triggered the current BGSAVE. */
        if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
            /* Perfect, the server is already registering differences for
             * another slave. Set the right state, and copy the buffer. */
            copyClientOutputBuffer(c,slave);
            replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
            serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
        } else {
            /* No way, we need to wait for the next BGSAVE in order to
             * register differences. */
            serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
        }

    /* CASE 2: BGSAVE is in progress, with socket target. */
    } else if (server.rdb_child_pid != -1 &&
               server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
    {
        /* There is an RDB child process but it is writing directly to
         * children sockets. We need to wait for the next BGSAVE
         * in order to synchronize. */
        serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");

    /* CASE 3: There is no BGSAVE in progress. */
    } else {
        // Start a BGSAVE to sync data to the replica
        if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
            /* Diskless replication RDB child is created inside
             * replicationCron() since we want to delay its start a
             * few seconds to wait for more slaves to arrive. */
            if (server.repl_diskless_sync_delay)
                serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
            else
                // No delay configured: start the diskless sync immediately
                startBgsaveForReplication(c->slave_capa);
        } else {
            /* Target is disk (or the slave is not capable of supporting
             * diskless replication) and we don't have a BGSAVE in progress,
             * let's start one. */
            if (!hasActiveChildProcess()) {
                startBgsaveForReplication(c->slave_capa);
            } else {
                serverLog(LL_NOTICE,
                    "No BGSAVE in progress, but another BG operation is active. "
                    "BGSAVE for replication delayed");
            }
        }
    }
    return;
}

// Kick off a BGSAVE for replication
/* Start a BGSAVE for replication goals, which is, selecting the disk or
 * socket target depending on the configuration, and making sure that
 * the script cache is flushed before to start.
 *
 * The mincapa argument is the bitwise AND among all the slaves capabilities
 * of the slaves waiting for this BGSAVE, so represents the slave capabilities
 * all the slaves support. Can be tested via SLAVE_CAPA_* macros.
 *
 * Side effects, other than starting a BGSAVE:
 *
 * 1) Handle the slaves in WAIT_START state, by preparing them for a full
 *    sync if the BGSAVE was successfully started, or sending them an error
 *    and dropping them from the list of slaves.
 *
 * 2) Flush the Lua scripting script cache if the BGSAVE was actually
 *    started.
 *
 * Returns C_OK on success or C_ERR otherwise. */
int startBgsaveForReplication(int mincapa) {
    int retval;
    int socket_target = server.repl_diskless_sync && (mincapa & SLAVE_CAPA_EOF);
    listIter li;
    listNode *ln;

    serverLog(LL_NOTICE,"Starting BGSAVE for SYNC with target: %s",
        socket_target ? "replicas sockets" : "disk");

    rdbSaveInfo rsi, *rsiptr;
    rsiptr = rdbPopulateSaveInfo(&rsi);
    /* Only do rdbSave* when rsiptr is not NULL,
     * otherwise slave will miss repl-stream-db. */
    if (rsiptr) {
        if (socket_target)
            // Diskless sync: stream the RDB straight to the replica sockets
            retval = rdbSaveToSlavesSockets(rsiptr);
        else
            retval = rdbSaveBackground(server.rdb_filename,rsiptr);
    } else {
        serverLog(LL_WARNING,"BGSAVE for replication: replication information not available, can't generate the RDB file right now. Try later.");
        retval = C_ERR;
    }

    // Handle the result and record the relevant state
    /* If we succeeded to start a BGSAVE with disk target, let's remember
     * this fact, so that we can later delete the file if needed. Note
     * that we don't set the flag to 1 if the feature is disabled, otherwise
     * it would never be cleared: the file is not deleted. This way if
     * the user enables it later with CONFIG SET, we are fine. */
    if (retval == C_OK && !socket_target && server.rdb_del_sync_files)
        RDBGeneratedByReplication = 1;

    /* If we failed to BGSAVE, remove the slaves waiting for a full
     * resynchronization from the list of slaves, inform them with
     * an error about what happened, close the connection ASAP. */
    if (retval == C_ERR) {
        serverLog(LL_WARNING,"BGSAVE for replication failed");
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                slave->replstate = REPL_STATE_NONE;
                slave->flags &= ~CLIENT_SLAVE;
                listDelNode(server.slaves,ln);
                addReplyError(slave,
                    "BGSAVE failed, replication can't continue");
                slave->flags |= CLIENT_CLOSE_AFTER_REPLY;
            }
        }
        return retval;
    }

    /* If the target is socket, rdbSaveToSlavesSockets() already setup
     * the slaves for a full resync. Otherwise for disk target do it now. */
    if (!socket_target) {
        listRewind(server.slaves,&li);
        while((ln = listNext(&li))) {
            client *slave = ln->value;

            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
                replicationSetupSlaveForFullResync(slave,
                    getPsyncInitialOffset());
            }
        }
    }

    /* Flush the script cache, since we need that slave differences are
     * accumulated without requiring slaves to match our cached scripts. */
    if (retval == C_OK) replicationScriptCacheFlush();
    return retval;
}

// Fork a child process to produce the RDB for the sync
int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
    pid_t childpid;

    if (hasActiveChildProcess()) return C_ERR;

    server.dirty_before_bgsave = server.dirty;
    server.lastbgsave_try = time(NULL);
    openChildInfoPipe();

    // redisFork() returns 0 in the child and the child's pid (or -1) in the parent
    if ((childpid = redisFork()) == 0) {
        int retval;

        // Child: generate the RDB file; memory pages are shared with the
        // parent copy-on-write, then the transfer to the replica is started
        /* Child */
        redisSetProcTitle("redis-rdb-bgsave");
        redisSetCpuAffinity(server.bgsave_cpulist);
        retval = rdbSave(filename,rsi);
        if (retval == C_OK) {
            sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB");
        }
        exitFromChild((retval == C_OK) ? 0 : 1);
    } else {
        // Parent: just check the fork result and log it
        /* Parent */
        if (childpid == -1) {
            closeChildInfoPipe();
            server.lastbgsave_status = C_ERR;
            serverLog(LL_WARNING,"Can't save in background: fork: %s",
                strerror(errno));
            return C_ERR;
        }
        serverLog(LL_NOTICE,"Background saving started by pid %d",childpid);
        server.rdb_save_time_start = time(NULL);
        server.rdb_child_pid = childpid;
        server.rdb_child_type = RDB_CHILD_TYPE_DISK;
        return C_OK;
    }
    return C_OK; /* unreached */
}

// Fork helper used to start the child process
int redisFork() {
    int childpid;
    long long start = ustime();
    // Delegates to the OS-level fork(); returns 0 in the child
    if ((childpid = fork()) == 0) {
        /* Child */
        setOOMScoreAdj(CONFIG_OOM_BGCHILD);
        setupChildSignalHandlers();
        closeClildUnusedResourceAfterFork();
    } else {
        /* Parent */
        server.stat_fork_time = ustime()-start;
        server.stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / server.stat_fork_time / (1024*1024*1024); /* GB per second. */
        latencyAddSampleIfNeeded("fork",server.stat_fork_time/1000);
        if (childpid == -1) {
            return -1;
        }
        updateDictResizePolicy();
    }
    return childpid;
}
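To tie the server-side flow above back to a client, the following minimal sketch (the two node addresses are assumptions) triggers exactly this path: SLAVEOF makes one node request synchronization from the other, which forks and ships an RDB as shown above.

import redis.clients.jedis.Jedis;

public class ReplicationDemo {
    public static void main(String[] args) throws InterruptedException {
        // Assumed addresses of two standalone Redis nodes
        try (Jedis master = new Jedis("127.0.0.1", 6379);
             Jedis replica = new Jedis("127.0.0.1", 6380)) {
            // Make 6380 a replica of 6379; the replica sends PSYNC and the
            // master answers with a full resync (BGSAVE + RDB transfer)
            replica.slaveof("127.0.0.1", 6379);
            master.set("demo:key", "demo-value");
            Thread.sleep(1000); // crude wait for the initial sync to finish
            System.out.println(replica.get("demo:key"));
        }
    }
}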

The drawback of master-slave replication is that when the master goes down, failover requires manual intervention. After the initial RDB transfer, subsequent synchronization is achieved by the master propagating each write command to its replicas over the replication connection, a command stream similar in format to AOF, which keeps replicas up to date in near real time.

2 Sentinel Mode

Sentinel mode builds on the master-slave replication described above by adding separate sentinel processes. Each sentinel keeps heartbeats with the Redis instances; the sentinels, the Redis instances, and the sentinels among themselves all exchange heartbeats to monitor instance health. When a sentinel or a Redis node goes down, the cluster notices promptly and decides on a master switch: once a majority of sentinels agree that an instance has failed, they perform a failover, promoting a newly elected instance to master, as the source below shows. To the client, all of this is transparent.

// Body of the SENTINEL command
void sentinelCommand(client *c) {
    // ... ...
    } else if (!strcasecmp(c->argv[1]->ptr,"failover")) {
        // Handle the SENTINEL FAILOVER subcommand
        /* SENTINEL FAILOVER <master-name> */
        sentinelRedisInstance *ri;

        if (c->argc != 3) goto numargserr;
        // Look up the Redis master instance named in the arguments
        if ((ri = sentinelGetMasterByNameOrReplyError(c,c->argv[2])) == NULL)
            return;
        if (ri->flags & SRI_FAILOVER_IN_PROGRESS) {
            addReplySds(c,sdsnew("-INPROG Failover already in progress\r\n"));
            return;
        }
        // Pick one of ri's slaves to promote to master (ri is the Redis instance)
        if (sentinelSelectSlave(ri) == NULL) {
            addReplySds(c,sdsnew("-NOGOODSLAVE No suitable replica to promote\r\n"));
            return;
        }
        serverLog(LL_WARNING,"Executing user requested FAILOVER of '%s'",
            ri->name);
        // Start the failover state machine that promotes the selected node
        sentinelStartFailover(ri);
        ri->flags |= SRI_FORCE_FAILOVER;
        addReply(c,shared.ok);
    }
    // ... ...
}

// Look up the Redis master instance by name
/* Lookup the named master into sentinel.masters.
 * If the master is not found reply to the client with an error and returns
 * NULL. */
sentinelRedisInstance *sentinelGetMasterByNameOrReplyError(client *c, robj *name)
{
    sentinelRedisInstance *ri;

    ri = dictFetchValue(sentinel.masters,name->ptr);
    if (!ri) {
        addReplyError(c,"No such master with that name");
        return NULL;
    }
    return ri;
}
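From the application's side the failover really is transparent: a sentinel-aware pool asks the sentinels for the current master address and re-resolves it after a switch. A minimal sketch, assuming sentinels at 127.0.0.1:26379-26381 and a master named mymaster (both must match your sentinel.conf):

import java.util.HashSet;
import java.util.Set;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisSentinelPool;

public class SentinelClientDemo {
    public static void main(String[] args) {
        // Assumed sentinel addresses; at least a quorum should be reachable
        Set<String> sentinels = new HashSet<>();
        sentinels.add("127.0.0.1:26379");
        sentinels.add("127.0.0.1:26380");
        sentinels.add("127.0.0.1:26381");

        // The pool resolves the current master via the sentinels, so after a
        // failover new connections simply point at the newly promoted master
        try (JedisSentinelPool pool = new JedisSentinelPool("mymaster", sentinels);
             Jedis jedis = pool.getResource()) {
            jedis.set("demo:key", "demo-value");
            System.out.println(jedis.get("demo:key"));
        }
    }
}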

3 Cluster Mode

Redis 3.0 introduced cluster mode, bringing distributed storage to Redis: each node stores a different subset of the data. Redis Cluster does not use consistent hashing; instead it uses hash slots (16384 of them). The slot for a key is determined by computing the CRC16 checksum of the key (restricted to the content inside "{...}" if the key has a hash tag, otherwise the whole key) and taking it modulo 16384, implemented as CRC16(key) & (16384-1). Each node in the cluster is responsible for a portion of the hash slots.
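A minimal sketch of the slot computation, using the JedisClusterCRC16 utility whose source is quoted later in this article (the key name is an arbitrary example):

import redis.clients.util.JedisClusterCRC16;

public class SlotDemo {
    public static void main(String[] args) {
        // slot = CRC16(key) & 16383, which equals CRC16(key) % 16384
        // because 16384 is a power of two
        int slot = JedisClusterCRC16.getSlot("user:1000");
        System.out.println("user:1000 -> slot " + slot); // always in [0, 16383]
    }
}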

The official documentation puts it this way:

"Redis集群不是使用一致性哈希,而是使用哈希槽。整个redis集群有16384个哈希槽,决定一个key应该分配到那个槽的算法是:计算该key的CRC16结果再模16834。

This distribution scheme makes it easy to add and remove nodes. For example, to add a new node D, you only need to move some hash slot data from A, B, and C over to D. Likewise, to remove node A from the cluster, you only need to move A's hash slot data to B and C; once all of A's data has been moved away, A can be removed from the cluster entirely.

Because moving hash slots from one node to another requires no downtime, adding or removing nodes, or changing the share of hash slots a node holds, requires no downtime either.

The cluster supports operating on multiple keys in a single command (or transaction, or Lua script). Through the concept of "hash tags", the user can force multiple keys into the same hash slot. Hash tags are described in the cluster specification; in short, if a key contains braces "{}", only the string inside the braces participates in hashing. For example, the keys "this{foo}" and "another{foo}" are assigned to the same hash slot, so they can be used together in a single command."
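The hash tag rule is easy to verify with the same utility: keys that share a {foo} tag hash to the same slot, which is what makes multi-key commands such as MGET on them possible in a cluster:

import redis.clients.util.JedisClusterCRC16;

public class HashTagDemo {
    public static void main(String[] args) {
        // Only the text between the braces is hashed, so both keys
        // resolve to the same slot
        int a = JedisClusterCRC16.getSlot("this{foo}");
        int b = JedisClusterCRC16.getSlot("another{foo}");
        System.out.println(a == b); // true
    }
}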

In cluster mode, the source below shows how the Redis client obtains the slot mapping: the client sends the cluster slots command to the server and then builds a local mapping from slots to nodes.

// On initialization or reconnect, fetch slot information from the Redis server
// redis.clients.jedis.JedisClusterInfoCache#discoverClusterNodesAndSlots
public void discoverClusterNodesAndSlots(Jedis jedis) {
  w.lock();

  try {
    reset();
    // Fetch slot information via the CLUSTER SLOTS command
    List<Object> slots = jedis.clusterSlots();

    for (Object slotInfoObj : slots) {
      List<Object> slotInfo = (List<Object>) slotInfoObj;

      if (slotInfo.size() <= MASTER_NODE_INDEX) {
        continue;
      }

      // Extract the slot numbers covered by this entry
      List<Integer> slotNums = getAssignedSlotArray(slotInfo);

      // hostInfos
      int size = slotInfo.size();
      for (int i = MASTER_NODE_INDEX; i < size; i++) {
        List<Object> hostInfos = (List<Object>) slotInfo.get(i);
        if (hostInfos.size() <= 0) {
          continue;
        }

        HostAndPort targetNode = generateHostAndPort(hostInfos);
        setupNodeIfNotExist(targetNode);
        // Only master nodes are added to the slot map, meaning only masters
        // serve reads and writes for the application
        if (i == MASTER_NODE_INDEX) {
          assignSlotsToNode(slotNums, targetNode);
        }
      }
    }
  } finally {
    w.unlock();
  }
}
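A small usage sketch of the raw reply this code parses (the node address is an assumption; CLUSTER SLOTS can be sent to any cluster node). Each entry holds a slot range, then the master, then its replicas; note that at this level Jedis hands back bulk strings as raw byte[] and integers as Long, which is why the cache code above converts them with generateHostAndPort():

import java.util.List;
import redis.clients.jedis.Jedis;

public class ClusterSlotsDemo {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("127.0.0.1", 7000)) {
            // Entry layout: [start slot, end slot, [master ip, port, id], [replica ip, port, id], ...]
            List<Object> slots = jedis.clusterSlots();
            for (Object slotInfoObj : slots) {
                List<Object> slotInfo = (List<Object>) slotInfoObj;
                long start = (Long) slotInfo.get(0);
                long end = (Long) slotInfo.get(1);
                List<Object> master = (List<Object>) slotInfo.get(2);
                String ip = new String((byte[]) master.get(0));
                long port = (Long) master.get(1);
                System.out.println(start + "-" + end + " -> " + ip + ":" + port);
            }
        }
    }
}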

When the client issues a read or write, it must resolve the key to a slot and node before sending the command, as the source below shows. First, the key is run through a table-driven CRC16; the result is masked with 0xFFFF to keep the low 16 bits, and then ANDed with (16384-1), which is equivalent to taking it modulo 16384 (hex 0x4000), yielding the slot. 16384 is the fixed number of slots the Redis server virtualizes in cluster mode. With the slot in hand, runWithRetries fetches the connection for the node serving that slot and calls execute to run the command. The connection info relies on the slot-to-node mapping that the client fetched via cluster slots at initialization and cached locally. If the server has since migrated slots, the call throws a JedisRedirectionException; the client then refreshes its slot-to-node mapping via redis.clients.jedis.JedisClusterConnectionHandler#renewSlotCache(redis.clients.jedis.Jedis) and retries the command. When the command completes, the connection is released back to the pool.

// Take the cluster-mode get method as an example
// Entry point: redis.clients.jedis.JedisCluster#get
@Override
public String get(final String key) {
  return new JedisClusterCommand<String>(connectionHandler, maxAttempts) {
    @Override
    public String execute(Jedis connection) {
      return connection.get(key);
    }
  }.run(key);
}

// The run method: redis.clients.jedis.JedisClusterCommand#run(java.lang.String)
public T run(String key) {
  if (key == null) {
    throw new JedisClusterException("No way to dispatch this command to Redis Cluster.");
  }

  // Hash the key with CRC16 to find the slot it belongs to
  return runWithRetries(JedisClusterCRC16.getSlot(key), this.maxAttempts, false, false);
}

// Slot lookup: redis.clients.util.JedisClusterCRC16#getSlot(java.lang.String)
public static int getSlot(String key) {
  // If the key contains a {tag}, only the content between the braces is hashed
  key = JedisClusterHashTagUtil.getHashTag(key);
  // optimization with modulo operator with power of 2
  // equivalent to getCRC16(key) % 16384
  return getCRC16(key) & (16384 - 1);
}

/**
 * Create a CRC16 checksum from the bytes. implementation is from mp911de/lettuce, modified with
 * some more optimizations
 * @param bytes
 * @return CRC16 as integer value See <a
 *         href="https://github.com/xetorthio/jedis/pull/733#issuecomment-55840331">Issue 733</a>
 */
// For each byte of the key, look up the CRC table to update the checksum,
// then mask with 0xFFFF to keep the low 16 bits
// redis.clients.util.JedisClusterCRC16#getCRC16(byte[], int, int)
public static int getCRC16(byte[] bytes, int s, int e) {
  int crc = 0x0000;

  for (int i = s; i < e; i++) {
    crc = ((crc << 8) ^ LOOKUP_TABLE[((crc >>> 8) ^ (bytes[i] & 0xFF)) & 0xFF]);
  }
  return crc & 0xFFFF;
}

public static int getCRC16(byte[] bytes) {
  return getCRC16(bytes, 0, bytes.length);
}

// redis.clients.util.JedisClusterCRC16#getCRC16(java.lang.String)
public static int getCRC16(String key) {
  byte[] bytesKey = SafeEncoder.encode(key);
  return getCRC16(bytesKey, 0, bytesKey.length);
}

// Extract the hash tag of a key
public final class JedisClusterHashTagUtil {

  private JedisClusterHashTagUtil() {
    throw new InstantiationError("Must not instantiate this class");
  }

  public static String getHashTag(String key) {
    return extractHashTag(key, true);
  }

  public static boolean isClusterCompliantMatchPattern(String matchPattern) {
    String tag = extractHashTag(matchPattern, false);
    return tag != null && !tag.isEmpty();
  }

  // Return the content between the braces if the key has a {tag};
  // otherwise return the whole key
  private static String extractHashTag(String key, boolean returnKeyOnAbsence) {
    int s = key.indexOf("{");
    if (s > -1) {
      int e = key.indexOf("}", s + 1);
      if (e > -1 && e != s + 1) {
        return key.substring(s + 1, e);
      }
    }
    return returnKeyOnAbsence ? key : null;
  }
}

// The code above resolves a key to its slot; now for the execution path
// Entry point: redis.clients.jedis.JedisClusterCommand#runWithRetries
private T runWithRetries(final int slot, int attempts, boolean tryRandomNode, boolean asking) {
  if (attempts <= 0) {
    throw new JedisClusterMaxRedirectionsException("Too many Cluster redirections?");
  }

  Jedis connection = null;
  try {

    if (asking) {
      // TODO: Pipeline asking with the original command to make it
      // faster....
      connection = askConnection.get();
      connection.asking();

      // if asking success, reset asking flag
      asking = false;
    } else {
      if (tryRandomNode) {
        connection = connectionHandler.getConnection();
      } else {
        // Resolve the slot to a connection; see getConnectionFromSlot below
        connection = connectionHandler.getConnectionFromSlot(slot);
      }
    }

    // Run the execute method defined at the entry point, i.e. send the get
    // over the socket to the server and read back the result
    return execute(connection);

  } catch (JedisNoReachableClusterNodeException jnrcne) {
    throw jnrcne;
  } catch (JedisConnectionException jce) {
    // Connection error: release the connection, then reconnect and retry
    // release current connection before recursion
    releaseConnection(connection);
    connection = null;

    if (attempts <= 1) {
      //We need this because if node is not reachable anymore - we need to finally initiate slots renewing,
      //or we can stuck with cluster state without one node in opposite case.
      //But now if maxAttempts = 1 or 2 we will do it too often. For each time-outed request.
      //TODO make tracking of successful/unsuccessful operations for node - do renewing only
      //if there were no successful responses from this node last few seconds
      this.connectionHandler.renewSlotCache();
    }

    return runWithRetries(slot, attempts - 1, tryRandomNode, asking);
  } catch (JedisRedirectionException jre) {
    // Redirection: the node owning the slot changed on the server side, so
    // refresh the slot-to-node mapping, release the current connection,
    // and retry against the new owner
    // if MOVED redirection occurred,
    if (jre instanceof JedisMovedDataException) {
      // it rebuilds cluster's slot cache
      // recommended by Redis cluster specification
      this.connectionHandler.renewSlotCache(connection);
    }

    // release current connection before recursion or renewing
    releaseConnection(connection);
    connection = null;

    if (jre instanceof JedisAskDataException) {
      asking = true;
      askConnection.set(this.connectionHandler.getConnectionFromNode(jre.getTargetNode()));
    } else if (jre instanceof JedisMovedDataException) {
    } else {
      throw new JedisClusterException(jre);
    }

    return runWithRetries(slot, attempts - 1, false, asking);
  } finally {
    releaseConnection(connection);
  }
}

// Resolve a slot to a connection: redis.clients.jedis.JedisSlotBasedConnectionHandler#getConnectionFromSlot
@Override
public Jedis getConnectionFromSlot(int slot) {
  JedisPool connectionPool = cache.getSlotPool(slot);
  if (connectionPool != null) {
    // It can't guaranteed to get valid connection because of node
    // assignment
    return connectionPool.getResource();
  } else {
    // No pool for this slot: rebuild the slot-to-node mapping; see
    // redis.clients.jedis.JedisClusterInfoCache in the previous section
    renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
    connectionPool = cache.getSlotPool(slot);
    if (connectionPool != null) {
      return connectionPool.getResource();
    } else {
      //no choice, fallback to new connection to random node
      return getConnection();
    }
  }
}

Next, let's look at how the server handles the client's request for slot information. When the client sends the cluster slots command, the server runs the following code.

// The redisCommandTable global in the server's server.c lists every supported command:
// {"cluster",clusterCommand,-2,"admin ok-stale random",0,NULL,0,0,0,0,0,0},
// The clusterCommand implementation:
void clusterCommand(client *c) {
    // If cluster mode is disabled, return immediately
    if (server.cluster_enabled == 0) {
        addReplyError(c,"This instance has cluster support disabled");
        return;
    }
    ......
    } else if (!strcasecmp(c->argv[1]->ptr,"slots") && c->argc == 2) {
        /* CLUSTER SLOTS */
        // Handle CLUSTER SLOTS: write the slot map into the reply for client c
        clusterReplyMultiBulkSlots(c);
    } else if (!strcasecmp(c->argv[1]->ptr,"flushslots") && c->argc == 2) {
    ......
}

// clusterReplyMultiBulkSlots returns each slot range (start-end) with its
// master node and replica information
void clusterReplyMultiBulkSlots(client *c) {
    /* Format: 1) 1) start slot
     *            2) end slot
     *            3) 1) master IP
     *               2) master port
     *               3) node ID
     *            4) 1) replica IP
     *               2) replica port
     *               3) node ID
     *           ... continued until done
     */
    int num_masters = 0;
    void *slot_replylen = addReplyDeferredLen(c);

    dictEntry *de;
    dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
    while((de = dictNext(di)) != NULL) {
        clusterNode *node = dictGetVal(de);
        int j = 0, start = -1;
        int i, nested_elements = 0;

        /* Skip slaves (that are iterated when producing the output of their
         * master) and  masters not serving any slot. */
        if (!nodeIsMaster(node) || node->numslots == 0) continue;

        for(i = 0; i < node->numslaves; i++) {
            if (nodeFailed(node->slaves[i])) continue;
            nested_elements++;
        }

        // The slot count is defined as: #define CLUSTER_SLOTS 16384
        for (j = 0; j < CLUSTER_SLOTS; j++) {
            int bit, i;

            if ((bit = clusterNodeGetSlotBit(node,j)) != 0) {
                if (start == -1) start = j;
            }
            if (start != -1 && (!bit || j == CLUSTER_SLOTS-1)) {
                addReplyArrayLen(c, nested_elements + 3); /* slots (2) + master addr (1). */

                if (bit && j == CLUSTER_SLOTS-1) j++;

                /* If slot exists in output map, add to it's list.
                 * else, create a new output map for this slot */
                if (start == j-1) {
                    addReplyLongLong(c, start); /* only one slot; low==high */
                    addReplyLongLong(c, start);
                } else {
                    addReplyLongLong(c, start); /* low */
                    addReplyLongLong(c, j-1);   /* high */
                }
                start = -1;

                /* First node reply position is always the master */
                addReplyArrayLen(c, 3);
                addReplyBulkCString(c, node->ip);
                addReplyLongLong(c, node->port);
                addReplyBulkCBuffer(c, node->name, CLUSTER_NAMELEN);

                /* Remaining nodes in reply are replicas for slot range */
                for (i = 0; i < node->numslaves; i++) {
                    /* This loop is copy/pasted from clusterGenNodeDescription()
                     * with modifications for per-slot node aggregation */
                    if (nodeFailed(node->slaves[i])) continue;
                    addReplyArrayLen(c, 3);
                    addReplyBulkCString(c, node->slaves[i]->ip);
                    addReplyLongLong(c, node->slaves[i]->port);
                    addReplyBulkCBuffer(c, node->slaves[i]->name, CLUSTER_NAMELEN);
                }
                num_masters++;
            }
        }
    }
    dictReleaseIterator(di);
    setDeferredArrayLen(c, slot_replylen, num_masters);
}

On the client side in cluster mode, when a command lands on a node whose slot has migrated, the exception handling looks like this (using Jedis):

// Running a command against a migrated slot looks like this:
127.0.0.1:6379> SET key:{test}:555 value:test:555
-> Redirected to slot [6918] located at 127.0.0.1:6380
OK
127.0.0.1:6380> SET key:{test}:666 value:test:666
OK

// On the Jedis side, start from the set command: redis.clients.jedis.Jedis#set
@Override
public String set(final String key, final String value, final String nxxx, final String expx,
    final long time) {
  checkIsInMultiOrPipeline();
  client.set(key, value, nxxx, expx, time);
  return client.getStatusCodeReply();
}

// Follow client.getStatusCodeReply() down the call chain to where the reply is read:
// redis.clients.jedis.Connection#readProtocolWithCheckingBroken
protected Object readProtocolWithCheckingBroken() {
  try {
    return Protocol.read(inputStream);
  } catch (JedisConnectionException exc) {
    broken = true;
    throw exc;
  }
}

// The read method dispatches on the reply type byte: redis.clients.jedis.Protocol#process
private static Object process(final RedisInputStream is) {

  final byte b = is.readByte();
  switch(b) {
  case PLUS_BYTE:
    return processStatusCodeReply(is);
  case DOLLAR_BYTE:
    return processBulkReply(is);
  case ASTERISK_BYTE:
    return processMultiBulkReply(is);
  case COLON_BYTE:
    return processInteger(is);
  // A redirection arrives as an error reply and matches this branch
  case MINUS_BYTE:
    processError(is);
    return null;
  default:
    throw new JedisConnectionException("Unknown reply: " + (char) b);
  }
}

// A MOVED reply is turned into an exception: redis.clients.jedis.Protocol#processError
private static void processError(final RedisInputStream is) {
  String message = is.readLine();
  // TODO: I'm not sure if this is the best way to do this.
  // Maybe Read only first 5 bytes instead?
  if (message.startsWith(MOVED_RESPONSE)) {
    // The server has migrated the slot, so this exception is thrown
    String[] movedInfo = parseTargetHostAndSlot(message);
    throw new JedisMovedDataException(message, new HostAndPort(movedInfo[1],
        Integer.parseInt(movedInfo[2])), Integer.parseInt(movedInfo[0]));
  } else if (message.startsWith(ASK_RESPONSE)) {
    String[] askInfo = parseTargetHostAndSlot(message);
    throw new JedisAskDataException(message, new HostAndPort(askInfo[1],
        Integer.parseInt(askInfo[2])), Integer.parseInt(askInfo[0]));
  } else if (message.startsWith(CLUSTERDOWN_RESPONSE)) {
    throw new JedisClusterException(message);
  } else if (message.startsWith(BUSY_RESPONSE)) {
    throw new JedisBusyException(message);
  } else if (message.startsWith(NOSCRIPT_RESPONSE) ) {
    throw new JedisNoScriptException(message);
  }
  throw new JedisDataException(message);
}
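To make the redirection concrete, here is a small self-contained sketch that parses the MOVED error line from the redis-cli session above, mirroring what parseTargetHostAndSlot extracts (the reply format MOVED <slot> <host>:<port> is defined by the cluster spec):

public class MovedReplyDemo {
    public static void main(String[] args) {
        String message = "MOVED 6918 127.0.0.1:6380"; // as in the session above
        String[] parts = message.split(" ");
        int slot = Integer.parseInt(parts[1]);
        String host = parts[2].split(":")[0];
        int port = Integer.parseInt(parts[2].split(":")[1]);
        // The client rebuilds its slot cache and resends the command to host:port
        System.out.printf("slot=%d -> %s:%d%n", slot, host, port);
    }
}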

4 References

1. https://zhuanlan.zhihu.com/p/145186839

2. Redis source code

3. https://www.redis.com.cn/topics/cluster-tutorial.html#id2
