
gnocchi - Sample Data Storage Flow Analysis (002): Asynchronous Aggregation of Data

2016-05-20 14:28

1 Asynchronous Aggregation of Data

At the core of gnocchi's design, the sample data sent by ceilometer is processed asynchronously in the background: the data is pre-aggregated according to the aggregation methods defined in the archive policy. When a user later requests statistics, the pre-computed results are returned directly, which both improves query performance and reduces the volume of raw samples that must be stored. This article analyzes gnocchi's asynchronous aggregation flow.
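
The pre-aggregation idea can be illustrated outside gnocchi with a few lines of pandas, the library that carbonara builds on. This is a simplified sketch of the concept, not gnocchi code: raw samples are resampled into one pre-computed serie per aggregation method, the way an archive policy with mean and max methods at a 60-second granularity would.

import pandas as pd

# Raw samples as reported by ceilometer: irregular timestamps.
raw = pd.Series(
    [42.0, 43.0, 41.0, 40.0],
    index=pd.to_datetime(["2016-05-20 14:00:05", "2016-05-20 14:00:35",
                          "2016-05-20 14:01:10", "2016-05-20 14:01:50"]))

# Pre-compute one serie per aggregation method; queries then read
# these results instead of re-scanning the raw samples.
for method in ("mean", "max"):
    aggregated = raw.resample("60s").agg(method)
    print(method, aggregated.to_dict())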

The process is:

/usr/bin/python2 /usr/bin/gnocchi-metricd --logfile /var/log/gnocchi/metricd.log

Startup entry point:

1.1 start

The source is the metricd() function in gnocchi/cli.py; the asynchronous processing is done by MetricProcessor objects:

def metricd():
    conf = service.prepare_service()

    if (conf.storage.metric_reporting_delay <
            conf.storage.metric_processing_delay):
        LOG.error("Metric reporting must run less frequently then processing")
        sys.exit(0)

    signal.signal(signal.SIGTERM, _metricd_terminate)

    try:
        queues = []
        workers = []
        for worker in range(conf.metricd.workers):
            queue = multiprocessing.Queue()
            metric_worker = MetricProcessor(
                conf, worker, conf.storage.metric_processing_delay, queue)
            metric_worker.start()
            queues.append(queue)
            workers.append(metric_worker)

        metric_report = MetricReporting(
            conf, 0, conf.storage.metric_reporting_delay, queues)
        metric_report.start()
        workers.append(metric_report)

        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        _metricd_cleanup(workers)
        sys.exit(0)
    except Exception:
        LOG.warning("exiting", exc_info=True)
        _metricd_cleanup(workers)
        sys.exit(1)

1.2 MetricProcessor

This class mainly calls the storage backend's process_background_tasks:

class MetricProcessor(MetricProcessBase):
    def __init__(self, conf, worker_id=0, interval_delay=0, queue=None):
        super(MetricProcessor, self).__init__(conf, worker_id, interval_delay)
        self.queue = queue
        self.block_size = 128

    def _run_job(self):
        try:
            if self.queue:
                while not self.queue.empty():
                    self.block_size = self.queue.get()
                    LOG.debug("Re-configuring worker to handle up to %s "
                              "metrics", self.block_size)
            self.store.process_background_tasks(self.index, self.block_size)
        except Exception:
            LOG.error("Unexpected error during measures processing",
                      exc_info=True)
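
The queue is how the MetricReporting process re-tunes a worker at runtime: _run_job drains whatever is currently queued and keeps only the last value, so the worker always runs with the most recent block size. Below is a standalone sketch of this drain-to-latest pattern, using queue.Queue for a deterministic single-process demo (metricd itself uses multiprocessing.Queue):

import queue

def latest_from_queue(q, default):
    """Drain the queue, keeping only the most recent value."""
    value = default
    while not q.empty():
        value = q.get()
    return value

q = queue.Queue()
q.put(64)
q.put(256)  # only the latest reconfiguration wins
print(latest_from_queue(q, 128))  # -> 256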

1.3 process_background_tasks

process_background_tasks is defined in the storage backends' common parent class, StorageDriver, in gnocchi/storage/__init__.py:

def process_background_tasks(self, index, block_size=128, sync=False):
    """Process background tasks for this storage.

    This calls :func:`process_measures` to process new measures and
    :func:`expunge_metrics` to expunge deleted metrics.

    :param index: An indexer to be used for querying metrics
    :param block_size: number of metrics to process
    :param sync: If True, then process everything synchronously and raise
                 on error
    :type sync: bool
    """
    LOG.debug("Processing new and to delete measures")
    try:
        self.process_measures(index, block_size, sync)
    except Exception:
        if sync:
            raise
        LOG.error("Unexpected error during measures processing",
                  exc_info=True)

    LOG.debug("Expunging deleted metrics")
    try:
        # Purge metrics that have been deleted from the indexer,
        # along with their stored sample data.
        self.expunge_metrics(index, sync)
    except Exception:
        if sync:
            raise
        LOG.error("Unexpected error during deleting metrics",
                  exc_info=True)
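
Note the two modes: the daemon calls this with sync=False so that a failure on one metric is only logged and the worker keeps running, while sync=True (per the docstring) processes everything synchronously and lets errors propagate, which is what a test or one-shot tool would want. A hypothetical one-shot invocation, where driver and index are placeholder names for a configured StorageDriver and indexer:

# Process everything now and fail loudly on any error.
driver.process_background_tasks(index, block_size=128, sync=True)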

1.4 process_measures

This method is defined in gnocchi/storage/_carbonara.py:

def process_measures(self, indexer, block_size, sync=False):
    # Fetch the metrics that currently have reported measures waiting
    # to be processed.
    metrics_to_process = self._list_metric_with_measures_to_process(
        block_size, full=sync)
    metrics = indexer.list_metrics(ids=metrics_to_process)
    # This build the list of deleted metrics, i.e. the metrics we have
    # measures to process for but that are not in the indexer anymore.
    deleted_metrics_id = (set(map(uuid.UUID, metrics_to_process))
                          - set(m.id for m in metrics))
    for metric_id in deleted_metrics_id:
        # NOTE(jd): We need to lock the metric otherwise we might delete
        # measures that another worker might be processing. Deleting
        # measurement files under its feet is not nice!
        with self._lock(metric_id)(blocking=sync):
            # Delete measures that were reported but never processed.
            # Since reporting and aggregation are asynchronous, a metric
            # can be deleted while unaggregated samples for it still
            # exist on disk.
            self._delete_unprocessed_measures_for_metric_id(metric_id)
    for metric in metrics:
        lock = self._lock(metric.id)
        agg_methods = list(metric.archive_policy.aggregation_methods)
        # Do not block if we cannot acquire the lock, that means some other
        # worker is doing the job. We'll just ignore this metric and may
        # get back later to it if needed.
        if lock.acquire(blocking=sync):
            try:
                LOG.debug("Processing measures for %s" % metric)
                # `measures` holds the newly reported samples, stored
                # under /var/lib/gnocchi/measure/$metric.id
                with self._process_measure_for_metric(metric) as measures:
                    # NOTE(mnaser): The metric could have been handled by
                    # another worker, ignore if no measures.
                    if len(measures) == 0:
                        LOG.debug("Skipping %s (already processed)"
                                  % metric)
                        continue
                    try:
                        with timeutils.StopWatch() as sw:
                            # With the file driver this is read from
                            # /var/lib/gnocchi/$metric.id/none
                            raw_measures = (
                                self._get_unaggregated_timeserie(
                                    metric)
                            )
                            LOG.debug(
                                "Retrieve unaggregated measures "
                                "for %s in %.2fs"
                                % (metric.id, sw.elapsed()))
                    except storage.MetricDoesNotExist:
                        try:
                            self._create_metric(metric)
                        except storage.MetricAlreadyExists:
                            # Created in the mean time, do not worry
                            pass
                        ts = None
                    else:
                        try:
                            ts = carbonara.BoundTimeSerie.unserialize(
                                raw_measures)
                        except ValueError:
                            ts = None
                            LOG.error(
                                "Data corruption detected for %s "
                                "unaggregated timeserie, "
                                "recreating an empty one."
                                % metric.id)

                    if ts is None:
                        # This is the first time we treat measures for this
                        # metric, or data are corrupted, create a new one
                        mbs = metric.archive_policy.max_block_size
                        ts = carbonara.BoundTimeSerie(
                            block_size=mbs,
                            back_window=metric.archive_policy.back_window)

                    def _map_add_measures(bound_timeserie):
                        self._map_in_thread(
                            self._add_measures,
                            ((aggregation, d, metric, bound_timeserie)
                             for aggregation in agg_methods
                             for d in metric.archive_policy.definition))

                    with timeutils.StopWatch() as sw:
                        ts.set_values(
                            measures,
                            before_truncate_callback=_map_add_measures,
                            ignore_too_old_timestamps=True)
                        LOG.debug(
                            "Computed new metric %s with %d new measures "
                            "in %.2f seconds"
                            % (metric.id, len(measures), sw.elapsed()))

                    self._store_unaggregated_timeserie(metric,
                                                       ts.serialize())
            except Exception:
                if sync:
                    raise
                LOG.error("Error processing new measures", exc_info=True)
            finally:
                lock.release()
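
Back at the top of process_measures, the deleted-metric detection is a plain set difference between the metric IDs found on disk and the ones the indexer still knows about. Here is the same computation extracted into a standalone example (the UUIDs are made up):

import uuid

# Pending-measure directory names, as returned by
# _list_metric_with_measures_to_process().
metrics_to_process = ["5e4f30e2-8ef6-4a05-a5eb-ae0c0e6a6a83",
                      "a2d4e3b6-1a52-4de0-a6d1-8a1a14f2f8e3"]

class Metric(object):
    def __init__(self, id):
        self.id = id

# The indexer only knows about the first metric; the second was deleted.
metrics = [Metric(uuid.UUID("5e4f30e2-8ef6-4a05-a5eb-ae0c0e6a6a83"))]

deleted_metrics_id = (set(map(uuid.UUID, metrics_to_process))
                      - set(m.id for m in metrics))
print(deleted_metrics_id)  # -> {UUID('a2d4e3b6-...')}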

1.4.1 _list_metric_with_measures_to_process

This method is defined in each storage backend's own code; this article uses the file backend, gnocchi/storage/file.py, as the example:

def _list_metric_with_measures_to_process(self, block_size, full=False):
    # List the metrics that currently have reported, unprocessed measures.
    if full:
        return os.listdir(self.measure_path)
    return os.listdir(self.measure_path)[
        block_size * self.partition:block_size * (self.partition + 1)]
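
Each worker thus takes a disjoint slice of the pending-measure directory: worker N handles entries [block_size * N, block_size * (N + 1)). A worked example of the slicing with a fake listing:

pending = ["metric-%03d" % i for i in range(300)]  # fake os.listdir() result
block_size = 128

for partition in range(3):
    chunk = pending[block_size * partition:block_size * (partition + 1)]
    print(partition, len(chunk))
# partitions 0 and 1 get 128 entries each, partition 2 gets the remaining 44

Since os.listdir() guarantees no ordering, slices taken on different passes may shift; the per-metric lock in process_measures is what prevents two workers from handling the same metric at once.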

1.4.2 _delete_unprocessed_measures_for_metric_id

This code is in gnocchi/storage/file.py:

def _delete_unprocessed_measures_for_metric_id(self, metric_id):
    # Delete measures that were reported but never processed.
    files = self._list_measures_container_for_metric_id(metric_id)
    self._delete_measures_files_for_metric_id(metric_id, files)

1.4.3 _process_measure_for_metric

This code is in gnocchi/storage/file.py:

# Read the samples saved under measure/$metric.id and delete the
# files once they have been processed.
@contextlib.contextmanager
def _process_measure_for_metric(self, metric):
    files = self._list_measures_container_for_metric_id(metric.id)
    measures = []
    for f in files:
        abspath = self._build_measure_path(metric.id, f)
        with open(abspath, "rb") as e:
            measures.extend(self._unserialize_measures(e.read()))

    yield measures

    self._delete_measures_files_for_metric_id(metric.id, files)
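
Because this is a generator-based context manager, the cleanup after the yield only runs when the with-block in process_measures exits without raising; with no try/finally around the yield, a failure while aggregating leaves the measure files on disk to be retried on a later pass. A minimal sketch of the same pattern:

import contextlib

@contextlib.contextmanager
def consume(files):
    data = ["payload of %s" % f for f in files]
    yield data
    # Only reached if the with-block did not raise: the inputs
    # can now be deleted safely.
    print("deleting", files)

with consume(["a", "b"]) as measures:
    print("processing", measures)
# prints "processing [...]" then "deleting ['a', 'b']"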

1.4.4 _get_unaggregated_timeserie

This code is in gnocchi/storage/file.py:

# Read the stored, not-yet-aggregated samples. With the file driver
# the path is /var/lib/gnocchi/$metric.id/none
def _get_unaggregated_timeserie(self, metric):
    path = self._build_unaggregated_timeserie_path(metric)
    try:
        with open(path, 'rb') as f:
            return f.read()
    except IOError as e:
        if e.errno == errno.ENOENT:
            raise storage.MetricDoesNotExist(metric)
        raise


def _build_unaggregated_timeserie_path(self, metric):
    return os.path.join(self._build_metric_dir(metric), 'none')

def _store_unaggregated_timeserie(self, metric, data):
    self._atomic_file_store(
        self._build_unaggregated_timeserie_path(metric),
        data)
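
_atomic_file_store itself is not shown here; the standard implementation of such a helper is to write to a temporary file on the same filesystem and then os.rename() it over the destination, so a concurrent reader never sees a half-written timeserie. A sketch of that pattern (an educated guess at the helper, not a copy of gnocchi's code):

import os
import tempfile

def atomic_file_store(dest, data):
    # The temp file lives in the destination directory so the final
    # rename stays on one filesystem and is therefore atomic.
    tmp = tempfile.NamedTemporaryFile(
        prefix='tmp', dir=os.path.dirname(dest), delete=False)
    with tmp as f:
        f.write(data)
    os.rename(tmp.name, dest)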

1.4.5 _create_metric

# Create the storage directories for the metric's aggregated data:

def _create_metric(self, metric):
    path = self._build_metric_dir(metric)
    try:
        os.mkdir(path, 0o750)
    except OSError as e:
        if e.errno == errno.EEXIST:
            raise storage.MetricAlreadyExists(metric)
        raise
    for agg in metric.archive_policy.aggregation_methods:
        try:
            os.mkdir(self._build_metric_path(metric, agg), 0o750)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
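
Putting the path helpers together, the on-disk layout for a metric whose archive policy defines, say, mean and max looks roughly as follows. This tree is inferred from the code above, not copied from documentation:

# /var/lib/gnocchi/<metric.id>/           <- _build_metric_dir()
# /var/lib/gnocchi/<metric.id>/none       <- serialized unaggregated BoundTimeSerie
# /var/lib/gnocchi/<metric.id>/mean/      <- aggregated splits per granularity
# /var/lib/gnocchi/<metric.id>/max/
# /var/lib/gnocchi/measure/<metric.id>/   <- incoming measures awaiting processing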

1.4.6 set_values

This performs the actual aggregation of the data.

The code is BoundTimeSerie.set_values in gnocchi/carbonara.py:

# Calls the pandas library to perform the asynchronous processing
# of the data.

def set_values(self, values, before_truncate_callback=None,
               ignore_too_old_timestamps=False):
    if self.block_size is not None and not self.ts.empty:
        values = sorted(values, key=operator.itemgetter(0))
        first_block_timestamp = self._first_block_timestamp()
        if ignore_too_old_timestamps:
            for index, (timestamp, value) in enumerate(values):
                if timestamp >= first_block_timestamp:
                    values = values[index:]
                    break
            else:
                values = []
        else:
            # Check that the smallest timestamp does not go too much back
            # in time.
            smallest_timestamp = values[0][0]
            if smallest_timestamp < first_block_timestamp:
                raise NoDeloreanAvailable(first_block_timestamp,
                                          smallest_timestamp)
    super(BoundTimeSerie, self).set_values(values)
    if before_truncate_callback:
        before_truncate_callback(self)
    self._truncate()
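
The ignore_too_old_timestamps branch sorts the incoming points and drops everything older than the first block boundary, since points that old can no longer be merged into the bounded serie. The same filtering logic, extracted into a standalone example with integer timestamps:

import operator

values = [(35, 4.0), (5, 1.0), (20, 2.0)]  # (timestamp, value), unsorted
first_block_timestamp = 10                 # oldest timestamp still accepted

values = sorted(values, key=operator.itemgetter(0))
for index, (timestamp, value) in enumerate(values):
    if timestamp >= first_block_timestamp:
        values = values[index:]  # keep this point and everything newer
        break
else:
    values = []  # every point was too old

print(values)  # -> [(20, 2.0), (35, 4.0)]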

1.4.7 _add_measures

This code is in gnocchi/storage/_carbonara.py.

# This is where the final asynchronous aggregation happens, using the
# third-party statistics library pandas. Records are kept for at most
# the timespan defined by the archive policy; data points older than
# that are deleted.
#并删除超过时间戳的数据

def _map_add_measures(bound_timeserie):
    self._map_in_thread(
        self._add_measures,
        ((aggregation, d, metric, bound_timeserie)
         for aggregation in agg_methods
         for d in metric.archive_policy.definition))

def _add_measures(self, aggregation, archive_policy_def,
                  metric, timeserie):
    with timeutils.StopWatch() as sw:
        ts = self._get_measures_timeserie(metric, aggregation,
                                          archive_policy_def.granularity,
                                          timeserie.first, timeserie.last)
        LOG.debug("Retrieve measures"
                  "for %s/%s/%s in %.2fs"
                  % (metric.id, aggregation, archive_policy_def.
                     granularity, sw.elapsed()))
    ts.update(timeserie)
    with timeutils.StopWatch() as sw:
        for key, split in ts.split():
            self._store_metric_measures(metric, key, aggregation,
                                        archive_policy_def.granularity,
                                        split.serialize())
        LOG.debug("Store measures for %s/%s/%s in %.2fs"
                  % (metric.id, aggregation,
                     archive_policy_def.granularity, sw.elapsed()))

    if ts.last and archive_policy_def.timespan:
        with timeutils.StopWatch() as sw:
            oldest_point_to_keep = ts.last - datetime.timedelta(
                seconds=archive_policy_def.timespan)
            self._delete_metric_measures_before(
                metric, aggregation, archive_policy_def.granularity,
                oldest_point_to_keep)
            LOG.debug("Expire measures for %s/%s/%s in %.2fs"
                      % (metric.id, aggregation,
                         archive_policy_def.granularity, sw.elapsed()))
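
The retention step at the end is a single cutoff computation: everything older than the last point minus the archive policy's timespan is expired. For example, with a one-hour timespan:

import datetime

last = datetime.datetime(2016, 5, 20, 14, 28)  # ts.last
timespan = 3600  # seconds, from archive_policy_def.timespan

oldest_point_to_keep = last - datetime.timedelta(seconds=timespan)
print(oldest_point_to_keep)  # -> 2016-05-20 13:28:00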