1.Memcached概述

memcached是一个高性能的分布式内存缓存服务器,memcached在Linux上可以通过yum命令安装,这样方便很多,在生产环境下建议用Linux系统,memcached使用libevent这个库在Linux系统上才能发挥它的高性能。它的分布式其实在服务端是不具有分布式的特征的,是依靠客户端的分布式算法进行了分布式,memcached是一个纯内存型的数据库,这样在读写速度上相对来说比较快。

MemCache虽然被称为分布式缓存,但是MemCache本身完全不具备分布式的功能,MemCache集群之间不会相互通信(与之形成对比的,比如JBoss Cache,某台服务器有缓存数据更新时,会通知集群中其他机器更新缓存或清除缓存数据),所谓的分布式,完全依赖于客户端程序的实现,就像上面这张图的流程一样。

命 令

作 用

get

返回Key对应的Value值

add

添加一个Key值,没有则添加成功并提示STORED,有则失败并提示NOT_STORED

set

无条件地设置一个Key值,没有就增加,有就覆盖,操作成功提示STORED

replace

按照相应的Key值替换数据,如果Key值不存在则会操作失败

stats

返回MemCache通用统计信息

stats items

返回各个slab中item的数目和最老的item的年龄

stats slabs

返回MemCache运行期间创建的每个slab的信息

version

返回当前MemCache版本号

flush_all

清空所有键值,但不会删除items,所以此时MemCache依旧占用内存

quit

关闭连接

2.memcached内存管理

MemCache的数据存放在内存中,存放在内存中认为意味着几点:

1)访问数据的速度比传统的关系型数据库要快,因为Oracle、MySQL这些传统的关系型数据库为了保持数据的持久性,数据存放在硬盘中,IO操作速度慢

2)MemCache的数据存放在内存中同时意味着只要MemCache重启了,数据就会消失

3)既然MemCache的数据存放在内存中,那么势必受到机器位数的限制,32位机器最多只能使用2GB的内存空间,64位机器可以认为没有上限

MemCache采用的内存分配方式是固定空间分配

这里面涉及了slab_class、slab、page、chunk四个概念,它们之间的关系是:

1)MemCache将内存空间分为一组slab

2)每个slab下又有若干个page,每个page默认是1M,如果一个slab占用100M内存的话,那么这个slab下应该有100个page

3)每个page里面包含一组chunk,chunk是真正存放数据的地方,同一个slab里面的chunk的大小是固定的

4)有相同大小chunk的slab被组织在一起,称为slab_class

MemCache内存分配的方式称为allocator,slab的数量是有限的,几个、十几个或者几十个,这个和启动参数的配置相关。MemCache中的value过来存放的地方是由value的大小决定的,value总是会被存放到与chunk大小最接近的一个slab中,比如slab[1]的chunk大小为80字节、slab[2]的chunk大小为100字节、slab[3]的chunk大小为128字节(相邻slab内的chunk基本以1.25为比例进行增长,MemCache启动时可以用-f指定这个比例),那么过来一个88字节的value,这个value将被放到2号slab中。放slab的时候,首先slab要申请内存,申请内存是以page为单位的,所以在放入第一个数据的时候,无论大小为多少,都会有1M大小的page被分配给该slab。申请到page后,slab会将这个page的内存按chunk的大小进行切分,这样就变成了一个chunk数组,最后从这个chunk数组中选择一个用于存储数据。

如果这个slab中没有chunk可以分配了怎么办,如果MemCache启动没有追加-M(禁止LRU,这种情况下内存不够会报Out Of Memory错误),那么MemCache会把这个slab中最近最少使用的chunk中的数据清理掉,然后放上最新的数据。针对MemCache的内存分配及回收算法,总结三点:

1)MemCache的内存分配chunk里面会有内存浪费,88字节的value分配在128字节(紧接着大的用)的chunk中,就损失了30字节,但是这也避免了管理内存碎片的问题

MemCache的LRU算法不是针对全局的,是针对slab的

2)应该可以理解为什么MemCache存放的value大小是限制的,因为一个新数据过来,3)slab会先以page为单位申请一块内存,申请的内存最多就只有1M,所以value大小自然不能大于1M了

2.1MemCache的特性和限制

  1. 序号
  1. 限制描述
  1. 1
  1. MemCache中可以保存的item数据量是没有限制的,只要内存足够
  1. 2
  1. MemCache单进程在32位机中最大使用内存为2G,64位机则没有限制
  1. 3
  1. Key最大为250个字节,超过该长度无法存储
  1. 4
  1. 单个item最大数据是1MB,超过1MB的数据不予存储
  1. 5
  1. MemCache服务端是不安全的,比如已知某个MemCache节点,可以直接telnet过去,并通过flush_all让已经存在的键值对立即失效
  1. 6
  1. 不能够遍历MemCache中所有的item,因为这个操作的速度相对缓慢且会阻塞其他的操作
  1. 7
  1. MemCache的高性能源自于两阶段哈希结构:第一阶段在客户端,通过Hash算法根据Key值算出一个节点;第二阶段在服务端,通过一个内部的Hash算法,查找真正的item并返回给客户端。从实现的角度看,MemCache是一个非阻塞的、基于事件的服务器程序

2.2 slab内存管理

2.2.1 item 数据存储节点

typedef struct _stritem {/* Protected by LRU locks *///一个item的地址, 主要用于LRU链和freelist链struct _stritem *next;//下一个item的地址,主要用于LRU链和freelist链struct _stritem *prev;/* Rest are protected by an item lock *///用于记录哈希表槽中下一个item节点的地址struct _stritem *h_next;    /* hash chain next *///最近访问时间rel_time_t      time;       /* least recent access *///缓存过期时间rel_time_t      exptime;    /* expire time */int             nbytes;     /* size of data *///当前item被引用的次数,用于判断item是否被其它的线程在操作中//refcount == 1的情况下该节点才可以被删除unsigned short  refcount;uint8_t         nsuffix;    /* length of flags-and-length string */uint8_t         it_flags;   /* ITEM_* above *///记录该item节点位于哪个slabclass_t中uint8_t         slabs_clsid;/* which slab class we're in */uint8_t         nkey;       /* key length, w/terminating null and padding *//* this odd type prevents type-punning issues when we do* the little shuffle to save space when not using CAS. */union {uint64_t cas;char end;} data[];/* if it_flags & ITEM_CAS we have 8 bytes CAS *//* then null-terminated key *//* then " flags length\r\n" (no terminating null) *//* then data with terminating \r\n (no terminating null; it's binary!) */} item;

slab与chunk

slab是一块内存空间,默认大小为1M,memcached会把一个slab分割成一个个chunk, 这些被切割的小的内存块,主要用来存储item

slabclass

每个item的大小都可能不一样,item存储于chunk,如果chunk大小不够,则不足以分配给item使用,如果chunk过大,则太过于浪费内存空间。因此memcached采取的做法是,将slab切割成不同大小的chunk,这样就满足了不同大小item的存储。被划分不同大小chunk的slab的内存在memcached就是用slabclass这个结构体来表现的

typedef struct {unsigned int size;      /* sizes of items */unsigned int perslab;   /* how many items per slab */void *slots;           /* list of item ptrs */unsigned int sl_curr;   /* total free items in list */unsigned int slabs;     /* how many slabs were allocated for this class */void **slab_list;       /* array of slab pointers */unsigned int list_size; /* size of prev array */
} slabclass_t;

1)slabclass数组初始化的时候,每个slabclass_t都会分配一个1M大小的slab,slab会被切分为N个小的内存块,这个小的内存块的大小取决于slabclass_t结构上的size的大小

2)每个slabclass_t都只存储一定大小范围的数据,并且下一个slabclass切割的chunk块大于前一个slabclass切割的chunk块大小

3)memcached中slabclass数组默认大小为64,slabclass切割块大小的增长因子默认是1.25

例如:slabclass[1]切割的chunk块大小为100字节,slabclass[2]为125,如果需要存储一个110字节的缓存,那么就需要到slabclass[2] 的空闲链表中获取一个空闲节点进行存储。

2.2.2 slabclass的初始化

slabs_init

|---> slabs_preallocate

|---> do_slabs_newslab

|--->grow_slab_list

|--->split_slab_page_into_freelist

2.2.3 item节点的分配流程

序号

描述

1

根据大小,找到合适的slabclass

2

slabclass空闲列表中是否有空闲的item节点,如果有直接分配item,用于存储内容

3

空闲列表没有空闲的item可以分配,会重新开辟一个slab(默认大小为1M)的内存块,然后切割slab并放入到空闲列表中,然后从空闲列表中获取节点

void *slabs_alloc(size_t size, unsigned int id,

unsigned int flags) {

void *ret;

pthread_mutex_lock(&slabs_lock);

ret = do_slabs_alloc(size, id, flags);

pthread_mutex_unlock(&slabs_lock);

return ret;

}

/*@null@*/

static void *do_slabs_alloc(const size_t size, unsigned int id,

unsigned int flags) {

slabclass_t *p;

void *ret = NULL;

item *it = NULL;

if (id < POWER_SMALLEST || id > power_largest) {

MEMCACHED_SLABS_ALLOCATE_FAILED(size, 0);

return NULL;

}

p = &slabclass[id];

assert(p->sl_curr == 0 || (((item *)p->slots)->it_flags & ITEM_SLABBED));

assert(size <= p->size);

/* 没有空闲slab时,需要重新分配*/

if (p->sl_curr == 0 && flags != SLABS_ALLOC_NO_NEWPAGE) {

do_slabs_newslab(id);

}

if (p->sl_curr != 0) {

/* return off our freelist */

it = (item *)p->slots;

p->slots = it->next;

if (it->next) it->next->prev = 0;

/* Kill flag and initialize refcount here for lock safety in slab

* mover's freeness detection. */

it->it_flags &= ~ITEM_SLABBED;

it->refcount = 1;

p->sl_curr--;

ret = (void *)it;

} else {

ret = NULL;

}

if (ret) {

MEMCACHED_SLABS_ALLOCATE(size, id, p->size, ret);

} else {

MEMCACHED_SLABS_ALLOCATE_FAILED(size, id);

}

return ret;

}

2.2.4 item节点的释放

释放一个item节点,并不会free内存空间,而是将item节点归还到slabclass的空闲列表中

void slabs_free(void *ptr, size_t size, unsigned int id) {

pthread_mutex_lock(&slabs_lock);

do_slabs_free(ptr, size, id);

pthread_mutex_unlock(&slabs_lock);

}

static void do_slabs_free(void *ptr, const size_t size, unsigned int id) {

slabclass_t *p;

item *it;

assert(id >= POWER_SMALLEST && id <= power_largest);

if (id < POWER_SMALLEST || id > power_largest)

return;

MEMCACHED_SLABS_FREE(size, id, ptr);

p = &slabclass[id];

it = (item *)ptr;

if ((it->it_flags & ITEM_CHUNKED) == 0) {

it->it_flags = ITEM_SLABBED;

it->slabs_clsid = id;

it->prev = 0;

it->next = p->slots;

if (it->next) it->next->prev = it;

p->slots = it;

p->sl_curr++;

} else {

do_slabs_free_chunked(it, size);

}

return;

}

3 Memcached网络模型

1

Memcached主要是基于Libevent 网络事件库进行开发的

2

Memcached的网络模型分为两部分:主线程和工作线程。主线程主要用来接收客户端的连接信息;工作线程主要用来接管客户端连接,处理具体的业务逻辑

3

主线程和工作线程之间主要是通过pipe管道来进行通信。当主线程接收到客户端的连接的时候,会通过轮询的方式选择一个工作线程,然后向该工作线程的管道pipe写数据。工作线程监听到管道中有数据写入的时候,就会触发代码逻辑去接管客户端的连接

4

每个工作线程也是基于Libevent的事件机制,当客户端有数据写入的时候,就会触发读取的操作

"主线程统一accept/dispatch子线程"的基础设施:主线程创建多个子线程(这些子线程也称为worker线程),每一个线程都维持自己的事件循环,即每个线程都有自己的epoll,并且都会调用epoll_wait函数进入事件监听状态。每一个worker线程(子线程)和主线程之间都用一条管道相互通信。每一个子线程都监听自己对应那条管道的读端。当主线程想和某一个worker线程进行通信,直接往对应的那条管道写入数据即可。

"主线程统一accept/dispatch子线程"模型的工作流程:主线程负责监听进程对外的TCP监听端口。当客户端申请连接connect到进程的时候,主线程负责接收accept客户端的连接请求。然后主线程选择其中一个worker线程,把客户端fd通过对应的管道传给worker线程。worker线程得到客户端的fd后负责和这个客户端进行一切的通信。

1. memcached使用libevent作为进行事件监听;

2.memcached往管道里面写的内容不是fd,而是一个简单的字符。每一个worker线程都维护一个CQ队列,主线程把fd和一些信息写入一个CQ_ITEM里面,然后主线程往worker线程的CQ队列里面push这个CQ_ITEM。接着主线程使用管道通知worker线程:“唤醒work线程处理新的链接请求”。

3.1 CQ_ITEM

memcached每一个worker线程都有一个CQ队列,主线程accept到新客户端后,就把新客户端的信息封装成一个CQ_ITEM,然后push到选定线程的CQ队列中

typedef struct conn_queue_item CQ_ITEM;

struct conn_queue_item {

int               sfd;

enum conn_states  init_state;

int               event_flags;

int               read_buffer_size;

enum network_transport     transport;

enum conn_queue_item_modes mode;

conn *c;

void    *ssl;

io_pending_t *io; // IO when used for deferred IO handling.

STAILQ_ENTRY(conn_queue_item) i_next;

};

/* A connection queue. */

typedef struct conn_queue CQ;

struct conn_queue {

STAILQ_HEAD(conn_ev_head, conn_queue_item) head;

pthread_mutex_t lock;

cache_t *cache; /* freelisted objects */

};

可以看到结构体conn_queue(即CQ队列结构体)有一个pthread_mutex_t类型变量lock,这说明主线程往某个worker线程的CQ队列里面push一个CQ_ITEM的时候必然要加锁的。下面是初始化CQ队列,以及push、pop一个CQ_ITEM的代码

static void cq_init(CQ *cq) {

pthread_mutex_init(&cq->lock, NULL);

cq->head = NULL;

cq->tail = NULL;

}

static CQ_ITEM *cq_pop(CQ *cq) {

CQ_ITEM *item;

pthread_mutex_lock(&cq->lock);

item = cq->head;

if (NULL != item) {

cq->head = item->next;

if (NULL == cq->head)

cq->tail = NULL;

}

pthread_mutex_unlock(&cq->lock);

return item;

}

/*

* Adds an item to a connection queue.

*/

static void cq_push(CQ *cq, CQ_ITEM *item) {

item->next = NULL;

pthread_mutex_lock(&cq->lock);

if (NULL == cq->tail)

cq->head = item;

else

cq->tail->next = item;

cq->tail = item;

pthread_mutex_unlock(&cq->lock);

}

3.2 线程模型

3.2.1 主线程初始化逻辑

Memcached主线程的初始化逻辑比较简单,主要作用是启动监听的master线程和工作的worker线程。,其中启动worker线程通过memcached_thread_init函数进行实现,这部分逻辑分析在worker线程初始化当中进行分析,这里主要分析监听的master线程。整个master线程的启动过程就是socket的server端初始化结合libevent的初始化。整个过程如下:

1)server_sockets,该方法主要是遍历所有listen的socket列表并逐个进行绑定。

2)server_socket,该方法主要是操作单个socket到listen状态。

3)conn_new,将socket注册到libevent当中。

4)event_handler,监听socket的回调函数。

5)最后event_base_loop让整个libevent进行循环工作状态

int main (int argc, char **argv) {

//检查libevent的版本是否足够新.1.3即可

if (!sanitycheck()) {

return EX_OSERR;

}

//对memcached的关键设置取默认值

settings_init();

...//解析memcached启动参数

//main_base是一个struct event_base类型的全局变量

main_base = event_init();//为主线程创建一个event_base

conn_init();//先不管,后面会说到

//创建settings.num_threads个worker线程,并且为每个worker线程创建一个CQ队列

//并为这些worker申请各自的event_base,worker线程然后进入事件循环中

thread_init(settings.num_threads, main_base);

//设置一个定时event(也叫超时event),定时(频率为一秒)更新current_time变量

//这个超时event是add到全局变量main_base里面的,所以主线程负责更新current_time(这是一个很重要的全局变量)

clock_handler(0, 0, 0);

/* create the listening socket, bind it, and init */

if (settings.socketpath == NULL) {

FILE *portnumber_file = NULL;

//创建监听客户端的socket

if (settings.port && server_sockets(settings.port, tcp_transport,//tcp_transport是枚举类型

portnumber_file)) {

vperror("failed to listen on TCP port %d", settings.port);

exit(EX_OSERR);

}

...

}

if (event_base_loop(main_base, 0) != 0) {//主线程进入事件循环

retval = EXIT_FAILURE;

}

return retval;

}

解析参数并把遍历所有的监听socket进行绑定。执行方法server_socket(p, the_port, transport, portnumber_file)

static int server_sockets(int port, enum network_transport transport,

FILE *portnumber_file) {

if (settings.inter == NULL) {

return server_socket(settings.inter, port, transport, portnumber_file);

} else {

// tokenize them and bind to each one of them..

char *b;

int ret = 0;

char *list = strdup(settings.inter);

for (char *p = strtok_r(list, ";,", &b);

ret |= server_socket(p, the_port, transport, portnumber_file);

}

free(list);

return ret;

}

}

 针对单个listen的socket的初始化过程,这里主要做的事情是socket的相关初始化过程,主要是指设置socket相关的一些参数;进行socket的bind操作;通过方法conn_new关联socket和libevent当中

static int server_socket(const char *interface,

int port,

enum network_transport transport,

FILE *portnumber_file) {

int sfd;

struct linger ling = {0, 0};

struct addrinfo *ai;

struct addrinfo *next;

struct addrinfo hints = { .ai_flags = AI_PASSIVE,

.ai_family = AF_UNSPEC };

char port_buf[NI_MAXSERV];

int error;

int success = 0;

int flags =1;

for (next= ai; next; next= next->ai_next) {

conn *listen_conn_add;

if ((sfd = new_socket(next)) == -1) {

continue;

}

//todo 设置socket相关的属性,这里省略相关代码

// 绑定socket,省略相关代码

if (bind(sfd, next->ai_addr, next->ai_addrlen) == -1) {}

// 暂时只关心TCP协议的,忽略UDP协议实现

if (IS_UDP(transport)) {

} else {

if (!(listen_conn_add = conn_new(sfd, conn_listening,

EV_READ | EV_PERSIST, 1,

transport, main_base))) {

fprintf(stderr, "failed to create listening connection\n");

exit(EXIT_FAILURE);

}

listen_conn_add->next = listen_conn;

listen_conn = listen_conn_add;

}

}

freeaddrinfo(ai);

/* Return zero iff we detected no errors in starting up connections */

return success == 0;

}

conn_new内部就是执行libevent相关的配置,包括event_set和event_base_set,这里需要关注的是event_set当中绑定了回调函数event_handler,用于连接到来后执行的逻辑

conn *conn_new(const int sfd, enum conn_states init_state,

const int event_flags,

const int read_buffer_size, enum network_transport transport,

struct event_base *base, void *ssl) {

conn *c;

assert(sfd >= 0 && sfd < max_fds);

c = conns[sfd];

。。。。。。。。。。。

event_set(&c->event, sfd, event_flags, event_handler, (void *)c);

event_base_set(base, &c->event);

c->ev_flags = event_flags;

if (event_add(&c->event, 0) == -1) {

perror("event_add");

return NULL;

}

return c;

}

回调函数event_handler的核心在于drive_machine,这个函数是整个Memcached的状态转移中心,所有的操作都通过drive_machine进行驱动来实现的

void event_handler(const evutil_socket_t fd, const short which, void *arg) {

conn *c;

c = (conn *)arg;

assert(c != NULL);

c->which = which;

/* sanity */

if (fd != c->sfd) {

if (settings.verbose > 0)

fprintf(stderr, "Catastrophic: event fd doesn't match conn fd!\n");

conn_close(c);

return;

}

drive_machine(c);

/* wait for next event */

return;

}

3.2.2 work线程初始化

memcached_thread_init主要用于工作线程worker的初始化,核心的三个操作主要是:

1)初始化master线程和worker线程通信的pipe管道,pipe(fds)。

2)setup_thread,主要用于设置工作线程libevent相关的参数。

3)create_worker,主要是启动工作线程开始循环处理工作

void memcached_thread_init(int nthreads, void *arg) {

int         i;

int         power;

threads = calloc(nthreads, sizeof(LIBEVENT_THREAD));

for (i = 0; i < nthreads; i++) {

#ifdef HAVE_EVENTFD

threads[i].notify_event_fd = eventfd(0, EFD_NONBLOCK);

if (threads[i].notify_event_fd == -1) {

perror("failed creating eventfd for worker thread");

exit(1);

}

#else

int fds[2];

if (pipe(fds)) {

perror("Can't create notify pipe");

exit(1);

}

threads[i].notify_receive_fd = fds[0];

threads[i].notify_send_fd = fds[1];

#endif

#ifdef EXTSTORE

threads[i].storage = arg;

#endif

setup_thread(&threads[i]);

/* Reserve three fds for the libevent base, and two for the pipe */

stats_state.reserved_fds += 5;

}

/* Create threads after we've done all the libevent setup. */

for (i = 0; i < nthreads; i++) {

create_worker(worker_libevent, &threads[i]);

}

/* Wait for all the threads to set themselves up before returning. */

pthread_mutex_lock(&init_lock);

wait_for_thread_registration(nthreads);

pthread_mutex_unlock(&init_lock);

}

setup_thread内部主要是初始化工作线程worker的libevent相关参数,这里我们重点关注包括:

1)回调函数thread_libevent_process。

2)初始化master线程和worker线程通信的队cq_init(me->new_conn_queue)

static void setup_thread(LIBEVENT_THREAD *me) {me->base = event_init();event_set(&me->notify_event, me->notify_receive_fd,EV_READ | EV_PERSIST, thread_libevent_process, me);event_base_set(me->base, &me->notify_event);if (event_add(&me->notify_event, 0) == -1) {fprintf(stderr, "Can't monitor libevent notify pipe\n");exit(1);}me->new_conn_queue = malloc(sizeof(struct conn_queue));if (me->new_conn_queue == NULL) {perror("Failed to allocate memory for connection queue");exit(EXIT_FAILURE);}cq_init(me->new_conn_queue);
}

create_worker主要是启动工作线程worker使其开始工作就可以了。

create_worker(worker_libevent, &threads[i])传入函数是worker_libevent

通过pthread_create方法触发worker_libevent的工作,在worker_libevent方法内部通过event_base_loop最终使得libevent开始工作

static void create_worker(void *(*func)(void *), void *arg) {

pthread_attr_t  attr;

int             ret;

pthread_attr_init(&attr);

if ((ret = pthread_create(&((LIBEVENT_THREAD*)arg)->thread_id, &attr, func, arg)) != 0) {

fprintf(stderr, "Can't create thread: %s\n",

strerror(ret));

exit(1);

}

}

static void *worker_libevent(void *arg) {

LIBEVENT_THREAD *me = arg;

/* Any per-thread setup can happen here; memcached_thread_init() will block until

* all threads have finished initializing.

*/

me->l = logger_create();

me->lru_bump_buf = item_lru_bump_buf_create();

if (me->l == NULL || me->lru_bump_buf == NULL) {

abort();

}

if (settings.drop_privileges) {

drop_worker_privileges();

}

register_thread_initialized();

event_base_loop(me->base, 0);

// same mechanism used to watch for all threads exiting.

register_thread_initialized();

event_base_free(me->base);

return NULL;

}

3.2.3 主从线程通信

在master线程接受连接以后会触发drive_machine方法,其中master的状态为conn_listening,最终我们通过dispatch_conn_new方法实现master到worker的分发操作

static void drive_machine(conn *c) {

bool stop = false;

int sfd;

socklen_t addrlen;

struct sockaddr_storage addr;

int nreqs = settings.reqs_per_event;

int res;

const char *str;

#ifdef HAVE_ACCEPT4

static int  use_accept4 = 1;

#else

static int  use_accept4 = 0;

#endif

assert(c != NULL);

while (!stop) {

switch(c->state) {

case conn_listening:

addrlen = sizeof(addr);

sfd = accept(c->sfd, (struct sockaddr *)&addr, &addrlen);

// 中间省略一系列的socket相关的初始化工作

if (settings.maxconns_fast &&

} else {

dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,

DATA_BUFFER_SIZE, c->transport);

}

stop = true;

break;

void dispatch_conn_new(int sfd, enum conn_states init_state, int event_flags,

int read_buffer_size, enum network_transport transport, void *ssl) {

CQ_ITEM *item = NULL;

LIBEVENT_THREAD *thread;

if (!settings.num_napi_ids)

thread = select_thread_round_robin();

else

thread = select_thread_by_napi_id(sfd);

item = cqi_new(thread->ev_queue);

if (item == NULL) {

close(sfd);

/* given that malloc failed this may also fail, but let's try */

fprintf(stderr, "Failed to allocate memory for connection object\n");

return;

}

item->sfd = sfd;

item->init_state = init_state;

item->event_flags = event_flags;

item->read_buffer_size = read_buffer_size;

item->transport = transport;

item->mode = queue_new_conn;

item->ssl = ssl;

MEMCACHED_CONN_DISPATCH(sfd, (int64_t)thread->thread_id);

notify_worker(thread, item);

}

static void notify_worker(LIBEVENT_THREAD *t, CQ_ITEM *item) {

cq_push(t->ev_queue, item);

#ifdef HAVE_EVENTFD

uint64_t u = 1;

if (write(t->notify_event_fd, &u, sizeof(uint64_t)) != sizeof(uint64_t)) {

perror("failed writing to worker eventfd");

/* TODO: This is a fatal problem. Can it ever happen temporarily? */

}

#else

char buf[1] = "c";

if (write(t->notify_send_fd, buf, 1) != 1) {

perror("Failed writing to notify pipe");

/* TODO: This is a fatal problem. Can it ever happen temporarily? */

}

#endif

}

thread_libevent_process是worker线程接受master分发新来连接时候的回调函数,内部通过conn_new来处理新连接的到来,conn_new的内部操作就是把新连接的socket注册到worker线程的libevent当中。

static void thread_libevent_process(evutil_socket_t fd, short which, void *arg) {

LIBEVENT_THREAD *me = arg;

CQ_ITEM *item;

conn *c;

uint64_t ev_count = 0; // max number of events to loop through this run.

#ifdef HAVE_EVENTFD

if (read(fd, &ev_count, sizeof(uint64_t)) != sizeof(uint64_t)) {

if (settings.verbose > 0)

fprintf(stderr, "Can't read from libevent pipe\n");

return;

}

#else

char buf[MAX_PIPE_EVENTS];

ev_count = read(fd, buf, MAX_PIPE_EVENTS);

if (ev_count == 0) {

if (settings.verbose > 0)

fprintf(stderr, "Can't read from libevent pipe\n");

return;

}

#endif

for (int x = 0; x < ev_count; x++) {

item = cq_pop(me->ev_queue);

if (item == NULL) {

return;

}

switch (item->mode) {

case queue_new_conn:

c = conn_new(item->sfd, item->init_state, item->event_flags,

item->read_buffer_size, item->transport,

me->base, item->ssl);

3.3 命令解析

memcached 的命令协议从直观逻辑上可以分为获取类型、变更类型、其他类型。但从实际处理层面区分,则可以细分为 get 类型、update 类型、delete 类型、算术类型、touch 类型、stats 类型,以及其他类型。对应的处理函数为,process_get_command, process_update_command, process_arithmetic_command, process_touch_command 等。每个处理函数能够处理不同的协议

工作线程监听到主线程的管道通知后,会从连接队列弹出一个新连接,然后就会创建一个 conn 结构体,注册该 conn 读事件,然后继续监听该连接上的 IO 事件。后续这个连接有命令进来时,工作线程会读取 client 发来的命令,进行解析并处理,最后返回响应。工作线程的主要处理逻辑也是在状态机中,一个名叫 drive_machine 的函数。整个处理流程如下:

序号

描述

1

当客户端和Memcached建立TCP连接后,Memcached会基于Libevent的event事件来监听客户端是否有可以读取的数据

2

当客户端有命令数据报文上报的时候,就会触发drive_machine方法中的conn_read这个Case,在进入这个状态之前经过conn_new_cmd->conn_waiting->conn_read的流程

3

memcached通过try_read_network方法读取客户端的报文。如果读取失败,则返回conn_closing,去关闭客户端的连接;如果没有读取到任何数据,则会返回conn_waiting,继续等待客户端的事件到来,并且退出drive_machine的循环;如果数据读取成功,则会将状态转交给conn_parse_cmd处理,读取到的数据会存储在c->rbuf容器中

4

conn_parse_cmd主要的工作就是用来解析命令。主要通过try_read_command这个方法来读取c->rbuf中的命令数据,通过\n来分隔数据报文的命令。如果c->buf内存块中的数据匹配不到\n,则返回继续等待客户端的命令数据报文到来conn_waiting;否则就会转交给process_command方法,来处理具体的命令(命令解析会通过\0符号来分隔)

5

process_command主要用来处理具体的命令。其中tokenize_command这个方法非常重要,将命令拆解成多个元素(KEY的最大长度250)。例如我们以get命令为例,最终会跳转到process_get_command这个命令 process_*_command这一系列就是处理具体的命令逻辑的

6

我们进入process_get_command,当获取数据处理完毕之后,会转交到conn_mwrite这个状态。如果获取数据失败,则关闭连接

7

进入conn_mwrite后,主要是通过transmit方法来向客户端提交数据。如果写数据失败,则关闭连接或退出drive_machine循环;如果写入成功,则又转交到conn_new_cmd这个状态

8

conn_new_cmd这个状态主要是处理c->rbuf中剩余的命令。主要看一下reset_cmd_handler这个方法,这个方法会去判断c->rbytes中是否还有剩余的报文没处理,如果未处理,则转交到conn_parse_cmd(第四步)继续解析剩余命令;如果已经处理了,则转交到conn_waiting,等待新的事件到来。在转交之前,每次都会执行一次conn_shrink方法

9

conn_shrink方法主要用来处理命令报文容器c->rbuf和输出内容的容器是否数据满了?是否需要扩大buffer的大小,是否需要移动内存块。接受命令报文的初始化内存块大小2048,最大8192

 1)memcached启动后,主线程监听并准备接受新连接接入。当有新连接接入时,主线程进入 conn_listening 状态,accept 新连接,并将新连接调度给工作线程。

2)Worker 线程监听管道,当收到主线程通过管道发送的消息后,工作线程中的连接进入 conn_new_cmd 状态,创建 conn 结构体,并做一些初始化重置操作,然后进入 conn_waiting 状态,注册读事件,并等待网络 IO。

3)有数据到来时,连接进入 conn_read 状态,读取网络数据。

4)读取成功后,就进入 conn_parse_cmd 状态,然后根据 Mc 协议解析指令。

5)对于读取指令,获取到 value 结果后,进入 conn_mwrite 状态。

6)对于变更指令,则进入 conn_nread,进行 value 的读取,读取到 value 后,对 key 进行变更,当变更完毕后,进入 conn_write,然后将结果写入缓冲。然后和读取指令一样,也进入 conn_mwrite 状态。

7)进入到 conn_mwrite 状态后,将结果响应发送给 client。发送响应完毕后,再次进入到 conn_new_cmd 状态,进行连接重置,准备下一次命令处理循环。

8)在读取、解析、处理、响应过程,遇到任何异常就进入 conn_closing,关闭连接

3.3.1 状态机解析

状态机

说明

conn_new_cmd

主线程通过调用 dispatch_conn_new,把新连接调度给工作线程后,worker 线程创建 conn 对象,这个连接初始状态就是 conn_new_cmd。除了通过新建连接进入 conn_new_cmd 状态之外,如果连接命令处理完毕,准备接受新指令时,也会将连接的状态设置为 conn_new_cmd 状态。

进入 conn_new_cmd 后,工作线程会调用 reset_cmd_handler 函数,重置 conn 的 cmd 和 substate 字段,并在必要时对连接 buf 进行收缩。因为连接在处理 client 来的命令时,对于写指令,需要分配较大的读 buf 来存待更新的 key value,而对于读指令,则需要分配较大的写 buf 来缓冲待发送给 client 的 value 结果。持续运行中,随着大 size value 的相关操作,这些缓冲会占用很多内存,所以需要设置一个阀值,超过阀值后就进行缓冲内存收缩,避免连接占用太多内存。在后端服务以及中间件开发中,这个操作很重要,因为线上服务的连接很容易达到万级别,如果一个连接占用几十 KB 以上的内存,后端系统仅连接就会占用数百 MB 甚至数 GB 以上的内存空间。

工作线程处理完 conn_new_cmd 状态的主要逻辑后,如果读缓冲区有数据可以读取,则进入 conn_parse_cmd 状态,否则就会进入到 conn_waiting 状态,等待网络数据进来。

conn_waiting

连接进入 conn_waiting 状态后,处理逻辑很简单,直接通过 update_event 函数注册读事件即可,之后会将连接状态更新为 conn_read。

conn_read

当工作线程监听到网络数据进来,连接就进入 conn_read 状态。对 conn_read 的处理,是通过 try_read_network 从 socket 中读取网络数据。如果读取失败,则进入 conn_closing 状态,关闭连接。如果没有读取到任何数据,则会返回 conn_waiting,继续等待 client 端的数据到来。如果读取数据成功,则会将读取的数据存入 conn 的 rbuf 缓冲,并进入 conn_parse_cmd 状态,准备解析 cmd

conn_parse_cmd

conn_parse_cmd 状态的处理逻辑就是解析命令。工作线程首先通过 try_read_command 读取连接的读缓冲,并通过 \n 来分隔数据报文的命令。如果命令首行长度大于 1024,关闭连接,这就意味着 key 长度加上其他各项命令字段的总长度要小于 1024 字节。当然对于 key,Mc 有个默认的最大长度,key_max_length,默认设置为 250 字节。校验完毕首行报文的长度,接下来会在 process_command 函数中对首行指令进行处理。

备注:

conn_parse_cmd 的状态处理,只有读取到 \n,有了完整的命令首行协议,才会进入 process_command,否则会跳转到 conn_waiting,继续等待客户端的命令数据报文。在 process_command 处理中,如果是获取类命令,在获取到 key 对应的 value 后,则跳转到 conn_mwrite,准备写响应给连接缓冲。而对于 update 变更类型的指令,则需要继续读取 value 数据,此时连接会跳转到 conn_nread 状态。在 conn_parse_cmd 处理过程中,如果遇到任何失败,都会跳转到 conn_closing 关闭连接

conn_write

连接 conn_write 状态处理逻辑很简单,直接进入 conn_mwrite 状态。或者当 conn 的 iovused 为 0 或对于 udp 协议,将响应写入 conn 消息缓冲后,再进入 conn_mwrite 状态。

conn_mwrite

进入 conn_mwrite 状态后,工作线程将通过 transmit 来向客户端写数据。如果写数据失败,跳转到 conn_closing,关闭连接退出状态机。如果写数据成功,则跳转到 conn_new_cmd,准备下一次新指令的获取

conn_closing

最后一个 conn_closing 状态,前面提到过很多次,在任何状态的处理过程中,如果出现异常,就会进入到这个状态,关闭连接。

3.3.2 状态机源码分析

3.3.2.1 conn_new_cmd

conn_new_cmd内部通过reset_cmd_handler将状态设置为conn_parse_cmd,重新进入命令解析过程。重新进行一个大循环

case conn_new_cmd:

/* Only process nreqs at a time to avoid starving other

connections */

--nreqs;

if (nreqs >= 0) {

reset_cmd_handler(c);

} else if (c->resp_head) {

// flush response pipe on yield.

conn_set_state(c, conn_mwrite);

} else {

pthread_mutex_lock(&c->thread->stats.mutex);

c->thread->stats.conn_yields++;

pthread_mutex_unlock(&c->thread->stats.mutex);

if (c->rbytes > 0) {

/* We have already read in data into the input buffer,

so libevent will most likely not signal read events

on the socket (unless more data is available. As a

hack we should just put in a request to write data,

because that should be possible ;-)

*/

if (!update_event(c, EV_WRITE | EV_PERSIST)) {

if (settings.verbose > 0)

fprintf(stderr, "Couldn't update event\n");

conn_set_state(c, conn_closing);

break;

}

}

stop = true;

}

static void reset_cmd_handler(conn *c) {

c->cmd = -1;

c->substate = bin_no_state;

if (c->item != NULL) {

// TODO: Any other way to get here?

// SASL auth was mistakenly using it. Nothing else should?

if (c->item_malloced) {

free(c->item);

c->item_malloced = false;

} else {

item_remove(c->item);

}

c->item = NULL;

}

if (c->rbytes > 0) {

conn_set_state(c, conn_parse_cmd);

} else if (c->resp_head) {

conn_set_state(c, conn_mwrite);

} else {

conn_set_state(c, conn_waiting);

}

}

客户端输入的第一条命令时,epoll触发了两次函数(epoll为水平触发,没有read的话会有第二次触发)

3.3.2.2 conn_waiting

case conn_waiting:

rbuf_release(c);

if (!update_event(c, EV_READ | EV_PERSIST)) {

if (settings.verbose > 0)

fprintf(stderr, "Couldn't update event\n");

conn_set_state(c, conn_closing);

break;

}

conn_set_state(c, conn_read);

stop = true;

break;

3.3.2.3 conn_read

memcached通过try_read_network方法读取客户端的报文。如果读取失败,则返回conn_closing,去关闭客户端的连接;如果没有读取到任何数据,则会返回conn_waiting,继续等待客户端的事件到来,并且退出drive_machine的循环;如果数据读取成功,则会将状态转交给conn_parse_cmd处理,读取到的数据会存储在c->rbuf容器中

case conn_read:

if (!IS_UDP(c->transport)) {

// Assign a read buffer if necessary.

if (!rbuf_alloc(c)) {

// TODO: Some way to allow for temporary failures.

conn_set_state(c, conn_closing);

break;

}

res = try_read_network(c);

} else {

// UDP connections always have a static buffer.

res = try_read_udp(c);

}

switch (res) {

case READ_NO_DATA_RECEIVED:

conn_set_state(c, conn_waiting);

break;

case READ_DATA_RECEIVED:

conn_set_state(c, conn_parse_cmd);

break;

case READ_ERROR:

conn_set_state(c, conn_closing);

break;

case READ_MEMORY_ERROR: /* Failed to allocate more memory */

/* State already set by try_read_network */

break;

}

break;

3.3.2.4 conn_parse_cmd

这个方法主要是用来读取rbuf中的命令的。因为数据报文会有粘包和拆包的特性,所以只有等到命令行完整了才能进行解析。所有只有匹配到了\n符号,才能匹配一个完整的命令。在整个解析过程中,每次解析到\n符号就说明一个完整的命令了,然后就进入处理这个命令的过程,进行处理后返回客户端后再次解析。

case conn_parse_cmd:

c->noreply = false;

if (c->try_read_command(c) == 0) {

/* we need more data! */

if (c->resp_head) {

// Buffered responses waiting, flush in the meantime.

conn_set_state(c, conn_mwrite);

} else {

conn_set_state(c, conn_waiting);

}

}

break;

int try_read_command_ascii(conn *c) {

char *el, *cont;

if (c->rbytes == 0)

return 0;

el = memchr(c->rcurr, '\n', c->rbytes);

if (!el) {

if (c->rbytes > 2048) {

char *ptr = c->rcurr;

while (*ptr == ' ') { /* ignore leading whitespaces */

++ptr;

}

if (ptr - c->rcurr > 100 ||

(strncmp(ptr, "get ", 4) && strncmp(ptr, "gets ", 5))) {

conn_set_state(c, conn_closing);

return 1;

}

if (!c->rbuf_malloced) {

if (!rbuf_switch_to_malloc(c)) {

conn_set_state(c, conn_closing);

return 1;

}

}

}

return 0;

}

cont = el + 1;

if ((el - c->rcurr) > 1 && *(el - 1) == '\r') {

el--;

}

*el = '\0';

assert(cont <= (c->rcurr + c->rbytes));

c->last_cmd_time = current_time;

process_command_ascii(c, c->rcurr);

c->rbytes -= (cont - c->rcurr);

c->rcurr = cont;

assert(c->rcurr <= (c->rbuf + c->rsize));

return 1;

}

tokenize_command需要分析的下一个细节就是关于最后一个元素的问题,如果解析的命令个数没有达到max_tokens,最后一个元素内容为空,如果达到了max_tokens,最后一个元素时剩余的未解析字符串

static size_t tokenize_command(char *command, token_t *tokens, const size_t max_tokens) {

char *s, *e;

size_t ntokens = 0;

assert(command != NULL && tokens != NULL && max_tokens > 1);

size_t len = strlen(command);

unsigned int i = 0;

s = e = command;

for (i = 0; i < len; i++) {

if (*e == ' ') {

if (s != e) {

tokens[ntokens].value = s;

tokens[ntokens].length = e - s;

ntokens++;

*e = '\0';

if (ntokens == max_tokens - 1) {

e++;

s = e; /* so we don't add an extra token */

break;

}

}

s = e + 1;

}

e++;

}

if (s != e) {

tokens[ntokens].value = s;

tokens[ntokens].length = e - s;

ntokens++;

}

/*

* If we scanned the whole string, the terminal value pointer is null,

* otherwise it is the first unprocessed character.

*/

tokens[ntokens].value =  *e == '\0' ? NULL : e;

tokens[ntokens].length = 0;

ntokens++;

return ntokens;

}

void process_command_ascii(conn *c, char *command) {

token_t tokens[MAX_TOKENS];

size_t ntokens;

int comm;

assert(c != NULL);

MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->rcurr, c->rbytes);

if (settings.verbose > 1)

fprintf(stderr, "<%d %s\n", c->sfd, command);

// Prep the response object for this query.

if (!resp_start(c)) {

conn_set_state(c, conn_closing);

return;

}

ntokens = tokenize_command(command, tokens, MAX_TOKENS);

// All commands need a minimum of two tokens: cmd and NULL finalizer

// There are also no valid commands shorter than two bytes.

if (ntokens < 2 || tokens[COMMAND_TOKEN].length < 2) {

out_string(c, "ERROR");

return;

}

// Meta commands are all 2-char in length.

char first = tokens[COMMAND_TOKEN].value[0];

if (first == 'm' && tokens[COMMAND_TOKEN].length == 2) {

switch (tokens[COMMAND_TOKEN].value[1]) {

…………………………………………..

} else if (first == 'g') {

// Various get commands are very common.

WANT_TOKENS_MIN(ntokens, 3);

if (strcmp(tokens[COMMAND_TOKEN].value, "get") == 0) {

process_get_command(c, tokens, ntokens, false, false);

} else if (strcmp(tokens[COMMAND_TOKEN].value, "gets") == 0) {

process_get_command(c, tokens, ntokens, true, false);

} else if (strcmp(tokens[COMMAND_TOKEN].value, "gat") == 0) {

process_get_command(c, tokens, ntokens, false, true);

} else if (strcmp(tokens[COMMAND_TOKEN].value, "gats") == 0) {

process_get_command(c, tokens, ntokens, true, true);

} else {

out_string(c, "ERROR");

}

} else if (first == 's') {

if (strcmp(tokens[COMMAND_TOKEN].value, "set") == 0 && (comm = NREAD_SET)) {

WANT_TOKENS_OR(ntokens, 6, 7);

process_update_command(c, tokens, ntokens, comm, false);

} else if (strcmp(tokens[COMMAND_TOKEN].value, "stats") == 0) {

process_stat(c, tokens, ntokens);

} else if (strcmp(tokens[COMMAND_TOKEN].value, "shutdown") == 0) {

process_shutdown_command(c, tokens, ntokens);

} else if (strcmp(tokens[COMMAND_TOKEN].value, "slabs") == 0) {

process_slabs_command(c, tokens, ntokens);

} else {

out_string(c, "ERROR");

}

………………………………………..

process_command_ascii 根据tokens里存储的命令字段进行不同的操作,按照set 操作进行分析。

process_update_command函数申请分配一个item后,并没有直接把这个item插入到LRU队列和哈希表中,而不过用conn结构体的item成员指向这个申请得到的item,而且用ritem成员指向item结构体的数据域(这为了方便写入数据)。最后conn的状态改动为conn_nread

static void process_update_command(conn *c, token_t *tokens, const size_t ntokens, int comm, bool handle_cas) {

…………………………………………………….

item *it;

assert(c != NULL);

set_noreply_maybe(c, tokens, ntokens);

if (tokens[KEY_TOKEN].length > KEY_MAX_LENGTH) {

out_string(c, "CLIENT_ERROR bad command line format");

return;

}

key = tokens[KEY_TOKEN].value;

nkey = tokens[KEY_TOKEN].length;

if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)

&& safe_strtol(tokens[3].value, &exptime_int)

&& safe_strtol(tokens[4].value, (int32_t *)&vlen))) {

out_string(c, "CLIENT_ERROR bad command line format");

return;

}

exptime = realtime(EXPTIME_TO_POSITIVE_TIME(exptime_int));

// does cas value exist?

if (handle_cas) {

if (!safe_strtoull(tokens[5].value, &req_cas_id)) {

out_string(c, "CLIENT_ERROR bad command line format");

return;

}

}

if (vlen < 0 || vlen > (INT_MAX - 2)) {

out_string(c, "CLIENT_ERROR bad command line format");

return;

}

vlen += 2;

it = item_alloc(key, nkey, flags, exptime, vlen);

…………………………………………………

ITEM_set_cas(it, req_cas_id);

c->item = it;

c->ritem = ITEM_data(it);

c->rlbytes = it->nbytes;

c->cmd = comm;

conn_set_state(c, conn_nread);

}

3.3.2.5 conn_nread

conn_parse_cmd状态机中的process_update_command命令处理过程是没有把item的数据写入到item结构体中。只是把状态机迁移到了conn_nread就退出到drive_machine函数中。

尽管process_update_command留下了尾巴,但它也用conn的成员变量记录了一些重要值,用于填充item的数据域。rlbytes表示须要用多少字节填充item(需要填充的数据的长度),rbytes表示读缓冲区还有多少字节能够使用,ritem指向数据填充地点。

case conn_nread:

if (c->rlbytes == 0) {

complete_nread(c);

break;

}

/* Check if rbytes < 0, to prevent crash */

if (c->rlbytes < 0) {

if (settings.verbose) {

fprintf(stderr, "Invalid rlbytes to read: len %d\n", c->rlbytes);

}

conn_set_state(c, conn_closing);

break;

}

if (c->item_malloced || ((((item *)c->item)->it_flags & ITEM_CHUNKED) == 0) ) {

/* first check if we have leftovers in the conn_read buffer */

if (c->rbytes > 0) {

int tocopy = c->rbytes > c->rlbytes ? c->rlbytes : c->rbytes;

memmove(c->ritem, c->rcurr, tocopy);

c->ritem += tocopy;

c->rlbytes -= tocopy;

c->rcurr += tocopy;

c->rbytes -= tocopy;

if (c->rlbytes == 0) {

break;

}

}

/*  now try reading from the socket */

res = c->read(c, c->ritem, c->rlbytes);

if (res > 0) {

pthread_mutex_lock(&c->thread->stats.mutex);

c->thread->stats.bytes_read += res;

pthread_mutex_unlock(&c->thread->stats.mutex);

if (c->rcurr == c->ritem) {

c->rcurr += res;

}

c->ritem += res;

c->rlbytes -= res;

break;

}

} else {

res = read_into_chunked_item(c);

if (res > 0)

break;

}

if (res == 0) { /* end of stream */

c->close_reason = NORMAL_CLOSE;

conn_set_state(c, conn_closing);

break;

}

if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {

if (!update_event(c, EV_READ | EV_PERSIST)) {

if (settings.verbose > 0)

fprintf(stderr, "Couldn't update event\n");

conn_set_state(c, conn_closing);

break;

}

stop = true;

break;

}

/* Memory allocation failure */

if (res == -2) {

out_of_memory(c, "SERVER_ERROR Out of memory during read");

c->sbytes = c->rlbytes;

conn_set_state(c, conn_swallow);

// Ensure this flag gets cleared. It gets killed on conn_new()

// so any conn_closing is fine, calling complete_nread is

// fine. This swallow semms to be the only other case.

c->set_stale = false;

c->mset_res = false;

break;

}

/* otherwise we have a real error, on which we close the connection */

if (settings.verbose > 0) {

fprintf(stderr, "Failed to read, and not due to blocking:\n"

"errno: %d %s \n"

"rcurr=%p ritem=%p rbuf=%p rlbytes=%d rsize=%d\n",

errno, strerror(errno),

(void *)c->rcurr, (void *)c->ritem, (void *)c->rbuf,

(int)c->rlbytes, (int)c->rsize);

}

conn_set_state(c, conn_closing);

break;

当rlbytes值减少到0后,代表需要的数据值全部读取出来了,会进行complete_nread处理,

void complete_nread_ascii(conn *c) {

assert(c != NULL);

item *it = c->item;

int comm = c->cmd;

enum store_item_type ret;

bool is_valid = false;

pthread_mutex_lock(&c->thread->stats.mutex);

c->thread->stats.slab_stats[ITEM_clsid(it)].set_cmds++;

pthread_mutex_unlock(&c->thread->stats.mutex);

if ((it->it_flags & ITEM_CHUNKED) == 0) {

if (strncmp(ITEM_data(it) + it->nbytes - 2, "\r\n", 2) == 0) {

is_valid = true;

}

}

if (!is_valid) {

// metaset mode always returns errors.

if (c->mset_res) {

c->noreply = false;

}

out_string(c, "CLIENT_ERROR bad data chunk");

} else {

ret = store_item(it, comm, c);

………………………………………….

if (c->mset_res) {

_finalize_mset(c, ret);

} else {

switch (ret) {

case STORED:

out_string(c, "STORED");

break;

case EXISTS:

out_string(c, "EXISTS");

break;

case NOT_FOUND:

out_string(c, "NOT_FOUND");

break;

case NOT_STORED:

out_string(c, "NOT_STORED");

break;

default:

out_string(c, "SERVER_ERROR Unhandled storage type.");

}

}

}

c->set_stale = false; /* force flag to be off just in case */

c->mset_res = false;

item_remove(c->item);       /* release the c->item reference */

c->item = 0;

}

根据处理结果给client返回不同信息,并把状态机迁移到conn_new_cmd

void out_string(conn *c, const char *str) {

size_t len;

assert(c != NULL);

mc_resp *resp = c->resp;

// if response was original filled with something, but we're now writing

// out an error or similar, have to reset the object first.

// TODO: since this is often redundant with allocation, how many callers

// are actually requiring it be reset? Can we fast test by just looking at

// tosend and reset if nonzero?

resp_reset(resp);

if (c->noreply) {

// TODO: just invalidate the response since nothing's been attempted

// to send yet?

resp->skip = true;

if (settings.verbose > 1)

fprintf(stderr, ">%d NOREPLY %s\n", c->sfd, str);

conn_set_state(c, conn_new_cmd);

return;

}

if (settings.verbose > 1)

fprintf(stderr, ">%d %s\n", c->sfd, str);

// Fill response object with static string.

len = strlen(str);

if ((len + 2) > WRITE_BUFFER_SIZE) {

/* ought to be always enough. just fail for simplicity */

str = "SERVER_ERROR output line too long";

len = strlen(str);

}

memcpy(resp->wbuf, str, len);

memcpy(resp->wbuf + len, "\r\n", 2);

resp_add_iov(resp, resp->wbuf, len + 2);

conn_set_state(c, conn_new_cmd);

return;

}

在conn_new_cmd状态下,判断resp_head不为空,迁移状态到conn_mwrite

3.3.2.6 conn_mwrite

case conn_write:

case conn_mwrite:

for (io_queue_t *q = c->io_queues; q->type != IO_QUEUE_NONE; q++) {

if (q->stack_ctx != NULL) {

io_queue_cb_t *qcb = thread_io_queue_get(c->thread, q->type);

qcb->submit_cb(q);

c->io_queues_submitted++;

}

}

if (c->io_queues_submitted != 0) {

conn_set_state(c, conn_io_queue);

event_del(&c->event);

stop = true;

break;

}

switch (!IS_UDP(c->transport) ? transmit(c) : transmit_udp(c)) {

case TRANSMIT_COMPLETE:

if (c->state == conn_mwrite) {

// Free up IO wraps and any half-uploaded items.

conn_release_items(c);

conn_set_state(c, conn_new_cmd);

if (c->close_after_write) {

conn_set_state(c, conn_closing);

}

} else {

if (settings.verbose > 0)

fprintf(stderr, "Unexpected state %d\n", c->state);

conn_set_state(c, conn_closing);

}

break;

发送完出返回TRANSMIT_COMPLETE,迁移状态机到conn_new_cmd。

4 memcached数据存储

序号

描述

1

Memcached在启动的时候,会默认初始化一个HashTable,这个table的默认长度为65536

2

我们将这个HashTable中的每一个元素称为桶,每个桶就是一个item结构的单向链表

3

Memcached会将key值hash成一个变量名称为hv的uint32_t类型的值

4

通过hv与桶的个数之间的按位与计算,hv & hashmask(hashpower),就可以得到当前的key会落在哪个桶上面

5

然后会将item挂到这个桶的链表上面。链表主要是通过item结构中的h_next实现

4.1 Memcached存储结构分析

assoc_init负责初始化hashtable数据结构,通过初始化hashsize(hashpower)大小的数组指针,默认应该是2*16次方大小的数组。

void assoc_init(const int hashtable_init) {

if (hashtable_init) {

hashpower = hashtable_init;

}

primary_hashtable = calloc(hashsize(hashpower), sizeof(void *));

if (! primary_hashtable) {

fprintf(stderr, "Failed to init hashtable.\n");

exit(EXIT_FAILURE);

}

STATS_LOCK();

stats_state.hash_power_level = hashpower;

stats_state.hash_bytes = hashsize(hashpower) * sizeof(void *);

STATS_UNLOCK();

}

Memcached存储数据结构item定义,item的结构分两部分, 第一部分定义 item 结构的属性,第二部分是 item 的数据

typedef struct _stritem {

/* Protected by LRU locks */

struct _stritem *next;

struct _stritem *prev;

/* Rest are protected by an item lock */

struct _stritem *h_next;    /* hash chain next */

rel_time_t      time;       /* least recent access */

rel_time_t      exptime;    /* expire time */

int             nbytes;     /* size of data */

unsigned short  refcount;

uint16_t        it_flags;   /* ITEM_* above */

uint8_t         slabs_clsid;/* which slab class we're in */

uint8_t         nkey;       /* key length, w/terminating null and padding */

/* this odd type prevents type-punning issues when we do

* the little shuffle to save space when not using CAS. */

union {

uint64_t cas;

char end;

} data[];

/* if it_flags & ITEM_CAS we have 8 bytes CAS */

/* then null-terminated key */

/* then " flags length\r\n" (no terminating null) */

/* then data with terminating \r\n (no terminating null; it's binary!) */

} item;

4.2 数据查找过程

1)首先通过key的hash值hv找到对应的桶,区分是否在扩容。 primary_hashtable[hv & hashmask(hashpower)];

2)然后遍历桶的单链表,比较key值并找到对应item。

item *assoc_find(const char *key, const size_t nkey, const uint32_t hv) {

item *it;

uint64_t oldbucket;

if (expanding &&

(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)

{

it = old_hashtable[oldbucket];

} else {

it = primary_hashtable[hv & hashmask(hashpower)];

}

item *ret = NULL;

int depth = 0;

while (it) {

if ((nkey == it->nkey) && (memcmp(key, ITEM_key(it), nkey) == 0)) {

ret = it;

break;

}

it = it->h_next;

++depth;

}

MEMCACHED_ASSOC_FIND(key, nkey, depth);

return ret;

}

4.3 数据插入过程

1)首先通过key的hash值hv找到对应的桶。

2)然后将item放到对应桶的单链表的头部

int assoc_insert(item *it, const uint32_t hv) {

uint64_t oldbucket;

//    assert(assoc_find(ITEM_key(it), it->nkey) == 0);  /* shouldn't have duplicately named things defined */

if (expanding &&

(oldbucket = (hv & hashmask(hashpower - 1))) >= expand_bucket)

{

it->h_next = old_hashtable[oldbucket];

old_hashtable[oldbucket] = it;

} else {

it->h_next = primary_hashtable[hv & hashmask(hashpower)];

primary_hashtable[hv & hashmask(hashpower)] = it;

}

MEMCACHED_ASSOC_INSERT(ITEM_key(it), it->nkey);

return 1;

}

4.4数据删除过程

1)首先通过key的hash值hv找到对应的桶。

2)找到桶对应的链表,遍历单链表,删除对应的Item。

void assoc_delete(const char *key, const size_t nkey, const uint32_t hv) {

item **before = _hashitem_before(key, nkey, hv);

if (*before) {

item *nxt;

/* The DTrace probe cannot be triggered as the last instruction

* due to possible tail-optimization by the compiler

*/

MEMCACHED_ASSOC_DELETE(key, nkey);

nxt = (*before)->h_next;

(*before)->h_next = 0;   /* probably pointless, but whatever. */

*before = nxt;

return;

}

/* Note:  we never actually get here.  the callers don't delete things

they can't find. */

assert(*before != 0);

}

4.5 数据扩容过程

1)数据扩容过程是由一个单独线程在检测是否需要扩容,扩容的前提条件是curr_items > (hashsize(hashpower) * 3) / 2,也就是说数据量是原来的1.5倍

2)检测需要扩容后通过信号通知pthread_cond_signal(&maintenance_cond)开始执行扩容

3)以2倍的扩容速度进行扩容,primary_hashtable = calloc(hashsize(hashpower + 1), sizeof(void *))

4)迁移过程是一个逐步迁移过程,每次都只迁移一个桶里面的Item数据

5 LRU内存回收

以往的LRU算法,基本做法都是这样的:

  1. 创建一个LRU链表,每次新加入的元素都放在链表头。
  2. 如果元素被访问了一次,同样从当前链表中摘除放到链表头。

3)需要淘汰元素时,从链表尾开始找可以淘汰的元素出来淘汰。

这个算法有如下几个问题:

1)元素被访问一次就会被放到LRU链表的头部,这样即便这个元素可以被淘汰,也会需要很久才会淘汰掉这个元素。

2)由于上面的原因,从链表尾部开始找可以淘汰的元素时,实际可能访问到的是一些虽然不常被访问,但是还没到淘汰时间(即有效时间TTL还未过期)的数据,这样会一直沿着链表往前找很久才能找到适合淘汰的元素。由于这个查找被淘汰元素的过程是需要加锁保护的,加锁时间一长影响了系统的并发。

5.1 改进的分段LRU算法(Segmented LRU)

分段LRU算法中将LRU链表根据活跃度分成了4类:

  1. HOT_LRU:存储热数据的LRU链表。
  2. WARM_LRU:存储温数据(即活跃度不如热数据)的LRU链表。

3)COLD_LRU:存储冷数据的LRU链表。

4)TEMP_LRU:存储临时数据(默认不开启)

需要说明的是:热(参数settings.hot_lru_pct)和暖(参数settings.warm_lru_pct)数据的占总体内存的比例有限制,而冷数据则无限。

#define HOT_LRU 0
#define WARM_LRU 64
#define COLD_LRU 128
#define TEMP_LRU 192

同时,使用了headstails两个数组用来保存LRU链表:

#define POWER_LARGEST  256 /* actual cap is 255 */#define LARGEST_ID POWER_LARGESTstatic item *heads[LARGEST_ID];
static item *tails[LARGEST_ID];

上面分析slabclass的时候提到过,首先会根据被分配内存大小计算出来一个slabclass数组的索引。在需要从LRU链表中淘汰数据时,由于LRU链表分为了上面三类,那么就还需要再进行一次slabid | lru id计算(其实就是slabid + lru id),到对应的LRU链表中查找元素:

由于从链表尾部往前查找可以淘汰的元素,中间可能会经历很多不能被淘汰的元素,影响了淘汰的速度,因此前面的分级LRU链表就能帮助程序快速识别出哪些元素可以被淘汰。三个分级的LRU链表之间的转换规则如下:

HOT_LRU

在HOT LRU队列中的数据绝不会到HOT_LRU队列的前面,只会往更冷的队列中放。规则是:如果元素变得活跃,就放到WARM队列中;否则如果不活跃,就直接放到COLD队列中

WARM_LRU

如果WARM队列的元素变的活跃,就会移动到WARM队列头;否则往COLD队列移动

COLD_LRU

COLD队列中的元素,都是不太活跃的了,所以当需要淘汰元素时都会首先到COLD LRU队列中找可以淘汰的数据。当一个在COLD队列的元素重新变成活跃元素时,并不会移动到COLD队列的头部,而是直接移动回去WARM队列

PS:任何操作都不能将一个元素从WARM和COLD队列中移动回去HOT队列了,也就是从HOT队列中移动元素出去的操作是单向操作

原有LRU算法最大的问题是:只要元素被访问过一次,就马上会被移动到LRU链表的前面,影响了后面对这个元素的淘汰。

改进的算法中,加入了一个机制:只有当元素被访问两次以后,才会标记成活跃元素。

代码中引入了两个标志位,其置位的规则如下:

1)ITEM_FETCHED:第一次被访问时置位该标志位。

2)ITEM_ACTIVE:第二次被访问时(即it->it_flags & ITEM_FETCHED为true的情况下)置位该标志位。

3)INACTIVE:不活跃状态。

4)ITEM_ACTIVE标志位清除的规则,从链表尾遍历到某一个LRU链表时,该元素是链表的最后一个元素,则认为是不活跃的元素,即可以清除ITEM_ACTIVE标志位;

这样,有效避免了只访问一次就变成活跃元素的问题,所以元素变成活跃就意指“至少被访问两次以”。

5.2 memcached 内存回收

惰性删除

memcached一般不会主动去清除已经过期或者失效的缓存,当get请求一个item才会去检查item是否失效

flush命令

flush命令会将所有的item设置为失效

创建的时候检查

Memcached会在创建ITEM的时候去LRU的链表尾部开始检查,是否有失效的ITEM,如果没有的话就重新创建

LRU爬虫

memcached默认是关闭LRU爬虫的。LRU爬虫是一个单独的线程,会去清理失效的ITEM

LRU淘汰

当缓存没有内存可以分配给新的元素的时候,memcached会从LRU链表的尾部开始淘汰一个ITEM,不管这个ITEM是否还在有效期都将会面临淘汰。LRU链表插入缓存ITEM的时候有先后顺序,所以淘汰一个ITEM也是从尾部进行 也就是先淘汰最早的ITEM。

5.2.1惰性删除

惰性删除删除其实就是在get数据的时候进行比较判断数据是否过期,这里会跟flush_all命令过期结合起来使用,判断的时候依据了flush_all设置的过期时间settings.oldest_liv

item *do_item_get(const char *key, const size_t nkey, const uint32_t hv, conn *c, const bool do_update) {item *it = assoc_find(key, nkey, hv);if (it != NULL) {refcount_incr(it);/* Optimization for slab reassignment. prevents popular items from* jamming in busy wait. Can only do this here to satisfy lock order* of item_lock, slabs_lock. *//* This was made unsafe by removal of the cache_lock:* slab_rebalance_signal and slab_rebal.* are modified in a separate* thread under slabs_lock. If slab_rebalance_signal = 1, slab_start =* NULL (0), but slab_end is still equal to some value, this would end* up unlinking every item fetched.* This is either an acceptable loss, or if slab_rebalance_signal is* true, slab_start/slab_end should be put behind the slabs_lock.* Which would cause a huge potential slowdown.* Could also use a specific lock for slab_rebal.* and* slab_rebalance_signal (shorter lock?)*//*if (slab_rebalance_signal &&((void *)it >= slab_rebal.slab_start && (void *)it < slab_rebal.slab_end)) {do_item_unlink(it, hv);do_item_remove(it);it = NULL;}*/}int was_found = 0;if (settings.verbose > 2) {int ii;if (it == NULL) {fprintf(stderr, "> NOT FOUND ");} else {fprintf(stderr, "> FOUND KEY ");}for (ii = 0; ii < nkey; ++ii) {fprintf(stderr, "%c", key[ii]);}}if (it != NULL) {was_found = 1;if (item_is_flushed(it)) {do_item_unlink(it, hv);STORAGE_delete(c->thread->storage, it);do_item_remove(it);it = NULL;pthread_mutex_lock(&c->thread->stats.mutex);c->thread->stats.get_flushed++;pthread_mutex_unlock(&c->thread->stats.mutex);if (settings.verbose > 2) {fprintf(stderr, " -nuked by flush");}was_found = 2;} else if (it->exptime != 0 && it->exptime <= current_time) {do_item_unlink(it, hv);STORAGE_delete(c->thread->storage, it);do_item_remove(it);it = NULL;pthread_mutex_lock(&c->thread->stats.mutex);c->thread->stats.get_expired++;pthread_mutex_unlock(&c->thread->stats.mutex);if (settings.verbose > 2) {fprintf(stderr, " -nuked by expire");}was_found = 3;} else {if (do_update) {do_item_bump(c, it, hv);}DEBUG_REFCNT(it, '+');}}if (settings.verbose > 2)fprintf(stderr, "\n");/* For now this is in addition to the above verbose logging. */LOGGER_LOG(c->thread->l, LOG_FETCHERS, LOGGER_ITEM_GET, NULL, was_found, key,nkey, (it) ? it->nbytes : 0, (it) ? ITEM_clsid(it) : 0, c->sfd);return it;
}

5.2.2 flush_all命令删除

int item_is_flushed(item *it) {

rel_time_t oldest_live = settings.oldest_live;

uint64_t cas = ITEM_get_cas(it);

uint64_t oldest_cas = settings.oldest_cas;

if (oldest_live == 0 || oldest_live > current_time)

return 0;

if ((it->time <= oldest_live)

|| (oldest_cas != 0 && cas != 0 && cas < oldest_cas)) {

return 1;

}

return 0;

}

5.2.3 新建item会检查数据过期

序号

描述

1

do_item_alloc进入新增item的内存申请流程

2

do_item_alloc_pull进入item申请的逻辑处理,最多处理10次

3

do_item_alloc_pull内部逻辑是尝试通过slabs_alloc申请内存,失败则尝试通过lru_pull_tail方法释放LRU队列中的item变成可用item

4

lru_pull_tail执行释放LRU队列中item的过程,内部包括各种过期item的回收

5.2.4 LRU爬虫线程定时清理

后续分析

6 memcached应用中存在的问题

memcached 源码分析相关推荐

  1. Memcached源码分析 - 内存存储机制Slabs(5)

    Memcached源码分析 - 网络模型(1) Memcached源码分析 - 命令解析(2) Memcached源码分析 - 数据存储(3) Memcached源码分析 - 增删改查操作(4) Me ...

  2. Memcached 源码分析——从 main 函数说起

    (广告时间: 最近在写一个基于 Leveldb 存储引擎的数据服务器,C开发,使用 Libevent 处理网络事件,后台利用多线程并发处理客户端连接,理论上单机就应该支持数千-上万的客户端连接(未测试 ...

  3. Memcached源码分析

    http://blog.csdn.net/lcli2009/article/details/22167319 转载于:https://www.cnblogs.com/zengkefu/p/483884 ...

  4. 实际测试例子+源码分析的方式解剖MyBatis缓存的概念

    前言: 前方高能! 本文内容有点多,通过实际测试例子+源码分析的方式解剖MyBatis缓存的概念,对这方面有兴趣的小伙伴请继续看下去~ 欢迎工作一到五年的Java工程师朋友们加入Java架构开发:79 ...

  5. MyBatis 源码分析 - 缓存原理

    1.简介 在 Web 应用中,缓存是必不可少的组件.通常我们都会用 Redis 或 memcached 等缓存中间件,拦截大量奔向数据库的请求,减轻数据库压力.作为一个重要的组件,MyBatis 自然 ...

  6. 【深入浅出MyBatis系列十一】缓存源码分析

    为什么80%的码农都做不了架构师?>>>    #0 系列目录# 深入浅出MyBatis系列 [深入浅出MyBatis系列一]MyBatis入门 [深入浅出MyBatis系列二]配置 ...

  7. openstack-ceilometer第二式:源码分析-polling

    openstack-ceilometer第二式:源码分析-polling 以社区 N 版代码为例 一.启动命令 exec ceilometer-polling --polling-namespaces ...

  8. HashMap实现原理及源码分析为何选用红黑树

    目录 一.什么是哈希表 二.HashMap实现原理 三.为何HashMap的数组长度一定是2的次幂? 四.重写equals方法需同时重写hashCode方法 五.总结 为什么HashMap使用红黑树而 ...

  9. Redis分布式锁解析源码分析

    Redis分布式锁解析&源码分析 概述 实战 简单的分布式锁 Redisson实现分布式锁 Redission源码分析 构造方法 获取锁lock 解锁 锁失效 红锁 案例分析 原始的写法 进化 ...

最新文章

  1. Timestamp、String、Date之间的转换
  2. 开发日记-20190404
  3. mybatis-generator自动生成mapper
  4. 【深度学习】修改每张人像---ImageNet 的衰落
  5. 计算商品价格找零(Python)
  6. *【Hihocoder - offer编程练习赛94 - A】最短管道距离(中位数)
  7. c++将.cpp编译为.so文件
  8. Java多线程的使用
  9. Linux下SVN搭建与配置
  10. 12. 星际争霸之php设计模式--模板模式
  11. win10亮度_安利一款PC端调节多显示器亮度的软件
  12. android 截屏源码分析,android 截图功能源码解析
  13. jsDoc的使用文档
  14. OpenCV中(rows,cols)与图像(x,y)
  15. 千瓜小红书直播达人、笔记排行榜
  16. python 小数乘法_TMS320C55xDSP应用系统设计
  17. 二十.组织级项目管理与大项目管理
  18. excel怎么拆分成多个独立表格文件
  19. matlab 向量变标量,MATLAB变量——标量,向量,矩阵
  20. Android根据图片名字获取图片ID

热门文章

  1. Lesson 5 The facts 确切数字
  2. numpy数组中元素单个选取或部分选取
  3. Torch知识点总结【持续更新中......】
  4. [Go实战]怎么写测试类,运用testing.T
  5. 千锋android培训学院!双非渣本Android四年磨一剑,真香!
  6. Springboot中使用阿里云短信验证码服务
  7. iOS:内存优化思路
  8. 多元线性回归分析spss结果解读_SPSS经典线性回归分析之一——线性回归分析
  9. 调试winddows程序(windbg 和 Debug Diagnostic Tool)
  10. 3、 如何搭建高德离线地图服务