Source code for caspia.node.components.sensors.scd30

import struct
from dataclasses import dataclass, field
from typing import Optional

import arrow

from .base import SensorBase
from .datatypes import parse_co2, parse_humidity, parse_temperature


class SCD30Sensor(SensorBase):
    """CO2, Temperature and Relative Humidity sensor."""

    # Component type identifier of this sensor.
    type = 0x0D

    @staticmethod
    def parse_measurement_data(data):
        """Split a raw measurement payload into its three readings.

        Bytes 0-1 are handed to ``parse_co2``, bytes 2-3 to
        ``parse_temperature`` and bytes 4-5 to ``parse_humidity``.

        :param data: sliceable buffer holding at least six bytes.
        :return: ``(co2, temperature, humidity)`` tuple as produced by the
            respective parsers.
        """
        co2 = parse_co2(data[0:2])
        temperature = parse_temperature(data[2:4])
        humidity = parse_humidity(data[4:6])
        return co2, temperature, humidity

    async def set_self_calibration_enabled(self, enabled):
        """Send request ``0x10`` followed by ``enabled`` encoded as 0 or 1.

        :param enabled: truthy to enable, falsy to disable.
        """
        await self.request(bytes([0x10, int(enabled)]))

    @dataclass
    class MeasurementEvent(SensorBase.MeasurementEvent):
        """Measurement event exposing the decoded CO2/temperature/humidity.

        The three fields are not constructor arguments; they are filled in
        by ``__post_init__`` from ``measurement_data``.
        """

        co2: Optional[float] = field(init=False)
        temperature: Optional[float] = field(init=False)
        humidity: Optional[float] = field(init=False)

        def __post_init__(self):
            """Populate the readings, or set them all to ``None`` on error."""
            super().__post_init__()
            if not self.error:
                self.co2, self.temperature, self.humidity = \
                    self.measurement_data
            else:
                self.co2, self.temperature, self.humidity = None, None, None

    @dataclass
    class State(SensorBase.State):
        """Last known readings of the sensor; every field starts as ``None``."""

        co2: Optional[float] = None
        temperature: Optional[float] = None
        humidity: Optional[float] = None
        timestamp: Optional[arrow.Arrow] = None
        error: Optional[bool] = None

        def update_from_bytes(self, data):
            """Refresh the state from a raw measurement payload.

            The timestamp is set to ``arrow.now()`` and ``error`` becomes
            ``True`` when any of the parsed readings is ``None``.

            :param data: raw payload accepted by
                ``SCD30Sensor.parse_measurement_data``.
            """
            measurement = SCD30Sensor.parse_measurement_data(data)
            self.co2, self.temperature, self.humidity = measurement
            self.timestamp = arrow.now()
            self.error = any(m is None for m in measurement)

        def update_from_event(self, event):
            """Copy readings, timestamp and error flag from ``event``.

            Events that are not ``SCD30Sensor.MeasurementEvent`` instances
            are ignored.
            """
            if isinstance(event, SCD30Sensor.MeasurementEvent):
                self.co2 = event.co2
                self.temperature = event.temperature
                self.humidity = event.humidity
                self.timestamp = event.timestamp
                self.error = event.error

    class Config(SensorBase.Config):
        """Sensor configuration extended by two SCD30-specific values.

        ``temperature_offset`` is serialised as-is (truncated to an
        integer); ``ambient_pressure`` is serialised multiplied by 1000.
        NOTE(review): the scaling suggests bar on the Python side and mbar
        on the wire -- confirm against the firmware/datasheet.
        """

        def __init__(self, *args, temperature_offset=0, ambient_pressure=0,
                     **kwargs):
            """Store the SCD30 options; everything else goes to the base."""
            super().__init__(*args, **kwargs)
            self.temperature_offset = temperature_offset
            self.ambient_pressure = ambient_pressure

        def to_config_dict(self):
            """Return the base config dict extended by keys 0x11 and 0x12.

            Both values are packed as unsigned 16-bit little-endian
            integers, so ``struct.error`` is raised when a value does not
            fit into 0..65535.
            """
            data = super().to_config_dict()
            data[0x11] = struct.pack('<H', int(self.temperature_offset))
            # round() instead of int(): scaling a float can land just below
            # the intended integer (1.013 * 1000 == 1012.9999999999999) and
            # truncation would then store a value that is one too small.
            data[0x12] = struct.pack('<H', round(self.ambient_pressure * 1000))
            return data

        @classmethod
        def params_from_config_dict(cls, cfg):
            """Build constructor keyword arguments from a config dict.

            This is the inverse of ``to_config_dict``: the pressure is
            scaled back down by 1000 so that the returned parameters can be
            passed to ``Config`` and serialised again without change.

            :param cfg: mapping containing keys 0x11 and 0x12 with two-byte
                values, plus whatever the base class parser reads.
            :return: dict of keyword arguments.
            """
            raw_offset, = struct.unpack('<H', cfg[0x11])
            raw_pressure, = struct.unpack('<H', cfg[0x12])
            return dict(temperature_offset=raw_offset,
                        # Undo the x1000 scaling applied when serialising.
                        ambient_pressure=raw_pressure / 1000,
                        **SensorBase.Config.params_from_config_dict(cfg))