Source code for caspia.pan.rules.base

import asyncio
from contextlib import contextmanager
from enum import Enum

from caspia.toolbox.monitor import record_metric


class RuleState(Enum):
    """Enumeration of the states a rule can report."""

    SETUP = 'setup'
    WAITING = 'waiting'
    READY = 'ready'
    RUNNING = 'running'
    FAILURE = 'failure'

    @property
    def warning_level(self):
        """Return a numeric severity for this state.

        ``0`` for ``READY``; ``1`` for ``SETUP``, ``WAITING`` and ``RUNNING``;
        ``2`` for anything else (currently only ``FAILURE``).
        """
        if self is RuleState.READY:
            return 0
        transitional = (RuleState.SETUP, RuleState.WAITING, RuleState.RUNNING)
        return 1 if self in transitional else 2
class RuleNotReadyError(Exception):
    """Exception that carries its text in the ``message`` attribute.

    ``RuleActivator.running_status`` catches this error and reports the
    ``WAITING`` state with ``message`` instead of a failure.
    """

    def __init__(self, message):
        # Keep the text directly accessible in addition to ``args[0]``.
        self.message = message
        super().__init__(message)
class RuleActivator:
    """Implements some rule logic.

    Base class that tracks the activation flag and the reported state of a
    single rule.  Subclasses implement :meth:`do_activate` and
    :meth:`do_deactivate`.

    After activation with ``warmup=True`` the activator stays in a warm-up
    period of ``WARMUP_DURATION`` seconds during which ``FAILURE`` states are
    reported as ``WAITING``; a failure that is still pending when the period
    ends is then reported as ``FAILURE``.
    """

    # Length of the warm-up period in seconds (delay passed to ``call_later``).
    WARMUP_DURATION = 120.0

    def __init__(self, rule):
        """Create an inactive activator for *rule* in the ``SETUP`` state."""
        # Timer handle of the pending warm-up end, or None outside warm-up.
        self._warmup_handle = None
        # True when the last state set during warm-up was a masked FAILURE.
        self._warmup_failing = False
        self.rule = rule
        # Callback invoked with this activator whenever the state changes.
        self.state_changed_handler = lambda self: None
        self.state = RuleState.SETUP, None
        self.active = False

    @classmethod
    def can_activate_rule(cls, rule):
        """Return whether this activator class can handle *rule*.

        The base implementation always returns ``False``; subclasses are
        presumably expected to override it.
        """
        return False

    @property
    def state(self):
        """Tuple of RuleState and optional message."""
        return getattr(self, '_state', (RuleState.SETUP, None))

    @state.setter
    def state(self, new):
        assert isinstance(new, tuple)
        assert len(new) == 2

        # if in warmup, do not report failures
        if self._warmup_handle and new[0] == RuleState.FAILURE:
            new = RuleState.WAITING, new[1]
            self._warmup_failing = True
        else:
            self._warmup_failing = False

        # Only store and notify on an actual change of the (state, message) pair.
        if self.state != new:
            self._state = new
            self.state_changed_handler(self)

    def _leave_warmup(self):
        """End the warm-up period and surface a failure that was masked."""
        # Clear the handle first so the setter no longer masks FAILURE.
        self._warmup_handle = None
        if self._warmup_failing:
            self.state = RuleState.FAILURE, self.state[1]

    async def activate(self, warmup=True):
        """Activate the rule.

        Args:
            warmup: When true, start the warm-up period of
                ``WARMUP_DURATION`` seconds after activation.

        Raises:
            RuntimeError: If the rule is already active.
        """
        if self.active:
            raise RuntimeError('rule is already active')

        await self.do_activate()

        # Mark the rule active regardless of ``warmup``; otherwise a rule
        # activated with ``warmup=False`` could never be deactivated and
        # could be activated twice.
        self.active = True
        if warmup:
            self._warmup_handle = asyncio.get_event_loop().call_later(
                self.WARMUP_DURATION, self._leave_warmup)

    async def do_activate(self):
        """To be implemented by subclasses."""
        raise NotImplementedError()

    async def deactivate(self):
        """Deactivate the rule.

        Cancels a pending warm-up timer, if any.

        Raises:
            RuntimeError: If the rule is not active.
        """
        if not self.active:
            raise RuntimeError('rule isn\'t active')

        await self.do_deactivate()
        self.active = False

        if self._warmup_handle:
            self._warmup_handle.cancel()
            self._warmup_handle = None

    async def do_deactivate(self):
        """To be implemented by subclasses."""
        raise NotImplementedError()

    @contextmanager
    def running_status(self, message=None):
        """Set status to RUNNING, then to READY, WAITING or FAILURE (contextmanager).

        On entry the state becomes ``(RUNNING, message)``.  When the managed
        body finishes normally the state becomes ``(READY, None)``.  A
        ``RuleNotReadyError`` is swallowed and reported as ``WAITING`` with
        the error's message; any other ``Exception`` is swallowed and
        reported as ``FAILURE``.  In all of these cases a
        ``caspia-pan:rule-activation`` metric with the outcome and duration
        is recorded.

        NOTE(review): exceptions that do not derive from ``Exception``
        propagate, leaving the state at ``RUNNING`` and recording no metric.
        """
        self.state = RuleState.RUNNING, message
        start = asyncio.get_event_loop().time()
        try:
            yield
        except RuleNotReadyError as e:
            # Rebind so the exception stays available after the except block.
            error = e
            self.state = RuleState.WAITING, error.message
        except Exception as e:  # pylint: disable=broad-except
            error = e
            message = f'[running error]: {repr(error)}'
            self.state = RuleState.FAILURE, message
        else:
            self.state = RuleState.READY, None
            error = None
        end = asyncio.get_event_loop().time()

        record_metric(
            'caspia-pan:rule-activation',
            dict(value=error is None,
                 duration=float(end - start),
                 error=repr(error) if error else None),
            dict(rule_name=self.rule.name, rule_identifier=self.rule.identifier))