# Source code for caspia.gateway.services.sensors.light.analog

from caspia.gateway.services import GatewayService
from caspia.meadow.client import characteristic_read_handler
from caspia.meadow.errors import SensorFailureError
from caspia.meadow.services import LightSensorBase
from caspia.node.components.sensors.analog import AnalogSensor


class AnalogLightSensor(GatewayService, LightSensorBase):
    """Light sensor service backed by an ``AnalogSensor`` component.

    A raw analog reading is treated as a ratio, converted to a resistance
    by :meth:`convert_ratio_to_resistance`, and then passed to the
    configured ``equation`` callable together with ``params`` to obtain
    the reported light value.

    Configuration keys read by :meth:`configure`:

    * ``analog``   -- configuration of the underlying analog component
    * ``equation`` -- callable invoked as ``equation(params, resistance)``
    * ``params``   -- first argument passed to ``equation``
    """

    auto_discovery = False

    def configure(self):
        """Resolve the analog component and cache the conversion settings."""
        self.analog_c: AnalogSensor = self.network.get_configured_component(
            self.get_node(), AnalogSensor, self.config['analog'])
        self.equation = self.config['equation']
        self.params = self.config['params']

    @property
    def is_updated_periodically(self):
        """Return ``True`` when the analog config has a non-zero interval."""
        return self.config['analog'].interval != 0

    @property
    def dependant_components(self):
        """Return the set of components this service depends on."""
        return {self.analog_c}

    def convert_ratio_to_resistance(self, ratio):
        """Convert a raw ratio reading into a resistance value.

        NOTE(review): the formula looks like a voltage divider with a
        12 kOhm reference resistor (result presumably in ohms) -- confirm
        against the hardware.

        :param ratio: raw reading; the formula is only meaningful for
            values between 0 and 1.
        :returns: ``0`` for a ratio of 0, ``100000000`` for a ratio of 1,
            otherwise ``12000 * (1 / (1 / ratio - 1))``.
        """
        if ratio == 1:
            # The formula would divide by zero here; report a fixed large
            # value instead.
            return 100000000
        elif ratio == 0:
            # ``1 / ratio`` would divide by zero.
            return 0
        return 12000 * (1 / (1 / ratio - 1))

    def convert_from_raw(self, raw):
        """Convert a raw analog reading to the reported light value.

        :param raw: raw ratio reading from the analog component.
        :returns: result of ``self.equation(self.params, resistance)``.
        """
        resistance = self.convert_ratio_to_resistance(raw)
        return self.equation(self.params, resistance)

    async def on_component_event(self, component, event):
        """Publish a new light value when the analog component measures.

        Events from other components and events that are not
        ``AnalogSensor.MeasurementEvent`` are ignored. When the event
        carries an error, a ``SensorFailureError`` instance is passed to
        ``notify`` in place of a value.
        """
        if self.analog_c == component:
            if isinstance(event, AnalogSensor.MeasurementEvent):
                if not event.error:
                    value = self.convert_from_raw(event.value)
                else:
                    value = SensorFailureError('sensor reported an error')
                # Kept inside the MeasurementEvent branch so ``value`` is
                # always bound when it is published.
                await self.notify(self.light, value)

    @characteristic_read_handler('light')
    async def light_read(self, **kwargs):
        """Handle a read of the ``light`` characteristic.

        Uses the component's cached state when it is updated periodically,
        otherwise triggers a measurement on demand.

        :returns: the converted light value.
        :raises SensorFailureError: when the component reports an error or
            no raw value is available.
        """
        if self.is_updated_periodically:
            state = self.analog_c.state
            raw, error = state.value, state.error
        else:
            raw = await self.analog_c.measure()
            error = raw is None
        # A missing raw value is treated as a failure in both branches;
        # converting ``None`` would otherwise raise ``TypeError``.
        if error or raw is None:
            raise SensorFailureError(message='light not available')
        return self.convert_from_raw(raw)