Source code for caspia.toolbox.services.thermostat.thermostat

# pylint: disable=too-many-instance-attributes
import asyncio
import logging

from caspia.meadow import errors as meadow_errors
from caspia.meadow.client.gateway import ServiceGatewayMixin
from caspia.meadow.services import ThermostatBase, ThermostatState
from caspia.reactive import Observable
from caspia.toolbox.monitor import record_metric, register_metric
from caspia.toolbox.pid import PIDController
from caspia.toolbox.storage import storage_property

logger = logging.getLogger(__name__)
register_metric('thermostat:pid-input', 'float')
register_metric('thermostat:pid-integral', 'float')
register_metric('thermostat:pid-output', 'float')


[docs]class Thermostat(ServiceGatewayMixin, ThermostatBase): def __init__(self, name: str, *, storage, current_temp: Observable, p=0.0, i=0.0, d=0.0, windup=None, sample_time=0.0, interval=30, loop=None, heating=None): super().__init__(name) self.interval = interval self.heating = heating self._loop = loop or asyncio.get_event_loop() self._storage = storage self._current_temp_source = current_temp self._current_temp = None self._current_state = ThermostatState.OFF self._pid = PIDController(self._target_temp, p=p, i=i, d=d, windup=windup, sample_time=sample_time) self._pid.state = self._pid_state self.start() _target_state = storage_property('target_state', default=ThermostatState.OFF.value, storage='_storage') _target_temp = storage_property('target_temp', default=22.0, storage='_storage') _pid_state = storage_property('pid_state', default={}, storage='_storage')
[docs] @_target_state.getter def get_target_state(self, value): return ThermostatState(value)
[docs] @_target_state.setter def set_target_state(self, value): return value.value
[docs] @_target_temp.setter def target_temp_set(self, value): self._pid.set_point = value return value
def _store_pid_state(self): with self._storage: self._storage['pid_state'] = self._pid.state
[docs] def start(self): if not hasattr(self, '_periodic_task'): self._periodic_task = asyncio.ensure_future(self.update_periodically(self.interval), loop=self._loop)
[docs] def stop(self): if hasattr(self, '_periodic_task'): self._periodic_task.cancel() self._periodic_task = None
[docs] async def update(self): # Recalculate and update heating self._current_temp = await self._current_temp_source.observe() if self._target_state == ThermostatState.OFF: val = None else: val = self._pid.update(self._current_temp) if self.heating: self._current_state = await self.heating.on_next(val, target_temp=self._target_temp) if self._current_state is None: self._current_state = ThermostatState.HEATING else: self._current_state = ThermostatState.OFF await self._notify_about_changes() await self._update_pid_metrics() # Store PID Controller's state self._pid_state = self._pid.state
async def _notify_about_changes(self): await self.notify(self.target_temp, self._target_temp) await self.notify(self.current_temp, self._current_temp) await self.notify(self.current_state, self._current_state.value) await self.notify(self.target_state, self._target_state.value) async def _update_pid_metrics(self): if self._pid.output is None: return labels = dict(service=self.name) record_metric('thermostat:pid-input', self._current_temp, labels=labels) record_metric('thermostat:pid-integral', self._pid.i_cum, labels=labels) record_metric('thermostat:pid-output', self._pid.output, labels=labels)
[docs] async def update_periodically(self, interval): while True: try: logger.info('Running thermostat update %s', self) await self.update() logger.info('Thermostat update done %s', self) except Exception as e: # pylint: disable=broad-except logger.exception('Failure in thermostat (%s) runloop: %r', self, e) try: await self.heating.on_error(e) except Exception: # pylint: disable=broad-except msg = 'Failure in `heating.on_failure` callback of thermostat (%s): %r' logger.exception(msg, self, e) await asyncio.sleep(interval)
[docs] async def characteristic_read(self, characteristic, **kwargs): if characteristic is self.target_temp: return self._target_temp elif characteristic is self.current_temp: if self._current_temp is None: raise meadow_errors.SensorFailureError('temperature not available') return self._current_temp elif characteristic is self.current_state: return self._current_state.value elif characteristic is self.target_state: return self._target_state.value else: raise NotImplementedError
[docs] async def characteristic_write(self, characteristic, value, **kwargs): if characteristic is self.target_temp: self._target_temp = value await self.notify(self.target_temp, self._target_temp) elif characteristic is self.target_state: self._target_state = ThermostatState(value) await self.notify(self.target_state, self._target_state.value) else: raise NotImplementedError