# This module provides support for recording changes of `Observable` values into a store.
#
#
# EXAMPLE with InfluxDB:
# from caspia.toolbox.observables import record
# record.register_store(record.InfluxDBStore(url='http://10.0.1.131:8086'))
#
# temp_is_low = temp('my-temp') < 25
# record.record('my-is-low', bool, temp_is_low)
#
import abc
import asyncio
import logging
import os
from time import time
from caspia.toolbox.influxdb import InfluxDBClient
# Module-level logger; it is also handed to the InfluxDB client created by `InfluxDBStore`.
logger = logging.getLogger(__name__)
# Process-wide store instance; stays None until `register_store` assigns one.
_global_store = None
# Names exported by `from <module> import *`.
__all__ = ('Store', 'InfluxDBStore', 'register_store', 'get_store', 'record')
class Store(metaclass=abc.ABCMeta):
    """Base class for back ends that persist values of named observables.

    A store keeps a mapping from observable name to the callable used to
    coerce raw values.  `record` performs the coercion and then delegates to
    `_record`, which subclasses override to do the actual persisting.
    """

    def __init__(self):
        # Maps observable name -> callable applied to every recorded value.
        self.value_classes = {}

    def register(self, observable_name, value_cls):
        """Declare `observable_name` and the callable its values pass through.

        Args:
            observable_name: Key identifying the observable.
            value_cls: Callable invoked as `value_cls(value)` before a value
                is handed to `_record`.

        Raises:
            RuntimeError: If `observable_name` is already registered.
        """
        if observable_name in self.value_classes:
            # Wrap the name in a 1-tuple: a bare `'%r' % name` breaks when
            # the name itself is a tuple.
            raise RuntimeError('%r already registered' % (observable_name,))
        self.value_classes[observable_name] = value_cls

    def record(self, observable_name, value):
        """Coerce `value` with the registered callable and persist it.

        Args:
            observable_name: Name previously passed to `register`.
            value: Raw value; converted with the registered callable.

        Raises:
            RuntimeError: If `observable_name` was never registered.
        """
        # Keep the try body to the lookup only, so a KeyError raised by the
        # conversion below is not mistaken for a missing registration.
        try:
            value_cls = self.value_classes[observable_name]
        except KeyError:
            raise RuntimeError(
                'Observable %r not registered' % (observable_name,)) from None
        self._record(observable_name, value_cls(value))

    def _record(self, observable_name, value):
        """Persist an already-coerced value.

        The base implementation does nothing; subclasses override it.
        """
        pass
class InfluxDBStore(Store):
    """Store that writes every recorded value to InfluxDB.

    Each value is written through an `InfluxDBClient` as a single `value`
    field under a measurement named after the observable, stamped with the
    current wall-clock time in milliseconds.
    """

    def __init__(self, url=None, database=None, interval=10.0, loop=None):
        """Create the store and its InfluxDB client.

        Args:
            url: InfluxDB URL.  When falsy, the `METRICS_INFLUXDB_URL`
                environment variable is used; a `KeyError` propagates if
                that variable is not set either.
            database: Database name.  When falsy, the
                `OBSERVABLES_INFLUXDB_DATABASE` environment variable is
                used, falling back to `'observable'`.
            interval: Accepted for interface compatibility but not used by
                the code in this class.
                NOTE(review): presumably meant as a flush interval for the
                client -- confirm against `InfluxDBClient`.
            loop: Event loop handed to the client.  When falsy,
                `asyncio.get_event_loop()` is used.
        """
        super().__init__()
        url = url or os.environ['METRICS_INFLUXDB_URL']
        database = database or os.environ.get('OBSERVABLES_INFLUXDB_DATABASE', 'observable')
        self._loop = loop or asyncio.get_event_loop()
        self._client = InfluxDBClient(url, database, logger=logger, precision='ms', loop=self._loop)

    def _record(self, observable_name, value):
        """Write `value` as the `value` field of measurement `observable_name`."""
        fields = dict(value=value)
        # The client is created with precision='ms', so the timestamp is
        # the current time in integer milliseconds.
        self._client.write(observable_name, int(time() * 1000), fields=fields, tags=dict())
def register_store(store):
    """Install `store` as the module-wide store returned by `get_store`.

    Any previously registered store is replaced.

    Args:
        store: Object to keep in the module-level `_global_store` slot.
    """
    global _global_store
    _global_store = store
def get_store():
    """Return the module-wide store.

    Returns:
        The object most recently passed to `register_store`.

    Raises:
        RuntimeError: If no store has been registered yet.
    """
    if _global_store is not None:
        return _global_store
    raise RuntimeError('No store configured. Use `register_store(...)`')
def record(name, value_cls, observable):
    """Record every change of `observable` in the module-wide store.

    The name is registered with the store first, then a callback is
    subscribed to the observable that forwards each new value to the store.

    Args:
        name: Name under which the values are registered and recorded.
        value_cls: Callable the store applies to each value before
            persisting it.
        observable: Object exposing `subscribe(callback)`; the callback is
            invoked with the new value as its first positional argument.

    Raises:
        RuntimeError: If no store is configured or `name` is already
            registered (both raised by the store helpers).
    """
    store = get_store()
    store.register(name, value_cls)

    def on_change(value, **kwargs):
        # Extra keyword arguments supplied by the observable are accepted
        # and ignored; only the value itself is recorded.
        store.record(name, value)

    observable.subscribe(on_change)