import asyncio
from caspia.meadow.client.connection import ConsumerConnection
from caspia.meadow.errors import InvalidValueError, NotAvailableError
from caspia.meadow.services import ServiceBase
[docs]def get_class(service_type):
return ServiceBase.get_subclass(service_type, ServiceConsumerMixin)
[docs]def is_consumer_service(service):
return isinstance(service, ServiceConsumerMixin)
[docs]class ServiceConsumerMixin:
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._connection = None
self._gateway_status = None
self._subscriptions = set()
self._notifications_subscription = None
self._notifications_enabled = False
self._requests = set()
@property
def meadow_connection(self) -> ConsumerConnection:
return self._connection
@meadow_connection.setter
def meadow_connection(self, new):
if new == self._connection:
return
if new and self._connection:
self.meadow_connection = None
self.meadow_connection = new
return
old, self._connection = self._connection, new
if new:
assert isinstance(new, ConsumerConnection)
self._connection_attached(new)
else:
self._connection_detached(old)
def _connection_attached(self, connection: ConsumerConnection):
asyncio.ensure_future(connection.subscribe_gateways_update(self.on_gateways_update))
if self._notifications_enabled:
asyncio.ensure_future(self.enable_notifications())
def _connection_detached(self, connection: ConsumerConnection):
for subscription in self._subscriptions:
subscription.unsubscribe()
self._subscriptions = set()
self._notifications_subscription = None
asyncio.ensure_future(self._cancel_requests())
async def _cancel_requests(self):
for request in self._requests:
request.cancel()
for characteristic in self.characteristics.values():
if characteristic.readable and characteristic.notifiable:
await characteristic.trigger(
NotAvailableError(f'Service {self.name} is not available.'))
def _ensure_has_connection(self):
if not self.meadow_connection:
raise NotAvailableError(f'service {self.name} not found')
def _ensure_gateway_is_ready(self):
if self._gateway_status != 'online':
raise NotAvailableError(f'gateway of {self.name} is {self._gateway_status}')
async def _make_request(self, request_future):
try:
task = asyncio.ensure_future(request_future)
self._requests.add(task)
return await task
finally:
self._requests.remove(task)
[docs] async def characteristic_write(self, characteristic, value, extra, timeout=1.0, **_):
self._ensure_has_connection()
if not characteristic.name.startswith('$'):
self._ensure_gateway_is_ready()
value = characteristic.serialize_value(value)
request = self._connection.write(self.name,
characteristic.name,
value,
extra=extra,
timeout=timeout)
return await self._make_request(request)
[docs] async def characteristic_read(self, characteristic, timeout=1.0, **_):
self._ensure_has_connection()
if not characteristic.name.startswith('$'):
self._ensure_gateway_is_ready()
request = self._connection.read(self.name, characteristic.name, timeout=timeout)
value, extra = await self._make_request(request)
try:
return characteristic.deserialize_value(value), extra
except ValueError as err:
raise InvalidValueError(str(err))
[docs] async def characteristic_subscribe(self, characteristic, on_value, on_error=None):
self._ensure_has_connection()
subscription = await self._connection.subscribe(self.name, characteristic.name, on_value,
on_error)
self._subscriptions.add(subscription)
return subscription
[docs] async def enable_notifications(self):
"""Receive notifications for all characteristics related to this service."""
if self._notifications_subscription:
return
self._ensure_has_connection()
subscription = await self._connection.subscribe(self.name, None,
self._on_value_notification,
self._on_error_notification)
self._notifications_subscription = subscription
self._notifications_enabled = True
self._subscriptions.add(subscription)
async def _on_value_notification(self, value, extra, characteristic, **kwargs):
try:
char_instance = self.characteristics[characteristic]
except KeyError:
return
await char_instance.on_value_notification(value, extra=extra, **kwargs)
async def _on_error_notification(self, error, characteristic, **kwargs):
try:
char_instance = self.characteristics[characteristic]
except KeyError:
return
await char_instance.on_error_notification(error=error, **kwargs)
[docs] async def on_gateways_update(self, gateways):
def find_service():
# TODO: This might need to get optimized, as we do this for each services => O(n^2)
for gateway_name, gateway in gateways.items():
for service in gateway.get('services', []):
if service.get('name', None) == self.name:
return service, gateway_name
return (None, None)
service, gateway_name = find_service()
if service:
self.load_definition(service)
old_status, self._gateway_status = \
self._gateway_status, gateways[gateway_name].get('status', 'offline')
else:
old_status, self._gateway_status = self._gateway_status, 'offline'
if old_status != self._gateway_status and self._gateway_status != 'online':
await self._cancel_requests()
async def _on_gateway_update(self):
if self._gateway_status != 'online':
await self._cancel_requests()