import struct
import arrow
import caspia.node
from .base import SensorBase
from .datatypes import parse_analog_uint10
class AnalogSensor(SensorBase):
    """Sensor whose measurements are decoded with ``parse_analog_uint10``.

    The nested classes specialise the corresponding ``SensorBase`` classes:
    ``MeasurementEvent`` (one reported reading), ``State`` (last known
    reading) and ``Config`` (pin / sampling configuration).
    """

    # Numeric type identifier of this sensor class.
    # NOTE(review): presumably the type code SensorBase uses to dispatch
    # to this class -- confirm against SensorBase.
    type = 0x07

    class MeasurementEvent(SensorBase.MeasurementEvent):
        """Measurement event exposing the reading as ``value`` and ``error``."""

        def parse(self):
            """Run the base parser, then publish ``value`` and ``error``.

            ``value`` is taken from ``self.measurement_data``; ``error`` is
            ``True`` exactly when that value is ``None``.
            """
            super().parse()
            reading = self.measurement_data
            self.value = reading
            self.error = reading is None

        def __repr_fields__(self):
            """Return the fields shown in the event's repr.

            The time is the formatted timestamp, or ``'-'`` when the
            timestamp is falsy.
            """
            if self.timestamp:
                time = self.timestamp.format()
            else:
                time = '-'
            return {'value': self.value, 'time': time}

    class State(SensorBase.State):
        """Last known reading: value, error flag and timestamp."""

        def __init__(self, value=None, error=None, timestamp=None):
            """Store the initial value, error flag and timestamp as given."""
            self.value, self.error, self.timestamp = value, error, timestamp

        def update_from_bytes(self, data):
            """Refresh the state from raw bytes.

            The value is whatever ``parse_analog_uint10(data)`` returns; a
            ``None`` result sets the error flag.  The timestamp is set to
            ``arrow.now()``.
            """
            parsed = parse_analog_uint10(data)
            self.value = parsed
            self.error = parsed is None
            self.timestamp = arrow.now()

        def update_from_event(self, event):
            """Copy value, error and timestamp from an analog measurement event.

            Events that are not ``AnalogSensor.MeasurementEvent`` instances
            leave the state untouched.
            """
            if not isinstance(event, AnalogSensor.MeasurementEvent):
                return
            self.value = event.value
            self.error = event.error
            self.timestamp = event.timestamp

        def __repr_fields__(self):
            """Return the fields shown in the state's repr."""
            return {
                'value': self.value,
                'time': self.timestamp,
                'error': self.error,
            }

    class Config(SensorBase.Config):
        """Configuration: measured pin, interval, optional power pin, samples."""

        def __init__(self, pin, interval, power_pin=None, samples=1):
            """Create a configuration.

            Args:
                pin: pin object (its ``number`` is serialised under 0x11).
                interval: forwarded unchanged to ``SensorBase.Config``.
                power_pin: optional pin object serialised under 0x12.
                samples: value serialised as one unsigned byte under 0x13.
            """
            super().__init__(interval)
            self.pin, self.power_pin, self.samples = pin, power_pin, samples

        def to_config_dict(self):
            """Extend the base config dict with this sensor's entries.

            Adds 0x11 (pin number) and 0x13 (samples), each packed as a
            single unsigned byte, plus 0x12 (power pin number) only when a
            power pin is configured.
            """
            cfg = super().to_config_dict()

            pin_number = self.pin.number
            cfg[0x11] = struct.pack('B', pin_number)

            sample_count = self.samples
            cfg[0x13] = struct.pack('B', sample_count)

            power_pin = self.power_pin
            if power_pin is not None:
                cfg[0x12] = struct.pack('B', power_pin.number)
            return cfg

        @classmethod
        def params_from_config_dict(cls, cfg):
            """Build constructor keyword arguments from a config dict.

            Reads the first byte of entries 0x11 (pin) and 0x13 (samples);
            entry 0x12 (power pin) is optional and maps to ``None`` when
            absent.  The base class' parameters are merged in last.
            """
            pin = caspia.node.Pin(cfg[0x11][0])
            samples = cfg[0x13][0]

            power_pin = None
            if 0x12 in cfg:
                power_pin = caspia.node.Pin(cfg[0x12][0])

            base_params = SensorBase.Config.params_from_config_dict(cfg)
            return dict(pin=pin, samples=samples, power_pin=power_pin,
                        **base_params)

        def __repr_fields__(self):
            """Return the fields shown in the config's repr.

            The power pin is shown by number (or ``None``); the measured pin
            is shown as the pin object itself.
            """
            interval = self.interval
            if self.power_pin is None:
                power_pin_number = None
            else:
                power_pin_number = self.power_pin.number
            return {
                'interval': interval,
                'power_pin': power_pin_number,
                'pin': self.pin,
                'samples': self.samples,
            }

    @classmethod
    def parse_measurement_data(cls, data):
        """Decode a measurement from the first two items of ``data``.

        Returns whatever ``parse_analog_uint10`` yields for that slice.
        """
        leading = data[:2]
        return parse_analog_uint10(leading)