import asyncio
import logging
from functools import partial
from typing import Iterable, Union
import caspia.meadow.client
from caspia.meadow import errors
from caspia.meadow.client import GatewayConnection
# Module-level logger used by the gateway and the service mixin below.
logger = logging.getLogger(__name__)

# Type alias for objects a gateway can publish: a service that mixes in
# ServiceGatewayMixin on top of a meadow ServiceBase. Both members are forward
# references (strings) because the classes are defined later / elsewhere.
GatewayService = Union['ServiceGatewayMixin', 'caspia.meadow.services.ServiceBase']
class Gateway:
    """Publish a set of services through a gateway connection.

    The gateway keeps a set of registered services, wires their readable and
    writable characteristics to the connection, and (re)publishes its
    registration (serialized services plus an ``online``/``busy`` status)
    whenever the set of services or the busy flag changes.
    """

    def __init__(self, name, broker_url=None, connection=None, services=None, loop=None):
        """Create the gateway and schedule its first registration update.

        :param name: gateway name, also passed to a newly created connection.
        :param broker_url: broker URL for a newly created connection; only
            used when ``connection`` is not given.
        :param connection: existing connection to use. When given, its
            ``loop`` attribute is used and ``broker_url``/``loop`` are ignored.
        :param services: optional service or collection of services to add.
        :param loop: event loop handed to a newly created connection.
        """
        self.name = name
        if connection is None:
            self._conn = GatewayConnection(broker_url, name=name, loop=loop)
            self.loop = self._conn.loop
            asyncio.ensure_future(self._conn.run_forever(), loop=self.loop)
        else:
            self._conn = connection
            self.loop = connection.loop

        # Initialise all state before the setup task is created so the task
        # never observes a half-built instance.
        self.__scheduled_gateway_update = None
        self.__services = set()  # Set[GatewayService]
        self.__busy = False
        self.__prepare_task = asyncio.ensure_future(self.__setup(), loop=self.loop)

        self._conn.gateway_registered_handler = self.on_gateway_registered
        if services:
            self.add(services)

    @property
    def connection(self):
        """The connection this gateway publishes through."""
        return self._conn

    @property
    def busy(self):
        """Whether the gateway is busy; reads and writes are rejected while True."""
        return self.__busy

    @busy.setter
    def busy(self, value):
        if value == self.__busy:
            return
        self.__busy = value
        if value:
            logger.info('Gateway %r became busy', self.name)
        else:
            logger.info('Gateway %r is no longer busy', self.name)
        # The status is part of the registration, so republish right away.
        self.__schedule_gateway_update(delay=0)

    async def prepare(self):
        """Wait until the initial setup task has finished."""
        if not self.__prepare_task.done():
            await self.__prepare_task

    async def __setup(self):
        self.__schedule_gateway_update()

    async def on_gateway_registered(self):
        """Start a background broadcast of current values unless busy.

        Installed as the connection's ``gateway_registered_handler``. The
        broadcast calls ``published()`` on every registered service and logs
        how long it took and how many services failed.
        """
        if self.busy:
            return

        async def do_work():
            logger.info('Starting broadcast of current values')
            start = self.loop.time()
            failures = 0
            # Iterate a snapshot: add()/remove() may run while we are awaiting
            # and mutating a set during iteration raises RuntimeError.
            for service in tuple(self.services):
                try:
                    await service.published()
                except Exception:  # pylint: disable=broad-except
                    failures += 1
                    logger.exception('Broadcast of current values of %s failed', service)
            end = self.loop.time()
            logger.info('Current values broadcasted in %f s (%d failures)', end - start,
                        failures)

        asyncio.ensure_future(do_work(), loop=self.loop)

    def __schedule_gateway_update(self, delay=0.5):
        """Publish the registration after ``delay`` seconds, replacing any pending update."""
        pending = self.__scheduled_gateway_update
        if pending:
            pending.cancel()
        if delay:
            self.__scheduled_gateway_update = self.loop.call_later(delay, self.__publish_gateway)
        else:
            self.__scheduled_gateway_update = None
            self.loop.call_soon(self.__publish_gateway)

    def __publish_gateway(self):
        """Send the current registration (services and status) to the connection."""
        registration = {
            'services': [service.serialize() for service in self.services],
            'status': 'online' if not self.busy else 'busy',
        }
        logger.debug('Publishing gateway %s', registration)
        asyncio.ensure_future(self._conn.update_registration(registration), loop=self.loop)

    @property
    def services(self) -> 'Iterable[GatewayService]':
        """All registered services for the gateway."""
        return self.__services

    @staticmethod
    def __as_service_set(services):
        """Normalise a single service or a collection of services to a new set.

        Only concrete collection types are unpacked; a service object may
        itself support item access, so generic iterability is not a safe test.
        """
        if isinstance(services, (list, tuple, set, frozenset)):
            return set(services)
        return {services}

    def add(self, services: 'Union[GatewayService, Iterable[GatewayService]]',
            update_immediately=False):
        """Register service(s) for the gateway.

        :param services: one service, or a list/tuple/set/frozenset of them.
        :param update_immediately: publish the registration on the next loop
            iteration instead of after the default delay.
        :raises ValueError: if any service is not both a ``ServiceGatewayMixin``
            and a ``ServiceBase``; nothing is registered in that case.
        """
        logger.debug('Adding %s to gateway %s', services, self.name)
        services = self.__as_service_set(services)

        # Validate everything first so a bad entry leaves the gateway unchanged.
        for service in services:
            if not isinstance(service, ServiceGatewayMixin) or \
                    not isinstance(service, caspia.meadow.services.ServiceBase):
                msg = '%r must be subclass of %s and %s!'
                args = (service, ServiceGatewayMixin, caspia.meadow.services.ServiceBase)
                raise ValueError(msg % args)

        self.__services |= services
        for service in services:
            service.attach(self._conn)
            asyncio.ensure_future(self.__prepare_service(service), loop=self.loop)

        if update_immediately:
            self.__schedule_gateway_update(delay=0)
        else:
            self.__schedule_gateway_update()

    def remove(self, services: 'Union[GatewayService, Iterable[GatewayService]]',
               update_immediately=False):
        """Remove service(s) from the gateway.

        Every given service is detached and dropped from the registered set.
        Read/write handlers already registered on the connection are not
        touched here.

        :param services: one service, or a list/tuple/set/frozenset of them.
        :param update_immediately: publish the registration on the next loop
            iteration instead of after the default delay.
        """
        services = self.__as_service_set(services)
        self.__services -= services
        for service in services:
            service.detach()

        if update_immediately:
            self.__schedule_gateway_update(delay=0)
        else:
            self.__schedule_gateway_update()

    async def __handle_read(self, characteristic, **kwargs):
        """Read a characteristic for the connection and return the serialized value."""
        if self.busy:
            raise errors.NotReadyError('gateway is busy')
        value = await characteristic.read(**kwargs)
        return characteristic.serialize_value(value)

    async def __handle_write(self, characteristic, value, **kwargs):
        """Deserialize a value received from the connection and write it."""
        if self.busy:
            raise errors.NotReadyError('gateway is busy')
        value = characteristic.deserialize_value(value)
        return await characteristic.write(value, **kwargs)

    async def __prepare_service(self, service: 'GatewayService'):
        """Register read/write handlers for each characteristic of ``service``."""
        for characteristic in service.characteristics.values():
            if characteristic.readable:
                handler = partial(self.__handle_read, characteristic)
                self._conn.handle_read(service.name, characteristic.name, handler)
            if characteristic.writable:
                handler = partial(self.__handle_write, characteristic)
                self._conn.handle_write(service.name, characteristic.name, handler)
        logger.debug('Service %s ready.', service)
class ServiceGatewayMixin:
    """Mixin giving a service gateway-side behaviour.

    It tracks the gateway connection the service is attached to, dispatches
    characteristic reads/writes to handler methods marked with
    ``characteristic_read_handler`` / ``characteristic_write_handler``, keeps
    a per-characteristic value cache, and sends notifications.

    The host class is expected to provide ``name``, ``characteristics`` and
    item access by characteristic name (all are used below but defined
    elsewhere).
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # characteristic name -> hash of the last notified value
        # (None when that value was unhashable)
        self._last_notifications_hash = {}
        # characteristic name -> (value, extra)
        self._cached_values = {}
        # characteristic name -> future of a scheduled cached notification
        self._pending_notifications = {}
        self._read_handlers = dict()
        self._write_handlers = dict()
        self._discover_characteristic_handlers()

    def _discover_characteristic_handlers(self):
        """Collect class attributes marked as characteristic read/write handlers."""
        for name in dir(self.__class__):
            value = getattr(self.__class__, name)
            for characteristic_name in getattr(value, '_chars_reader', []):
                self._read_handlers[characteristic_name] = value
            for characteristic_name in getattr(value, '_chars_writer', []):
                self._write_handlers[characteristic_name] = value

    def attach(self, connection):
        """Attach the service to a gateway connection."""
        assert isinstance(connection, GatewayConnection)
        self.connection = connection

    def detach(self):
        """Detach the service from its gateway connection."""
        self.connection = None

    @property
    def attached(self):
        """True when a connection is currently set."""
        return getattr(self, 'connection', None) is not None

    def _ensure_attached(self):
        """Raise ``errors.NotAvailableError`` unless the service is attached."""
        if not self.attached:
            raise errors.NotAvailableError(f'{self} is not attached to any gateway')

    async def published(self):
        """Read and notify every readable, notifiable, non-``$`` characteristic."""
        for characteristic in self.characteristics.values():
            if characteristic.readable and characteristic.notifiable and \
                    not characteristic.name.startswith('$'):
                try:
                    value = await characteristic.read()
                    await self.notify(characteristic, value)
                except NotImplementedError:
                    logger.info('read of %s:%s not implemented', self.name, characteristic.name)

    @staticmethod
    def _value_hash(value):
        """Return ``hash(value)``, or None when the value is unhashable."""
        try:
            return hash(value)
        except TypeError:
            return None

    async def notify(self, characteristic, value, if_changed=False, extra=None):
        """Notify the connection (if attached) and local subscribers of a value.

        :param characteristic: the characteristic being notified.
        :param value: the new value, or an ``Exception`` instance to report
            an error instead of a value.
        :param if_changed: skip the notification when the value hashes the
            same as the previously notified one. Unhashable values cannot be
            compared this way and are always notified.
        :param extra: optional dict passed along with the notification.
        :raises RuntimeError: if the characteristic is not notifiable.
        """
        extra = extra or dict()
        if not characteristic.notifiable:
            raise RuntimeError('Characteristic %r is not notifiable.' % characteristic)

        # Computed once and guarded: a bare hash() would raise TypeError for
        # list/dict values.
        value_hash = self._value_hash(value)

        if not isinstance(value, Exception):
            if if_changed and value_hash is not None and \
                    self._last_notifications_hash.get(characteristic.name) == value_hash:
                return
            # validate value
            characteristic.validate_value(value)
            # serialize value
            prepared_value = characteristic.serialize_value(value)
        else:
            prepared_value = errors.MeadowError.from_exception(value)

        # send meadow notification
        if self.attached:
            try:
                await self.connection.notify(self.name,
                                             characteristic.name,
                                             prepared_value,
                                             extra=extra)
            except ConnectionRefusedError:
                # Ignored so that local subscribers below are still triggered.
                pass

        # and trigger local subscriptions
        await self[characteristic.name].trigger(value, extra=extra)

        # save hash of current value for future if_changed=True
        self._last_notifications_hash[characteristic.name] = value_hash

    async def characteristic_read(self, characteristic, **kwargs):
        """Read via a registered read handler, falling back to the cached value.

        :raises NotImplementedError: when there is neither a handler nor a
            cached value for the characteristic.
        """
        name = characteristic.name
        if name in self._read_handlers:
            return await self._read_handlers[name](self, **kwargs)
        if name in self._cached_values:
            value, extra = self._cached_values[name]
            if isinstance(value, Exception):
                raise value
            return value, extra
        raise NotImplementedError

    async def characteristic_write(self, characteristic, value, **kwargs):
        """Write via a registered write handler, or store the value on the characteristic."""
        if characteristic.name in self._write_handlers:
            await self._write_handlers[characteristic.name](self, value, **kwargs)
        else:
            characteristic.value_and_extra = value, kwargs.get('extra', {})

    def characteristic_cached_read(self, characteristic):
        """Return the cached ``(value, extra)`` pair for the characteristic.

        :raises errors.NotReadyError: when nothing has been cached yet.
        :raises Exception: the cached exception itself, when an exception
            instance was cached as the value.
        """
        # Keep the try limited to the lookup so that a cached KeyError value
        # is re-raised as-is instead of being reported as "not initialized".
        try:
            value, extra = self._cached_values[characteristic.name]
        except KeyError:
            raise errors.NotReadyError('value not initialized') from None
        if isinstance(value, Exception):
            raise value
        return value, extra

    def characteristic_cached_write(self, characteristic, value_and_meta):
        """Cache a ``(value, extra)`` pair and schedule a change notification.

        At most one notification per characteristic is pending at a time;
        nothing is scheduled when the event loop is already closed.
        """
        self._cached_values[characteristic.name] = value_and_meta
        if asyncio.get_event_loop().is_closed():
            return
        if characteristic.notifiable and characteristic.name not in self._pending_notifications:
            future = asyncio.ensure_future(self._do_notify_cached(characteristic))
            self._pending_notifications[characteristic.name] = future

    async def _do_notify_cached(self, characteristic):
        """Notify the latest cached value of the characteristic if it changed."""
        del self._pending_notifications[characteristic.name]
        value, extra = self._cached_values[characteristic.name]
        await self.notify(characteristic, value, extra=extra, if_changed=True)
def characteristic_read_handler(characteristic_name: str):
    """Mark the decorated function as the read handler for a characteristic.

    The characteristic name is added to the function's ``_chars_reader`` set;
    stacking the decorator registers the same function for several names.
    The function itself is returned unchanged.

    :param characteristic_name: name of the characteristic the function reads.
    :returns: a decorator that tags and returns the function.
    """

    def decorator(fn):
        registered = getattr(fn, '_chars_reader', None)
        if not registered:
            registered = set()
        registered.add(characteristic_name)
        fn._chars_reader = registered
        return fn

    return decorator
def characteristic_write_handler(characteristic_name: str):
    """Mark the decorated function as the write handler for a characteristic.

    The characteristic name is added to the function's ``_chars_writer`` set;
    stacking the decorator registers the same function for several names.
    The function itself is returned unchanged.

    :param characteristic_name: name of the characteristic the function writes.
    :returns: a decorator that tags and returns the function.
    """

    def decorator(fn):
        registered = getattr(fn, '_chars_writer', None)
        if not registered:
            registered = set()
        registered.add(characteristic_name)
        fn._chars_writer = registered
        return fn

    return decorator