





        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,acceptTcpHandler,NULL) == AE_ERR)     // Located in server.c, inside the initServer function: registers acceptTcpHandler as the AE_READABLE handler for listening socket server.ipfd[j].


/* Readable-event callback for a listening TCP socket.
 *
 * Accepts at most MAX_ACCEPTS_PER_CALL connections from 'fd' per invocation
 * and passes every accepted descriptor, together with the peer address, to
 * acceptCommonHandler().
 *
 * 'el', 'privdata' and 'mask' are unused. The function returns as soon as
 * anetTcpAccept() reports ANET_ERR; that failure is logged unless errno is
 * EWOULDBLOCK. */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    while(max--) {
        /* Accept the next incoming connection. */
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            /* Give up for this call on any failure; only log the ones that
             * are not EWOULDBLOCK. */
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        /* Hand the new connection over to the common accept path. */
        acceptCommonHandler(cfd,0,cip);
    }
}


static void acceptCommonHandler(int fd, int flags, char *ip) {client *c;if ((c = createClient(fd)) == NULL) {               // 给每个新进来的客户端新建一个客户端对象serverLog(LL_WARNING,"Error registering fd event for the new client: %s (fd=%d)",strerror(errno),fd);close(fd); /* May be already closed, just ignore errors */   // 如果创建客户端对象出错则关闭连接并返回return;}/* If maxclient directive is set and this is one client more... close the* connection. Note that we create the client instead to check before* for this condition, since now the socket is already set in non-blocking* mode and we can send an error for free using the Kernel I/O */if (listLength(server.clients) > server.maxclients) {               // 如果当前已经接受的客户端的连接数超过了配置的最大连接数char *err = "-ERR max number of clients reached\r\n";/* That's a best effort error message, don't check write errors */if (write(c->fd,err,strlen(err)) == -1) {/* Nothing to do, Just to avoid the warning... */}server.stat_rejected_conn++;                                    // 拒绝该连接  并释放分配好的内存空间freeClient(c);                                                  return;}/* If the server is running in protected mode (the default) and there* is no password set, nor a specific interface is bound, we don't accept* requests from non loopback interfaces. Instead we try to explain the* user what to do to fix it if needed. */if (server.protected_mode &&server.bindaddr_count == 0 &&server.requirepass == NULL &&!(flags & CLIENT_UNIX_SOCKET) &&                            // 检查是否是保护模式等模式ip != NULL){if (strcmp(ip,"") && strcmp(ip,"::1")) {char *err ="-DENIED Redis is running in protected mode because protected ""mode is enabled, no bind address was specified, no ""authentication password is requested to clients. In this mode ""connections are only accepted from the loopback interface. 
""If you want to connect from external computers to Redis you ""may adopt one of the following solutions: ""1) Just disable protected mode sending the command ""'CONFIG SET protected-mode no' from the loopback interface ""by connecting to Redis from the same host the server is ""running, however MAKE SURE Redis is not publicly accessible ""from internet if you do so. Use CONFIG REWRITE to make this ""change permanent. ""2) Alternatively you can just disable the protected mode by ""editing the Redis configuration file, and setting the protected ""mode option to 'no', and then restarting the server. ""3) If you started the server manually just for testing, restart ""it with the '--protected-mode no' option. ""4) Setup a bind address or an authentication password. ""NOTE: You only need to do one of the above things in order for ""the server to start accepting connections from the outside.\r\n";if (write(c->fd,err,strlen(err)) == -1) {/* Nothing to do, Just to avoid the warning... */}server.stat_rejected_conn++;freeClient(c);return;}}server.stat_numconnections++;c->flags |= flags;


client *createClient(int fd) {client *c = zmalloc(sizeof(client));                    // 申请内存/* passing -1 as fd it is possible to create a non connected client.* This is useful since all the commands needs to be executed* in the context of a client. When commands are executed in other* contexts (for instance a Lua script) we need a non connected client. */if (fd != -1) {                                         // 检查文件描述符是否合法anetNonBlock(NULL,fd);                              // 设置该连接为非阻塞连接anetEnableTcpNoDelay(NULL,fd);                      // 是否开启Nagle算法if (server.tcpkeepalive)anetKeepAlive(NULL,fd,server.tcpkeepalive);if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR)              // 创建数据进来的时候的处理回调函数readQueryFromClient{close(fd);                                      // 如果注册出错则关闭连接并释放内存zfree(c);return NULL;}}selectDb(c,0);                                          // 默认选择0号数据库uint64_t client_id;atomicGetIncr(server.next_client_id,client_id,1);      // 原子性的客户端id加1c->id = client_id;                                     // 设置客户端idc->fd = fd;c->name = NULL;c->bufpos = 0;c->qb_pos = 0;c->querybuf = sdsempty();                              // 保存的接受客户端发送的数据c->pending_querybuf = sdsempty();c->querybuf_peak = 0;c->reqtype = 0;c->argc = 0;                                            // 客户端发送的数据参数c->argv = NULL;c->cmd = c->lastcmd = NULL;c->multibulklen = 0;c->bulklen = -1;c->sentlen = 0;                                         // 发送的数据长度c->flags = 0;c->ctime = c->lastinteraction = server.unixtime;c->authenticated = 0;                                   // 是否认证c->replstate = REPL_STATE_NONE;c->repl_put_online_on_ack = 0;c->reploff = 0;c->read_reploff = 0;c->repl_ack_off = 0;c->repl_ack_time = 0;c->slave_listening_port = 0;c->slave_ip[0] = '\0';c->slave_capa = SLAVE_CAPA_NONE;c->reply = listCreate();c->reply_bytes = 0;c->obuf_soft_limit_reached_time = 
0;listSetFreeMethod(c->reply,freeClientReplyValue);listSetDupMethod(c->reply,dupClientReplyValue);c->btype = BLOCKED_NONE;c->bpop.timeout = 0;c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);c->bpop.target = NULL;c->bpop.xread_group = NULL;c->bpop.xread_consumer = NULL;c->bpop.xread_group_noack = 0;c->bpop.numreplicas = 0;c->bpop.reploffset = 0;c->woff = 0;c->watched_keys = listCreate();c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);       // 创建发布订阅的通道字典c->pubsub_patterns = listCreate();c->peerid = NULL;c->client_list_node = NULL;listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);             // 设置订阅发布相关列表listSetMatchMethod(c->pubsub_patterns,listMatchObjects);if (fd != -1) linkClient(c);                                        // 添加到客户端列表中initClientMultiState(c);                                        return c;




/* Readable-event callback for a client socket.
 *
 * Reads up to PROTO_IOBUF_LEN bytes (possibly fewer, see below) from 'fd'
 * into the client's query buffer and then calls
 * processInputBufferAndReplicate() on it.
 *
 * 'privdata' is the client the handler was registered for; 'el' and 'mask'
 * are unused.
 *
 * The client is freed, and the function returns early, when read(2) fails
 * with anything other than EAGAIN, when the peer closed the connection
 * (read returned 0), or when the query buffer grows beyond
 * server.client_max_querybuf_len. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    /* Current buffer length; also track the peak seen so far. */
    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    /* Make room for 'readlen' more bytes, then read from the connection. */
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            /* Nothing to read right now. */
            return;
        } else {
            /* Real error: log it and drop the client. */
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } else if (nread == 0) {
        /* A zero-length read means the peer closed the connection. */
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    /* Account for the bytes just read. */
    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    /* Close clients whose query buffer exceeds the configured maximum,
     * logging the client info and the first bytes of the buffer. */
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClient(c);
        return;
    }

    /* Time to process the buffer. If the client is a master we need to
     * compute the difference between the applied offset before and after
     * processing the buffer, to understand how much of the replication stream
     * was actually applied to the master state: this quantity, and its
     * corresponding part of the replication stream, will be propagated to
     * the sub-slaves and to the replication backlog. */
    processInputBufferAndReplicate(c);
}


/* Process the client's input buffer and, when the client is our master,
 * forward the part of the stream that was applied.
 *
 * For clients without CLIENT_MASTER this is just processInputBuffer(c).
 * For a master, the growth of c->reploff across the call is taken as the
 * number of applied bytes: that many bytes from the head of
 * c->pending_querybuf are passed to replicationFeedSlavesFromMasterStream()
 * and then trimmed off the pending buffer. */
void processInputBufferAndReplicate(client *c) {
    if (!(c->flags & CLIENT_MASTER)) {
        /* Not a master: just parse and execute. */
        processInputBuffer(c);
    } else {
        /* Remember the applied offset before processing. */
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        /* Bytes applied by this call. */
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves,
                    c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf,applied,-1);
        }
    }
}


/* Parse and execute as many commands as possible from c->querybuf.
 *
 * The loop runs while unread bytes remain past c->qb_pos and stops early
 * under the conditions checked at its top (clients paused, client blocked,
 * busy script on a master link, client marked for closing), when a parser
 * returns something other than C_OK, or when server.current_client was
 * cleared while a command ran.
 *
 * server.current_client is set to 'c' for the duration of the call and reset
 * to NULL on exit. Consumed bytes are trimmed from the query buffer unless
 * server.current_client was cleared in the meantime. */
void processInputBuffer(client *c) {
    server.current_client = c;

    /* Keep processing while there is something in the input buffer. The
     * buffer may hold more than one command, hence the loop. */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown: a leading '*' selects the
         * multibulk protocol, anything else the inline protocol. */
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        /* Run the parser matching the request type. */
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* Only reset the client when the command was executed. The call
             * to processCommand() must be live: without it no command is
             * ever dispatched. */
            if (processCommand(c) == C_OK) {
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
                    /* Update the applied replication offset of our master. */
                    c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos;
                }

                /* Don't reset the client structure for clients blocked in a
                 * module blocking command, so that the reply callback will
                 * still be able to access the client argv and argc field.
                 * The client will be reset in unblockClientFromModule(). */
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
                    resetClient(c);
            }
            /* freeMemoryIfNeeded may flush slave output buffers. This may
             * result into a slave, that may be the active client, to be
             * freed. */
            if (server.current_client == NULL) break;
        }
    }

    /* Trim to pos */
    if (server.current_client != NULL && c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }

    /* Processing finished: no client is current any more. */
    server.current_client = NULL;
}


/* Look up and run the command held in c->argv / c->argc.
 *
 * Before the command is executed (or queued, inside MULTI) a series of
 * checks is applied in order: QUIT, unknown command, arity, authentication,
 * cluster redirection, maxmemory, disk-persistence errors, min-slaves,
 * read-only slave, Pub/Sub context, stale data, loading, busy script. Each
 * failed check replies with the matching error and returns.
 *
 * Returns C_OK in every case except QUIT and the case where
 * server.current_client was cleared during the maxmemory step; those two
 * return C_ERR. */
int processCommand(client *c) {
    /* The QUIT command is handled separately. Normal command procs will
     * go through checking for replication and QUIT will cause trouble
     * when FORCE_REPLICATION is enabled and would be implemented in
     * a regular command proc. */
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        /* Reply OK, mark the client for closing, and return C_ERR. */
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }

    /* Now lookup the command and check ASAP about trivial error conditions
     * such as wrong arity, bad command name and so forth. */
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        /* Unknown command: report it together with the first arguments
         * (the collected preview is capped at roughly 128 bytes). */
        flagTransaction(c);
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        /* Wrong number of arguments for this command. */
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }

    /* Check if the user is authenticated */
    if (server.requirepass && !c->authenticated && c->cmd->proc != authCommand)
    {
        flagTransaction(c);
        addReply(c,shared.noautherr);
        return C_OK;
    }

    /* If cluster is enabled perform the cluster redirection here.
     * However we don't perform the redirection if:
     * 1) The sender of this command is our master.
     * 2) The command has no key arguments. */
    if (server.cluster_enabled &&
        !(c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_LUA &&
          server.lua_caller->flags & CLIENT_MASTER) &&
        !(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
          c->cmd->proc != execCommand))
    {
        int hashslot;
        int error_code;
        clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
                                        &hashslot,&error_code);
        if (n == NULL || n != server.cluster->myself) {
            if (c->cmd->proc == execCommand) {
                discardTransaction(c);
            } else {
                flagTransaction(c);
            }
            clusterRedirectClient(c,n,hashslot,error_code);
            return C_OK;
        }
    }

    /* Handle the maxmemory directive.
     *
     * Note that we do not want to reclaim memory if we are here re-entering
     * the event loop since there is a busy Lua script running in timeout
     * condition, to avoid mixing the propagation of scripts with the
     * propagation of DELs due to eviction. */
    if (server.maxmemory && !server.lua_timedout) {
        int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
        /* freeMemoryIfNeeded may flush slave output buffers. This may result
         * into a slave, that may be the active client, to be freed. */
        if (server.current_client == NULL) return C_ERR;

        /* It was impossible to free enough memory, and the command the client
         * is trying to execute is denied during OOM conditions or the client
         * is in MULTI/EXEC context? Error. */
        if (out_of_memory &&
            (c->cmd->flags & CMD_DENYOOM ||
             (c->flags & CLIENT_MULTI && c->cmd->proc != execCommand))) {
            flagTransaction(c);
            addReply(c, shared.oomerr);
            return C_OK;
        }
    }

    /* Don't accept write commands if there are problems persisting on disk
     * and if this is a master instance. */
    int deny_write_type = writeCommandsDeniedByDiskError();
    if (deny_write_type != DISK_ERROR_TYPE_NONE &&
        server.masterhost == NULL &&
        (c->cmd->flags & CMD_WRITE ||
         c->cmd->proc == pingCommand))
    {
        flagTransaction(c);
        if (deny_write_type == DISK_ERROR_TYPE_RDB)
            addReply(c, shared.bgsaveerr);
        else
            addReplySds(c,
                sdscatprintf(sdsempty(),
                "-MISCONF Errors writing to the AOF file: %s\r\n",
                strerror(server.aof_last_write_errno)));
        return C_OK;
    }

    /* Don't accept write commands if there are not enough good slaves and
     * user configured the min-slaves-to-write option. */
    if (server.masterhost == NULL &&
        server.repl_min_slaves_to_write &&
        server.repl_min_slaves_max_lag &&
        c->cmd->flags & CMD_WRITE &&
        server.repl_good_slaves_count < server.repl_min_slaves_to_write)
    {
        flagTransaction(c);
        addReply(c, shared.noreplicaserr);
        return C_OK;
    }

    /* Don't accept write commands if this is a read only slave. But
     * accept write commands if this is our master. */
    if (server.masterhost && server.repl_slave_ro &&
        !(c->flags & CLIENT_MASTER) &&
        c->cmd->flags & CMD_WRITE)
    {
        addReply(c, shared.roslaveerr);
        return C_OK;
    }

    /* Only allow SUBSCRIBE and UNSUBSCRIBE in the context of Pub/Sub */
    if (c->flags & CLIENT_PUBSUB &&
        c->cmd->proc != pingCommand &&
        c->cmd->proc != subscribeCommand &&
        c->cmd->proc != unsubscribeCommand &&
        c->cmd->proc != psubscribeCommand &&
        c->cmd->proc != punsubscribeCommand) {
        addReplyError(c,"only (P)SUBSCRIBE / (P)UNSUBSCRIBE / PING / QUIT allowed in this context");
        return C_OK;
    }

    /* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
     * when slave-serve-stale-data is no and we are a slave with a broken
     * link with master. */
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
        server.repl_serve_stale_data == 0 &&
        !(c->cmd->flags & CMD_STALE))
    {
        flagTransaction(c);
        addReply(c, shared.masterdownerr);
        return C_OK;
    }

    /* Loading DB? Return an error if the command has not the
     * CMD_LOADING flag. */
    if (server.loading && !(c->cmd->flags & CMD_LOADING)) {
        addReply(c, shared.loadingerr);
        return C_OK;
    }

    /* Lua script too slow? Only allow a limited number of commands.
     * The argument byte comes from the client, so it is converted to
     * unsigned char before being handed to tolower(). */
    if (server.lua_timedout &&
          c->cmd->proc != authCommand &&
          c->cmd->proc != replconfCommand &&
        !(c->cmd->proc == shutdownCommand &&
          c->argc == 2 &&
          tolower((unsigned char)((char*)c->argv[1]->ptr)[0]) == 'n') &&
        !(c->cmd->proc == scriptCommand &&
          c->argc == 2 &&
          tolower((unsigned char)((char*)c->argv[1]->ptr)[0]) == 'k'))
    {
        flagTransaction(c);
        addReply(c, shared.slowscripterr);
        return C_OK;
    }

    /* Exec the command */
    if (c->flags & CLIENT_MULTI &&
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
    {
        /* Inside MULTI: queue the command instead of running it. */
        queueMultiCommand(c);
        addReply(c,shared.queued);
    } else {
        call(c,CMD_CALL_FULL);
        c->woff = server.master_repl_offset;
        if (listLength(server.ready_keys))
            handleClientsBlockedOnKeys();
    }
    return C_OK;
}




/* Try to append 'len' bytes from 's' to the client's static reply buffer.
 *
 * Returns C_OK when the bytes were copied, and also when the client has
 * CLIENT_CLOSE_AFTER_REPLY set (in which case nothing is copied).
 * Returns C_ERR when the reply list is not empty or the bytes do not fit in
 * the remaining space; the caller is then expected to use the reply list. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    /* Space still free in the static buffer. */
    size_t available = sizeof(c->buf)-c->bufpos;

    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    if (listLength(c->reply) > 0) return C_ERR;

    /* Check that the buffer has enough space available for this string. */
    if (len > available) return C_ERR;

    /* Copy the payload after the bytes already buffered. */
    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return C_OK;
}

/* Append 'len' bytes from 's' to the client's reply list.
 *
 * Whatever fits goes into the free space of the current tail block; any
 * remainder is stored in a newly allocated block of at least
 * PROTO_REPLY_CHUNK_BYTES. Does nothing when the client has
 * CLIENT_CLOSE_AFTER_REPLY set. Finishes by calling
 * asyncCloseClientOnOutputBufferLimitReached(). */
void _addReplyStringToList(client *c, const char *s, size_t len) {
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return;

    /* Last node of the reply list, if any. */
    listNode *ln = listLast(c->reply);
    clientReplyBlock *tail = ln? listNodeValue(ln): NULL;

    /* Note that 'tail' may be NULL even if we have a tail node, because when
     * addDeferredMultiBulkLength() is used, it sets a dummy node to NULL just
     * to fill it later, when the size of the bulk length is set. */

    /* Append to tail string when possible. */
    if (tail) {
        /* Copy the part we can fit into the tail, and leave the rest for a
         * new node */
        size_t avail = tail->size - tail->used;
        size_t copy = avail >= len? len: avail;
        memcpy(tail->buf + tail->used, s, copy);
        tail->used += copy;
        s += copy;
        len -= copy;
    }
    if (len) {
        /* Create a new node, make sure it is allocated to at
         * least PROTO_REPLY_CHUNK_BYTES */
        size_t size = len < PROTO_REPLY_CHUNK_BYTES? PROTO_REPLY_CHUNK_BYTES: len;
        tail = zmalloc(size + sizeof(clientReplyBlock));
        /* take over the allocation's internal fragmentation */
        tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock);
        tail->used = len;
        memcpy(tail->buf, s, len);
        listAddNodeTail(c->reply, tail);
        c->reply_bytes += tail->size;
    }
    asyncCloseClientOnOutputBufferLimitReached(c);
}

/* Queue the string object 'obj' as reply data for client 'c'.
 *
 * Nothing is queued unless prepareClientToWrite() returns C_OK. The payload
 * goes to the static buffer when _addReplyToBuffer() accepts it, otherwise
 * to the reply list. Integer-encoded objects are first converted to their
 * decimal string form. Any other encoding triggers serverPanic(). */
void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) return;

    if (sdsEncodedObject(obj)) {
        /* Static buffer first; fall back to the reply list. */
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyStringToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyStringToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}


/* Write pending reply data of client 'c' to descriptor 'fd'.
 *
 * The static buffer is drained first, then the blocks of the reply list.
 * Writing stops when nothing is pending, when write(2) returns <= 0, or when
 * the per-event byte budget below is exceeded.
 *
 * handler_installed - non-zero if an AE_WRITABLE handler is registered for
 *                     the client; it is removed once nothing is pending.
 *
 * Returns C_OK if the client is still valid after the call, C_ERR if it was
 * freed (write error other than EAGAIN, or CLIENT_CLOSE_AFTER_REPLY with the
 * whole reply sent). */
int writeToClient(int fd, client *c, int handler_installed) {
    ssize_t nwritten = 0, totwritten = 0;
    size_t objlen;
    clientReplyBlock *o;

    while(clientHasPendingReplies(c)) {
        if (c->bufpos > 0) {
            /* Data in the static buffer goes out first. */
            nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;     /* Bytes of this buffer sent so far. */
            totwritten += nwritten;     /* Bytes sent during this call. */

            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            /* Otherwise send the block at the head of the reply list. */
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;

            if (objlen == 0) {
                /* Empty block: just drop the node. */
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                /* If there are no longer objects in the list, we expect
                 * the count of reply bytes to be exactly zero. */
                if (listLength(c->reply) == 0)
                    serverAssert(c->reply_bytes == 0);
            }
        }
        /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver.
         *
         * Moreover, we also send as much as possible if the client is
         * a slave (otherwise, on high-speed traffic, the replication
         * buffer will grow indefinitely) */
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory) &&
            !(c->flags & CLIENT_SLAVE)) break;
    }
    server.stat_net_output_bytes += totwritten;
    if (nwritten == -1) {
        if (errno == EAGAIN) {
            nwritten = 0;
        } else {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", strerror(errno));
            freeClient(c);
            return C_ERR;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        /* Nothing left to send: remove the write handler if one was
         * installed. */
        if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);

        /* Close connection after entire reply has been sent. */
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClient(c);
            return C_ERR;
        }
    }
    return C_OK;
}






