# pylint: disable=protected-access
import asyncio
import inspect
import logging
import caspia.meadow.value
from caspia.meadow import errors, utils
from caspia.meadow.serialization import Deserializable, Serializable
from caspia.reactive import Observable
logger = logging.getLogger(__name__)
class Characteristic(Observable, Serializable, Deserializable):
    """
    Characteristic attached to some service.

    Look at `Observable` to see how to subscribe for its notifications
    (and don't forget to enable them via `enable_notifications()`).
    """

    def __init__(self,
                 value_type,
                 permissions,
                 name=None,
                 service=None,
                 accepts_null=False,
                 validate=(),
                 description=None):
        """
        Initialize new characteristic.

        :param value_type: The value-type of the characteristic (string, e.g. 'bool').
                           A trailing '?' marks the value as nullable.
        :param permissions: String describing allowed operations with the characteristic
                            value. For example 'RW' means "Readable and Writable".
                            Valid chars are "R" "W" and "N".
        :param name: The name of the characteristic. If None, must be provided later.
        :param service: The parent service for this characteristic.
                        If None, must be provided later.
        :param accepts_null: Boolean indicating, whether `None` is valid value
                             for the characteristic.
        :param validate: Tuple of extra validators for the characteristic value
                         (a single non-tuple validator is wrapped into a 1-tuple).
                         Validator is a callable(value) -> None, which raises ValueError
                         for invalid value.
        :param description: Optional description of the characteristic used for documentation.
        """
        super().__init__()
        #: True if value might be null
        self.accepts_null = accepts_null or value_type.endswith('?')
        #: Value type of the characteristic (with the '?' marker removed)
        self.value_type = value_type.replace('?', '')
        #: Tuple of extra validators
        self.extra_validators = validate if isinstance(validate, tuple) else (validate, )
        #: Value permissions (Readable, Writable, Notifiable) as string. For example "RWN".
        self.permissions = permissions.upper()
        #: Name of the Characteristic
        self.name = name
        #: Parent service
        self.service = service
        #: Active notification subscription; falsy while notifications are disabled
        self.notification_subscription = False
        #: Optional human readable description (used when building ``__doc__``)
        self.description = description

    @property
    def readable(self):
        """True if the permissions string contains 'R'."""
        return 'R' in self.permissions

    @property
    def writable(self):
        """True if the permissions string contains 'W'."""
        return 'W' in self.permissions

    @property
    def notifiable(self):
        """True if the permissions string contains 'N'."""
        return 'N' in self.permissions

    def validate_value(self, value):
        """
        Validate given value.

        :raises ValueError: if the value is not valid
        """
        caspia.meadow.value.validate(value,
                                     self.value_type,
                                     optional=self.accepts_null,
                                     extra_validators=self.extra_validators)

    def serialize_value(self, value):
        """Serialize `value` according to this characteristic's value type."""
        return caspia.meadow.value.serialize(value, self.value_type)

    def deserialize_value(self, value):
        """Deserialize `value` according to this characteristic's value type."""
        return caspia.meadow.value.deserialize(value, self.value_type)

    async def read(self, *, timeout=1.0, extra=False):
        """
        Read value of the characteristic.

        :param timeout: Timeout forwarded to the service's ``characteristic_read``.
        :param extra: If true, return a ``(value, extra)`` tuple instead of the bare value.
        :raises errors.NotSupportedError: if the characteristic is not readable
        :raises asyncio.TimeoutError: if the read timed out
        :raises errors.InvalidValueError: if the value read does not pass validation
        """
        if not self.readable:
            raise errors.NotSupportedError(message=f'characteristic {self.name} is not readable')

        # read the value
        try:
            retval = await self.service.characteristic_read(self, timeout=timeout)
        except asyncio.TimeoutError:
            raise asyncio.TimeoutError('reading of %s timed out' % self) from None

        # separate metadata: the service may hand back either a bare value
        # or a (value, extra) tuple
        if isinstance(retval, tuple):
            value, extra_ = retval
        else:
            value, extra_ = retval, {}

        # validate value
        try:
            self.validate_value(value)
        except ValueError as e:
            raise errors.InvalidValueError(message=str(e)) from e

        return (value, extra_) if extra else value

    async def write(self, value, *, extra=None, timeout=1.0):
        """
        Write value to the characteristic.

        :param value: The value to write; it is validated before the write is attempted.
        :param extra: Optional metadata dictionary; an empty dict is sent when None.
        :param timeout: Timeout forwarded to the service's ``characteristic_write``.
        :raises errors.NotSupportedError: if the characteristic is not writable
        :raises errors.InvalidValueError: if the value does not pass validation
        :raises asyncio.TimeoutError: if the write timed out
        """
        if not self.writable:
            raise errors.NotSupportedError(message=f'characteristic {self.name} is not writable')

        # validate the value
        try:
            self.validate_value(value)
        except ValueError as e:
            raise errors.InvalidValueError(message=str(e)) from e

        # perform write
        try:
            await self.service.characteristic_write(self,
                                                    value,
                                                    extra={} if extra is None else extra,
                                                    timeout=timeout)
        except asyncio.TimeoutError:
            raise asyncio.TimeoutError('Write to %s timed out' % self) from None

    @property
    def value_and_extra(self):
        """Result of the service's ``characteristic_cached_read`` for this characteristic."""
        return self.service.characteristic_cached_read(self)

    @value_and_extra.setter
    def value_and_extra(self, value):
        self.service.characteristic_cached_write(self, value)

    @property
    def value(self):
        """First item of `value_and_extra`."""
        return self.value_and_extra[0]

    @value.setter
    def value(self, value):
        # cached write of the value paired with empty metadata
        self.service.characteristic_cached_write(self, (value, {}))

    #
    # Observable - observe() support
    #

    async def observe(self):
        """Return the result of `read()` with default arguments."""
        return await self.read()

    #
    # Notification Support (see Observable)
    #

    async def enable_notifications(self):
        """
        Enable receiving and processing notifications from meadow.

        Does nothing when a subscription is already stored.
        """
        if not self.notification_subscription:
            sub = await self.service.characteristic_subscribe(self, self.on_value_notification,
                                                              self.on_error_notification)
            self.notification_subscription = sub

    def disable_notifications(self):
        """Disable receiving notifications from meadow."""
        if self.notification_subscription:
            self.notification_subscription.unsubscribe()
            self.notification_subscription = None

    async def on_value_notification(self, value, extra, **_):
        """
        Shall be called on new incoming notification via meadow connection.

        The notification is logged and dropped when the value fails
        deserialization or validation with a ValueError.

        :param value: is the raw - serialized value received
        :param extra: dictionary of value's metadata
        """
        try:
            deserialized = self.deserialize_value(value)
            self.validate_value(deserialized)
        except ValueError as e:
            # TODO: set the observable to error state
            logger.exception(('Will not process notification for %r because'
                              ' value validation failed: %s.'), self, e)
            return

        # trigger notification
        await self.trigger(deserialized, extra=extra)

    async def on_error_notification(self, error, **_):
        """
        Shall be Called on a new incoming notification (containing error).

        :param error: the error
        """
        await self.trigger(error)

    #
    # Serializable / Deserializable
    #

    serialized_type = 'characteristic-notification'

    @classmethod
    def deserialize(cls, data, context):
        """
        Look up an existing characteristic described by `data`.

        :param data: mapping with keys 'name' (service name) and 'characteristic'
        :param context: mapping providing a 'get_service' callable
        """
        get_service = context['get_service']
        service = get_service(data['name'])
        return service[data['characteristic']]

    def serialize(self):
        """Return a dict with the serialized type, the service name and own name."""
        return {
            'type': type(self).serialized_type,
            'name': self.service.name,
            'characteristic': self.name,
        }

    #
    # Extra
    #

    def _service_name(self):
        """Return the parent service's ``name``, or ``str(service)`` if it has none."""
        if hasattr(self.service, 'name'):
            return self.service.name
        return str(self.service)

    def __repr__(self):
        return '<Characteristic %s:%s>' % (self._service_name(), self.name)

    def __str__(self):
        return f'{self._service_name()}:{self.name}'

    async def __call__(self):
        """Calling the characteristic writes ``None`` to it."""
        await self.write(None)

    def __copy__(self):
        """Create a new characteristic with the same configuration (description included)."""
        return Characteristic(value_type=self.value_type,
                              permissions=self.permissions,
                              name=self.name,
                              service=self.service,
                              accepts_null=self.accepts_null,
                              validate=self.extra_validators,
                              description=self.description)

    def __get__(self, obj, objtype=None):
        """
        Descriptor access.

        Accessed on the owner class, returns this characteristic itself; accessed
        on an instance, returns the entry of ``obj.characteristics`` with the same name.

        :raises AttributeError: if `obj` has no characteristic of this name
        """
        if obj is None:
            return self
        if self.name in obj.characteristics:
            return obj.characteristics[self.name]
        raise AttributeError(f'{obj} does not have characteristic "{self.name}"')

    @utils.classproperty
    def __doc__(self):
        lines = ''

        # specific documentation
        if self.description:
            lines += inspect.cleandoc(self.description) + '\n\n'

        # metadata; unknown permission characters are shown verbatim instead of
        # raising KeyError while the documentation is being generated
        perms_desc = {'R': 'Readable', 'W': 'Writable', 'N': 'Notifiable'}
        perms = ', '.join(perms_desc.get(perm, perm) for perm in self.permissions)
        lines += inspect.cleandoc("""
            :type: :class:`Characteristic` **<{self.value_type}{opt}>**
            :permissions: {perms}
        """).format(self=self, perms=perms, opt='?' if self.accepts_null else '')

        # validations (only validators exposing `description` are listed)
        validations = [':validation: ' + v.description() for v in self.extra_validators
                       if hasattr(v, 'description')]
        if validations:
            lines += '\n' + '\n'.join(validations)

        return lines