Cluster下的数据写入

数据写入的实现主要分析cluster/points_writer.go中的WritePoints函数的实现// WritePoints writes across multiple local and remote data nodes according the consistency level.func (w *PointsWriter) WritePoints(p *WritePointsRequest) error {

w.statMap.Add(statWriteReq, 1)

w.statMap.Add(statPointWriteReq, int64(len(p.Points)))    //2.1 先获取RetentionPolicy

if p.RetentionPolicy == "" {

db, err := w.MetaClient.Database(p.Database)        if err != nil {            return err

} else if db == nil {            return influxdb.ErrDatabaseNotFound(p.Database)

}

p.RetentionPolicy = db.DefaultRetentionPolicy

}    // 2.2 生成 shardMap

shardMappings, err := w.MapShards(p)    if err != nil {        return err

}    // Write each shard in it's own goroutine and return as soon

// as one fails.

ch := make(chan error, len(shardMappings.Points))    for shardID, points := range shardMappings.Points {

// 2.3 写入数据到Shard

go func(shard *meta.ShardInfo, database, retentionPolicy string, points []models.Point) {

ch

}(shardMappings.Shards[shardID], p.Database, p.RetentionPolicy, points)

}    // Send points to subscriptions if possible.

ok := false

// We need to lock just in case the channel is about to be nil'ed

w.mu.RLock()

select {    case w.subPoints

ok = true

default:

}

w.mu.RUnlock()    if ok {

w.statMap.Add(statSubWriteOK, 1)

} else {

w.statMap.Add(statSubWriteDrop, 1)

}    // 2.4 等待写入完成

for range shardMappings.Points {

select {        case

}

}

}    return nil}上面的函数实现主要分如下几个步骤

2.1 获取对应的RetentionPolicy

2.2 生成ShardMap, 将各个point对应到相应ShardGroup中的Shard中, 这步很关键

2.3 按ShardId不同,开启新的goroutine, 将points写入相应的Shard,可能设计对写入数据到其它的DataNode上;

2.4 等待写入完成或退出

ShardMap的生成先讲一下ShardGroup的概念

1.1 写入Influxdb的每一条数据对带有相应的time时间,每一个SharGroup都有自己的start和end时间,这个时间跨度是由用户写入时选取的RetentionPolicy时的ShardGroupDarution决定,这样每条写入的数据就必然仅属于一个确定的ShardGroup中;

主要实现在cluster/points_writer.go中的MapShards中func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error) {    // holds the start time ranges for required shard groups

timeRanges := map[time.Time]*meta.ShardGroupInfo{}

rp, err := w.MetaClient.RetentionPolicy(wp.Database, wp.RetentionPolicy)    if err != nil {        return nil, err

}    if rp == nil {        return nil, influxdb.ErrRetentionPolicyNotFound(wp.RetentionPolicy)

}    for _, p := range wp.Points {

timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil

}    // holds all the shard groups and shards that are required for writes

for t := range timeRanges {

sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t)        if err != nil {            return nil, err

}

timeRanges[t] = sg

}

mapping := NewShardMapping()    for _, p := range wp.Points {

sg := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]

sh := sg.ShardFor(p.HashID())

mapping.MapPoint(&sh, p)

}    return mapping, nil}我们来拆解下上面函数的实现

3.1 扫描所有的points, 按时间确定我们需要多个ShardGroupfor _, p := range wp.Points {

timeRanges[p.Time().Truncate(rp.ShardGroupDuration)] = nil

}

3.2 调用w.MetaClient.CreateShardGroup, 如果ShardGroup存在直接返回ShardGroup信息,如果不存在创建,创建过程涉及到将CreateShardGroup的请求发送给MetadataServer并等待本地更新到新的MetaData数据;sg, err := w.MetaClient.CreateShardGroup(wp.Database, wp.RetentionPolicy, t)

3.3 分析ShardGroup的分配规则, 在services/meta/data.go中的CreateShardGroupfunc (data *Data) CreateShardGroup(database, policy string, timestamp time.Time) error {

...    // Require at least one replica but no more replicas than nodes.

// 确认复本数,不能大于DataNode节点总数

replicaN := rpi.ReplicaN    if replicaN == 0 {

replicaN = 1

} else if replicaN > len(data.DataNodes) {

replicaN = len(data.DataNodes)

}    // Determine shard count by node count divided by replication factor.

// This will ensure nodes will get distributed across nodes evenly and

// replicated the correct number of times.

// 根据复本数确定Shard数量

shardN := len(data.DataNodes) / replicaN    // Create the shard group.

// 创建ShardGroup

data.MaxShardGroupID++

sgi := ShardGroupInfo{}

sgi.ID = data.MaxShardGroupID

sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()

sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()    // Create shards on the group.

sgi.Shards = make([]ShardInfo, shardN)    for i := range sgi.Shards {

data.MaxShardID++

sgi.Shards[i] = ShardInfo{ID: data.MaxShardID}

}    // Assign data nodes to shards via round robin.

// Start from a repeatably "random" place in the node list.

// ShardInfo中的Owners记录了当前Shard所有复本所在DataNode的信息

// 分Shard的所有复本分配DataNode

// 使用data.Index作为基数确定开始的DataNode,然后使用 round robin策略分配

// data.Index:每次meta信息有更新,Index就会更新, 可以理解为meta信息的版本号

nodeIndex := int(data.Index % uint64(len(data.DataNodes)))    for i := range sgi.Shards {

si := &sgi.Shards[i]        for j := 0; j

nodeID := data.DataNodes[nodeIndex%len(data.DataNodes)].ID

si.Owners = append(si.Owners, ShardOwner{NodeID: nodeID})

nodeIndex++

}

}    // Retention policy has a new shard group, so update the policy. Shard

// Groups must be stored in sorted order, as other parts of the system

// assume this to be the case.

rpi.ShardGroups = append(rpi.ShardGroups, sgi)

sort.Sort(ShardGroupInfos(rpi.ShardGroups))    return nil

}

3.3 按每一个具体的point对应到ShardGroup中的一个Shard: 按point的HashID来对Shard总数取模,HashID是measurment + tag set的Hash值for _, p := range wp.Points {

sg := timeRanges[p.Time().Truncate(rp.ShardGroupDuration)]

sh := sg.ShardFor(p.HashID())

mapping.MapPoint(&sh, p)

}

....

func (sgi *ShardGroupInfo) ShardFor(hash uint64) ShardInfo {    return sgi.Shards[hash%uint64(len(sgi.Shards))]

}

数据按一致性要求写入过程简述

1.1 根据一致性要求确认需要成功写入几份switch consistency {    // 对于ConsistencyLevelAny, ConsistencyLevelOne只需要写入一份即满足一致性要求,返回客户端

case ConsistencyLevelAny, ConsistencyLevelOne:

required = 1

case ConsistencyLevelQuorum:

required = required/2 + 1

}

1.2 根据Shard.Owners对应的DataNode, 向其中的每个DataNode写入数据,如果是本机,直接调用w.TSDBStore.WriteToShard写入;如果非本机,调用err := w.ShardWriter.WriteShard(shardID, owner.NodeID, points);

1.3 写入远端失败时,数据写入HintedHandoff本地磁盘队列多次重试写到远端,直到数据过期被清理;对于一致性要求是ConsistencyLevelAny, 写入本地HintedHandoff成功,就算是写入成功;w.statMap.Add(statWritePointReqHH, int64(len(points)))

hherr := w.HintedHandoff.WriteShard(shardID, owner.NodeID, points)                if hherr != nil {

ch

}                if hherr == nil && consistency == ConsistencyLevelAny {

ch

}

1.4 等待写入超时或完成for range shard.Owners {

select {        case

w.statMap.Add(statWriteTimeout, 1)            // return timeout error to caller

return ErrTimeout        case result :=

if result.Err != nil {                if writeError == nil {

writeError = result.Err

}                continue

}

wrote++            // 写入已达到一致性要求,就立即返回

if wrote >= required {

w.statMap.Add(statWriteOK, 1)                return nil

}

}

}

HintedHandoff服务定义在services/hh/service.go中

写入HintedHandoff中的数据,按NodeID的不同写入不同的目录,每个目录下又分多个文件,每个文件作为一个segment, 命名规则就是依次递增的id, id的大小按序就是写入的时间按从旧到新排序;

hitnedhandoff.png

HintedHandoff服务会针对每一个远端DataNode创建NodeProcessor, 每个负责自己DataNode的写入, 运行在一个单独的goroutine中

在每个goroutine中,作两件事:一个是定时清理过期的数据,如果被清理掉的数据还没有成功写入到远端,则会丢失;二是从文件读取数据写入到远端;func (n *NodeProcessor) run() {

defer n.wg.Done()

...    for {

select {        case

case

n.Logger.Printf("failed to purge for node %d: %s", n.nodeID, err.Error())

}        case

limiter := NewRateLimiter(n.RetryRateLimit)            for {

c, err := n.SendWrite()                if err != nil {                    if err == io.EOF {                        // No more data, return to configured interval

currInterval = time.Duration(n.RetryInterval)

} else {

currInterval = currInterval * 2

if currInterval > time.Duration(n.RetryMaxInterval) {

currInterval = time.Duration(n.RetryMaxInterval)

}

}                    break

}                // Success! Ensure backoff is cancelled.

currInterval = time.Duration(n.RetryInterval)                // Update how many bytes we've sent

limiter.Update(c)                // Block to maintain the throughput rate

time.Sleep(limiter.Delay())

}

}

}

}数据的本地存储和读取

5.1 定义在services/hh/queue.go,所有的segment file在内存中组织成一个队列,读从head指向的segment读取,写入到tail指向的segment, 每个segment文件的最后8字节记录当前segment文件已经读到什么位置

5.2 清理,当这个segment文件内容都发送完当前文件会被删除,周期性清理每次只会check当前head指向的segment是否需要清理掉

作者:扫帚的影子

链接:https://www.jianshu.com/p/6a94486b2daa

influxdb数据过期_Influxdb Cluster下的数据写入相关推荐

  1. influxdb数据过期_InfluxDB 学习笔记

    InfluxDB 是什么 InfluxDB 是用Go语言编写的一个开源分布式时序.事件和指标数据库,无需外部依赖. InfluxDB在DB-Engines的时序数据库类别里排名第一. 重要特性极简架构 ...

  2. 数据备份 linux,linux下的数据备份

    制定备份策略 将原始设备转储到文件,或从文件恢复原始设备 执行部分备份和手工备份 检验备份文件的完整性 从备份部分地或完全恢复文件系统 完善的备份是系统管理的必要部分,但是决定对什么进行备份以及何时和 ...

  3. linux 数据绘图软件,linux下的数据绘图工具-gnuplot

    Gnuplot 是一种免费分发的绘图工具,可以移植到各种主流平台.它可以下列两种模式之一进行操作:当需要调整和修饰图表使其正常显示时,通过在 gnuplot 提示符中发出命令,可以在交互模式下操作该工 ...

  4. 跳过数据准备,下秒数据让飞书维格表

    随着业务场景的多元化发展,消费者需求的个性化,海量数据暴增.数字化时代,传统的生产工具已经无法跟上时代的步伐,传统办公软件也无法满足企业的多元化需求,能够提升业务效率的软件产品成了企业数智化转型的首选 ...

  5. 跳过数据准备,下秒数据让飞书维格表数据应用更高效

    跳过数据准备,下秒数据让飞书&维格表数据应用更高效 随着业务场景的多元化发展,消费者需求的个性化,海量数据暴增.数字化时代,传统的生产工具已经无法跟上时代的步伐,传统办公软件也无法满足企业的多 ...

  6. 万字长文!深度剖析《数据安全法》下多方数据协同应用和隐私计算发展趋势

    本文作者:程勇 <数据安全法>的表决通过标志着国家鼓励数据依法合规利用.保障数据依法有序流通,明确国家实施大数据战略,推动以数据为关键生产要素的数字经济发展.<数据安全法>强调 ...

  7. 下秒数据入驻用友云生态布局,全面升级数智化,释放数据价值

    下秒数据受邀入驻用友云生态,与用友共创大数据服务解决方案.用友作为全球领先的企业云服务与软件提供商,在开放的商业模式下与伙伴携手共创,让伙伴与用友的集成应用更智能高效.合作更紧密融合.并以平台化.生态 ...

  8. 下拉数据过多,超过3000条浏览器卡顿

    浏览器是单线程,浏览器的处理和渲染能力都很差,碰到下拉框数据超过3000条就会有卡顿,基于element-UI的select我做了一个下拉框筛选的封装,可以控制前台显示下拉框中数据的数量 父组件使用案 ...

  9. 大数据技术基础_网易大数据体系之时序数据技术

    分享嘉宾:范欣欣 网易大数据技术专家 编辑整理:王吉东 内容来源:AI科学前沿大会 出品社区:DataFun 注:欢迎转载,转载请注明出处. 本次分享内容: 时序数据平台主要业务场景 时序数据平台体系 ...

最新文章

  1. nginx下启动php-fpm相关错误信息集锦(长期补充)
  2. gz键盘增强小工具_这些不起眼的Mac小工具,能让你的Macbook效率倍增!
  3. 图形处理(十三)基于可变形模板的三维人脸重建-学习笔记
  4. 不允许创建临时变量交换两个变量的内容
  5. 【蓝桥杯单片机】NE555在CT107D上的使用
  6. 國外空間亂碼解決方法
  7. 网络博客营销之博客设置和优化
  8. 他一篇论文未发博士毕业!中科院最年轻院士入职浙大!
  9. linux系统uptime解读,Linux中的uptime命令详解
  10. 网络流四种主流算法时间复杂度分析
  11. 对COM组件的调用返回了错误HRESULT E_FAIL
  12. 基于Web的代码编辑器 Ace的使用
  13. java加按钮_剪辑大神都在用的加字幕神器,你知道嘛
  14. 永恒之塔总是服务器未响应,《剑网3》《永恒之塔》怀旧服刚开上演“冲级热”,八月怀旧游戏集体搞事...
  15. frp内网穿透疑难杂症【1】do http proxy request [host:www.xxx.xxx] error: no root found: www.xxx.xxx
  16. Python如何打印出26个大写字母和26个小写字母
  17. 2个阶乘什么意思_两个阶乘符号连在一起是什么意思
  18. 全国天气查询API接口
  19. Mac SCP简单使用(Mac WinSCP)
  20. 什么是微信防火墙_【听课】第4节 什么是“合并单元”(MU)?智能站中的合并单元有什么作用?...

热门文章

  1. 重装系统后sqlserver安装失败_Windows 10八月更新再遇尴尬:安装失败 或安装后随机重启...
  2. cudnn下载_记录新电脑安装Ubuntu18.04,CUDA, cuDNN全过程
  3. struts數據庫訪問
  4. mysql hash分区 数目_mysql8 参考手册-HASH分区
  5. PolandBall and Forest(并查集)
  6. E - 嗯? 51Nod - 1432(二分)
  7. c语言试卷大全,C语言试题大全
  8. c++ 检查缓冲大小与记录大小是否匹配_后端程序员不得不会的 Nginx 转发匹配规则...
  9. python应用程序类型_python – 类型提示条件可变参数应用程序
  10. 开发转运维有什么好点的理由_芜湖好点的团购社区费用