数据写入流程分析

本篇不涉及存储层的写入,只分析写入请求的处理流程

Influxdb名词介绍

如果想搞清楚Influxdb数据写入流程,Influxdb本身的用法和其一些主要的专用词还是要明白是什么意思,比如measurement, field key,field value, tag key, tag value, tag set, line protocol, point, series, query, retention policy等;

分析入口

我们还是以http写请求为入口来分析,在httpd/handler.go中创建Handler时有如下代码:

Route{

"write", // Data-ingest route.

"POST", "/write", true, writeLogEnabled, h.serveWrite,

}

因此对写入请求的处理就在函数 func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user meta.User)中。

Handler.serveWrite流程梳理:

2.1 获取写入的db并判断db是否存在

database := r.URL.Query().Get("db")

if database == "" {

h.httpError(w, "database is required", http.StatusBadRequest)

return

}

if di := h.MetaClient.Database(database); di == nil {

h.httpError(w, fmt.Sprintf("database not found: %q", database), http.StatusNotFound)

return

}

2.2 权限验证

if h.Config.AuthEnabled {

if user == nil {

h.httpError(w, fmt.Sprintf("user is required to write to database %q", database), http.StatusForbidden)

return

}

if err := h.WriteAuthorizer.AuthorizeWrite(user.ID(), database); err != nil {

h.httpError(w, fmt.Sprintf("%q user is not authorized to write to database %q", user.ID(), database), http.StatusForbidden)

return

}

}

2.3 获取http请求的body部分,如需gzip解压缩则解压,并且作body size的校验,因为有body size大小限制

body := r.Body

if h.Config.MaxBodySize > 0 {

body = truncateReader(body, int64(h.Config.MaxBodySize))

}

...

_, err := buf.ReadFrom(body)

2.4 从http body中解析出 points

points, parseError := models.ParsePointsWithPrecision(buf.Bytes(), time.Now().UTC(),

r.URL.Query().Get("precision"))

2.5 将解析出的points写入db

h.PointsWriter.WritePoints(database, r.URL.Query().Get("rp"), consistency, user, points);

Points的解析

将http body解析成Points是写入前的最主要的一步, 相关内容定义在 models/points.go中;

我们先来看一下一条写入语句是什么样子的: insert test_mea_1,tag1=v1,tag2=v2 cpu=1,memory=10

其中test_mea_1是measurement, tag key是tag1和tag2, 对应的tag value是v1和v2, field key是cpu和memory, field value是1和10;

先来看下point的定义,它实现了Point interface

type point struct {

time time.Time

//这个 key包括了measurement和tag set, 且tag set是排序好的

key []byte

// text encoding of field data

fields []byte

// text encoding of timestamp

ts []byte

// cached version of parsed fields from data

cachedFields map[string]interface{}

// cached version of parsed name from key

cachedName string

// cached version of parsed tags

cachedTags Tags

//用来遍历所有的field

it fieldIterator

}

解析出Points

func ParsePointsWithPrecision(buf []byte, defaultTime time.Time, precision string) ([]Point, error) {

points := make([]Point, 0, bytes.Count(buf, []byte{'\n'})+1)

var (

pos int

block []byte

failed []string

)

for pos < len(buf) {

pos, block = scanLine(buf, pos)

pos++

...

pt, err := parsePoint(block[start:], defaultTime, precision)

if err != nil {

failed = append(failed, fmt.Sprintf("unable to parse '%s': %v", string(block[start:]), err))

} else {

points = append(points, pt)

}

}

return points, nil

}

这里的解析并没有用正则之类的方案,纯的字符串逐次扫描,这里不详细展开说了.

PointsWriter分析

定义在coordinator/points_writer.go中

主要负责将数据写入到本地的存储,我们重点分析下WritePointsPrivileged

func (w *PointsWriter) WritePointsPrivileged(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error {

....

//将point按time对应到相应的Shar上, 这个对应关系存储在shardMappings里, 这个MapShareds我们后面会分析

shardMappings, err := w.MapShards(&WritePointsRequest{Database: database, RetentionPolicy: retentionPolicy, Points: points})

if err != nil {

return err

}

// Write each shard in it's own goroutine and return as soon as one fails.

ch := make(chan error, len(shardMappings.Points))

for shardID, points := range shardMappings.Points {

// 每个 Shard启动一个goroutine作写入操作, 真正的写入操作w.writeToShard

go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {

err := w.writeToShard(shard, database, retentionPolicy, points)

if err == tsdb.ErrShardDeletion {

err = tsdb.PartialWriteError{Reason: fmt.Sprintf("shard %d is pending deletion", shard.ID), Dropped: len(points)}

}

ch

}(shardMappings.Shards[shardID], database, retentionPolicy, points)

}

...

// 写入超时会return ErrTimeout

timeout := time.NewTimer(w.WriteTimeout)

defer timeout.Stop()

for range shardMappings.Points {

select {

case

return ErrWriteFailed

case

atomic.AddInt64(&w.stats.WriteTimeout, 1)

// return timeout error to caller

return ErrTimeout

case err :=

if err != nil {

return err

}

}

}

return err

}

Point到Shard的映谢

3.1 先根据point的time找到对应的ShardGroup, 没有就创建新的ShardGroup;

3.2 按Point的key(measurement + tag set取hash)来散

sgi.Shards[hash%uint64(len(sgi.Shards))]

influxdb 插入数据_Influxdb 数据写入流程相关推荐

  1. influxdb 插入数据_influxdb 插入数据遇到的坑

    partial write: max-values-per-tag limit exceeded 这个问题可能会出现较早的版本,有些版本限定了tag的数目,不能超过10w.过多tag会导致的问题在前面 ...

  2. influxdb 插入数据_Influx Sql系列教程五:insert 添加数据

    接下来开始进入influxdb的curd篇,首先我们看一下如何添加数据,也就是insert的使用姿势 在进入本篇之前,对于不了解什么是retention policy, tag, field的同学,有 ...

  3. influxdb源码解析-数据写入细节

    前言 ~~  这是一个分析inlfuxdb源码的系列.在此之前,已经分析了数据的基本模型,以及写入流程.在上一章数据写入部分,我们分析的是数据写入的基本流程,怎么从一个http的请求解析数据,然后计算 ...

  4. HBase - 数据写入流程解析

    本文由  网易云 发布. 作者:范欣欣 本篇文章仅限内部分享,如需转载,请联系网易获取授权. 众所周知,HBase默认适用于写多读少的应用,正是依赖于它相当出色的写入性能:一个100台RS的集群可以轻 ...

  5. hfds_HFDS的数据写入流程

    1.HFDS的数据写入流程的基本参数 首先了解数据写入过程中,什么是block, packet, chunk 1.block:数据块,当上传的文件太大时, 就需要分块,一个块默认设置时128M, 在客 ...

  6. Apache Iceberg核心原理分析文件存储及数据写入流程

    点击上方蓝色字体,选择"设为星标" 回复"面试"获取更多惊喜 全网最全大数据面试提升手册! 第一部分:Iceberg文件存储格式 Apache Iceberg作 ...

  7. mysql批量写入redis_如何高效地向Redis插入大量的数据(推荐)

    最近有个哥们在群里问,有一个日志,里面存的是IP地址(一行一个),如何将这些IP快速导入到Redis中. 我刚开始的建议是Shell+redis客户端. 今天,查看Redis官档,发现文档的首页部分( ...

  8. grafana模板_EMQ X + InfluxDB + Grafana:物联网数据监控可视化方案

    本文以常见物联网使用场景为例,介绍了如何利用 EMQ X 消息中间件与开源数据可视化方案 InfluxDB + Grafana ,将物联网设备大量基于时序的数据便捷地展示出来. 在物联网项目中接入平台 ...

  9. 基于InfluxDB+Grafana打造大数据监控利器--转

    这是一个大数据爆发的时代.面对信息的激流.多元化数据的涌现,我们在获取.存储.传输.理解.分析.应用.维护大数据时,无疑需要一种便捷的信息交流通道,以便快速.有效.准确地理解和驾驭这个过程.本文将通过 ...

最新文章

  1. 第十六天-企业应用架构模式-离线并发模式
  2. Oracle11g服务详细介绍及哪些服务是必须开启的?
  3. MFC的静态库.lib、动态库.dll(包含引入库.lib)以及Unicode库示例
  4. DM8168 --交叉编译ARM版 Qt (qt-everywhere-opensource-src-4.8.4)
  5. .net random伪随机数
  6. Oracle 分页查询
  7. Bootstrap框架(二)
  8. 软件核心研发迎来又一春!
  9. car-like robot运动模型及应用分析(图片版)
  10. 编译原理教程_8 静态语义分析和中间代码生成
  11. HelloWorld
  12. Js 嵌套if选择结构
  13. 加入共享宽带,让你的闲置宽带循环利用再变现
  14. Ubuntu系统下有效的安装gcc/icc
  15. java会员卡管理系统下载_基于jsp的会员卡管理系统-JavaEE实现会员卡管理系统 - java项目源码...
  16. Python数据分析-NumPy模块-选取数组元素
  17. 游戏辅助制作核心--植物大战僵尸逆向之太阳花加速生产阳光(三)
  18. xampp mysql远程连接_XAMPP mysql远程连接
  19. 基于POA搭建ETH联盟链
  20. k8s之PV以及PVC

热门文章

  1. ios realm 文件_iOS数据持久化之-Realm使用深入详解篇
  2. 中国特殊钢行业市场供需与战略研究报告
  3. JS的字符串操作和各种格式转换
  4. 新风作浪博客学习(八)代码实现UIPickerView .
  5. python排序函数
  6. 培养 逻辑思维和抽象能力
  7. 【GD32F427开发板试用】硬件SPI通信驱动CH376芯片,用单片机实现U盘数据下载
  8. 全球与中国人参营养品市场深度研究分析报告
  9. 多伦多计算机科学大学,多伦多大学计算机科学开设了哪些课程
  10. 对STIX2.0标准12个构件的解读(续)——对STIX2.0官方文档的翻译