初始化客户端连接的read handler处理流程






1. 读事件的后续流程



4. 关注acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);(在大步骤1中最后一步)
/* Common tail of the accept path: validate the freshly accepted connection,
 * enforce the maxclients limit, create the client object and start the
 * transport-level accept.
 *
 * conn  - the accepted connection; must be in CONN_STATE_ACCEPTING.
 * flags - extra client flags, OR-ed into c->flags once the client exists.
 * ip    - not used by this function (silenced through UNUSED()).
 *
 * Returns nothing. On every failure path the connection (or the client that
 * owns it) is released before returning. */
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    client *c;
    char conninfo[100];
    UNUSED(ip);

    if (connGetState(conn) != CONN_STATE_ACCEPTING) {
        serverLog(LL_VERBOSE,
            "Accepted client connection in error state: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn);
        return;
    }

    /* Limit the number of connections we take at the same time.
     *
     * Admission control will happen before a client is created and connAccept()
     * called, because we don't want to even start transport-level negotiation
     * if rejected. */
    if (listLength(server.clients) + getClusterConnectionsCount()
        >= server.maxclients)
    {
        char *err;

        if (server.cluster_enabled) {
            err = "-ERR max number of clients + cluster "
                  "connections reached\r\n";
        } else {
            err = "-ERR max number of clients reached\r\n";
        }

        /* That's a best effort error message, don't check write errors.
         * Note that for TLS connections, no handshake was done yet so nothing
         * is written and the connection will just drop. */
        if (connWrite(conn,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        connClose(conn);
        return;
    }

    /* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }

    /* Last chance to keep flags */
    c->flags |= flags;

    /* Initiate accept.
     *
     * Note that connAccept() is free to do two things here:
     * 1. Call clientAcceptHandler() immediately;
     * 2. Schedule a future call to clientAcceptHandler().
     *
     * Because of that, we must do nothing else afterwards. */
    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
        /* The function-level conninfo buffer is reused here; a second local
         * with the same name would only shadow it. */
        if (connGetState(conn) == CONN_STATE_ERROR)
            serverLog(LL_WARNING,
                "Error accepting a client connection: %s (conn: %s)",
                connGetLastError(conn),
                connGetInfo(conn, conninfo, sizeof(conninfo)));
        freeClient(connGetPrivateData(conn));
        return;
    }
}
1.2 关注acceptCommonHandler的关键子步骤createClient(conn)

/* Allocate a client and initialize every field to its starting value.
 *
 * conn - the connection to attach, or NULL to create a non connected client.
 *        When non-NULL the connection is switched to non-blocking mode,
 *        TCP_NODELAY is enabled, keepalive is configured if
 *        server.tcpkeepalive is set, readQueryFromClient is installed as the
 *        read handler, the client is stored as the connection private data,
 *        and the client is linked through linkClient().
 *
 * Returns the newly created client. */
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));

    /* passing NULL as conn it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (conn) {
        connNonBlock(conn);
        connEnableTcpNoDelay(conn);
        if (server.tcpkeepalive)
            connKeepAlive(conn,server.tcpkeepalive);
        connSetReadHandler(conn, readQueryFromClient);
        connSetPrivateData(conn, c);
    }

    selectDb(c,0);
    uint64_t client_id = ++server.next_client_id;
    c->id = client_id;
    c->resp = 2;
    c->conn = conn;
    c->name = NULL;
    c->bufpos = 0;
    c->qb_pos = 0;
    c->querybuf = sdsempty();
    c->pending_querybuf = sdsempty();
    c->querybuf_peak = 0;
    c->reqtype = 0;
    c->argc = 0;
    c->argv = NULL;
    c->cmd = c->lastcmd = NULL;
    c->user = DefaultUser;
    c->multibulklen = 0;
    c->bulklen = -1;
    c->sentlen = 0;
    c->flags = 0;
    c->ctime = c->lastinteraction = server.unixtime;
    /* If the default user does not require authentication, the user is
     * directly authenticated. */
    c->authenticated = (c->user->flags & USER_FLAG_NOPASS) &&
                       !(c->user->flags & USER_FLAG_DISABLED);
    c->replstate = REPL_STATE_NONE;
    c->repl_put_online_on_ack = 0;
    c->reploff = 0;
    c->read_reploff = 0;
    c->repl_ack_off = 0;
    c->repl_ack_time = 0;
    c->slave_listening_port = 0;
    c->slave_ip[0] = '\0';
    c->slave_capa = SLAVE_CAPA_NONE;
    c->reply = listCreate();
    c->reply_bytes = 0;
    c->obuf_soft_limit_reached_time = 0;
    listSetFreeMethod(c->reply,freeClientReplyValue);
    listSetDupMethod(c->reply,dupClientReplyValue);
    c->btype = BLOCKED_NONE;
    c->bpop.timeout = 0;
    c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
    c->bpop.target = NULL;
    c->bpop.xread_group = NULL;
    c->bpop.xread_consumer = NULL;
    c->bpop.xread_group_noack = 0;
    c->bpop.numreplicas = 0;
    c->bpop.reploffset = 0;
    c->woff = 0;
    c->watched_keys = listCreate();
    c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
    c->pubsub_patterns = listCreate();
    c->peerid = NULL;
    c->client_list_node = NULL;
    c->client_tracking_redirection = 0;
    c->client_tracking_prefixes = NULL;
    c->client_cron_last_memory_usage = 0;
    c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL;
    c->auth_callback = NULL;
    c->auth_callback_privdata = NULL;
    c->auth_module = NULL;
    listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
    listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
    /* Only clients backed by a real connection are linked. */
    if (conn) linkClient(c);
    initClientMultiState(c);
    return c;
}

1.2.1 connNonBlock

/* Put the connection's socket in non-blocking mode.
 * Returns C_ERR when the connection has no descriptor (fd == -1), otherwise
 * whatever anetNonBlock() returns. */
int connNonBlock(connection *conn) {
    if (conn->fd == -1) return C_ERR;
    return anetNonBlock(NULL, conn->fd);
}

/* Convenience wrapper: anetSetBlock() with non_block set to 1. */
int anetNonBlock(char *err, int fd) {
    return anetSetBlock(err,fd,1);
}

/* Set or clear O_NONBLOCK on 'fd'.
 *
 * err       - error buffer handed to anetSetError() on failure.
 * fd        - descriptor to modify.
 * non_block - non-zero sets O_NONBLOCK, zero clears it.
 *
 * Returns ANET_OK on success, ANET_ERR if either fcntl() call fails. */
int anetSetBlock(char *err, int fd, int non_block) {
    int flags;

    /* Set the socket blocking (if non_block is zero) or non-blocking.
     * Note that fcntl(2) for F_GETFL and F_SETFL can't be
     * interrupted by a signal. */
    if ((flags = fcntl(fd, F_GETFL)) == -1) {
        anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));
        return ANET_ERR;
    }

    if (non_block)
        flags |= O_NONBLOCK;
    else
        flags &= ~O_NONBLOCK;

    if (fcntl(fd, F_SETFL, flags) == -1) {
        anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));
        return ANET_ERR;
    }
    return ANET_OK;
}

1.2.2  connEnableTcpNoDelay


Redis 复制参数 repl-disable-tcp-nodelay 学习笔记

/* Enable TCP_NODELAY on the connection's socket.
 * Returns C_ERR when the connection has no descriptor (fd == -1), otherwise
 * whatever anetEnableTcpNoDelay() returns. */
int connEnableTcpNoDelay(connection *conn) {
    if (conn->fd == -1) return C_ERR;
    return anetEnableTcpNoDelay(NULL, conn->fd);
}

/* Convenience wrapper: anetSetTcpNoDelay() with val set to 1. */
int anetEnableTcpNoDelay(char *err, int fd)
{
    return anetSetTcpNoDelay(err, fd, 1);
}

/* Set the TCP_NODELAY socket option of 'fd' to 'val'.
 * Returns ANET_OK on success; on setsockopt() failure reports the error
 * through anetSetError() and returns ANET_ERR. */
static int anetSetTcpNoDelay(char *err, int fd, int val)
{
    if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1)
    {
        anetSetError(err, "setsockopt TCP_NODELAY: %s", strerror(errno));
        return ANET_ERR;
    }
    return ANET_OK;
}

1.2.3  connKeepAlive(conn,server.tcpkeepalive)


int connKeepAlive(connection *conn, int interval) {if (conn->fd == -1) return C_ERR;return anetKeepAlive(NULL, conn->fd, interval);
int anetKeepAlive(char *err, int fd, int interval)
{int val = 1;if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) == -1){anetSetError(err, "setsockopt SO_KEEPALIVE: %s", strerror(errno));return ANET_ERR;}#ifdef __linux__/* Default settings are more or less garbage, with the keepalive time* set to 7200 by default on Linux. Modify settings to make the feature* actually useful. *//* Send first probe after interval. */val = interval;if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)) < 0) {anetSetError(err, "setsockopt TCP_KEEPIDLE: %s\n", strerror(errno));return ANET_ERR;}/* Send next probes after the specified interval. Note that we set the* delay as interval / 3, as we send three probes before detecting* an error (see the next setsockopt call). */val = interval/3;if (val == 0) val = 1;if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val)) < 0) {anetSetError(err, "setsockopt TCP_KEEPINTVL: %s\n", strerror(errno));return ANET_ERR;}/* Consider the socket in error state after three we send three ACK* probes without getting a reply. */val = 3;if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val)) < 0) {anetSetError(err, "setsockopt TCP_KEEPCNT: %s\n", strerror(errno));return ANET_ERR;}
#else((void) interval); /* Avoid unused var warning for non Linux systems. */
#endifreturn ANET_OK;

1.2.4 connSetReadHandler(conn, readQueryFromClient)

connSetReadHandler(conn, readQueryFromClient) -- 将 conn->read_handler 设置为 readQueryFromClient
/* Register a read handler, to be called when the connection is readable.
 * If NULL, the existing handler is removed. */
/* Thin dispatcher: forwards to the set_read_handler hook of the
 * connection's type and returns its result. */
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    return conn->type->set_read_handler(conn, func);
}

/* Excerpt of the socket connection type: only the member relevant to this
 * step is listed here. */
ConnectionType CT_Socket = {
    .set_read_handler = connSocketSetReadHandler,
};

/* Socket implementation of set_read_handler.
 *
 * Stores 'func' in conn->read_handler and keeps the AE_READABLE file event
 * of conn->fd in sync with it: the event is deleted when 'func' is NULL and
 * created (with conn->type->ae_handler as callback and 'conn' as its data)
 * otherwise. Nothing is done when 'func' is already the installed handler.
 *
 * Returns C_OK, or C_ERR if aeCreateFileEvent() fails. */
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}
/* Read handler installed on client connections.
 *
 * Reads up to 'readlen' bytes from the connection into c->querybuf, updates
 * the read statistics and the last interaction time, frees the client
 * asynchronously on read error, on EOF, or when the query buffer exceeds
 * server.client_max_querybuf_len, and finally hands the buffer to
 * processInputBuffer().
 *
 * conn - the readable connection; its private data is the client. */
void readQueryFromClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    int nread, readlen;
    size_t qblen;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;

    /* Update total number of reads on server */
    server.stat_total_reads_processed++;

    readlen = PROTO_IOBUF_LEN;
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        /* A failed read on a connection that is still in the connected
         * state is not treated as an error: just return. */
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            return;
        } else {
            serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }

    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        return;
    }

    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
    processInputBuffer(c); /* see its implementation below */
}
/* Implementation of processInputBuffer */
/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process.
 *
 * The loop parses one request at a time (inline or multibulk, chosen from
 * the first unread byte) and executes it, stopping early when the client is
 * paused, blocked, already has a pending command, is scheduled to be
 * closed, or when a request is still incomplete. On exit the consumed
 * prefix of c->querybuf is trimmed and c->qb_pos reset, except when
 * processCommandAndResetClient() reports C_ERR, in which case the function
 * returns immediately without touching the client. */
void processInputBuffer(client *c) {
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

        /* Determine request type when unknown. */
        if (!c->reqtype) {
            if (c->querybuf[c->qb_pos] == '*') {
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                c->reqtype = PROTO_REQ_INLINE;
            }
        }

        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. */
            if (server.gopher_enabled &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }

        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }

            /* We are finally ready to execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

    /* Trim to pos */
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}

1.2.5 connSetPrivateData(conn, c)

/* Associate an opaque pointer with the connection by storing it in
 * conn->private_data. */
void connSetPrivateData(connection *conn, void *data) {
    conn->private_data = data;
}

1.2.6 客户端结构体

/* Per-client state: the connection, input (query) buffer and parsing state,
 * output (reply) buffers, and the replication, blocking, pub/sub, auth and
 * tracking bookkeeping attached to one client. */
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    connection *conn;
    int resp;               /* RESP protocol version. Can be 2 or 3. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t qb_pos;          /* The position we have read in querybuf. */
    sds pending_querybuf;   /* If this client is flagged as master, this buffer
                               represents the yet not applied portion of the
                               replication stream that we are receiving from
                               the master. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    user *user;             /* User associated with this connection. If the
                               user is set to NULL the connection can do
                               anything (admin). */
    int reqtype;            /* Request protocol type: PROTO_REQ_* */
    int multibulklen;       /* Number of multi bulk arguments left to read. */
    long bulklen;           /* Length of bulk argument in multi bulk request. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen;         /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    time_t ctime;           /* Client creation time. */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
    time_t obuf_soft_limit_reached_time;
    uint64_t flags;         /* Client flags: CLIENT_* macros. */
    int authenticated;      /* Needed when the default user requires auth. */
    int replstate;          /* Replication state if this is a slave. */
    int repl_put_online_on_ack; /* Install slave write handler on first ACK. */
    int repldbfd;           /* Replication DB file descriptor. */
    off_t repldboff;        /* Replication DB file offset. */
    off_t repldbsize;       /* Replication DB file size. */
    sds replpreamble;       /* Replication DB preamble. */
    long long read_reploff; /* Read replication offset if this is a master. */
    long long reploff;      /* Applied replication offset if this is a master. */
    long long repl_ack_off; /* Replication ack offset, if this is a slave. */
    long long repl_ack_time;/* Replication ack time, if this is a slave. */
    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                       copying this slave output buffer
                                       should use. */
    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
    int slave_listening_port; /* As configured with: SLAVECONF listening-port */
    char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
    int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    multiState mstate;      /* MULTI/EXEC state */
    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
    blockingState bpop;     /* blocking state */
    long long woff;         /* Last write global replication offset. */
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
    sds peerid;             /* Cached peer ID. */
    listNode *client_list_node; /* list node in client list */
    RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
                                               * when the authenticated user
                                               * changes. */
    void *auth_callback_privdata; /* Private data that is passed when the auth
                                   * changed callback is executed. Opaque for
                                   * Redis Core. */
    void *auth_module;      /* The module that owns the callback, which is used
                             * to disconnect the client if the module is
                             * unloaded for cleanup. Opaque for Redis Core.*/

    /* If this client is in tracking mode and this field is non zero,
     * invalidation messages for keys fetched by this client will be send to
     * the specified client ID. */
    uint64_t client_tracking_redirection;
    rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
                                      subscribed to in BCAST mode, in the
                                      context of client side caching. */
    /* In clientsCronTrackClientsMemUsage() we track the memory usage of
     * each client and add it to the sum of all the clients of a given type,
     * however we need to remember what was the old contribution of each
     * client, and in which category the client was, in order to remove it
     * before adding it the new value. */
    uint64_t client_cron_last_memory_usage;
    int      client_cron_last_memory_type;
    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

1.3 关注acceptCommonHandler的关键子步骤connAccept

connAccept(conn, clientAcceptHandler) static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) {return conn->type->accept(conn, accept_handler);
}ConnectionType CT_Socket = {
.accept = connSocketAccept,
所以,conn->type->accept(conn, accept_handler)即是调用:
connSocketAccept(conn,clientAcceptHandler)static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {int ret = C_OK;if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;conn->state = CONN_STATE_CONNECTED;connIncrRefs(conn);if (!callHandler(conn, accept_handler)) ret = C_ERR; // 调用clientAcceptHandlerconnDecrRefs(conn);return ret;
}void clientAcceptHandler(connection *conn) {client *c = connGetPrivateData(conn);if (connGetState(conn) != CONN_STATE_CONNECTED) {serverLog(LL_WARNING,"Error accepting a client connection: %s",connGetLastError(conn));freeClientAsync(c);return;}/* If the server is running in protected mode (the default) and there* is no password set, nor a specific interface is bound, we don't accept* requests from non loopback interfaces. Instead we try to explain the* user what to do to fix it if needed. */if (server.protected_mode &&server.bindaddr_count == 0 &&DefaultUser->flags & USER_FLAG_NOPASS &&!(c->flags & CLIENT_UNIX_SOCKET)){char cip[NET_IP_STR_LEN+1] = { 0 };connPeerToString(conn, cip, sizeof(cip)-1, NULL);if (strcmp(cip,"") && strcmp(cip,"::1")) {char *err ="-DENIED Redis is running in protected mode because protected ""mode is enabled, no bind address was specified, no ""authentication password is requested to clients. In this mode ""connections are only accepted from the loopback interface. ""If you want to connect from external computers to Redis you ""may adopt one of the following solutions: ""1) Just disable protected mode sending the command ""'CONFIG SET protected-mode no' from the loopback interface ""by connecting to Redis from the same host the server is ""running, however MAKE SURE Redis is not publicly accessible ""from internet if you do so. Use CONFIG REWRITE to make this ""change permanent. ""2) Alternatively you can just disable the protected mode by ""editing the Redis configuration file, and setting the protected ""mode option to 'no', and then restarting the server. ""3) If you started the server manually just for testing, restart ""it with the '--protected-mode no' option. ""4) Setup a bind address or an authentication password. 
""NOTE: You only need to do one of the above things in order for ""the server to start accepting connections from the outside.\r\n";if (connWrite(c->conn,err,strlen(err)) == -1) {/* Nothing to do, Just to avoid the warning... */}server.stat_rejected_conn++;freeClientAsync(c);return;}}server.stat_numconnections++;moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED,c);

1.3.1 moduleFireServerEvent


/* Register a write handler, to be called when the connection is writable.
 * If NULL, the existing handler is removed.
 * Forwards to the set_write_handler hook of the connection's type, passing
 * 0 as its third argument. */
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
    return conn->type->set_write_handler(conn, func, 0);
}

/* Excerpt: only the call relevant to these notes is shown.
 * NOTE(review): the excerpt that follows is the *Writes* variant while the
 * call quoted here is the *Reads* variant - confirm which one was meant. */
void beforeSleep(struct aeEventLoop *eventLoop) {
    /* ... (elided) ... */
    handleClientsWithPendingReadsUsingThreads();
    /* ... (elided) ... */
}

/* Excerpt: declarations of 'ln' and 'li', the list setup and the return
 * statement are elided; only the loop that installs the write handler is
 * shown. */
int handleClientsWithPendingWritesUsingThreads(void) {
    /* ... (elided) ... */
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    /* ... (elided) ... */
}

/* Write event handler. Just send data to the client. */
void sendReplyToClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    writeToClient(c,1);
}

/* Get the associated private data pointer */
void *connGetPrivateData(connection *conn) {
    return conn->private_data;
}

/* Write as much pending reply data as possible to the client.
 *
 * The static buffer c->buf is drained first, then the blocks of the
 * c->reply list one by one; c->sentlen tracks the progress inside the
 * buffer or block currently being sent.
 *
 * c                 - the client to write to.
 * handler_installed - non-zero when a write handler is installed on the
 *                     connection; it is removed once nothing is pending.
 *
 * Returns C_OK, or C_ERR when the client has been scheduled to be freed
 * (write error on a connection that is no longer connected, or
 * CLIENT_CLOSE_AFTER_REPLY with the whole reply sent). */
int writeToClient(client *c, int handler_installed) {
    /* Update total number of writes on server */
    server.stat_total_writes_processed++;

    ssize_t nwritten = 0, totwritten = 0;
    size_t objlen;
    clientReplyBlock *o;

    while(clientHasPendingReplies(c)) {
        if (c->bufpos > 0) {
            nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;

            /* Empty blocks are simply dropped from the list. */
            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;

            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                /* If there are no longer objects in the list, we expect
                 * the count of reply bytes to be exactly zero. */
                if (listLength(c->reply) == 0)
                    serverAssert(c->reply_bytes == 0);
            }
        }
        /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver.
         *
         * Moreover, we also send as much as possible if the client is
         * a slave or a monitor (otherwise, on high-speed traffic, the
         * replication/output buffer will grow indefinitely) */
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory) &&
            !(c->flags & CLIENT_SLAVE)) break;
    }
    server.stat_net_output_bytes += totwritten;
    if (nwritten == -1) {
        if (connGetState(c->conn) == CONN_STATE_CONNECTED) {
            nwritten = 0;
        } else {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", connGetLastError(c->conn));
            freeClientAsync(c);
            return C_ERR;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        /* Note that writeToClient() is called in a threaded way, but
         * aeDeleteFileEvent() is not thread safe: however writeToClient()
         * is always called with handler_installed set to 0 from threads
         * so we are fine. */
        if (handler_installed) connSetWriteHandler(c->conn, NULL);

        /* Close connection after entire reply has been sent. */
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClientAsync(c);
            return C_ERR;
        }
    }
    return C_OK;
}


