1、初始化

初始化是通过InitServerLast中完成的。如果配置的线程数为1,则不创建创建,限制线程数为128。循环创建io线程列表 (主要用于存放需要写或者读的client),初始化io线程pending标识(0表io线程已经处理完毕),线程互斥量,线程。在创建io线程前,锁住io_thread_mutex[i],避免io线程运行。

void InitServerLast() {bioInit();initThreadedIO();set_jemalloc_bg_thread(server.jemalloc_bg_thread);server.initial_memory_usage = zmalloc_used_memory();
}void initThreadedIO(void) {server.io_threads_active = 0; /* We start with threads not active. *//* Don't spawn any thread if the user selected a single thread:* we'll handle I/O directly from the main thread. */if (server.io_threads_num == 1) return;if (server.io_threads_num > IO_THREADS_MAX_NUM) {serverLog(LL_WARNING,"Fatal: too many I/O threads configured. ""The maximum number is %d.", IO_THREADS_MAX_NUM);exit(1);}/* Spawn and initialize the I/O threads. */for (int i = 0; i < server.io_threads_num; i++) {/* Things we do for all the threads including the main thread. */io_threads_list[i] = listCreate();if (i == 0) continue; /* Thread 0 is the main thread. *//* Things we do only for the additional threads. */pthread_t tid;pthread_mutex_init(&io_threads_mutex[i],NULL);io_threads_pending[i] = 0;pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");exit(1);}io_threads[i] = tid;}
}

io线程工作时即线程对应的io_threads_pending不等于0,在线程对应的io_threads_list中遍历获取client,根据操作类型io_thread_op(读或者写)来写入或者读取,操作完后,清空对应的io_threads_list并且把io_theads_pending设置为0,等待在下一次的事件循环中处理。

void *IOThreadMain(void *myid) {/* The ID is the thread number (from 0 to server.iothreads_num-1), and is* used by the thread to just manipulate a single sub-array of clients. */long id = (unsigned long)myid;char thdname[16];snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);redis_set_thread_title(thdname);redisSetCpuAffinity(server.server_cpulist);makeThreadKillable();while(1) {/* Wait for start */for (int j = 0; j < 1000000; j++) {if (io_threads_pending[id] != 0) break;}/* Give the main thread a chance to stop this thread. */if (io_threads_pending[id] == 0) {pthread_mutex_lock(&io_threads_mutex[id]);pthread_mutex_unlock(&io_threads_mutex[id]);continue;}serverAssert(io_threads_pending[id] != 0);if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));/* Process: note that the main thread will never touch our list* before we drop the pending count to 0. */listIter li;listNode *ln;listRewind(io_threads_list[id],&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);if (io_threads_op == IO_THREADS_OP_WRITE) {writeToClient(c,0);} else if (io_threads_op == IO_THREADS_OP_READ) {readQueryFromClient(c->conn);} else {serverPanic("io_threads_op value is unknown");}}listEmpty(io_threads_list[id]);io_threads_pending[id] = 0;if (tio_debug) printf("[%ld] Done\n", id);}
}

2、运行

在aeMain事件循环中会开启io线程

void aeMain(aeEventLoop *eventLoop) {eventLoop->stop = 0;while (!eventLoop->stop) {aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);}
}int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{int processed = 0, numevents;/* Nothing to do? return ASAP */if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;/* Note that we want call select() even if there are no* file events to process as long as we want to process time* events, in order to sleep until the next time event is ready* to fire. */if (eventLoop->maxfd != -1 ||((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {int j;aeTimeEvent *shortest = NULL;struct timeval tv, *tvp;if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))shortest = aeSearchNearestTimer(eventLoop);if (shortest) {long now_sec, now_ms;aeGetTime(&now_sec, &now_ms);tvp = &tv;/* How many milliseconds we need to wait for the next* time event to fire? */long long ms =(shortest->when_sec - now_sec)*1000 +shortest->when_ms - now_ms;if (ms > 0) {tvp->tv_sec = ms/1000;tvp->tv_usec = (ms % 1000)*1000;} else {tvp->tv_sec = 0;tvp->tv_usec = 0;}} else {/* If we have to check for events but need to return* ASAP because of AE_DONT_WAIT we need to set the timeout* to zero */if (flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;} else {/* Otherwise we can block */tvp = NULL; /* wait forever */}}if (eventLoop->flags & AE_DONT_WAIT) {tv.tv_sec = tv.tv_usec = 0;tvp = &tv;}if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)eventLoop->beforesleep(eventLoop);/* Call the multiplexing API, will return only on timeout or when* some event fires. */numevents = aeApiPoll(eventLoop, tvp);/* After sleep callback. */if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)eventLoop->aftersleep(eventLoop);for (j = 0; j < numevents; j++) {aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];int mask = eventLoop->fired[j].mask;int fd = eventLoop->fired[j].fd;int fired = 0; /* Number of events fired for current fd. *//* Normally we execute the readable event first, and the writable* event later. This is useful as sometimes we may be able* to serve the reply of a query immediately after processing the* query.** However if AE_BARRIER is set in the mask, our application is* asking us to do the reverse: never fire the writable event* after the readable. In such a case, we invert the calls.* This is useful when, for instance, we want to do things* in the beforeSleep() hook, like fsyncing a file to disk,* before replying to a client. */int invert = fe->mask & AE_BARRIER;/* Note the "fe->mask & mask & ..." code: maybe an already* processed event removed an element that fired and we still* didn't processed, so we check if the event is still valid.** Fire the readable event if the call sequence is not* inverted. */if (!invert && fe->mask & mask & AE_READABLE) {fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;fe = &eventLoop->events[fd]; /* Refresh in case of resize. */}/* Fire the writable event. */if (fe->mask & mask & AE_WRITABLE) {if (!fired || fe->wfileProc != fe->rfileProc) {fe->wfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}/* If we have to invert the call, fire the readable event now* after the writable one. */if (invert) {fe = &eventLoop->events[fd]; /* Refresh in case of resize. */if ((fe->mask & mask & AE_READABLE) &&(!fired || fe->wfileProc != fe->rfileProc)){fe->rfileProc(eventLoop,fd,fe->clientData,mask);fired++;}}processed++;}}/* Check time events */if (flags & AE_TIME_EVENTS)processed += processTimeEvents(eventLoop);return processed; /* return the number of processed file/time events */
}

在aeApiPoll之前会调用eventLoop->beforeSleep,beforeSleep在服务器初始化时通过initServer设置的

 aeSetBeforeSleepProc(server.el,beforeSleep);aeSetAfterSleepProc(server.el,afterSleep);

beforeSlepp处理代码为

void beforeSleep(struct aeEventLoop *eventLoop) {UNUSED(eventLoop);size_t zmalloc_used = zmalloc_used_memory();if (zmalloc_used > server.stat_peak_memory)server.stat_peak_memory = zmalloc_used;/* Just call a subset of vital functions in case we are re-entering* the event loop from processEventsWhileBlocked(). Note that in this* case we keep track of the number of events we are processing, since* processEventsWhileBlocked() wants to stop ASAP if there are no longer* events to handle. */if (ProcessingEventsWhileBlocked) {uint64_t processed = 0;processed += handleClientsWithPendingReadsUsingThreads();processed += tlsProcessPendingData();processed += handleClientsWithPendingWrites();processed += freeClientsInAsyncFreeQueue();server.events_processed_while_blocked += processed;return;}/* Handle precise timeouts of blocked clients. */handleBlockedClientsTimeout();/* We should handle pending reads clients ASAP after event loop. */handleClientsWithPendingReadsUsingThreads();/* Handle TLS pending data. (must be done before flushAppendOnlyFile) */tlsProcessPendingData();/* If tls still has pending unread data don't sleep at all. */aeSetDontWait(server.el, tlsHasPendingData());/* Call the Redis Cluster before sleep function. Note that this function* may change the state of Redis Cluster (from ok to fail or vice versa),* so it's a good idea to call it before serving the unblocked clients* later in this function. */if (server.cluster_enabled) clusterBeforeSleep();/* Run a fast expire cycle (the called function will return* ASAP if a fast cycle is not needed). */if (server.active_expire_enabled && server.masterhost == NULL)activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);/* Unblock all the clients blocked for synchronous replication* in WAIT. */if (listLength(server.clients_waiting_acks))processClientsWaitingReplicas();/* Check if there are clients unblocked by modules that implement* blocking commands. */if (moduleCount()) moduleHandleBlockedClients();/* Try to process pending commands for clients that were just unblocked. */if (listLength(server.unblocked_clients))processUnblockedClients();/* Send all the slaves an ACK request if at least one client blocked* during the previous event loop iteration. Note that we do this after* processUnblockedClients(), so if there are multiple pipelined WAITs* and the just unblocked WAIT gets blocked again, we don't have to wait* a server cron cycle in absence of other event loop events. See #6623. */if (server.get_ack_from_slaves) {robj *argv[3];argv[0] = createStringObject("REPLCONF",8);argv[1] = createStringObject("GETACK",6);argv[2] = createStringObject("*",1); /* Not used argument. */replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);decrRefCount(argv[0]);decrRefCount(argv[1]);decrRefCount(argv[2]);server.get_ack_from_slaves = 0;}/* Send the invalidation messages to clients participating to the* client side caching protocol in broadcasting (BCAST) mode. */trackingBroadcastInvalidationMessages();/* Write the AOF buffer on disk */flushAppendOnlyFile(0);/* Handle writes with pending output buffers. */handleClientsWithPendingWritesUsingThreads();/* Close clients that need to be closed asynchronous */freeClientsInAsyncFreeQueue();/* Try to process blocked clients every once in while. Example: A module* calls RM_SignalKeyAsReady from within a timer callback (So we don't* visit processCommand() at all). */handleClientsBlockedOnKeys();/* Before we are going to sleep, let the threads access the dataset by* releasing the GIL. Redis main thread will not touch anything at this* time. */if (moduleCount()) moduleReleaseGIL();/* Do NOT add anything below moduleReleaseGIL !!! */
}

当有写操作时,会将client添加到server.clients_pending_write中,同时将数据添加到client的缓冲区中。

int prepareClientToWrite(client *c) {if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) return C_OK;if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;if ((c->flags & CLIENT_MASTER) &&!(c->flags & CLIENT_MASTER_FORCE_REPLY)) return C_ERR;if (!c->conn) return C_ERR; if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))clientInstallWriteHandler(c);return C_OK;
}void clientInstallWriteHandler(client *c) {if (!(c->flags & CLIENT_PENDING_WRITE) &&(c->replstate == REPL_STATE_NONE ||(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack))){c->flags |= CLIENT_PENDING_WRITE;listAddNodeHead(server.clients_pending_write,c);}
}int _addReplyToBuffer(client *c, const char *s, size_t len) {size_t available = sizeof(c->buf)-c->bufpos;if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;if (listLength(c->reply) > 0) return C_ERR;if (len > available) return C_ERR;memcpy(c->buf+c->bufpos,s,len);c->bufpos+=len;return C_OK;
}

beforeSleep在处理是否有等待写的客户端时handleClientsWithPendingWritesUsingThreads,如果io线程没有激活,此时会开启io线程。将需要写的客户端分配到线程对应的io_threads_list中。同时设置io_threads_op为IO_THREADS_OP_WRITE,将各个线程的io_threads_pending设置为非0来开启线程写操作。主线程也会处理一部分客户端的写操作。等待线程操作处理完成。最后遍历client,如果仍然有需要写操作,设置connection的WriteHandler

int handleClientsWithPendingWritesUsingThreads(void) {int processed = listLength(server.clients_pending_write);if (processed == 0) return 0; /* Return ASAP if there are no clients. *//* If I/O threads are disabled or we have few clients to serve, don't* use I/O threads, but thejboring synchronous code. */if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {return handleClientsWithPendingWrites();}/* Start threads if needed. */if (!server.io_threads_active) startThreadedIO();if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);/* Distribute the clients across N different lists. */listIter li;listNode *ln;listRewind(server.clients_pending_write,&li);int item_id = 0;while((ln = listNext(&li))) {client *c = listNodeValue(ln);c->flags &= ~CLIENT_PENDING_WRITE;/* Remove clients from the list of pending writes since* they are going to be closed ASAP. */if (c->flags & CLIENT_CLOSE_ASAP) {listDelNode(server.clients_pending_write, ln);continue;}int target_id = item_id % server.io_threads_num;listAddNodeTail(io_threads_list[target_id],c);item_id++;}/* Give the start condition to the waiting threads, by setting the* start condition atomic var. */io_threads_op = IO_THREADS_OP_WRITE;for (int j = 1; j < server.io_threads_num; j++) {int count = listLength(io_threads_list[j]);io_threads_pending[j] = count;}/* Also use the main thread to process a slice of clients. */listRewind(io_threads_list[0],&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);writeToClient(c,0);}listEmpty(io_threads_list[0]);/* Wait for all the other threads to end their work. */while(1) {unsigned long pending = 0;for (int j = 1; j < server.io_threads_num; j++)pending += io_threads_pending[j];if (pending == 0) break;}if (tio_debug) printf("I/O WRITE All threads finshed\n");/* Run the list of clients again to install the write handler where* needed. */listRewind(server.clients_pending_write,&li);while((ln = listNext(&li))) {client *c = listNodeValue(ln);/* Install the write handler if there are pending writes in some* of the clients. */if (clientHasPendingReplies(c) &&connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR){freeClientAsync(c);}}listEmpty(server.clients_pending_write);/* Update processed count on server */server.stat_io_writes_processed += processed;return processed;
}

redis6.0中的多线程相关推荐

  1. 【JAVA知识每日一问】:Redis6.0为什么引入多线程?

    前言 Redis官方在 2020 年 5 月正式推出 6.0 版本,提供很多振奋人心的新特性,所以备受关注. 提供了啥特性呀?知道了我能加薪么? 主要特性如下: 多线程处理网络 IO: 客户端缓存: ...

  2. 单线程不香吗?Redis6.0为何引入多线程?

    Redis 作为一个基于内存的缓存系统,一直以高性能著称,因没有上下文切换以及无锁操作,即使在单线程处理情况下,读速度仍可达到 11 万次/s,写速度达到 8.1 万次/s. 但是,单线程的设计也给 ...

  3. 「JAVA知识每日一问」:Redis6.0为什么引入多线程?

    前言 Redis 官方在 2020 年 5 月正式推出 6.0 版本,提供很多振奋人心的新特性,所以备受关注. 一键获取Redis合集资料文档 提供了啥特性呀?知道了我能加薪么? 主要特性如下: 多线 ...

  4. 实战派 | Java项目中玩转Redis6.0客户端缓存

    原创:微信公众号 码农参上,欢迎分享,转载请保留出处. 哈喽大家好啊,我是Hydra. 在前面的文章中,我们介绍了Redis6.0中的新特性客户端缓存client-side caching,通过tel ...

  5. Redis 6.0 新特性-多线程连环13问!

    来自:码大叔 导读:支持多线程的Redis6.0版本于2020-05-02终于发布了,为什么Redis忽然要支持多线程?如何开启多线程?开启后性能提升效果如何?线程数量该如何设置?开启多线程后会不会有 ...

  6. Redis 6.0 新特性-多线程连环 13 问!

    Redis 6.0 来了 在全国一片祥和IT民工欢度五一节假日的时候,Redis 6.0不声不响地于5 月 2 日正式发布了,吓得我赶紧从床上爬起来,学无止境!学无止境! 对于6.0版本,Redis之 ...

  7. Redis6.0新版本开始引入多线程,到底改善了什么

    Redis的性能瓶颈并不在CPU上,而是在内存和网络上.因此6.0发布的多线程并未将事件处理改成多线程,而是在I/O上,此外,如果把事件处理改成多线程,不但会导致锁竞争,而且会有频繁的上下文切换,即使 ...

  8. redis 多线程_Java架构师Redis单线程?别逗了,Redis6.0多线程重磅来袭

    2019年的 RedisConf 比以往时候来的更早一些,今年会议时间是4月1-3号,仍然是在旧金山鱼人码头Pier 27.恰逢今年是 Redis 第10周年,规模也比以往大一些,注册人数超过1600 ...

  9. DBA:介里有你没有用过的“CHUAN”新社区版本Redis6.0

    摘要:华为云DCS Redis 6.0社区版带来了极致性能.功能全面.可靠性强.性价比高的云服务,并且完全兼容开源Redis,客户端无需修改代码,开通后即可使用,使企业完全无需后顾之忧就能享受到业务响 ...

最新文章

  1. android上下文关系,Android Context上下文的理解 Hua
  2. 新手探索NLP(六)——全文检索
  3. 模型可解释性-贝叶斯方法
  4. TortoiseGit git push提示fatal: HttpRequestException encountered remote: Invalid username or password.
  5. Browser Page Parsing Details
  6. onnx模型转tensorflow模型
  7. 前端学习(2762):如何使用scss
  8. Node.js入门初体验
  9. NHibernate,我越看越傻了.........
  10. loadrunner性能测试步骤_性能测试LoadRunner操作流程之一
  11. angular遇到问题
  12. 刘意JavaSE 学习笔记 Day1-Day6——环境配置,基本语法
  13. sqlserver 当月、 时间_SQLServer取系统当前时间
  14. ulead gif animator 5.11中文破解版|ulead gif animator绿色中文破解版下载 v5.11
  15. RHCE 考试经验总结
  16. 美团3年阿里4年,我的坎坷进阶之路
  17. 【自用】Mybatis的学习笔记(第一天)
  18. 安卓强制横屏的坑!正确设置横屏的姿势!
  19. transporter上传卡正在交付_Transporter上传卡在—正在验证 APP
  20. watir学习笔记/ruby

热门文章

  1. jdbc执行Statement接口的步骤
  2. /proc/sys/vm/ 内存参数
  3. 让我们来开发一种更类似人脑的神经网络吧(三)
  4. 实验二 建立基本的游戏场景
  5. 几个.Net开源的CMS系统 (转)
  6. php如何设定隐藏四位号码,PHP问题:php手机号码中间四位如何隐藏?
  7. python代码示例图形-Python使用matplotlib绘制3D图形(代码示例)
  8. python怎么读取列表-python读入列表
  9. python学费多少-2020年10月徐州学python要多少学费
  10. ipython安装教程-CentOS 5安装IPython