1.server.c main()服务启动的入口

下面是redis启动的入口程序server.c,简单列了主程序中比较关键的一些点。

## server.c
int main(int argc, char **argv) {// 1.oom处理器zmalloc_set_oom_handler(redisOutOfMemoryHandler); // 设置OOM处理器// 2.构建hash种子char hashseed[16];getRandomHexChars(hashseed,sizeof(hashseed));// 3.初始化redis默认配置 initServerConfig();//4.如果是哨兵模式,则进行哨兵初始化if (server.sentinel_mode) {initSentinelConfig();initSentinel();}// 5.检查备份文件是否完整if (strstr(argv[0],"redis-check-rdb") != NULL)// 检查RDB文件是否完整redis_check_rdb_main(argc,argv,NULL);else if (strstr(argv[0],"redis-check-aof") != NULL)// 检查AOF文件redis_check_aof_main(argc,argv);// 6.初始化服务器initServer();// 哨兵模式与非哨兵模式处理if (!server.sentinel_mode) {// 从磁盘上读取AOF或RDB进行数据恢复loadDataFromDisk();} else {// 哨兵启动sentinelIsRunning();}// 主循环执行之前的函数beforeSleepaeSetBeforeSleepProc(server.el,beforeSleep);// 主循环执行之后的函数aeSetAfterSleepProc(server.el,afterSleep);// 事件驱动进行事件处理aeMain(server.el)// 移除事件驱动循环aeDeleteEventLoop(server.el);
}

2.initServerConfig 初始化默认配置

initServerConfig主要是初始化redis的默认配置

server.aof_state = AOF_OFF; 默认AOF关闭

server.aof_fsync = CONFIG_DEFAULT_AOF_FSYNC; AOF的默认刷盘方式

server.aof_rewrite_perc = AOF_REWRITE_PERC; aof重写每秒写一次

server.aof_rewrite_min_size = AOF_REWRITE_MIN_SIZE; 重写的最小大小

server.rdb_filename = zstrdup(CONFIG_DEFAULT_RDB_FILENAME); rdb的文件名

server.aof_filename = zstrdup(CONFIG_DEFAULT_AOF_FILENAME); aof文件名

server.rdb_compression = CONFIG_DEFAULT_RDB_COMPRESSION; rdb的压缩方式
server.rdb_checksum = CONFIG_DEFAULT_RDB_CHECKSUM; rdb文件完整性校验方式
server.masterport = 6379; 默认端口
// 初始化命令redisCommandTable存储着redis所有对外暴露的命令
populateCommandTable(); // 将redisCommandTable数组的命令转为字典存储,方便通过命令名称查找命令

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-uiu2SjF7-1649254774401)(https://s3-us-west-2.amazonaws.com/secure.notion-static.com/b5f808b5-cead-4575-b0c6-d53c8e5b7ba0/Untitled.png)]

3.initServer(void)初始化redis server

在initServer中最核心的莫过于申请默认的16个db的空间并进行初始化,然后就是打开tcp端口监听。

  1. 创建默认的16个数据库,并初始化数据库结构
  2. 打开端口监听
  3. 创建时间事件用于处理客户端过期和key过期剔除等后台行为
  4. 创建文件事件用于处理tcp连接,unix socket连接,客户端命令处理
  5. 如果开启了AOF,则需要在服务启动时就需要创建aof文件来写入客户端命令
void initServer(void) {// 创建全局共享变量,主要是一些常用的字符串createSharedObjects();// 调整可打开文件数(客户端数量限制)adjustOpenFilesLimit();// 创建事件循环server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);//申请16个db数据库锁使用的内存server.db = zmalloc(sizeof(redisDb)*server.dbnum);// 进行端口监听listenToPort(server.port,server.ipfd,&server.ipfd_count) // 打开unix套接字server.sofd = anetUnixServer(server.neterr,server.unixsocket,server.unixsocketperm, server.tcp_backlog);//创建redis数据库for (j = 0; j < server.dbnum; j++) {// 数据字典server.db[j].dict = dictCreate(&dbDictType,NULL);// 过期key字典server.db[j].expires = dictCreate(&keyptrDictType,NULL);阻塞key字典server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);server.db[j].id = j;server.db[j].avg_ttl = 0;server.db[j].defrag_later = listCreate();}// 初始化lru的可以链表池evictionPoolAlloc();//pub sub 发布订阅字典创建server.pubsub_channels = dictCreate(&keylistDictType,NULL);//时间事件   serverCron 用来做后天处理事件(客户端过期,剔除过去key等等)if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {serverPanic("Can't create event loop timers.");exit(1);}// 处理文件事件(针对server.ipfd)(用来处理tcp连接)for (j = 0; j < server.ipfd_count; j++) {if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL) == AE_ERR){serverPanic("Unrecoverable error creating server.ipfd file event.");}}// unix socket 文件事件aeCreateFileEvent(server.el,server.sofd,AE_READABLE,acceptUnixHandler,NULL)// 针对server.module_blocked_pipe[0]创建一个文件事件,用来唤醒事件循环去处理客户端的命令aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,moduleBlockedClientPipeReadable,NULL)// 如果开启了aofif (server.aof_state == AOF_ON) {// 创建aof的文件描述符server.aof_fd = open(server.aof_filename,O_WRONLY|O_APPEND|O_CREAT,0644);if (server.aof_fd == -1) {serverLog(LL_WARNING, "Can't open the append-only file: %s",strerror(errno));exit(1);}}// 集群初始化if (server.cluster_enabled) clusterInit();
}

4. redis数据库结构

redis默认是有16个数据库的,通常我们一般使用的0库

typedef struct redisDb {dict *dict;                 /* The keyspace for this DB 数据字典*/dict *expires;              /* Timeout of keys with a timeout set 有过期时间的key */dict *blocking_keys;        /* Keys with clients waiting for data (BLPOP) 等待获取数据的key*/dict *ready_keys;           /* Blocked keys that received a PUSH 等待push操作的可以*/dict *watched_keys;         /* WATCHED keys for MULTI/EXEC CAS 等待muti,exec,cas操作的key*/int id;                     /* Database ID 数据库序号id*/long long avg_ttl;          /* Average TTL, just for stats 平均生存时间*/list *defrag_later;         /* List of key names to attempt to defrag one by one, gradually. 碎片整理使用*/
} redisDb;

5. 数据恢复之loadDataFromDisk()

loadAppendOnlyFile(server.aof_filename) 加载aof文件

  1. rdbLoad(server.rdb_filename,&rsi) 加载rdb文件
void loadDataFromDisk(void) {long long start = ustime();//如果有aof则加载AOFif (server.aof_state == AOF_ON) {// 从aof文件中读取命令进行执行if (loadAppendOnlyFile(server.aof_filename) == C_OK)serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000);} else {rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;//加载RDBif (rdbLoad(server.rdb_filename,&rsi) == C_OK) {serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",(float)(ustime()-start)/1000000);/* Restore the replication ID / offset from the RDB file. */if (server.masterhost &&rsi.repl_id_is_set &&rsi.repl_offset != -1 &&/* Note that older implementations may save a repl_stream_db* of -1 inside the RDB file in a wrong way, see more information* in function rdbPopulateSaveInfo. */rsi.repl_stream_db != -1){memcpy(server.replid,rsi.repl_id,sizeof(server.replid));server.master_repl_offset = rsi.repl_offset;/* If we are a slave, create a cached master from this* information, in order to allow partial resynchronizations* with masters. */replicationCacheMasterUsingMyself();selectDb(server.cached_master,rsi.repl_stream_db);}} else if (errno != ENOENT) {serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));exit(1);}}
}

7.主事件驱动前置函数beforeSleep

主要工作内容:

  1. replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); 向所有slave发送ack请求
  2. 写AOF文件
void beforeSleep(struct aeEventLoop *eventLoop) {UNUSED(eventLoop);/* 如果开启了集群先走clusterBeforeSleep */if (server.cluster_enabled) clusterBeforeSleep();/* Run a fast expire cycle (the called function will return* ASAP if a fast cycle is not needed). */if (server.active_expire_enabled && server.masterhost == NULL)activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);/* Send all the slaves an ACK request if at least one client blocked* during the previous event loop iteration. *   如果有一个客户端在上次事件循环中被阻塞,则向所有的slave发送ack请求*/if (server.get_ack_from_slaves) {robj *argv[3];argv[0] = createStringObject("REPLCONF",8);argv[1] = createStringObject("GETACK",6);argv[2] = createStringObject("*",1); /* Not used argument. */replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);decrRefCount(argv[0]);decrRefCount(argv[1]);decrRefCount(argv[2]);server.get_ack_from_slaves = 0;}/* Unblock all the clients blocked for synchronous replication* in WAIT. 解锁所有客户端因为副本同步而产生的等待*/if (listLength(server.clients_waiting_acks))processClientsWaitingReplicas();/* Check if there are clients unblocked by modules that implement* blocking commands. */moduleHandleBlockedClients();/* Try to process pending commands for clients that were just unblocked.  处理刚刚解锁的客户端的命令*/if (listLength(server.unblocked_clients))processUnblockedClients();/* Write the AOF buffer on disk 写aof缓冲到文件中去 */flushAppendOnlyFile(0);/* Handle writes with pending output buffers. *///处理向客户端写入数据handleClientsWithPendingWrites();/* Before we are going to sleep, let the threads access the dataset by* releasing the GIL. Redis main thread will not touch anything at this* time. */if (moduleCount()) moduleReleaseGIL();
}

8.aeMain 主事件循环

void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {if (eventLoop->beforesleep != NULL)// 如果beforesleep不为null,则在每次循环的时候都执行一次beforesleepeventLoop->beforesleep(eventLoop);// 重中之重:处理事件aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);}
}

9. aeProcessEvents 事件处理

  1. 遍历时间事件链表aeSearchNearestTimer(eventLoop);
  2. numevents = aeApiPoll(eventLoop, tvp); 获取文件事件的数量
  3. rfileProc(eventLoop,fd,fe->clientData,mask) 处理读事件(文件事件)
  4. wfileProc(eventLoop,fd,fe->clientData,mask) 处理写事件(文件事件)
  5. processTimeEvents(eventLoop); 处理时间事件
/* 处理时间事件(后台事件)* 处理文件事件(网络时间,fd的变化)* 如果flags没有特殊标识,则此函数会进入睡眠状态,直到有文件事件或者时间时间触发* * If flags is 0, 啥也不做,就返回.* if flags has AE_ALL_EVENTS set, 所有事件类型都处理.* if flags has AE_FILE_EVENTS set, 文件事件处理.* if flags has AE_TIME_EVENTS set, 时间事件处理.* if flags has AE_DONT_WAIT 返回 ASAP* if flags has AE_CALL_AFTER_SLEEP set, 调用aftersleep函数* 处理那些无需等待即可处理的事件** The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int processed = 0, numevents;/* Nothing to do? return ASAP */if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;/* Note that we want call select() even if there are no* file events to process as long as we want to process time* events, in order to sleep until the next time event is ready* to fire. */if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))// 搜索最近的时间事件触发节点shortest = aeSearchNearestTimer(eventLoop);if (shortest) {long now_sec, now_ms;aeGetTime(&now_sec, &now_ms);tvp = &tv;/* How many milliseconds we need to wait for the next* time event to fire? 计算时间事件下次处理的时间节点 */long long ms =(shortest->when_sec - now_sec)*1000 +shortest->when_ms - now_ms;if (ms > 0) {tvp->tv_sec = ms/1000;tvp->tv_usec = (ms % 1000)*1000;} else {tvp->tv_sec = 0;tvp->tv_usec = 0;}} else {/* If we have to check for events but need to return* ASAP because of AE_DONT_WAIT we need to set the timeout* to zero */if (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */tvp = NULL; /* wait forever */}}/* Call the multiplexing API, will return only on timeout or when* some event fires. */numevents = aeApiPoll(eventLoop, tvp);/* After sleep callback. */if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)eventLoop->aftersleep(eventLoop);for (j = 0; j < numevents; j++) {aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int fired = 0; /* Number of events fired for current fd. *//* Normally we execute the readable event first, and the writable* event laster. This is useful as sometimes we may be able* to serve the reply of a query immediately after processing the* query.** However if AE_BARRIER is set in the mask, our application is* asking us to do the reverse: never fire the writable event* after the readable. In such a case, we invert the calls.* This is useful when, for instance, we want to do things* in the beforeSleep() hook, like fsynching a file to disk,* before replying to a client. */int invert = fe->mask & AE_BARRIER;/* Note the "fe->mask & mask & ..." code: maybe an already* processed event removed an element that fired and we still* didn't processed, so we check if the event is still valid.** Fire the readable event if the call sequence is not* inverted. */if (!invert && fe->mask & mask & AE_READABLE) {fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;}/* Fire the writable event. */if (fe->mask & mask & AE_WRITABLE) {if (!fired || fe->wfileProc != fe->rfileProc) {fe->wfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}/* If we have to invert the call, fire the readable event now* after the writable one. */if (invert && fe->mask & mask & AE_READABLE) {if (!fired || fe->wfileProc != fe->rfileProc) {fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}processed++;}}/* Check time events */if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */
}

## 10.lru 链表池初始化

void evictionPoolAlloc(void) {// lru结构体  struct evictionPoolEntry *ep;int j;// 申请内存ep = zmalloc(sizeof(*ep)*EVPOOL_SIZE);//初始化for (j = 0; j < EVPOOL_SIZE; j++) {ep[j].idle = 0;ep[j].key = NULL;ep[j].cached = sdsnewlen(NULL,EVPOOL_CACHED_SDS_SIZE);ep[j].dbid = 0;}// EvictionPoolLRU 接管刚创建的lru池EvictionPoolLRU = ep;
}struct evictionPoolEntry {// LRU 空闲时间 / LFU 频率倒数(优先淘汰该值较大的记录)unsigned long long idle;    /* Object idle time (inverse frequency for LFU) */// 参与淘汰筛选的keysds key;                    /* Key name. */// key名称的sds对象缓存 键名缓存sds cached;                 /* Cached SDS object for key name. */// db的编号int dbid;                   /* Key DB number. */
};

redis启动总结

核心流程:

  1. 初始化配置
  2. 初始化各种结构体(数据库,lru池,主事件循环等)
  3. 网络监听(开启tcp端口监听)
  4. 创建文件事件处理网络连接,数据读取,数据回写
  5. 创建时间事件处理rdb快照写,客户端过期剔除,过期key剔除等操作
  6. 开启主事件循环aeMain,通过aeProcessEvents来处理时间事件和文件事件

redis server服务端启动流程分析(一)相关推荐

  1. Netty 源码解析系列-服务端启动流程解析

    netty源码解析系列 Netty 源码解析系列-服务端启动流程解析 Netty 源码解析系列-客户端连接接入及读I/O解析 五分钟就能看懂pipeline模型 -Netty 源码解析 1.服务端启动 ...

  2. Netty实战 IM即时通讯系统(四)服务端启动流程

    ## Netty实战 IM即时通讯系统(四)服务端启动流程 零. 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf ...

  3. 《netty入门与实战》笔记-02:服务端启动流程

    为什么80%的码农都做不了架构师?>>>    1.服务端启动流程 这一小节,我们来学习一下如何使用 Netty 来启动一个服务端应用程序,以下是服务端启动的一个非常精简的 Demo ...

  4. 服务端_说说Netty服务端启动流程

    点击上方☝SpringForAll社区 轻松关注!及时获取有趣有料的技术文章 本文来源:http://yeming.me/2016/03/12/netty1/ netty服务端代码分析 服务端启动配置 ...

  5. kafka之服务端启动脚本分析

    前阵子在服务器上搭了个 kafka,搭好后安装在 /usr/local/kafka 下: [root@lucas kafka]# pwd /usr/local/kafka [root@lucas ka ...

  6. 【闪电侠学netty】第4章 服务端启动流程

    [Netty]读书笔记 - 跟闪电侠学 1. 内容概要 1 服务端启动最小化代码 启动服务器步骤 Step1:线程模型,服务器引导类ServerBootstrap Step2:IO 模型 Step3: ...

  7. Redis的服务端启动和客户端连接

    一.服务端 服务端的命令为redis-server redis-server后面加指定的服务器可以启动指定的服务器,不加默认启动127.0.0.1 可以使用help查看帮助文档:redis-serve ...

  8. Gaea源码阅读(三):服务端启动流程

    转载地址:http://blog.csdn.net/m_vptr/article/details/9163319 相对于客户端,服务器端工作就比较多了.服务器端需要load jar包,利用fliter ...

  9. 原理剖析-Netty之服务端启动工作原理分析(上)

    一.大致介绍 1.Netty这个词,对于熟悉并发的童鞋一点都不陌生,它是一个异步事件驱动型的网络通信框架: 2.使用Netty不需要我们关注过多NIO的API操作,简简单单的使用即可,非常方便,开发门 ...

最新文章

  1. shell连接工具_无需本地软件,只用浏览器就能连接并控制安卓手机
  2. 3.2.3 OS之页面置换算法(最佳置换算法、先进先出置换算法、最近最久未使用置换算法、普通时钟置换算法、改造型时钟置换算法)
  3. 【开发调试】谷歌浏览器中调试移动网页和测试网速下页面效果
  4. 从FTP下载文件带进度条
  5. ssl介绍以及双向认证和单向认证原理 (转)
  6. WinServer2008R2搭建和授权DHCP服务器详解
  7. mac地址查 计算机名字,怎么看mac地址-教你通过MAC地址查询设备的厂商名称
  8. java大数据在线考试系统在线阅卷系统及大数据统计分析计算机毕业设计MyBatis+系统+LW文档+源码+调试部署
  9. 模块EMERSONDELTAVSLS1508/本特利330400-02-CN
  10. 轻松掌握MySQL数据库锁机制的相关原理
  11. [汇编语言例题]计算地址连续的ffff:0~ffff:b单元中的数据的和(详解)
  12. aws篇1 aws-cli的使用
  13. mac宽带连接找不到pppoe服务器,macbookAIR 使用以太网转接头连接宽带… - Apple 社区...
  14. 保研之路——复旦计算机学院预推免
  15. DZY Loves Math 系列详细题解
  16. 网页图片上传到服务器
  17. 【JS数据结构与算法】双向链表(DoublylinkedList)封装及其方法
  18. vivo手机android耗电快怎么解决,vivo手机耗电严重怎么办 如何解决手机耗电严重的问题...
  19. Windows系统下Qt代码的QMake和CMake简单说明
  20. git push报错 emote: error: GH007

热门文章

  1. 支付渠道解释 - 麦腾支付它们是什么以及它们为什么重要
  2. java 将数据加载到内存中_java 将数据加载到内存中的操作
  3. Android 9.0 三方app whatsapp 拍照预览模糊
  4. 数字转货币金钱中文大写
  5. Ubuntu20.04系统安装中的问题及解决方法
  6. 4. 业务数据采集平台搭建
  7. Java程序员,你一定需要了解的六款大数据采集平台
  8. c/c++获取时间函数
  9. 小试牛刀——链表第三篇
  10. numpy与matplotlib可视化