Source code for caspia.gateway.services.sensors.humidity.scd30

from caspia.gateway.services import GatewayService
from caspia.meadow.client import characteristic_read_handler
from caspia.meadow.errors import SensorFailureError
from caspia.meadow.services import HumiditySensorBase
from caspia.node.components.sensors.scd30 import SCD30Sensor


[docs]class SCD30HumiditySensor(GatewayService, HumiditySensorBase): auto_discovery = False
[docs] def configure(self): self.scd30_c = self.network.get_configured_component(self.get_node(), SCD30Sensor, self.config['scd30'])
@property def is_updated_periodically(self): return self.config['scd30'].interval != 0 @property def dependant_components(self): return {self.scd30_c}
[docs] async def on_component_event(self, component, event): if self.scd30_c == component: if isinstance(event, SCD30Sensor.MeasurementEvent): if not event.error: value = round(event.humidity, 2) else: value = SensorFailureError('sensor reported an error') await self.notify(self.humidity, value)
[docs] @characteristic_read_handler('humidity') async def humidity_read(self, **kwargs): if not self.is_updated_periodically: await self.scd30_c.measure() if self.scd30_c.error: raise SensorFailureError('sensor reported an error') return round(float(self.scd30_c.state.humidity), 2)