Source code for caspia.gateway.services.sensors.temperature.analog

from caspia.gateway.services import GatewayService
from caspia.meadow.client import characteristic_read_handler
from caspia.meadow.errors import SensorFailureError
from caspia.meadow.services import TemperatureSensorBase
from caspia.node.components.sensors.analog import AnalogSensor


[docs]class AnalogTemperatureSensor(GatewayService, TemperatureSensorBase): auto_discovery = False def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.analog_c: AnalogSensor = None self.user_equation_module = None self.params = None
[docs] def configure(self): self.analog_c = self.network.get_configured_component(self.get_node(), AnalogSensor, self.config['analog']) self.equation = self.config['equation'] self.params = self.config['params']
@property def is_updated_periodically(self): return self.config['analog'].interval != 0 @property def dependant_components(self): return {self.analog_c}
[docs] def convert_ratio_to_resistance(self, ratio): if ratio == 1: raise SensorFailureError('suspicious voltage measured', extra='Voltage was exactly Vcc -> is the sensor disconnected?') if ratio == 0: raise SensorFailureError( 'suspicious voltage measured.', extra='Voltage was exactly Vss -> is the sensor short-circuited?') return 12000 * (1 / (1 / ratio - 1))
[docs] def convert_from_raw(self, raw): resistance = self.convert_ratio_to_resistance(raw) temp = self.equation(self.params, resistance) return round(temp, 2)
[docs] async def on_component_event(self, component, event): if self.analog_c == component: if isinstance(event, AnalogSensor.MeasurementEvent): if not event.error: value = self.convert_from_raw(event.value) else: value = SensorFailureError('sensor reported an error') await self.notify(self.temp, value)
[docs] @characteristic_read_handler('temp') async def temp_read(self, **kwargs): if self.is_updated_periodically: raw, error = self.analog_c.state.value, self.analog_c.state.error else: raw = await self.analog_c.measure() error = raw is None if error: raise SensorFailureError('temperature not available') temp = self.convert_from_raw(raw) return float(temp)