import asyncio
from caspia.meadow.rules import OnDoRule, OnDoRuleTrigger
from caspia.reactive import FunctionObserver
from caspia.reactive.errors import ValueNotReady
from caspia.toolbox.managed_task import managed_task
from .base import RuleActivator, RuleNotReadyError, RuleState
[docs]class OnDoRuleActivator(RuleActivator):
RETRY_DELAY = 5.0
RETRY_DELAY_MAX = 300.0
def __init__(self, rule):
super().__init__(rule)
self.rule: OnDoRule
self._condition_subscription = None
self._last_value = None
self._condition_error_conter = 0
self._processing_lock = asyncio.Lock()
[docs] @classmethod
def can_activate_rule(cls, rule):
return isinstance(rule, OnDoRule)
[docs] async def do_activate(self):
observer = FunctionObserver()
observer.subscribe_next(self._on_condition_update)
observer.subscribe_error(self._on_condition_error)
self._condition_subscription = self.rule.on.subscribe(observer)
# start initial load of the observable
if self.rule.trigger == OnDoRuleTrigger.ON_TRUE:
self.evaluation.schedule() # pylint: disable=no-member
else:
self.state = RuleState.READY, None
[docs] async def do_deactivate(self):
self.evaluation.cancel() # pylint: disable=no-member
self._condition_subscription.dispose()
self._condition_subscription = None
self._did_update_once = False
async def _on_condition_update(self, value, **kwargs):
self.evaluation.cancel() # pylint: disable=no-member
self._condition_error_conter = 0
async with self._processing_lock:
if self.rule.trigger == OnDoRuleTrigger.ON_UPDATE:
await self._run(value)
elif self.rule.trigger == OnDoRuleTrigger.ON_TRUE:
state, message = self.state
if bool(value) is not bool(self._last_value):
if value:
success = await self._run(value)
else:
success = True
self.state = RuleState.READY, None
if success:
self._last_value = bool(value)
elif state != RuleState.READY and (not message or '[condition' in message):
self.state = RuleState.READY, None
async def _on_condition_error(self, error, **kwargs):
self.evaluation.cancel() # pylint: disable=no-member
self._condition_error_conter += 1
if isinstance(error, ValueNotReady):
self.state = RuleState.WAITING, f'[condition warning]: {repr(error)}'
else:
self.state = RuleState.FAILURE, f'[condition error]: {repr(error)}'
# TODO: decide what to do ... is it always safe to retry forever (most likely not)
delay = min(self._condition_error_conter * self.RETRY_DELAY, self.RETRY_DELAY_MAX)
self.evaluation.schedule(delay=delay) # pylint: disable=no-member
async def _run(self, value):
with self.running_status():
try:
await self.rule.do.on_next(value)
return True
except ValueNotReady as error:
self.evaluation.schedule(delay=self.RETRY_DELAY) # pylint: disable=no-member
raise RuleNotReadyError(str(error))
except (TimeoutError, asyncio.TimeoutError) as error:
self.evaluation.schedule(delay=self.RETRY_DELAY) # pylint: disable=no-member
raise
return False
@managed_task()
async def evaluation(self):
try:
value = await self.rule.on.observe()
except Exception as error: # pylint: disable=broad-except
await self._on_condition_error(error)
else:
await self._on_condition_update(value)