import abc
import asyncio
import logging
from copy import copy
from .stores import store_autocreate
logger = logging.getLogger(__name__)
class Metric:
    """A named metric together with its currently accumulated value.

    :param name: identifier of the metric.
    :param value_type: name of the type the metric's values are expected to
        have; stored as given, not validated here.
    :param collect: stored as given; this class does not interpret the flag.
    :param labels: mapping of labels attached to the metric; an empty dict is
        used when omitted (or when a falsy value is passed).
    """

    def __init__(self, name, value_type, collect=False, labels=None):
        self.name = name
        self.labels = labels or {}
        self.value_type = value_type
        self.collect = collect
        # Initialises ``self.value`` so a fresh metric starts from the same
        # state that ``reset()`` produces.
        self.reset()

    def reset(self):
        """Set the accumulated value back to ``0``."""
        self.value = 0

    def __repr__(self):
        return '%s(name=%r, value_type=%r, collect=%r, labels=%r, value=%r)' % (
            type(self).__name__,
            self.name,
            self.value_type,
            self.collect,
            self.labels,
            self.value,
        )
class MetricStore(metaclass=abc.ABCMeta):
    """Abstract base class for metric back-ends.

    Metrics are declared with :meth:`register` and fed through :meth:`record`.
    A metric registered with ``collect=True`` is accumulated in memory and
    handed to :meth:`_record` by a background task every ``interval`` seconds;
    any other metric is handed to :meth:`_record` as soon as it is recorded.

    Subclasses implement :meth:`_record`.  The store can be used as an async
    context manager; leaving the context cancels the background task.
    """

    def __init__(self, interval=1.0, loop=None):
        """Create an empty store.

        :param interval: seconds the background task sleeps between two
            flushes of the collected metrics.
        :param loop: event loop remembered on the instance as ``_loop``;
            ``asyncio.get_event_loop()`` is used when omitted.
        """
        self._loop = loop or asyncio.get_event_loop()
        self._interval = interval
        self._metrics = {}
        # Created lazily by the first call to ``record``.
        self._collect_stats_task = None

    async def _collect_stats_periodically(self):
        """Flush every ``collect`` metric once per interval, forever.

        A failure while flushing one metric is logged and does not stop the
        loop; the failed metric keeps its accumulated value (it is only reset
        after a successful flush).
        """
        while True:
            await asyncio.sleep(self._interval)
            for metric in self._metrics.values():
                if not metric.collect:
                    continue
                try:
                    self._record(metric, metric.value, metric.labels)
                except Exception as e:  # pylint: disable=broad-except
                    # Without this guard a single back-end error would end the
                    # task, and ``record`` would never start a new one because
                    # the task reference stays set.
                    logger.error('Failure when recording metric %r: %r:', metric.name, e)
                else:
                    metric.reset()

    def register(self, metric_name, value_type, labels=None, collect=False):
        """Declare a metric so that it can be recorded later.

        Registering a name twice logs an error and leaves the existing metric
        untouched.

        :param metric_name: key under which the metric is stored.
        :param value_type: one of ``'float'``, ``'string'``, ``'boolean'`` or
            ``'integer'``; other names are only rejected once a value is
            recorded.
        :param labels: labels attached to every recording of this metric.
        :param collect: accumulate values and flush them periodically instead
            of forwarding each one immediately.
        """
        labels = labels or {}
        if metric_name in self._metrics:
            logger.error('Metric %r is already registered', metric_name)
            return
        self._metrics[metric_name] = Metric(
            metric_name, value_type, labels=labels, collect=collect)

    def record(self, metric_name, value, labels=None):
        """Record one value for a registered metric.

        Errors (unknown metric, conversion failure, back-end failure) are
        logged, never raised.

        :param metric_name: name given to :meth:`register`.
        :param value: raw value, converted according to the metric's value
            type; may be a dict carrying the raw value under ``'value'``.
        :param labels: extra labels merged over the metric's own labels.  They
            are only used for metrics that are not collected.
        """
        if not self._collect_stats_task:
            # No explicit loop is passed here, so asyncio picks the loop at
            # call time; this is not necessarily ``self._loop``.
            self._collect_stats_task = asyncio.ensure_future(
                self._collect_stats_periodically())

        metric = self._metrics.get(metric_name)
        if metric is None:
            logger.error('Unknown metric %r.', metric_name)
            return

        try:
            value = self._prepare_value(metric, value)
            if metric.collect:
                metric.value += value
            else:
                all_labels = copy(metric.labels)
                if labels:
                    all_labels.update(labels)
                self._record(metric, value, all_labels)
        except Exception as e:  # pylint: disable=broad-except
            logger.error('Failure when recording metric %r: %r:', metric_name, e)

    def _prepare_value(self, metric, value):
        """Convert ``value`` to the type named by ``metric.value_type``.

        A dict is treated as a wrapper: a shallow copy is returned whose
        ``'value'`` entry has been converted, so the caller's dict is left
        unmodified.

        :raises RuntimeError: if the metric's value type is not supported.
        """
        if isinstance(value, dict):
            prepared = copy(value)
            prepared['value'] = self._prepare_value(metric, value['value'])
            return prepared

        value_type = metric.value_type
        if value_type == 'float':
            return float(value)
        if value_type == 'string':
            return str(value)
        if value_type == 'boolean':
            return bool(value)
        if value_type == 'integer':
            return int(value)
        raise RuntimeError('Unsupported value type %r' % value_type)

    @abc.abstractmethod
    def _record(self, metric, value, labels):
        """Persist one prepared value; implemented by concrete stores.

        :param metric: the registered metric object.
        :param value: value as returned by :meth:`_prepare_value`, or the
            accumulated value for collected metrics.
        :param labels: labels to attach to this recording.
        """

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        """Cancel the background task, if any, and wait for it to finish."""
        # Drop the reference first so that a later ``record`` call can start a
        # fresh collector instead of relying on the cancelled one.
        task, self._collect_stats_task = self._collect_stats_task, None
        if not task:
            return
        try:
            task.cancel()
            await task
        except asyncio.CancelledError:
            pass
# Module-wide default store; stays ``None`` until one is registered or
# created on demand.
_store = None


def register_store(store):
    """Install ``store`` as the module-wide default metric store.

    Any previously registered (or auto-created) store is replaced.
    """
    global _store
    _store = store
def get_store():
    """Return the module-wide default metric store.

    When no store has been set yet, one is created with
    ``store_autocreate()`` and remembered, so later calls return the same
    object.
    """
    global _store
    if _store is None:
        _store = store_autocreate()
    return _store