前言

  • influxdb安装和使用
  • influxdb概念详解1
  • influxdb概念详解2
  • influxdb源码编译
  • influxdb启动分析
  • influxdb源码分析-meta部分
  • infludb源码分析-数据写入
  • influxdb数据写入细节
  • influxdb源码解析-series
  • influxdb源码解析-inmem index
  • influxdb源码解析-tsi index
  • influxdb源码分析-Store
  • influxdb源码分析-Shard和TSM Engine Cache

这是一个influxdb源码分析系列的文章,上一章分析了Shard结构,主要是Engine。并且着重分析了Engine的Cache实现,结尾部分看到了CacheLoader使用WAL来恢复内存中的Cache。那么本篇文章就来分析一下Engine的WAL部分

WAL 的定位

WAL是(Write Ahead Log)的缩写,翻译过来叫做预写日志,是一种保证数据可靠性的方案。一般写入数据时,会先写入WAL,然后再更新本地的数据。当数据被持久化到真正用于存储它的文件上(SSTable),WAL就会被删除。

基本结构

TSM Engine的WAL位于tsdb/tsm1/wal.go。结构如下(有简化):

// WAL represents the write-ahead log used for writing TSM files.
type WAL struct {syncCount   uint64syncWaiters chan chan errormu            sync.RWMutexlastWriteTime time.Timepath stringcurrentSegmentID     intcurrentSegmentWriter *WALSegmentWriteronce    sync.Onceclosing chan struct{}syncDelay time.DurationSegmentSize intstats   *WALStatisticslimiter limiter.Fixed
}

从这个结构可以看出以下信息:

  • 基本信息,path等
  • currentSegmentID和currentSegmentWriter等对应的文件信息
  • 一些限制信息

其中currentSegmentID是wal 文件名字的一部分,是一个递增的id。用来区分wal的新旧程度。这里是很重要的,等会会解释。currentSegmentWriter是一个WALSegmentWriter,用来写文件的。

WAL Open

看一下核心的函数,首先是Open函数。在看这个函数之前,先看另外一个:查找当前目录下所有的wal文件


// segmentFileNames will return all files that are WAL segment files in sorted order by ascending ID.
func segmentFileNames(dir string) ([]string, error) {names, err := filepath.Glob(filepath.Join(dir, fmt.Sprintf("%s*.%s", WALFilePrefix, WALFileExtension)))if err != nil {return nil, err}sort.Strings(names)return names, nil
}

这里会返回一个数组,代表所有的wal 文件。格式为:_xxxx.wal,xxxx是id。注意这里返回的时候是做了排序的,字典序小的在前面,大的在后面。这里就是上面说的区分新旧程度。为什么这样做呢?这里要和时间线保持一致,比如现在对于同一个series有写入和删除,在wal里面就会存在两个entry,第一个是insert,第二个是带有delete标记的insert这样在读取wal的时候,最终的结果是后面的覆盖了前面的,数据被标记位delete

​ 然后是Open函数:

   func (l *WAL) Open() error {segments, err := segmentFileNames(l.path)if len(segments) > 0 {lastSegment := segments[len(segments)-1]id, err := idFromFileName(lastSegment)if err != nil {return err}l.currentSegmentID = idstat, err := os.Stat(lastSegment)if err != nil {return err}if stat.Size() == 0 {os.Remove(lastSegment)segments = segments[:len(segments)-1]} else {fd, err := os.OpenFile(lastSegment, os.O_RDWR, 0666)if err != nil {return err}if _, err := fd.Seek(0, io.SeekEnd); err != nil {return err}l.currentSegmentWriter = NewWALSegmentWriter(fd)// Set the correct size on the segment writeratomic.StoreInt64(&l.stats.CurrentBytes, stat.Size())l.currentSegmentWriter.size = int(stat.Size())}}return nil
}

这里会选取最后一个文件,因为最后一个文件是最新的文件,也就是active的,其他WAL是达到了写入限制,不能被写入的。

WALEntry

和其他的WAL文件一样(series,index) WAL的写入也是以entry粒度的。WAL Entry有不同类型,代表不同的操作,主要有三种:

  • WriteWALEntry
  • DeleteWALEntry
  • DeleteRangeWALEntry

这些结构有一个顶层的抽象:

type WALEntry interface {Type() WalEntryTypeEncode(dst []byte) ([]byte, error)MarshalBinary() ([]byte, error)UnmarshalBinary(b []byte) errorMarshalSize() int
}

不同的Type代表:

const (// WriteWALEntryType indicates a write entry.WriteWALEntryType WalEntryType = 0x01// DeleteWALEntryType indicates a delete entry.DeleteWALEntryType WalEntryType = 0x02// DeleteRangeWALEntryType indicates a delete range entry.DeleteRangeWALEntryType WalEntryType = 0x03
)

例如WriteWALEntry

type WriteWALEntry struct {Values map[string][]Valuesz     int
}

核心结构是一个Values的map,key是Measurement+tags+field ,value是这个series key对应的Value集合。

WAL在写入到文件时,会做Encode,变成成一个byte数组,这段逻辑由具体的实现结构自己完成。如WriteWALEntry的Encode逻辑:

func (w *WriteWALEntry) Encode(dst []byte) ([]byte, error) {// The entries values are encode as follows://// For each key and slice of values, first a 1 byte type for the []Values// slice is written.  Following the type, the length and key bytes are written.// Following the key, a 4 byte count followed by each value as a 8 byte time// and N byte value.  The value is dependent on the type being encoded.  float64,// int64, use 8 bytes, boolean uses 1 byte, and string is similar to the key encoding,// except that string values have a 4-byte length, and keys only use 2 bytes.//// This structure is then repeated for each key an value slices.//// ┌────────────────────────────────────────────────────────────────────┐// │                           WriteWALEntry                            │// ├──────┬─────────┬────────┬───────┬─────────┬─────────┬───┬──────┬───┤// │ Type │ Key Len │   Key  │ Count │  Time   │  Value  │...│ Type │...│// │1 byte│ 2 bytes │ N bytes│4 bytes│ 8 bytes │ N bytes │   │1 byte│   │// └──────┴─────────┴────────┴───────┴─────────┴─────────┴───┴──────┴───┘encLen := w.MarshalSize() // Type (1), Key Length (2), and Count (4) for each key// allocate or re-slice to correct sizeif len(dst) < encLen {dst = make([]byte, encLen)} else {dst = dst[:encLen]}// Finally, encode the entryvar n intvar curType bytefor k, v := range w.Values {switch v[0].(type) {case FloatValue:curType = float64EntryTypecase IntegerValue:curType = integerEntryTypecase UnsignedValue:curType = unsignedEntryTypecase BooleanValue:curType = booleanEntryTypecase StringValue:curType = stringEntryTypedefault:return nil, fmt.Errorf("unsupported value type: %T", v[0])}dst[n] = curTypen++binary.BigEndian.PutUint16(dst[n:n+2], uint16(len(k)))n += 2n += copy(dst[n:], k)binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(v)))n += 4for _, vv := range v {binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.UnixNano()))n += 8switch vv := vv.(type) {case FloatValue:if curType != float64EntryType {return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)}binary.BigEndian.PutUint64(dst[n:n+8], math.Float64bits(vv.value))n += 8case IntegerValue:if curType != integerEntryType {return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)}binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))n += 8case UnsignedValue:if curType != unsignedEntryType {return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)}binary.BigEndian.PutUint64(dst[n:n+8], uint64(vv.value))n += 8case BooleanValue:if curType != booleanEntryType {return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)}if vv.value {dst[n] = 1} else {dst[n] = 0}n++case StringValue:if curType != stringEntryType {return nil, fmt.Errorf("incorrect value found in %T slice: %T", v[0].Value(), vv)}binary.BigEndian.PutUint32(dst[n:n+4], uint32(len(vv.value)))n += 4n += copy(dst[n:], vv.value)default:return nil, fmt.Errorf("unsupported value found in %T slice: %T", v[0].Value(), vv)}}}return dst[:n], nil
}

格式已经给出来了。按照这个格式做编码即可。这里会区分一下field的类型,不同value的类型有点区别。但是总得逻辑是一样的。既然有encode,那么就会有decode。这个就不再细说了,是逆操作。

WAL WriteMulti和writeToLog

有了上面的基础,看一下写入WAL的逻辑。

首先是writeToLog:

func (l *WAL) writeToLog(entry WALEntry) (int, error) {// 获取bytebufferbytes := bytesPool.Get(entry.MarshalSize())// 编码b, err := entry.Encode(bytes)if err != nil {bytesPool.Put(bytes)return -1, err}encBuf := bytesPool.Get(snappy.MaxEncodedLen(len(b)))// 做snappy 压缩compressed := snappy.Encode(encBuf, b)bytesPool.Put(bytes)syncErr := make(chan error)segID, err := func() (int, error) {l.mu.Lock()defer l.mu.Unlock()// Make sure the log has not been closedselect {case <-l.closing:return -1, ErrWALCloseddefault:}// 尝试选取下一个segment,这里是因为file size达到限制。// roll the segment file if neededif err := l.rollSegment(); err != nil {return -1, fmt.Errorf("error rolling WAL segment: %v", err)}// write and syncif err := l.currentSegmentWriter.Write(entry.Type(), compressed); err != nil {return -1, fmt.Errorf("error writing WAL entry: %v", err)}select {case l.syncWaiters <- syncErr:default:return -1, fmt.Errorf("error syncing wal")}l.scheduleSync()// Update stats for current segment sizeatomic.StoreInt64(&l.stats.CurrentBytes, int64(l.currentSegmentWriter.size))l.lastWriteTime = time.Now().UTC()return l.currentSegmentID, nil}()bytesPool.Put(encBuf)if err != nil {return segID, err}// schedule an fsync and wait for it to completereturn segID, <-syncErr
}

代码有点长,没做简化,因为每一步骤都很重要。简单解释一下。

  • 首先是encode,这里会对数据做压缩,所以做了个byte pool复用 byte数组
  • 得到压缩之后的数据,开始写入到wal中,开始会尝试做一下文件滚动,这里的逻辑就是尝试新建下一个wal文件,会check一些条件,主要是wal的大小。
  • roll之后,开始写入文件。这里是写入到linux page cache,没有调用sync 方法。
  • 出发sync,强制刷盘
  • 归还byte buffer

WriteMulti

func (l *WAL) WriteMulti(values map[string][]Value) (int, error) {// 这个map 是key-> values 的mapentry := &WriteWALEntry{Values: values,}id, err := l.writeToLog(entry)if err != nil {atomic.AddInt64(&l.stats.WriteErr, 1)return -1, err}atomic.AddInt64(&l.stats.WriteOK, 1)return id, nil
}

看懂了writeToLog再看WriteMulti就很简单了,把数据构建为entry,然后调用writeToLog。这里多提一下,是不是很眼熟?在上一章Cache部分和数据写入部分,都有这个函数的身影。WriteMulti是datapoint 写入最后几步操作。可以去回顾一下。

WAL scheduleSync

在write时,调用的bufferWrite的write方法,这部分数据可能存在buffer,或者存在page cache里,没有被真正sync到磁盘上。所以会定时刷盘。把数据刷到磁盘上。

func (l *WAL) scheduleSync() {// If we're not the first to sync, then another goroutine is fsyncing the wal for us.if !atomic.CompareAndSwapUint64(&l.syncCount, 0, 1) {return}// Fsync the wal and notify all pending waitersgo func() {var timerCh <-chan time.Time// time.NewTicker requires a > 0 delay, since 0 indicates no delay, use a closed// channel which will always be ready to read from.if l.syncDelay == 0 {// Create a RW chan and close ittimerChrw := make(chan time.Time)close(timerChrw)// Convert it to a read-onlytimerCh = timerChrw} else {t := time.NewTicker(l.syncDelay)defer t.Stop()timerCh = t.C}for {select {case <-timerCh:l.mu.Lock()if len(l.syncWaiters) == 0 {atomic.StoreUint64(&l.syncCount, 0)l.mu.Unlock()return}l.sync()l.mu.Unlock()case <-l.closing:atomic.StoreUint64(&l.syncCount, 0)return}}}()
}

WAL Remove

上面看到了WAL的写入,那么这个文件什么时候删除呢?首先删除方法是有的:Remove。

func (l *WAL) Remove(files []string) error {l.mu.Lock()defer l.mu.Unlock()for _, fn := range files {l.traceLogger.Info("Removing WAL file", zap.String("path", fn))os.RemoveAll(fn)}// Refresh the on-disk size statssegments, err := segmentFileNames(l.path)if err != nil {return err}var totalOldDiskSize int64for _, seg := range segments {stat, err := os.Stat(seg)if err != nil {return err}totalOldDiskSize += stat.Size()}atomic.StoreInt64(&l.stats.OldBytes, totalOldDiskSize)return nil
}

提供文件全路径,调用os.RemoveAll,来完成文件删除。怎么删除的已经知道了,那么什么时候删除呢?

WAL的删除

这部分是一个单独的逻辑,和其他模块是联动的。这里先看一个主要的部分。先说结论,Cache被compact时,会删除对应的WAL来看一下这部分怎么实现的。

Engine compactCache

首先是对cache做compact:

// compactCache continually checks if the WAL cache should be written to disk.
func (e *Engine) compactCache() {t := time.NewTicker(time.Second)defer t.Stop()for {e.mu.RLock()quit := e.snapDonee.mu.RUnlock()select {case <-quit:returncase <-t.C:e.Cache.UpdateAge()if e.ShouldCompactCache(time.Now()) {start := time.Now()e.traceLogger.Info("Compacting cache", zap.String("path", e.path))err := e.WriteSnapshot()if err != nil && err != errCompactionsDisabled {e.logger.Info("Error writing snapshot", zap.Error(err))atomic.AddInt64(&e.stats.CacheCompactionErrors, 1)} else {atomic.AddInt64(&e.stats.CacheCompactions, 1)}atomic.AddInt64(&e.stats.CacheCompactionDuration, time.Since(start).Nanoseconds())}}}
}

这是个定时任务,会定时检查,然后调用WriteSnapshot方法(方法有删减)

func (e *Engine) WriteSnapshot() (err error) {started := time.Now()closedFiles, snapshot, err := func() (segments []string, snapshot *Cache, err error) {e.mu.Lock()defer e.mu.Unlock()if e.WALEnabled {if err = e.WAL.CloseSegment(); err != nil {return}segments, err = e.WAL.ClosedSegments()}snapshot, err = e.Cache.Snapshot()return}()return e.writeSnapshotAndCommit(log, closedFiles, snapshot)
}

WriteSnapshot的核心逻辑是两个:

  • 关闭当前所有的segment,也就是wal file
  • 对Cache做snapshot

拿到这两份数据之后,调用writeSnapshotAndCommit,来把数据写到文件里。

func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) {newFiles, err := e.Compactor.WriteSnapshot(snapshot)if err != nil {log.Info("Error writing snapshot from compactor", zap.Error(err))return err}e.mu.RLock()defer e.mu.RUnlock()if err := e.FileStore.Replace(nil, newFiles); err != nil {log.Info("Error adding new TSM files from snapshot. Removing temp files.", zap.Error(err))// Remove the new snapshot files. We will try again.for _, file := range newFiles {if err := os.Remove(file); err != nil {log.Info("Unable to remove file", zap.String("path", file), zap.Error(err))}}return err}// clear the snapshot from the in-memory cache, then the old WAL filese.Cache.ClearSnapshot(true)if e.WALEnabled {if err := e.WAL.Remove(closedFiles); err != nil {log.Info("Error removing closed WAL segments", zap.Error(err))}}return nil
}

这里面就能看到WAL什么时候被移除了,在Cache被WriteSnapShot之后,会清除掉当前的Cache。所以得到的结论是:

Cache每次在Commit时,会清除当前系统的WAL

总结

本篇文章分析一下WAL 的工作原理,什么时候创建,怎么写入,具体的用处,以及什么时候删除。分析完这个之后,接下来就是用来存储数据的TSM File,或者是我们理解的SSTable。这部分刚才其实提到了一写,最后Cache在WriteSnapshot时,为啥可以删除WAL了?因为这个WriteSnapshot就是把数据写到TSM File里,已经持久化到磁盘了,所以这里WAL的作用就不再存在了。

Influxdb源码分析-TSM Engine WAL相关推荐

  1. Influxdb源码分析-Shard和TSM Engine Cache

    前言 influxdb安装和使用 influxdb概念详解1 influxdb概念详解2 influxdb源码编译 influxdb启动分析 influxdb源码分析-meta部分 infludb源码 ...

  2. influxdb源码解析-数据写入细节

    前言 ~~  这是一个分析inlfuxdb源码的系列.在此之前,已经分析了数据的基本模型,以及写入流程.在上一章数据写入部分,我们分析的是数据写入的基本流程,怎么从一个http的请求解析数据,然后计算 ...

  3. golang gin框架源码分析(二)---- 渐入佳境 摸索Engine ServeHTTP访问前缀树真正原理

    文章目录 全系列总结博客链接 前引 golang gin框架源码分析(二)---- 渐入佳境 摸索Engine ServeHTTP访问前缀树真正远原理 1.再列示例代码 从示例代码入手 2.r.Run ...

  4. PostgreSQL源码分析

    PostgreSQL源码结构 PostgreSQL的使用形态 PostgreSQL采用C/S(客户机/服务器)模式结构.应用层通过INET或者Unix Socket利用既定的协议与数据库服务器进行通信 ...

  5. Django源码分析10:makemigrations命令概述

    django源码分析 本文环境python3.5.2,django1.10.x系列 django源码分析-makemigrations命令概述 Django项目中的数据库管理命令就是通过makemig ...

  6. Django源码分析8:单元测试test命令浅析

    django源码分析 本文环境python3.5.2,django1.10.x系列 django源码分析-test命令分析 Django项目中提供了,test命令行命令来执行django的单元测试,该 ...

  7. Django源码分析7:migrate命令的浅析

    django源码分析 本文环境python3.5.2,django1.10.x系列 django源码分析-migrate命令分析 Django项目中提供了,通过migrations操作数据库的结构的命 ...

  8. Django源码分析5:session会话中间件分析

    django源码分析 本文环境python3.5.2,django1.10.x系列 1.这次分析django框架中的会话中间件. 2.会话保持是目前框架都支持的一个功能,因为http是无状态协议,无法 ...

  9. tornado源码分析

    tornado源码分析 本源码为tornado1.0版本 源码附带例子helloworld import tornado.httpserver import tornado.ioloop import ...

最新文章

  1. AES加密补位填充的一个问题
  2. Web 开发在 2015 年及未来的发展趋势
  3. WebAssembly + Dapr = 下一代云原生运行时?
  4. linux下,在挂载设备之前,查看设备的文件系统类型
  5. windows 10 开启全盘瞬间索引功能
  6. go语言os.exit(1)_Go语言-信号os.Interrupt和信号syscall.SIGTERM的应用
  7. 利用border制作三角形原理
  8. 太大如何翻页固定表头_外行学 Python 爬虫 第六篇 动态翻页
  9. 如何在NEO区块链上实现信息加密
  10. DHCP原理及报文格式
  11. 安装 VS 2015 报错 kb2999226
  12. 主梁弹性模量计算_各排立杆传至梁上荷载标准值、设计值是那一个数据
  13. 【Java知识体系】Redis实用教程,深入原理
  14. 模拟按键 —— 鼠标
  15. 2010年企业级信息技术的九大应用
  16. hadoop的组件有哪些
  17. 2022学科门类汇总
  18. 服务器系统数据完全备份,2012服务器系统如何备份数据库
  19. 聚合码趋势如何?未来前景怎么样?
  20. 信息与通信工程学科面试——线性代数

热门文章

  1. 让一个技术人员主动离职的20个妙招
  2. 学java记不住单词怎么办_单词记不住怎么办?
  3. python气象科研学习路线和常用技巧
  4. 【10.22 牛客普及(三)】 牛半仙的妹子串 题解
  5. python三维数据转换成二维_Python = 48/365
  6. a标签的target属性
  7. eclipse使用小技巧集合
  8. 用PhotoShop校正歪斜的照片
  9. Treeview使用方法
  10. Java利用继承和多态来求矩形、正方形和圆形的面积与周长