Source code for caspia.gateway.services.sensors.temperature.sht2x

from caspia.gateway.services import GatewayService
from caspia.meadow.client import characteristic_read_handler
from caspia.meadow.errors import SensorFailureError
from caspia.meadow.services import TemperatureSensorBase
from caspia.node.components.sensors.sht2x import SHT2XSensor


class SHT2XTemperatureSensor(GatewayService, TemperatureSensorBase):
    """Gateway service that publishes the temperature of an SHT2X component.

    The service reads its component configuration from ``self.config['sht2x']``
    and either relays the component's measurement events or measures on
    demand, depending on the configured ``interval``.
    """

    # NOTE(review): presumably opts this service out of automatic discovery;
    # the exact meaning is defined by GatewayService -- confirm there.
    auto_discovery = False

    def configure(self):
        """Resolve the SHT2X component this service is backed by.

        The component is looked up on the node returned by ``get_node()``
        using the ``'sht2x'`` entry of the service configuration and is kept
        in ``self.sht2x_c``.
        """
        self.sht2x_c: SHT2XSensor = self.network.get_configured_component(
            self.get_node(), SHT2XSensor, self.config['sht2x'])

    @property
    def is_updated_periodically(self):
        """Return True when the configured ``interval`` is non-zero."""
        return self.config['sht2x'].interval != 0

    @property
    def dependant_components(self):
        """Return the set containing the backing SHT2X component."""
        return {self.sht2x_c}

    async def on_component_event(self, component, event):
        """Forward a measurement event of the backing component.

        Events from other components, and events that are not
        ``SHT2XSensor.MeasurementEvent``, are ignored.  For a matching event
        ``self.notify`` is awaited with ``self.temp`` and either the
        temperature rounded to two decimals or, when the event carries an
        error, a ``SensorFailureError`` instance (passed as a value, not
        raised).

        :param component: component that emitted the event.
        :param event: the emitted event object.
        """
        if self.sht2x_c == component and isinstance(
                event, SHT2XSensor.MeasurementEvent):
            if event.error:
                value = SensorFailureError('sensor reported an error')
            else:
                value = round(event.temperature, 2)
            # NOTE(review): self.temp is not defined in this class; presumably
            # the characteristic declared by TemperatureSensorBase -- confirm.
            await self.notify(self.temp, value)

    @characteristic_read_handler('temp')
    async def temp_read(self, **kwargs):
        """Return the current temperature rounded to two decimals.

        When the sensor is updated periodically the value cached in the
        component state is used; otherwise a measurement is awaited and only
        its first element (the temperature) is used.

        :param kwargs: accepted for handler compatibility; not used here.
        :returns: the temperature as a float rounded to two decimals.
        :raises SensorFailureError: when the state reports an error or no
            temperature value is available.
        """
        if self.is_updated_periodically:
            state = self.sht2x_c.state
            temp, error = state.temperature, state.error
        else:
            temp, _ = await self.sht2x_c.measure()
            error = False
        # A missing temperature is a failure in both branches.  Without the
        # explicit None check a cached state holding no value and no error
        # flag would reach float(None) and surface as a TypeError.
        if error or temp is None:
            raise SensorFailureError('temperature not available')
        return round(float(temp), 2)