Source code for caspia.node.components.sensors.s300

import struct

import arrow

from .base import SensorBase


class S300Sensor(SensorBase):
    """Sensor component for the S300 CO2 sensor.

    Provides decoding of raw measurement bytes, a helper for sending
    commands to the sensor, and the nested event/state/config types.
    """

    # Type identifier of this sensor (presumably consumed by SensorBase or
    # its registry -- confirm against the base class).
    type = 0x0A

    @classmethod
    def parse_measurement_data(cls, data):
        """Decode a raw measurement payload into a CO2 level.

        ``data`` is unpacked as a single little-endian unsigned 16-bit
        integer, so it must be exactly two bytes long; otherwise
        ``struct.error`` is raised.

        Returns the decoded level, or ``None`` when the raw value is 0xFFFF.
        """
        (raw_level,) = struct.unpack('<H', data)
        # 0xFFFF is mapped to None; callers in this class treat None as an
        # error reading.
        if raw_level == 0xFFFF:
            return None
        return raw_level

    async def send_command(self, command):
        """Send command to the S300Sensor.

        See Programming Guide for I2C (S-Series) for available commands.
        """
        # The request is two bytes: a fixed 0x10 prefix followed by the
        # command byte (NOTE(review): meaning of 0x10 is not visible here).
        payload = bytes([0x10, command])
        await self.request(payload)

    class MeasurementEvent(SensorBase.MeasurementEvent):
        """Measurement event carrying a CO2 reading."""

        def parse(self):
            """Run the base parsing, then expose ``co2`` and ``error``."""
            super().parse()
            reading = self.measurement_data
            self.co2 = reading
            # A missing reading is what marks the event as erroneous.
            self.error = reading is None

        def __repr_fields__(self):
            """Return the fields to show in this event's representation."""
            if self.timestamp:
                time = self.timestamp.format()
            else:
                time = '-'

            if self.error:
                co2 = 'error'
            else:
                co2 = f'{self.co2} ppm'

            return {'time': time, 'co2': co2}

    class State(SensorBase.State):
        """Last known CO2 reading together with its error flag and time."""

        def __init__(self, co2=None, error=None, timestamp=None):
            """Store the initial reading, error flag and timestamp."""
            self.co2, self.error, self.timestamp = co2, error, timestamp

        def update_from_bytes(self, data):
            """Refresh the state from a raw measurement payload.

            The timestamp is set to the current time via ``arrow.now()``.
            """
            level = S300Sensor.parse_measurement_data(data)
            self.co2 = level
            self.error = level is None
            self.timestamp = arrow.now()

        def update_from_event(self, event):
            """Copy the reading from ``event`` if it is a measurement event.

            Events of any other type leave the state untouched.
            """
            if not isinstance(event, S300Sensor.MeasurementEvent):
                return
            self.co2 = event.co2
            self.error = event.error
            self.timestamp = event.timestamp

        def __repr_fields__(self):
            """Return the fields to show in this state's representation."""
            return {
                'co2': self.co2,
                'error': self.error,
                'timestamp': self.timestamp,
            }

    class Config(SensorBase.Config):
        """Configuration for the S300 sensor; adds nothing to the base."""