服务器端处理过程

在前面我们大致分析了Redis的服务器端的启动流程,Redis服务端主要就是依据单线程的反应器模式来设计的,并且在处理的事件过程中主要分为时间事件和连接响应事件。本文就根据客户端发送一个请求进来,来查看服务器端是如何工作的。

服务器端的工作流程

本文主要描述连接响应事件,即客户端新进来的连接、响应客户端发送的数据和将处理结果发送给客户端三种形式。

客户端新进来的连接

在启动的概述中,当服务器端都监听好端口之后,就会注册一个新请求连接处理响应回调函数;

        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL) == AE_ERR)     // 位于server.c  initServer函数中

从该函数可看,监听新连接进来的请求就是acceptTcpHandler函数来处理。该段的意思就是当客户端连接进来之后就调用acceptTcpHandler来进行新连接的处理与接下来客户端发送的数据的接受与数据的发送返回。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {int cport, cfd, max = MAX_ACCEPTS_PER_CALL;char cip[NET_IP_STR_LEN];UNUSED(el);UNUSED(mask);UNUSED(privdata);while(max--) {cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);     // 接受新进来的请求if (cfd == ANET_ERR) {                                                // 如果出错则返回if (errno != EWOULDBLOCK)serverLog(LL_WARNING,"Accepting client connection: %s", server.neterr);return;}serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);                   // 打印日志acceptCommonHandler(cfd,0,cip);                                       // 接受请求处理}
}

从函数的处理中,可知只要成功的接受到了新进来的请求,则直接调用acceptCommonHandler来处理接下来的流程。

static void acceptCommonHandler(int fd, int flags, char *ip) {client *c;if ((c = createClient(fd)) == NULL) {               // 给每个新进来的客户端新建一个客户端对象serverLog(LL_WARNING,"Error registering fd event for the new client: %s (fd=%d)",strerror(errno),fd);close(fd); /* May be already closed, just ignore errors */   // 如果创建客户端对象出错则关闭连接并返回return;}/* If maxclient directive is set and this is one client more... close the* connection. Note that we create the client instead to check before* for this condition, since now the socket is already set in non-blocking* mode and we can send an error for free using the Kernel I/O */if (listLength(server.clients) > server.maxclients) {               // 如果当前已经接受的客户端的连接数超过了配置的最大连接数char *err = "-ERR max number of clients reached\r\n";/* That's a best effort error message, don't check write errors */if (write(c->fd,err,strlen(err)) == -1) {/* Nothing to do, Just to avoid the warning... */}server.stat_rejected_conn++;                                    // 拒绝该连接  并释放分配好的内存空间freeClient(c);                                                  return;}/* If the server is running in protected mode (the default) and there* is no password set, nor a specific interface is bound, we don't accept* requests from non loopback interfaces. Instead we try to explain the* user what to do to fix it if needed. */if (server.protected_mode &&server.bindaddr_count == 0 &&server.requirepass == NULL &&!(flags & CLIENT_UNIX_SOCKET) &&                            // 检查是否是保护模式等模式ip != NULL){if (strcmp(ip,"127.0.0.1") && strcmp(ip,"::1")) {char *err ="-DENIED Redis is running in protected mode because protected ""mode is enabled, no bind address was specified, no ""authentication password is requested to clients. In this mode ""connections are only accepted from the loopback interface. ""If you want to connect from external computers to Redis you ""may adopt one of the following solutions: ""1) Just disable protected mode sending the command ""'CONFIG SET protected-mode no' from the loopback interface ""by connecting to Redis from the same host the server is ""running, however MAKE SURE Redis is not publicly accessible ""from internet if you do so. Use CONFIG REWRITE to make this ""change permanent. ""2) Alternatively you can just disable the protected mode by ""editing the Redis configuration file, and setting the protected ""mode option to 'no', and then restarting the server. ""3) If you started the server manually just for testing, restart ""it with the '--protected-mode no' option. ""4) Setup a bind address or an authentication password. ""NOTE: You only need to do one of the above things in order for ""the server to start accepting connections from the outside.\r\n";if (write(c->fd,err,strlen(err)) == -1) {/* Nothing to do, Just to avoid the warning... */}server.stat_rejected_conn++;freeClient(c);return;}}server.stat_numconnections++;c->flags |= flags;
}

从该函数的处理过程中可知,最重要最核心的处理工作就是通过createClient函数来处理的,在创建完成之后主要就是查看一下是否满足一些条件的限制。

client *createClient(int fd) {client *c = zmalloc(sizeof(client));                    // 申请内存/* passing -1 as fd it is possible to create a non connected client.* This is useful since all the commands needs to be executed* in the context of a client. When commands are executed in other* contexts (for instance a Lua script) we need a non connected client. */if (fd != -1) {                                         // 检查文件描述符是否合法anetNonBlock(NULL,fd);                              // 设置该连接为非阻塞连接anetEnableTcpNoDelay(NULL,fd);                      // 是否开启Nagle算法if (server.tcpkeepalive)anetKeepAlive(NULL,fd,server.tcpkeepalive);if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR)              // 创建数据进来的时候的处理回调函数readQueryFromClient{close(fd);                                      // 如果注册出错则关闭连接并释放内存zfree(c);return NULL;}}selectDb(c,0);                                          // 默认选择0号数据库uint64_t client_id;atomicGetIncr(server.next_client_id,client_id,1);      // 原子性的客户端id加1c->id = client_id;                                     // 设置客户端idc->fd = fd;c->name = NULL;c->bufpos = 0;c->qb_pos = 0;c->querybuf = sdsempty();                              // 保存的接受客户端发送的数据c->pending_querybuf = sdsempty();c->querybuf_peak = 0;c->reqtype = 0;c->argc = 0;                                            // 客户端发送的数据参数c->argv = NULL;c->cmd = c->lastcmd = NULL;c->multibulklen = 0;c->bulklen = -1;c->sentlen = 0;                                         // 发送的数据长度c->flags = 0;c->ctime = c->lastinteraction = server.unixtime;c->authenticated = 0;                                   // 是否认证c->replstate = REPL_STATE_NONE;c->repl_put_online_on_ack = 0;c->reploff = 0;c->read_reploff = 0;c->repl_ack_off = 0;c->repl_ack_time = 0;c->slave_listening_port = 0;c->slave_ip[0] = '\0';c->slave_capa = SLAVE_CAPA_NONE;c->reply = listCreate();c->reply_bytes = 0;c->obuf_soft_limit_reached_time = 0;listSetFreeMethod(c->reply,freeClientReplyValue);listSetDupMethod(c->reply,dupClientReplyValue);c->btype = BLOCKED_NONE;c->bpop.timeout = 0;c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);c->bpop.target = NULL;c->bpop.xread_group = NULL;c->bpop.xread_consumer = NULL;c->bpop.xread_group_noack = 0;c->bpop.numreplicas = 0;c->bpop.reploffset = 0;c->woff = 0;c->watched_keys = listCreate();c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);       // 创建发布订阅的通道字典c->pubsub_patterns = listCreate();c->peerid = NULL;c->client_list_node = NULL;listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);             // 设置订阅发布相关列表listSetMatchMethod(c->pubsub_patterns,listMatchObjects);if (fd != -1) linkClient(c);                                        // 添加到客户端列表中initClientMultiState(c);                                        return c;
}

从该函数的执行过程可知,主要是初始化生成一个client实例,并初始化该client,该client的结构中的内容后续有机会在使用过程中分析,在初始化之外也注册了一个该新连接的可读事件的回调函数readQueryFromClient

客户端发送数据处理

客户端发送数据过来的时候的回调函数就是readQueryFromClient,后续客户端发送的数据都是通过该函数处理;

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {client *c = (client*) privdata;int nread, readlen;size_t qblen;UNUSED(el);UNUSED(mask);readlen = PROTO_IOBUF_LEN;/* If this is a multi bulk request, and we are processing a bulk reply* that is large enough, try to maximize the probability that the query* buffer contains exactly the SDS string representing the object, even* at the risk of requiring more read(2) calls. This way the function* processMultiBulkBuffer() can avoid copying buffers to create the* Redis Object representing the argument. */if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1&& c->bulklen >= PROTO_MBULK_BIG_ARG)                                       // 检查该命令是否是多个命令的请求{ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);/* Note that the 'remaining' variable may be zero in some edge case,* for example once we resume a blocked client after CLIENT PAUSE. */if (remaining > 0 && remaining < readlen) readlen = remaining;}qblen = sdslen(c->querybuf);                                                    // 获取保存的缓冲区大小if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);                             // 扩充内存来容纳剩余需要读到的数据nread = read(fd, c->querybuf+qblen, readlen);                                   // 从连接中读取数据if (nread == -1) {                                                              // 如果读取的长度为空则检查是否连接出错if (errno == EAGAIN) {return;} else {serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));       // 释放连接并打印错误freeClient(c);return;}} else if (nread == 0) {                                                        // 如果为0 则表示连接关闭 释放客户端serverLog(LL_VERBOSE, "Client closed connection");freeClient(c);return;} else if (c->flags & CLIENT_MASTER) {/* Append the query buffer to the pending (not applied) buffer* of the master. We'll use this buffer later in order to have a* copy of the string applied by the last command executed. */c->pending_querybuf = sdscatlen(c->pending_querybuf,c->querybuf+qblen,nread);}sdsIncrLen(c->querybuf,nread);                                                  // 提升保存缓存区的大小c->lastinteraction = server.unixtime;                                           // 获取时间if (c->flags & CLIENT_MASTER) c->read_reploff += nread;                 server.stat_net_input_bytes += nread;                                           // 保存新加入的数据处理长度if (sdslen(c->querybuf) > server.client_max_querybuf_len) {                     // 如果超过了缓冲区最大的接受长度则答应错误释放内存关闭客户端sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64);serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);sdsfree(ci);sdsfree(bytes);freeClient(c);return;}/* Time to process the buffer. If the client is a master we need to* compute the difference between the applied offset before and after* processing the buffer, to understand how much of the replication stream* was actually applied to the master state: this quantity, and its* corresponding part of the replication stream, will be propagated to* the sub-slaves and to the replication backlog. */processInputBufferAndReplicate(c);                                      // 尝试去解析输入数据并根据解析的数据执行对应命令
}

检查缓冲区大小是否超出限制等条件之后,就调用processInputBufferAndReplicate来解析接受的数据;

void processInputBufferAndReplicate(client *c) {if (!(c->flags & CLIENT_MASTER)) {                      // 如果不是master则直接解析数据        processInputBuffer(c);} else {size_t prev_offset = c->reploff;                    // 获取上一次回复的位置processInputBuffer(c);                              // 解析输入参数size_t applied = c->reploff - prev_offset;          // 回复现在的需要发送给slave复制的数据长度if (applied) {replicationFeedSlavesFromMasterStream(server.slaves,c->pending_querybuf, applied);          // 发送个slave节点数据sdsrange(c->pending_querybuf,applied,-1);}}
}

除了检查一下是否需要将接受的数据发送给slave之后,通过调用processInputBuffer来解析具体的命令和参数;

void processInputBuffer(client *c) {server.current_client = c;                          // 设置当前处理的连接的客户端/* Keep processing while there is something in the input buffer */while(c->qb_pos < sdslen(c->querybuf)) {            // 检查长度是否符合要求, 循环执行 因为缓冲区可能会接受多个命令/* Return if clients are paused. */if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;/* Immediately abort if the client is in the middle of something. */if (c->flags & CLIENT_BLOCKED) break;/* Don't process input from the master while there is a busy script* condition on the slave. We want just to accumulate the replication* stream (instead of replying -BUSY like we do with other clients) and* later resume the processing. */if (server.lua_timedout && c->flags & CLIENT_MASTER) break;/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is* written to the client. Make sure to not let the reply grow after* this flag has been set (i.e. don't process more commands).** The same applies for clients we want to terminate ASAP. */if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;/* Determine request type when unknown. */if (!c->reqtype) {                                                      // 通过传入的第一个数据来检查是否是多个命令参数if (c->querybuf[c->qb_pos] == '*') {c->reqtype = PROTO_REQ_MULTIBULK;} else {c->reqtype = PROTO_REQ_INLINE;}}if (c->reqtype == PROTO_REQ_INLINE) {                                   // 检查接受数据的参数类型if (processInlineBuffer(c) != C_OK) break;                          // 解析一个命令参数} else if (c->reqtype == PROTO_REQ_MULTIBULK) {if (processMultibulkBuffer(c) != C_OK) break;                       // 解析多个命令参数} else {serverPanic("Unknown request type");}/* Multibulk processing could see a <= 0 length. */if (c->argc == 0) {                                             resetClient(c);                                                     // 重置标志位和参数} else {/* Only reset the client when the command was executed. */      // if (processCommand(c) == C_OK) {                                    // 处理参数查找命令if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {/* Update the applied replication offset of our master. */c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;     }/* Don't reset the client structure for clients blocked in a* module blocking command, so that the reply callback will* still be able to access the client argv and argc field.* The client will be reset in unblockClientFromModule(). */if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)resetClient(c);}/* freeMemoryIfNeeded may flush slave output buffers. This may* result into a slave, that may be the active client, to be* freed. */if (server.current_client == NULL) break;}}/* Trim to pos */if (server.current_client != NULL && c->qb_pos) {sdsrange(c->querybuf,c->qb_pos,-1);c->qb_pos = 0;}server.current_client = NULL;                       // 处理完成置为空
}

processInlineBuffer函数就是将接受缓冲区里面的数据,解析成对应的命令与参数放置在c中保存,然后再调用processCommand来查找对应的命令,并调用该命令去执行。

int processCommand(client *c) {/* The QUIT command is handled separately. Normal command procs will* go through checking for replication and QUIT will cause trouble* when FORCE_REPLICATION is enabled and would be implemented in* a regular command proc. */if (!strcasecmp(c->argv[0]->ptr,"quit")) {              // 检查是否是quit指令addReply(c,shared.ok);                              // 回复ok 并返回错误c->flags |= CLIENT_CLOSE_AFTER_REPLY;return C_ERR;}/* Now lookup the command and check ASAP about trivial error conditions* such as wrong arity, bad command name and so forth. */c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);       // 查找客户端传入的指令if (!c->cmd) {                                              // 如果没有找到指令flagTransaction(c);sds args = sdsempty();int i;for (i=1; i < c->argc && sdslen(args) < 128; i++)args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",(char*)c->argv[0]->ptr, args);          // 释放对应的输入参数并返回客户端没找到该指令sdsfree(args);return C_OK;} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||(c->argc < -c->cmd->arity)) {flagTransaction(c);addReplyErrorFormat(c,"wrong number of arguments for '%s' command",c->cmd->name);                          // 检查是否是错误的输入参数return C_OK;}/* Check if the user is authenticated */if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand){flagTransaction(c);addReply(c,shared.noautherr);                           // 如果是没有认证则返回认证失败return C_OK;}/* If cluster is enabled perform the cluster redirection here.* However we don't perform the redirection if:* 1) The sender of this command is our master.* 2) The command has no key arguments. */if (server.cluster_enabled &&!(c->flags & CLIENT_MASTER) &&!(c->flags & CLIENT_LUA &&server.lua_caller->flags & CLIENT_MASTER) &&!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&c->cmd->proc != execCommand)){int hashslot;int error_code;clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,&hashslot,&error_code);if (n == NULL || n != server.cluster->myself) {if (c->cmd->proc == execCommand) {discardTransaction(c);} else {flagTransaction(c);}clusterRedirectClient(c,n,hashslot,error_code);return C_OK;}}/* Handle the maxmemory directive.** Note that we do not want to reclaim memory if we are here re-entering* the event loop since there is a busy Lua script running in timeout* condition, to avoid mixing the propagation of scripts with the* propagation of DELs due to eviction. */if (server.maxmemory && !server.lua_timedout) {                         // 检查是否打开了内存限制int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;           // 检查是否超过了内存使用量/* freeMemoryIfNeeded may flush slave output buffers. This may result* into a slave, that may be the active client, to be freed. */if (server.current_client == NULL) return C_ERR;/* It was impossible to free enough memory, and the command the client* is trying to execute is denied during OOM conditions or the client* is in MULTI/EXEC context? Error. */if (out_of_memory &&(c->cmd->flags & CMD_DENYOOM ||(c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {flagTransaction(c);addReply(c, shared.oomerr);                                     // 返回超过内存使用内容return C_OK;}}/* Don't accept write commands if there are problems persisting on disk* and if this is a master instance. */int deny_write_type = writeCommandsDeniedByDiskError();if (deny_write_type != DISK_ERROR_TYPE_NONE &&server.masterhost == NULL &&(c->cmd->flags & CMD_WRITE ||c->cmd->proc == pingCommand)){flagTransaction(c);if (deny_write_type == DISK_ERROR_TYPE_RDB)addReply(c, shared.bgsaveerr);elseaddReplySds(c,sdscatprintf(sdsempty(),"-MISCONF Errors writing to the AOF file: %s\r\n",strerror(server.aof_last_write_errno)));return C_OK;}/* Don't accept write commands if there are not enough good slaves and* user configured the min-slaves-to-write option. */if (server.masterhost == NULL &&server.repl_min_slaves_to_write &&server.repl_min_slaves_max_lag &&c->cmd->flags & CMD_WRITE &&server.repl_good_slaves_count < server.repl_min_slaves_to_write){flagTransaction(c);addReply(c, shared.noreplicaserr);return C_OK;}/* Don't accept write commands if this is a read only slave. But* accept write commands if this is our master. */if (server.masterhost && server.repl_slave_ro &&!(c->flags & CLIENT_MASTER) &&c->cmd->flags & CMD_WRITE){addReply(c, shared.roslaveerr);return C_OK;}/* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */if (c->flags & CLIENT_PUBSUB &&c->cmd->proc != pingCommand &&c->cmd->proc != subscribeCommand &&c->cmd->proc != unsubscribeCommand &&c->cmd->proc != psubscribeCommand &&c->cmd->proc != punsubscribeCommand) {addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");return C_OK;}/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,* when slave-serve-stale-data is no and we are a slave with a broken* link with master. */if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&server.repl_serve_stale_data == 0 &&!(c->cmd->flags & CMD_STALE)){flagTransaction(c);addReply(c, shared.masterdownerr);return C_OK;}/* Loading DB? Return an error if the command has not the* CMD_LOADING flag. */if (server.loading && !(c->cmd->flags & CMD_LOADING)) {addReply(c, shared.loadingerr);return C_OK;}/* Lua script too slow? Only allow a limited number of commands. */if (server.lua_timedout &&c->cmd->proc != authCommand &&c->cmd->proc != replconfCommand &&!(c->cmd->proc == shutdownCommand &&c->argc == 2 &&tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&!(c->cmd->proc == scriptCommand &&c->argc == 2 &&tolower(((char*)c->argv[1]->ptr)[0]) == 'k')){flagTransaction(c);addReply(c, shared.slowscripterr);return C_OK;}/* Exec the command */if (c->flags & CLIENT_MULTI &&c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&c->cmd->proc != multiCommand && c->cmd->proc != watchCommand){queueMultiCommand(c);addReply(c,shared.queued);} else {call(c,CMD_CALL_FULL);                      // 执行命令c->woff = server.master_repl_offset;if (listLength(server.ready_keys))handleClientsBlockedOnKeys();}return C_OK;
}

例如,如果是单机Redis客户端传入的命令如果是set命令的话,就会调用call函数来执行该命令,命令的执行可留到后续再详细分析,通过执行过程也可知,命令执行完成后会调用addReply函数回复客户端执行的结果,接下来就看服务器返回数据个客户端的过程。

服务器端发送数据到客户端

详细查看一下addReply函数的执行过程;

int _addReplyToBuffer(client *c, const char *s, size_t len) {size_t available = sizeof(c->buf)-c->bufpos;                        // 检查还剩余多少空间可用if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;               // 是否关闭/* If there already are entries in the reply list, we cannot* add anything more to the static buffer. */if (listLength(c->reply) > 0) return C_ERR;                         // 检查是否有应答/* Check that the buffer has enough space available for this string. */if (len > available) return C_ERR;                                  // 如果超出容量则返回错误码memcpy(c->buf+c->bufpos,s,len);                                     // 置空c->bufpos+=len;return C_OK;
}void _addReplyStringToList(client *c, const char *s, size_t len) {if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;listNode *ln = listLast(c->reply);                                  // 获取回复的列表clientReplyBlock *tail = ln? listNodeValue(ln): NULL;               /* Note that 'tail' may be NULL even if we have a tail node, becuase when* addDeferredMultiBulkLength() is used, it sets a dummy node to NULL just* fo fill it later, when the size of the bulk length is set. *//* Append to tail string when possible. */if (tail) {                                                         // 如果列表中已经有数据,拷贝到新的值中去 将剩下的留给新的空间/* Copy the part we can fit into the tail, and leave the rest for a* new node */size_t avail = tail->size - tail->used;                     size_t copy = avail >= len? len: avail;memcpy(tail->buf + tail->used, s, copy);tail->used += copy;s += copy;len -= copy;}if (len) {/* Create a new node, make sure it is allocated to at* least PROTO_REPLY_CHUNK_BYTES */                                 // 新建一个node来保存待发送数据size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;tail = zmalloc(size + sizeof(clientReplyBlock));/* take over the allocation's internal fragmentation */tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);tail->used = len;memcpy(tail->buf, s, len);listAddNodeTail(c->reply, tail);                                    // 添加数据c->reply_bytes += tail->size;}asyncCloseClientOnOutputBufferLimitReached(c);
}void addReply(client *c, robj *obj) {if (prepareClientToWrite(c) != C_OK) return;            // 检查客户端是否可以发送数据if (sdsEncodedObject(obj)) {                            // 检查数据的编码方式if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)      // 检查buf中是否可以将该数据添加进去 如果可以添加进去_addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));          // 将该数据添加到列表中等待发送} else if (obj->encoding == OBJ_ENCODING_INT) {/* For integer encoded strings we just convert it into a string* using our optimized function, and attach the resulting string* to the output buffer. */char buf[32];size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);if (_addReplyToBuffer(c,buf,len) != C_OK)                        // 检查缓冲区是否有数据可以发送 如果可以则加入待发送列表中_addReplyStringToList(c,buf,len);} else {serverPanic("Wrong obj->encoding in addReply()");}
}

从执行的流程可以看出,只是将待发送的数据保存在了一个列表中去了,并没有注册可写事件,其实真正注册可写事件的是在server.c文件中的beforeSleep函数中的handleClientsWithPendingWrites来检查哪些客户端需要将数据发送给客户端。

int writeToClient(int fd, client *c, int handler_installed) {ssize_t nwritten = 0, totwritten = 0;size_t objlen;clientReplyBlock *o;while(clientHasPendingReplies(c)) {                 // 检查是否还有待发送的数据if (c->bufpos > 0) {                            // 如果有则发送该数据nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);    // 发送数据if (nwritten <= 0) break;                   c->sentlen += nwritten;                     // 统计已经发送出去的数据长度totwritten += nwritten;                     // 统计总的发送出去的数据长度/* If the buffer was sent, set bufpos to zero to continue with* the remainder of the reply. */if ((int)c->sentlen == c->bufpos) {c->bufpos = 0;c->sentlen = 0;}} else {o = listNodeValue(listFirst(c->reply));     // 获取node列表中的数据objlen = o->used;if (objlen == 0) {                          // 如果长度为0 则删除该节点c->reply_bytes -= o->size;listDelNode(c->reply,listFirst(c->reply));continue;}nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);         // 发送数据if (nwritten <= 0) break;c->sentlen += nwritten;                                                 // 统计发送出去的数据totwritten += nwritten;/* If we fully sent the object on head go to the next one */if (c->sentlen == objlen) {c->reply_bytes -= o->size;listDelNode(c->reply,listFirst(c->reply));c->sentlen = 0;/* If there are no longer objects in the list, we expect* the count of reply bytes to be exactly zero. */if (listLength(c->reply) == 0)serverAssert(c->reply_bytes == 0);}}/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT* bytes, in a single threaded server it's a good idea to serve* other clients as well, even if a very large request comes from* super fast link that is always able to accept data (in real world* scenario think about 'KEYS *' against the loopback interface).** However if we are over the maxmemory limit we ignore that and* just deliver as much data as it is possible to deliver.** Moreover, we also send as much as possible if the client is* a slave (otherwise, on high-speed traffic, the replication* buffer will grow indefinitely) */if (totwritten > NET_MAX_WRITES_PER_EVENT &&(server.maxmemory == 0 ||zmalloc_used_memory() < server.maxmemory) &&!(c->flags & CLIENT_SLAVE)) break;}server.stat_net_output_bytes += totwritten;if (nwritten == -1) {if (errno == EAGAIN) {nwritten = 0;} else {serverLog(LL_VERBOSE,"Error writing to client: %s", strerror(errno));freeClient(c);return C_ERR;}}if (totwritten > 0) {/* For clients representing masters we don't count sending data* as an interaction, since we always send REPLCONF ACK commands* that take some time to just fill the socket output buffer.* We just rely on data / pings received for timeout detection. */if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;}if (!clientHasPendingReplies(c)) {                                          // 如果没有待发送数据c->sentlen = 0;if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);      // 如果已经注册了读事件则删除读事件/* Close connection after entire reply has been sent. */if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {freeClient(c);return C_ERR;}}return C_OK;
}

如果在loop的beforeSleep中检查到有可以发送的数据则直接发送出去;至此数据的返回也进行了了解,只不过数据的返回没有很明显的调用写事件回调来进行将数据发送给客户端。

总体的事件循环
每次循环调用beforSleep函数进行检查是否有数据发送
直接写入到fd中发送,发送完成下一次循环
event_loop
有数据

总结

本文主要是概述了一下服务器端的数据处理的过程,大致了解了心连接进来是如果进行处理的,接受的新连接是如何接受新的数据处理的流程,最后就是了解了数据是如何发送给客户端的由于没有显示的使用写事件回调而是通过主循环来检查一个列表是否有待发送的数据从而来进行写数据的处理。由于本人才疏学浅,如有错误请批评指正。

Redis源码分析:服务器端处理过程相关推荐

  1. Redis源码分析:基础概念介绍与启动概述

    Redis源码分析 基于Redis-5.0.4版本,进行基础的源码分析,主要就是分析一些平常使用过程中的内容.仅作为相关内容的学习记录,有关Redis源码学习阅读比较广泛的便是<Redis设计与 ...

  2. redis源码分析 -- cs结构之服务器

    服务器与客户端是如何交互的 redis客户端向服务器发送命令请求,服务器接收到客户端发送的命令请求之后,读取解析命令,并执行命令,同时将命令执行结果返回给客户端. 客户端与服务器交互的代码流程如下图所 ...

  3. Redis源码分析(一)redis.c //redis-server.c

    Redis源码分析(一)redis.c //redis-server.c 入口函数 int main() 4450 int main(int argc, char **argv) {4451 init ...

  4. Hyperledger Fabric从源码分析背书提案过程

    在之前的文章中 Hyperledger Fabric从源码分析链码安装过程 Hyperledger Fabric从源码分析链码实例化过程 Hyperledger Fabric从源码分析链码查询与调用 ...

  5. 10年大厂程序员是如何高效学习使用redis的丨redis源码分析丨redis存储原理

    10年大厂程序员是怎么学习使用redis的 1. redis存储原理分析 2. redis源码学习分享 3. redis跳表和B+树详细对比分析 视频讲解如下,点击观看: 10年大厂程序员是如何高效学 ...

  6. WebRTC源码分析-呼叫建立过程之五(创建Offer,CreateOffer,上篇)

    目录 1. 引言 2 CreateOffer声明 && 两个参数 2.1 CreateOffer声明 2.2 参数CreateSessionDescriptionObserver 2. ...

  7. MyBatis 源码分析 - 配置文件解析过程

    文章目录 * 本文速览 1.简介 2.配置文件解析过程分析 2.1 配置文件解析入口 2.2 解析 properties 配置 2.3 解析 settings 配置 2.3.1 settings 节点 ...

  8. Redis 源码分析之故障转移

    在 Redis cluster 中故障转移是个很重要的功能,下面就从故障发现到故障转移整个流程做一下详细分析. 故障检测 PFAIL 标记 集群中每个节点都会定期向其他节点发送 PING 消息,以此来 ...

  9. Redis源码分析 —— 发布与订阅

    前言 通过阅读Redis源码,配合GDB和抓包等调试手段,分析Redis发布订阅的实现原理,思考相关问题. 源码版本:Redis 6.0.10 思考问题 发布订阅基本概念介绍 订阅频道 -- SUBS ...

  10. modelandview使用过程_深入源码分析SpringMVC执行过程

    本文主要讲解 SpringMVC 执行过程,并针对相关源码进行解析. 首先,让我们从 Spring MVC 的四大组件:前端控制器(DispatcherServlet).处理器映射器(HandlerM ...

最新文章

  1. Scratch等级考试(一级)模拟题
  2. 【jetson nano】两台ubuntu ssh远程连接控制
  3. MySQL数据类型--与MySQL零距离接触2-6数据表
  4. mongodb幽灵操作的解决方案
  5. trim的返回值php,php trim()函数
  6. Python+Selenium基础篇之2-打开和关闭火狐浏览器
  7. 用3种方式解决复杂报表
  8. 汇编语言定时器转化为c语言,不用定时器和汇编语言,只用C语言实现精确无误的延时...
  9. Python入门--闭包,工程函数
  10. Java进阶:SpringMVC中获取Restful风格的参数(从请求路径中获取参数 )
  11. 手写字体生成器,这种软件居然被大佬做出来了!
  12. python选择时间窗口_对pandas中时间窗函数rolling的使用详解
  13. 【每天一个 Linux 命令】ssh 命令
  14. 【精益生产】108页PPT搞懂精益生产价值流分析图(VSM)
  15. 医院体检PEIS系统
  16. Processing 案例 | 三角函数之美
  17. 第七课,OpenGL之LookAt函数
  18. python回溯法解9*9数独
  19. 沪漂大专程序员,一边跟刘畊宏健身,一边拿22k的offer
  20. 云集微店怎么做 我的第一份生意经

热门文章

  1. 深度解析MegEngine亚线性显存优化技术
  2. AI应用落地哪家强?CSDN AI Top 30+案例评选等你来秀!
  3. 【码书】一本经典且内容全面算法书籍,学算法必备
  4. 不用开着电脑,如何将脚本代码放到服务器上?
  5. 阿里首次开源深度学习框架X-Deep Learning
  6. YC陆奇发起知乎第一问:怎样的环境才能让更多AI创业公司成功?
  7. “偷鸡”不成的马斯克,终于丢掉了自己的“王位”
  8. 人工智能灵魂注入,燃烧你的卡路里——2018,你AI了吗!?
  9. AI领域真正最最最最最稀缺的人才是……会庖丁解牛的那个人
  10. “神奇女侠”Gal Gadot穿性感粉色吊带乱伦?原来都是深度学习惹的祸