Source code for caspia.meadow.client.consumer

import asyncio

from caspia.meadow.client.connection import ConsumerConnection
from caspia.meadow.errors import InvalidValueError, NotAvailableError
from caspia.meadow.services import ServiceBase


def get_class(service_type):
    """Look up the consumer class for ``service_type``.

    Delegates to ``ServiceBase.get_subclass`` with ``ServiceConsumerMixin``
    as the second argument and returns whatever that call returns.
    """
    consumer_class = ServiceBase.get_subclass(service_type, ServiceConsumerMixin)
    return consumer_class
def is_consumer_service(service):
    """Return ``True`` when ``service`` is a ``ServiceConsumerMixin`` instance."""
    is_consumer = isinstance(service, ServiceConsumerMixin)
    return is_consumer
class ServiceConsumerMixin:
    """Mixin that lets a service talk to its gateway over a ``ConsumerConnection``.

    The host class is expected to provide ``name``, ``characteristics``
    (a mapping of characteristic name to characteristic object) and
    ``load_definition`` -- all three are used here but defined elsewhere.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._connection = None
        # Status of the owning gateway as last reported by on_gateways_update;
        # stays None until the first update arrives.
        self._gateway_status = None
        # Every live subscription created through this mixin.
        self._subscriptions = set()
        self._notifications_subscription = None
        # Remembered across reconnects so notifications get re-enabled.
        self._notifications_enabled = False
        # In-flight request tasks, tracked so they can be cancelled.
        self._requests = set()

    @property
    def meadow_connection(self) -> ConsumerConnection:
        """The currently attached connection, or ``None`` when detached."""
        return self._connection

    @meadow_connection.setter
    def meadow_connection(self, new):
        """Attach ``new``, or detach the current connection when ``new`` is falsy.

        Replacing one connection with another is performed as a detach
        followed by an attach.

        Raises:
            AssertionError: if ``new`` is truthy but not a ``ConsumerConnection``.
        """
        if new == self._connection:
            return

        if new:
            # Validate before touching any state so a rejected value can
            # neither detach the current connection nor be stored.
            assert isinstance(new, ConsumerConnection)

        if new and self._connection:
            # Swap: go through the setter twice so both hooks fire.
            self.meadow_connection = None
            self.meadow_connection = new
            return

        old, self._connection = self._connection, new
        if new:
            self._connection_attached(new)
        else:
            self._connection_detached(old)

    def _connection_attached(self, connection: ConsumerConnection):
        """Schedule the gateway-update subscription and restore notifications."""
        asyncio.ensure_future(connection.subscribe_gateways_update(self.on_gateways_update))
        if self._notifications_enabled:
            asyncio.ensure_future(self.enable_notifications())

    def _connection_detached(self, connection: ConsumerConnection):
        """Drop all subscriptions and schedule cancellation of pending requests."""
        for subscription in self._subscriptions:
            subscription.unsubscribe()
        self._subscriptions = set()
        self._notifications_subscription = None
        asyncio.ensure_future(self._cancel_requests())

    async def _cancel_requests(self):
        """Cancel in-flight requests and push a ``NotAvailableError`` to listeners.

        The error is triggered on every characteristic that is both readable
        and notifiable.
        """
        for request in self._requests:
            request.cancel()
        for characteristic in self.characteristics.values():
            if characteristic.readable and characteristic.notifiable:
                await characteristic.trigger(
                    NotAvailableError(f'Service {self.name} is not available.'))

    def _ensure_has_connection(self):
        """Raise ``NotAvailableError`` when no connection is attached."""
        if not self.meadow_connection:
            raise NotAvailableError(f'service {self.name} not found')

    def _ensure_gateway_is_ready(self):
        """Raise ``NotAvailableError`` unless the gateway status is ``'online'``."""
        if self._gateway_status != 'online':
            raise NotAvailableError(f'gateway of {self.name} is {self._gateway_status}')

    async def _make_request(self, request_future):
        """Await ``request_future`` as a tracked task and return its result.

        The task is registered in ``self._requests`` while it runs so that
        ``_cancel_requests`` can cancel it.
        """
        # Create the task before entering ``try``: if ensure_future itself
        # raises, the cleanup below must not run against an unbound name.
        task = asyncio.ensure_future(request_future)
        self._requests.add(task)
        try:
            return await task
        finally:
            # discard() so cleanup can never raise and mask the real outcome.
            self._requests.discard(task)

    async def characteristic_write(self, characteristic, value, extra, timeout=1.0, **_):
        """Serialize ``value`` and write it to ``characteristic`` over the connection.

        Characteristics whose name starts with ``'$'`` skip the gateway
        readiness check.

        Raises:
            NotAvailableError: if there is no connection, or the gateway is
                not online for a non-``$`` characteristic.
        """
        self._ensure_has_connection()
        if not characteristic.name.startswith('$'):
            self._ensure_gateway_is_ready()
        value = characteristic.serialize_value(value)
        request = self._connection.write(
            self.name, characteristic.name, value, extra=extra, timeout=timeout)
        return await self._make_request(request)

    async def characteristic_read(self, characteristic, timeout=1.0, **_):
        """Read ``characteristic`` and return ``(deserialized_value, extra)``.

        Characteristics whose name starts with ``'$'`` skip the gateway
        readiness check.

        Raises:
            NotAvailableError: if there is no connection, or the gateway is
                not online for a non-``$`` characteristic.
            InvalidValueError: if deserialization raises ``ValueError``.
        """
        self._ensure_has_connection()
        if not characteristic.name.startswith('$'):
            self._ensure_gateway_is_ready()
        request = self._connection.read(self.name, characteristic.name, timeout=timeout)
        value, extra = await self._make_request(request)
        try:
            return characteristic.deserialize_value(value), extra
        except ValueError as err:
            # Chain explicitly so the original ValueError stays the cause.
            raise InvalidValueError(str(err)) from err

    async def characteristic_subscribe(self, characteristic, on_value, on_error=None):
        """Subscribe to ``characteristic`` and return the tracked subscription.

        Raises:
            NotAvailableError: if there is no connection.
        """
        self._ensure_has_connection()
        subscription = await self._connection.subscribe(
            self.name, characteristic.name, on_value, on_error)
        self._subscriptions.add(subscription)
        return subscription

    async def enable_notifications(self):
        """Receive notifications for all characteristics related to this service.

        Does nothing when a notifications subscription already exists.

        Raises:
            NotAvailableError: if there is no connection.
        """
        if self._notifications_subscription:
            return
        self._ensure_has_connection()
        # A characteristic name of None is passed for the service-wide
        # subscription; the callbacks then dispatch by characteristic name.
        subscription = await self._connection.subscribe(
            self.name, None, self._on_value_notification, self._on_error_notification)
        self._notifications_subscription = subscription
        self._notifications_enabled = True
        self._subscriptions.add(subscription)

    async def _on_value_notification(self, value, extra, characteristic, **kwargs):
        """Forward a value notification to the named characteristic, if known."""
        try:
            char_instance = self.characteristics[characteristic]
        except KeyError:
            # Unknown characteristic names are ignored.
            return
        await char_instance.on_value_notification(value, extra=extra, **kwargs)

    async def _on_error_notification(self, error, characteristic, **kwargs):
        """Forward an error notification to the named characteristic, if known."""
        try:
            char_instance = self.characteristics[characteristic]
        except KeyError:
            # Unknown characteristic names are ignored.
            return
        await char_instance.on_error_notification(error=error, **kwargs)

    async def on_gateways_update(self, gateways):
        """Refresh the definition and gateway status from a gateways snapshot.

        ``gateways`` maps gateway name to a dict that may carry ``'services'``
        (a list of service dicts with a ``'name'`` key) and ``'status'``.
        When this service is not listed anywhere its gateway is treated as
        ``'offline'``. Pending requests are cancelled when the status changes
        to anything other than ``'online'``.
        """
        def find_service():
            # TODO: This might need to get optimized, as we do this for each services => O(n^2)
            for gateway_name, gateway in gateways.items():
                for service in gateway.get('services', []):
                    if service.get('name', None) == self.name:
                        return service, gateway_name
            return (None, None)

        service, gateway_name = find_service()
        old_status = self._gateway_status
        if service:
            # Load the definition first, then take the gateway's status.
            self.load_definition(service)
            self._gateway_status = gateways[gateway_name].get('status', 'offline')
        else:
            self._gateway_status = 'offline'

        if old_status != self._gateway_status and self._gateway_status != 'online':
            await self._cancel_requests()

    async def _on_gateway_update(self):
        """Cancel pending requests when the gateway is not ``'online'``."""
        if self._gateway_status != 'online':
            await self._cancel_requests()