# Source code for caspia.meadow.services.characteristic

# pylint: disable=protected-access
import asyncio
import inspect
import logging

import caspia.meadow.value
from caspia.meadow import errors, utils
from caspia.meadow.serialization import Deserializable, Serializable
from caspia.reactive import Observable

# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)


class Characteristic(Observable, Serializable, Deserializable):
    """
    Characteristic attached to some service.

    Look at `Observable` to see how to subscribe for its notifications
    (and don't forget to enable them via `enable_notifications()`).

    All value access (`read`, `write`, the cached `value` property and the
    notification subscription) is delegated to the parent `service`.
    """

    def __init__(self, value_type, permissions, name=None, service=None, accepts_null=False,
                 validate=(), description=None):
        """
        Initialize new characteristic.

        :param value_type: The value-type of the characteristic (string, e.g. 'bool').
                           A trailing '?' marks the value as nullable; every '?' is
                           stripped from the stored type.
        :param permissions: String describing allowed operations with the characteristic
                            value. For example 'RW' means "Readable and Writable".
                            Valid chars are "R" "W" and "N". Stored upper-cased.
        :param name: The name of the characteristic. If None, must be provided later.
        :param service: The parent service for this characteristic. If None, must be
                        provided later.
        :param accepts_null: Boolean indicating, whether `None` is valid value for the
                             characteristic.
        :param validate: Extra validators for the characteristic value: a tuple or list
                         of validators, or a single validator. Validator is
                         a callable(value) -> None, which raises ValueError for
                         invalid value.
        :param description: Optional description of the characteristic used for
                            documentation.
        """
        super().__init__()
        #: True if value might be null
        self.accepts_null = accepts_null or value_type.endswith('?')
        #: Value type of the characteristic
        self.value_type = value_type.replace('?', '')
        #: Tuple of extra validators
        if isinstance(validate, (tuple, list)):
            # A list used to be wrapped as one (non-callable) validator; treat it
            # like a tuple instead.
            self.extra_validators = tuple(validate)
        else:
            self.extra_validators = (validate, )
        #: Value permissions (Readable, Writable, Notifiable) as string. For example "RWN".
        self.permissions = permissions.upper()
        #: Name of the Characteristic
        self.name = name
        #: Parent service
        self.service = service
        #: Active notification subscription, or None while notifications are disabled
        #: (same "disabled" value that `disable_notifications()` assigns).
        self.notification_subscription = None
        #: Optional human readable description
        self.description = description

    @property
    def readable(self):
        """True if the permissions contain 'R'."""
        return 'R' in self.permissions

    @property
    def writable(self):
        """True if the permissions contain 'W'."""
        return 'W' in self.permissions

    @property
    def notifiable(self):
        """True if the permissions contain 'N'."""
        return 'N' in self.permissions

    def validate_value(self, value):
        """
        Validate given value.

        :raises ValueError: if the value is not valid
        """
        caspia.meadow.value.validate(value,
                                     self.value_type,
                                     optional=self.accepts_null,
                                     extra_validators=self.extra_validators)

    def serialize_value(self, value):
        """Serialize `value` according to this characteristic's value type."""
        return caspia.meadow.value.serialize(value, self.value_type)

    def deserialize_value(self, value):
        """Deserialize `value` according to this characteristic's value type."""
        return caspia.meadow.value.deserialize(value, self.value_type)

    async def read(self, *, timeout=1.0, extra=False):
        """
        Read value of the characteristic.

        :param timeout: timeout passed to the service's `characteristic_read`
        :param extra: if true, return a `(value, extra)` tuple instead of the bare value
        :returns: the validated value, or `(value, extra)` when `extra` is true
        :raises errors.NotSupportedError: if the characteristic is not readable
        :raises asyncio.TimeoutError: if the service read timed out
        :raises errors.InvalidValueError: if the value read fails validation
        """
        if not self.readable:
            raise errors.NotSupportedError(message=f'characteristic {self.name} is not readable')

        # read the value
        try:
            retval = await self.service.characteristic_read(self, timeout=timeout)
        except asyncio.TimeoutError:
            raise asyncio.TimeoutError('reading of %s timed out' % self) from None

        # separate metadata: a tuple result is unpacked as (value, extra)
        if isinstance(retval, tuple):
            value, extra_ = retval
        else:
            value, extra_ = retval, dict()

        # validate value
        try:
            self.validate_value(value)
        except ValueError as e:
            raise errors.InvalidValueError(message=str(e)) from e

        # return
        if extra:
            return value, extra_
        return value

    async def write(self, value, *, extra=None, timeout=1.0):
        """
        Write value to the characteristic.

        :param value: the value to write; validated before it is handed to the service
        :param extra: optional metadata dictionary; an empty dict is sent when None
        :param timeout: timeout passed to the service's `characteristic_write`
        :raises errors.NotSupportedError: if the characteristic is not writable
        :raises errors.InvalidValueError: if `value` fails validation
        :raises asyncio.TimeoutError: if the service write timed out
        """
        if not self.writable:
            raise errors.NotSupportedError(message=f'characteristic {self.name} is not writable')

        # validate the value
        try:
            self.validate_value(value)
        except ValueError as e:
            raise errors.InvalidValueError(message=str(e)) from e

        # perform write
        try:
            await self.service.characteristic_write(self,
                                                    value,
                                                    extra={} if extra is None else extra,
                                                    timeout=timeout)
        except asyncio.TimeoutError:
            raise asyncio.TimeoutError('Write to %s timed out' % self) from None

    @property
    def value_and_extra(self):
        """Result of the service's `characteristic_cached_read` for this characteristic."""
        return self.service.characteristic_cached_read(self)

    @value_and_extra.setter
    def value_and_extra(self, value):
        self.service.characteristic_cached_write(self, value)

    @property
    def value(self):
        """First item of `value_and_extra`."""
        return self.value_and_extra[0]

    @value.setter
    def value(self, value):
        # The cached write receives a (value, extra) pair; the extra is empty here.
        self.service.characteristic_cached_write(self, (value, {}))

    #
    # Observable - observe() support
    #

    async def observe(self):
        """Return the current value by performing a `read()`."""
        return await self.read()

    #
    # Notification Support (see Observable)
    #

    async def enable_notifications(self):
        """Enable receiving and processing notifications from meadow."""
        if not self.notification_subscription:
            sub = await self.service.characteristic_subscribe(self, self.on_value_notification,
                                                              self.on_error_notification)
            self.notification_subscription = sub

    def disable_notifications(self):
        """Disable receiving notifications from meadow."""
        if self.notification_subscription:
            self.notification_subscription.unsubscribe()
            self.notification_subscription = None

    async def on_value_notification(self, value, extra, **_):
        """
        Shall be called on new incoming notification via meadow connection.

        The value is deserialized and validated; when that raises ValueError the
        failure is logged and the notification is dropped.

        :param value: is the raw - serialized value received
        :param extra: dictionary of value's metadata
        """
        try:
            deserialized = self.deserialize_value(value)
            self.validate_value(deserialized)
        except ValueError as e:
            # TODO: set the observable to error state
            logger.exception(('Will not process notification for %r because'
                              ' value validation failed: %s.'), self, e)
            return

        # trigger notification
        await self.trigger(deserialized, extra=extra)

    async def on_error_notification(self, error, **_):
        """
        Shall be called on a new incoming notification (containing error).

        :param error: the error
        """
        await self.trigger(error)

    #
    # Serializable / Deserializable
    #

    serialized_type = 'characteristic-notification'

    @classmethod
    def deserialize(cls, data, context):
        """
        Resolve the characteristic referenced by serialized `data`.

        :param data: mapping with the service name under 'name' and the
                     characteristic name under 'characteristic'
        :param context: mapping providing a 'get_service' callable(name) -> service
        :returns: `service[data['characteristic']]` of the looked-up service
        """
        get_service = context['get_service']
        service = get_service(data['name'])
        return service[data['characteristic']]

    def serialize(self):
        """Return a dict holding the serialized type, the service name and own name."""
        return {
            'type': type(self).serialized_type,
            'name': self.service.name,
            'characteristic': self.name,
        }

    #
    # Extra
    #

    def _service_label(self):
        """Return the parent service's `name`, or `str(service)` when it has none."""
        if hasattr(self.service, 'name'):
            return self.service.name
        return str(self.service)

    def __repr__(self):
        return '<Characteristic %s:%s>' % (self._service_label(), self.name)

    def __str__(self):
        return f'{self._service_label()}:{self.name}'

    async def __call__(self):
        """Calling the characteristic writes `None` to it."""
        await self.write(None)

    def __copy__(self):
        """Create a new `Characteristic` with the same configuration and parent service."""
        return Characteristic(value_type=self.value_type,
                              permissions=self.permissions,
                              name=self.name,
                              service=self.service,
                              accepts_null=self.accepts_null,
                              validate=self.extra_validators,
                              description=self.description)

    def __get__(self, obj, objtype=None):
        """
        Descriptor access.

        Accessed on the owner class (`obj is None`) this returns the characteristic
        itself; accessed on an instance it returns the entry of the same name from
        `obj.characteristics`.

        :raises AttributeError: if `obj.characteristics` has no entry under this name
        """
        if obj is None:
            return self
        if self.name in obj.characteristics:
            return obj.characteristics[self.name]
        raise AttributeError(f'{obj} does not have characteristic "{self.name}"')

    # NOTE(review): the object passed in as `self` is whatever `utils.classproperty`
    # supplies; that helper is defined outside this file - confirm its semantics.
    @utils.classproperty
    def __doc__(self):
        """Build documentation from the description, value type, permissions and validators."""
        lines = ''

        # specific documentation
        if self.description:
            lines += inspect.cleandoc(self.description) + '\n\n'

        # metadata; an unknown permission char is shown verbatim instead of raising KeyError
        perms_desc = {'R': 'Readable', 'W': 'Writable', 'N': 'Notifiable'}
        perms = ', '.join(perms_desc.get(perm, perm) for perm in self.permissions)
        lines += inspect.cleandoc("""
            :type: :class:`Characteristic` **<{self.value_type}{opt}>**
            :permissions: {perms}
        """).format(self=self, perms=perms, opt='?' if self.accepts_null else '')

        # validations: only validators exposing `description()` are listed
        if self.extra_validators:
            lines += '\n'
            lines += '\n'.join(':validation: ' + v.description()
                               for v in self.extra_validators if hasattr(v, 'description'))

        return lines