# Source code for caspia.pan.rules_manager

import asyncio
import logging
from collections import defaultdict
from collections.abc import Iterable

from caspia.pan.rules.base import RuleActivator, RuleState
from caspia.toolbox.monitor import record_metric, register_metric

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Declare the metrics this module works with before any value is recorded.
# NOTE(review): the second argument is presumably the metric's value type --
# confirm against caspia.toolbox.monitor. Only 'caspia-pan:rule-state' is
# passed to record_metric() in the code visible here; check whether
# 'caspia-pan:rule-activation' is recorded elsewhere.
register_metric('caspia-pan:rule-state', 'string')
register_metric('caspia-pan:rule-activation', 'boolean')


class RulesManager:
    """Keep track of registered rules and of whatever currently activates them.

    For every rule the manager remembers one of three things in
    ``activators``:

    * a ``RuleActivator`` instance, when this manager activated the rule,
    * a ``str`` (the gateway), when a gateway reported that it handles the rule,
    * ``None``, when nothing is responsible for the rule.

    The latest known state of each rule is kept in ``rule_states`` and is
    re-posted as the ``caspia-pan:rule-state`` metric by a background task
    started in ``__init__``.
    """

    def __init__(self):
        #: list of rules
        self.rules = []
        #: rule: RuleActivator | str(gateway) | None
        self.activators = defaultdict(lambda: None)
        #: rule: (RuleState, str(message))
        self.rule_states = {}
        #: rule: float (not written by any method of this class)
        self.rule_timestamps = {}

        # start monitoring
        self._metrics_task = asyncio.ensure_future(self._post_metrics_runloop())

    def add(self, rules):
        """Register rules (or a single rule, if the argument is not iterable).

        A rule that is already registered is reported with an error log entry
        and skipped; a new rule is appended and its state is recorded as
        ``RuleState.SETUP``.
        """
        rules = rules if isinstance(rules, Iterable) else [rules]
        for rule in rules:
            if rule in self.rules:
                logger.error('rule %s was added already (%s)', repr(rule),
                             repr(self.rules[self.rules.index(rule)]))
            else:
                self.rules.append(rule)
                self._record_rule_state(rule, (RuleState.SETUP, None))

    def is_active(self, rule):
        """Return whether ``rule`` is currently handled by an activator or a gateway."""
        # .get() instead of indexing: indexing the defaultdict would insert a
        # ``None`` entry for every rule that is merely queried.
        activator = self.activators.get(rule)
        if activator is None:
            return False
        if isinstance(activator, str):
            # a gateway is taking care of the rule
            return True
        return activator.active

    def _get_rule_activator(self, rule):
        """Create an activator for ``rule`` wired to report state changes here.

        The first direct subclass of ``RuleActivator`` whose
        ``can_activate_rule(rule)`` is truthy is used.

        Raises:
            NotImplementedError: no subclass can activate the rule.
        """
        for activator_cls in RuleActivator.__subclasses__():
            if activator_cls.can_activate_rule(rule):
                activator = activator_cls(rule)
                activator.state_changed_handler = self._rule_activator_state_did_change
                return activator
        raise NotImplementedError(f'{type(rule)} not supported')

    async def activate_rule(self, rule):
        """Activate (registered) rule.

        Returns:
            ``True`` when this call activated the rule, ``False`` when the rule
            is handled by a gateway or its activator is already active.
        """
        assert rule in self.rules
        activator = self.activators[rule]

        if isinstance(activator, str):
            # already activated by a gateway
            return False

        if isinstance(activator, RuleActivator):
            if activator.active:
                return False
            # deactivate_rule() replaces the handler with a no-op; restore it,
            # otherwise a reactivated rule would never report its state again.
            activator.state_changed_handler = self._rule_activator_state_did_change
            await activator.activate()
            return True

        if activator is None:
            activator = self._get_rule_activator(rule)
            await activator.activate()
            # stored only after a successful activation
            self.activators[rule] = activator
            return True

        raise RuntimeError(f'unexpected activator {activator!r} for rule {rule!r}')

    async def set_rules_activated_by_gateway(self, gateway, rules):
        """Synchronise the manager with the set of rules handled by ``gateway``.

        Registered rules previously owned by ``gateway`` but missing from
        ``rules`` are released (activator reset to ``None``, state ``SETUP``).
        Registered rules present in ``rules`` are deactivated locally, assigned
        to ``gateway`` and recorded as ``READY``.
        """
        rules = set(rules)
        for rule in self.rules:
            if self.activators[rule] == gateway and rule not in rules:
                self.activators[rule] = None
                self._record_rule_state(
                    rule, (RuleState.SETUP, f'rule was deactivated by {gateway}'))
            elif rule in rules:
                await self.deactivate_rule(rule)
                self.activators[rule] = gateway
                message = f'{gateway} is taking care of this rule'
                self._record_rule_state(rule, (RuleState.READY, message))

    async def deactivate_rule(self, rule):
        """Deactivate (registered) rule.

        Only rules backed by a ``RuleActivator`` are touched. The activator's
        state handler is silenced before deactivation, so state changes caused
        by the deactivation itself are not recorded.
        """
        assert rule in self.rules
        activator = self.activators[rule]
        if isinstance(activator, RuleActivator):
            activator.state_changed_handler = lambda _: None
            if activator.active:
                await activator.deactivate()

    async def _gather_outcomes(self, operation, rules):
        """Run ``operation(rule)`` concurrently for all rules.

        Returns a list of ``(rule, outcome)`` pairs in input order, where
        ``outcome`` is the return value or the exception that was raised.
        An empty input yields an empty list.
        """
        targets = list(rules)
        if not targets:
            return []
        outcomes = await asyncio.gather(*(operation(rule) for rule in targets),
                                        return_exceptions=True)
        return list(zip(targets, outcomes))

    async def activate_all(self):
        """Activate every registered rule concurrently and log a summary.

        A failure of one rule does not prevent the others from being
        activated; each failure is logged together with the rule it concerns.
        """
        outcomes = await self._gather_outcomes(self.activate_rule, self.rules)
        failures = [(rule, outcome) for rule, outcome in outcomes
                    if isinstance(outcome, BaseException)]
        for rule, error in failures:
            logger.error('Failed to activate rule: %r', rule, exc_info=error)

        level = logging.WARNING if failures else logging.INFO
        logger.log(level, 'Finished activation of %d rules (%d failed).',
                   len(outcomes), len(failures))

    async def deactivate(self, rules):
        """Deactivate the given rules concurrently and log a summary.

        Args:
            rules: iterable of registered rules (or a single non-iterable
                rule). ``None`` means every registered rule.
        """
        targets = self.rules if rules is None else rules
        if not isinstance(targets, Iterable):
            targets = [targets]

        outcomes = await self._gather_outcomes(self.deactivate_rule, targets)
        failures = [(rule, outcome) for rule, outcome in outcomes
                    if isinstance(outcome, BaseException)]
        for rule, error in failures:
            logger.error('Failed to deactivate rule: %r', rule, exc_info=error)

        level = logging.WARNING if failures else logging.INFO
        logger.log(level, 'Finished deactivation of %d rules (%d failed).',
                   len(outcomes), len(failures))

    def _rule_activator_state_did_change(self, activator):
        """Handle a state change reported by an activator: log it and record it."""
        warning_level = activator.state[0].warning_level
        log_level = logging.DEBUG if warning_level <= 1 else logging.INFO
        logger.log(log_level, 'Rule %r transitioned to %r',
                   activator.rule.name or activator.rule.identifier, activator.state)
        self._record_rule_state(activator.rule, activator.state)

    def _record_rule_state(self, rule, state):
        """Post ``state`` as the rule-state metric and remember it for ``rule``.

        ``state`` is a ``(RuleState, message)`` pair; a missing message is
        posted as an empty string.
        """
        record_metric(
            'caspia-pan:rule-state',
            dict(value=state[0].name, warning=state[0].warning_level, message=state[1] or ''),
            dict(rule_identifier=rule.identifier,
                 rule_name=rule.name or f'[generated] Identifier: {rule.identifier}'))
        self.rule_states[rule] = state

    async def _post_metrics_runloop(self, interval=180.0):
        """Re-post the state of all rules forever, every ``interval`` seconds."""
        while True:
            try:
                self._post_states_of_all_rules()
            except Exception:
                # One failed pass must not stop the periodic reporting.
                logger.exception('Failed to post rule states')
            await asyncio.sleep(interval)

    def _post_states_of_all_rules(self):
        """Record the last known state of every registered rule once more."""
        for rule in self.rules:
            state = self.rule_states.get(rule, (RuleState.SETUP, '-no state-'))
            self._record_rule_state(rule, state)