import json
import logging
from time import time
import caspia.meadow.client.connection
from caspia import meadow
from caspia.meadow.client import ServiceBrowser
from caspia.toolbox.influxdb import InfluxDBClient
logger = logging.getLogger(__name__)
class InfluxDBConnector:
    """ Listens for all meadow notification and stores them to InfluxDB. """

    def __init__(self,
                 consumer_conn: 'caspia.meadow.client.ConsumerConnection',
                 url: str,
                 interval=5.0,
                 db_name='notification'):
        """ Initialize new InfluxDBConnector.

        :param consumer_conn: Prepared meadow consumer connection to use for event listening.
        :param url: Url of the influxdb server.
        :param interval: Minimum interval for writing notifications to influxdb.
        :param db_name: Name of the influxdb database the notifications are written to.
        """
        # NOTE: the consumer_conn annotation is a string (lazy forward reference),
        # so it is not evaluated when the class is defined.
        self._consumer_conn = consumer_conn
        self._browser = ServiceBrowser(connection=consumer_conn)
        # the influxdb client shares the event loop of the meadow connection
        self._loop = consumer_conn.loop
        self._influxdb = InfluxDBClient(url,
                                        database=db_name,
                                        interval=interval,
                                        loop=self._loop,
                                        precision='ms')

    async def start(self):
        """ Prepare the service browser and subscribe for meadow notifications. """
        # setup service browser
        await self._browser.prepare()
        # subscribe for all notifications
        # (the two ``None`` arguments are presumably service/characteristic
        # wildcards - confirm against ConsumerConnection.subscribe)
        await self._consumer_conn.subscribe(None, None, self._on_meadow_notification)

    def _preprocess_value(self, value, nested=False):
        """ Returns an iterable with values to store in influxdb for given meadow notification.

        The values of the iterable are (postfix, value).
        Postfix is optional (might be None). It is an identifier of the given value
        within the notification; postfixes of nested dicts are joined with ':'.

        Scalars found inside a dict have their numeric values (including bools)
        converted to float, a top-level scalar is yielded unchanged.
        Values of any other type (None, lists, ...) yield nothing.

        :param value: Notification value - a scalar or a (possibly nested) dict of scalars.
        :param nested: True when called recursively for a value stored inside a dict.
        """
        if isinstance(value, (float, bool, int, str)):  # simple scalar
            if nested and not isinstance(value, str):
                value = float(value)
            yield (None, value)
        elif isinstance(value, dict):  # compound value, break it up to scalars
            for key, item in value.items():
                # postfixes are used as field names and joined with ':',
                # so a non-string key must be converted first
                key = str(key)
                for inner_key, inner_value in self._preprocess_value(item, nested=True):
                    full_key = key + ':' + inner_key if inner_key else key
                    yield (full_key, inner_value)

    async def _on_meadow_notification(self, value, extra, service, characteristic):
        """ Callback for a meadow notification; hands the value over to the influxdb client.

        The measurement is named '<service type>:<characteristic>' and tagged with
        the service path. A scalar value is stored in the field 'value', a compound
        value is stored in one field per scalar it contains.
        Nothing is written when the service is not known to the service browser
        or when the value contains no storable scalar.

        :param value: Value carried by the notification.
        :param extra: Extra data of the notification (currently not stored).
        :param service: Name of the service the notification belongs to.
        :param characteristic: Name of the characteristic the notification belongs to.
        """
        logger.debug('Notification received - enqueing to be written to influxdb')

        # only notifications of services known to the browser are stored
        if self._browser.lookup.find(service) is None:
            logger.debug('Service %r not found on meadow', service)
            return

        stype, spath = meadow.name.split(service)
        measurement = stype + ':' + characteristic

        fields = {key or 'value': scalar_value
                  for key, scalar_value in self._preprocess_value(value)}
        if fields:
            self._influxdb.write(
                measurement=measurement,
                timestamp=int(time() * 1000),  # ms precision
                fields=fields,
                tags={'service': spath})