import struct
from dataclasses import dataclass, field
from typing import Any, Dict, Type
import arrow
from caspia.node import binary, utils
from caspia.node.config import Config
from caspia.node.events import BroadcastEvent
from ..base import Component
[docs]class SensorBase(Component):
type = None
[docs] @dataclass(repr=False)
class MeasurementEvent(BroadcastEvent, Component.Event):
sensor_cls: Type['SensorBase'] = field(repr=False)
timestamp: arrow.Arrow = field(init=False)
error: int = field(init=False)
measurement_data: Any = field(init=False)
def __post_init__(self):
super().__post_init__()
self.timestamp = arrow.now()
self.error = self.broadcast.error_code
if not self.error:
raw = self.broadcast.component_data[1:]
self.measurement_data = self.sensor_cls.parse_measurement_data(raw)
else:
self.measurement_data = None
self.parse()
[docs] @dataclass
class Config(Config):
interval: int = field()
[docs] def to_config_dict(self) -> Dict[int, bytes]:
"""To be overriden by subclasses."""
return {0x01: struct.pack('<H', self.interval)}
[docs] @classmethod
def params_from_config_dict(cls, cfg: Dict[int, bytes]):
"""To be overriden by subclasses - should map cfg to initializer's parameters."""
return {'interval': struct.unpack('<H', cfg[0x01])[0]}
[docs] def get_bytes(self):
return binary.encode_dictionary(self.to_config_dict())
[docs] @classmethod
def from_bytes(cls, data):
cfg, _ = binary.decode_dictionary(data)
return cls(**cls.params_from_config_dict(cfg))
def __repr_fields__(self):
return dict(interval=self.interval)
[docs] @classmethod
def parse_measurement_data(cls, data):
return NotImplemented
[docs] @classmethod
def parse_broadcast(cls, broadcast):
if broadcast.component_data[0] == 0x00:
return cls.MeasurementEvent(broadcast=broadcast, sensor_cls=cls)
return None
[docs] async def request_measurement(self):
await self.request(b'\x01')
[docs] async def measure(self):
def accept_event(event):
if not isinstance(event, self.MeasurementEvent):
return False
if event.can_id != self.node.can_id:
return False
return event.component_id == self.identifier
await self.request_measurement()
event = await utils.receive_event(accept_event, self.node.client)
if event.error:
return None
else:
return event.measurement_data
[docs] @classmethod
def register_measurement_event_cls(cls, evt_cls):
cls.MeasurementEvent = evt_cls
return evt_cls