初始化客户端连接的read handler处理流程

redis源码分析之九网络服务端

Redis接受连接请求流程

Redis事件循环

postponeClientRead的用处

上文:

1. 读事件的后续流程

上文的2.2.4节

1.1关注acceptCommonHandler


4. 关注acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);(在大步骤1中最后一步)
acceptCommonHandler的实现:
static void acceptCommonHandler(connection *conn, int flags, char *ip) {client *c;char conninfo[100];UNUSED(ip);if (connGetState(conn) != CONN_STATE_ACCEPTING) {serverLog(LL_VERBOSE,"Accepted client connection in error state: %s (conn: %s)",connGetLastError(conn),connGetInfo(conn, conninfo, sizeof(conninfo)));connClose(conn);return;}/* Limit the number of connections we take at the same time.** Admission control will happen before a client is created and connAccept()* called, because we don't want to even start transport-level negotiation* if rejected. */if (listLength(server.clients) + getClusterConnectionsCount()>= server.maxclients){char *err;if (server.cluster_enabled)err = "-ERR max number of clients + cluster ""connections reached\r\n";elseerr = "-ERR max number of clients reached\r\n";/* That's a best effort error message, don't check write errors.* Note that for TLS connections, no handshake was done yet so nothing* is written and the connection will just drop. */if (connWrite(conn,err,strlen(err)) == -1) {/* Nothing to do, Just to avoid the warning... */}server.stat_rejected_conn++;connClose(conn);return;}/* Create connection and client */if ((c = createClient(conn)) == NULL) {serverLog(LL_WARNING,"Error registering fd event for the new client: %s (conn: %s)",connGetLastError(conn),connGetInfo(conn, conninfo, sizeof(conninfo)));connClose(conn); /* May be already closed, just ignore errors */return;}/* Last chance to keep flags */c->flags |= flags;/* Initiate accept.** Note that connAccept() is free to do two things here:* 1. Call clientAcceptHandler() immediately;* 2. Schedule a future call to clientAcceptHandler().** Because of that, we must do nothing else afterwards.*/if (connAccept(conn, clientAcceptHandler) == C_ERR) {char conninfo[100];if (connGetState(conn) == CONN_STATE_ERROR)serverLog(LL_WARNING,"Error accepting a client connection: %s (conn: %s)",connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));freeClient(connGetPrivateData(conn));return;}
}
————————————————
版权声明:本文为CSDN博主「Muten」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/Edidaughter/article/details/115940107

1.2 关注acceptCommonHandler的关键子步骤createClient(conn)

client *createClient(connection *conn) {client *c = zmalloc(sizeof(client));/* passing NULL as conn it is possible to create a non connected client.* This is useful since all the commands needs to be executed* in the context of a client. When commands are executed in other* contexts (for instance a Lua script) we need a non connected client. */if (conn) {connNonBlock(conn);connEnableTcpNoDelay(conn);if (server.tcpkeepalive)connKeepAlive(conn,server.tcpkeepalive);connSetReadHandler(conn, readQueryFromClient);connSetPrivateData(conn, c);}selectDb(c,0);uint64_t client_id = ++server.next_client_id;c->id = client_id;c->resp = 2;c->conn = conn;c->name = NULL;c->bufpos = 0;c->qb_pos = 0;c->querybuf = sdsempty();c->pending_querybuf = sdsempty();c->querybuf_peak = 0;c->reqtype = 0;c->argc = 0;c->argv = NULL;c->cmd = c->lastcmd = NULL;c->user = DefaultUser;c->multibulklen = 0;c->bulklen = -1;c->sentlen = 0;c->flags = 0;c->ctime = c->lastinteraction = server.unixtime;/* If the default user does not require authentication, the user is* directly authenticated. */c->authenticated = (c->user->flags & USER_FLAG_NOPASS) &&!(c->user->flags & USER_FLAG_DISABLED);c->replstate = REPL_STATE_NONE;c->repl_put_online_on_ack = 0;c->reploff = 0;c->read_reploff = 0;c->repl_ack_off = 0;c->repl_ack_time = 0;c->slave_listening_port = 0;c->slave_ip[0] = '\0';c->slave_capa = SLAVE_CAPA_NONE;c->reply = listCreate();c->reply_bytes = 0;c->obuf_soft_limit_reached_time = 0;listSetFreeMethod(c->reply,freeClientReplyValue);listSetDupMethod(c->reply,dupClientReplyValue);c->btype = BLOCKED_NONE;c->bpop.timeout = 0;c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);c->bpop.target = NULL;c->bpop.xread_group = NULL;c->bpop.xread_consumer = NULL;c->bpop.xread_group_noack = 0;c->bpop.numreplicas = 0;c->bpop.reploffset = 0;c->woff = 0;c->watched_keys = listCreate();c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);c->pubsub_patterns = listCreate();c->peerid = NULL;c->client_list_node = NULL;c->client_tracking_redirection = 0;c->client_tracking_prefixes = NULL;c->client_cron_last_memory_usage = 0;c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL;c->auth_callback = NULL;c->auth_callback_privdata = NULL;c->auth_module = NULL;listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);listSetMatchMethod(c->pubsub_patterns,listMatchObjects);if (conn) linkClient(c);initClientMultiState(c);return c;
}

1.2.1 connNonBlock

int connNonBlock(connection *conn) {if (conn->fd == -1) return C_ERR;return anetNonBlock(NULL, conn->fd);
}int anetNonBlock(char *err, int fd) {return anetSetBlock(err,fd,1);
}int anetSetBlock(char *err, int fd, int non_block) {int flags;/* Set the socket blocking (if non_block is zero) or non-blocking.* Note that fcntl(2) for F_GETFL and F_SETFL can't be* interrupted by a signal. */if ((flags = fcntl(fd, F_GETFL)) == -1) {anetSetError(err, "fcntl(F_GETFL): %s", strerror(errno));return ANET_ERR;}if (non_block)flags |= O_NONBLOCK;elseflags &= ~O_NONBLOCK;if (fcntl(fd, F_SETFL, flags) == -1) {anetSetError(err, "fcntl(F_SETFL,O_NONBLOCK): %s", strerror(errno));return ANET_ERR;}return ANET_OK;
}

1.2.2  connEnableTcpNoDelay

nginx指令-浅谈tcp_nodelay

Redis 复制参数 repl-disable-tcp-nodelay 学习笔记

int connEnableTcpNoDelay(connection *conn) {if (conn->fd == -1) return C_ERR;return anetEnableTcpNoDelay(NULL, conn->fd);
}int anetEnableTcpNoDelay(char *err, int fd)
{return anetSetTcpNoDelay(err, fd, 1);
}static int anetSetTcpNoDelay(char *err, int fd, int val)
{if (setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1){anetSetError(err, "setsockopt TCP_NODELAY: %s", strerror(errno));return ANET_ERR;}return ANET_OK;
}

1.2.3  connKeepAlive(conn,server.tcpkeepalive)

TCP选项之TCP_KEEPALIVE

int connKeepAlive(connection *conn, int interval) {if (conn->fd == -1) return C_ERR;return anetKeepAlive(NULL, conn->fd, interval);
}
int anetKeepAlive(char *err, int fd, int interval)
{int val = 1;if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val)) == -1){anetSetError(err, "setsockopt SO_KEEPALIVE: %s", strerror(errno));return ANET_ERR;}#ifdef __linux__/* Default settings are more or less garbage, with the keepalive time* set to 7200 by default on Linux. Modify settings to make the feature* actually useful. *//* Send first probe after interval. */val = interval;if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val)) < 0) {anetSetError(err, "setsockopt TCP_KEEPIDLE: %s\n", strerror(errno));return ANET_ERR;}/* Send next probes after the specified interval. Note that we set the* delay as interval / 3, as we send three probes before detecting* an error (see the next setsockopt call). */val = interval/3;if (val == 0) val = 1;if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val)) < 0) {anetSetError(err, "setsockopt TCP_KEEPINTVL: %s\n", strerror(errno));return ANET_ERR;}/* Consider the socket in error state after three we send three ACK* probes without getting a reply. */val = 3;if (setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val)) < 0) {anetSetError(err, "setsockopt TCP_KEEPCNT: %s\n", strerror(errno));return ANET_ERR;}
#else((void) interval); /* Avoid unused var warning for non Linux systems. */
#endifreturn ANET_OK;
}

1.2.4 connSetReadHandler(conn, readQueryFromClient)

(conn, readQueryFromClient)--将conn->read_handler设置为readQueryFromClient/* Register a read handler, to be called when the connection is readable.* If NULL, the existing handler is removed.*/
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {return conn->type->set_read_handler(conn, func);
}ConnectionType CT_Socket = {
...
.set_read_handler = connSocketSetReadHandler,
...
}static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {if (func == conn->read_handler) return C_OK;conn->read_handler = func;if (!conn->read_handler)aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);elseif (aeCreateFileEvent(server.el,conn->fd,AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;return C_OK;
}看看readQueryFromClient的具体实现:
void readQueryFromClient(connection *conn) {client *c = connGetPrivateData(conn);int nread, readlen;size_t qblen;/* Check if we want to read from the client later when exiting from* the event loop. This is the case if threaded I/O is enabled. */if (postponeClientRead(c)) return;/* Update total number of reads on server */server.stat_total_reads_processed++;readlen = PROTO_IOBUF_LEN;/* If this is a multi bulk request, and we are processing a bulk reply* that is large enough, try to maximize the probability that the query* buffer contains exactly the SDS string representing the object, even* at the risk of requiring more read(2) calls. This way the function* processMultiBulkBuffer() can avoid copying buffers to create the* Redis Object representing the argument. */if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1&& c->bulklen >= PROTO_MBULK_BIG_ARG){ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);/* Note that the 'remaining' variable may be zero in some edge case,* for example once we resume a blocked client after CLIENT PAUSE. */if (remaining > 0 && remaining < readlen) readlen = remaining;}qblen = sdslen(c->querybuf);if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);nread = connRead(c->conn, c->querybuf+qblen, readlen);if (nread == -1) {if (connGetState(conn) == CONN_STATE_CONNECTED) {return;} else {serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));freeClientAsync(c);return;}} else if (nread == 0) {serverLog(LL_VERBOSE, "Client closed connection");freeClientAsync(c);return;} else if (c->flags & CLIENT_MASTER) {/* Append the query buffer to the pending (not applied) buffer* of the master. We'll use this buffer later in order to have a* copy of the string applied by the last command executed. */c->pending_querybuf = sdscatlen(c->pending_querybuf,c->querybuf+qblen,nread);}sdsIncrLen(c->querybuf,nread);c->lastinteraction = server.unixtime;if (c->flags & CLIENT_MASTER) c->read_reploff += nread;server.stat_net_input_bytes += nread;if (sdslen(c->querybuf) > server.client_max_querybuf_len) {sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();bytes = sdscatrepr(bytes,c->querybuf,64);serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);sdsfree(ci);sdsfree(bytes);freeClientAsync(c);return;}/* There is more data in the client input buffer, continue parsing it* in case to check if there is a full command to execute. */processInputBuffer(c); /* 见下面的具体实现 */
}/* processInputBuffer的具体实现 */
/* This function is called every time, in the client structure 'c', there is* more query buffer to process, because we read more data from the socket* or because a client was blocked and later reactivated, so there could be* pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {/* Keep processing while there is something in the input buffer */while(c->qb_pos < sdslen(c->querybuf)) {/* Return if clients are paused. */if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;/* Immediately abort if the client is in the middle of something. */if (c->flags & CLIENT_BLOCKED) break;/* Don't process more buffers from clients that have already pending* commands to execute in c->argv. */if (c->flags & CLIENT_PENDING_COMMAND) break;/* Don't process input from the master while there is a busy script* condition on the slave. We want just to accumulate the replication* stream (instead of replying -BUSY like we do with other clients) and* later resume the processing. */if (server.lua_timedout && c->flags & CLIENT_MASTER) break;/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is* written to the client. Make sure to not let the reply grow after* this flag has been set (i.e. don't process more commands).** The same applies for clients we want to terminate ASAP. */if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;/* Determine request type when unknown. */if (!c->reqtype) {if (c->querybuf[c->qb_pos] == '*') {c->reqtype = PROTO_REQ_MULTIBULK;} else {c->reqtype = PROTO_REQ_INLINE;}}if (c->reqtype == PROTO_REQ_INLINE) {if (processInlineBuffer(c) != C_OK) break;/* If the Gopher mode and we got zero or one argument, process* the request in Gopher mode. */if (server.gopher_enabled &&((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||c->argc == 0)){processGopherRequest(c);resetClient(c);c->flags |= CLIENT_CLOSE_AFTER_REPLY;break;}} else if (c->reqtype == PROTO_REQ_MULTIBULK) {if (processMultibulkBuffer(c) != C_OK) break;} else {serverPanic("Unknown request type");}/* Multibulk processing could see a <= 0 length. */if (c->argc == 0) {resetClient(c);} else {/* If we are in the context of an I/O thread, we can't really* execute the command here. All we can do is to flag the client* as one that needs to process the command. */if (c->flags & CLIENT_PENDING_READ) {c->flags |= CLIENT_PENDING_COMMAND;break;}/* We are finally ready to execute the command. */if (processCommandAndResetClient(c) == C_ERR) {/* If the client is no longer valid, we avoid exiting this* loop and trimming the client buffer later. So we return* ASAP in that case. */return;}}}/* Trim to pos */if (c->qb_pos) {sdsrange(c->querybuf,c->qb_pos,-1);c->qb_pos = 0;}
}

1.2.5 connSetPrivateData(conn, c)

void connSetPrivateData(connection *conn, void *data) {conn->private_data = data;
}

1.2.6 客户端结构体

typedef struct client {uint64_t id;            /* Client incremental unique ID. */connection *conn;int resp;               /* RESP protocol version. Can be 2 or 3. */redisDb *db;            /* Pointer to currently SELECTed DB. */robj *name;             /* As set by CLIENT SETNAME. */sds querybuf;           /* Buffer we use to accumulate client queries. */size_t qb_pos;          /* The position we have read in querybuf. */sds pending_querybuf;   /* If this client is flagged as master, this bufferrepresents the yet not applied portion of thereplication stream that we are receiving fromthe master. */size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */int argc;               /* Num of arguments of current command. */robj **argv;            /* Arguments of current command. */struct redisCommand *cmd, *lastcmd;  /* Last command executed. */user *user;             /* User associated with this connection. If theuser is set to NULL the connection can doanything (admin). */int reqtype;            /* Request protocol type: PROTO_REQ_* */int multibulklen;       /* Number of multi bulk arguments left to read. */long bulklen;           /* Length of bulk argument in multi bulk request. */list *reply;            /* List of reply objects to send to the client. */unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */size_t sentlen;         /* Amount of bytes already sent in the currentbuffer or object being sent. */time_t ctime;           /* Client creation time. */time_t lastinteraction; /* Time of the last interaction, used for timeout */time_t obuf_soft_limit_reached_time;uint64_t flags;         /* Client flags: CLIENT_* macros. */int authenticated;      /* Needed when the default user requires auth. */int replstate;          /* Replication state if this is a slave. */int repl_put_online_on_ack; /* Install slave write handler on first ACK. */int repldbfd;           /* Replication DB file descriptor. */off_t repldboff;        /* Replication DB file offset. */off_t repldbsize;       /* Replication DB file size. */sds replpreamble;       /* Replication DB preamble. */long long read_reploff; /* Read replication offset if this is a master. */long long reploff;      /* Applied replication offset if this is a master. */long long repl_ack_off; /* Replication ack offset, if this is a slave. */long long repl_ack_time;/* Replication ack time, if this is a slave. */long long psync_initial_offset; /* FULLRESYNC reply offset other slavescopying this slave output buffershould use. */char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */int slave_listening_port; /* As configured with: SLAVECONF listening-port */char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */multiState mstate;      /* MULTI/EXEC state */int btype;              /* Type of blocking op if CLIENT_BLOCKED. */blockingState bpop;     /* blocking state */long long woff;         /* Last write global replication offset. */list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */sds peerid;             /* Cached peer ID. */listNode *client_list_node; /* list node in client list */RedisModuleUserChangedFunc auth_callback; /* Module callback to execute* when the authenticated user* changes. */void *auth_callback_privdata; /* Private data that is passed when the auth* changed callback is executed. Opaque for* Redis Core. */void *auth_module;      /* The module that owns the callback, which is used* to disconnect the client if the module is* unloaded for cleanup. Opaque for Redis Core.*//* If this client is in tracking mode and this field is non zero,* invalidation messages for keys fetched by this client will be send to* the specified client ID. */uint64_t client_tracking_redirection;rax *client_tracking_prefixes; /* A dictionary of prefixes we are alreadysubscribed to in BCAST mode, in thecontext of client side caching. *//* In clientsCronTrackClientsMemUsage() we track the memory usage of* each client and add it to the sum of all the clients of a given type,* however we need to remember what was the old contribution of each* client, and in which categoty the client was, in order to remove it* before adding it the new value. */uint64_t client_cron_last_memory_usage;int      client_cron_last_memory_type;/* Response buffer */int bufpos;char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

1.3 关注acceptCommonHandler的关键子步骤connAccept

connAccept(conn, clientAcceptHandler) static inline int connAccept(connection *conn, ConnectionCallbackFunc accept_handler) {return conn->type->accept(conn, accept_handler);
}ConnectionType CT_Socket = {
...
.accept = connSocketAccept,
...
}
所以,conn->type->accept(conn, accept_handler)即是调用:
connSocketAccept(conn,clientAcceptHandler)static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {int ret = C_OK;if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;conn->state = CONN_STATE_CONNECTED;connIncrRefs(conn);if (!callHandler(conn, accept_handler)) ret = C_ERR; // 调用clientAcceptHandlerconnDecrRefs(conn);return ret;
}void clientAcceptHandler(connection *conn) {client *c = connGetPrivateData(conn);if (connGetState(conn) != CONN_STATE_CONNECTED) {serverLog(LL_WARNING,"Error accepting a client connection: %s",connGetLastError(conn));freeClientAsync(c);return;}/* If the server is running in protected mode (the default) and there* is no password set, nor a specific interface is bound, we don't accept* requests from non loopback interfaces. Instead we try to explain the* user what to do to fix it if needed. */if (server.protected_mode &&server.bindaddr_count == 0 &&DefaultUser->flags & USER_FLAG_NOPASS &&!(c->flags & CLIENT_UNIX_SOCKET)){char cip[NET_IP_STR_LEN+1] = { 0 };connPeerToString(conn, cip, sizeof(cip)-1, NULL);if (strcmp(cip,"127.0.0.1") && strcmp(cip,"::1")) {char *err ="-DENIED Redis is running in protected mode because protected ""mode is enabled, no bind address was specified, no ""authentication password is requested to clients. In this mode ""connections are only accepted from the loopback interface. ""If you want to connect from external computers to Redis you ""may adopt one of the following solutions: ""1) Just disable protected mode sending the command ""'CONFIG SET protected-mode no' from the loopback interface ""by connecting to Redis from the same host the server is ""running, however MAKE SURE Redis is not publicly accessible ""from internet if you do so. Use CONFIG REWRITE to make this ""change permanent. ""2) Alternatively you can just disable the protected mode by ""editing the Redis configuration file, and setting the protected ""mode option to 'no', and then restarting the server. ""3) If you started the server manually just for testing, restart ""it with the '--protected-mode no' option. ""4) Setup a bind address or an authentication password. ""NOTE: You only need to do one of the above things in order for ""the server to start accepting connections from the outside.\r\n";if (connWrite(c->conn,err,strlen(err)) == -1) {/* Nothing to do, Just to avoid the warning... */}server.stat_rejected_conn++;freeClientAsync(c);return;}}server.stat_numconnections++;moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED,c);
}

1.3.1 moduleFireServerEvent

2.写事件

/* Register a write handler, to be called when the connection is writable.* If NULL, the existing handler is removed.*/
static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {return conn->type->set_write_handler(conn, func, 0);
}void beforeSleep(struct aeEventLoop *eventLoop) {...handleClientsWithPendingReadsUsingThreads();...
}int handleClientsWithPendingWritesUsingThreads(void) {
...
while((ln = listNext(&li))) {client *c = listNodeValue(ln);/* Install the write handler if there are pending writes in some* of the clients. */if (clientHasPendingReplies(c) &&connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR){freeClientAsync(c);}}
...
}/* Write event handler. Just send data to the client. */
void sendReplyToClient(connection *conn) {client *c = connGetPrivateData(conn);writeToClient(c,1);
}/* Get the associated private data pointer */
void *connGetPrivateData(connection *conn) {return conn->private_data;
}int writeToClient(client *c, int handler_installed) {/* Update total number of writes on server */server.stat_total_writes_processed++;ssize_t nwritten = 0, totwritten = 0;size_t objlen;clientReplyBlock *o;while(clientHasPendingReplies(c)) {if (c->bufpos > 0) {nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);if (nwritten <= 0) break;c->sentlen += nwritten;totwritten += nwritten;/* If the buffer was sent, set bufpos to zero to continue with* the remainder of the reply. */if ((int)c->sentlen == c->bufpos) {c->bufpos = 0;c->sentlen = 0;}} else {o = listNodeValue(listFirst(c->reply));objlen = o->used;if (objlen == 0) {c->reply_bytes -= o->size;listDelNode(c->reply,listFirst(c->reply));continue;}nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);if (nwritten <= 0) break;c->sentlen += nwritten;totwritten += nwritten;/* If we fully sent the object on head go to the next one */if (c->sentlen == objlen) {c->reply_bytes -= o->size;listDelNode(c->reply,listFirst(c->reply));c->sentlen = 0;/* If there are no longer objects in the list, we expect* the count of reply bytes to be exactly zero. */if (listLength(c->reply) == 0)serverAssert(c->reply_bytes == 0);}}/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT* bytes, in a single threaded server it's a good idea to serve* other clients as well, even if a very large request comes from* super fast link that is always able to accept data (in real world* scenario think about 'KEYS *' against the loopback interface).** However if we are over the maxmemory limit we ignore that and* just deliver as much data as it is possible to deliver.** Moreover, we also send as much as possible if the client is* a slave or a monitor (otherwise, on high-speed traffic, the* replication/output buffer will grow indefinitely) */if (totwritten > NET_MAX_WRITES_PER_EVENT &&(server.maxmemory == 0 ||zmalloc_used_memory() < server.maxmemory) &&!(c->flags & CLIENT_SLAVE)) break;}server.stat_net_output_bytes += totwritten;if (nwritten == -1) {if (connGetState(c->conn) == CONN_STATE_CONNECTED) {nwritten = 0;} else {serverLog(LL_VERBOSE,"Error writing to client: %s", connGetLastError(c->conn));freeClientAsync(c);return C_ERR;}}if (totwritten > 0) {/* For clients representing masters we don't count sending data* as an interaction, since we always send REPLCONF ACK commands* that take some time to just fill the socket output buffer.* We just rely on data / pings received for timeout detection. */if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;}if (!clientHasPendingReplies(c)) {c->sentlen = 0;/* Note that writeToClient() is called in a threaded way, but* adDeleteFileEvent() is not thread safe: however writeToClient()* is always called with handler_installed set to 0 from threads* so we are fine. */if (handler_installed) connSetWriteHandler(c->conn, NULL);/* Close connection after entire reply has been sent. */if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {freeClientAsync(c);return C_ERR;}}return C_OK;
}

【Redis-6.0.8】事件循环器AE(下)相关推荐

  1. 【Redis-6.0.8】事件循环器AE(上)

    目录 0.阅读 1.引言 1.1 redis中的事件分类 1.2 为兼容各平台写的代码 1.3 linux上巧妙借用系统函数epoll_wait设置阻塞时间以达到触发条件的代码 1.4 两种事件的调度 ...

  2. 事件库之Redis自己的事件模型-ae

    2019独角兽企业重金招聘Python工程师标准>>> #Redis自己的事件模型 ae ##1.Redis的事件模型库 大家到网上Google"Redis libeven ...

  3. centos / Linux 服务环境下安装 Redis 5.0.3

    centos / Linux 服务环境下安装 Redis 5.0.3 原文:centos / Linux 服务环境下安装 Redis 5.0.3 1.首先进入你要安装的目录 cd /usr/local ...

  4. Windows 7下vc2010编译使用redis 3.0

    Windows 7下vc2010编译使用redis 3.0 项目中,有多台机器频繁读写.同步一些参数.起初的方案是通过MySQL的临时表实现,对效率有一些影响,故改为redis方案.项目中redis和 ...

  5. 龙芯3a5000下编译redis 7.0源码

    1.下载redis 7.0源码后解压缩备用 https://redis.io/download/ 2.下载最新版本的config.guess和config.sub redis 用到了jemalloc库 ...

  6. redis(12)--事件,客户端,服务器

    目录 事件 文件事件 读事件 写事件 同时关联写事件和读事件 时间事件 实现 服务器常规操作 事件的执行与调度 客户端 客户端属性 套接字描述符 名字 标志 输入缓冲区 命令与命令参数 命令的实现函数 ...

  7. Redis 6.0 如何实现大幅度的性能提升?

    导读: Redis可以轻松支撑100k+ QPS,离不开基于Reactor模型的I/O Multiplexing,In-memory操作,以及单线程执行命令避免竞态消耗.尽管性能已经能满足大多数应用场 ...

  8. redis文件事件和时间事件

    Redis在6.0以前是单线程的,在6.0之后可以通过配置文件开启多线程,6.0之后的多线程是指在io方面使用多线程来执行以加快I/O的速度. Redis服务器是一个事件驱动程序,服务器需要处理以下两 ...

  9. Redis | 第6章 事件与客户端《Redis设计与实现》

    第6章 事件与客户端 前言 1. 事件 1.1 文件事件 1.1.1 I/O 多路复用程序的实现 1.1.2 事件类型与 API 1.1.3 文件事件的处理器 1.2 时间事件 1.2.1 API 1 ...

最新文章

  1. 测试开发面试集锦-测试方面(搬运)
  2. matlab的支持向量机调参,支持向量机(2)-应用
  3. 程序员如何打破 30 岁职业瓶颈?
  4. [Js插件]使用JqueryUI的弹出框做一个“炫”的登录页面
  5. 枚举很好用啊,为啥阿里不建议返回值用枚举?看看作者孤尽的回答
  6. GSON简单实用及常用方法(附 .jar 地址)
  7. eclipse 闪退原因
  8. java bean规范 is_深入了解JavaBean规范中的属性名和setter/getter方法
  9. Fineui 添加打印控件
  10. 网络推销经典案例——所有的骗子都应该向他学习
  11. 认识Http协议(超文本传输协议)
  12. 冯东阳:解读纯文本链接到底算不算外链
  13. 文献解读 | 科学家发现代谢调控促进肿瘤转移新机制
  14. 【强推文章】如果你在犹豫要不要去外包公司,不妨看看这篇文章(自己深有感触)
  15. Graphite系统监控
  16. 对中间层的一些浅略的思考
  17. 科大讯飞语音识别_科大讯飞 语音识别_科大讯飞语音识别系统 - 云+社区 - 腾讯云...
  18. 鼠标拖曳盒子案例(限定边界)
  19. 百度搜索接口api_百度站长平台上线落地页视频转存功能,确保专业问答视频资源的质量和稳定性...
  20. 年度推荐7款新媒体运营神器

热门文章

  1. java arm_移植java到arm开发板
  2. 2022年金融市场分析
  3. 新版白话空间统计(1):前言与地理学第一定律
  4. Qt设置窗口不在系统的任务栏上显示
  5. 福建瑞芯微电子实习第一天
  6. 初学者专题:信息收集理论篇
  7. 安装t3服务器检测系统环境,T3标准版一体化实施路径图.doc
  8. 转 中国的支付清算体系是怎么玩的?
  9. Adobe Photoshop CC 键盘快捷键
  10. vue+elementui微信支付状态问题