# pylint: disable=too-many-instance-attributes
import asyncio
import logging
from caspia.meadow import errors as meadow_errors
from caspia.meadow.client.gateway import ServiceGatewayMixin
from caspia.meadow.services import ThermostatBase, ThermostatState
from caspia.reactive import Observable
from caspia.toolbox.monitor import record_metric, register_metric
from caspia.toolbox.pid import PIDController
from caspia.toolbox.storage import storage_property
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Register every PID metric this module records; all of them are floats.
for _pid_metric in (
        'thermostat:pid-input',
        'thermostat:pid-integral',
        'thermostat:pid-output',
):
    register_metric(_pid_metric, 'float')
del _pid_metric  # keep the module namespace free of the loop variable
class Thermostat(ServiceGatewayMixin, ThermostatBase):
    """Thermostat service that drives an optional heating sink from a PID loop.

    On every update the current temperature is read from an observable
    source and fed into a :class:`PIDController` whose set point is the
    target temperature. The controller output is forwarded to ``heating``
    (when one is configured). Target state, target temperature and the PID
    controller state are kept in ``storage`` through ``storage_property``
    attributes.

    Constructing an instance immediately schedules the periodic update task.
    """

    def __init__(self,
                 name: str,
                 *,
                 storage,
                 current_temp: Observable,
                 p=0.0,
                 i=0.0,
                 d=0.0,
                 windup=None,
                 sample_time=0.0,
                 interval=30,
                 loop=None,
                 heating=None):
        """Create the thermostat and start its periodic update task.

        Args:
            name: Service name, passed to the base class and used as the
                ``service`` label of recorded metrics.
            storage: Backing store used by the ``storage_property``
                attributes of this class.
            current_temp: Source whose ``observe()`` coroutine yields the
                current temperature.
            p: Passed to ``PIDController`` as ``p``.
            i: Passed to ``PIDController`` as ``i``.
            d: Passed to ``PIDController`` as ``d``.
            windup: Passed to ``PIDController`` as ``windup``.
            sample_time: Passed to ``PIDController`` as ``sample_time``.
            interval: Delay handed to ``asyncio.sleep`` between two updates.
            loop: Event loop the periodic task is scheduled on; defaults to
                ``asyncio.get_event_loop()``.
            heating: Optional object providing the coroutines
                ``on_next(value, target_temp=...)`` and ``on_error(exc)``.
        """
        super().__init__(name)
        self.interval = interval
        self.heating = heating
        self._loop = loop or asyncio.get_event_loop()
        self._storage = storage
        self._current_temp_source = current_temp
        self._current_temp = None
        self._current_state = ThermostatState.OFF
        self._pid = PIDController(self._target_temp,
                                  p=p,
                                  i=i,
                                  d=d,
                                  windup=windup,
                                  sample_time=sample_time)
        # Resume the controller from whatever state was stored previously.
        self._pid.state = self._pid_state
        self.start()

    _target_state = storage_property('target_state',
                                     default=ThermostatState.OFF.value,
                                     storage='_storage')
    _target_temp = storage_property('target_temp', default=22.0, storage='_storage')
    _pid_state = storage_property('pid_state', default={}, storage='_storage')

    @_target_state.getter
    def get_target_state(self, value):
        """Convert a raw stored target state into a ``ThermostatState``."""
        return ThermostatState(value)

    @_target_state.setter
    def set_target_state(self, value):
        """Convert a ``ThermostatState`` into the raw value to be stored."""
        return value.value

    @_target_temp.setter
    def target_temp_set(self, value):
        """Mirror a new target temperature into the PID set point.

        Returns the value unchanged (presumably what ``storage_property``
        then stores -- confirm against its implementation).
        """
        self._pid.set_point = value
        return value

    def _store_pid_state(self):
        """Write the PID controller state straight into the storage."""
        with self._storage:
            self._storage['pid_state'] = self._pid.state

    def start(self):
        """Schedule the periodic update task unless one is already active.

        Safe to call repeatedly, and works again after :meth:`stop`.
        """
        task = getattr(self, '_periodic_task', None)
        # ``stop()`` leaves the attribute set to None, so checking only for
        # the attribute's existence would make a restart impossible.
        if task is None or task.done():
            self._periodic_task = asyncio.ensure_future(
                self.update_periodically(self.interval), loop=self._loop)

    def stop(self):
        """Cancel the periodic update task, if there is one.

        Calling this more than once, or before :meth:`start`, is a no-op.
        """
        task = getattr(self, '_periodic_task', None)
        if task is not None:
            task.cancel()
        self._periodic_task = None

    async def update(self):
        """Run one control step: read, compute, drive heating, publish."""
        # Recalculate and update heating
        self._current_temp = await self._current_temp_source.observe()
        if self._target_state == ThermostatState.OFF:
            # The controller is not advanced while the thermostat is off.
            val = None
        else:
            val = self._pid.update(self._current_temp)

        if self.heating:
            self._current_state = await self.heating.on_next(val, target_temp=self._target_temp)
            if self._current_state is None:
                # ``on_next`` returned nothing; report HEATING by default.
                self._current_state = ThermostatState.HEATING
        else:
            self._current_state = ThermostatState.OFF

        await self._notify_about_changes()
        await self._update_pid_metrics()

        # Store PID Controller's state
        self._pid_state = self._pid.state

    async def _notify_about_changes(self):
        """Publish the current value of every characteristic."""
        await self.notify(self.target_temp, self._target_temp)
        await self.notify(self.current_temp, self._current_temp)
        await self.notify(self.current_state, self._current_state.value)
        await self.notify(self.target_state, self._target_state.value)

    async def _update_pid_metrics(self):
        """Record PID input, integral and output; skipped without an output."""
        if self._pid.output is None:
            return
        labels = dict(service=self.name)
        record_metric('thermostat:pid-input', self._current_temp, labels=labels)
        record_metric('thermostat:pid-integral', self._pid.i_cum, labels=labels)
        record_metric('thermostat:pid-output', self._pid.output, labels=labels)

    async def _notify_heating_about_error(self, error):
        """Forward a failed update to ``heating.on_error``, if heating is set.

        A failure inside the callback is logged and not propagated.
        """
        if not self.heating:
            return
        try:
            await self.heating.on_error(error)
        except asyncio.CancelledError:
            # Never swallow cancellation (it is an ``Exception`` before 3.8).
            raise
        except Exception:  # pylint: disable=broad-except
            msg = 'Failure in `heating.on_error` callback of thermostat (%s): %r'
            logger.exception(msg, self, error)

    async def update_periodically(self, interval):
        """Call :meth:`update` forever, sleeping ``interval`` between runs.

        A failing update is logged and reported to the heating object; the
        loop then carries on. Only cancellation ends the loop.
        """
        while True:
            try:
                logger.info('Running thermostat update %s', self)
                await self.update()
                logger.info('Thermostat update done %s', self)
            except asyncio.CancelledError:
                # Let ``stop()`` actually terminate the task on every
                # Python version instead of treating it as a failure.
                raise
            except Exception as e:  # pylint: disable=broad-except
                logger.exception('Failure in thermostat (%s) runloop: %r', self, e)
                await self._notify_heating_about_error(e)
            await asyncio.sleep(interval)

    async def characteristic_read(self, characteristic, **kwargs):
        """Return the current value of ``characteristic``.

        Raises:
            SensorFailureError: the current temperature was requested but no
                reading has been obtained yet.
            NotImplementedError: the characteristic is not one of the four
                handled by this service.
        """
        if characteristic is self.target_temp:
            return self._target_temp
        elif characteristic is self.current_temp:
            if self._current_temp is None:
                raise meadow_errors.SensorFailureError('temperature not available')
            return self._current_temp
        elif characteristic is self.current_state:
            return self._current_state.value
        elif characteristic is self.target_state:
            return self._target_state.value
        else:
            raise NotImplementedError

    async def characteristic_write(self, characteristic, value, **kwargs):
        """Store a new target temperature or target state and publish it.

        Raises:
            NotImplementedError: ``characteristic`` is neither the target
                temperature nor the target state.
        """
        if characteristic is self.target_temp:
            self._target_temp = value
            await self.notify(self.target_temp, self._target_temp)
        elif characteristic is self.target_state:
            self._target_state = ThermostatState(value)
            await self.notify(self.target_state, self._target_state.value)
        else:
            raise NotImplementedError