Source code for caspia.pan.rules.on_do

import asyncio

from caspia.meadow.rules import OnDoRule, OnDoRuleTrigger
from caspia.reactive import FunctionObserver
from caspia.reactive.errors import ValueNotReady
from caspia.toolbox.managed_task import managed_task

from .base import RuleActivator, RuleNotReadyError, RuleState


class OnDoRuleActivator(RuleActivator):
    """Activator driving an :class:`OnDoRule`.

    The activator subscribes to the rule's ``on`` observable and, depending
    on ``rule.trigger``, forwards values to ``rule.do.on_next``:

    * ``OnDoRuleTrigger.ON_UPDATE`` -- the action runs for every value
      delivered by the condition.
    * ``OnDoRuleTrigger.ON_TRUE`` -- the action runs only when the
      truthiness of the condition changes from falsy to truthy.

    Condition errors and "not ready"/timeout failures of the action cause
    the ``evaluation`` task to be re-scheduled after a delay.
    """

    # Base delay passed to ``evaluation.schedule(delay=...)`` when retrying
    # (presumably seconds -- confirm against ``managed_task``).
    RETRY_DELAY = 5.0
    # Upper bound for the delay that grows with consecutive condition errors.
    RETRY_DELAY_MAX = 300.0

    def __init__(self, rule):
        """Create an activator for ``rule`` (expected to be an ``OnDoRule``)."""
        super().__init__(rule)
        # Annotation only (no assignment): narrows the attribute type for
        # static checkers.
        self.rule: OnDoRule
        # Handle returned by ``rule.on.subscribe``; ``None`` while inactive.
        self._condition_subscription = None
        # Truthiness of the last condition value that was handled
        # successfully in ON_TRUE mode.
        self._last_value = None
        # Number of consecutive condition errors; reset on every update.
        # (The misspelled name is kept so existing references keep working.)
        self._condition_error_conter = 0
        # Serializes the handling of condition updates.
        self._processing_lock = asyncio.Lock()
        # NOTE(review): this flag is only ever written (here and in
        # ``do_deactivate``); nothing in this class reads it. It is
        # initialised here so the attribute always exists.
        self._did_update_once = False

    @classmethod
    def can_activate_rule(cls, rule):
        """Return ``True`` when ``rule`` is an :class:`OnDoRule` instance."""
        return isinstance(rule, OnDoRule)

    async def do_activate(self):
        """Subscribe to the rule's condition and start the initial evaluation."""
        observer = FunctionObserver()
        observer.subscribe_next(self._on_condition_update)
        observer.subscribe_error(self._on_condition_error)
        self._condition_subscription = self.rule.on.subscribe(observer)

        # start initial load of the observable
        if self.rule.trigger == OnDoRuleTrigger.ON_TRUE:
            self.evaluation.schedule()  # pylint: disable=no-member
        else:
            self.state = RuleState.READY, None

    async def do_deactivate(self):
        """Cancel any pending evaluation and drop the condition subscription.

        Safe to call when the rule was never activated or has already been
        deactivated: a missing subscription is simply skipped.
        """
        self.evaluation.cancel()  # pylint: disable=no-member
        if self._condition_subscription is not None:
            self._condition_subscription.dispose()
            self._condition_subscription = None
        # NOTE(review): ``_last_value`` is not reset here, so the ON_TRUE
        # edge detection keeps its state across deactivate/activate cycles.
        # Confirm whether that is intended.
        self._did_update_once = False

    async def _on_condition_update(self, value, **kwargs):
        """Handle a new condition ``value`` according to the rule's trigger.

        Extra keyword arguments supplied by the observer are accepted and
        ignored.
        """
        self.evaluation.cancel()  # pylint: disable=no-member
        self._condition_error_conter = 0
        async with self._processing_lock:
            if self.rule.trigger == OnDoRuleTrigger.ON_UPDATE:
                await self._run(value)
            elif self.rule.trigger == OnDoRuleTrigger.ON_TRUE:
                state, message = self.state
                if bool(value) is not bool(self._last_value):
                    # Truthiness changed: run the action on a rising edge,
                    # just mark the rule ready on a falling edge.
                    if value:
                        success = await self._run(value)
                    else:
                        success = True
                        self.state = RuleState.READY, None
                    # Remember the value only after it was handled, so a
                    # failed run is attempted again on the next update.
                    if success:
                        self._last_value = bool(value)
                elif state != RuleState.READY and (not message or '[condition' in message):
                    # Truthiness unchanged: clear a non-ready state that has
                    # no message or was set by a condition warning/error.
                    self.state = RuleState.READY, None

    async def _on_condition_error(self, error, **kwargs):
        """Record a condition ``error`` and schedule a delayed re-evaluation.

        ``ValueNotReady`` puts the rule into ``WAITING``; any other error
        puts it into ``FAILURE``. The retry delay grows linearly with the
        number of consecutive errors, capped at ``RETRY_DELAY_MAX``.
        """
        self.evaluation.cancel()  # pylint: disable=no-member
        self._condition_error_conter += 1
        if isinstance(error, ValueNotReady):
            self.state = RuleState.WAITING, f'[condition warning]: {repr(error)}'
        else:
            self.state = RuleState.FAILURE, f'[condition error]: {repr(error)}'
        # TODO: decide what to do ... is it always safe to retry forever (most likely not)
        delay = min(self._condition_error_conter * self.RETRY_DELAY, self.RETRY_DELAY_MAX)
        self.evaluation.schedule(delay=delay)  # pylint: disable=no-member

    async def _run(self, value):
        """Pass ``value`` to the rule's action inside ``running_status()``.

        Returns ``True`` when the action completed without raising. On
        ``ValueNotReady`` or a timeout a re-evaluation is scheduled after
        ``RETRY_DELAY`` and the error is raised into ``running_status()``
        (``ValueNotReady`` is converted to ``RuleNotReadyError``).
        """
        with self.running_status():
            try:
                await self.rule.do.on_next(value)
            except ValueNotReady as error:
                self.evaluation.schedule(delay=self.RETRY_DELAY)  # pylint: disable=no-member
                raise RuleNotReadyError(str(error)) from error
            except (TimeoutError, asyncio.TimeoutError):
                self.evaluation.schedule(delay=self.RETRY_DELAY)  # pylint: disable=no-member
                raise
            else:
                return True
        # Reached only if ``running_status()`` suppressed an exception
        # (presumably it does so after recording the failure -- confirm in
        # ``RuleActivator``).
        return False

    @managed_task()
    async def evaluation(self):
        """Observe the condition once and dispatch the value or the error."""
        try:
            value = await self.rule.on.observe()
        except Exception as error:  # pylint: disable=broad-except
            await self._on_condition_error(error)
        else:
            await self._on_condition_update(value)