Source code for caspia.pan.services.lightgroup

import asyncio

import caspia.meadow.rules
from caspia.meadow.services import LightBase

from .base import PANService


class LightGroup(PANService, LightBase):
    """Light service whose ``is_on`` state is derived from a set of lights.

    The group remembers the last ``is_on`` value reported for every member
    light. It publishes ``is_on = True`` once every member reports a truthy
    value and ``is_on = False`` once every member reports a falsy value.
    """

    def __init__(self, name):
        """Create an empty group called *name* (forwarded to the bases)."""
        super().__init__(name)
        self._lights = set()  # member light services
        self._states = {}  # light.name: True|False|None (None = not yet reported)
        self._group_state = None  # None until the first switch/aggregate result
        # Pending ``is_on`` subscribe tasks. They are kept here so that a
        # strong reference exists while they run.
        self._subscriptions = []

    @property
    def lights(self):
        """The set of lights that belong to this group."""
        return self._lights

    def attach_consumer(self, connection):
        """Attach the consumer connection and subscribe to every member light."""
        super().attach_consumer(connection)
        for light in self._lights:
            self._subscribe_light(light)

    def add(self, *lights):
        """Add lights to the group.

        Each light is subscribed to (when a connection is available), its
        tracked state is reset to ``None`` and a ``LightGroupAttachRule`` for
        the (group, light) pair is handed to the rule handler.
        """
        self._lights.update(lights)
        for light in lights:
            self._subscribe_light(light)
            self._states[light.name] = None
            # request light-group-attach rule
            rule = caspia.meadow.rules.LightGroupAttachRule(self, light)
            caspia.meadow.rules.handler.rule_handler(rule)

    def _subscribe_light(self, light):
        """Schedule a subscription to the ``is_on`` characteristic of *light*.

        Does nothing when the service has no connection.
        """
        # NOTE(review): the guard checks ``self.connection`` while the body
        # uses ``self.consumer_conn`` -- confirm both are set together.
        if not self.connection:
            return

        loop = self.consumer_conn.loop

        async def wrapper(value, **kwargs):
            await self._on_light_change(light, value, **kwargs)

        subscribe = self.consumer_conn.subscribe(light.name, 'is_on', wrapper)
        task = asyncio.ensure_future(subscribe, loop=loop)
        # Keep a strong reference until the task finishes; asyncio advises
        # saving the result of ensure_future so that a fire-and-forget task
        # cannot disappear mid-execution.
        self._subscriptions.append(task)
        task.add_done_callback(self._discard_subscription)

    def _discard_subscription(self, task):
        """Forget a finished subscribe task."""
        if task in self._subscriptions:
            self._subscriptions.remove(task)

    async def _on_light_change(self, light, value, **_):
        """Record the new state of *light* and publish group state changes."""
        self._states[light.name] = value
        states = self._states.values()
        if not self._group_state and all(states):
            # Every member is on and the group was not: the group turns on.
            self._group_state = True
            await self.notify(self.is_on, True)
        elif self._group_state and not any(states):
            # Every member is off and the group was on: the group turns off.
            self._group_state = False
            await self.notify(self.is_on, False)

    async def _switch(self, on, *, request_src=None):
        """Set the group state to *on* and publish it as a request.

        The notification carries ``extra`` with ``'request': True`` and the
        given ``'request-src'``.
        """
        self._group_state = on
        await self.notify(self.is_on, self._group_state, extra={
            'request': True,
            'request-src': request_src
        })

    async def characteristic_write(self, characteristic, value, **kwargs):
        """Handle writes to ``is_on`` and ``toggle``; delegate anything else.

        ``kwargs['extra']`` may be missing or ``None``; in that case no
        ``request-src`` is forwarded.
        """
        if characteristic is not self.is_on and characteristic is not self.toggle:
            return await super().characteristic_write(characteristic, value, **kwargs)

        # A missing or None ``extra`` is treated as an empty mapping rather
        # than raising KeyError/AttributeError.
        extra = kwargs.get('extra') or {}
        request_src = extra.get('request-src')
        if characteristic is self.is_on:
            await self._switch(value, request_src=request_src)
        else:
            await self._switch(not self._group_state, request_src=request_src)

    async def characteristic_read(self, characteristic, **kwargs):
        """Return the group state for ``is_on``; delegate anything else.

        An undetermined (``None``) group state is reported as ``False``.
        """
        if characteristic is self.is_on:
            return self._group_state or False
        return await super().characteristic_read(characteristic, **kwargs)