Compaction in InfluxDB

Compaction overview

  • InfluxDB's storage engine uses the TSM file format, which is essentially an LSM-Tree adapted to time-series data; like an LSM-Tree, it has a MemTable, a WAL and SSTables;
  • being LSM-Tree-like, it also needs compaction: persisting the in-memory MemTable to disk, and merging several on-disk files to keep the file count down and reads efficient;
  • compaction in InfluxDB generally takes two steps (a rough driver sketch follows this list):
  • generate a compaction plan, i.e. a set of file groups that can be compacted in parallel;
  • run the compaction on each group of tsm files;
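
To make the two phases concrete, here is a rough, hypothetical driver loop. It is not code from InfluxDB; it only strings together the CompactionPlanner interface and the Compactor discussed below, and the exact CompactFull signature differs between versions, so treat it purely as a sketch:

```
// runCompactionCycle is a hypothetical driver illustrating the two phases:
// ask the planner for parallelizable groups, then compact each group.
// CompactionPlanner, Compactor and CompactionGroup are the tsm1 types
// discussed in this article; error handling and scheduling are simplified.
func runCompactionCycle(planner CompactionPlanner, compactor *Compactor, lastWrite time.Time) {
    groups := planner.Plan(lastWrite) // [][]string: each inner slice is one group of tsm paths
    defer planner.Release(groups)     // let the planner hand these files out again later

    var wg sync.WaitGroup
    for _, group := range groups {
        wg.Add(1)
        go func(g CompactionGroup) { // groups are independent, so they may run in parallel
            defer wg.Done()
            // the new *.tsm.tmp files replace the old ones only on success
            if _, err := compactor.CompactFull(g); err != nil {
                return
            }
        }(group)
    }
    wg.Wait()
}
```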

Generating the compaction plan

The CompactionPlanner interface

  • Several strategies can be used to generate this plan; a plan is simply a set of file groups, picked by some rule, on which compaction can run in parallel. InfluxDB therefore starts by defining an interface:
type CompactionPlanner interface {
    Plan(lastWrite time.Time) []CompactionGroup
    PlanLevel(level int) []CompactionGroup
    PlanOptimize() []CompactionGroup
    Release(group []CompactionGroup)
    FullyCompacted() bool

    // ForceFull causes the planner to return a full compaction plan the next
    // time Plan() is called if there are files that could be compacted.
    ForceFull()

    SetFileStore(fs *FileStore)
}

Plan, PlanLevel and PlanOptimize all return []CompactionGroup, which is really a [][]string: each CompactionGroup is a list of tsm file paths to be compacted together, and separate groups can be compacted in parallel;
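
For intuition, a returned plan might look like the following (file names are made up); each inner slice is one group whose files get merged together, and separate groups can be handled by separate goroutines:

```
plan := []CompactionGroup{
    {"000001-02.tsm", "000002-02.tsm", "000003-02.tsm", "000004-02.tsm"},
    {"000005-02.tsm", "000006-02.tsm", "000007-02.tsm", "000008-02.tsm"},
}
```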

The default CompactionPlanner implementation - DefaultPlanner

  • Before getting into DefaultPlanner, look at how a tsm file is named: in 000001-01.tsm, the leading 000001 is called the Generation and the trailing 01 is the Sequence number, also referred to as the Level
  • the tsmGeneration type: it wraps the multiple TSM files that belong to the same Generation
type tsmGeneration struct {
    id            int               // Generation
    files         []FileStat        // info of the contained tsm files; sorted by file name, ascending
    parseFileName ParseFileNameFunc // parses the Generation and Sequence number out of a tsm file name
}

During compaction a single generation can have several sequences, e.g. 001-001.tsm and 001-002.tsm both belong to generation 001; at the next compaction those two files can be viewed together as one big 001-001.tsm. That is exactly why the tsmGeneration type exists.
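
For illustration, this is roughly how such a name decomposes into generation and sequence. The real parsing lives behind the engine's ParseFileNameFunc (DefaultParseFileName); this standalone helper is only a sketch and assumes the standard strings, strconv, path/filepath and fmt packages:

```
// parseTSMName splits a tsm file name such as "000001-01.tsm" into its
// generation and sequence (level) parts. Illustrative only; the engine's
// own parser is the ParseFileNameFunc stored in tsmGeneration.
func parseTSMName(name string) (generation, sequence int, err error) {
    base := strings.TrimSuffix(filepath.Base(name), ".tsm")
    parts := strings.Split(base, "-")
    if len(parts) != 2 {
        return 0, 0, fmt.Errorf("invalid tsm file name: %s", name)
    }
    if generation, err = strconv.Atoi(parts[0]); err != nil {
        return 0, 0, err
    }
    if sequence, err = strconv.Atoi(parts[1]); err != nil {
        return 0, 0, err
    }
    return generation, sequence, nil
}
```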

  • PlanLevel: extract groups of tsm files for a given level; roughly:
  • from all tsm files currently in the FileStore, group files of the same generation together into tsmGenerations; this is done by findGenerations;
  • group all the resulting tsmGenerations by level; the members of the resulting groups are ordered by tsmGenerations.level(), descending;
  • filter those tsmGeneration groups by the level argument of PlanLevel(level int);
  • split the tsmGenerations of every remaining group into chunks of a given size; the tsm files of each chunk form one unit, and the chunks can be compacted in parallel;
  • the code is a bit long and not very intuitive, but this is the general idea; a simplified sketch of the chunking step follows, and then the real code.
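
To make the last step easier to follow, here is a deliberately simplified sketch of the chunking idea, using made-up types rather than the engine's tsmGenerations/FileStat; the actual PlanLevel implementation follows right after:

```
// A deliberately simplified model: gen holds the file paths of one generation
// plus a flag for whether any of its files carry tombstones.
type gen struct {
    paths         []string
    hasTombstones bool
}

// chunkGenerations shows, in stripped-down form, how PlanLevel cuts the
// same-level generations into chunks of minGenerations and turns every
// sufficiently large chunk (or any chunk carrying tombstones) into one
// compaction group, i.e. a []string of tsm file paths compacted together.
func chunkGenerations(gens []gen, minGenerations int) [][]string {
    var groups [][]string
    for start := 0; start < len(gens); start += minGenerations {
        end := start + minGenerations
        if end > len(gens) {
            end = len(gens)
        }
        chunk := gens[start:end]

        var group []string
        tombstones := false
        for _, g := range chunk {
            if g.hasTombstones {
                tombstones = true
            }
            group = append(group, g.paths...)
        }
        // an undersized chunk is only compacted if it has tombstones to purge
        if len(chunk) < minGenerations && !tombstones {
            continue
        }
        groups = append(groups, group)
    }
    return groups
}
```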
func (c *DefaultPlanner) PlanLevel(level int) []CompactionGroup {
    ...
    // Determine the generations from all files on disk.  We need to treat
    // a generation conceptually as a single file even though it may be
    // split across several files in sequence.
    // group tsm files with the same generation id together
    // generations -> tsmGenerations
    generations := c.findGenerations(true)
    ...
    // group the tsmGenerations by level;
    // after grouping, the tsmGenerations inside groups are ordered by level, descending
    var currentGen tsmGenerations
    var groups []tsmGenerations
    for i := 0; i < len(generations); i++ {
        cur := generations[i]

        // See if this generation is orphan'd which would prevent it from being further
        // compacted until a final full compaction runs.
        if i < len(generations)-1 {
            if cur.level() < generations[i+1].level() {
                currentGen = append(currentGen, cur)
                continue
            }
        }

        if len(currentGen) == 0 || currentGen.level() == cur.level() {
            currentGen = append(currentGen, cur)
            continue
        }
        groups = append(groups, currentGen)

        currentGen = tsmGenerations{}
        currentGen = append(currentGen, cur)
    }

    if len(currentGen) > 0 {
        groups = append(groups, currentGen)
    }

    // Remove any groups in the wrong level
    // level is the argument of this function and says which level to compact; filter on it here.
    // Note that cur.level() returns the smallest level among all FileStats in the
    // tsmGeneration -- is that really appropriate?
    var levelGroups []tsmGenerations
    for _, cur := range groups {
        if cur.level() == level {
            levelGroups = append(levelGroups, cur)
        }
    }

    minGenerations := 4
    if level == 1 {
        // level 1 needs at least 8 files before it is compacted
        minGenerations = 8
    }

    // type CompactionGroup []string
    var cGroups []CompactionGroup
    for _, group := range levelGroups {
        // split the tsmGenerations of each group into chunks of the given size
        for _, chunk := range group.chunk(minGenerations) {
            var cGroup CompactionGroup
            var hasTombstones bool
            for _, gen := range chunk {
                if gen.hasTombstones() {
                    hasTombstones = true
                }
                for _, file := range gen.files {
                    // cGroup collects the file.Path of the files to be compacted together
                    cGroup = append(cGroup, file.Path)
                }
            }

            // if the chunk holds fewer than minGenerations generations it is skipped for now;
            // hasTombstones == true means there are entries marked deleted that must be
            // removed for real by a compaction, so such a chunk is kept anyway
            if len(chunk) < minGenerations && !hasTombstones {
                continue
            }
            cGroups = append(cGroups, cGroup)
        }
    }

    if !c.acquire(cGroups) {
        return nil
    }
    return cGroups
}

  • Plan(lastWrite time.Time): produces groups of tsm files either for a full compaction or for generations with level >= 4
  • the code is, frankly, long-winded, and the rules are not entirely obvious on a first read;
  • a full compaction is gated by a time interval: once enough time has passed since the last write, a full compaction is planned, subject to a few exclusion rules;
  • if no full compaction is due, the plan only covers generations whose level >= 4;
  • the code, with some comments added, is below:
func (c *DefaultPlanner) Plan(lastWrite time.Time) []CompactionGroup {
    generations := c.findGenerations(true)

    c.mu.RLock()
    forceFull := c.forceFull
    c.mu.RUnlock()

    // first check if we should be doing a full compaction because nothing has been written in a long time
    // a full compaction only runs after a certain idle interval; this branch handles that case
    if forceFull || c.compactFullWriteColdDuration > 0 && time.Since(lastWrite) > c.compactFullWriteColdDuration && len(generations) > 1 {

        // Reset the full schedule if we planned because of it.
        if forceFull {
            c.mu.Lock()
            c.forceFull = false
            c.mu.Unlock()
        }

        var tsmFiles []string
        var genCount int
        for i, group := range generations {
            var skip bool

            // Skip the file if it's over the max size and contains a full block and it does not have any tombstones
            if len(generations) > 2 && group.size() > uint64(maxTSMFileSize) &&
                c.FileStore.BlockCount(group.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock &&
                !group.hasTombstones() {
                skip = true
            }

            // if the next generation is still at a low level, don't skip this one,
            // so the most recently written files still get picked up by the full compaction
            if i < len(generations)-1 {
                if generations[i+1].level() <= 3 {
                    skip = false
                }
            }

            if skip {
                continue
            }

            for _, f := range group.files {
                tsmFiles = append(tsmFiles, f.Path)
            }
            genCount += 1
        }
        sort.Strings(tsmFiles)

        // Make sure we have more than 1 file and more than 1 generation
        if len(tsmFiles) <= 1 || genCount <= 1 {
            return nil
        }

        group := []CompactionGroup{tsmFiles}
        if !c.acquire(group) {
            return nil
        }
        return group
    }

    // don't plan if nothing has changed in the filestore
    if c.lastPlanCheck.After(c.FileStore.LastModified()) && !generations.hasTombstones() {
        return nil
    }

    c.lastPlanCheck = time.Now()

    // If there is only one generation, return early to avoid re-compacting the same file
    // over and over again.
    if len(generations) <= 1 && !generations.hasTombstones() {
        return nil
    }

    // Need to find the ending point for level 4 files.  They will be the oldest files.  We scan
    // each generation in descending order and break once we see a file below level 4.
    end := 0
    start := 0
    // findGenerations returns generations ordered by level, descending,
    // so this finds the cut-off point for level >= 4
    for i, g := range generations {
        if g.level() <= 3 {
            break
        }
        end = i + 1
    }

    // As compactions run, the oldest files get bigger.  We don't want to re-compact them during
    // this planning if they are maxed out so skip over any we see.
    var hasTombstones bool
    for i, g := range generations[:end] {
        if g.hasTombstones() {
            hasTombstones = true
        }

        if hasTombstones {
            continue
        }

        // this part mainly skips over tsm files that are already too large
        // Skip the file if it's over the max size and contains a full block or the generation is split
        // over multiple files.  In the latter case, that would mean the data in the file spilled over
        // the 2GB limit.
        if g.size() > uint64(maxTSMFileSize) &&
            c.FileStore.BlockCount(g.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock {
            start = i + 1
        }

        // This is an edge case that can happen after multiple compactions run.  The files at the beginning
        // can become larger faster than ones after them.  We want to skip those really big ones and just
        // compact the smaller ones until they are closer in size.
        if i > 0 {
            if g.size()*2 < generations[i-1].size() {
                start = i
                break
            }
        }
    }

    // step is how many files to compact in a group.  We want to clamp it at 4 but also still
    // return groups smaller than 4.
    step := 4
    if step > end {
        step = end
    }

    // slice off the generations that we'll examine
    generations = generations[start:end]

    // the code below splits the generations (and ultimately the tsm files) into chunks
    // so that they can be compacted in parallel
    // Loop through the generations in groups of size step and see if we can compact all (or
    // some of them as group)
    groups := []tsmGenerations{}
    for i := 0; i < len(generations); i += step {
        var skipGroup bool
        startIndex := i

        for j := i; j < i+step && j < len(generations); j++ {
            gen := generations[j]
            lvl := gen.level()

            // Skip compacting this group if there happens to be any lower level files in the
            // middle.  These will get picked up by the level compactors.
            if lvl <= 3 {
                skipGroup = true
                break
            }

            // Skip the file if it's over the max size and it contains a full block
            if gen.size() >= uint64(maxTSMFileSize) && c.FileStore.BlockCount(gen.files[0].Path, 1) == tsdb.DefaultMaxPointsPerBlock && !gen.hasTombstones() {
                startIndex++
                continue
            }
        }

        if skipGroup {
            continue
        }

        endIndex := i + step
        if endIndex > len(generations) {
            endIndex = len(generations)
        }
        if endIndex-startIndex > 0 {
            groups = append(groups, generations[startIndex:endIndex])
        }
    }

    if len(groups) == 0 {
        return nil
    }

    // With the groups, we need to evaluate whether the group as a whole can be compacted
    compactable := []tsmGenerations{}
    for _, group := range groups {
        // if we don't have enough generations to compact, skip it
        if len(group) < 4 && !group.hasTombstones() {
            continue
        }
        compactable = append(compactable, group)
    }

    // All the files to be compacted must be compacted in order.  We need to convert each
    // group to the actual set of files in that group to be compacted.
    var tsmFiles []CompactionGroup
    for _, c := range compactable {
        var cGroup CompactionGroup
        for _, group := range c {
            for _, f := range group.files {
                cGroup = append(cGroup, f.Path)
            }
        }
        sort.Strings(cGroup)
        tsmFiles = append(tsmFiles, cGroup)
    }

    if !c.acquire(tsmFiles) {
        return nil
    }
    return tsmFiles
}

  • To wrap up these planning strategies, I have summarized the common cases in a diagram; it does not cover every situation and is meant only as a general reference:

Executing the compaction

Compactor - the executor of compactions

It has two jobs: persisting the in-memory Cache (the MemTable) to on-disk TSM files (the SSTables), which InfluxDB calls writing a snapshot, and merging multiple TSM files already on disk.

Persisting the Cache to TSM files

A quick recap of the Cache

  • First, a recap of how the Cache is structured: it is essentially a key-value store; to reduce lock contention between reads and writes it introduces partitions (buckets), each partition being its own key-value map, and a key is hashed to choose its partition (see the sketch after this list)
  • the key here is the series key plus the field name; the value is the actual user data written into InfluxDB
  • persisting means writing these key-values to disk, encoding them first;
  • in the usual InfluxDB style, the write to disk goes through an iterator that walks all the key-values
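
A minimal model of that layout, with illustrative (not the engine's) type names, assuming the standard sync and hash/fnv packages:

```
// point is a simplified stand-in for a tsm1 Value: a timestamp plus a field value.
type point struct {
    unixNano int64
    value    float64
}

// cachePartition / simpleCache model the partitioned layout: the key
// (series key + field name) is hashed to pick a partition, and each partition
// owns its own map and its own lock, which keeps write-lock contention low.
type cachePartition struct {
    mu    sync.RWMutex
    store map[string][]point
}

type simpleCache struct {
    partitions []*cachePartition
}

func (c *simpleCache) partition(key []byte) *cachePartition {
    h := fnv.New32a()
    h.Write(key)
    return c.partitions[int(h.Sum32())%len(c.partitions)]
}

func (c *simpleCache) Write(key []byte, values []point) {
    p := c.partition(key) // only this partition is locked, not the whole cache
    p.mu.Lock()
    p.store[string(key)] = append(p.store[string(key)], values...)
    p.mu.Unlock()
}
```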

Iterating over the Cache

  • All of this is handled by cacheKeyIterator: it iterates key by key, and before the iteration starts it has already column-encoded each key's Values (value plus timestamp);
  • the encoding runs concurrently on several goroutines;
  • the values of each key in the Cache are encoded separately, and the results are recorded in c.blocks: there is one entry in c.blocks per key in the Cache;
  • even for a single key the values are not all encoded into one block: each cacheBlock holds at most c.size values
func (c *cacheKeyIterator) encode() {
    concurrency := runtime.GOMAXPROCS(0)
    n := len(c.ready)

    // Divide the keyset across each CPU
    chunkSize := 1
    idx := uint64(0)

    // start several goroutines to do the encoding
    for i := 0; i < concurrency; i++ {
        // Run one goroutine per CPU and encode a section of the key space concurrently
        go func() {
            // fetch the encoders for Time, Float, Boolean, Unsigned, String and Integer values
            tenc := getTimeEncoder(tsdb.DefaultMaxPointsPerBlock)
            fenc := getFloatEncoder(tsdb.DefaultMaxPointsPerBlock)
            benc := getBooleanEncoder(tsdb.DefaultMaxPointsPerBlock)
            uenc := getUnsignedEncoder(tsdb.DefaultMaxPointsPerBlock)
            senc := getStringEncoder(tsdb.DefaultMaxPointsPerBlock)
            ienc := getIntegerEncoder(tsdb.DefaultMaxPointsPerBlock)
            defer putTimeEncoder(tenc)
            defer putFloatEncoder(fenc)
            defer putBooleanEncoder(benc)
            defer putUnsignedEncoder(uenc)
            defer putStringEncoder(senc)
            defer putIntegerEncoder(ienc)

            for {
                i := int(atomic.AddUint64(&idx, uint64(chunkSize))) - chunkSize

                if i >= n {
                    break
                }

                key := c.order[i]
                values := c.cache.values(key)

                for len(values) > 0 {
                    // encode at most c.size values at a time
                    end := len(values)
                    if end > c.size {
                        end = c.size
                    }

                    minTime, maxTime := values[0].UnixNano(), values[end-1].UnixNano()
                    var b []byte
                    var err error

                    switch values[0].(type) {
                    case FloatValue:
                        b, err = encodeFloatBlockUsing(nil, values[:end], tenc, fenc)
                    case IntegerValue:
                        b, err = encodeIntegerBlockUsing(nil, values[:end], tenc, ienc)
                    case UnsignedValue:
                        b, err = encodeUnsignedBlockUsing(nil, values[:end], tenc, uenc)
                    case BooleanValue:
                        b, err = encodeBooleanBlockUsing(nil, values[:end], tenc, benc)
                    case StringValue:
                        b, err = encodeStringBlockUsing(nil, values[:end], tenc, senc)
                    default:
                        b, err = Values(values[:end]).Encode(nil)
                    }

                    // keep only the values that have not been encoded yet
                    values = values[end:]

                    // each key owns one entry in c.blocks; the entry holds cacheBlocks
                    c.blocks[i] = append(c.blocks[i], cacheBlock{
                        k:       key,
                        minTime: minTime,
                        maxTime: maxTime,
                        b:       b,
                        err:     err,
                    })

                    if err != nil {
                        c.err = err
                    }
                }
                // Notify this key is fully encoded
                // once a key is fully encoded, write to its channel to signal completion
                c.ready[i] <- struct{}{}
            }
        }()
    }
}

  • Iterating over the encoded results:
  1. Next():
```
func (c *cacheKeyIterator) Next() bool {
    // c.i starts at -1; on the first call, or once c.blocks[c.i] has been read out,
    // the if below is not entered
    if c.i >= 0 && c.i < len(c.ready) && len(c.blocks[c.i]) > 0 {
        c.blocks[c.i] = c.blocks[c.i][1:]
        if len(c.blocks[c.i]) > 0 {
            return true
        }
    }
    c.i++

    if c.i >= len(c.ready) {
        return false
    }

    // block here until the corresponding key has been fully encoded
    <-c.ready[c.i]
    return true
}
```
  2. Read():
```
func (c *cacheKeyIterator) Read() ([]byte, int64, int64, []byte, error) {
    // See if snapshot compactions were disabled while we were running.
    select {
    case <-c.interrupt:
        c.err = errCompactionAborted{}
        return nil, 0, 0, nil, c.err
    default:
    }

    blk := c.blocks[c.i][0]
    return blk.k, blk.minTime, blk.maxTime, blk.b, blk.err
}
```
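
Putting Next and Read together, the consumer (writeNewFiles via the TSM writer, described below) essentially drains the iterator as shown here; this is a sketch, and the WriteBlock call reflects my reading of the TSMWriter interface rather than verbatim engine code:

```
// drainIterator sketches how a KeyIterator is consumed: Next advances to the
// next encoded block, Read hands back the key, its time range and the encoded bytes,
// and each block is handed to the TSM writer.
func drainIterator(iter KeyIterator, w TSMWriter) error {
    for iter.Next() {
        key, minTime, maxTime, block, err := iter.Read()
        if err != nil {
            return err
        }
        if err := w.WriteBlock(key, minTime, maxTime, block); err != nil {
            return err
        }
    }
    return iter.Err()
}
```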
  • Compacting the Cache:
  • first decide, from the cache's size and how fast it is being filled, whether writes need to be throttled and what the compaction concurrency should be;
  • split the Cache into that many smaller Caches; each small Cache is compacted by its own goroutine;
  • each goroutine compacts by walking its cacheKeyIterator and writing files through c.writeNewFiles;
  • every concurrent c.writeNewFiles call gets its own generation, and its sequence numbers start from 0
// Write the Cache's contents to *.tsm.tmp files.
// If the cache holds too many values it is split into several caches that are
// processed in parallel; each split cache gets its own generation.
func (c *Compactor) WriteSnapshot(cache *Cache) ([]string, error) {
    c.mu.RLock()
    enabled := c.snapshotsEnabled
    intC := c.snapshotsInterrupt
    c.mu.RUnlock()

    if !enabled {
        return nil, errSnapshotsDisabled
    }

    start := time.Now()
    // cache.Count() returns the total number of values in the cache
    card := cache.Count()

    // Enable throttling if we have lower cardinality or snapshots are going fast.
    // 3e6 = 3 * 10^6
    // decides whether this compaction should throttle its writes
    throttle := card < 3e6 && c.snapshotLatencies.avg() < 15*time.Second

    // Write snapshots concurrently if cardinality is relatively high.
    concurrency := card / 2e6
    if concurrency < 1 {
        concurrency = 1
    }

    // Special case very high cardinality, use max concurrency and don't throttle writes.
    if card >= 3e6 {
        concurrency = 4
        throttle = false
    }

    splits := cache.Split(concurrency)

    type res struct {
        files []string
        err   error
    }

    resC := make(chan res, concurrency)
    for i := 0; i < concurrency; i++ {
        go func(sp *Cache) {
            iter := NewCacheKeyIterator(sp, tsdb.DefaultMaxPointsPerBlock, intC)
            files, err := c.writeNewFiles(c.FileStore.NextGeneration(), 0, nil, iter, throttle)
            resC <- res{files: files, err: err}
        }(splits[i])
    }

    var err error
    files := make([]string, 0, concurrency)
    for i := 0; i < concurrency; i++ {
        result := <-resC
        if result.err != nil {
            err = result.err
        }
        files = append(files, result.files...)
    }
    ...
    return files, err
}

  • Walking a keyIterator and writing the encoded blocks into tsm files: writeNewFiles
  • it mainly calls the tsmWriter methods to write the file;
  • within a file, the data blocks are written first, then the index;
  • when the file size or block count reaches its limit, it rolls over to a new file
func (c *Compactor) writeNewFiles(generation, sequence int, src []string, iter KeyIterator, throttle bool) ([]string, error) {
    // These are the new TSM files written
    var files []string

    for {
        // sequence + 1; this sequence is effectively the level
        sequence++

        // the file written here is named *.tsm.tmp;
        // it is renamed to *.tsm once the compaction completes
        fileName := filepath.Join(c.Dir, c.formatFileName(generation, sequence)+"."+TSMFileExtension+"."+TmpTSMFileExtension)

        // Write as much as possible to this file
        // c.write performs the actual writing
        err := c.write(fileName, iter, throttle)

        // We've hit the max file limit and there is more to write.  Create a new file
        // and continue.
        // once the file size or block count reaches its limit, roll over to the next file (sequence + 1)
        if err == errMaxFileExceeded || err == ErrMaxBlocksExceeded {
            files = append(files, fileName)
            continue
        } else if err == ErrNoValues {
            // ErrNoValues means there are no valid values, only tombstoned entries, so no file is written
            // If the file only contained tombstoned entries, then it would be a 0 length
            // file that we can drop.
            if err := os.RemoveAll(fileName); err != nil {
                return nil, err
            }
            break
        } else if _, ok := err.(errCompactionInProgress); ok {
            // Don't clean up the file as another compaction is using it.  This should not happen as the
            // planner keeps track of which files are assigned to compaction plans now.
            return nil, err
        } else if err != nil {
            // Remove any tmp files we already completed
            for _, f := range files {
                if err := os.RemoveAll(f); err != nil {
                    return nil, err
                }
            }
            // We hit an error and didn't finish the compaction.  Remove the temp file and abort.
            if err := os.RemoveAll(fileName); err != nil {
                return nil, err
            }
            return nil, err
        }

        files = append(files, fileName)
        break
    }

    return files, nil
}

Compacting multiple tsm files

Overview

Let's first describe the process briefly. It resembles a merge sort: within every tsm file the keys are sorted ascending in its index, so compaction merges the blocks that share the same key across files and then builds a new index. That's the whole idea; in practice InfluxDB adds a few extra tactics for efficiency.
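
Stripped of tombstones, deduplication and re-encoding, the key-ordering part of that merge looks like the following self-contained sketch:

```
// mergeSortedKeys sketches the k-way merge that tsmBatchKeyIterator performs:
// the inputs are per-file key lists already sorted ascending, and the output is
// the globally sorted, de-duplicated key order in which blocks get rewritten.
// The real iterator additionally carries the blocks, tombstones and time ranges.
func mergeSortedKeys(inputs [][]string) []string {
    pos := make([]int, len(inputs))
    var out []string
    for {
        min := ""
        for i, keys := range inputs {
            if pos[i] >= len(keys) {
                continue // this file is exhausted
            }
            if min == "" || keys[pos[i]] < min {
                min = keys[pos[i]]
            }
        }
        if min == "" {
            return out // all inputs exhausted
        }
        out = append(out, min)
        // advance every input currently pointing at the chosen key;
        // this is where the per-key block merge would happen
        for i, keys := range inputs {
            if pos[i] < len(keys) && keys[pos[i]] == min {
                pos[i]++
            }
        }
    }
}
```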

tsmBatchKeyIterator

  • Just like the Cache compaction above, this needs an iterator: tsmBatchKeyIterator, which traverses several tsm files at the same time; this is where the essence of the compaction lies

Iteration in tsmBatchKeyIterator

  1. Take the first key's blocks out of each tsm file, one group per file
  2. Scan the keys obtained in step 1 and determine the current smallest key
  3. From the blocks obtained in step 1, pull out those whose key equals the smallest key from step 2 and store them in k.blocks
  4. Merge all the blocks obtained in step 3, mainly by sorting them on minTime; that essentially completes one Next() call
  5. The code, with my comments added, is below
func (k *tsmBatchKeyIterator) Next() bool {
RETRY:
    // Any merged blocks pending?
    if len(k.merged) > 0 {
        k.merged = k.merged[1:]
        if len(k.merged) > 0 {
            return true
        }
    }

    // Any merged values pending?
    if k.hasMergedValues() {
        k.merge()
        if len(k.merged) > 0 || k.hasMergedValues() {
            return true
        }
    }

    // If we still have blocks from the last read, merge them
    if len(k.blocks) > 0 {
        k.merge()
        if len(k.merged) > 0 || k.hasMergedValues() {
            return true
        }
    }

    // Read the next block from each TSM iterator
    // read each tsm file and stash its next group of blocks into k.buf, ready for merge-sorting;
    // each tsm file contributes one blocks entry;
    // like the tsm index itself, these blocks are ordered by key, ascending
    for i, v := range k.buf {
        if len(v) != 0 {
            continue
        }

        iter := k.iterators[i]
        if iter.Next() {
            key, minTime, maxTime, typ, _, b, err := iter.Read()
            if err != nil {
                k.err = err
            }

            // This block may have ranges of time removed from it that would
            // reduce the block min and max time.
            // tombstones is a []TimeRange
            tombstones := iter.r.TombstoneRange(key)

            var blk *block
            // k.buf[i] has type []blocks -> [][]block
            // the logic below keeps appending new blocks to k.buf[i];
            // when k.buf[i] needs to grow, append doubles its capacity
            if cap(k.buf[i]) > len(k.buf[i]) {
                k.buf[i] = k.buf[i][:len(k.buf[i])+1]
                blk = k.buf[i][len(k.buf[i])-1]
                if blk == nil {
                    blk = &block{}
                    k.buf[i][len(k.buf[i])-1] = blk
                }
            } else {
                blk = &block{}
                k.buf[i] = append(k.buf[i], blk)
            }
            blk.minTime = minTime
            blk.maxTime = maxTime
            blk.key = key
            blk.typ = typ
            blk.b = b
            blk.tombstones = tombstones
            blk.readMin = math.MaxInt64
            blk.readMax = math.MinInt64

            blockKey := key
            // if the next key equals the current one, this file still has more blocks for this key
            for bytes.Equal(iter.PeekNext(), blockKey) {
                iter.Next()
                key, minTime, maxTime, typ, _, b, err := iter.Read()
                if err != nil {
                    k.err = err
                }

                tombstones := iter.r.TombstoneRange(key)

                var blk *block
                if cap(k.buf[i]) > len(k.buf[i]) {
                    k.buf[i] = k.buf[i][:len(k.buf[i])+1]
                    blk = k.buf[i][len(k.buf[i])-1]
                    if blk == nil {
                        blk = &block{}
                        k.buf[i][len(k.buf[i])-1] = blk
                    }
                } else {
                    blk = &block{}
                    k.buf[i] = append(k.buf[i], blk)
                }

                blk.minTime = minTime
                blk.maxTime = maxTime
                blk.key = key
                blk.typ = typ
                blk.b = b
                blk.tombstones = tombstones
                blk.readMin = math.MaxInt64
                blk.readMax = math.MinInt64
            }
        }

        if iter.Err() != nil {
            k.err = iter.Err()
        }
    }

    // Each reader could have a different key that it's currently at, need to find
    // the next smallest one to keep the sort ordering.
    // find the current smallest key (series key + field);
    // every blocks slice in k.buf is already sorted by key ascending,
    // so only blocks[0] of each needs to be inspected
    var minKey []byte
    var minType byte
    for _, b := range k.buf {
        // block could be nil if the iterator has been exhausted for that file
        if len(b) == 0 {
            continue
        }
        if len(minKey) == 0 || bytes.Compare(b[0].key, minKey) < 0 {
            minKey = b[0].key
            minType = b[0].typ
        }
    }
    k.key = minKey
    k.typ = minType

    // Now we need to find all blocks that match the min key so we can combine and dedupe
    // the blocks if necessary
    // move every block whose key equals the minKey found above into k.blocks
    for i, b := range k.buf {
        if len(b) == 0 {
            continue
        }
        // b[0] is the current k.buf[i][0], a single block; b is a []block
        if bytes.Equal(b[0].key, k.key) {
            // k.blocks => []block, b => []block
            k.blocks = append(k.blocks, b...)
            // k.buf[i]'s length is reset to 0, i.e. its existing entries are cleared
            k.buf[i] = k.buf[i][:0]
        }
    }

    if len(k.blocks) == 0 {
        return false
    }

    k.merge()

    // After merging all the values for this key, we might not have any.  (e.g. they were all deleted
    // through many tombstones).  In this case, move on to the next key instead of ending iteration.
    if len(k.merged) == 0 {
        goto RETRY
    }

    return len(k.merged) > 0
}

Merging in tsmBatchKeyIterator

  1. The blocks to merge are all in k.blocks; they are first sorted by block.minTime;
  2. Then decide whether deduplication is needed: if the blocks in k.blocks overlap in their [minTime, maxTime] ranges, or any block carries tombstones, the blocks have to be rebuilt; deduplication, deletion and re-sorting are applied, which effectively recombines and re-sorts all the blocks by minTime;
  3. The key code, with some comments added, is below
func (k *tsmBatchKeyIterator) combineFloat(dedup bool) blocks {
    if dedup {
        // sort by minTime and deduplicate
        for k.mergedFloatValues.Len() < k.size && len(k.blocks) > 0 {
            // drop blocks that have already been fully read
            for len(k.blocks) > 0 && k.blocks[0].read() {
                k.blocks = k.blocks[1:]
            }

            if len(k.blocks) == 0 {
                break
            }
            first := k.blocks[0]
            minTime := first.minTime
            maxTime := first.maxTime

            // Adjust the min time to the start of any overlapping blocks.
            // (i could actually start at 1 here)
            // to sort by minTime, first determine a globally smallest [minTime, maxTime] window
            for i := 0; i < len(k.blocks); i++ {
                if k.blocks[i].overlapsTimeRange(minTime, maxTime) && !k.blocks[i].read() {
                    if k.blocks[i].minTime < minTime {
                        minTime = k.blocks[i].minTime
                    }
                    // shrink the upper bound
                    if k.blocks[i].maxTime > minTime && k.blocks[i].maxTime < maxTime {
                        maxTime = k.blocks[i].maxTime
                    }
                }
            }

            // We have some overlapping blocks so decode all, append in order and then dedup
            // pull the data inside [minTime, maxTime] out of every block
            for i := 0; i < len(k.blocks); i++ {
                if !k.blocks[i].overlapsTimeRange(minTime, maxTime) || k.blocks[i].read() {
                    continue
                }

                var v tsdb.FloatArray
                var err error
                if err = DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil {
                    k.err = err
                    return nil
                }

                // Remove values we already read
                v.Exclude(k.blocks[i].readMin, k.blocks[i].readMax)

                // Filter out only the values for overlapping block
                // (could this Include call be skipped?)
                v.Include(minTime, maxTime)

                if v.Len() > 0 {
                    // Record that we read a subset of the block
                    k.blocks[i].markRead(v.MinTime(), v.MaxTime())
                }

                // Apply each tombstone to the block
                for _, ts := range k.blocks[i].tombstones {
                    v.Exclude(ts.Min, ts.Max)
                }

                k.mergedFloatValues.Merge(&v)
            }
        }

        // Since we combined multiple blocks, we could have more values than we should put into
        // a single block.  We need to chunk them up into groups and re-encode them.
        return k.chunkFloat(nil)
    }

    var i int
    for i < len(k.blocks) {
        // skip this block if it's values were already read
        if k.blocks[i].read() {
            i++
            continue
        }
        // If this block is already full, just add it as is
        // (we break on the first non-full block; what about full blocks further on?)
        if BlockCount(k.blocks[i].b) >= k.size {
            k.merged = append(k.merged, k.blocks[i])
        } else {
            break
        }
        i++
    }

    if k.fast {
        for i < len(k.blocks) {
            // skip this block if it's values were already read
            if k.blocks[i].read() {
                i++
                continue
            }

            k.merged = append(k.merged, k.blocks[i])
            i++
        }
    }

    // If we only have 1 block left, just append it as is and avoid decoding/recoding
    if i == len(k.blocks)-1 {
        if !k.blocks[i].read() {
            k.merged = append(k.merged, k.blocks[i])
        }
        i++
    }

    // The remaining blocks can be combined and we know that they do not overlap and
    // so we can just append each, sort and re-encode.
    for i < len(k.blocks) && k.mergedFloatValues.Len() < k.size {
        if k.blocks[i].read() {
            i++
            continue
        }

        var v tsdb.FloatArray
        if err := DecodeFloatArrayBlock(k.blocks[i].b, &v); err != nil {
            k.err = err
            return nil
        }

        // Apply each tombstone to the block
        for _, ts := range k.blocks[i].tombstones {
            v.Exclude(ts.Min, ts.Max)
        }

        k.blocks[i].markRead(k.blocks[i].minTime, k.blocks[i].maxTime)

        k.mergedFloatValues.Merge(&v)
        i++
    }

    k.blocks = k.blocks[i:]

    return k.chunkFloat(k.merged)
}
