(广告时间: 最近在写一个基于 Leveldb 存储引擎的数据服务器,C开发,使用 Libevent 处理网络事件,后台利用多线程并发处理客户端连接,理论上单机就应该支持数千-上万的客户端连接(未测试),框架已基本成型,暂取名LLDB(Libevent-based and Leveldb-backended DataBase),等代码成熟以后将它开源,希望能有同学试用。)

Memcached  是以 LiveJournal 旗下 Danga Interactive 公司的 Brad Fitzpatric  为首开发的一款分布式缓存服务器,基于内存,性能非常高,现在已成为mixi、hatena、Facebook、Vox、LiveJournal等众多服务中提高Web应用扩展性的重要因素(更多介绍参见:维基百科,百科百科)。下面粗略地分析一下 Memcached 的启动流程(基于 memcached-1.4.14),此处只列出了代码的梗概。

int main (int argc, char **argv) {int c;bool lock_memory = false;bool do_daemonize = false;bool preallocate = false;int maxcore = 0;char *username = NULL;char *pid_file = NULL;struct passwd *pw;struct rlimit rlim;char unit = '\0';int size_max = 0;int retval = EXIT_SUCCESS;/* listening sockets */static int *l_socket = NULL;/* 更多的参数设置 *//* 有效性检查 */if (!sanitycheck()) {return EX_OSERR;}/* 注册信号处理函数*/signal(SIGINT, sig_handler);/* 数据库配置初始化 */settings_init();/* 处理输入参数,并初始化 memcached 配置,代码略 *//* 如果指定了 -S 参数,则初始化 sasl 模块 */if (settings.sasl) {init_sasl();}/* 是否以守护进程方式运行 memcached*//* if we want to ensure our ability to dump core, don't chdir to / */if (do_daemonize) {if (sigignore(SIGHUP) == -1) {perror("Failed to ignore SIGHUP");}if (daemonize(maxcore, settings.verbose) == -1) {fprintf(stderr, "failed to daemon() in order to daemonize\n");exit(EXIT_FAILURE);}}/* 初始化 libevent 主线程实例 */main_base = event_init();/* 其他模块初始化 */stats_init();assoc_init(settings.hashpower_init);conn_init();slabs_init(settings.maxbytes, settings.factor, preallocate);/** 忽视 SIGPIPE 信号,如果我们需要 SIGPIPE 信号,可以检测条件 errno == EPIPE*/if (sigignore(SIGPIPE) == -1) {perror("failed to ignore SIGPIPE; sigaction");exit(EX_OSERR);}/* 如果以多线程模式运行 memcached,则启动工作者线程 */thread_init(settings.num_threads, main_base);/* 启动 assoc 维护线程*/if (start_assoc_maintenance_thread() == -1) {exit(EXIT_FAILURE);}/* 启动 slab 维护线程 */if (settings.slab_reassign &&start_slab_maintenance_thread() == -1) {exit(EXIT_FAILURE);}/* 初始化时钟处理函数 */clock_handler(0, 0, 0);/* 释放特权后创建 unix 模式套接字 */if (settings.socketpath != NULL) {errno = 0;if (server_socket_unix(settings.socketpath,settings.access)) {vperror("failed to listen on UNIX socket: %s", settings.socketpath);exit(EX_OSERR);}}/* 创建监听套接字,绑定该套接字,然后进行相关初始化 */if (settings.socketpath == NULL) {const char *portnumber_filename = getenv("MEMCACHED_PORT_FILENAME");char temp_portnumber_filename[PATH_MAX];FILE *portnumber_file = NULL;if (portnumber_filename != NULL) {snprintf(temp_portnumber_filename,sizeof(temp_portnumber_filename),"%s.lck", portnumber_filename);portnumber_file = fopen(temp_portnumber_filename, "a");if (portnumber_file == NULL) {fprintf(stderr, "Failed to open \"%s\": %s\n",temp_portnumber_filename, strerror(errno));}}errno = 0;if (settings.port && server_sockets(settings.port, tcp_transport,portnumber_file)) {vperror("failed to listen on TCP port %d", settings.port);exit(EX_OSERR);}/** 初始化顺序:首先创建监听套接字(低端口的套接字可能需要root权限),* 然后释放 root 权限,如果设置以守护进程运行 memcached,则 Daemonise it。* 然后初始化 libevent 库。*//* 创建 UDP 监听套接字,并绑定该套接字 */errno = 0;if (settings.udpport && server_sockets(settings.udpport, udp_transport,portnumber_file)) {vperror("failed to listen on UDP port %d", settings.udpport);exit(EX_OSERR);}if (portnumber_file) {fclose(portnumber_file);rename(temp_portnumber_filename, portnumber_filename);}}if (pid_file != NULL) {save_pid(pid_file);}/* 释放特权 */drop_privileges();/* 进入事件循环 */if (event_base_loop(main_base, 0) != 0) {retval = EXIT_FAILURE;}stop_assoc_maintenance_thread();/* 如果不是守护进程,则删除 PID 文件 */if (do_daemonize)remove_pidfile(pid_file);/* Clean up strdup() call for bind() address */if (settings.inter)free(settings.inter);if (l_socket)free(l_socket);if (u_socket)free(u_socket);return retval;
}

main 函数中值得注意的几个函数调用如下:

  • conn_init();
  • thread_init(settings.num_threads, main_base);
  • clock_handler(0, 0, 0);
  • server_socket_unix(settings.socketpath,settings.access)
  • server_sockets(settings.port, tcp_transport, portnumber_file);
  • event_base_loop(main_base, 0);

在分析上面几个函数之前我们来看看一些重要的变量和结构体的定义:

  • 重要变量声明
static conn *listen_conn = NULL;
static struct event_base *main_base;static conn **freeconns;

  • struct conn 结构体定义:
struct conn {int    sfd;sasl_conn_t *sasl_conn;enum conn_states  state;enum bin_substates substate;struct event event;short  ev_flags;short  which;   /** which events were just triggered */char   *rbuf;   /** buffer to read commands into */char   *rcurr;  /** but if we parsed some already, this is where we stopped */int    rsize;   /** total allocated size of rbuf */int    rbytes;  /** how much data, starting from rcur, do we have unparsed */char   *wbuf;char   *wcurr;int    wsize;int    wbytes;/** which state to go into after finishing current write */enum conn_states  write_and_go;void   *write_and_free; /** free this memory after finishing writing */char   *ritem;  /** when we read in an item's value, it goes here */int    rlbytes;/* data for the nread state *//*** item is used to hold an item structure created after reading the command* line of set/add/replace commands, but before we finished reading the actual* data. The data is read into ITEM_data(item) to avoid extra copying.*/void   *item;     /* for commands set/add/replace  *//* data for the swallow state */int    sbytes;    /* how many bytes to swallow *//* data for the mwrite state */struct iovec *iov;int    iovsize;   /* number of elements allocated in iov[] */int    iovused;   /* number of elements used in iov[] */struct msghdr *msglist;int    msgsize;   /* number of elements allocated in msglist[] */int    msgused;   /* number of elements used in msglist[] */int    msgcurr;   /* element in msglist[] being transmitted now */int    msgbytes;  /* number of bytes in current msg */item   **ilist;   /* list of items to write out */int    isize;item   **icurr;int    ileft;char   **suffixlist;int    suffixsize;char   **suffixcurr;int    suffixleft;enum protocol protocol;   /* which protocol this connection speaks */enum network_transport transport; /* what transport is used by this connection *//* data for UDP clients */int    request_id; /* Incoming UDP request ID, if this is a UDP "connection" */struct sockaddr request_addr; /* Who sent the most recent request */socklen_t request_addr_size;unsigned char *hdrbuf; /* udp packet headers */int    hdrsize;   /* number of headers' worth of space is allocated */bool   noreply;   /* True if the reply should not be sent. *//* current stats command */struct {char *buffer;size_t size;size_t offset;} stats;/* Binary protocol stuff *//* This is where the binary header goes */protocol_binary_request_header binary_header;uint64_t cas; /* the cas to return */short cmd; /* current command being processed */int opaque;int keylen;conn   *next;     /* Used for generating a list of conn structures */LIBEVENT_THREAD *thread; /* Pointer to the thread object serving this connection */
};

  • LIBEVENT_THREAD 和 LIBEVENT_DISPATCHER_THREAD定义:
typedef struct {pthread_t thread_id;        /* unique ID of this thread */struct event_base *base;    /* libevent handle this thread uses */struct event notify_event;  /* listen event for notify pipe */int notify_receive_fd;      /* receiving end of notify pipe */int notify_send_fd;         /* sending end of notify pipe */struct thread_stats stats;  /* Stats generated by this thread */struct conn_queue *new_conn_queue; /* queue of new connections to handle */cache_t *suffix_cache;      /* suffix cache */
} LIBEVENT_THREAD;typedef struct {pthread_t thread_id;        /* unique ID of this thread */struct event_base *base;    /* libevent handle this thread uses */
} LIBEVENT_DISPATCHER_THREAD;

下面分析conn_init(); 函数:

static void conn_init(void) {freetotal = 200;freecurr = 0;if ((freeconns = calloc(freetotal, sizeof(conn *))) == NULL) {fprintf(stderr, "Failed to allocate connection structures\n");}return;
}

基本上就是分配 freetotal 个 conn * 空间,非常简单,

接下来是另外一个重要的函数调用:thread_init();

/** 初始化线程子模块,创建各种 worker 线程。** nthreads 代表 worker 事件处理线程的数目* main_base 是主线程的event base。*/
void thread_init(int nthreads, struct event_base *main_base) {int         i;int         power;/* 初始化锁 */pthread_mutex_init(&cache_lock, NULL);pthread_mutex_init(&stats_lock, NULL);pthread_mutex_init(&init_lock, NULL);pthread_cond_init(&init_cond, NULL);pthread_mutex_init(&cqi_freelist_lock, NULL);cqi_freelist = NULL;/* Want a wide lock table, but don't waste memory */if (nthreads < 3) {power = 10;} else if (nthreads < 4) {power = 11;} else if (nthreads < 5) {power = 12;} else {/* 8192 buckets, and central locks don't scale much past 5 threads */power = 13;}item_lock_count = ((unsigned long int)1 << (power));item_lock_mask  = item_lock_count - 1;item_locks = calloc(item_lock_count, sizeof(pthread_mutex_t));if (! item_locks) {perror("Can't allocate item locks");exit(1);}for (i = 0; i < item_lock_count; i++) {pthread_mutex_init(&item_locks[i], NULL);}threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));if (! threads) {perror("Can't allocate thread descriptors");exit(1);}/* 设置 dispatcher_thread (即主线程)的相关结构 */dispatcher_thread.base = main_base;dispatcher_thread.thread_id = pthread_self();for (i = 0; i < nthreads; i++) {int fds[2];if (pipe(fds)) {perror("Can't create notify pipe");exit(1);}/* 此处用了一个 trick,worker 线程通过读取 notify_receice_fd * 一个字节获知主线程接受到了事件。 */threads[i].notify_receive_fd = fds[0];threads[i].notify_send_fd = fds[1];setup_thread(&threads[i]);/* 为 libevent 保留三个 fd,另外两个预留给管道 */stats.reserved_fds += 5;}/* 完成了所有的 libevent 设置后创建 worker 线程 */for (i = 0; i < nthreads; i++) {create_worker(worker_libevent, &threads[i]);}/* 主线程等待所有的线程设置好了以后在返回 */pthread_mutex_lock(&init_lock);while (init_count < nthreads) {pthread_cond_wait(&init_cond, &init_lock);}pthread_mutex_unlock(&init_lock);
}

static LIBEVENT_DISPATCHER_THREAD dispatcher_thread;/** 每个 Libevent 实例都有一对唤醒的管道,其他线程可以想管道中写入数据* 来告知他在队列中放入了一个新的连接*/
static LIBEVENT_THREAD *threads;

thread_init() 中又调用了 setup_thread() 来设置每个 worker 线程的信息。

static void setup_thread(LIBEVENT_THREAD *me) {me->base = event_init();if (! me->base) {fprintf(stderr, "Can't allocate event base\n");exit(1);}/* Listen for notifications from other threads */event_set(&me->notify_event, me->notify_receive_fd,EV_READ | EV_PERSIST, thread_libevent_process, me);event_base_set(me->base, &me->notify_event);if (event_add(&me->notify_event, 0) == -1) {fprintf(stderr, "Can't monitor libevent notify pipe\n");exit(1);}me->new_conn_queue = malloc(sizeof(struct conn_queue));if (me->new_conn_queue == NULL) {perror("Failed to allocate memory for connection queue");exit(EXIT_FAILURE);}cq_init(me->new_conn_queue);if (pthread_mutex_init(&me->stats.mutex, NULL) != 0) {perror("Failed to initialize mutex");exit(EXIT_FAILURE);}me->suffix_cache = cache_create("suffix", SUFFIX_SIZE, sizeof(char*),NULL, NULL);if (me->suffix_cache == NULL) {fprintf(stderr, "Failed to create suffix cache\n");exit(EXIT_FAILURE);}
}

并在setup_thread() 中设置 worker 线程的回调函数,thread_libevent_process() :

/** 当每个 worker 线程的唤醒管道(wakeup pipe)收到有连接到来的通知时,* 就调用该函数。*/
static void thread_libevent_process(int fd, short which, void *arg) {LIBEVENT_THREAD *me = arg;CQ_ITEM *item;char buf[1];if (read(fd, buf, 1) != 1)if (settings.verbose > 0)fprintf(stderr, "Can't read from libevent pipe\n");item = cq_pop(me->new_conn_queue);if (NULL != item) {conn *c = conn_new(item->sfd, item->init_state, item->event_flags,item->read_buffer_size, item->transport, me->base);if (c == NULL) {if (IS_UDP(item->transport)) {fprintf(stderr, "Can't listen for events on UDP socket\n");exit(1);} else {if (settings.verbose > 0) {fprintf(stderr, "Can't listen for events on fd %d\n",item->sfd);}close(item->sfd);}} else {c->thread = me;}cqi_free(item);}
}

thread_init() 中还调用了create_worker() 函数创建 worker 线程,同时设置worker 线程的回调函数为 worker_libevent():

/** Worker 线程: 事件循环*/
static void *worker_libevent(void *arg) {LIBEVENT_THREAD *me = arg;/* thread_init() 会一直阻塞到所有的线程完成初始化*/pthread_mutex_lock(&init_lock);init_count++;pthread_cond_signal(&init_cond);pthread_mutex_unlock(&init_lock);/* worker 线程进入事件循环 */event_base_loop(me->base, 0);return NULL;
}

至此 thread_init() 函数返回。

接下来一个比较重要的调用是server_sockets(),server_sockets() 中又调用了 server_socket(),然后在在 server_socket() 中又调用了 conn_new(),并在 conn_new()中设置事件的回调函数 event_handler(),

void event_handler(const int fd, const short which, void *arg) {conn *c;c = (conn *)arg;assert(c != NULL);c->which = which;/* sanity */if (fd != c->sfd) {if (settings.verbose > 0)fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");conn_close(c);return;}drive_machine(c);/* wait for next event */return;
}

drive_machine() 函数可以说是一个大的状态机,函数很长,

static void drive_machine(conn *c) {bool stop = false;int sfd, flags = 1;socklen_t addrlen;struct sockaddr_storage addr;int nreqs = settings.reqs_per_event;int res;const char *str;assert(c != NULL);while (!stop) {switch(c->state) {case conn_listening:addrlen = sizeof(addr);if ((sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen)) == -1) {if (errno == EAGAIN || errno == EWOULDBLOCK) {/* these are transient, so don't log anything */stop = true;} else if (errno == EMFILE) {if (settings.verbose > 0)fprintf(stderr, "Too many open connections\n");accept_new_conns(false);stop = true;} else {perror("accept()");stop = true;}break;}if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {perror("setting O_NONBLOCK");close(sfd);break;}if (settings.maxconns_fast &&stats.curr_conns + stats.reserved_fds >= settings.maxconns - 1) {str = "ERROR Too many open connections\r\n";res = write(sfd, str, strlen(str));close(sfd);STATS_LOCK();stats.rejected_conns++;STATS_UNLOCK();} else {dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,DATA_BUFFER_SIZE, tcp_transport);}stop = true;break;case conn_waiting:if (!update_event(c, EV_READ | EV_PERSIST)) {if (settings.verbose > 0)fprintf(stderr, "Couldn't update event\n");conn_set_state(c, conn_closing);break;}conn_set_state(c, conn_read);stop = true;break;case conn_read:res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);switch (res) {case READ_NO_DATA_RECEIVED:conn_set_state(c, conn_waiting);break;case READ_DATA_RECEIVED:conn_set_state(c, conn_parse_cmd);break;case READ_ERROR:conn_set_state(c, conn_closing);break;case READ_MEMORY_ERROR: /* Failed to allocate more memory *//* State already set by try_read_network */break;}break;case conn_parse_cmd :if (try_read_command(c) == 0) {/* wee need more data! */conn_set_state(c, conn_waiting);}break;case conn_new_cmd:/* Only process nreqs at a time to avoid starving otherconnections */--nreqs;if (nreqs >= 0) {reset_cmd_handler(c);} else {pthread_mutex_lock(&c->thread->stats.mutex);c->thread->stats.conn_yields++;pthread_mutex_unlock(&c->thread->stats.mutex);if (c->rbytes > 0) {/* We have already read in data into the input buffer,so libevent will most likely not signal read eventson the socket (unless more data is available. As ahack we should just put in a request to write data,because that should be possible ;-)*/if (!update_event(c, EV_WRITE | EV_PERSIST)) {if (settings.verbose > 0)fprintf(stderr, "Couldn't update event\n");conn_set_state(c, conn_closing);}}stop = true;}break;case conn_nread:if (c->rlbytes == 0) {complete_nread(c);break;}/* first check if we have leftovers in the conn_read buffer */if (c->rbytes > 0) {int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;if (c->ritem != c->rcurr) {memmove(c->ritem, c->rcurr, tocopy);}c->ritem += tocopy;c->rlbytes -= tocopy;c->rcurr += tocopy;c->rbytes -= tocopy;if (c->rlbytes == 0) {break;}}/*  now try reading from the socket */res = read(c->sfd, c->ritem, c->rlbytes);if (res > 0) {pthread_mutex_lock(&c->thread->stats.mutex);c->thread->stats.bytes_read += res;pthread_mutex_unlock(&c->thread->stats.mutex);if (c->rcurr == c->ritem) {c->rcurr += res;}c->ritem += res;c->rlbytes -= res;break;}if (res == 0) { /* end of stream */conn_set_state(c, conn_closing);break;}if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {if (!update_event(c, EV_READ | EV_PERSIST)) {if (settings.verbose > 0)fprintf(stderr, "Couldn't update event\n");conn_set_state(c, conn_closing);break;}stop = true;break;}/* otherwise we have a real error, on which we close the connection */if (settings.verbose > 0) {fprintf(stderr, "Failed to read, and not due to blocking:\n""errno: %d %s \n""rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",errno, strerror(errno),(long)c->rcurr, (long)c->ritem, (long)c->rbuf,(int)c->rlbytes, (int)c->rsize);}conn_set_state(c, conn_closing);break;case conn_swallow:/* we are reading sbytes and throwing them away */if (c->sbytes == 0) {conn_set_state(c, conn_new_cmd);break;}/* first check if we have leftovers in the conn_read buffer */if (c->rbytes > 0) {int tocopy = c->rbytes > c->sbytes ? c->sbytes : c->rbytes;c->sbytes -= tocopy;c->rcurr += tocopy;c->rbytes -= tocopy;break;}/*  now try reading from the socket */res = read(c->sfd, c->rbuf, c->rsize > c->sbytes ? c->sbytes : c->rsize);if (res > 0) {pthread_mutex_lock(&c->thread->stats.mutex);c->thread->stats.bytes_read += res;pthread_mutex_unlock(&c->thread->stats.mutex);c->sbytes -= res;break;}if (res == 0) { /* end of stream */conn_set_state(c, conn_closing);break;}if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {if (!update_event(c, EV_READ | EV_PERSIST)) {if (settings.verbose > 0)fprintf(stderr, "Couldn't update event\n");conn_set_state(c, conn_closing);break;}stop = true;break;}/* otherwise we have a real error, on which we close the connection */if (settings.verbose > 0)fprintf(stderr, "Failed to read, and not due to blocking\n");conn_set_state(c, conn_closing);break;case conn_write:/** We want to write out a simple response. If we haven't already,* assemble it into a msgbuf list (this will be a single-entry* list for TCP or a two-entry list for UDP).*/if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {if (add_iov(c, c->wcurr, c->wbytes) != 0) {if (settings.verbose > 0)fprintf(stderr, "Couldn't build response\n");conn_set_state(c, conn_closing);break;}}/* fall through... */case conn_mwrite:if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {if (settings.verbose > 0)fprintf(stderr, "Failed to build UDP headers\n");conn_set_state(c, conn_closing);break;}switch (transmit(c)) {case TRANSMIT_COMPLETE:if (c->state == conn_mwrite) {while (c->ileft > 0) {item *it = *(c->icurr);assert((it->it_flags & ITEM_SLABBED) == 0);item_remove(it);c->icurr++;c->ileft--;}while (c->suffixleft > 0) {char *suffix = *(c->suffixcurr);cache_free(c->thread->suffix_cache, suffix);c->suffixcurr++;c->suffixleft--;}/* XXX:  I don't know why this wasn't the general case */if(c->protocol == binary_prot) {conn_set_state(c, c->write_and_go);} else {conn_set_state(c, conn_new_cmd);}} else if (c->state == conn_write) {if (c->write_and_free) {free(c->write_and_free);c->write_and_free = 0;}conn_set_state(c, c->write_and_go);} else {if (settings.verbose > 0)fprintf(stderr, "Unexpected state %d\n", c->state);conn_set_state(c, conn_closing);}break;case TRANSMIT_INCOMPLETE:case TRANSMIT_HARD_ERROR:break;                   /* Continue in state machine. */case TRANSMIT_SOFT_ERROR:stop = true;break;}break;case conn_closing:if (IS_UDP(c->transport))conn_cleanup(c);elseconn_close(c);stop = true;break;case conn_max_state:assert(false);break;}}return;
}

可以说整个 memcached 就是围绕这个状态机运行的,可能的状态如下:

enum conn_states {conn_listening,  /**< 套接字监听端口,等待新的连接 */conn_new_cmd,    /**< 准备下一次命令的连接 */conn_waiting,    /**< 等待可读套接字 */conn_read,       /**< 读入命令行 */conn_parse_cmd,  /**< 从输入缓冲区中分析命令 */conn_write,      /**< 响应写出 */conn_nread,      /**< 读入固定大小的字节 */conn_swallow,    /**< 去除不必要的存储字节 */conn_closing,    /**< 关闭连接 */conn_mwrite,     /**< 顺序写 item */conn_max_state   /**< 最大的状态值,用于状态Assertion(断言) */
};

在 conn_listening 状态时,接受新的客户端连接,然后调用dispatch_new_connection():

/** 分发新的连接至其他线程,该函数只会在主线程中调用,* 调用时机为:主线程初始化(UDP模式)或者* 存在新的连接到来*/
void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,int read_buffer_size, enum network_transport transport) {CQ_ITEM *item = cqi_new();int tid = (last_thread + 1) % settings.num_threads;LIBEVENT_THREAD *thread = threads + tid;last_thread = tid;item->sfd = sfd;item->init_state = init_state;item->event_flags = event_flags;item->read_buffer_size = read_buffer_size;item->transport = transport;cq_push(thread->new_conn_queue, item);MEMCACHED_CONN_DISPATCH(sfd, thread->thread_id);if (write(thread->notify_send_fd, "", 1) != 1) {perror("Writing to thread notify pipe");}
}

至此,主线程和 worker 线程大部分逻辑均已介绍完毕,并各自进入自己的事件循环处理相应的业务。

读后语:memcached 代码简介易读,基于 Libevent 处理网络事件,并采用了多线程机制,大大利用了多核的计算能力,提高了系统接受客户端请求并发数量。

同时 memcached 在主线程和 worker 线程之间关于新连接到来的通知的处理也比较有趣,主线程和 worker 线程之间使用了一对管道来通信,每当主线程接受到新的连接时,它就向管道的一段写入一个字节的数据,然后由于 worker 线程监听了管道另外一端的事件,所以 worker 线程可以感知到新的连接到了,然后该连接被主线程 Dispatch 到某一个线程的队列中,再由 worker 线程进行处理。

Memcached 源码分析——从 main 函数说起相关推荐

  1. Memcached源码分析 - 内存存储机制Slabs(5)

    Memcached源码分析 - 网络模型(1) Memcached源码分析 - 命令解析(2) Memcached源码分析 - 数据存储(3) Memcached源码分析 - 增删改查操作(4) Me ...

  2. memcached 源码分析

    1.Memcached概述 memcached是一个高性能的分布式内存缓存服务器,memcached在Linux上可以通过yum命令安装,这样方便很多,在生产环境下建议用Linux系统,memcach ...

  3. SequoiaDB 系列之五 :源码分析之main函数

    好久好久没有写博客了,因为一直要做各种事,工作上的,生活上的,这一下就是半年. 时光如梭. 这两天回头看了看写的博客,感觉都是贻笑大方. 但是还是想坚持把SequoiaDB系列写完. 初步的打算已经确 ...

  4. x265源码分析:main函数及CLIOptions结构体解释

    /** * 返回码信息:* 0 – 编码成功:* 1 – 命令行解释失败:* 2 – 编码器打开失败:* 3 – 生成流头部失败:* 4 – 编码出错:* 5 – 打开csv文件失败.*/ int m ...

  5. Memcached源码分析

    http://blog.csdn.net/lcli2009/article/details/22167319 转载于:https://www.cnblogs.com/zengkefu/p/483884 ...

  6. x265源码分析 main函数 x265.cpp

    图片转载于x265源码流程分析_Dillon2015的博客-CSDN博客_x265编码流程 cliopt.prase main ()函数--解析函数参数并进行编码准备工作:x265.cpp (1)Ge ...

  7. libdvbpsi源码分析(三)PSI decocder详细分析

    2019独角兽企业重金招聘Python工程师标准>>> 由上一篇 libdvbpsi源码分析(二)main函数,简单分析了demo程序中main函数的执行流程.现在将对具体的PSI表 ...

  8. SequoiaDB 系列之六 :源码分析之coord节点

    好久不见. 在上一篇SequoiaDB 系列之五   :源码分析之main函数,有讲述进程开始运行时,会根据自身的角色,来初始化不同的CB(控制块,control block). 在之前的一篇Sequ ...

  9. dnsspoof工作原理、编译、源码分析

    dnsspoof 是一个DNS欺骗工具,只要给出将要重定向的域名和域名重定向到的IP,就可以实现DNS欺骗. 下载地址:http://monkey.org/~dugsong/dsniff/ dnssp ...

最新文章

  1. 在 Swift 中调用 OC 代码
  2. Linux内核的并发与竞态、信号量、互斥锁、自旋锁
  3. python json模块rodas方法_json模块使用总结——Python
  4. linux ojvm补丁安装,打补丁PSU
  5. 原生js实现选中所有的checkbox
  6. Linux入门之VIM快捷使用
  7. 长沙戴维营教育iOS开发面试题周刊
  8. DroidCam通过网络调用手机摄像头的方法二
  9. Shell 脚本大全(收藏好)
  10. PYsystem003 中职网络安全
  11. 一次惨痛的线下机房上云的经历
  12. git安装 苹果笔记本_远程系统重装安装电脑维修笔记本台式xpwin7810苹果mac双系统安装...
  13. java采用MD5加密解密
  14. 地方门户网站发展与SEO优化瓶颈之谈
  15. python地图可视化把直辖市和地级市画在一起_Python地理地图可视化:Folium解析百度地图上中国城市中心的经纬度并显示在地图上(3),folium,把,出来,展示,三...
  16. Android APN设置接口
  17. python猜字小游戏
  18. Autodesk_Inventor_2013_Simplified_Chinese_Win_3...
  19. 2023年上海交通大学数院应用统计考研上岸前辈备考经验指导
  20. 微信公众号开发学习(一)

热门文章

  1. 2015 跨年博文总结
  2. 将 gitblog 的博客内容搬迁到 CSDN
  3. Android 阅读器架构图,网上收集,留做存货
  4. php glob() 列出目录及文件
  5. MVC Areas的使用
  6. Python-类基础
  7. 软件工程启程篇章:C#和四则运算生成与运算
  8. javascript canvas九宫格小程序
  9. android chrome iframe设置src属性无法启动app
  10. Jsonp 跨域请求实例