数据的异步统计

在gnocchi的核心思想中,是通过后台的异步处理ceilometer发送过来的采样数据,然后根据存储策略定义的汇聚方式,对数据进行预处理。然后用户获取统计数据的时候,直接获取到对应的已经统计好的数据,以此来提升性能,以及减少存储的采样数据。这边主要分析下gnocchi的异步统计流程。

进程为:

/usr/bin/python2 /usr/bin/gnocchi-metricd --logfile /var/log/gnocchi/metricd.log

启动入口:

1.1 start

源码位于: gnocchi/cli.py 中的metricd中,其异步处理对象为MetricProcessor:

def metricd():
    conf = service.prepare_service()
    if (conf.storage.metric_reporting_delay <
            conf.storage.metric_processing_delay):
        LOG.error("Metric reporting must run less frequently then processing")
        sys.exit(0)

signal.signal(signal.SIGTERM, _metricd_terminate)

try:
        queues = []
        workers = []
        for worker in range(conf.metricd.workers):
            queue = multiprocessing.Queue()
            metric_worker = MetricProcessor(
                conf, worker, conf.storage.metric_processing_delay, queue)
            metric_worker.start()
            queues.append(queue)
            workers.append(metric_worker)

metric_report = MetricReporting(
            conf, 0, conf.storage.metric_reporting_delay, queues)
        metric_report.start()
        workers.append(metric_report)

for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        _metricd_cleanup(workers)
        sys.exit(0)
    except Exception:
        LOG.warning("exiting", exc_info=True)
        _metricd_cleanup(workers)
        sys.exit(1)

1.2 MetricProcessor

该类主要是调用存储后端的 process_background_tasks

class MetricProcessor(MetricProcessBase):
    def __init__(self, conf, worker_id=0, interval_delay=0, queue=None):
        super(MetricProcessor, self).__init__(conf, worker_id, interval_delay)
        self.queue = queue
        self.block_size = 128

def _run_job(self):
        try:
            if self.queue:
                while not self.queue.empty():
                    self.block_size = self.queue.get()
                    LOG.debug("Re-configuring worker to handle up to %s "
                              "metrics"
, self.block_size)
            self.store.process_background_tasks(self.index, self.block_size)
        except Exception:
            LOG.error("Unexpected error during measures processing",
                      exc_info=True)

1.3 process_background_tasks

process_background_tasks代码定义于存储后端的父类驱动中:

Gnocchi/storage/__init__.py中的StorageDriver中的process_background_tasks

def process_background_tasks(self, index, block_size=128, sync=False):
    """Process background tasks for this storage.

This calls :func:`process_measures` to process new measures and
    :func:`expunge_metrics` to expunge deleted metrics.

:param index: An indexer to be used for querying metrics
    
:param block_size: number of metrics to process
    
:param sync: If True, then process everything synchronously and raise
                 on error
    
:type sync: bool
    """
    
LOG.debug("Processing new and to delete measures")
    try:
        self.process_measures(index, block_size, sync)
    except Exception:
        if sync:
            raise
        
LOG.error("Unexpected error during measures processing",
                  exc_info=True)
    LOG.debug("Expunging deleted metrics")
    try:

#这个主要是删除被删除的metric信息,并将相关采集数据删除
        self.expunge_metrics(index, sync)
    except Exception:
        if sync:
            raise
        
LOG.error("Unexpected error during deleting metrics",
                  exc_info=True)

1.4 process_measures

该方法定义于gnocchi/storage/_carbonara.py中

def process_measures(self, indexer, block_size, sync=False):

#获取当前上报的采样metric记录
    metrics_to_process = self._list_metric_with_measures_to_process(
        block_size, full=sync)
    metrics = indexer.list_metrics(ids=metrics_to_process)
    # This build the list of deleted metrics, i.e. the metrics we have
    # measures to process for but that are not in the indexer anymore.
    
deleted_metrics_id = (set(map(uuid.UUID, metrics_to_process))
                          - set(m.id for m in metrics))
    for metric_id in deleted_metrics_id:
        # NOTE(jd): We need to lock the metric otherwise we might delete
        # measures that another worker might be processing. Deleting
        # measurement files under its feet is not nice!
        
with self._lock(metric_id)(blocking=sync):

#删除未处理的并上报的采样数据,主要是metric统计和采样的上报的异步的,存在删除的时候,但是采样值还在没被统计的情况。
            self._delete_unprocessed_measures_for_metric_id(metric_id)
    for metric in metrics:
        lock = self._lock(metric.id)
        agg_methods = list(metric.archive_policy.aggregation_methods)
        # Do not block if we cannot acquire the lock, that means some other
        # worker is doing the job. We'll just ignore this metric and may
        # get back later to it if needed.
        
if lock.acquire(blocking=sync):
            try:
                LOG.debug("Processing measures for %s" % metric)

# measures 为新增加的采样的数据

#存放的路径为/var/lib/gnocchi/measure/$metric.id
                with self._process_measure_for_metric(metric) as measures:
                    # NOTE(mnaser): The metric could have been handled by
                    #               another worker, ignore if no measures.
                    
if len(measures) == 0:
                        LOG.debug("Skipping %s (already processed)"
                                  
% metric)
                        continue
                    try
:
                        with timeutils.StopWatch() as sw:

#file存储路径为/var/lib/gnocchi/$metric.id/none
                            raw_measures = (
                                self._get_unaggregated_timeserie(
                                    metric)
                            )
                            LOG.debug(
                                "Retrieve unaggregated measures "
                                "for %s in %.2fs"
                                
% (metric.id, sw.elapsed()))
                    except storage.MetricDoesNotExist:
                        try:
                            self._create_metric(metric)
                        except storage.MetricAlreadyExists:
                            # Created in the mean time, do not worry
                            
pass
                        
ts = None
                    else:
                        try:
                            ts = carbonara.BoundTimeSerie.unserialize(
                                raw_measures)
                        except ValueError:
                            ts = None
                            LOG.error(
                                "Data corruption detected for %s "
                                "unaggregated timeserie, "
                                "recreating an empty one."
                                
% metric.id)

if ts is None:
                        # This is the first time we treat measures for this
                        # metric, or data are corrupted, create a new one
                        
mbs = metric.archive_policy.max_block_size
                        ts = carbonara.BoundTimeSerie(
                            block_size=mbs,
                            back_window=metric.archive_policy.back_window)

def _map_add_measures(bound_timeserie):
                        self._map_in_thread(
                            self._add_measures,
                            ((aggregation, d, metric, bound_timeserie)
                             for aggregation in agg_methods
                             for d in metric.archive_policy.definition))

with timeutils.StopWatch() as sw:
                        ts.set_values(
                            measures,
                            before_truncate_callback=_map_add_measures,
                            ignore_too_old_timestamps=True)
                        LOG.debug(
                            "Computed new metric %s with %d new measures "
                            "in %.2f seconds"
                            
% (metric.id, len(measures), sw.elapsed()))

self._store_unaggregated_timeserie(metric,
                                                       ts.serialize())
            except Exception:
                if sync:
                    raise
                
LOG.error("Error processing new measures", exc_info=True)
            finally:
                lock.release()

1.4.1 _list_metric_with_measures_to_process

该方法定义在对应的存储后端代码中,本文以file为例:

Gnocchi/gnocchi/storage/file.py 中的,

#获取当前上报的采样metric记录

def _list_metric_with_measures_to_process(self, block_size, full=False):
    if full:
        return os.listdir(self.measure_path)
    return os.listdir(self.measure_path)[
        block_size * self.partition:block_size * (self.partition + 1)]

1.4.2 _delete_unprocessed_measures_for_metric_id

该源码位于Gnocchi/gnocchi/storage/file.py 中的,

#删除未处理的并上报的采样数据

def _delete_unprocessed_measures_for_metric_id(self, metric_id):
    files = self._list_measures_container_for_metric_id(metric_id)
    self._delete_measures_files_for_metric_id(metric_id, files)

1.4.3 _process_measure_for_metric

该源码位于Gnocchi/gnocchi/storage/file.py 中的,

#获取保存在measure/metric_id中的采样信息,处理后删除

@contextlib.contextmanager
def _process_measure_for_metric(self, metric):
    files = self._list_measures_container_for_metric_id(metric.id)
    measures = []
    for f in files:
        abspath = self._build_measure_path(metric.id, f)
        with open(abspath, "rb") as e:
            measures.extend(self._unserialize_measures(e.read()))

yield measures

self._delete_measures_files_for_metric_id(metric.id, files)

1.4.4 _get_unaggregated_timeserie

该源码位于Gnocchi/gnocchi/storage/file.py 中的,

#获取保存的未统计过的采样数据

#file存储路径为/var/lib/gnocchi/$metric.id/none

def _get_unaggregated_timeserie(self, metric):
    path = self._build_unaggregated_timeserie_path(metric)
    try:
        with open(path, 'rb') as f:
            return f.read()
    except IOError as e:
        if e.errno == errno.ENOENT:
            raise storage.MetricDoesNotExist(metric)
        raise

def _get_unaggregated_timeserie(self, metric):
    path = self._build_unaggregated_timeserie_path(metric)
    try:
        with open(path, 'rb') as f:
            return f.read()
    except IOError as e:
        if e.errno == errno.ENOENT:
            raise storage.MetricDoesNotExist(metric)
        raise

def _build_unaggregated_timeserie_path(self, metric):
    return os.path.join(self._build_metric_dir(metric), 'none')

def _store_unaggregated_timeserie(self, metric, data):
    self._atomic_file_store(
        self._build_unaggregated_timeserie_path(metric),
        data)

1.4.5 _create_metric

#根据metric生成对应agg数据的存储路径:

def _create_metric(self, metric):
    path = self._build_metric_dir(metric)
    try:
        os.mkdir(path, 0o750)
    except OSError as e:
        if e.errno == errno.EEXIST:
            raise storage.MetricAlreadyExists(metric)
        raise
    for
agg in metric.archive_policy.aggregation_methods:
        try:
            os.mkdir(self._build_metric_path(metric, agg), 0o750)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise

1.4.6 set_values

进行数据的汇聚处理

该源码位于gnocchi/carbonara.py中的 set_values

# 调用panna库,进行数据的异步处理功能

def set_values(self, values, before_truncate_callback=None,
               ignore_too_old_timestamps=False):
    if self.block_size is not None and not self.ts.empty:
        values = sorted(values, key=operator.itemgetter(0))
        first_block_timestamp = self._first_block_timestamp()
        if ignore_too_old_timestamps:
            for index, (timestamp, value) in enumerate(values):
                if timestamp >= first_block_timestamp:
                    values = values[index:]
                    break
            else
:
                values = []
        else:
            # Check that the smallest timestamp does not go too much back
            # in time.
            
smallest_timestamp = values[0][0]
            if smallest_timestamp < first_block_timestamp:
                raise NoDeloreanAvailable(first_block_timestamp,
                                          smallest_timestamp)
    super(BoundTimeSerie, self).set_values(values)
    if before_truncate_callback:
        before_truncate_callback(self)
    self._truncate()

1.4.7 _add_measures

代码位于 gnocchi/storage/_carbonara.py中

#这里就是进行最终的异步统计过程,使用第三方统计工具pandas,并根据archive policy定义的时间戳,保留最长的记录。

#并删除超过时间戳的数据

def _map_add_measures(bound_timeserie):
    self._map_in_thread(
        self._add_measures,
        ((aggregation, d, metric, bound_timeserie)
         for aggregation in agg_methods
         for d in metric.archive_policy.definition))

def _add_measures(self, aggregation, archive_policy_def,
                  metric, timeserie):
    with timeutils.StopWatch() as sw:
        ts = self._get_measures_timeserie(metric, aggregation,
                                          archive_policy_def.granularity,
                                          timeserie.first, timeserie.last)
        LOG.debug("Retrieve measures"
                  "for %s/%s/%s in %.2fs"
                  
% (metric.id, aggregation, archive_policy_def.
                     granularity, sw.elapsed()))
    ts.update(timeserie)
    with timeutils.StopWatch() as sw:
        for key, split in ts.split():
            self._store_metric_measures(metric, key, aggregation,
                                        archive_policy_def.granularity,
                                        split.serialize())
        LOG.debug("Store measures for %s/%s/%s in %.2fs"
                  
% (metric.id, aggregation,
                     archive_policy_def.granularity, sw.elapsed()))

if ts.last and archive_policy_def.timespan:
        with timeutils.StopWatch() as sw:
            oldest_point_to_keep = ts.last - datetime.timedelta(
                seconds=archive_policy_def.timespan)
            self._delete_metric_measures_before(
                metric, aggregation, archive_policy_def.granularity,
                oldest_point_to_keep)
            LOG.debug("Expire measures for %s/%s/%s in %.2fs"
                      
% (metric.id, aggregation,
                         archive_policy_def.granularity, sw.elapsed()))

gnocchi-采样数据存储流程分析(002)--数据的异步统计相关推荐

  1. 面向智能电网的电力大数据存储与分析应用

    面向智能电网的电力大数据存储与分析应用 崔立真1, 史玉良1, 刘磊1, 赵卓峰2, 毕艳冰3 1. 山东大学计算机科学与技术学院,山东 济南 250101 2. 北方工业大学云计算研究中心,北京 1 ...

  2. 轨迹时空数据存储对比分析

    轨迹时空数据存储对比分析 背景 最近有一批车辆的轨迹数据需要用postgresql管理起来.数据格式相对比较简单,就是一堆csv文件,每行一个点,包括x,y,t和其他的一些速度,方向属性信息. 方案对 ...

  3. 给我一个西门子plc采集大数据存储与分析方案

    对于西门子PLC采集大数据存储与分析方案,下面是一个建议: 数据采集: 在PLC中设置数据采集程序,以记录关键数据并定期发送到数据存储仓库. 数据存储: 使用大数据存储技术,例如 Hadoop.Spa ...

  4. 【SemiDrive源码分析】【MailBox核间通信】46 - Android侧 RPMSG_IPCC_RPC驱动分析(下) 之 RPMSG_IPCC_RPC驱动初始化、数据收发流程分析

    [SemiDrive源码分析][MailBox核间通信]46 - Android侧 RPMSG_IPCC_RPC驱动分析(下) 之 RPMSG_IPCC_RPC驱动初始化.数据收发流程分析 三. rp ...

  5. 弘易信泰,企业级SAAS数据存储技术分析

    前言 "大数据" 通常指的是那些数量巨大.难于收集.处理.分析的数据集,指那些在传统基础设施中长期保存的数据,大数据存储将这些数据集持久化到计算机中.行政易作为一款SaaS软件产品 ...

  6. 5 Android数据存储 任务二 应用程序数据文件夹里的文件读写 ,

    Android中提供了两个方法用来打开应用程序的数据文件夹IO流. 1.FileInputStream openFileInput(String name):参数name表示某个文件名,该方法用于打开 ...

  7. java sql变更存储,MySQL更改数据库数据存储目录,mysql数据存储

    MySQL更改数据库数据存储目录,mysql数据存储 MySQL数据库默认的数据库文件位于 /var/lib/mysql 下,有时候由于存储规划等原因,需要更改 MySQL 数据库的数据存储目录.下文 ...

  8. 数据存储(1):从数据存储看人类文明-数据存储器发展历程

    传统文本存储 泥版/钟鼎/甲骨/莎草纸/羊皮纸等文字存储 传统的考古学家和历史学家认为,楔形文字起源于美索不达米亚特殊的渔猎生活方式.这是较为通行的看法,西方的各种百科全书大都持这一观点.约在公元前3 ...

  9. BigData NoSQL —— ApsaraDB HBase数据存储与分析平台概览

    一.引言 时间到了2019年,数据库也发展到了一个新的拐点,有三个明显的趋势: 越来越多的数据库会做云原生(CloudNative),会不断利用新的硬件及云本身的优势打造CloudNative数据库, ...

最新文章

  1. paddle中的自动求解梯度 : autograd.backward
  2. 使用Troll对ARM Cortex-M处理器进行系统内核调试
  3. Python--三元运算与lambda表达式
  4. 用户自定义类型(User-defined Type)参数的传递
  5. java:蓝桥杯,矩形面积交
  6. 7-8垃圾箱分布_您认为有关垃圾收集的7件事-完全错了
  7. SQLExecption:Operation not allowed after ResultSet closed解决办法
  8. 数据结构Java版之查找算法(三)
  9. 如何使用内联onclick属性停止事件传播?
  10. wampserver的下载与安装配置
  11. 汉字区位码转换为“汉字ASCII码“
  12. 谷歌浏览器 performance 详解
  13. uboot源码阅读(二)什么是江湖,链接文件u-boot.lds
  14. 1186: 零起点学算法93——改革春风吹满地(C)
  15. win10 设置为静态ip地址
  16. layui ztree 实现下拉树
  17. 电磁场主要应用在哪些领域
  18. RHCE(tuend,stratis,vdo)
  19. 贝叶斯神经网络----从贝叶斯准则到变分推断
  20. 阿里巴巴私有化:B2B夕阳西下 马云布局大阿里

热门文章

  1. 企业数据无忧 飞客功不可没
  2. 阿里P7晒工资条,看完真的扎心了……
  3. elementary os安装后配置
  4. 生产者与消费者(夜王与守夜人之间的斗争)
  5. 电力电子技术复习笔记2
  6. 第三十七章 SQL函数 CURRENT_TIMESTAMP
  7. 图解八道经典指针笔试题
  8. 计算机主机面板上的reset,一但中了IE窗口炸弹马上按下主机面板上的Reset键,重起计算机是对的吗...
  9. 小学教师资格证笔试答题模板
  10. 区块链量化投资系列课程(4) - 动态平衡策略