# Source code for caspia.toolbox.monitor.store

import abc
import asyncio
import logging
from copy import copy

from .stores import store_autocreate

logger = logging.getLogger(__name__)


class Metric:
    """A named metric together with its currently accumulated value.

    The constructor stores its arguments as the attributes ``name``,
    ``value_type``, ``collect`` and ``labels``. A missing or falsy
    ``labels`` argument is replaced by a fresh empty dict. ``value`` is
    initialised through :meth:`reset`.
    """

    def __init__(self, name, value_type, collect=False, labels=None):
        self.name = name
        self.value_type = value_type
        self.collect = collect
        self.labels = labels if labels else {}
        self.reset()

    def reset(self):
        """Set the accumulated ``value`` back to ``0``."""
        self.value = 0
class MetricStore(metaclass=abc.ABCMeta):
    """Abstract base class for metric back ends.

    Metrics are registered by name with :meth:`register` and fed through
    :meth:`record`. A metric registered with ``collect=True`` is summed up
    in memory and handed to :meth:`_record` once per ``interval`` seconds by
    a background task; every other metric is handed to :meth:`_record`
    immediately. Subclasses implement :meth:`_record`.

    The store is an async context manager: leaving the context cancels the
    background task.
    """

    # Conversion applied to a raw value for each supported ``value_type``.
    _VALUE_CASTS = {
        'float': float,
        'string': str,
        'boolean': bool,
        'integer': int,
    }

    def __init__(self, interval=1.0, loop=None):
        """Create an empty store.

        :param interval: seconds between two flushes of collected metrics.
        :param loop: event loop to remember on the instance; defaults to
            ``asyncio.get_event_loop()``.
        """
        self._loop = loop or asyncio.get_event_loop()
        self._interval = interval
        self._metrics = dict()
        self._collect_stats_task = None

    async def _collect_stats_periodically(self):
        """Flush every ``collect`` metric once per interval, forever.

        A metric is reset only after it was flushed successfully, so a
        value whose flush failed is kept and retried on the next round.
        """
        while True:
            await asyncio.sleep(self._interval)
            for metric in self._metrics.values():
                if not metric.collect:
                    continue
                try:
                    self._record(metric, metric.value, metric.labels)
                except Exception as e:  # pylint: disable=broad-except
                    # The collector is started only once by record(); if a
                    # failing back end were allowed to end this task, all
                    # collected metrics would silently stop being flushed.
                    logger.error('Failure when recording metric %r: %r:', metric.name, e)
                else:
                    metric.reset()

    def register(self, metric_name, value_type, labels=None, collect=False):
        """Register a new metric under ``metric_name``.

        Registering a name twice logs an error and keeps the first
        registration.

        :param metric_name: key used later in :meth:`record`.
        :param value_type: one of ``'float'``, ``'string'``, ``'boolean'``
            or ``'integer'``.
        :param labels: labels attached to the metric (optional).
        :param collect: when true, values are accumulated and flushed
            periodically instead of being recorded immediately.
        """
        labels = labels or dict()
        if metric_name in self._metrics:
            logger.error('Metric %r is already registered', metric_name)
            return
        self._metrics[metric_name] = Metric(metric_name, value_type, labels=labels, collect=collect)

    def record(self, metric_name, value, labels=None):
        """Record one measurement of a registered metric.

        The first call also starts the periodic collector task (scheduled
        with ``asyncio.ensure_future``). Unknown metric names and any
        failure while converting or storing the value are logged, not
        raised.

        :param metric_name: name given to :meth:`register`.
        :param value: raw value, or a dict whose ``'value'`` entry holds it.
        :param labels: extra labels merged over the metric's own labels;
            may be ``None``. Not used for ``collect`` metrics.
        """
        if self._collect_stats_task is None:
            self._collect_stats_task = asyncio.ensure_future(self._collect_stats_periodically())

        if metric_name not in self._metrics:
            logger.error('Unknown metric %r.', metric_name)
            return

        metric = self._metrics[metric_name]
        try:
            value = self._prepare_value(metric, value)
            if metric.collect:
                metric.value += value
            else:
                # Merge into a copy so the metric's registered labels are
                # never modified by a single measurement.
                all_labels = copy(metric.labels)
                all_labels.update(labels or {})
                self._record(metric, value, all_labels)
        except Exception as e:  # pylint: disable=broad-except
            logger.error('Failure when recording metric %r: %r:', metric_name, e)

    def _prepare_value(self, metric, value):
        """Convert ``value`` to the Python type matching ``metric.value_type``.

        For a dict, a copy is returned in which only the ``'value'`` entry
        is converted; the dict passed in is left untouched.

        :raises RuntimeError: if ``metric.value_type`` is not supported.
        """
        if isinstance(value, dict):
            prepared = copy(value)
            prepared['value'] = self._prepare_value(metric, value['value'])
            return prepared

        try:
            cast = self._VALUE_CASTS[metric.value_type]
        except (KeyError, TypeError):
            raise RuntimeError('Unsupported value type %r' % metric.value_type) from None
        return cast(value)

    @abc.abstractmethod
    def _record(self, metric, value, labels):
        """Store one prepared value; implemented by concrete stores."""

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        """Cancel the collector task, if any, and wait for it to finish."""
        # Forget the task before awaiting it so that a later record() call
        # starts a fresh collector instead of seeing a finished one.
        task, self._collect_stats_task = self._collect_stats_task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
# Module-wide store instance; None until register_store() sets it or
# get_store() creates one.
_store = None
def register_store(store):
    """Make ``store`` the module-wide store returned by ``get_store()``.

    Any previously registered or auto-created store is replaced.
    """
    global _store
    _store = store
def get_store():
    """Return the module-wide store.

    When no store has been set yet, one is built with
    ``store_autocreate()`` and remembered for later calls.
    """
    global _store
    if _store is not None:
        return _store
    _store = store_autocreate()
    return _store