目录

1、项目介绍

2、高并发内存池整体框架设计

3、thread cache

<1>thread cache 哈希桶对齐规则

<2>Thread Cache类设计

4、Central Cache

<1>Central Cache类设计

5、page cache

<1>Page Cache类设计

6、性能分析

<1>定长内存池实现

<2>基数树

7、项目源码及项目总结


1、项目介绍

        当前项目是实现一个高并发的内存池,他的原型是google的一个开源项目tcmalloc,tcmalloc全称Thread-Caching Malloc,即线程缓存的malloc,实现了高效的多线程内存管理,用于替代系统的内存 分配相关的函数(malloc、free)。
        另一方面tcmalloc是全球大厂google开源的,可以认为当时顶尖的C++高手写出来的,他的知名度也是非常高的,不少公司都在用它,Go语言直接用它做了自己内存分配器。
        本文是把tcmalloc最核心的框架简化后拿出来,模拟实现出一个自己的高并发内存池,目的是学习tcamlloc的精华。
应用技术
数据结构(链表、哈希桶)、操作系统内存管理、单例模式、多线程、互斥锁等。
什么是内存池?

想必大家看到这几个字也应该自己能想出个大概,简单来说内存池是指程序预先从操作系统申请一块足够大内存,然后自己管理。此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放

内存池主要解决了效率问题,避免频繁找操作系统申请内存。 其次如果作为系统的内存分配器的角度,还需要解决一下内存碎片的问题。

那么什么是内存碎片呢?

内存碎片分为外碎片和内碎片,内碎片我们在下文项目中具体解释(这里我们简单概述一下),这里我们主要看比较容易理解的外碎片如图:

现有768Byte的空间,但是如果我们要申请超过512Byete的空间却申请不出来,因为这两块空间碎片化不连续了。

内碎片:内部碎片是由于一些对齐的需求,导致分配出去的空间中一些内存无法被利用。(具体见下文哈希桶对齐规则)

注:我们下面实现的内存池主要是是尝试解决的是外部碎片的问题,同时也尽可能的减少内部碎片的产生。

malloc

C/C++中我们要动态申请内存都是通过malloc去申请内存,但是我们要知道,实际我们不是直接去堆获取内存的, 而malloc就是一个内存池。malloc() 相当于向操作系统“批发”了一块较大的内存空间,然后“零售”给程 序用。当全部“售完”或程序有大量的内存需求时,再根据实际需求向操作系统“进货”。

2、高并发内存池整体框架设计

        现代很多的开发环境都是多核多线程,在申请内存的场景下,必然存在激烈的锁竞争问题。malloc本身其实已经很优秀,那么我们项目的原型tcmalloc就是在多线程高并发的场景下更胜一筹,所以这次我们实现的内存池需要考虑以下几方面的问题
  1. 性能问题。
  2. 多线程环境下,锁竞争问题。
  3. 内存碎片问题。

concurrent memory pool主要由3个部分构成:
1. thread cache:线程缓存是每个线程独有的,用于小于256KB的内存的分配,线程从这里申请内存不需要加锁,每个线程独享一个thread cache,这也就是这个并发线程池高效的地方
2. central cache:中心缓存是所有线程所共享,thread cache是按需从central cache中获取的对象。central cache合适的时机回收thread cache中的对象,避免一个线程占用了太多的内存,而其他线程的内存吃紧,达到内存分配在多个线程中更均衡的按需调度的目的。central cache是存在竞争的,所以从这里取内存对象是需要加锁,首先这里用的是桶锁,其次只有thread cache的对象没有内存时才会找central cache,所以这里竞争不会很激烈
3. page cache:页缓存是在central cache缓存上面的一层缓存,存储的内存是以页为单位存储及分配的,central cache没有内存对象时,从page cache分配出一定数量的page,并切割成定长大小的小块内存,分配给central cache。当一个span的几个跨度页的对象都回收以后,page cache 会回收central cache满足条件的span对象,并且合并相邻的页,组成更大的页,缓解内存碎片的问题

3、thread cache

thread cache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个thread cache对象,这样每个线程在这里获取对象和释放对象时是无锁的。

thread cache支持小于等于256KB内存的申请,如果我们将每种字节数的内存块都用一个自由链表进行管理的话,那么此时我们就需要20多万个自由链表,光是存储这些自由链表的头指针就需要消耗大量内存,这显然是得不偿失的。

这时我们可以选择做一些平衡的牺牲,让这些字节数按照某种规则进行对齐(见下文对齐规则),例如我们让这些字节数都按照8字节进行向上对齐(考虑到32位和64位下指针大小),那么thread cache的结构就是下面这样的,此时当线程申请1~8字节的内存时会直接给出8字节,而当线程申请9~16字节的内存时会直接给出16字节,以此类推。

申请内存:
1. 当内存申请 size<=256KB ,先获取到线程本地存储的 thread cache 对象,计算 size 映射的哈希桶自由链表下标i 。
2. 如果自由链表 _freeLists[i] 中有对象,则直接 Pop 一个内存对象返回。
3. 如果 _freeLists[i] 中没有对象时,则批量从 central cache 中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
1. 当释放内存小于 256k 时将内存释放回 thread cache ,计算 size 映射自由链表桶位置 i ,将对象 Push到_freeLists[i] 。
2. 当链表的长度过长,则回收一部分内存对象到 central cache 。

通过上图分析我们需要一个自由链表来管理内存块,下面我们来对自由链表进行封装(仅写出当前一些很容易想到的接口,后序需要我们在进行添加)。

class FreeList {
private:void* _freelist = nullptr;size_t _size = 0;    //记录链表长度size_t _MaxSize = 1; //控制慢增长
public:void push(void* obj) {assert(obj);//头插CurNext(obj) = _freelist;_freelist = obj;++_size;}void* popFront() {assert(_freelist);void* cur = _freelist;_freelist = CurNext(_freelist);--_size;return cur;}bool Empty() {return _freelist == nullptr;}size_t& Size() {return _size;}size_t& MaxSize() {return _MaxSize;}
};

<1>thread cache 哈希桶对齐规则

字节数 对齐数 哈希桶下标 区间桶数量
[1,128] 8byte对齐 freelist[0,16) 16
    [128+1,1024]     16byte对齐 freelist[16,72) 56
[1024+1,8*1024] 128byte对齐 freelist[72,128) 56
[8*1024+1,64*1024] 1024byte对齐 freelist[128,184) 56
[64*1024+1,256*1024] 8*1024byte对齐 freelist[184,208) 24

上文中我们已经提到过内碎片这个概念,我们应该尽可能减少内碎片的产生。按照上面对齐规则的话整体控制在10%左右的内碎片浪费,第一个区间我们不做考虑因为1字节就算对齐到2字节也会产生50%的空间浪费,我们从第二个区间开始计算比如我们申请130个字节实际给到的是145字节实际多给了15字节。15/145 ≈ 0.103,浪费了大概在10左右,下面几个区间大家可以自己计算下浪费率也是大概在10%左右。

有了对齐规则我们还需要计算出相应的内存对应在哪一个桶中,如上表中每个区间都有一定桶的数量如[1,128]有16个桶那么我们申请1~8字节都对应在下标0号桶中,9~16字节都对应在下标1号桶中于是我们需要一个函数来处理计算。

对齐规则和下标映射规则编写

为了后序使用方便我们,将其封装在一个类当中。

static const size_t PAGE_SHIFT = 13;
static const size_t MAX_BYTES = 256 * 1024; //最大字节数//计算对象大小对齐规则
class SizeClass {
public://20 8 --> 24//110 8 --> 112//容易想到的//size_t _AlignSize(size_t bytes, size_t alignNum) {//  size_t alignSize;// if (bytes % alignNum == 0) {//        alignSize = bytes;//   }// else {//        alignSize = (bytes / alignNum + 1) * alignNum;//  }//  return alignSize;//}static inline size_t _AlignSize(size_t bytes, size_t alignNum){return ((bytes + alignNum - 1) & ~(alignNum - 1));}//对齐大小计算static inline size_t AlignSize(size_t bytes) {//assert(bytes <= MAX_BYTES);if (bytes <= 128) {return _AlignSize(bytes, 8);}else if (bytes <= 1024) {return _AlignSize(bytes, 16);}else if (bytes <= 8 * 1024) {return _AlignSize(bytes, 128);}else if (bytes <= 64 * 1024) {return _AlignSize(bytes, 1024);}else if (bytes <= 256 * 1024) {return _AlignSize(bytes, 8 * 1024);}else {return _AlignSize(bytes, 1 << PAGE_SHIFT); //页对齐}}//容易想到的//size_t _Index(size_t bytes, size_t alignNum) {//  if (bytes % alignNum == 0) {//        return bytes / alignNum - 1; //下标从0开始// }// else {//        return bytes / alignNum;//  }//}//20 3  --> 1//130 4 --> 8//好的实现方法static inline size_t _Index(size_t bytes, size_t align_shift){return ((bytes + (1 << align_shift) - 1) >> align_shift) - 1;}// 计算映射的哪一个自由链表桶static inline size_t Index(size_t bytes){assert(bytes <= MAX_BYTES);static int group_array[4] = { 16, 56, 56, 56 }; // 每个区间有多少个链if (bytes <= 128) {return _Index(bytes, 3); //传2的次方}else if (bytes <= 1024) {return _Index(bytes - 128, 4) + group_array[0];}else if (bytes <= 8 * 1024) {return _Index(bytes - 1024, 7) + group_array[1] + group_array[0];}else if (bytes <= 64 * 1024) {return _Index(bytes - 8 * 1024, 10) + group_array[2] + group_array[1] + group_array[0];}else if (bytes <= 256 * 1024) {return _Index(bytes - 64 * 1024, 13) + group_array[3] + group_array[2] + group_array[1] + group_array[0];}else {assert(false);}return -1;}
};

<2>Thread Cache类设计

通过上述内存申请分析,可以想到我们需要申请内存函数和释放内存函数(释放内存函数分两种情况:链表长度较短直接将对象挂在链表中,链表长度过长释放链表)以及从中心缓存(central cache)获取对象的一个函数。如下定义:

class ThreadCache {
private:FreeList _freelists[NFREELIST];
public:void* Allocate(size_t size);  //申请内存void Deallocate(void* ptr, size_t size); //释放内存void ListTooLong(FreeList& list, size_t size); //链表太长释放链表//从中心缓存获取对象void* FetchFromCentralCache(size_t index, size_t size);
};//TLS thread local storage  --> 线程局部存储
static _declspec(thread) ThreadCache* pTLSThreadCache = nullptr;

接口实现:


//申请内存
void* ThreadCache::Allocate(size_t size) {assert(size <= MAX_BYTES);//传过来申请字节数计算出对齐大小(实际给到的内存大小)size_t alignSize = SizeClass::AlignSize(size);size_t index = SizeClass::Index(size);if (!_freelists[index].Empty()) {return _freelists[index].popFront();}else {//从中心缓存获取对象//...}
}
//释放内存
void ThreadCache::Deallocate(void* ptr, size_t size) {assert(ptr);assert(size <= MAX_BYTES);//找到对应桶位置进行插入size_t index = SizeClass::Index(size);_freelists[index].push(ptr);//当链表长度大于一次申请的最大内存值将其还给central cacheif (_freelists[index].Size() >= _freelists[index].MaxSize()) {ListTooLong(_freelists[index], size);}
}
//链表太长释放链表
void ThreadCache::ListTooLong(FreeList& list, size_t size) {//首先从原链表中将这段链表删除,接着还给中心缓存//在freelist中增加区间删除函数void* start = nullptr;void* end = nullptr;list.PopRange(start, end, list.Size());//将链表还给中心缓存//...
}//从中心缓存获取对象
void* ThreadCache::FetchFromCentralCache(size_t index, size_t size) {//慢反馈调节算法//1、最开始不会一次向central cache批量要太多,因为要太多有可能会用不完//2、如果不断size大小内存需求,那么batchNum就会不断增长直到上限//3、size越小一次向central cache要的batchNum越小//4、size越大一次向central cache要的batchNum越大size_t batchNum = std::min(_freelists[index].MaxSize(), SizeClass::NumMoveSize(size));if (_freelists[index].MaxSize() == batchNum) {_freelists[index].MaxSize() += 1;}//调用cnetral cache中获取对象接口//...
}

线程局部存储TLS(Thread Local Storage)

要实现每个线程无锁的访问属于自己的thread cache,我们需要用到线程局部存储TLS(Thread Local Storage),这是一种变量的存储方法,使用该存储方法的变量在它所在的线程是全局可访问的,但是不能被其他线程访问到,这样就保持了数据的线程独立性。但不是每个线程被创建时就立马有了属于自己的thread cache,而是当该线程调用相关申请内存的接口时才会创建自己的thread cache。

在FreeList类中添加PopRange函数

   void PopRange(void*& start, void*& end, size_t n) {assert(n <= _size);start = _freelist;end = start;for (int i = 0; i < n - 1; i++) {end = CurNext(end);}_freelist = CurNext(end);CurNext(end) = nullptr;_size -= n;}

在SizeClass类中添加NumMoveSize函数

  // 一次thread cache从中心缓存获取多少个//也就是计算可以给到你几个对象//其中上限512,下限2也可以理解为限制桶中链表的长度static size_t NumMoveSize(size_t size){assert(size > 0);// [2, 512],一次批量移动多少个对象的(慢启动)上限值//对象越小,计算出的上限越高//对象越大,计算出的上限越低int num = MAX_BYTES / size;if (num < 2)num = 2;if (num > 512)num = 512;return num;}


4、Central Cache

central cache 也是一个哈希桶结构,他的哈希桶的映射关系跟 thread cache 是一样的。不同的是他的每个哈希桶位置挂是SpanList 链表结构,不过每个映射桶下面的 span 中的大内存块被按映射关系切成了一个个小内存块对象挂在span 的自由链表中。

申请内存:
1. 当thread cache中没有内存时,就会批量向central cache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;central cache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给thread cache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
2. central cache映射的spanlist中所有span的都没有内存以后,则需要向page cache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给thread cache。
3. central cache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给thread cache,就++use_count。
释放内存:
1. 当thread_cache过长或者线程销毁,则会将内存释放回central cache中的,释放回来时 --use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回page cache,page cache中会对前后相邻的空闲页进行合并。

<1>Central Cache类设计

根据上图我们可以知道central cache也是一个哈希桶结构,通中挂的是一个个的span并且是双链表,而sapn中又包含了freellist,如下定义:

首先定义一个SpanNode的节点来表示一个个的Span对象


struct SpanNode {PAGE_ID _pageId = 0; //大块内存起始页号size_t _n = 0; //页的数量SpanNode* _next = nullptr;SpanNode* _prev = nullptr;void* _freeList = nullptr; //自由链表size_t _size = 0; //切好小对象大小size_t _useCount = 0; //分配给thread cache小块内存数量bool _isUse = false; //是否正在被使用};

接着实现一个双链表结构用来将Span挂接起来

class SpanList {
private:SpanNode* _head = nullptr; //头节点std::mutex _mtx; //桶锁
public:SpanList() {_head = new SpanNode;_head->_next = _head;_head->_prev = _head;}~SpanList() {delete _head;}std::mutex& getMutex() {return _mtx;}SpanNode* Begin() {return _head->_next;}SpanNode* end() {return _head;}bool Empty() {return _head->_next == _head;}void Insert(SpanNode* pos, SpanNode* newSpan) {assert(pos && newSpan);SpanNode* prev = pos->_prev;prev->_next = newSpan;newSpan->_prev = prev;pos->_prev = newSpan;newSpan->_next = pos; }void Erase(SpanNode* pos) {assert(pos);assert(pos != _head);SpanNode* prev = pos->_prev;SpanNode* next = pos->_next;prev->_next = next;next->_prev = prev;}SpanNode* PopFront() {SpanNode* ret = _head->_next;Erase(ret);return ret;}void PushFront(SpanNode* spanNode) {Insert(Begin(), spanNode);}
};

Central Cache类定义

和Thread Cache不同的是,每个线程中都会有一个Thread Cache对象以防止各个线程互相抢占内存的问题。而Central Cache为所有线程所共享且全局只有一个Central Cache对象,故我们应该将其实现为单例模式(其中懒汉模式涉及线程安全问题,故我们将其实现为懒汉模式)。其主要作用是向下为Thread Cache提供内存,向上向Page Cache申请内存。回收内存也是同理。故我们需要定义一下几个接口,从Page Cache获取非空spanNode、为Thread Cache提供内存、将内存释放回Page Cahce三个函数。如下定义:
//单例模式(懒汉)
class CentralCache {
private:SpanList _SpanLists[NFREELIST];
private:CentralCache() {}CentralCache(const CentralCache&) = delete;static CentralCache _sInst; //类外初始化
public:static CentralCache* GetInStance() {return &_sInst;}// 获取一个非空的spanNodeSpanNode* GetOneSpan(SpanList& list, size_t size);// 从中心缓存获取一定数量的对象给thread cachesize_t FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size);// 将一定数量的对象释放到spanvoid ReleaseListToSpans(void* start, size_t byte_size);
};

接口实现:

// 获取一个非空的spanNode
SpanNode* CentralCache::GetOneSpan(SpanList& list, size_t size) {//查看当前桶中的每个SpanNode节点是否有未分配的对象SpanNode* it = list.Begin();while (it != list.end()) {if (it->_freeList != nullptr) {return it;}else {it = it->_next;}}//走到这里说明当前桶中每个节点没有未分配的对象了,找pageCache要//...}// 从中心缓存获取一定数量的对象给thread cache
size_t CentralCache::FetchRangeObj(void*& start, void*& end, size_t batchNum, size_t size) {size_t index = SizeClass::Index(size);//这里会涉及多个线程同时从中心缓存获取内存的情况,故应该加锁_SpanLists[index].getMutex().lock();SpanNode* spanNode = GetOneSpan(_SpanLists[index], size);assert(spanNode);assert(spanNode->_freeList);//从spanNode中获取batchNum个对象,如果不够有多少拿多少start = spanNode->_freeList;end = start;int i = 0;int ActualNum = 1;while (i < batchNum - 1 && CurNext(end) != nullptr) {end = CurNext(end);++i;++ActualNum;}//从spanNode将这段链表删除,并统计出该spanNode中自由链表已经使用的数量spanNode->_freeList = CurNext(end);CurNext(end) = nullptr;spanNode->_useCount += ActualNum;_SpanLists[index].getMutex().unlock();return ActualNum;
}

关于当链表太长时要将内存还给SpanNode时情况稍微有点复杂,需要好好思考一下。因为还回来的链表中的每个节点的地址我们没办法确定是否连续的。这就导致有可能换回来一个链表但其中的节点分布于_SpanList的多个SpanNode节点中,这就需要对其筛选让其进入不同的桶中(这部分代码我们放到Page Cache中来实现,因为Central Cache中的SpanNode节点都是Page Cache分配给他的),这里我们先把大概逻辑顺一下如下图:

测试代码如下: 

void TestAddressPage() {PAGE_ID id1 = 3000;PAGE_ID id2 = 3001;char* p1 = (char*)(id1 << PAGE_SHIFT);char* p2 = (char*)(id2 << PAGE_SHIFT);while (p1 < p2) {cout << (void*)p1 << " : " << ((PAGE_ID)p1 >> PAGE_SHIFT) << endl;p1 += 8;}
}

 执行结果:

由上图我们可以看出,事实和我们的推论是一样的。释放内存代码如下图:

//将一定数量的对象释放到span
void CentralCache::ReleaseListToSpans(void* start, size_t byte_size) {size_t index = SizeClass::Index(byte_size);_SpanLists[index].getMutex().lock();//将传过来的链表一个个头插进对应的spanNode中while (start) {void* next = CurNext(start);//通过链表每个节点的地址来获取对应的spanNode地址SpanNode* SpanNode = PageCache::GetInstance()->MapObjectToSpan(start);CurNext(start) = SpanNode->_freeList;SpanNode->_freeList = start;SpanNode->_useCount--; //每头插回来一个已使用数量减一//当SpanNode已使用数量为0时说明该节点中的自由链表节点都还回来了,继续归还给PageCache处理if (SpanNode->_useCount == 0) {//归还给PageCache//...}start = next;}_SpanLists[index].getMutex().unlock();
}

5、page cache

首先,central cache的映射规则与thread cache保持一致,而page cache的映射规则与它们都不相同。page cache的哈希桶映射规则采用的是直接定址法,比如1号桶挂的都是1页的span,2号桶挂的都是2页的span,以此类推。

  其次,central cache每个桶中的span被切成了一个个对应大小的对象,以供thread cache申请。而page cache当中的span是没有被进一步切小的,因为page cache服务的是central cache,当central cache没有span时,向page cache申请的是某一固定页数的span,而如何切分申请到的这个span就应该由central cache自己来决定。

注:上图中1page和3page桶下面挂的链在最开始的时候是没有的他们都是由128page(我们每次向系统申请的是固定大小128page)切分后挂到上面的(切分逻辑见下文代码)。

申请内存:
1. 当central cache向page cache申请内存时,page cache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页page span分裂为一个4页page span和一个6页page span。
2. 如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页page span挂在自由链表中,再重复1中的过程。
3. 需要注意的是central cache和page cache 的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,central cache中哈希桶,是按跟thread cache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而page cache 中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
释放内存:
1. 如果 central cache 释放回一个 span , 则依次寻找 span 的前后 page id 的没有在使用的空闲 span 看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的 span ,减少 内存碎片

<1>Page Cache类设计

通过上面的分析我们可以知道Page Cache和Central Cache一样是被多个线程共享的故也应该将其设计为单例模式,上面在讲Central Cache释放逻辑是提到我们要根据自由链的节点地址找到其对应SpanNode节点故我们还应该添加其映射关系这里我们采用unordered_map进行映射。此外我们还需要对Central Cache提供对象的接口、回收SpanNode的接口、获取映射关系的接口,如下定义:

static const size_t NPAGES = 129; //下标是从0开始的class PageCache {
private:SpanList _pageLists[NPAGES];std::unordered_map<PAGE_ID, SpanNode*> _idSpanNodeMap;std::mutex _PageMtx;
private:PageCache() {};PageCache(const PageCache&) = delete;static PageCache _sInst;
public:static PageCache* GetInstance() {return &_sInst;}std::mutex& GetMutex() {return _PageMtx;}//获取n页SpanNodeSpanNode* NewSpan(size_t n);//获取从对象到SpanNode的映射SpanNode* MapObjectToSpanNode(void* obj);//释放空闲SpanNode回到PageCache,并合并相邻的SpanNodevoid ReleaseSpanNodeToPageCache(SpanNode* SpanNode);
};

代码实现:

当Page Cache中无内存时我们需要向系统取申请内存,故我们应该提供一个向系统申请内存的接口,如下

windows和Linux下如何直接向堆申请页为单位的大块内存:// 去堆上按页申请空间
inline static void* SystemAlloc(size_t kpage)
{
#ifdef _WIN32void* ptr = VirtualAlloc(0, kpage << 13, MEM_COMMIT | MEM_RESERVE, PAGE_READWRITE);
#else// linux下brk mmap等
#endifif (ptr == nullptr)throw std::bad_alloc();return ptr;
}inline static void SystemFree(void* ptr)
{
#ifdef _WIN32VirtualFree(ptr, 0, MEM_RELEASE);
#else// linux下sbrk unmmap等
#endif
}
static const size_t NPAGES = 129; //下标是从0开始的PageCache PageCache::_sInst;//获取n页SpanNode
SpanNode* PageCache::NewSpan(size_t n) {assert(n > 0);//大于128页直接向堆申请内存if (n > NPAGES - 1) {void* ptr = SystemAlloc(n);SpanNode* spanNode = new SpanNode;spanNode->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; //右移13位相当于除8kspanNode->_n = n;_idSpanNodeMap[spanNode->_pageId] = spanNode; //建立映射方便释放内存return spanNode;}//检查对应桶中是否有sapnNodeif (!_pageLists[n].Empty()) {SpanNode* nSpanNode = _pageLists[n].PopFront();//将其中节点建立映射关系映射for (PAGE_ID i = 0; i < nSpanNode->_n; i++) {_idSpanNodeMap[nSpanNode->_pageId + i] = nSpanNode;}return nSpanNode;}//检查后面桶中是否有SpanNode节点,有则进行切分for (size_t i = n + 1; i < NPAGES; i++) {if (!_pageLists[i].Empty()) {//找到第i个桶不为空时先将其取出,切分出n个后在放回相应桶中SpanNode* ISpanNode = _pageLists[i].PopFront();//开辟出一个节点进行切分,然后返回SpanNode* NSpanNode = new SpanNode;NSpanNode->_n = n;NSpanNode->_pageId = ISpanNode->_pageId;ISpanNode->_pageId += n;ISpanNode->_n -= n;_pageLists[ISpanNode->_n].PushFront(ISpanNode);//存储ISpanNode起始页号映射方便回收_idSpanNodeMap[ISpanNode->_pageId] = ISpanNode;_idSpanNodeMap[ISpanNode->_pageId + ISpanNode->_n - 1] = ISpanNode;for (PAGE_ID i = 0; i < NSpanNode->_n; i++) {_idSpanNodeMap[NSpanNode->_pageId + i] = NSpanNode;}return NSpanNode;}}//走到这里说明后面的桶中没有剩余的SpanNode节点,找系统申请一页SpanNode* bigSpanNode = new SpanNode;void* ptr = SystemAlloc(NPAGES - 1);bigSpanNode->_pageId = (PAGE_ID)ptr >> PAGE_SHIFT; bigSpanNode->_n = NPAGES - 1;_pageLists[bigSpanNode->_n].PushFront(bigSpanNode); //将申请的大页span放入哈希桶中return NewSpan(n); //重新调用进行切分}
//获取从对象到SpanNode的映射
SpanNode* PageCache::MapObjectToSpanNode(void* obj) {PAGE_ID id = (PAGE_ID)obj >> PAGE_SHIFT;std::unique_lock<std::mutex> lock(PageCache::GetInstance()->GetMutex()); //出作用域自动解锁auto ret = _idSpanNodeMap.find(id);if (ret != _idSpanNodeMap.end()) {//找到了进行返回return ret->second;}else {//找不到说明出问题了直接报错assert(false); return nullptr;}
}
//释放空闲SpanNode回到PageCache,并合并相邻的SpanNode
void PageCache::ReleaseSpanNodeToPageCache(SpanNode* spanNode) {//大于128页直接还给堆if (spanNode->_n > NPAGES - 1) {//根据页号算出响应的地址,然后进行释放void* ptr = (void*)(spanNode->_pageId << PAGE_SHIFT);SystemFree(ptr);return;}//向前合并while (1) {PAGE_ID prev = spanNode->_pageId - 1;//找不到跳出循环auto ret = _idSpanNodeMap.find(prev);if (ret == _idSpanNodeMap.end()) {break;}//prevSpanNode正在被使用跳出循环SpanNode* prevSpanNode = ret->second;if (prevSpanNode->_isUse == true) {break;}//合并出超出128kb的spanNode不进行管理if (prevSpanNode->_n + spanNode->_n > NPAGES - 1) {break;}//进行合并spanNode->_pageId = prevSpanNode->_pageId;spanNode->_n += prevSpanNode->_n;_pageLists[prevSpanNode->_n].Erase(prevSpanNode);delete prevSpanNode;}//向后合并while (1) {PAGE_ID next = spanNode->_pageId - 1;//找不到跳出循环auto ret = _idSpanNodeMap.find(next);if (ret == _idSpanNodeMap.end()) {break;}//prevSpanNode正在被使用跳出循环SpanNode* nextSpanNode = ret->second;if (nextSpanNode->_isUse == true) {break;}//合并出超出128kb的spanNode不进行管理if (nextSpanNode->_n + spanNode->_n > NPAGES - 1) {break;}//进行合并spanNode->_n += nextSpanNode->_n;_pageLists[nextSpanNode->_n].Erase(nextSpanNode);delete nextSpanNode;}//放回哈希桶中_pageLists[spanNode->_n].PushFront(spanNode);spanNode->_isUse = false;存储ISpanNode起始页号映射方便回收_idSpanNodeMap[spanNode->_pageId] = spanNode;_idSpanNodeMap[spanNode->_pageId + spanNode->_n - 1] = spanNode;
}

6、性能分析


参考了一段别人的测试代码,进行测试: 测试代码链接

运行结果,由下图可见我们当前实现的内存池效率和malloc比起来差了许多。

这个时候不要去盲目改代码,我们可以在VS中找到性能分析来分析我们的程序看看是什么原因导致的,如下图:

 

 由此我们可以看出,我们大部分时间都浪费在锁上面了而且是map映射的时候,那么有什么办法呢?

其实我们除了上面的锁浪费时间外,还有程序中的new我们可以替换成一个定长内存池(因为其效率比new要高一些,大家可以自行测试一下)。

<1>定长内存池实现

malloc其实就是一个通用的内存池,在什么场景下都可以使用,但这也意味着malloc在什么场景下都不会有很高的性能,因为malloc并不是针对某种场景专门设计的。定长内存池就是针对固定大小内存块的申请和释放的内存池,由于定长内存池只需要支持固定大小内存块的申请和释放,因此我们性能方面要比malloc高一些,并且在实现定长内存池时不需要考虑内存碎片等问题,因为我们申请/释放的都是固定大小的内存块。

代码如下:

template<class T>
class FiedMemoryPool {
private:char* _memory = nullptr;   //指向大块内存的指针void* _freeList = nullptr;//还内存链接自由链表头指针size_t _residueBytes = 0;  //剩余字节数
public:T* New() {T* obj = nullptr;//优先把还回来的对象重复利用if (_freeList) {void* next = *((void**)_freeList);obj = (T*)_freeList;_freeList = next;}else {//剩余内存不够一个对象大小时重新开辟内存if (_residueBytes < sizeof(T)) {_residueBytes = 128 * 1024;_memory = (char*)SystemAlloc(_residueBytes >> PAGE_SHIFT);if (_memory == nullptr) {throw std::bad_alloc();}}//给目标分配内存obj = (T*)_memory;size_t objSize = sizeof(T) < sizeof(void*) ? sizeof(void*) : sizeof(T); //至少开辟一个指针大小_memory += objSize;_residueBytes -= objSize;}//用定位new显示调用其构造函数new (obj)T;return obj;}void Delete(T* obj) {//显示调用其析构函数obj->~T();//头插进自由链表*(void**)obj = _freeList;_freeList = obj;}
};

有了定长内存池我们只需要将代码中new出来的对象替换成用定长内存池来申请就可以了(new 底层依然是调用malloc)由于这部分替换起来比较简单就不在详细讲述了,具体代码参考文章末尾项目源代码。

<2>基数树

基数树之所以能够提升效率是因为基数树在使用时是不用加锁的,基数树的空间一旦开辟好了就不会发生变化,因此无论什么时候去读取某个页的映射,都是对应在一个固定的位置进行读取的。并且我们不会同时对同一个页进行读取映射和建立映射的操作,因为我们只有在释放对象时才需要读取映射。

基数树代码链接

有了基数树我们只需要将PageCache中unorder_map定义的对象用基数树来定义就可以了,具体代码实现参考文章末尾源代码实现。

使用上面两种方法对项目进行优化后在进行测试,如图:

由上图很明显可以看出,此时我们实现的内存池在并发环境下效率更胜一筹。 


7、项目源码

项目源码

项目到这里就算完成了,感觉有用的话期待大家点赞关注,项目中有哪些地方有疑问的话欢迎大家评论留言。

[项目设计]高并发内存池相关推荐

  1. 【项目设计】高并发内存池

    文章目录 项目介绍 内存池介绍 定长内存池的实现 高并发内存池整体框架设计 threadcache threadcache整体设计 threadcache哈希桶映射对齐规则 threadcacheTL ...

  2. C++项目:高并发内存池

    文章目录 项目介绍 什么是内存池 池化技术 内存池 malloc 页 定长的内存池 对比测试 高并发内存池整体框架设计 thread cache 整体设计 哈希桶映射对齐规则 TLS无锁访问 Cent ...

  3. 【项目】实现一个mini的tcmalloc(高并发内存池)

    文章目录 tcmalloc 池化技术 内存池解决的问题 malloc的相关知识 玩具malloc原理简述 ptmalloc简述 铺垫 chunk 线程安全 小结 实现一个定长内存池 原理 代码 测试 ...

  4. 【C】高并发内存池设计

    高并发内存池设计 高并发下传统方式的弊端 在传统C语言中,我们使用malloc.calloc.realloc.free来进行内存的申请分配与释放,函数原型如下.C++中则是new.delete. vo ...

  5. 高并发内存池设计_内存池

    高并发内存池设计 1. 常用的内存操作函数 2. 高性能内存池设计_弊端解决之道 弊端一 弊端二 弊端三 弊端四 3. 弊端解决之道 内存管理维度分析 内存管理组件选型 4. 高并发内存管理最佳实践 ...

  6. java设计高并发内存池_高并发服务器-连接池的设计

    高并发服务器-连接池的设计 高并发服务器需要有一些池的设计,如内存池,连接池,数据库连接池. 池(pool)的设计主要考虑到一些资源的频繁申请和释放,尤其是在高并发的服务器中,几万甚至几十万并发每秒, ...

  7. 【高并发内存池】第一篇:定长内存池设计

    文章目录 一. 什么是内存池? 1. 池化技术 2. 内存池概念 二. 为什么要有内存池? 1. 内存碎片问题 2. 内存池带来的好处 三. 定长内存池设计 1. 定长内存池特点 2. 定长内存池基本 ...

  8. 【项目设计】高并发内存池(一)[项目介绍|内存池介绍|定长内存池的实现]

  9. android+对象池使用,Android开发中对高并发对象池的重复利用

    这两天在整理一套Android的路由框架,在整理的过程中,发现在路由消息传递过程中,传输载体类会大量的生成,对于这种载体类来说,他们本身是可重复利用的,并不需要大量的创建,大量的废弃,所以,我打算引入 ...

最新文章

  1. firefox下的调试工具
  2. 某33岁国企程序员求助:目前税后60+,工作975,拿到蚂蚁p7offer,3.8k,6200期权,有必要去镀金吗?...
  3. OceanBase是如何解决城市级故障容灾的
  4. php 时间转换编号,PHP 时间的格式转换
  5. django引入现有数据库
  6. Mathematica该注意的两种特殊的输入方式(blanksequence and ruledelayed)
  7. Java基础:char类型字节占用数
  8. 集体奔赴农业战场 互联网巨头对话中国农民丰收节交易会
  9. kotlin转java_Kotlin热身篇: 简介与基本用法
  10. java 判断一个词是不是成语_Java 判断字符串a和b是否互为旋转词
  11. iQOO Z5内置5000mAh大电池:超长续航安全感爆棚
  12. easyui panel异步获取后台数据在前台显示
  13. Oracle修改字段长度
  14. 新偶像时代:被直播重构的粉丝经济和社交平台
  15. CactiEZ中文解决方案和使用教程
  16. 电子合同助力企业实现全程无纸化闭环
  17. 对话夏琳·查布利斯:Primer.AI机器学习工程师是怎样炼成的?
  18. apicloud中阿里云推送使用
  19. Android TV开发 焦点返回ListView时, 返回到离开时的位置
  20. 强人工智能基本问题:自上而下还是自下而上? 1

热门文章

  1. Kubernetes 实现 Guestbook 留言板
  2. 2021-06-25kali更新火狐教程转
  3. 周二,晴,风大,寒冷
  4. [08] 需要留意的一些东西
  5. Lombok之@ToString使用
  6. Python中复制文件的两种简单方式
  7. TYPE-C 转OTG(USB2.0传输数据)+PD充电协议芯片 乐得瑞LDR6028/LDR6023SS
  8. 能用虚拟服务器建站吗,虚拟主机只能用来建站吗
  9. MySQL 中判断字符串是否相等
  10. uboot使用命令整理(2016.03)