Source code for caspia.pan.services.switch

import asyncio
import logging

import caspia.meadow

from .base import PANService

logger = logging.getLogger(__name__)


class Switch(PANService, caspia.meadow.services.SwitchBase):
    """Switch service whose on/off state is kept in a storage object.

    The state is stored under the ``'is_on'`` key of ``storage``; the storage
    object is entered as a context manager around every read and write.
    """

    def __init__(self, name, *, loop, storage):
        """Initialise the switch.

        :param name: forwarded to the parent initialiser.
        :param loop: kept on the instance as ``self.loop``.
        :param storage: object usable as a context manager that supports
            ``.get(key, default)`` and item assignment.
        """
        super().__init__(name)
        self.storage = storage
        self.loop = loop
        # Not used by this class itself; initialised here so the attribute
        # always exists.
        self.target = None

    @property
    def _is_on(self):
        """Stored state; ``False`` when nothing is stored under ``'is_on'``."""
        with self.storage:
            return self.storage.get('is_on', False)

    @_is_on.setter
    def _is_on(self, new_value):
        with self.storage:
            self.storage['is_on'] = new_value

    async def characteristic_read(self, characteristic, **kwargs):
        """Return the stored state for ``is_on``; defer to the parent otherwise."""
        if characteristic is not self.is_on:
            return await super().characteristic_read(characteristic, **kwargs)
        return self._is_on

    async def characteristic_write(self, characteristic, value, **kwargs):
        """Store ``value`` for ``is_on`` and notify; defer to the parent otherwise.

        Returns ``None`` for ``is_on`` and the parent's result for any other
        characteristic.
        """
        if characteristic is not self.is_on:
            return await super().characteristic_write(characteristic, value, **kwargs)
        self._is_on = value
        # The value passed to notify() is read back from storage rather than
        # taken from the ``value`` argument.
        await self.notify(self.is_on, self._is_on, if_changed=True)

    def __call__(self, func):
        """Pass ``func`` to ``self.is_on.do`` and return it unchanged.

        Returning ``func`` lets the instance be applied as a decorator.
        """
        self.is_on.do(func)
        return func
class TaskSwitch(Switch):
    """Switch that runs a user-supplied coroutine function while it is on.

    The coroutine function is registered by calling the instance with it
    (``self.target``).  Writing a truthy value to ``is_on`` starts it as an
    asyncio task; writing a falsy value cancels the running task.  When the
    task ends for any reason the switch turns itself off and notifies.
    """

    def __init__(self, name, *, loop, storage):
        """Initialise the switch and resume the task if the stored state is on.

        :param name: forwarded to :class:`Switch`.
        :param loop: event loop used when scheduling futures.
        :param storage: forwarded to :class:`Switch`.
        """
        super().__init__(name, loop=loop, storage=storage)
        try:
            self.lock = asyncio.Lock(loop=self.loop)
        except TypeError:
            # Python 3.10+ removed the ``loop`` parameter; the lock binds to
            # the running loop on first use instead.
            self.lock = asyncio.Lock()
        self.task = None
        if self._is_on:
            # Scheduled rather than awaited: __init__ is synchronous, and
            # ``target`` is only assigned later via __call__.
            asyncio.ensure_future(self.switch_on(), loop=self.loop)

    async def characteristic_read(self, characteristic, **kwargs):
        """Report whether a task is currently tracked.

        :raises NotImplementedError: for any characteristic other than
            ``is_on``.
        """
        if characteristic is self.is_on:
            return self.task is not None
        raise NotImplementedError

    async def switch_on(self):
        """Start ``self.target`` as a task, store the on state and notify.

        Does nothing when no target has been registered.
        """
        if self.target:
            self.task = asyncio.ensure_future(self.target(), loop=self.loop)
            self.task.add_done_callback(self.task_done_callback)
            self._is_on = True
            await self.notify(self.is_on, True)

    async def characteristic_write(self, characteristic, value, **kwargs):
        """Start or cancel the user task according to ``value``.

        Writes that would not change anything (on while a task is tracked,
        off while none is) are ignored.

        :raises NotImplementedError: for any characteristic other than
            ``is_on``, or when asked to switch on without a registered target.
        """
        # ``async with`` replaces ``with await lock``, which was removed in
        # Python 3.9.
        async with self.lock:
            if characteristic is not self.is_on:
                raise NotImplementedError
            if value and self.task is None:
                # start the procedure
                if not self.target:
                    raise NotImplementedError
                await self.switch_on()
            elif not value and self.task is not None:
                self._is_on = False
                # cancel the procedure; task_done() clears ``self.task`` and
                # sends the notification once the task has actually finished.
                self.task.cancel()

    def task_done_callback(self, task):
        """Done-callback for the user task; schedules :meth:`task_done`."""
        asyncio.ensure_future(self.task_done(task), loop=self.loop)

    async def task_done(self, task):
        """Log how ``task`` ended, then switch off and notify."""
        async with self.lock:
            if task.cancelled():
                logger.info('User task %s cancelled', self.target)
            else:
                # Safe to call: exception() only raises for cancelled or
                # unfinished tasks, and this runs from the done-callback.
                error = task.exception()
                if error is not None:
                    logger.error('User task %s failed: %r', self.target, error)
                else:
                    logger.info('User task %s finished.', self.target)
            self._is_on = False
            self.task = None
            await self.notify(self.is_on, False)

    def __call__(self, target):
        """Register ``target`` as the coroutine function to run; return it.

        Returning ``target`` lets the instance be applied as a decorator.
        """
        self.target = target
        return target