import asyncio
from contextlib import contextmanager
from enum import Enum
from caspia.toolbox.monitor import record_metric
[docs]class RuleState(Enum):
SETUP = 'setup'
WAITING = 'waiting'
READY = 'ready'
RUNNING = 'running'
FAILURE = 'failure'
@property
def warning_level(self):
if self == RuleState.READY:
return 0
elif self in (RuleState.SETUP, RuleState.WAITING, RuleState.RUNNING):
return 1
else:
return 2
[docs]class RuleNotReadyError(Exception):
def __init__(self, message):
super().__init__(message)
self.message = message
[docs]class RuleActivator:
"""Implements some rule logic."""
WARMUP_DURATION = 120.0
def __init__(self, rule):
self._warmup_handle = None
self._warmup_failing = False
self.rule = rule
self.state_changed_handler = lambda self: None
self.state = RuleState.SETUP, None
self.active = False
[docs] @classmethod
def can_activate_rule(cls, rule):
return False
@property
def state(self):
"""Tuple of RuleState and optional message."""
return getattr(self, '_state', (RuleState.SETUP, None))
@state.setter
def state(self, new):
assert isinstance(new, tuple)
assert len(new) == 2
# if in warmup, do not report failures
if self._warmup_handle and new[0] == RuleState.FAILURE:
new = RuleState.WAITING, new[1]
self._warmup_failing = True
else:
self._warmup_failing = False
if self.state != new:
self._state = new
self.state_changed_handler(self)
def _leave_warmup(self):
self._warmup_handle = None
if self._warmup_failing:
self.state = RuleState.FAILURE, self.state[1]
[docs] async def activate(self, warmup=True):
"""Activate the rule."""
if self.active:
raise RuntimeError('rule is already active')
await self.do_activate()
if warmup:
self.active = True
self._warmup_handle = asyncio.get_event_loop().call_later(self.WARMUP_DURATION,
self._leave_warmup)
[docs] async def do_activate(self):
"""To be implemented by subclasses."""
raise NotImplementedError()
[docs] async def deactivate(self):
"""Deactivate the rule."""
if not self.active:
raise RuntimeError('rule isn\'t active')
await self.do_deactivate()
self.active = False
if self._warmup_handle:
self._warmup_handle.cancel()
self._warmup_handle = None
[docs] async def do_deactivate(self):
"""To be implemented by subclasses."""
raise NotImplementedError()
[docs] @contextmanager
def running_status(self, message=None):
"""Set status to RUNNING and then to FAILURE or READY (contextmanager)."""
self.state = RuleState.RUNNING, message
start = asyncio.get_event_loop().time()
try:
yield
except RuleNotReadyError as e:
error = e
self.state = RuleState.WAITING, error.message
except Exception as e: # pylint: disable=broad-except
error = e
message = f'[running error]: {repr(error)}'
self.state = RuleState.FAILURE, message
else:
self.state = RuleState.READY, None
error = None
end = asyncio.get_event_loop().time()
record_metric(
'caspia-pan:rule-activation',
dict(value=error is None,
duration=float(end - start),
error=repr(error) if error else None),
dict(rule_name=self.rule.name, rule_identifier=self.rule.identifier))