import struct
from dataclasses import dataclass, field
from typing import Optional
import arrow
from .base import SensorBase
from .datatypes import parse_co2, parse_humidity, parse_temperature
class SCD30Sensor(SensorBase):
    """CO2, Temperature and Relative Humidity sensor."""

    # Sensor type identifier (name kept as-is: it is part of the class interface).
    type = 0x0D

    @staticmethod
    def parse_measurement_data(data):
        """Decode a raw measurement payload.

        The first six bytes of ``data`` are consumed in three 2-byte
        slices, handed to ``parse_co2``, ``parse_temperature`` and
        ``parse_humidity`` respectively.

        Returns:
            A ``(co2, temperature, humidity)`` tuple of whatever the
            parse helpers return.
        """
        co2 = parse_co2(data[0:2])
        temperature = parse_temperature(data[2:4])
        humidity = parse_humidity(data[4:6])
        return co2, temperature, humidity

    async def set_self_calibration_enabled(self, enabled):
        """Send request ``0x10`` with ``int(enabled)`` as its one-byte argument."""
        await self.request(bytes([0x10, int(enabled)]))

    @dataclass
    class MeasurementEvent(SensorBase.MeasurementEvent):
        """Measurement event exposing the decoded SCD30 readings.

        The three reading fields are excluded from ``__init__`` and are
        filled in by ``__post_init__``.
        """

        co2: Optional[float] = field(init=False)
        temperature: Optional[float] = field(init=False)
        humidity: Optional[float] = field(init=False)

        def __post_init__(self):
            """Populate the readings from ``measurement_data``, or ``None`` on error."""
            super().__post_init__()
            if not self.error:
                self.co2, self.temperature, self.humidity = self.measurement_data
            else:
                self.co2, self.temperature, self.humidity = None, None, None

    @dataclass
    class State(SensorBase.State):
        """Most recent readings of the sensor; every field is ``None`` until updated."""

        co2: Optional[float] = None
        temperature: Optional[float] = None
        humidity: Optional[float] = None
        timestamp: Optional[arrow.Arrow] = None
        error: Optional[bool] = None

        def update_from_bytes(self, data):
            """Refresh the state from a raw measurement payload.

            The timestamp is set to the current time, and ``error`` becomes
            ``True`` when any of the three parsed values is ``None``.
            """
            measurement = SCD30Sensor.parse_measurement_data(data)
            self.co2, self.temperature, self.humidity = measurement
            self.timestamp = arrow.now()
            self.error = any(m is None for m in measurement)

        def update_from_event(self, event):
            """Copy readings, timestamp and error flag from a ``MeasurementEvent``.

            Events of any other type are ignored.
            """
            if isinstance(event, SCD30Sensor.MeasurementEvent):
                self.co2 = event.co2
                self.temperature = event.temperature
                self.humidity = event.humidity
                self.timestamp = event.timestamp
                self.error = event.error

    class Config(SensorBase.Config):
        """Sensor configuration with a temperature offset and an ambient pressure."""

        def __init__(self, *args, temperature_offset=0, ambient_pressure=0, **kwargs):
            """Store the SCD30 options; everything else goes to the base config."""
            super().__init__(*args, **kwargs)
            self.temperature_offset = temperature_offset
            self.ambient_pressure = ambient_pressure

        def to_config_dict(self):
            """Serialise the configuration.

            Extends the base dict with key ``0x11`` (temperature offset) and
            key ``0x12`` (ambient pressure scaled by 1000), each packed as a
            little-endian unsigned 16-bit integer.

            Raises:
                struct.error: If a value does not fit into 16 unsigned bits.
            """
            data = super().to_config_dict()
            data[0x11] = struct.pack('<H', int(self.temperature_offset))
            # Round rather than truncate: 1.013 * 1000 evaluates to
            # 1012.9999999999999, which int() alone would encode as 1012.
            # NOTE(review): the x1000 scale is presumably bar -> mbar; confirm
            # against the device firmware.
            data[0x12] = struct.pack('<H', int(round(self.ambient_pressure * 1000)))
            return data

        @classmethod
        def params_from_config_dict(cls, cfg):
            """Decode constructor keyword arguments from a config dict.

            Inverse of ``to_config_dict``: the stored ambient pressure is
            divided by 1000 again so that a decode/encode round trip yields
            the same bytes instead of scaling the value a second time.

            Raises:
                KeyError: If key ``0x11`` or ``0x12`` is missing from ``cfg``.
                struct.error: If either entry is not exactly two bytes long.
            """
            (temperature_offset,) = struct.unpack('<H', cfg[0x11])
            (raw_pressure,) = struct.unpack('<H', cfg[0x12])
            return dict(
                temperature_offset=temperature_offset,
                ambient_pressure=raw_pressure / 1000,
                **SensorBase.Config.params_from_config_dict(cfg),
            )