Source code for caspia.toolbox.reactive.record

# This module provides support for recording changes of `Observable`
#
#
# EXAMPLE with InfluxDB:
# from caspia.toolbox.observables import record
# record.register_store(record.InfluxDBStore(url='http://10.0.1.131:8086'))
#
# temp_is_low = temp('my-temp') < 25
# record.record('my-is-low', bool, temp_is_low)
#

import abc
import asyncio
import logging
import os
from time import time

from caspia.toolbox.influxdb import InfluxDBClient

logger = logging.getLogger(__name__)
_global_store = None
__all__ = ('Store', 'InfluxDBStore', 'register_store', 'get_store', 'record')


[docs]class Store(metaclass=abc.ABCMeta): def __init__(self): self.value_classes = dict()
[docs] def register(self, observable_name, value_cls): if observable_name in self.value_classes: raise RuntimeError('%r already registered' % observable_name) self.value_classes[observable_name] = value_cls
[docs] def record(self, observable_name, value): if observable_name not in self.value_classes: raise RuntimeError('Observable %r not registered' % observable_name) value_cls = self.value_classes[observable_name] value = value_cls(value) self._record(observable_name, value)
def _record(self, observable_name, value): pass
[docs]class InfluxDBStore(Store): def __init__(self, url=None, database=None, interval=10.0, loop=None): super().__init__() url = url or os.environ['METRICS_INFLUXDB_URL'] database = database or os.environ.get('OBSERVABLES_INFLUXDB_DATABASE', 'observable') self._loop = loop or asyncio.get_event_loop() self._client = InfluxDBClient(url, database, logger=logger, precision='ms', loop=self._loop) def _record(self, observable_name, value): fields = dict(value=value) self._client.write(observable_name, int(time() * 1000), fields=fields, tags=dict())
[docs]def register_store(store): global _global_store _global_store = store
[docs]def get_store(): if _global_store is None: raise RuntimeError('No store configured. Use `register_store(...)`') return _global_store
[docs]def record(name, value_cls, observable): store = get_store() store.register(name, value_cls) def on_change(value, **kwargs): store.record(name, value) observable.subscribe(on_change)