import struct
import arrow
from .base import SensorBase
[docs]class S300Sensor(SensorBase):
type = 0x0A
[docs] @classmethod
def parse_measurement_data(cls, data):
co2_level = struct.unpack('<H', data)[0]
return None if co2_level == 0xFFFF else co2_level
[docs] async def send_command(self, command):
"""Send command to the S300Sensor.
See Programming Guide for I2C (S-Series) for available commands.
"""
await self.request(bytes([0x10, command]))
[docs] class MeasurementEvent(SensorBase.MeasurementEvent):
[docs] def parse(self):
super().parse()
self.co2 = self.measurement_data
self.error = self.co2 is None
def __repr_fields__(self):
time = self.timestamp.format() if self.timestamp else '-'
co2 = f'{self.co2} ppm' if not self.error else 'error'
return dict(time=time, co2=co2)
[docs] class State(SensorBase.State):
def __init__(self, co2=None, error=None, timestamp=None):
self.co2 = co2
self.error = error
self.timestamp = timestamp
[docs] def update_from_bytes(self, data):
self.co2 = S300Sensor.parse_measurement_data(data)
self.error = self.co2 is None
self.timestamp = arrow.now()
[docs] def update_from_event(self, event):
if isinstance(event, S300Sensor.MeasurementEvent):
self.co2 = event.co2
self.error = event.error
self.timestamp = event.timestamp
def __repr_fields__(self):
return dict(co2=self.co2, error=self.error, timestamp=self.timestamp)
[docs] class Config(SensorBase.Config):
pass