Source code for caspia.meadow.client.connection.gateway

import asyncio
import json
import logging

from caspia.meadow.errors import MeadowError
from caspia.toolbox import monitor

from .connection import Connection
from .subscriber import ReadSubscriber, RulesSubscriber, WriteSubscriber

logger = logging.getLogger(__name__)


[docs]class GatewayConnection(Connection): def __init__(self, *args, **kwargs): self.gateway_registered_handler = None self._registration = None super().__init__(*args, **kwargs) # some metrics are handled in subscribers labels = dict(connection_name=str(kwargs.get('name'))) monitor.register_metric('meadow-connection:handle-read-ops', 'integer', collect=True, labels=labels) monitor.register_metric('meadow-connection:handle-read-err', 'integer', collect=True, labels=labels) monitor.register_metric('meadow-connection:handle-write-ops', 'integer', collect=True, labels=labels) monitor.register_metric('meadow-connection:handle-write-err', 'integer', collect=True, labels=labels) monitor.register_metric('meadow-connection:notifications-out', 'integer', collect=True, labels=labels)
[docs] def on_connect(self, client, userdata, flags, rc): super().on_connect(client, userdata, flags, rc) if self._registration: logger.info('Republishing gateway') # pylint: disable=assignment-from-no-return fut = self.update_registration(self._registration) asyncio.ensure_future(fut, loop=self.loop)
[docs] def handle_read(self, service, characteristic, target, handle_dolar=False): """Handle read requests and return an object which has .unsubscribe() attr.""" topic = self._read_topic(service, characteristic) logger.debug('Adding read handler for %s', topic) subscriber = ReadSubscriber(topic, target) subscriber.allow_dolar_characteristics = handle_dolar self.attach_subscriber(subscriber) return subscriber
[docs] def handle_write(self, service, characteristic, target, handle_dolar=False): """Handle write requests and return an object which has .unsubscribe() attr.""" topic = self._write_topic(service, characteristic) logger.debug('Adding write handler for %s', topic) subscriber = WriteSubscriber(topic, target) subscriber.allow_dolar_characteristics = handle_dolar self.attach_subscriber(subscriber) return subscriber
[docs] async def notify(self, service, characteristic, value, extra=None, retain=False, **_): if not isinstance(value, Exception): payload = dict(val=value, extra=extra) if extra else dict(val=value) else: payload = MeadowError.from_exception(value).to_dict() topic = self._notification_topic(service, characteristic) await self._publish(topic, payload, retain=retain) monitor.record_metric('meadow-connection:notifications-out', 1)
[docs] async def update_registration(self, registration): assert 'status' in registration assert 'services' in registration self._registration = registration if self.connected: await self._publish(f'gateway/{self.name}', registration, retain=True) if self.gateway_registered_handler: # pylint: disable=not-callable await self.gateway_registered_handler()
# # Rules #
[docs] async def handle_rules_request(self, target): """Handle rule activation requests for given gateway name. `target` signature should be target(rules: list of serialized Rule instances) Returns an object with .unsubscribe() method. """ topic = 'gateway/{}/requested_rules'.format(self.name) subscriber = RulesSubscriber(topic, target) self.attach_subscriber(subscriber) return subscriber
[docs] async def update_rules(self, rules=None): """Update active rules for given gateway name. :param rules: list of serialized Rule instances """ topic = 'gateway/{}/active_rules'.format(self.name) payload = dict(rules=rules or dict()) await self._publish(topic, payload, retain=True)
# # Connection Configuration # def _create_client(self): client = super()._create_client() client.will_set(f'gateway/{self.name}', payload=json.dumps(dict(services=[], status='offline')).encode('utf-8'), qos=2, retain=True) return client