Source code for caspia.toolbox.integrations.eq3.thermostat

import asyncio
import concurrent.futures
import logging
from collections import namedtuple

import eq3bt
from caspia.meadow.client.gateway import ServiceGatewayMixin
from caspia.meadow.services import ThermostatBase, ThermostatState
from caspia.toolbox.monitor import record_metric, register_metric
from caspia.toolbox.storage import storage_property

logger = logging.getLogger(__name__)
ble_executor = concurrent.futures.ProcessPoolExecutor()
Eq3State = namedtuple('Eq3State', ['target_temp', 'valve'])
register_metric('eq3:valve', 'float')
register_metric('eq3:target_temp', 'float')
register_metric('eq3:sync-ops', 'integer')
register_metric('eq3:sync-err', 'integer')


def _perform_sync(target_temp, mac_address):
    eq3 = eq3bt.Thermostat(mac_address)
    eq3.update()
    temp = min(target_temp, eq3.max_temp)
    temp = max(temp, eq3.min_temp)
    eq3.target_temperature = temp
    return Eq3State(target_temp=temp, valve=eq3.valve_state / 100)


[docs]class EQ3Thermostat(ServiceGatewayMixin, ThermostatBase): def __init__(self, name, mac_address, *, loop, storage, sync_interval=30): super().__init__(name) self.loop = loop self.mac_address = mac_address self.storage = storage self.valve_state = None self.labels = dict(service=name, mac=mac_address) asyncio.ensure_future(self.sync_periodically(sync_interval), loop=self.loop) target_temp_value = storage_property('target_temp', default=22.0)
[docs] async def sync_periodically(self, interval): while True: try: state = await self.loop.run_in_executor(ble_executor, _perform_sync, self.target_temp_value, self.mac_address) self.valve_state = state.valve logger.info('Successfully synced eq3 valve %s: %s', self, state) record_metric('eq3:valve', float(state.valve), labels=self.labels) record_metric('eq3:target_temp', float(state.target_temp), labels=self.labels) record_metric('eq3:sync-ops', labels=self.labels) await self.notify_current_state() except Exception as e: # pylint: disable=broad-except logger.exception('Failure when updating eq3 valve %s: %r', self, e) record_metric('eq3:sync-err', labels=self.labels) await asyncio.sleep(interval)
[docs] async def notify_current_state(self): current_state = ThermostatState.HEATING if self.valve_state else ThermostatState.OFF await self.notify(self.current_state, current_state.value)
[docs] async def characteristic_read(self, characteristic, **kwargs): if characteristic is self.target_temp: return self.target_temp_value elif characteristic is self.current_temp: return self.target_temp_value elif characteristic is self.target_state: return ThermostatState.HEATING.value elif characteristic is self.current_state: if self.valve_state: return ThermostatState.HEATING.value else: return ThermostatState.OFF.value else: raise NotImplementedError
[docs] async def characteristic_write(self, characteristic, value, **kwargs): if characteristic is self.target_temp: self.target_temp_value = value await self.notify(self.target_temp, value, if_changed=True) else: raise NotImplementedError