Source code for caspia.node.components.sensors.base

import struct
from dataclasses import dataclass, field
from typing import Any, Dict, Type

import arrow

from caspia.node import binary, utils
from caspia.node.config import Config
from caspia.node.events import BroadcastEvent

from ..base import Component


[docs]class SensorBase(Component): type = None
[docs] @dataclass(repr=False) class MeasurementEvent(BroadcastEvent, Component.Event): sensor_cls: Type['SensorBase'] = field(repr=False) timestamp: arrow.Arrow = field(init=False) error: int = field(init=False) measurement_data: Any = field(init=False) def __post_init__(self): super().__post_init__() self.timestamp = arrow.now() self.error = self.broadcast.error_code if not self.error: raw = self.broadcast.component_data[1:] self.measurement_data = self.sensor_cls.parse_measurement_data(raw) else: self.measurement_data = None self.parse()
[docs] def parse(self): pass
[docs] @dataclass class Config(Config): interval: int = field()
[docs] def to_config_dict(self) -> Dict[int, bytes]: """To be overriden by subclasses.""" return {0x01: struct.pack('<H', self.interval)}
[docs] @classmethod def params_from_config_dict(cls, cfg: Dict[int, bytes]): """To be overriden by subclasses - should map cfg to initializer's parameters.""" return {'interval': struct.unpack('<H', cfg[0x01])[0]}
[docs] def get_bytes(self): return binary.encode_dictionary(self.to_config_dict())
[docs] @classmethod def from_bytes(cls, data): cfg, _ = binary.decode_dictionary(data) return cls(**cls.params_from_config_dict(cfg))
def __repr_fields__(self): return dict(interval=self.interval)
[docs] @classmethod def parse_measurement_data(cls, data): return NotImplemented
[docs] @classmethod def parse_broadcast(cls, broadcast): if broadcast.component_data[0] == 0x00: return cls.MeasurementEvent(broadcast=broadcast, sensor_cls=cls) return None
[docs] async def request_measurement(self): await self.request(b'\x01')
[docs] async def measure(self): def accept_event(event): if not isinstance(event, self.MeasurementEvent): return False if event.can_id != self.node.can_id: return False return event.component_id == self.identifier await self.request_measurement() event = await utils.receive_event(accept_event, self.node.client) if event.error: return None else: return event.measurement_data
[docs] @classmethod def register_measurement_event_cls(cls, evt_cls): cls.MeasurementEvent = evt_cls return evt_cls