Redis源码分析

基于Redis-5.0.4版本,进行基础的源码分析,主要就是分析一些平常使用过程中的内容。仅作为相关内容的学习记录,有关Redis源码学习阅读比较广泛的便是《Redis设计与实现》一书,浏览学习该书之后,觉得相关内容还是觉得抽象的高度比较高,故对照着代码再阅读学习一下。

Redis的数据结构

官网上的数据结构的介绍

  1. 二进制安全的字符串,二进制安全的意思大家可以自行查阅定义,在redis中的字符串安全,主要就是在C语言中的字符串的长度判断与溢出的判断都是通过单独维护一个len字段来进行字符串的相关长度检查,并没有直接依靠C语言的字符串数组来进行检查。
  2. 列表,根据插入顺序的字符串列表,主要就是通过链表来实现。
  3. 集合,主要就是不重复的不排序的字符串集合。
  4. 有序集合,与Sets类似,但是每个字符串元素都与一个称为score的浮点值相关联。 元素总是按它们的分数排序,因此与Sets不同,可以检索一系列元素(例如,您可能会问:给我前10名或后10名)。
  5. 哈希字典,是由与值相关联的字段组成的map字段和值都是字符串,这与Ruby或Python哈希非常相似。
  6. 位数组,可以使用特殊命令像位数组一样处理字符串值:您可以设置和清除单个位,计数所有设置为1的位,找到第一个设置或未设置的位,依此类推。
  7. HyperLogLogs,这是一个概率数据结构,用于估计集合的基数。
  8. 流:提供抽象日志数据类型的类地图项的仅追加集合。

这基本上就是官网上介绍的八种数据结构,其中流这种数据结构是版本5开始添加进来的主要就是完善对日志流的相关操作的补充。

Redis的工作模式与初始化流程

Redis服务端的工作模式,主要都是Reactor模式,该模式主要就是被称为文件事件处理方式,主线程通过使用IO多路复用技术来同事监听多个套接字,并根据套接字目前执行的任务来为套接字处理不同的事件处理器,当被监听的套接字准备好接受连接(accept)、读取(read)、写入(write)或者关闭(close)时都会操作相对应的文件事件,这样就达到了单线程来处理大量客户端请求的情况,Redis服务端选择了单线程的Reactor模式,即所有的业务的处理都在一个线程中来进行,单线程的响应模式也保证了Redis执行某些命令所需要的原子操作,但是Redis服务器本身中也会出现会需要一些其他线程或者进程来做的事情,但是这些都是为了支持一些服务器本身的如备份等其他功能操作。

接下来就通过Redis服务端的初始化过程简单的概述一下Redis的工作模式。

server的main函数
int main(int argc, char **argv) {struct timeval tv;int j;#ifdef REDIS_TEST                                           // 是否是测试if (argc == 3 && !strcasecmp(argv[1], "test")) {if (!strcasecmp(argv[2], "ziplist")) {return ziplistTest(argc, argv);} else if (!strcasecmp(argv[2], "quicklist")) {quicklistTest(argc, argv);} else if (!strcasecmp(argv[2], "intset")) {return intsetTest(argc, argv);} else if (!strcasecmp(argv[2], "zipmap")) {return zipmapTest(argc, argv);} else if (!strcasecmp(argv[2], "sha1test")) {return sha1Test(argc, argv);} else if (!strcasecmp(argv[2], "util")) {return utilTest(argc, argv);} else if (!strcasecmp(argv[2], "sds")) {return sdsTest(argc, argv);} else if (!strcasecmp(argv[2], "endianconv")) {return endianconvTest(argc, argv);} else if (!strcasecmp(argv[2], "crc64")) {return crc64Test(argc, argv);} else if (!strcasecmp(argv[2], "zmalloc")) {return zmalloc_test(argc, argv);}return -1; /* test not found */}
#endif/* We need to initialize our libraries, and the server configuration. */
#ifdef INIT_SETPROCTITLE_REPLACEMENTspt_init(argc, argv);
#endifsetlocale(LC_COLLATE,"");                                   // 设置地域信息tzset(); /* Populates 'timezone' global. */           zmalloc_set_oom_handler(redisOutOfMemoryHandler);          // 设置超过内存处理函数srand(time(NULL)^getpid());                                // 随机数产生器的初始值gettimeofday(&tv,NULL);                                    // 获取时间char hashseed[16];getRandomHexChars(hashseed,sizeof(hashseed));dictSetHashFunctionSeed((uint8_t*)hashseed);               // 设置hash函数的值server.sentinel_mode = checkForSentinelMode(argc,argv);    // 检查是否启动的是哨兵模式initServerConfig();                                         // 初始化服务器配置moduleInitModulesSystem();                                  // 加载扩展的模块/* Store the executable path and arguments in a safe place in order* to be able to restart the server later. */server.executable = getAbsolutePath(argv[0]);               // 保存可执行文件server.exec_argv = zmalloc(sizeof(char*)*(argc+1));         // 申请内存保存输入的参数server.exec_argv[argc] = NULL;for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);      // 保存输入的参数,/* We need to init sentinel right now as parsing the configuration file* in sentinel mode will have the effect of populating the sentinel* data structures with master nodes to monitor. */if (server.sentinel_mode) {                                                 // 如果是哨兵模式则初始化哨兵模式配置initSentinelConfig();initSentinel();}/* Check if we need to start in redis-check-rdb/aof mode. We just execute* the program main. However the program is part of the Redis executable* so that we can easily execute an RDB check on loading errors. */if (strstr(argv[0],"redis-check-rdb") != NULL)                              // 检查是否包含了redis-check-rdb参数 如果有则检查rdb文件是否正确redis_check_rdb_main(argc,argv,NULL);else if (strstr(argv[0],"redis-check-aof") != NULL)                         // 检查是否包括了aof文件的检查 如果有则检查aof文件redis_check_aof_main(argc,argv);if (argc >= 2) {                                                            // 如果输入参数多余等于两个  则检查是否包含如下的输入参数j = 1; /* First option to parse in argv[] */sds options = sdsempty();char *configfile = NULL;/* Handle special options --help and --version */if (strcmp(argv[1], "-v") == 0 ||strcmp(argv[1], "--version") == 0) version();if (strcmp(argv[1], "--help") == 0 ||strcmp(argv[1], "-h") == 0) usage();if (strcmp(argv[1], "--test-memory") == 0) {if (argc == 3) {memtest(atoi(argv[2]),50);exit(0);} else {fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");exit(1);}}/* First argument is the config file name? */if (argv[j][0] != '-' || argv[j][1] != '-') {configfile = argv[j];server.configfile = getAbsolutePath(configfile);/* Replace the config file in server.exec_argv with* its absolute path. */zfree(server.exec_argv[j]);server.exec_argv[j] = zstrdup(server.configfile);j++;}/* All the other options are parsed and conceptually appended to the* configuration file. For instance --port 6380 will generate the* string "port 6380\n" to be parsed after the actual file name* is parsed, if any. */while(j != argc) {if (argv[j][0] == '-' && argv[j][1] == '-') {/* Option name */if (!strcmp(argv[j], "--check-rdb")) {/* Argument has no options, need to skip for parsing. */j++;continue;}if (sdslen(options)) options = sdscat(options,"\n");options = sdscat(options,argv[j]+2);options = sdscat(options," ");} else {/* Option argument */options = sdscatrepr(options,argv[j],strlen(argv[j]));options = sdscat(options," ");}j++;}if (server.sentinel_mode && configfile && *configfile == '-') {serverLog(LL_WARNING,"Sentinel config from STDIN not allowed.");serverLog(LL_WARNING,"Sentinel needs config file on disk to save state.  Exiting...");exit(1);}resetServerSaveParams();loadServerConfig(configfile,options);sdsfree(options);}serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo");serverLog(LL_WARNING,"Redis version=%s, bits=%d, commit=%s, modified=%d, pid=%d, just started",REDIS_VERSION,(sizeof(long) == 8) ? 64 : 32,redisGitSHA1(),strtol(redisGitDirty(),NULL,10) > 0,(int)getpid());             // 输出运行的基本信息if (argc == 1) {serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");} else {serverLog(LL_WARNING, "Configuration loaded");    // 检查是否输入指定的输入参数}server.supervised = redisIsSupervised(server.supervised_mode);    // 是否是supervised模式int background = server.daemonize && !server.supervised;          // 是否是守护进程模式if (background) daemonize();                                      // 如果是守护进程模式则启动守护进程模式initServer();                                                     // 初始化服务端if (background || server.pidfile) createPidFile();                // 是否是后台启动 并且是否配置了Pid文件路径  如果有则创建Pid文件redisSetProcTitle(argv[0]);redisAsciiArt();checkTcpBacklogSettings();                                        // 检查tcp相关配置if (!server.sentinel_mode) {                                      // 是否是哨兵模式/* Things not needed when running in Sentinel mode. */serverLog(LL_WARNING,"Server initialized");#ifdef __linux__linuxMemoryWarnings();#endifmoduleLoadFromQueue();loadDataFromDisk();if (server.cluster_enabled) {if (verifyClusterConfigWithData() == C_ERR) {serverLog(LL_WARNING,"You can't have keys in a DB different than DB 0 when in ""Cluster mode. Exiting.");exit(1);}}if (server.ipfd_count > 0)serverLog(LL_NOTICE,"Ready to accept connections");if (server.sofd > 0)serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);} else {sentinelIsRunning();}/* Warning the user about suspicious maxmemory setting. */if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {      // 检查最大内容的大小  是否打印警告信息serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);}aeSetBeforeSleepProc(server.el,beforeSleep);         // 设置beforeSleep的回调函数aeSetAfterSleepProc(server.el,afterSleep);           // 设置afterSleep的回调函数aeMain(server.el);                                   // 启动事件循环aeDeleteEventLoop(server.el);                        // 事件循环结束删除配置文件中的事件return 0;
}

从main函数的执行流程可知,主要就是进行了相关的输入参数的检查与服务器端的启动模式之间的检查,根据不同的输入参数启动不同的方式来工作,在整个流程中的启动流程中,比较主要的过程就是initServer的初始化流程与aeMain函数的执行过程。

initServer函数的概述
void initServer(void) {int j;signal(SIGHUP, SIG_IGN);          signal(SIGPIPE, SIG_IGN);setupSignalHandlers();              // 注册信号处理函数if (server.syslog_enabled) {       // 是否打开syslogopenlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,server.syslog_facility);}server.hz = server.config_hz;server.pid = getpid();               // 获取当前运行的进程号server.current_client = NULL;        // 记录当前执行命令的客户端server.clients = listCreate();        // 当前所有连接的客户端  通过一个列表来保存server.clients_index = raxNew();         server.clients_to_close = listCreate();   // 获取一个需要关闭的clients列表server.slaves = listCreate();             // 生成一个从节点的列表server.monitors = listCreate();           // 生成一个监视器的列表server.clients_pending_write = listCreate();    server.slaveseldb = -1; /* Force to emit the first SELECT command. */server.unblocked_clients = listCreate();           // 生成一个不阻塞的客户端列表server.ready_keys = listCreate();                   server.clients_waiting_acks = listCreate();server.get_ack_from_slaves = 0;server.clients_paused = 0;server.system_memory_size = zmalloc_get_memory_size();    // 系统的内存大小createSharedObjects();                                      // 创建objectsadjustOpenFilesLimit();server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);   // 创建事件循环if (server.el == NULL) {serverLog(LL_WARNING,"Failed creating the event loop. Error message: '%s'",strerror(errno));exit(1);}server.db = zmalloc(sizeof(redisDb)*server.dbnum);                   // 初始化数据库数量/* Open the TCP listening socket for the user commands. */if (server.port != 0 &&listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)    // 监听端口exit(1);/* Open the listening Unix domain socket. */if (server.unixsocket != NULL) {                                         // 如果unix套接字配置不为空则创建原始套接字接口unlink(server.unixsocket); /* don't care if this fails */server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm, server.tcp_backlog);if (server.sofd == ANET_ERR) {serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);exit(1);}anetNonBlock(NULL,server.sofd);}/* Abort if there are no listening sockets at all. */if (server.ipfd_count == 0 && server.sofd < 0) {serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");exit(1);}/* Create the Redis databases, and initialize other internal state. */              // 初始化每个数据库for (j = 0; j < server.dbnum; j++) {server.db[j].dict = dictCreate(&dbDictType,NULL);                               // 数据库的字典server.db[j].expires = dictCreate(&keyptrDictType,NULL);                        // 过期字典server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);                 // 阻塞的key列表server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);     server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);                  // 观察的key字典server.db[j].id = j;server.db[j].avg_ttl = 0;server.db[j].defrag_later = listCreate();}evictionPoolAlloc(); /* Initialize the LRU keys pool. */                            // 注册LRU缓冲池server.pubsub_channels = dictCreate(&keylistDictType,NULL);                         // 初始化发布订阅的字典server.pubsub_patterns = listCreate();                                              // 发布订阅的列表listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);                        // 注册相关的释放与匹配函数listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);server.cronloops = 0;server.rdb_child_pid = -1;                                                          // rdb子进程server.aof_child_pid = -1;                                                          // aof子进程server.rdb_child_type = RDB_CHILD_TYPE_NONE;server.rdb_bgsave_scheduled = 0;server.child_info_pipe[0] = -1;                                                     // 通信管道server.child_info_pipe[1] = -1;server.child_info_data.magic = 0;aofRewriteBufferReset();server.aof_buf = sdsempty();                                                        // aof缓存server.lastsave = time(NULL); /* At startup we consider the DB saved. */server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */server.rdb_save_time_last = -1;server.rdb_save_time_start = -1;server.dirty = 0;resetServerStats();/* A few stats we don't want to reset: server startup time, and peak mem. */server.stat_starttime = time(NULL);                                                 // 开始时间server.stat_peak_memory = 0;server.stat_rdb_cow_bytes = 0;server.stat_aof_cow_bytes = 0;server.cron_malloc_stats.zmalloc_used = 0;server.cron_malloc_stats.process_rss = 0;server.cron_malloc_stats.allocator_allocated = 0;server.cron_malloc_stats.allocator_active = 0;server.cron_malloc_stats.allocator_resident = 0;server.lastbgsave_status = C_OK;server.aof_last_write_status = C_OK;server.aof_last_write_errno = 0;server.repl_good_slaves_count = 0;/* Create the timer callback, this is our way to process many background* operations incrementally, like clients timeout, eviction of unaccessed* expired keys and so forth. */if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {           // 创建事件事件serverPanic("Can't create event loop timers.");exit(1);}/* Create an event handler for accepting new connections in TCP and Unix* domain sockets. */for (j = 0; j < server.ipfd_count; j++) {if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL) == AE_ERR)                                         // 创建事件监听的接受事件,回调函数为acceptTcpHandler                   {serverPanic("Unrecoverable error creating server.ipfd file event.");}}if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,                // 创建接受unix原始套接字的接受事件acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");/* Register a readable event for the pipe used to awake the event loop* when a blocked client in a module needs attention. */if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,moduleBlockedClientPipeReadable,NULL) == AE_ERR) {                             // 创建模块的管道沟通的回调事件serverPanic("Error registering the readable event for the module ""blocked clients subsystem.");}/* Open the AOF file if needed. */if (server.aof_state == AOF_ON) {                                                  // 是否打开aof文件server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);if (server.aof_fd == -1) {serverLog(LL_WARNING, "Can't open the append-only file: %s",strerror(errno));exit(1);}}/* 32 bit instances are limited to 4GB of address space, so if there is* no explicit limit in the user provided configuration we set a limit* at 3 GB using maxmemory with 'noeviction' policy'. This avoids* useless crashes of the Redis instance for out of memory. */if (server.arch_bits == 32 && server.maxmemory == 0) {serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");server.maxmemory = 3072LL*(1024*1024); /* 3 GB */server.maxmemory_policy = MAXMEMORY_NO_EVICTION;}if (server.cluster_enabled) clusterInit();                                   // 是否集群模式replicationScriptCacheInit();scriptingInit(1);slowlogInit();                                                             // 慢日志初始化latencyMonitorInit();                                                       // 监控初始化bioInit();server.initial_memory_usage = zmalloc_used_memory();                       // 可以使用的内存大小
}

该函数主要就是根据传入的配置文件的参数,初始化数据库大小,创建时间事件,并且创建客户端连接请求的事件,接着再初始化对应的一些脚本监控等初始化函数即完成了初始化过程。

aeMain函数
void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)eventLoop->beforesleep(eventLoop);aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);}
}

该函数主要就是通过一个while循环,每次都先检查一下beforesleep是否设置了回调函数,如果设置了则调用该函数执行该函数主要做了一些集群相关的操作和aof相关的检查等操作后续可以详细分析一下该函数,执行完成之后就执行aeProcessEvents。

int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int processed = 0, numevents;/* Nothing to do? return ASAP */if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;    // 如果没有可以处理的事件则返回/* Note that we want call select() even if there are no* file events to process as long as we want to process time* events, in order to sleep until the next time event is ready* to fire. */if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);          // 查找最近的时间事件if (shortest) {long now_sec, now_ms;aeGetTime(&now_sec, &now_ms);tvp = &tv;/* How many milliseconds we need to wait for the next* time event to fire? */long long ms =(shortest->when_sec - now_sec)*1000 +shortest->when_ms - now_ms;                      // 检查还需要多久可以执行该事件 或者时间已经到了可以执行该事件if (ms > 0) {tvp->tv_sec = ms/1000;tvp->tv_usec = (ms % 1000)*1000;} else {tvp->tv_sec = 0;tvp->tv_usec = 0;}} else {/* If we have to check for events but need to return* ASAP because of AE_DONT_WAIT we need to set the timeout* to zero */if (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */tvp = NULL; /* wait forever */}}/* Call the multiplexing API, will return only on timeout or when* some event fires. */numevents = aeApiPoll(eventLoop, tvp);                          // 事件处理函数  即 select  epoll等 /* After sleep callback. */if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)eventLoop->aftersleep(eventLoop);                                   // 是否需要执行aftersleep的回调函数for (j = 0; j < numevents; j++) {                                       // 获取有多少个已经响应的事件aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int fired = 0; /* Number of events fired for current fd. *//* Normally we execute the readable event first, and the writable* event laster. This is useful as sometimes we may be able* to serve the reply of a query immediately after processing the* query.** However if AE_BARRIER is set in the mask, our application is* asking us to do the reverse: never fire the writable event* after the readable. In such a case, we invert the calls.* This is useful when, for instance, we want to do things* in the beforeSleep() hook, like fsynching a file to disk,* before replying to a client. */int invert = fe->mask & AE_BARRIER;/* Note the "fe->mask & mask & ..." code: maybe an already* processed event removed an element that fired and we still* didn't processed, so we check if the event is still valid.** Fire the readable event if the call sequence is not* inverted. */if (!invert && fe->mask & mask & AE_READABLE) {                 // 是否是可读事件  如果是可读事件则调用该事件的回调方法并传入当前的数据fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;}/* Fire the writable event. */if (fe->mask & mask & AE_WRITABLE) {                            // 如果是科协时间  则处理可写事件的回调方法if (!fired || fe->wfileProc != fe->rfileProc) {fe->wfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}/* If we have to invert the call, fire the readable event now* after the writable one. */if (invert && fe->mask & mask & AE_READABLE) {if (!fired || fe->wfileProc != fe->rfileProc) {fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}processed++;}}/* Check time events */if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);                              // 如果当前有时间事件需要执行则调用时间事件执行return processed; /* return the number of processed file/time events */
}

主要就是分层级执行了不同的任务,首先先检查是否有时间到期的时间事件需要执行,然后再检查是否有响应的任务事件需要回调执行,如果有响应任务需要执行则先执行响应任务,然后再执行时间事件。

/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {int processed = 0;aeTimeEvent *te;long long maxId;time_t now = time(NULL);                                // 获取当前时间/* If the system clock is moved to the future, and then set back to the* right value, time events may be delayed in a random way. Often this* means that scheduled operations will not be performed soon enough.** Here we try to detect system clock skews, and force all the time* events to be processed ASAP when this happens: the idea is that* processing events earlier is less dangerous than delaying them* indefinitely, and practice suggests it is. */if (now < eventLoop->lastTime) {                        // 检查当前时间是否小于最后调用的时间te = eventLoop->timeEventHead;while(te) {                                         // 一次循环将when_sec置空te->when_sec = 0;te = te->next;}}eventLoop->lastTime = now;                              // 时间置为当前时间te = eventLoop->timeEventHead;                          // 获取事件的头部maxId = eventLoop->timeEventNextId-1;while(te) {long now_sec, now_ms;long long id;/* Remove events scheduled for deletion. */if (te->id == AE_DELETED_EVENT_ID) {                // 是否执行完成之后移除该任务aeTimeEvent *next = te->next;                   // 移除该任务if (te->prev)te->prev->next = te->next;elseeventLoop->timeEventHead = te->next;if (te->next)te->next->prev = te->prev;if (te->finalizerProc)te->finalizerProc(eventLoop, te->clientData);  // 执行该任务zfree(te);te = next;continue;}/* Make sure we don't process time events created by time events in* this iteration. Note that this check is currently useless: we always* add new timers on the head, however if we change the implementation* detail, this check may be useful again: we keep it here for future* defense. */if (te->id > maxId) {te = te->next;continue;}aeGetTime(&now_sec, &now_ms);if (now_sec > te->when_sec ||(now_sec == te->when_sec && now_ms >= te->when_ms))         // 更精确的时间对比{int retval;id = te->id;retval = te->timeProc(eventLoop, id, te->clientData);       // 执行任务 根据任务的执行返回来判断是否继续加入时间循环processed++;if (retval != AE_NOMORE) {                                      // 继续延长该任务时间来继续执行aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);} else {te->id = AE_DELETED_EVENT_ID;                               // 设置删除任务id}}te = te->next;                                                      // 下一个}return processed;
}

处理时间事件,也分为定时执行即执行完成之后继续延长指定时间之后还会再执行,也分为执行当次任务执行就不再继续执行,时间事件主要是通过时间的比较判断来进行并进行事件的回调处理。

总结

本文主要就是概述了Redis相关的基础的数据结构,5.0.4版本的Redis的启动流程除了增加了许多新功能外,基本的启动流程与以前整理的redis源码分析(beta版本)的启动流程基本一致,都是单线程的事件驱动模式,事件中包括了连接的文件描述符事件,也包括了时间事件的处理。后续会再深入探讨5.0.4版本的Redis的其他的功能实现。由于本人才疏学浅,如有错误请批评指正。

Redis源码分析:基础概念介绍与启动概述相关推荐

  1. Zookeeper源码分析:集群模式启动概述

    参考资料 <<从PAXOS到ZOOKEEPER分布式一致性原理与实践>> zookeeper-3.0.0 Zookeeper概述 Zookeeper是一个分布式的,开放源码的分 ...

  2. redis源码分析 -- cs结构之服务器

    服务器与客户端是如何交互的 redis客户端向服务器发送命令请求,服务器接收到客户端发送的命令请求之后,读取解析命令,并执行命令,同时将命令执行结果返回给客户端. 客户端与服务器交互的代码流程如下图所 ...

  3. Redis源码分析(一)redis.c //redis-server.c

    Redis源码分析(一)redis.c //redis-server.c 入口函数 int main() 4450 int main(int argc, char **argv) {4451 init ...

  4. 【SemiDrive源码分析】【X9芯片启动流程】08 - X9平台 lk 目录源码分析 之 目录介绍

    [SemiDrive源码分析][X9芯片启动流程]08 - X9平台 lk 目录源码分析 之 目录介绍 一./rtos/lk/ 目录结构分析 1.1 /rtos/lk_boot/ 目录结构分析 1.2 ...

  5. 【SemiDrive源码分析】【X9芯片启动流程】21 - MailBox 核间通信机制介绍(代码分析篇)之 Mailbox for Linux 篇

    [SemiDrive源码分析][X9芯片启动流程]21 - MailBox 核间通信机制介绍(代码分析篇)之 Mailbox for Linux 篇 一.Mailbox for Linux 驱动框架分 ...

  6. 【SemiDrive源码分析】【X9芯片启动流程】20 - MailBox 核间通信机制介绍(代码分析篇)之 MailBox for RTOS 篇

    [SemiDrive源码分析][X9芯片启动流程]20 - MailBox 核间通信机制介绍(代码分析篇)之 MailBox for RTOS 篇 一.Mailbox for RTOS 源码分析 1. ...

  7. v62.02 鸿蒙内核源码分析(文件概念) | 为什么说一切皆是文件 | 百篇博客分析OpenHarmony源码

    司马牛忧曰:"人皆有兄弟,我独亡."子夏曰:"商闻之矣:死生有命,富贵在天.君子敬而无失,与人恭而有礼.四海之内,皆兄弟也.君子何患乎无兄弟也?" <论语 ...

  8. 【SemiDrive源码分析】【X9芯片启动流程】19 - MailBox 核间通信机制介绍(理论篇)

    [SemiDrive源码分析][X9芯片启动流程]19 - MailBox 核间通信机制介绍(理论篇) 一.核间通信 二.核间通信软件架构 三.Mailbox 设备驱动 3.1 Mailbox for ...

  9. 10年大厂程序员是如何高效学习使用redis的丨redis源码分析丨redis存储原理

    10年大厂程序员是怎么学习使用redis的 1. redis存储原理分析 2. redis源码学习分享 3. redis跳表和B+树详细对比分析 视频讲解如下,点击观看: 10年大厂程序员是如何高效学 ...

最新文章

  1. mysql查看正在执行的sql语句
  2. bulkwrite 批量插入_使用SqlBulkCopy批量插入数据
  3. leetcode 341. Flatten Nested List Iterator | 341. 扁平化嵌套列表迭代器(Java)
  4. 群里有朋友关于SAP Spartacus的疑问
  5. 斯坦福的著名小兔子模型的点云数据_传统方法的点云分割以及PCL中分割模块
  6. 我和2000万人在B站刷凤凰传奇
  7. mysql如何添加用户_如何创建新用户和授予MySQL中的权限
  8. Python Socket编程初探
  9. 来一杯java_初级java笔试题
  10. BAT等大厂十年研发经历,总结了12开发条经验(墙裂推荐)
  11. 网易云音乐推荐系统特训_笔记
  12. AE怎么制作流体文字效果?这波教程我真学会了
  13. ibm服务器报错代码大全_IBM错误代码对应表
  14. vue全局配置_silent
  15. oracle 优化建议,oracle 性能优化建议小结
  16. 用WinGrub来引导Linux的安装
  17. 【LoRa 与 LoRaWAN】知识点汇总
  18. sqlserver服务器主体 “Jack-PC/Jack“ 无法在当前安全上下文下访问数据库 “model“。 (Microsoft SQL Server,错误: 916)
  19. 开源项目之Android-GL(OpenGL 编程)
  20. 良好的客厅风水,应该如何布置呢?

热门文章

  1. 使用 ChatterBot 库制作一个聊天机器人
  2. “Hey Siri” 背后的黑科技大揭秘!
  3. AAAI 2020论文解读:商汤科技发布新视频语义分割和光流联合学习算法
  4. 从流感预测到智能决策,深度学习能帮企业做哪些事?
  5. 国行版HomePod售价2799元,本周五发售
  6. 那位13岁就当上老板的开发者是如何炼成的?
  7. 专访图灵奖得主John Hopcroft:中国必须提升本科教育水平,才能在AI领域赶上美国
  8. 八大深度学习最佳实践
  9. 干货 | 如何使用 CNN 推理机在 IoT 设备上实现深度学习
  10. Java jar 如何防止被反编译?代码写的太烂,害怕被人发现