Source code for caspia.node.components.sensors.mcp980x

import struct

import arrow

from .base import SensorBase
from .datatypes import parse_temperature


class MCP980XSensor(SensorBase):
    """Sensor component for the MCP980x temperature sensor (type code 0x03)."""

    type = 0x03

    class MeasurementEvent(SensorBase.MeasurementEvent):
        """Measurement event carrying a single temperature reading."""

        def parse(self):
            """Run the base parser, then expose the reading as ``temperature``.

            ``error`` is set to True when no temperature was obtained
            (``measurement_data`` is None).
            """
            super().parse()
            self.temperature = self.measurement_data
            self.error = self.temperature is None

        def __repr_fields__(self):
            """Return the fields shown in the event's repr as a dict."""
            time = self.timestamp.format() if self.timestamp else '-'
            if self.error:
                temp = 'error'
            else:
                temp = '{:.02f}°C'.format(self.temperature)
            return dict(temp=temp, time=time)

    class State(SensorBase.State):
        """Last known temperature reading of the sensor."""

        def __init__(self, temperature=None, error=None, timestamp=None):
            self.temperature = temperature  #: In degrees celsius
            self.error = error  #: If False, self.temperature contains valid temp
            self.timestamp = timestamp  #: Time of last update

        def update_from_bytes(self, data):
            """Update the state from raw bytes decoded by ``parse_temperature``.

            ``error`` becomes True when the decoder returns None; the
            timestamp is set to the current time (``arrow.now()``).
            """
            self.temperature = parse_temperature(data)
            self.error = self.temperature is None
            self.timestamp = arrow.now()

        def update_from_event(self, event):
            """Copy temperature, error and timestamp from a measurement event.

            Events that are not ``MCP980XSensor.MeasurementEvent`` instances
            are ignored.
            """
            if isinstance(event, MCP980XSensor.MeasurementEvent):
                self.temperature = event.temperature
                self.error = event.error
                self.timestamp = event.timestamp

        def __repr_fields__(self):
            """Return the fields shown in the state's repr as a dict."""
            return dict(temp=self.temperature,
                        error=self.error,
                        timestamp=self.timestamp)

    class Config(SensorBase.Config):
        """Sensor configuration: measurement interval, address and resolution."""

        # Resolution setting codes accepted by ``resolution``.
        RESOLUTION_9b = 0x00
        RESOLUTION_10b = 0x01
        RESOLUTION_11b = 0x02
        RESOLUTION_12b = 0x03

        def __init__(self, interval, address=None, resolution=None):
            """Create a configuration.

            :param interval: forwarded to ``SensorBase.Config.__init__``.
            :param address: sensor address; defaults to ``0x4F`` when None.
            :param resolution: one of the ``RESOLUTION_*`` codes; defaults to
                ``RESOLUTION_12b`` when None.
            """
            super().__init__(interval)
            self.address = address if address is not None else 0x4F
            self.resolution = (resolution if resolution is not None
                               else self.RESOLUTION_12b)

        def to_config_dict(self):
            """Return the base config dict extended with this sensor's entries.

            Key ``0x11`` holds the address and key ``0x12`` the resolution,
            each packed as a single unsigned byte.
            """
            cfg = super().to_config_dict()
            cfg[0x11] = struct.pack('B', self.address)
            cfg[0x12] = struct.pack('B', self.resolution)
            return cfg

        @staticmethod
        def _decode_byte(raw):
            """Decode one config value written by ``to_config_dict``.

            Returns None for a missing value, the unpacked integer for a
            bytes-like value, and any other value unchanged.
            """
            if raw is None:
                return None
            if isinstance(raw, (bytes, bytearray, memoryview)):
                # struct.unpack always returns a tuple; take its only element.
                return struct.unpack('<B', raw)[0]
            # NOTE(review): non-bytes values are passed through untouched in
            # case a caller already supplies decoded integers -- confirm.
            return raw

        @classmethod
        def params_from_config_dict(cls, cfg):
            """Build ``__init__`` keyword arguments from a config dict.

            This is the inverse of ``to_config_dict``: ``address`` and
            ``resolution`` are decoded back to plain integers. A missing
            entry yields None, which ``__init__`` replaces with its default.

            :param cfg: mapping of config keys to packed values.
            :return: dict of keyword arguments, including those produced by
                ``SensorBase.Config.params_from_config_dict``.
            """
            return dict(address=cls._decode_byte(cfg.get(0x11)),
                        resolution=cls._decode_byte(cfg.get(0x12)),
                        **SensorBase.Config.params_from_config_dict(cfg))

        def __repr_fields__(self):
            """Return the fields shown in the config's repr as a dict."""
            return dict(interval=self.interval,
                        address=self.address,
                        resolution=self.resolution)

    @classmethod
    def parse_measurement_data(cls, data):
        """Decode a measurement payload.

        Only the first two bytes of ``data`` are passed to
        ``parse_temperature``; its result is returned unchanged.
        """
        return parse_temperature(data[0:2])