import asyncio
import json
import logging
from caspia.meadow.errors import MeadowError
from caspia.toolbox import monitor
from .connection import Connection
from .subscriber import ReadSubscriber, RulesSubscriber, WriteSubscriber
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
[docs]class GatewayConnection(Connection):
def __init__(self, *args, **kwargs):
    """Initialise gateway state, the base connection and the metrics.

    All positional and keyword arguments are forwarded unchanged to the
    parent initialiser. The ``name`` keyword argument is additionally read
    (via ``kwargs.get``) and its ``str()`` form becomes the
    ``connection_name`` label of every metric registered here, so a
    missing ``name`` keyword yields the label ``'None'``.
    """
    # Set before the parent initialiser runs (same ordering as before);
    # presumably so the attributes already exist if it triggers callbacks
    # - TODO confirm.
    self.gateway_registered_handler = None
    self._registration = None
    super().__init__(*args, **kwargs)

    # some metrics are handled in subscribers
    metric_labels = {'connection_name': str(kwargs.get('name'))}
    metric_names = (
        'meadow-connection:handle-read-ops',
        'meadow-connection:handle-read-err',
        'meadow-connection:handle-write-ops',
        'meadow-connection:handle-write-err',
        'meadow-connection:notifications-out',
    )
    for metric_name in metric_names:
        monitor.register_metric(metric_name,
                                'integer',
                                collect=True,
                                labels=metric_labels)
def on_connect(self, client, userdata, flags, rc):
    """Run the base connect handling, then republish a stored registration.

    The arguments are forwarded unchanged to the parent ``on_connect``.
    If a registration has been stored in ``self._registration`` (done by
    ``update_registration``), publishing it again is scheduled on
    ``self.loop``. The scheduled task is kept in ``self._republish_task``
    and a failure of that task is logged.
    """
    super().on_connect(client, userdata, flags, rc)
    if not self._registration:
        return

    logger.info('Republishing gateway')

    def _report_failure(task):
        # Retrieve the outcome so that a failed republish is logged as soon
        # as it happens instead of surfacing only when the task object is
        # garbage collected.
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error('Republishing gateway failed', exc_info=error)

    # pylint: disable=assignment-from-no-return
    pending = self.update_registration(self._registration)
    # Keep a strong reference: the event loop itself only holds weak
    # references to tasks, so an unreferenced task may disappear mid-run.
    self._republish_task = asyncio.ensure_future(pending, loop=self.loop)
    self._republish_task.add_done_callback(_report_failure)
def handle_read(self, service, characteristic, target, handle_dolar=False):
    """Attach ``target`` as the handler of read requests.

    :param service: passed to ``self._read_topic`` to build the topic
    :param characteristic: passed to ``self._read_topic`` to build the topic
    :param target: handed to the ``ReadSubscriber`` created for the topic
    :param handle_dolar: stored as the subscriber's
        ``allow_dolar_characteristics`` flag
    :return: the attached subscriber, an object which has an
        ``.unsubscribe()`` attribute
    """
    read_topic = self._read_topic(service, characteristic)
    logger.debug('Adding read handler for %s', read_topic)

    reader = ReadSubscriber(read_topic, target)
    reader.allow_dolar_characteristics = handle_dolar
    self.attach_subscriber(reader)
    return reader
def handle_write(self, service, characteristic, target, handle_dolar=False):
    """Attach ``target`` as the handler of write requests.

    :param service: passed to ``self._write_topic`` to build the topic
    :param characteristic: passed to ``self._write_topic`` to build the topic
    :param target: handed to the ``WriteSubscriber`` created for the topic
    :param handle_dolar: stored as the subscriber's
        ``allow_dolar_characteristics`` flag
    :return: the attached subscriber, an object which has an
        ``.unsubscribe()`` attribute
    """
    write_topic = self._write_topic(service, characteristic)
    logger.debug('Adding write handler for %s', write_topic)

    writer = WriteSubscriber(write_topic, target)
    writer.allow_dolar_characteristics = handle_dolar
    self.attach_subscriber(writer)
    return writer
async def notify(self, service, characteristic, value, extra=None, retain=False, **_):
    """Publish a notification for a characteristic and count it as a metric.

    :param service: passed to ``self._notification_topic`` to build the topic
    :param characteristic: passed to ``self._notification_topic`` to build
        the topic
    :param value: the value to announce; an ``Exception`` instance is
        converted with ``MeadowError.from_exception(...).to_dict()`` and that
        dict is published instead
    :param extra: added to the payload under ``'extra'`` only when truthy
        (and only for non-exception values)
    :param retain: forwarded to ``self._publish``
    :param _: any further keyword arguments are accepted and ignored
    """
    if isinstance(value, Exception):
        message = MeadowError.from_exception(value).to_dict()
    else:
        message = dict(val=value)
        if extra:
            message['extra'] = extra

    destination = self._notification_topic(service, characteristic)
    await self._publish(destination, message, retain=retain)
    monitor.record_metric('meadow-connection:notifications-out', 1)
[docs] async def update_registration(self, registration):
"""Remember ``registration`` and publish it for this gateway.

The registration is stored on the instance, where ``on_connect`` picks it
up to publish it again. While ``self.connected`` is true it is published
as a retained message to ``gateway/<name>``.

:param registration: container that must include the ``'status'`` and
``'services'`` keys
:raises AssertionError: if either key is missing (only while assertions
are enabled)
"""
# Both keys are required; these checks vanish when Python runs with -O.
assert 'status' in registration
assert 'services' in registration
# Kept so that on_connect can publish it again.
self._registration = registration
if self.connected:
await self._publish(f'gateway/{self.name}', registration, retain=True)
# NOTE(review): indentation is lost in this copy of the file, so it cannot be
# told from here whether the handler call below sits inside the `connected`
# check or runs unconditionally - confirm against the original source.
if self.gateway_registered_handler:
# pylint: disable=not-callable
await self.gateway_registered_handler()
#
# Rules
#
async def handle_rules_request(self, target):
    """Attach ``target`` as the handler of rule activation requests.

    The requests are taken from the ``gateway/<name>/requested_rules`` topic
    of this gateway.

    `target` signature should be target(rules: list of serialized Rule instances)

    :return: the attached subscriber, an object with an ``.unsubscribe()``
        method
    """
    rules_listener = RulesSubscriber(
        'gateway/{}/requested_rules'.format(self.name), target)
    self.attach_subscriber(rules_listener)
    return rules_listener
async def update_rules(self, rules=None):
    """Publish the active rules of this gateway as a retained message.

    The message goes to the ``gateway/<name>/active_rules`` topic with the
    payload ``{'rules': <rules>}``.

    :param rules: list of serialized Rule instances; ``None`` or any other
        falsy value publishes an empty list
    """
    topic = f'gateway/{self.name}/active_rules'
    # Fall back to an empty *list* so that the published type matches the
    # documented one (the fallback used to be an empty dict).
    payload = {'rules': rules or []}
    await self._publish(topic, payload, retain=True)
#
# Connection Configuration
#
def _create_client(self):
    """Create the client through the parent class and set its will message.

    The will is set with ``retain=True`` and ``qos=2`` on the
    ``gateway/<name>`` topic; its UTF-8 encoded JSON payload carries an empty
    ``services`` list and the status ``'offline'``.

    :return: the client returned by the parent ``_create_client``
    """
    new_client = super()._create_client()

    offline_state = {'services': [], 'status': 'offline'}
    will_payload = json.dumps(offline_state).encode('utf-8')
    new_client.will_set(f'gateway/{self.name}',
                        payload=will_payload,
                        qos=2,
                        retain=True)
    return new_client