Source code for caspia.pan.rules.lightgroup

from caspia.meadow.rules import LightGroupAttachRule

from .base import RuleActivator, RuleState


[docs]class LightGroupAttachRuleActivator(RuleActivator):
[docs] @classmethod def can_activate_rule(cls, rule): return isinstance(rule, LightGroupAttachRule)
[docs] async def do_activate(self): self.subscription = self.rule.group.is_on.subscribe(self.on_group_light_is_on_notification) self.state = RuleState.READY, None
[docs] async def do_deactivate(self): self.subscription.dispose()
[docs] async def on_group_light_is_on_notification(self, value, extra): with self.running_status(): if extra.get('request', False): await self.rule.light.is_on.write(value)