最近在写一个 简单的MapReduce框架 设计到 内存缓冲区的算法 看了下网上好像 还没有 完整实现的 就 模仿了一个 写完 估计得 700行代码.

环形缓冲区
1.为什么要环形缓冲区?
答:使用环形缓冲区,便于写入缓冲区和写出缓冲区同时进行。
2.为什么不等缓冲区满了再spill?
答:会出现阻塞。
3.数据的分区和排序是在哪完成的?
答:分区是根据元数据meta中的分区号partition来分区的,排序是在spill的时候排序。

这是环形缓冲区的结构示意图:
1.整个环形缓冲区以赤道为起点,开始向两边读写数据
2.之所以元数据信息全部都是整数,是因为 他只存储分区信息(整数)和kvbuffer在数组中的位置,每个元素局信息占16字节4X4
4.环形缓冲区的数据写入(不考虑spill进行)maptask.MapOutputBuffer.collect();
1.根据bufferindex找到key的长度然后序列化之后进行写入

特点:
在溢出时 会启动另一个携程 此时 不影响 写入 除非空间完全满了 那么变回等待回收空间
还有就是 当 回收空间那一刻 将元空间数据区 的内容移到 kv数据区一测时 便会也会产生 阻塞 这个算法 稍加改进 也可以用来 将内存中 的数据加载到本地 由于是数组 不会浪费多余的空间 不会产生 频繁的内存申请操作 并且读写 可以同时进行 效率会比一般的直接写文件高很多。

package mrimport ("bytes""encoding/binary""errors""fmt""io""strings""sync"
)const (NMETA int = 4;//元数据一个字段占用字节 byteMETASIZE = NMETA * 4; //一整个元数据占用长度
)func ReadWithSelect(ch chan bool) (x int, err error) {select {case  <-ch:return x, nildefault:return 0, errors.New("channel has no data")}
}
type metadata []byte
func (mb metadata) getkey(kvbuffer *[]byte) ([]byte,error){d,err := mb.BytesToInt()if err != nil{return nil,err}var tmp []bytetmp = * kvbufferif d[0]< d[1]+1{return tmp[d[0]:d[1]+1],nil}part1 := tmp[d[0]:]part2 := tmp[0:d[1]+1]part1 = append(part1, part2...)return part1,nil
}func (mb metadata) compar(kvbuffer *[]byte,str1 []byte) int {d,err :=mb.getkey(kvbuffer)if err != nil{panic(err)}res := strings.Compare(string(d), string(str1))return res}
func (mb metadata) getKeyByte(kvbuffer *[]byte) []byte {d,err :=mb.getkey(kvbuffer)if err != nil{panic(err)}return d}
func (mb metadata) getKV(kvbuffer *[]byte) ([]byte,error){d,err := mb.BytesToInt()if err != nil{return nil,err}var tmp []bytetmp = * kvbuffer//fmt.Println("keystart",d[0],"keyend",d[1]+d[3]+1)zjy :=  d[1] - d[0] + 1if d[0] > d[1]{zjy =  len(tmp)  - d[0] + d[1] + 1}qjx := zjy + d[3]if d[0] + qjx -1 >= len(tmp) {loc := (d[0] + qjx -1) % len(tmp)if d[0]> len(*kvbuffer) -1{fmt.Println("DEBUG:键值对太大了",d)}part1 := tmp[d[0]:]part2 := tmp[0:loc+1]part1 = append(part1, part2...)if len(part1) != qjx {fmt.Println("Error:System has a Wrong!",len(part1),d)return nil,errors.New("metadata size not math!")}return part1,nil}return tmp[d[0]:d[0] + qjx],nil
}type MetaInt struct {equator *intkvbuffer *[]byte //指向buffermetastart *int //元数据开始metaend *int //元数据结尾metapoint []*metadata //记录指向每个metadata的指针metamark int//记录移动数据isexchange chan bool //扇区转换那一刻用于阻塞
}
func (md *MetaInt) init(){count := md.getlen()p :=make([]*metadata,0)for i := 0 ;i<count; i++  {m,err := md.get(i)if err != nil{panic(err)}//记录指针p = append(p,&m)}md.metapoint = p//for _,v := range p {// a,_ := (*v).getKV(md.kvbuffer)//   fmt.Println(string(a))////}}func (md *MetaInt) getlen() int{buflen :=len(*md.kvbuffer) -1// 判断有没有在环头 和环尾if *md.metastart < (*md.metaend % (buflen +1)) {return (*md.metastart  + 1 + buflen - *md.metaend  + 1)/ METASIZE}return (*md.metastart - *md.metaend  + 1 ) / METASIZE //计算环形区的数组长度
}
//倒过来调用
func (md *MetaInt) getInverse(num int)  (metadata,error){metalen := len(md.metapoint) -1if num > metalen {return nil,errors.New("Out Of Bufferbyte")}metadata,err := md.get(metalen - num )return metadata,err}
func (md *MetaInt) get(num int) (metadata,error){res := make([]byte,0)var tmp []bytetmp = *md.kvbuffer//超出数组数组长度报错if num > md.getlen() -1{return nil,errors.New("Out Of Bufferbyte")}buflen := len(*md.kvbuffer) -1offeset := *md.metastart - (num + 1) * METASIZE //计算偏移 从零索引开始if offeset + 1 < 0{offeset += 1offeset = buflen - ((- offeset) % buflen) + 1}else {offeset+=1res = append(res,tmp[offeset:offeset+METASIZE]...)return res,nil}indexend :=  offeset + METASIZE -1 //如果索引 + 16 个字节 超出数组尾部 则 到环首处理//如果索引 是 1022 数组总长 1023 那么 1022:1023 会有 14 个剩余// 字节 在 0:13 处理 一半索引落在环尾部 一半落在 环首情况if indexend > buflen{partinde := buflen - offeset + 1res = append(res, tmp[offeset:buflen+1]...)res = append(res,tmp[0: METASIZE  - partinde ]...)return res,nil}//else if offeset>= (*md.metaend + METASIZE) % buflen  && offeset <= *md.metastart {return tmp[offeset:indexend+1], nil
}
func NewMetaInt(equator *int,kvbuffer *[]byte,kvstart *int,kvend *int) *MetaInt{return &MetaInt{equator:  equator,kvbuffer: kvbuffer,metastart:kvstart,metaend:kvend,metapoint:nil,}
}//创建元数据
func NewMetaData(keystart int, keyend int,kvpartition int, vallen int) metadata{md:= make([]byte,0)md = append(md,IntToBytes(keystart)...)md = append(md,IntToBytes(keyend)...)md = append(md,IntToBytes(kvpartition)...)md = append(md,IntToBytes(vallen)...)return md
}//整形转换成字节
func IntToBytes(n int) metadata{x := int32(n)bytesBuffer := bytes.NewBuffer([]byte{})binary.Write(bytesBuffer, binary.BigEndian, x)return bytesBuffer.Bytes()
}
//字节转换成整形
func(md metadata) BytesToInt() ([]int,error) {buf := make([]byte,4)var x []intx = make([]int,0)bytesBuffer := bytes.NewBuffer(md)for{_, err := io.ReadFull(bytesBuffer, buf)if err != nil{if err != io.EOF{fmt.Println("Read error",err)}else{break}}var tmp intdatabuff := bytes.NewBuffer(buf)if databuff.Len() == 4{tmp = int(binary.BigEndian.Uint32(buf))x = append(x, tmp)}}return x,nil
}
func formmat(in []int){fmt.Printf("keystart: %d valstart: %d , kvpartition: %d ,vallen: %d nextkeystart: %d \n",in[0],in[1]+1,in[2],in[3],in[1] + in[3] + 1)
}type  RingBuf struct {equator int;   //marks origin of meta/serializationlock sync.Mutexmetaint *MetaIntbufstart int;  //溢出时kv数据的起始位置bufindex int  //下次要写入的kv数据的位置bufmark int //写出时指针指向的位置bufend int       //溢出时raw数据的结束位置bufvoid int //写出数据的截止地方kvindex int //下次要插入的索引的位置kvend int //溢出时索引的结束位置kvstart int //溢出时索引的起始位置spiller int //触发溢出的阙值spillerprecent float64//触发溢出百分比parnum int //分区数flg bool //触发溢出标记isGcfinisned chan bool//bufmark int;     // marks end of record//bufvoid int; // marks the point where we should stop reading at the end of the bufferkvbuffer []byte //main output buffer}
func NewRingBuf(size int) *RingBuf{rb := &RingBuf{equator:  0,bufstart: 0,bufindex: 0,bufvoid:0,bufend:   0, //buf缓冲区的kvindex:  size -16 ,bufmark:0,kvend:    size -1 ,kvstart:  size -1,kvbuffer: make([]byte,size),parnum:10,flg:false,isGcfinisned:nil,}rb.isGcfinisned = make(chan bool,1)rb.metaint = NewMetaInt(&rb.equator,&rb.kvbuffer,&rb.kvstart,&rb.kvend)return rb
}//计算加上下一个值是否会溢出
func (rb *RingBuf) preSurplusSpace(kvlen int) (float64,error){kval :=rb.kvstart - rb.kvend + METASIZEif kval < 0 {kval = len(rb.kvbuffer) - rb.kvend   + rb.kvstart + METASIZE}bval := rb.bufindex - rb.bufstart + kvlenif bval < 0 {bval = rb.bufindex  + len(rb.kvbuffer) - rb.bufstart + kvlen}val := len(rb.kvbuffer) - kval - bval//fmt.Printf("Residual byte:%d Usage ratio:%.0f %s \n",val,float64(val)/float64(len(rb.kvbuffer))* 100,"%")res:= float64(val)/float64(len(rb.kvbuffer))* 100if res < 0 {err := errors.New("capility overflow error!")return res,err}return res,nil}func (rb *RingBuf) collect(kv KeyValue){keyend := rb.bufindex+ len(kv.Key) -1 //键的结束地址vallen := len(kv.Value)//检查容量space ,err := rb.surplusSpace()if err != nil{panic(err)}//容量不足 20%if space <= 20.0 && rb.flg == false{rb.flg = true//读取bufstart 到bufendrb.bufend = rb.bufindexrb.bufmark = rb.bufstart//触发写出rb.metaint.metamark = rb.metaint.getlen()if _, err := ReadWithSelect(rb.isGcfinisned); err != nil {fmt.Println(err)}go rb.SplliOut()}isoverflow,_:= rb.preSurplusSpace(len(kv.ToString()))if isoverflow < 1.0  {//如果读取kv一下子 过大 没触发上面的回收if rb.flg == false {isoverflow,_:= rb.preSurplusSpace(len(kv.ToString()))if isoverflow <1.0 && rb.flg == false{rb.flg = true//读取bufstart 到bufendrb.bufend = rb.bufindexrb.bufmark = rb.bufstart//触发写出rb.metaint.metamark = rb.metaint.getlen()if _, err := ReadWithSelect(rb.isGcfinisned); err != nil {fmt.Println(err)}go rb.SplliOut()}}for{fmt.Println("ByteBuffer Is Full , Waiting GC!")select {case <-rb.isGcfinisned:goto endGC}}endGC:fmt.Println( "GC Is End!")}//移动移动指针时要加锁rb.lock.Lock()rb.bufindex = rb.bufindex % len(rb.kvbuffer)keyend = keyend % len(rb.kvbuffer)//记录要插入的kv的k开始索引tmpmark := rb.bufindexfor i := 0 ;i <  len(kv.ToString());i++{rb.kvbuffer[rb.bufindex] = kv.ToString()[i]rb.bufindex++if rb.bufindex > len(rb.kvbuffer) -1 {rb.bufindex = rb.bufindex % len(rb.kvbuffer)}}par := ihash(kv.Key) % rb.parnum//传入 buf开始地址 val长度if tmpmark>len(rb.kvbuffer){fmt.Println("DEBUG:tmpmark  >len(rb.kvbuffer)!")}rb.addmetadata(tmpmark ,keyend,vallen,par)//bufindex 指向 下一次写入的索引rb.metaint.init()rb.lock.Unlock()
}//执行溢出操作
func (rb *RingBuf) SplliOut(){fmt.Println("Capacity Is Lower than 20% Precent beging gc!")buff := make([]byte,0)spot := ","//var keylen int//metapoints := len(rb.metaint.metapoint)//对数据进行排序for m:= 0; m < rb.metaint.metamark;m++{bufmeta,err := rb.metaint.get(m)if err != nil{fmt.Println("Get Inverse MetaInt Error!")panic(err)}kv,err := bufmeta.getKV(&rb.kvbuffer)if err != nil{fmt.Println("Spill Out Error While Get KeyValue!")panic(err)}//每次读完 标记下rb.bufmark += len(kv)rb.bufmark = (rb.bufmark) % len(rb.kvbuffer)//循环读取buff = append(buff,kv...)buff = append(buff,spot...)par ,_:= bufmeta.BytesToInt()appendToFile("map-out-" +fmt.Sprintf("%d",par[2]) , string(buff))}//fmt.Println(string(buff))//指针 buffer start 置为 endvar tmpkvend intbuflen := len(rb.kvbuffer)-1//与 kv区背靠背tmpkvend = rb.bufend - 1if tmpkvend < 0 {tmpkvend = -(-(tmpkvend) % (buflen+1)) + buflen + 1}rb.lock.Lock()fmt.Println("Older Between New MetaInt Gap:",rb.metaint.getlen() - rb.metaint.metamark)rb.sortMetaData()if rb.metaint.getlen() - rb.metaint.metamark > 0 {tmpkvend = tmpkvend - METASIZE + 1for i := 0;i< rb.metaint.getlen() - rb.metaint.metamark ;i++ {md, err := rb.metaint.getInverse(i)if err != nil{fmt.Println("Get MeatInt Error!")panic(err)}rs,_ := md.getKV(&rb.kvbuffer)fmt.Println("gc",string(rs))if tmpkvend < 0 {a1 := -(-(tmpkvend) % (buflen + 1)) + buflen + 1c := 0for ix:= a1;ix< buflen+1;ix++{rb.kvbuffer[ix] = md[c]c++}cb := 0for iz:=c;iz<16;iz++{rb.kvbuffer[cb] = md[iz]cb++}tmpkvend = a1}else{for ia := 0;ia<METASIZE;ia++{rb.kvbuffer[tmpkvend + ia] = md[ia]}}tmpkvend = tmpkvend - METASIZE}rb.kvindex = tmpkvendrb.kvend = rb.kvindex + METASIZE -1if rb.kvindex <= 0 {fmt.Println("DEBUG")}}else{//tmpkvend 没有更新过rb.kvindex = tmpkvend - METASIZE + 1rb.kvend = tmpkvend}//检查输出的字节 数大小是否 和 index 和end大小 之间一致if rb.bufmark != rb.bufend {fmt.Printf("Between Mark And End  Size Has %d Distance Shoudle Be Zero!\n",rb.bufend -rb.bufmark)panic("OutPut Size is not Match!\n")}rb.bufstart = rb.bufend//背对背 对齐 kv区rb.kvstart = rb.bufend - 1if rb.kvstart == -1 {rb.kvstart = buflen}rb.flg = falserb.isGcfinisned <- trueif rb.kvstart<10{fmt.Printf("==========>GC now rb.kvstart %d , rb.kvend  %d",rb.kvstart,rb.kvindex)}rb.lock.Unlock()
}//对元数据区进行排序
func (rb *RingBuf) sortMetaData()  {var tmp []*metadatatmp = rb.metaint.metapointfor i:=0;i<len(tmp);i++{tmp[i] = rb.metaint.metapoint[i]}start := 0end := len(tmp) - 1tmp = quickSortMehods(&rb.kvbuffer,tmp,start,end)rb.metaint.metapoint = tmp
}func  quickSortMehods(kvbuffer *[]byte,nums []*metadata, start ,end int) []*metadata{if start >= end{return nums}mid := nums[start]leftjx := startright := endfor {if right == leftjx{break}for {if right > leftjx && nums[right].compar(kvbuffer,mid.getKeyByte(kvbuffer)) >= 0{right -= 1}else {break}}nums[leftjx] = nums[right]for {if leftjx < right && nums[leftjx].compar(kvbuffer,mid.getKeyByte(kvbuffer)) <= 0  {leftjx += 1}else {break}}nums[right] = nums[leftjx]}nums[leftjx] = midquickSortMehods(kvbuffer,nums,start,leftjx -1 )quickSortMehods(kvbuffer,nums,right + 1,end)return nums
}func(rb *RingBuf) WriteOut(content *[]byte){buff := bytes.NewBuffer(rb.kvbuffer)n ,err:= io.ReadFull(buff,*content)if err != nil{panic(err)}if n != len(*content){fmt.Println("error!")}
}//计算剩余容量
func(rb *RingBuf) surplusSpace() (float64,error) {kval :=rb.kvstart - rb.kvendif kval < 0 {kval = len(rb.kvbuffer) - rb.kvend  + rb.kvstart}bval := rb.bufindex - rb.bufstartif bval < 0 {bval = rb.bufindex + len(rb.kvbuffer) - rb.bufstart}val := len(rb.kvbuffer) - kval - bval//fmt.Printf("Residual byte:%d Usage ratio:%.0f %s \n",val,float64(val)/float64(len(rb.kvbuffer))* 100,"%")res:= float64(val)/float64(len(rb.kvbuffer))* 100if res < 0{err := errors.New("capility overflow error!")return res,err}return res,nil
}//插入 需要 值的长度 key
func(rb *RingBuf) addmetadata(keyindex int,keyend int,vallen int,par int){//插入的起始 插入的 key结尾 插入的分区号 插入的 val长度bc := NewMetaData(keyindex,keyend,par,vallen)buflen := len(rb.kvbuffer) -1if rb.kvindex == -11{fmt.Println("debug")}if rb.kvindex   < 0 {a1 := -(-(rb.kvindex) % (buflen+1) ) + buflen + 1c := 0for ix:= a1;ix< buflen+1;ix++{rb.kvbuffer[ix] = bc[c]c++}cb := 0for iz:=c;iz<METASIZE;iz++{rb.kvbuffer[cb] = bc[iz]cb++}rb.kvindex = a1}else{for ia := 0;ia<METASIZE;ia++{rb.kvbuffer[rb.kvindex + ia] = bc[ia]}}rb.kvend = rb.kvindexrb.kvindex = rb.kvindex - METASIZE
}
func appendToFile(file, str string) {f, err := os.OpenFile(file, os.O_CREATE|os.O_APPEND|os.O_RDWR, 0660)if err != nil {fmt.Printf("Cannot open file %s!\n", file)return}defer f.Close()f.WriteString(str)
}

GitHUb地址:https://github.com/qiaojinxia/MapReduce

Hadoop Mapreduce组建 核心环形缓冲区 RingBuff 原理及Go实现相关推荐

  1. 环形缓冲区RingBuff的代码实现

    ~今天我们一起来聊一下环形缓冲区RingBuff又叫LoopBuff等等,都是相同的东西,只是一个名字不同罢了. ~我们在编写代码的时候缓冲区是几乎每个代码都必不可少的东西,比如存放串口接收的数据.做 ...

  2. hadoop中mapreduce的内存环形缓冲区个人讲解

    map阶段的内存环形缓冲区 相关参数配置: mapreduce.task.io.sort.mb:排序文件时需要使用的缓冲内存总量,默认100 mapreduce.map.sort.spill.perc ...

  3. Hadoop MapReduce八大步骤以及Yarn工作原理详解

    Hadoop是市面上使用最多的大数据分布式文件存储系统和分布式处理系统, 其中分为两大块分别是hdfs和MapReduce, hdfs是分布式文件存储系统, 借鉴了Google的GFS论文. MapR ...

  4. java环形buff_环形缓冲区.ringbuff(C#和java)

    环形缓冲, 本质就是队列fifo,先进先出的特殊版本,环形队列,是用空间得到了顺序存储的快索引的优点,又避免了删除,移动数据的缺点.并且还享受了单生产/单消费,2线程的无锁线程优势.十分完美. 1.面 ...

  5. 《Hadoop与大数据挖掘》——2.6 TF-IDF算法原理及Hadoop MapReduce实现

    本节书摘来自华章计算机<Hadoop与大数据挖掘>一书中的第2章,第2.6节,作者 张良均 樊哲 位文超 刘名军 许国杰 周龙 焦正升,更多章节内容可以访问云栖社区"华章计算机& ...

  6. 环形缓冲区-----适合在通信中接收数据(例如uart)

    为什么要用环形缓冲区 当有大量数据的时候,我们不能存储所有的数据,那么计算机处理数据的时候,只能先处理先来的,处理之后就会把数据释放掉,再处理下一个.那么已经处理的数据的内存就会被浪费掉.因为后来的数 ...

  7. MapReduce环形缓冲区底层实现

    环形缓冲区底层实现 首先明白改过程发生在Map--Collect阶段:在用户编写的map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果.在该函数内部 ...

  8. Hadoop重点难点:Shuffle过程中的环形缓冲区

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 这篇文章来自一个读者在面试过程中的一个问题,Hadoop在shuffle过程中使用了一个数据结构- ...

  9. 《Hadoop MapReduce性能优化》一1.3 Hadoop MapReduce的工作原理

    本节书摘来异步社区<Hadoop MapReduce性能优化>一书中的第1章,第1.3节,作者: [法]Khaled Tannir 译者: 范欢动 责编: 杨海玲,更多章节内容可以访问云栖 ...

最新文章

  1. C#编码实践:使用委托和特性调用指定函数
  2. c语言指针用法及实际应用详解,通俗易懂超详细!
  3. windoes windoes server 上安装mysql(MSI安装包安装、压缩包安装)
  4. Windbg无源码调试驱动
  5. 在SharePoint 2010系统中安装RBS FILESTREAM Provider
  6. 600兆的html文件怎么打开,如何打开容量600多兆的文本文件
  7. OGNL使用方法总结
  8. 三位对我影响最深的老师
  9. asp.net电子商务开发实战 视频 第二讲 (下)
  10. echarts grid的样式位置_vue使用Echarts vue使用Echarts滚动条
  11. php 重定向 post,使用php curl getpost方法向页面文件发送重定向指令
  12. 运算符重载为类的友元函数
  13. iperf 安卓 灌包命令_iperf工具测速
  14. 逆向之汇编(堆栈平衡函数)
  15. Three.js的uv坐标贴图理解
  16. 如何在html修改图片大小,HTML – 如何在CSS中动态调整图像大小?
  17. 华为鸿蒙系统手表,鸿蒙2.0系统发布!年底适配最新华为旗舰,系统比安卓还要好?...
  18. Toast类实现消息提示框
  19. 黑马JAVA P44 猜数字游戏
  20. 千年之恋HTML5和CSS3

热门文章

  1. 复旦大学邱锡鹏:若优化顺利,MOSS三月底开源;库克或被踢出苹果董事会;华为云联合CSDN发布智能化编程助手Snap|极客头条...
  2. 1 交换机的基本配置与管理
  3. 2020全国语文高考作文
  4. 天地图矢量数据下载_全球谷歌卫星地图影像数据下载
  5. 你是人间的四月天---林徽因
  6. HEVC最优CU划分确定的过程
  7. 操作系统的奋斗(一)计算机系统概述
  8. IoT黑板报0210:Google 发布 Android Things 开发者第二预览版
  9. Docker -- 2 -- 利用docker部署网站和数据库
  10. 2021-11-15----韩顺平Java入门第九天