Source code for caspia.gateway.rules.lightgroup

import asyncio
import logging
import uuid

from caspia.gateway.config import get_config_value
from caspia.node import Broadcast, Listener, components

logger = logging.getLogger(__name__)


[docs]class LightGroupBehavior: """Implements logic for driving set of lights that are being part of a group light.""" def __init__(self, group_light, context): self.context = context self.group_light = group_light self.lights = set() self.master_light = None self.global_identifier = str(uuid.uuid4()) # setup identifier self.identifier = type(self).get_next_identifier(context) # subscribe for meadow group light notifications asyncio.ensure_future(self.group_light.is_on.enable_notifications(), loop=self.context.loop) subscription = self.group_light.is_on.subscribe(self._on_group_light_state_update) # subscribe for pollen group light broadcasts self._pollen_client.listen_broadcast(self._on_pollen_broadcast) self._broadcast_listener = Listener(on_system_event=self._on_system_event) @context.on_cleanup async def cleanup(): # pylint: disable=unused-variable subscription.dispose() self._pollen_client.remove_broadcast_listener(self._on_pollen_broadcast) @property def _pollen_client(self): return self.context.configurator.network.pollen_client def _on_pollen_broadcast(self, *args, **kwargs): self._broadcast_listener.process_broadcast(Broadcast(*args, **kwargs)) def _on_system_event(self, node_cid, event): if isinstance(event, (components.Relay.GroupOffEvent, components.Relay.GroupOnEvent)): if event.group != self.identifier or node_cid == self._pollen_client: return new_state = isinstance(event, components.Relay.GroupOnEvent) future = self.group_light.is_on.write(new_state, extra={'request-src': self.global_identifier}) asyncio.ensure_future(future) async def _on_group_light_state_update(self, value, extra): """Shall be called when 'is_on' of the (Group) Light changes.""" client = self._pollen_client if extra.get('request') and extra.get('request-src') != self.global_identifier: if value: await components.relay.Relay.group_on(client, self.identifier) else: await components.relay.Relay.group_off(client, self.identifier)
[docs] def add(self, light): """Add given ``light`` service to the group.""" if self.master_light is None: self.master_light = light self.lights.add(light)
[docs] def is_master(self, light): """Return true if the given ``light`` is master of the group.""" return light == self.master_light
[docs] @classmethod def get_group_identifiers_range(cls, context): """Get range of assigned group identifiers for this gateway.""" if not hasattr(cls, '_identifiers_range'): range_desc = get_config_value('lightgroup-identifiers-range', context.configurator.work_dir, default='0x0-0xFFFF') start, end = [int(i, 0) for i in range_desc.split('-')] cls._identifiers_range = range(start, end + 1) return cls._identifiers_range
[docs] @classmethod def get_next_identifier(cls, context): """Get next available group identifier and reserve it.""" id_range = cls.get_group_identifiers_range(context) identifier = context.get('lightgroup-identifier', id_range[0]) if identifier not in id_range: raise RuntimeError('Could not allocate lightgroup-identifier') context['lightgroup-identifier'] = identifier + 1 return identifier