Source code for caspia.meadow.client.gateway

import asyncio
import logging
from functools import partial
from typing import Iterable, Union

import caspia.meadow.client
from caspia.meadow import errors
from caspia.meadow.client import GatewayConnection

logger = logging.getLogger(__name__)

GatewayService = Union['GatewayServiceMixin', 'caspia.meadow.services.ServiceBase']


[docs]class Gateway: def __init__(self, name, broker_url=None, connection=None, services=None, loop=None): self.name = name if connection is None: self._conn = GatewayConnection(broker_url, name=name, loop=loop) self.loop = self._conn.loop asyncio.ensure_future(self._conn.run_forever(), loop=self.loop) else: self._conn = connection self.loop = connection.loop self.__prepare_task = asyncio.ensure_future(self.__setup(), loop=self.loop) self.__scheduled_gateway_update = None self.__services = set() # Set[GatewayService] self.__busy = False self._conn.gateway_registered_handler = self.on_gateway_registered if services: self.add(services) @property def connection(self): return self._conn @property def busy(self): return self.__busy @busy.setter def busy(self, value): if value == self.__busy: return self.__busy = value if value: logger.info('Gateway %r became busy', self.name) else: logger.info('Gateway %r is no longer busy', self.name) self.__schedule_gateway_update(delay=0)
[docs] async def prepare(self): if not self.__prepare_task.done(): await self.__prepare_task
async def __setup(self): self.__schedule_gateway_update()
[docs] async def on_gateway_registered(self): if not self.busy: async def do_work(): logger.info('Starting broadcast of current values') start = asyncio.get_event_loop().time() failures = 0 for service in self.services: try: await service.published() except Exception: # pylint: disable=broad-except failures += 1 end = asyncio.get_event_loop().time() logger.info('Current values broadcasted in %f s (%d failures)', end - start, failures) asyncio.ensure_future(do_work())
def __schedule_gateway_update(self, delay=0.5): if self.__scheduled_gateway_update: self.__scheduled_gateway_update.cancel() if not delay: self.__scheduled_gateway_update = None self.loop.call_soon(self.__publish_gateway) else: self.__scheduled_gateway_update = self.loop.call_later(delay, self.__publish_gateway) def __publish_gateway(self): services = [s.serialize() for s in self.services] registration = {'services': services, 'status': 'online' if not self.busy else 'busy'} logger.debug('Publishing gateway %s', str(registration)) asyncio.ensure_future(self._conn.update_registration(registration), loop=self.loop) @property def services(self) -> Iterable[GatewayService]: """All registered services for the gateway.""" return self.__services
[docs] def add(self, services: Iterable[GatewayService], update_immediately=False): """Register service(s) for the gateway.""" logger.debug('Adding %s to gateway %s', str(services), self.name) if isinstance(services, (list, set)): services = set(services) else: services = {services} for service in services: if not isinstance(service, ServiceGatewayMixin) or\ not isinstance(service, caspia.meadow.services.ServiceBase): msg = '%r must be subclass of %s and %s!' args = (service, ServiceGatewayMixin, caspia.meadow.services.ServiceBase) raise ValueError(msg % args) self.__services |= services for service in services: service.attach(self._conn) asyncio.ensure_future(self.__prepare_service(service), loop=self.loop) if update_immediately: self.__schedule_gateway_update(delay=0) else: self.__schedule_gateway_update()
[docs] def remove(self, services: Iterable[GatewayService], update_immediately=False): """Remove service(s) from the gateway.""" if isinstance(services, (list, set)): services = set(services) else: services = {services} self.__services -= services for service in services: service.detach() if update_immediately: self.__schedule_gateway_update(delay=0) else: self.__schedule_gateway_update()
async def __handle_read(self, characteristic, **kwargs): if self.busy: raise errors.NotReadyError('gateway is busy') value = await characteristic.read(**kwargs) return characteristic.serialize_value(value) async def __handle_write(self, characteristic, value, **kwargs): if self.busy: raise errors.NotReadyError('gateway is busy') value = characteristic.deserialize_value(value) return await characteristic.write(value, **kwargs) async def __prepare_service(self, service: GatewayService): for characteristic in service.characteristics.values(): if characteristic.readable: handler = partial(self.__handle_read, characteristic) self._conn.handle_read(service.name, characteristic.name, handler) if characteristic.writable: handler = partial(self.__handle_write, characteristic) self._conn.handle_write(service.name, characteristic.name, handler) logger.debug('Service %s ready.', service)
[docs]class ServiceGatewayMixin: def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._last_notifications_hash = {} self._cached_values = {} self._pending_notifications = {} self._read_handlers = dict() self._write_handlers = dict() self._discover_characteristic_handlers() def _discover_characteristic_handlers(self): for name in dir(self.__class__): value = getattr(self.__class__, name) for characteristic_name in getattr(value, '_chars_reader', []): self._read_handlers[characteristic_name] = value for characteristic_name in getattr(value, '_chars_writer', []): self._write_handlers[characteristic_name] = value
[docs] def attach(self, connection): assert isinstance(connection, GatewayConnection) self.connection = connection
[docs] def detach(self): self.connection = None
@property def attached(self): return getattr(self, 'connection', None) is not None def _ensure_attached(self): if not self.attached: raise errors.NotAvailableError(f'{self} is not attached to any gateway')
[docs] async def published(self): for characteristic in self.characteristics.values(): if characteristic.readable and characteristic.notifiable and \ not characteristic.name.startswith('$'): try: value = await characteristic.read() await self.notify(characteristic, value) except NotImplementedError: logger.info('read of %s:%s not implemented', self.name, characteristic.name)
[docs] async def notify(self, characteristic, value, if_changed=False, extra=None): extra = extra or dict() if not characteristic.notifiable: raise RuntimeError('Characteristic %r is not notifiable.' % characteristic) if not isinstance(value, Exception): if if_changed and characteristic.name in self._last_notifications_hash: if self._last_notifications_hash[characteristic.name] == hash(value): return # validate value characteristic.validate_value(value) # serialize value prepared_value = characteristic.serialize_value(value) else: prepared_value = errors.MeadowError.from_exception(value) # send meadow notification if self.attached: try: await self.connection.notify(self.name, characteristic.name, prepared_value, extra=extra) except ConnectionRefusedError: pass # and trigger local subscriptions await self[characteristic.name].trigger(value, extra=extra) # save hash of current value for future if_changed=True try: self._last_notifications_hash[characteristic.name] = hash(value) except TypeError: self._last_notifications_hash[characteristic.name] = None
[docs] async def characteristic_read(self, characteristic, **kwargs): if characteristic.name in self._read_handlers: return await self._read_handlers[characteristic.name](self, **kwargs) elif characteristic.name in self._cached_values: value, extra = self._cached_values[characteristic.name] if isinstance(value, Exception): raise value return value, extra else: raise NotImplementedError
[docs] async def characteristic_write(self, characteristic, value, **kwargs): if characteristic.name in self._write_handlers: await self._write_handlers[characteristic.name](self, value, **kwargs) else: characteristic.value_and_extra = value, kwargs.get('extra', {})
[docs] def characteristic_cached_read(self, characteristic): try: value, extra = self._cached_values[characteristic.name] if isinstance(value, Exception): raise value return value, extra except KeyError: raise errors.NotReadyError('value not initialized')
[docs] def characteristic_cached_write(self, characteristic, value_and_meta): self._cached_values[characteristic.name] = value_and_meta if asyncio.get_event_loop().is_closed(): return if characteristic.notifiable and characteristic.name not in self._pending_notifications: future = asyncio.ensure_future(self._do_notify_cached(characteristic)) self._pending_notifications[characteristic.name] = future
async def _do_notify_cached(self, characteristic): del self._pending_notifications[characteristic.name] value, extra = self._cached_values[characteristic.name] await self.notify(characteristic, value, extra=extra, if_changed=True)
[docs]def characteristic_read_handler(characteristic_name: str): def decorator(fn): chars = getattr(fn, '_chars_reader', None) or set() chars.add(characteristic_name) setattr(fn, '_chars_reader', chars) return fn return decorator
[docs]def characteristic_write_handler(characteristic_name: str): def decorator(fn): chars = getattr(fn, '_chars_writer', None) or set() chars.add(characteristic_name) setattr(fn, '_chars_writer', chars) return fn return decorator