from caspia.gateway.services import GatewayService
from caspia.meadow.client import characteristic_read_handler
from caspia.meadow.errors import SensorFailureError
from caspia.meadow.services import TemperatureSensorBase
from caspia.node.components.sensors.analog import AnalogSensor
[docs]class AnalogTemperatureSensor(GatewayService, TemperatureSensorBase):
auto_discovery = False
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.analog_c: AnalogSensor = None
self.user_equation_module = None
self.params = None
@property
def is_updated_periodically(self):
return self.config['analog'].interval != 0
@property
def dependant_components(self):
return {self.analog_c}
[docs] def convert_ratio_to_resistance(self, ratio):
if ratio == 1:
raise SensorFailureError('suspicious voltage measured',
extra='Voltage was exactly Vcc -> is the sensor disconnected?')
if ratio == 0:
raise SensorFailureError(
'suspicious voltage measured.',
extra='Voltage was exactly Vss -> is the sensor short-circuited?')
return 12000 * (1 / (1 / ratio - 1))
[docs] def convert_from_raw(self, raw):
resistance = self.convert_ratio_to_resistance(raw)
temp = self.equation(self.params, resistance)
return round(temp, 2)
[docs] async def on_component_event(self, component, event):
if self.analog_c == component:
if isinstance(event, AnalogSensor.MeasurementEvent):
if not event.error:
value = self.convert_from_raw(event.value)
else:
value = SensorFailureError('sensor reported an error')
await self.notify(self.temp, value)
[docs] @characteristic_read_handler('temp')
async def temp_read(self, **kwargs):
if self.is_updated_periodically:
raw, error = self.analog_c.state.value, self.analog_c.state.error
else:
raw = await self.analog_c.measure()
error = raw is None
if error:
raise SensorFailureError('temperature not available')
temp = self.convert_from_raw(raw)
return float(temp)