所谓“池化技术”,就是程序先向系统申请过量的资源,然后自己管理,以备不时之需。之所以要申请过量的资源,是因为每次申请该资源都有较大的开销,不如提前申请好了,这样使用时就会变得非常快捷,大大提高程序运行效率。在计算机中,有很多使用“池”这种技术的地方,除了内存池,还有连接池、线程池、对象池等。以服务器上的线程池为例,它的主要思想是:先启动若干数量的线程,让它们处于睡眠状态,当接收到客户端的请求时,唤醒池中某个睡眠的线程,让它来处理客户端的请求,当处理完这个请求,线程又进入睡眠状态。
1.2内存池内存池是指程序预先从操作系统申请一块足够大内存,此后,当程序中需要申请内存的时候,不是直接向操作系统申请,而是直接从内存池中获取;同理,当程序释放内存的时候,并不真正将内存返回给操作系统,而是返回内存池。当程序退出(或者特定时间)时,内存池才将之前申请的内存真正释放。减少用户层向系统层的内存申请调用,实现高效;
2.开胃菜-设计一个定长内存池此次设计的定长内存池ObjectPool结构如下:(其单位小内存块的大小为T类型的大小)
需要申请内存时,一次性申请一个大块空间,记录起始位置_memory;
需要使用size大小时,将大块内存抽象切割成小块,将起始位置_memory向后移动size大小;
释放内存时,将需要释放的空间挂入自由链表_freeList,可供下次申请使用;注意:这个链表不是通过内部指针连接下一个,而是通过前一块空间的前4or8个字节记录后一个小空间的起始地址抽象连接的;
这样做的目的是循环利用预先开辟的一大块空间,减少用户层申请内存时与系统层的交互,提高效率;
代码实现:define_CRT_SECURE_NO_WARNINGS1ifdef_WIN32else//包Linux相关头文件,增加代码的可移植性;ifdef_WIN32void*ptr=VirtualAlloc(0,kpage13,MEM_COMMIT|MEM_RESERVE,PAGE_READWRITE);ifif(ptr==nullptr)throwstd::bad_alloc();returnptr;}templateclassTclassObjectPool{char*_memory=nullptr;//指向大块内存的指针size_t_remainBytes=0;//大块内存在切分过程中剩余字节数void*_freeList=nullptr;//还回来过程中链接的自由链表的头指针public:T*New(){T*obj=nullptr;//如果之前有申请的空间被释放,那就先从free了的空间拿;(内存池技术)if(_freeList!=nullptr){//头删//注意!*((void**)_freeList)相当访取_freeList前4or8个字节操作(由32位系统-指针4byteand64位系统-指针8byte决定);void*next=*((void**)_freeList);obj=(T*)_freeList;_freeList=next;}else{if(_remainBytessizeof(T)){//剩余内存不够一个对象大小时,则重新开大块空间_remainBytes=128*1024;//_memory=(char*)malloc(_remainBytes);//使用SystemAlloc直接向堆申请内存,脱离malloc,方便体现malloc和该ObjectPool的差别;_memory=(char*)SystemAlloc(_remainBytes13);//char*类型内存可以使其+1or-1的单位操作为1字节,方便内存管理;if(_memory==nullptr){throwstd::bad_alloc();}}//防止某个T类还没当前系统下一个指针大小大那么就装不下后一个的地址了,这里做特殊处理;至少保证一个对象内足够装的下一个指针大小;intObjsize=sizeof(T)sizeof(void*)?sizeof(void*):sizeof(T);obj=(T*)_memory;_memory+=Objsize;_remainBytes-=Objsize;}//定位new,内置类型不处理,自定义类型调用其构造函数;new(obj)T;returnobj;}voidDelete(T*obj){//内置类型不处理,自定义类型调用其构析构函数;obj-~T();//头插(此处不是真正的删除,而是标志为未使用,挂入自由链表)*(void**)obj=_freeList;_freeList=obj;}};测试::cout;usingstd::l;_tMAX_BYTES=256*1024;//ThreadCache中允许申请的最大字节;staticconstsize_tNFREELIST=208;//通过对齐计算的thread和central中最大哈希桶下表;staticconstsize_tNPAGES=129;//最多128页,为了方便映射哈希,从第1页开始计算;staticconstsize_tPAGE_SHIFT=13;//一页==8kb==2^13字节大小所以p地址PAGE_SHIFT==PAGE_ID可相当于将某连续地址以2^13字节对齐,并标上页号方便管理;eg:0x00000000~0x2^13这连续的2^13个地址经过为PAGE_ID==1,后连续的2^13个地址经过为PAGE_ID==2;//多系统编程;elif_WIN32typedefsize_tPAGE_ID;if//直接去堆上按页申请空间inlinestaticvoid*SystemAlloc(size_tkpage){else//linux下brkmmap等ifdef_WIN32VirtualFree(ptr,0,MEM_RELEASE);if}//NextObj(obj)等价于*(void**)obj,取obj前4or8个字节(存放下一个小空间地址的位置)进行操作,增加语义;staticvoid*NextObj(void*obj){return*(void**)obj;}//管理切分好的小对象的自由链表classFreeList{public:voidPush(void*obj){assert(obj);//头插//*(void**)obj=_freeList;NextObj(obj)=_freeList;_freeList=obj;++_size;}//从central中申请一批obj大小的内存块,range插入;voidPushRange(void*start,void*,size_tn){NextObj()=_freeList;_freeList=start;_size+=n;}///threadcache还一段list给centralcachevoidPopRange(void*start,void*,size_tn)//n=={assert(n=_size);start=_freeList;=start;for(size_ti=0;in-1;++i){=NextObj();}_freeList=NextObj();NextObj()=nullptr;_size-=n;}void*Pop(){assert(_freeList);//头删void*obj=_freeList;_freeList=NextObj(obj);--_size;returnobj;}boolEmpty(){return_freeList==nullptr;}size_tMaxSize(){return_maxSize;}size_tSize(){return_size;}private:void*_freeList=nullptr;size_t_maxSize=1;//慢开始算法--从1启动;size_t_size=0;//当前_freeList桶里的obj小空间个数};//计算对象大小的对齐映射规则方便控制哈希桶用的数量(不要太多)便于管理obj小内存空间classSizeClass{public://整体控制在最多10%左右的内碎片浪费的对齐规则;//[1,128]8byte对齐freelist[0,16)eg:1~128字节的obj按照8byte对齐(eg:1~7字节的对象都放入_freeList[0])则1~128字节的obj需要的桶index为0~16即可;//[128+1,1024]16byte对齐freelist[16,72)//[1024+1,8*1024]128byte对齐freelist[72,128)//[8*1024+1,64*1024]1024byte对齐freelist[128,184)//[64*1024+1,256*1024]8*1024byte对齐freelist[184,208)/*size_t_RoundUp(size_tsize,size_talignNum){size_talignSize;if(size%alignNum!=0){alignSize=(size/alignNum+1)*alignNum;}else{alignSize=size;}returnalignSize;}*///1-8//位运算提高效率staticinlinesize_t_RoundUp(size_tbytes,size_talignNum){return((bytes+alignNum-1)~(alignNum-1));}//通过size大小计算对其数函数;staticinlinesize_tRoundUp(size_tsize){if(size=128){return_RoundUp(size,8);}elseif(size=1024){return_RoundUp(size,16);}elseif(size=8*1024){return_RoundUp(size,128);}elseif(size=64*1024){return_RoundUp(size,1024);}elseif(size=256*1024){return_RoundUp(size,8*1024);}else{return_RoundUp(size,1PAGE_SHIFT);}}/*size_t_Index(size_tbytes,size_talignNum){if(bytes%alignNum==0){returnbytes/alignNum-1;}else{returnbytes/alignNum;}}*///位运算提高效率staticinlinesize_t_Index(size_tbytes,size_talign_shift){return((bytes+(1align_shift)-1)align_shift)-1;}//通过size计算index位置即映射到哪一个自由链表桶staticinlinesize_tIndex(size_tbytes){assert(bytes=MAX_BYTES);//每个区间有多少个链staticintgroup_array[4]={16,56,56,56};if(bytes=128){return_Index(bytes,3);//3表示按8byte对其}elseif(bytes=1024){return_Index(bytes-128,4)+group_array[0];//4表示按16byte对其,-128因为前128个字节按照别的对齐规则的,剩下的这些按照自己的对其数对其最后+前面总共的桶数量即可计算自己的index;}elseif(bytes=8*1024){return_Index(bytes-1024,7)+group_array[1]+group_array[0];}elseif(bytes=64*1024){return_Index(bytes-8*1024,10)+group_array[2]+group_array[1]+group_array[0];}elseif(bytes=256*1024){return_Index(bytes-64*1024,13)+group_array[3]+group_array[2]+group_array[1]+group_array[0];}else{assert(false);}return-1;}//一次threadcache从中心缓存获取多少个,池化技术:当threadcache没有可用obj空间时,会向中心缓存申请一批而不是一个;staticsize_tNumMoveSize(size_tsize){assert(size0);//[2,512],一次批量移动多少个对象的(慢启动)上限值//小对象一次批量上限高//小对象一次批量上限低intnum=MAX_BYTES/size;if(num2)num=2;if(num512)num=512;returnnum;}//计算一次向系统获取几个页;当centralCache对应index无可用span时,向pagecache按页大小申请,之后再把申请的span切割成n个index大小的obj内存块;//单个对象8byte////单个对象256KBstaticsize_tNumMovePage(size_tsize){size_tnum=NumMoveSize(size);size_tnpage=num*size;npage=PAGE_SHIFT;if(npage==0)npage=1;returnnpage;}};//管理多个连续页大块内存跨度结构structSpan{PAGE_ID_pageId=0;//大块内存起始页的页号;将申请的viod*通过PAGE_SHIFT(8K),映射成数字方便管理和挂桶;size_t_n=0;//页的数量Span*_next=nullptr;//双向链表的结构Span*_prev=nullptr;size_t_objSize=0;//切好的小对象的大小size_t_useCount=0;//切好小块内存,被分配给threadcache的计数,方便回收span,_useCount==0;void*_freeList=nullptr;//切好的小块内存的自由链表bool_isUse=false;//是否在被使用,方便合并span,减少内存碎片;};//带头双向循环链表封装Span节点classSpanList{public:SpanList(){_head=newSpan;_head-_next=_head;_head-_prev=_head;}//Begin()和()模拟了带头双向循环链表;Span*Begin(){return_head-_next;}Span*(){return_head;}boolEmpty(){return_head-_next==_head;}voidPushFront(Span*span){Insert(Begin(),span);}Span*PopFront(){Span*front=_head-_next;Erase(front);returnfront;}//“双向链表实现一个Insert即可高效复用”voidInsert(Span*pos,Span*newSpan){assert(pos);assert(newSpan);Span*prev=pos-_prev;//prevnewspanposprev-_next=newSpan;newSpan-_prev=prev;newSpan-_next=pos;pos-_prev=newSpan;}voidErase(Span*pos){assert(pos);assert(pos!=_head);Span*prev=pos-_prev;Span*next=pos-_next;prev-_next=next;next-_prev=prev;}private:Span*_head;public:std::mutex_mtx;//桶锁可能多个threadcache同时访问central中的同一个index桶,加锁-线程安全};threadcache是哈希桶结构,每个桶是一个按桶位置映射大小的内存块对象的自由链表。每个线程都会有一个threadcache对象,这样每个线程在这里获取对象和释放对象时是无锁的。
申请内存:
当内存申请size=256KB,先获取到线程本地存储的threadcache对象,计算size映射的哈希桶自由链表下标i。
如果自由链表_freeLists[i]中有对象,则直接Pop一个内存对象返回。
如果_freeLists[i]中没有对象时,则批量从centralcache中获取一定数量的对象,插入到自由链表并返回一个对象。
释放内存:
1.当释放内存小于256k时将内存释放回threadcache,计算size映射自由链表桶位置i,将对象Push到_freeLists[i]。2.当链表的长度过长,则回收一部分内存对象到centralcache。
//threadcache本质是由一个哈希映射的对象自由链表构成classThreadCache{public://申请和释放内存对象void*Allocate(size_tsize);voidDeallocate(void*ptr,size_tsize);//从中心缓存获取对象void*FetchFromCentralCache(size_tindex,size_tsize);//释放对象时,链表过长时,回收内存回到中心缓存voidListTooLong(FreeListlist,size_tsize);private:FreeList_freeLists[NFREELIST];};//TLSthreadlocalstorageTLS技术,使每个线程里的ThreadCache*独享,互不影响,实现高并发;static_declspec(thread)ThreadCache*pTLSThreadCache=nullptr;//管理切分好的小对象的自由链表classFreeList{public:voidPush(void*obj){assert(obj);//头插//*(void**)obj=_freeList;NextObj(obj)=_freeList;_freeList=obj;++_size;}voidPushRange(void*start,void*,size_tn){NextObj()=_freeList;_freeList=start;_size+=n;}voidPopRange(void*start,void*,size_tn){assert(n=_size);start=_freeList;=start;for(size_ti=0;in-1;++i){=NextObj();}_freeList=NextObj();NextObj()=nullptr;_size-=n;}void*Pop(){assert(_freeList);//头删void*obj=_freeList;_freeList=NextObj(obj);--_size;returnobj;}boolEmpty(){return_freeList==nullptr;}size_tMaxSize(){return_maxSize;}size_tSize(){return_size;}private:void*_freeList=nullptr;size_t_maxSize=1;size_t_size=0;};centralcache也是一个哈希桶结构,他的哈希桶的映射关系跟threadcache是一样的。不同的是他的每个哈希桶位置挂是SpanList链表结构,不过每个映射桶下面的span中的大内存块被按映射关系切成了一个个小内存块对象挂在span的自由链表中。
申请内存:
1.当threadcache中没有内存时,就会批量向centralcache申请一些内存对象,这里的批量获取对象的数量使用了类似网络tcp协议拥塞控制的慢开始算法;centralcache也有一个哈希映射的spanlist,spanlist中挂着span,从span中取出对象给threadcache,这个过程是需要加锁的,不过这里使用的是一个桶锁,尽可能提高效率。
2.centralcache映射的spanlist中所有span的都没有内存以后,则需要向pagecache申请一个新的span对象,拿到span以后将span管理的内存按大小切好作为自由链表链接到一起。然后从span中取对象给threadcache。
3.centralcache的中挂的span中use_count记录分配了多少个对象出去,分配一个对象给threadcache,就++use_count。
释放内存:
1.当thread_cache过长或者线程销毁,则会将内存释放回centralcache中的,释放回来时--use_count。当use_count减到0时则表示所有对象都回到了span,则将span释放回pagecache,pagecache中会对前后相邻的空闲页进行合并。
include""//单例模式(饿汉模式)全局就一个CentralCache;classCentralCache{public:staticCentralCache*GetInstance(){return_sInst;}//获取一个非空的spanSpan*GetOneSpan(SpanListlist,size_tbyte_size);//从中心缓存获取一定数量的对象给threadcachesize_tFetchRangeObj(void*start,void*,size_tbatchNum,size_tsize);//将一定数量的obj释放到span跨度voidReleaseListToSpans(void*start,size_tbyte_size);private:SpanList_spanLists[NFREELIST];private:CentralCache(){}CentralCache(constCentralCache)=delete;staticCentralCache_sInst;};include""CentralCacheCentralCache::_sInst;//获取一个非空的spanSpan*CentralCache::GetOneSpan(SpanListlist,size_tsize)//参数list是某一个确定的index桶{//查看当前的spanlist中是否有还有未分配对象的spanSpan*it=();while(it!=()){if(it-_freeList!=nullptr){returnit;//这里不需要改it的span中的属性,因为等最后分给threadcache了obj以后才算其中的内存被分出去了里面的usecount等才需要改;}else{it=it-_next;}}//走到这里说没有空闲span了,只能找pagecache要//先把centralcache的桶锁解掉,这样如果其他线程释放内存对象回来,不会阻塞(你要从page申请了不能别的线程在这个桶释放)list._();PageCache::GetInstance()-_();//整个pagecache结构可能会从index1~index129挨个操作每个桶因此需要上大锁;Span*span=PageCache::GetInstance()-NewSpan(SizeClass::NumMovePage(size));span-_isUse=true;//分跟central的span标记已使用因为马上就要切分给obj用了span-_objSize=size;//每个span挂的固定小内存块obj大小sizePageCache::GetInstance()-_();//对获取span进行切分,不需要加锁,因为这会其他线程访问不到这个span//计算span的大块内存的起始地址和大块内存的大小(字节数)char*start=(char*)(span-_pageIdPAGE_SHIFT);//这里的_pageId是从底层按页申请内存的时候地址转换来的,现在需要用地址就转换回去;size_tbytes=span-_nPAGE_SHIFT;char*=start+bytes;//把大块内存切成自由链表链接起来//1、先切一块下来去做头,方便尾插(尾插原因,切出来一般是连续的,那么尾插给到span上挂小obj也是连续,提高内存命中率)span-_freeList=start;start+=size;void*tail=span-_freeList;inti=1;while(start){++i;NextObj(tail)=start;tail=NextObj(tail);//tail=start;start+=size;}NextObj(tail)=nullptr;//切好span以后,需要把span挂到桶里面去的时候,再加锁list._();(span);returnspan;}//从中心缓存获取一定数量的对象给threadcachesize_tCentralCache::FetchRangeObj(void*start,void*,size_tbatchNum,size_tsize){size_tindex=SizeClass::Index(size);_spanLists[index]._();//上桶锁Span*span=GetOneSpan(_spanLists[index],size);assert(span);assert(span-_freeList);//从span中获取batchNum个对象//如果不够batchNum个,有多少拿多少start=span-_freeList;=start;intn=1;//n为实际拿到的数量,start直接给了所以起始值为1;for(inti=0;ibatchNum-1;i++){if(NextObj()==nullptr)break;=NextObj();++n;}//span被切出去给obj使用了span的一些属性得改变了;span-_useCount+=n;span-_freeList=NextObj();NextObj()=nullptr;_spanLists[index]._();returnn;}voidCentralCache::ReleaseListToSpans(void*start,size_tsize){size_tindex=SizeClass::Index(size);_spanLists[index]._();while(start){void*next=NextObj(start);Span*span=PageCache::GetInstance()-MapObjectToSpan(start);//小obj头插入span中的_freeListNextObj(start)=span-_freeList;span-_freeList=start;span-_useCount--;//说明span的切分出去的所有小块内存都回来了//这个span就可以再回收给pagecache,pagecache可以再尝试去做前后页的合并if(span-_useCount==0){_spanLists[index].Erase(span);span-_freeList=nullptr;span-_next=nullptr;span-_prev=nullptr;//释放span给pagecache时,使用pagecache的锁就可以了//这时把桶锁解掉不影响其他线程对该index的central操作;_spanLists[index]._();PageCache::GetInstance()-_();PageCache::GetInstance()-ReleaseSpanToPageCache(span);//还给page和page是否需要合并其中的span减少内存碎片都在这函数里PageCache::GetInstance()-_();_spanLists[index]._();}start=next;}_spanLists[index]._();}使用自己定义的PageMap哈希直接映射,避免stl的线程安全,提高效率;
include""//Single-levelarraytemplateintBITSclassTCMalloc_PageMap1{private:staticconstintLENGTH=1BITS;void**array_;public:typedefuintptr_tNumber;//explicitTCMalloc_PageMap1(void*(*allocator)(size_t)){explicitTCMalloc_PageMap1(){//array_=reinterpret_castvoid**((*allocator)(sizeof(void*)BITS));size_tsize=sizeof(void*)BITS;size_talignSize=SizeClass::_RoundUp(size,1PAGE_SHIFT);array_=(void**)SystemAlloc(alignSizePAGE_SHIFT);memset(array_,0,sizeof(void*)BITS);}//,//*get(Numberk)const{if((kBITS)0){returnNULL;}returnarray_[k];}//REQUIRES"k"isinrange"[0,2^BITS-1]".//REQUIRES"k"hasbeenensuredbefore.////Setsthevalue'v'forkey'k'.voidset(Numberk,void*v){array_[k]=v;}};//Two-levelradixtreetemplateintBITSclassTCMalloc_PageMap2{private://Put32entriesintherootand(2^BITS)/32_BITS=5;staticconstintROOT_LENGTH=1ROOT_BITS;staticconstintLEAF_BITS=BITS-ROOT_BITS;staticconstintLEAF_LENGTH=1LEAF_BITS;//LeafnodestructLeaf{void*values[LEAF_LENGTH];};Leaf*root_[ROOT_LENGTH];//Pointersto32childnodesvoid*(*allocator_)(size_t);//Memoryallocatorpublic:typedefuintptr_tNumber;//explicitTCMalloc_PageMap2(void*(*allocator)(size_t)){explicitTCMalloc_PageMap2(){//allocator_=allocator;memset(root_,0,sizeof(root_));PreallocateMoreMemory();}void*get(Numberk)const{constNumberi1=kLEAF_BITS;constNumberi2=k(LEAF_LENGTH-1);if((kBITS)0||root_[i1]==NULL){returnNULL;}returnroot_[i1]-values[i2];}voidset(Numberk,void*v){constNumberi1=kLEAF_BITS;constNumberi2=k(LEAF_LENGTH-1);ASSERT(i1ROOT_LENGTH);root_[i1]-values[i2]=v;}boolEnsure(Numberstart,size_tn){for(Numberkey=start;key=start+n-1;){constNumberi1=keyLEAF_BITS;//Checkforoverflowif(i1=ROOT_LENGTH)returnfalse;//Make2ndlevelnodeifnecessaryif(root_[i1]==NULL){//Leaf*leaf=reinterpret_castLeaf*((*allocator_)(sizeof(Leaf)));//if(leaf==NULL)returnfalse;staticObjectPoolLeafleafPool;Leaf*leaf=(Leaf*)();memset(leaf,0,sizeof(*leaf));root_[i1]=leaf;}//Advancekeypastwhateveriscoveredbythisleafnodekey=((keyLEAF_BITS)+1)LEAF_BITS;}returntrue;}voidPreallocateMoreMemory(){//AllocateenoughtokeeptrackofallpossiblepagesEnsure(0,1BITS);}};//Three-levelradixtreetemplateintBITSclassTCMalloc_PageMap3{private://HowmanybitsshouldweconsumeateachinteriorlevelstaticconstintINTERIOR_BITS=(BITS+2)/3;//Round-upstaticconstintINTERIOR_LENGTH=1INTERIOR_BITS;//HowmanybitsshouldweconsumeatleaflevelstaticconstintLEAF_BITS=BITS-2*INTERIOR_BITS;staticconstintLEAF_LENGTH=1LEAF_BITS;//InteriornodestructNode{Node*ptrs[INTERIOR_LENGTH];};//LeafnodestructLeaf{void*values[LEAF_LENGTH];};Node*root_;//Rootofradixtreevoid*(*allocator_)(size_t);//MemoryallocatorNode*NewNode(){Node*result=reinterpret_castNode*((*allocator_)(sizeof(Node)));if(result!=NULL){memset(result,0,sizeof(*result));}returnresult;}public:typedefuintptr_tNumber;explicitTCMalloc_PageMap3(void*(*allocator)(size_t)){allocator_=allocator;root_=NewNode();}void*get(Numberk)const{constNumberi1=k(LEAF_BITS+INTERIOR_BITS);constNumberi2=(kLEAF_BITS)(INTERIOR_LENGTH-1);constNumberi3=k(LEAF_LENGTH-1);if((kBITS)0||root_-ptrs[i1]==NULL||root_-ptrs[i1]-ptrs[i2]==NULL){returnNULL;}returnreinterpret_castLeaf*(root_-ptrs[i1]-ptrs[i2])-values[i3];}voidset(Numberk,void*v){ASSERT(kBITS==0);constNumberi1=k(LEAF_BITS+INTERIOR_BITS);constNumberi2=(kLEAF_BITS)(INTERIOR_LENGTH-1);constNumberi3=k(LEAF_LENGTH-1);reinterpret_castLeaf*(root_-ptrs[i1]-ptrs[i2])-values[i3]=v;}boolEnsure(Numberstart,size_tn){for(Numberkey=start;key=start+n-1;){constNumberi1=key(LEAF_BITS+INTERIOR_BITS);constNumberi2=(keyLEAF_BITS)(INTERIOR_LENGTH-1);//Checkforoverflowif(i1=INTERIOR_LENGTH||i2=INTERIOR_LENGTH)returnfalse;//Make2ndlevelnodeifnecessaryif(root_-ptrs[i1]==NULL){Node*n=NewNode();if(n==NULL)returnfalse;root_-ptrs[i1]=n;}//Makeleafnodeifnecessaryif(root_-ptrs[i1]-ptrs[i2]==NULL){Leaf*leaf=reinterpret_castLeaf*((*allocator_)(sizeof(Leaf)));if(leaf==NULL)returnfalse;memset(leaf,0,sizeof(*leaf));root_-ptrs[i1]-ptrs[i2]=reinterpret_castNode*(leaf);}//Advancekeypastwhateveriscoveredbythisleafnodekey=((keyLEAF_BITS)+1)LEAF_BITS;}returntrue;}voidPreallocateMoreMemory(){}};申请内存:
当centralcache向pagecache申请内存时,pagecache先检查对应位置有没有span,如果没有则向更大页寻找一个span,如果找到则分裂成两个。比如:申请的是4页page,4页page后面没有挂span,则向后面寻找更大的span,假设在10页page位置找到一个span,则将10页pagespan分裂为一个4页pagespan和一个6页pagespan。
如果找到_spanList[128]都没有合适的span,则向系统使用mmap、brk或者是VirtualAlloc等方式申请128页pagespan挂在自由链表中,再重复1中的过程。3
需要注意的是centralcache和pagecache的核心结构都是spanlist的哈希桶,但是他们是有本质区别的,centralcache中哈希桶,是按跟threadcache一样的大小对齐关系映射的,他的spanlist中挂的span中的内存都被按映射关系切好链接成小块内存的自由链表。而pagecache中的spanlist则是按下标桶号映射的,也就是说第i号桶中挂的span都是i页内存。
释放内存:
如果centralcache释放回一个span,则依次寻找span的前后pageid的没有在使用的空闲span,看是否可以合并,如果合并继续向前寻找。这样就可以将切小的内存合并收缩成大的span,减少内存碎片。
include""include""classPageCache{public:staticPageCache*GetInstance(){return_sInst;}//获取从对象到span的映射Span*MapObjectToSpan(void*obj);//释放空闲span回到Pagecache,并合并相邻的spanvoidReleaseSpanToPageCache(Span*span);//获取一个K页的spanSpan*NewSpan(size_tk);std::mutex_pageMtx;private:SpanList_spanLists[NPAGES];ObjectPoolSpan_spanPool;//这里用上了之前编写的ObjectPool定长内存池用来New(span)脱离malloc//std::unordered_mapPAGE_ID,Span*_idSpanMap;//std::mapPAGE_ID,Span*_idSpanMap;TCMalloc_PageMap132-PAGE_SHIFT_idSpanMap;PageCache(){}PageCache(constPageCache)=delete;staticPageCache_sInst;};pragmaonceinclude""include""//tcmallostaticvoid*ConcurrentAlloc(size_tsize){//一次申请内存大于thread最大的256kb,则直接向page层按页直接申请,不需要经过Thread层;if(sizeMAX_BYTES){size_talignSize=SizeClass::RoundUp(size);size_tkpage=alignSizePAGE_SHIFT;PageCache::GetInstance()-_();Span*span=PageCache::GetInstance()-NewSpan(kpage);span-_objSize=size;//这个span就是为了_objsize单位内存而申请的PageCache::GetInstance()-_();void*ptr=(void*)(span-_pageIdPAGE_SHIFT);returnptr;}else{//通过TLS每个线程无锁的获取自己的专属的ThreadCache对象if(pTLSThreadCache==nullptr){staticObjectPoolThreadCachetcPool;//只需要一个tcpool,用于调用New,所以设置成静态;//pTLSThreadCache=newThreadCache;pTLSThreadCache=();}returnpTLSThreadCache-Allocate(size);}}//tcfreestaticvoidConcurrentFree(void*ptr){Span*span=PageCache::GetInstance()-MapObjectToSpan(ptr);size_tsize=span-_objSize;if(sizeMAX_BYTES)//证明这个Span直接是按页从page申请的_objsizeMAX_SIZE,没经过thread,那么直接ReleaseSpanToPageCache{PageCache::GetInstance()-_();PageCache::GetInstance()-ReleaseSpanToPageCache(span);PageCache::GetInstance()-_();}else{assert(pTLSThreadCache);pTLSThreadCache-Deallocate(ptr,size);}}5.测试文件:#include""//ntimes一轮申请和释放内存的次数//rounds轮次voidBenchmarkMalloc(size_tntimes,size_tnworks,size_trounds){std::vectorstd::threadvthread(nworks);std::atomicsize_tmalloc_costtime=0;std::atomicsize_tfree_costtime=0;for(size_tk=0;knworks;++k){vthread[k]=std::thread([,k](){std::vectorvoid*v;(ntimes);for(size_tj=0;jrounds;++j){size_tbegin1=clock();for(size_ti=0;intimes;i++){//_back(malloc(16));_back(malloc((16+i)%8192+1));}size_t1=clock();size_tbegin2=clock();for(size_ti=0;intimes;i++){free(v[i]);}size_t2=clock();();malloc_costtime+=(1-begin1);free_costtime+=(2-begin2);}});}for(autot:vthread){();}printf("%u个线程并发执行%u轮次,每轮次malloc%u次:花费:%ums\n",nworks,rounds,ntimes,malloc_());printf("%u个线程并发执行%u轮次,每轮次free%u次:花费:%ums\n",nworks,rounds,ntimes,free_());printf("%u个线程并发mallocfree%u次,总计花费:%ums\n",nworks,nworks*rounds*ntimes,malloc_()+free_());}//单轮次申请释放次数线程数轮次voidBenchmarkConcurrentMalloc(size_tntimes,size_tnworks,size_trounds){std::vectorstd::threadvthread(nworks);std::atomicsize_tmalloc_costtime=0;std::atomicsize_tfree_costtime=0;for(size_tk=0;knworks;++k){vthread[k]=std::thread([](){std::vectorvoid*v;(ntimes);for(size_tj=0;jrounds;++j){size_tbegin1=clock();for(size_ti=0;intimes;i++){//_back(ConcurrentAlloc(16));_back(ConcurrentAlloc((16+i)%8192+1));}size_t1=clock();size_tbegin2=clock();for(size_ti=0;intimes;i++){ConcurrentFree(v[i]);}size_t2=clock();();malloc_costtime+=(1-begin1);free_costtime+=(2-begin2);}});}for(autot:vthread){();}printf("%u个线程并发执行%u轮次,每轮次concurrentalloc%u次:花费:%ums\n",nworks,rounds,ntimes,malloc_());printf("%u个线程并发执行%u轮次,每轮次concurrentdealloc%u次:花费:%ums\n",nworks,rounds,ntimes,free_());printf("%u个线程并发concurrentallocdealloc%u次,总计花费:%ums\n",nworks,nworks*rounds*(),malloc_costtime+free_());}intmain(){size_tn=10000;cout"=========================================================="l;BenchmarkConcurrentMalloc(n,4,10);coutll;BenchmarkMalloc(n,4,10);cout"=========================================================="l;return0;}malloc与tcmallo测试对比结果总结在整体框架编写完毕后,进行测试对比并不能体现tcmalloc比malloc效率高,如下图;但是在tcmalloc进行map(Span*,PAGE_ID),这个映射采用基数树的优化后效率大大提高,超过了malloc。
不高效存在性能瓶颈的原因
根据测试,map的find越慢,会导致其lock()越慢,因此map中的find占了整体的15%,加锁解锁的过程占据了程序运行的20%times,这就是未优化的性能瓶颈;优化前Span*和page_id的映射使用stl标准库中的unorder_map,是非线程安全的,需要加锁保证线程安全;优化了后使用PAGE_MAP设计原理是直接开满page_id所有范围的空间,采取直接映射;这样即便是多线程也不需要加锁;
不加锁的原因
1.两个线程对map中不同的位置读写:因为两个线程在对基数树map一个读,一个写的时候,相应空间早都开好了,不会改变结构,因此读写互不影响,一个在自己的地方读,一个在自己的地方写,而STL容器中map红黑树写入会旋转,unorder_map哈希写入也涉及增容,结构有可能会改变,因此两个位置的读写有可能互相影响导致线程不安全;
2.两个线程对map中相同的位置写:因为写只会在NewSpan和ReleaseSpanToPageCache中,一个是central没有span了从pagecache申请,一个是central把没使用的usecount==0的span大块内存还给paghecache,因为page只有有span和无span两种状态这两个函数不可能在两个线程中同时执行
3.两个线程对map中相同的位置读:因为读只会在ConcurrentFree和ReleaseListToSpans中,对于同一个位置index,肯定是先ConcurrentFree(void*obj),之后objfree到一定的量(_freeLists[index].Size()=_freeLists[index].MaxSize(慢开始的maxsize))时,再调用ReleaseListToSpans将freelist[index]从threadcache还给centralcache的,也不会两个线程同时发生;
stl的map中的find会依次遍历O(n),而直接映射的时间效率是O(1),相当于以部分空间换取时间的一种举措;
优化了stl中()慢的问题同时不需要加锁了,两个主要问题在牺牲点空间的情况下完美解决,因此性能瓶颈得到优化,tcmalloc高效与malloc;
项目源代码Github链接





