In memcached, the author chose to build the event model on top of libevent in order to stay focused on the design of the cache itself. memcached's event model is similar to nginx's: there is one main dispatcher thread (master) and multiple worker threads (worker).

Flow Chart

In memcached, the worker threads are initialized and started first; only then does the main thread set up and start its own event handling.
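Since the original flow chart does not reproduce well in text, the startup order it depicts is summarized below as a simplified outline of `main()`, reconstructed from the calls walked through in this article (error handling and option parsing omitted):

/* Simplified startup order in memcached's main(): */
main_base = event_init();                      /* main thread's libevent base  */
thread_init(settings.num_threads, main_base);  /* init + start worker threads  */
server_sockets(settings.port, tcp_transport,
               portnumber_file);               /* listening socket, main_base  */
event_base_loop(main_base, 0);                 /* main thread enters its loop  */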

Worker Threads

Initialization

memcached initializes the worker threads with two arguments: the number of threads and `main_base`:

/* start up worker threads if MT mode */
thread_init(settings.num_threads, main_base);

/*
 * Initializes the thread subsystem, creating various worker threads.
 *
 * nthreads  Number of worker event handler threads to spawn
 * main_base Event base for main thread
 */
void thread_init(int nthreads, struct event_base *main_base) {
    int         i;
    int         power;

    pthread_mutex_init(&cache_lock, NULL);
    pthread_mutex_init(&stats_lock, NULL);

    pthread_mutex_init(&init_lock, NULL);
    pthread_cond_init(&init_cond, NULL);

    pthread_mutex_init(&cqi_freelist_lock, NULL);
    cqi_freelist = NULL;

    /* Want a wide lock table, but don't waste memory */
    if (nthreads < 3) {
        power = 10;
    } else if (nthreads < 4) {
        power = 11;
    } else if (nthreads < 5) {
        power = 12;
    } else {
        /* 8192 buckets, and central locks don't scale much past 5 threads */
        power = 13;
    }

    item_lock_count = hashsize(power);
    item_lock_hashpower = power;

    item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));
    if (! item_locks) {
        perror("Can't allocate item locks");
        exit(1);
    }
    for (i = 0; i < item_lock_count; i++) {
        pthread_mutex_init(&item_locks[i], NULL);
    }
    pthread_key_create(&item_lock_type_key, NULL);
    pthread_mutex_init(&item_global_lock, NULL);

    threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));
    if (! threads) {
        perror("Can't allocate thread descriptors");
        exit(1);
    }

    dispatcher_thread.base = main_base;
    dispatcher_thread.thread_id = pthread_self();

    for (i = 0; i < nthreads; i++) {
        int fds[2];
        if (pipe(fds)) {
            perror("Can't create notify pipe");
            exit(1);
        }

        threads[i].notify_receive_fd = fds[0];
        threads[i].notify_send_fd = fds[1];

        setup_thread(&threads[i]);
        /* Reserve three fds for the libevent base, and two for the pipe */
        stats.reserved_fds += 5;
    }

    /* Create threads after we've done all the libevent setup. */
    for (i = 0; i < nthreads; i++) {
        create_worker(worker_libevent, &threads[i]);
    }

    /* Wait for all the threads to set themselves up before returning. */
    pthread_mutex_lock(&init_lock);
    wait_for_thread_registration(nthreads);
    pthread_mutex_unlock(&init_lock);
}

To protect resources shared across threads, memcached uses quite a few locks; they are not covered in detail here.
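As a rough illustration of the `item_locks` table initialized above, here is a minimal sketch of the striped-lock idea: an item's hash value selects one mutex out of a power-of-two-sized table, so unrelated items rarely contend on the same lock. The `hashsize`/`hashmask` macros mirror memcached's, but `item_lock`/`item_unlock` below are simplified assumptions that ignore the per-thread granular/global lock switching.

#include <pthread.h>
#include <stdint.h>
#include <stdlib.h>

#define hashsize(n) ((uint32_t)1 << (n))
#define hashmask(n) (hashsize(n) - 1)

static pthread_mutex_t *item_locks;     /* one lock stripe per bucket    */
static uint32_t item_lock_hashpower;    /* log2 of the table size        */

/* Allocate and initialize the lock table, as thread_init does.
 * (Allocation-failure handling omitted for brevity.) */
static void item_locks_init(int power) {
    uint32_t i;
    item_lock_hashpower = power;
    item_locks = calloc(hashsize(power), sizeof(pthread_mutex_t));
    for (i = 0; i < hashsize(power); i++)
        pthread_mutex_init(&item_locks[i], NULL);
}

/* Map an item's hash value to one stripe of the lock table. */
static void item_lock(uint32_t hv) {
    pthread_mutex_lock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}

static void item_unlock(uint32_t hv) {
    pthread_mutex_unlock(&item_locks[hv & hashmask(item_lock_hashpower)]);
}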

The thread structure

typedef struct {
    pthread_t thread_id;        /* unique ID of this thread */
    struct event_base *base;    /* libevent handle this thread uses */
    struct event notify_event;  /* listen event for notify pipe */
    int notify_receive_fd;      /* receiving end of notify pipe */
    int notify_send_fd;         /* sending end of notify pipe */
    struct thread_stats stats;  /* Stats generated by this thread */
    struct conn_queue *new_conn_queue; /* queue of new connections to handle */
    cache_t *suffix_cache;      /* suffix cache */
    uint8_t item_lock_type;     /* use fine-grained or global item lock */
} LIBEVENT_THREAD;

Initializing the worker threads

for (i = 0; i < nthreads; i++) {
    int fds[2];
    /* Create the notify pipe */
    if (pipe(fds)) {
        perror("Can't create notify pipe");
        exit(1);
    }

    /* Record the read and write ends of this thread's pipe */
    threads[i].notify_receive_fd = fds[0];
    threads[i].notify_send_fd = fds[1];

    /* Set up the thread's libevent state */
    setup_thread(&threads[i]);
    /* Reserve three fds for the libevent base, and two for the pipe */
    stats.reserved_fds += 5;
}

Setting thread attributes

/*
 * Set up a thread's information.
 */
static void setup_thread(LIBEVENT_THREAD *me) {
    me->base = event_init();    /* initialize this thread's event base */
    if (! me->base) {
        fprintf(stderr, "Can't allocate event base\n");
        exit(1);
    }

    /* Set up the notification event */
    /* Listen for notifications from other threads */
    event_set(&me->notify_event, me->notify_receive_fd,
              EV_READ | EV_PERSIST, thread_libevent_process, me);
    /* Bind the event to this thread's event base */
    event_base_set(me->base, &me->notify_event);

    /* Register the event so it is actively monitored */
    if (event_add(&me->notify_event, 0) == -1) {
        fprintf(stderr, "Can't monitor libevent notify pipe\n");
        exit(1);
    }
    ...
}

The READ callback

/*
 * Processes an incoming "handle a new connection" item. This is called when
 * input arrives on the libevent wakeup pipe.
 */
static void thread_libevent_process(int fd, short which, void *arg) {
    ...
    /* Read the one-byte notification from the pipe */
    if (read(fd, buf, 1) != 1)
        if (settings.verbose > 0)
            fprintf(stderr, "Can't read from libevent pipe\n");

    item = cq_pop(me->new_conn_queue);   /* pop a queued connection */
    ...
}
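To make the wakeup path concrete, here is a small self-contained demo of the same pattern, written against the legacy libevent API used in the excerpts above: one thread's event loop watches the read end of a pipe, and another thread wakes it by writing a single byte. This is an illustrative sketch, not memcached code.

#include <event.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int notify_fds[2];   /* [0] = read end, [1] = write end */

/* Fires when a byte arrives on the pipe, like thread_libevent_process. */
static void on_notify(int fd, short which, void *arg) {
    char buf[1];
    if (read(fd, buf, 1) == 1)
        printf("woken up with '%c'\n", buf[0]);
    event_loopexit(NULL);   /* leave the loop after one wakeup */
}

/* Plays the dispatcher's role: wake the loop via the pipe. */
static void *writer(void *arg) {
    char c = 'c';
    (void)arg;
    sleep(1);
    if (write(notify_fds[1], &c, 1) != 1)
        perror("write");
    return NULL;
}

int main(void) {
    struct event notify_event;
    pthread_t tid;

    event_init();                       /* global event base */
    if (pipe(notify_fds)) { perror("pipe"); return 1; }

    event_set(&notify_event, notify_fds[0], EV_READ | EV_PERSIST,
              on_notify, NULL);
    event_add(&notify_event, 0);

    pthread_create(&tid, NULL, writer, NULL);
    event_dispatch();                   /* blocks until event_loopexit */
    pthread_join(tid, NULL);
    return 0;
}

Compiled with `gcc demo.c -levent -lpthread`, it prints the wakeup byte after roughly one second.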

Starting the worker threads

/* Create threads after we've done all the libevent setup. */
for (i = 0; i < nthreads; i++) {
    create_worker(worker_libevent, &threads[i]);
}

The `create_worker` function creates a worker thread:

/*
 * Creates a worker thread.
 */
static void create_worker(void *(*func)(void *), void *arg) {
    pthread_t       thread;
    pthread_attr_t  attr;
    int             ret;

    pthread_attr_init(&attr);

    if ((ret = pthread_create(&thread, &attr, func, arg)) != 0) {
        fprintf(stderr, "Can't create thread: %s\n",
                strerror(ret));
        exit(1);
    }
}

The `worker_libevent` function then drops into the thread's event loop:

/*
 * Worker thread: main event loop
 */
static void *worker_libevent(void *arg) {
    LIBEVENT_THREAD *me = arg;

    /* Any per-thread setup can happen here; thread_init() will block until
     * all threads have finished initializing.
     */

    /* set an indexable thread-specific memory item for the lock type.
     * this could be unnecessary if we pass the conn *c struct through
     * all item_lock calls...
     */
    me->item_lock_type = ITEM_LOCK_GRANULAR;
    pthread_setspecific(item_lock_type_key, &me->item_lock_type);

    register_thread_initialized();

    event_base_loop(me->base, 0);
    return NULL;
}

The Main Thread

Initialization

static struct event_base *main_base;

/* initialize main thread libevent instance */
main_base = event_init();

In the main function of `memcached.c`, libevent's initialization function is called to create `main_base`.

Initializing the socket

Only the TCP path is covered here: `server_sockets` is called, which in turn calls `server_socket` to set up the listening connection.

if (settings.port && server_sockets(settings.port, tcp_transport,
                                    portnumber_file)) {
    vperror("failed to listen on TCP port %d", settings.port);
    exit(EX_OSERR);
}

static int server_sockets(int port, enum network_transport transport,
                          FILE *portnumber_file) {
    if (settings.inter == NULL) {
        return server_socket(settings.inter, port, transport, portnumber_file);
    }
    ...
}

`server_socket` then performs the actual socket creation, option setting, binding, and listening.

/**
 * Create a socket and bind it to a specific port number
 * @param interface the interface to bind to
 * @param port the port number to bind to
 * @param transport the transport protocol (TCP / UDP)
 * @param portnumber_file A filepointer to write the port numbers to
 *        when they are successfully added to the list of ports we
 *        listen on.
 */
static int server_socket(const char *interface,
                         int port,
                         enum network_transport transport,
                         FILE *portnumber_file) {
    int sfd;
    struct linger ling = {0, 0};
    struct addrinfo *ai;
    struct addrinfo *next;
    struct addrinfo hints = { .ai_flags = AI_PASSIVE,
                              .ai_family = AF_UNSPEC };
    char port_buf[NI_MAXSERV];
    int error;
    int success = 0;
    int flags = 1;

    hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;

    if (port == -1) {
        port = 0;
    }
    snprintf(port_buf, sizeof(port_buf), "%d", port);
    error = getaddrinfo(interface, port_buf, &hints, &ai);
    if (error != 0) {
        if (error != EAI_SYSTEM)
            fprintf(stderr, "getaddrinfo(): %s\n", gai_strerror(error));
        else
            perror("getaddrinfo()");
        return 1;
    }

    for (next = ai; next; next = next->ai_next) {
        conn *listen_conn_add;
        if ((sfd = new_socket(next)) == -1) {
            /* getaddrinfo can return "junk" addresses,
             * we make sure at least one works before erroring.
             */
            if (errno == EMFILE) {
                /* ...unless we're out of fds */
                perror("server_socket");
                exit(EX_OSERR);
            }
            continue;
        }

#ifdef IPV6_V6ONLY
        if (next->ai_family == AF_INET6) {
            error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (char *) &flags, sizeof(flags));
            if (error != 0) {
                perror("setsockopt");
                close(sfd);
                continue;
            }
        }
#endif

        setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
        if (IS_UDP(transport)) {
            maximize_sndbuf(sfd);
        } else {
            error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");

            error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
            if (error != 0)
                perror("setsockopt");

            error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
            if (error != 0)
                perror("setsockopt");
        }

        if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {
            if (errno != EADDRINUSE) {
                perror("bind()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            close(sfd);
            continue;
        } else {
            success++;
            if (!IS_UDP(transport) && listen(sfd, settings.backlog) == -1) {
                perror("listen()");
                close(sfd);
                freeaddrinfo(ai);
                return 1;
            }
            if (portnumber_file != NULL &&
                (next->ai_addr->sa_family == AF_INET ||
                 next->ai_addr->sa_family == AF_INET6)) {
                union {
                    struct sockaddr_in in;
                    struct sockaddr_in6 in6;
                } my_sockaddr;
                socklen_t len = sizeof(my_sockaddr);
                if (getsockname(sfd, (struct sockaddr*)&my_sockaddr, &len) == 0) {
                    if (next->ai_addr->sa_family == AF_INET) {
                        fprintf(portnumber_file, "%s INET: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in.sin_port));
                    } else {
                        fprintf(portnumber_file, "%s INET6: %u\n",
                                IS_UDP(transport) ? "UDP" : "TCP",
                                ntohs(my_sockaddr.in6.sin6_port));
                    }
                }
            }
        }

        if (IS_UDP(transport)) {
            int c;
            for (c = 0; c < settings.num_threads_per_udp; c++) {
                /* Allocate one UDP file descriptor per worker thread;
                 * this allows "stats conns" to separately list multiple
                 * parallel UDP requests in progress.
                 *
                 * The dispatch code round-robins new connection requests
                 * among threads, so this is guaranteed to assign one
                 * FD to each thread.
                 */
                int per_thread_fd = c ? dup(sfd) : sfd;
                dispatch_conn_new(per_thread_fd, conn_read,
                                  EV_READ | EV_PERSIST,
                                  UDP_READ_BUFFER_SIZE, transport);
            }
        } else {
            if (!(listen_conn_add = conn_new(sfd, conn_listening,
                                             EV_READ | EV_PERSIST, 1,
                                             transport, main_base))) {
                fprintf(stderr, "failed to create listening connection\n");
                exit(EXIT_FAILURE);
            }
            listen_conn_add->next = listen_conn;
            listen_conn = listen_conn_add;
        }
    }

    freeaddrinfo(ai);

    /* Return zero iff we detected no errors in starting up connections */
    return success == 0;
}
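`server_socket` relies on `new_socket`, which is not shown above. For reference, the memcached 1.4.x version essentially creates the descriptor and switches it to non-blocking mode so the event loop is never stalled by one connection; the sketch below is reproduced from memory rather than verbatim:

#include <fcntl.h>
#include <netdb.h>
#include <stdio.h>
#include <unistd.h>
#include <sys/socket.h>

/* Create a socket for the given address and make it non-blocking. */
static int new_socket(struct addrinfo *ai) {
    int sfd;
    int flags;

    if ((sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol)) == -1) {
        return -1;
    }

    if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
        fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
        perror("setting O_NONBLOCK");
        close(sfd);
        return -1;
    }
    return sfd;
}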

Main thread events

In the main thread, the `conn_new` function establishes the relationship between the main thread and the worker threads.

/* Set up the event for this connection */
event_set(&c->event, sfd, event_flags, event_handler, (void *)c);
event_base_set(base, &c->event);
c->ev_flags = event_flags;

/* Register the event so it is monitored */
if (event_add(&c->event, 0) == -1) {
    perror("event_add");
    return NULL;
}

Event handling

The code above installs `event_handler` as the event callback; `event_handler` in turn mainly calls the `drive_machine` function.

As its name suggests, `drive_machine` works like an engine: it is the state machine that handles the various events and runs the appropriate processing for each connection state.
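A minimal, self-contained sketch of the `drive_machine` style follows: a loop that switches on the connection's current state until a handler asks it to stop. The states and types here are simplified stand-ins, not memcached's actual definitions.

#include <stdbool.h>
#include <stdio.h>

enum conn_state { conn_listening, conn_read, conn_write, conn_closing, conn_done };

struct conn { enum conn_state state; };

/* Drive the connection forward; each case does a little work and
 * either transitions to the next state or stops the loop. */
static void drive(struct conn *c) {
    bool stop = false;
    while (!stop) {
        switch (c->state) {
        case conn_listening:
            printf("accept new connection, dispatch to a worker\n");
            c->state = conn_read;
            break;
        case conn_read:
            printf("read request\n");
            c->state = conn_write;
            break;
        case conn_write:
            printf("write response\n");
            c->state = conn_closing;
            break;
        case conn_closing:
            printf("close connection\n");
            c->state = conn_done;
            break;
        case conn_done:
            stop = true;
            break;
        }
    }
}

int main(void) {
    struct conn c = { conn_listening };
    drive(&c);
    return 0;
}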

Only one call is briefly introduced here: `dispatch_conn_new`.

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,
                       int read_buffer_size, enum network_transport transport) {
    CQ_ITEM *item = cqi_new();
    char buf[1];
    if (item == NULL) {
        close(sfd);
        /* given that malloc failed this may also fail, but let's try */
        fprintf(stderr, "Failed to allocate memory for connection object\n");
        return ;
    }

    int tid = (last_thread + 1) % settings.num_threads;

    /* Pick the next worker thread, round-robin */
    LIBEVENT_THREAD *thread = threads + tid;

    last_thread = tid;

    item->sfd = sfd;
    item->init_state = init_state;
    item->event_flags = event_flags;
    item->read_buffer_size = read_buffer_size;
    item->transport = transport;

    /* Push the connection onto the worker's queue */
    cq_push(thread->new_conn_queue, item);

    MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);
    buf[0] = 'c';
    /* Write one byte to the notify pipe to wake the worker */
    if (write(thread->notify_send_fd, buf, 1) != 1) {
        perror("Writing to thread notify pipe");
    }
}
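For completeness, here is a sketch of the mutex-protected connection queue that `cq_push` and `cq_pop` operate on. It is simplified from memcached's `thread.c`; the field set is trimmed to what the excerpts use, so treat names and details as assumptions.

#include <pthread.h>
#include <stdlib.h>

typedef struct conn_queue_item CQ_ITEM;
struct conn_queue_item {
    int sfd;            /* socket being handed to the worker */
    CQ_ITEM *next;
};

typedef struct {
    CQ_ITEM *head;
    CQ_ITEM *tail;
    pthread_mutex_t lock;
} CQ;

/* Append an item; the dispatcher calls this before writing to the pipe. */
static void cq_push(CQ *cq, CQ_ITEM *item) {
    item->next = NULL;
    pthread_mutex_lock(&cq->lock);
    if (cq->tail == NULL)
        cq->head = item;
    else
        cq->tail->next = item;
    cq->tail = item;
    pthread_mutex_unlock(&cq->lock);
}

/* Pop an item; the worker calls this from thread_libevent_process. */
static CQ_ITEM *cq_pop(CQ *cq) {
    CQ_ITEM *item;
    pthread_mutex_lock(&cq->lock);
    item = cq->head;
    if (item != NULL) {
        cq->head = item->next;
        if (cq->head == NULL)
            cq->tail = NULL;
    }
    pthread_mutex_unlock(&cq->lock);
    return item;
}

Because only `cq->lock` guards both ends, a push from the dispatcher and a pop from a worker can never interleave; the notify pipe exists only to wake the worker's event loop, not to carry the data itself.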


This article was written by cococo点点 and is licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 3.0 China Mainland license. Reposting is welcome; please credit the source:
Reposted from: cococo点点 http://www.cnblogs.com/coder2012
