import asyncio
import logging
from collections import defaultdict
from collections.abc import Iterable
from caspia.pan.rules.base import RuleActivator, RuleState
from caspia.toolbox.monitor import record_metric, register_metric
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Declare the metrics used for rule monitoring before anything records them.
# 'caspia-pan:rule-state' is the metric written by
# RulesManager._record_rule_state().
register_metric('caspia-pan:rule-state', 'string')
# NOTE(review): 'caspia-pan:rule-activation' is registered here but is not
# recorded anywhere in the visible part of this module -- confirm that it is
# used elsewhere.
register_metric('caspia-pan:rule-activation', 'boolean')
class RulesManager:
    """Registry of rules that tracks who activates each rule and its state.

    For every registered rule the manager remembers its *activator*:

    * ``None`` -- the rule is not activated by anybody,
    * a ``str`` -- the name of a gateway that takes care of the rule,
    * a :class:`RuleActivator` -- a local activator created by the manager.

    The last known state of every rule is kept in :attr:`rule_states` and is
    reported through the ``caspia-pan:rule-state`` metric, both whenever it
    changes and periodically from a background task.

    The constructor schedules that background task with
    :func:`asyncio.ensure_future`.
    """

    def __init__(self):
        #: list of registered rules (in registration order)
        self.rules = []
        #: rule: RuleActivator | str(gateway) | None
        self.activators = defaultdict(lambda: None)
        #: rule: (RuleState, str(message))
        self.rule_states = {}
        #: rule: float
        self.rule_timestamps = {}

        # start monitoring
        self._metrics_task = asyncio.ensure_future(self._post_metrics_runloop())

    def add(self, rules):
        """Register rules (or a single rule, if the argument is not iterable).

        A rule that is already registered is not added again; an error is
        logged instead. Every newly registered rule starts in
        ``RuleState.SETUP``.
        """
        rules = rules if isinstance(rules, Iterable) else [rules]
        for rule in rules:
            if rule in self.rules:
                logger.error('rule %s was added already (%s)', repr(rule),
                             repr(self.rules[self.rules.index(rule)]))
            else:
                self.rules.append(rule)
                self._record_rule_state(rule, (RuleState.SETUP, None))

    def is_active(self, rule):
        """Return whether ``rule`` is currently taken care of by somebody.

        A rule handled by a gateway (string activator) counts as active; for
        a local activator its ``active`` attribute decides.
        """
        activator = self.activators[rule]
        if activator is None:
            return False
        if isinstance(activator, str):
            return True
        return activator.active

    def _get_rule_activator(self, rule):
        """Create an activator for ``rule`` and hook up its state handler.

        The first direct subclass of :class:`RuleActivator` whose
        ``can_activate_rule(rule)`` is truthy is used.

        :raises NotImplementedError: if no activator class accepts the rule.
        """
        for activator_cls in RuleActivator.__subclasses__():
            if activator_cls.can_activate_rule(rule):
                activator = activator_cls(rule)
                activator.state_changed_handler = self._rule_activator_state_did_change
                return activator
        raise NotImplementedError(f'{type(rule)} not supported')

    async def activate_rule(self, rule):
        """Activate a (registered) rule.

        :returns: ``True`` if the rule was activated by this call, ``False``
            if it is handled by a gateway or was already active.
        :raises NotImplementedError: if no activator supports the rule.
        :raises RuntimeError: if the stored activator has an unexpected type.
        """
        assert rule in self.rules

        activator = self.activators[rule]
        if isinstance(activator, str):
            # already activated by a gateway
            return False

        if isinstance(activator, RuleActivator):
            if activator.active:
                return False
            # deactivate_rule() replaces the handler with a no-op; restore it
            # so that state changes of the re-activated rule are recorded.
            activator.state_changed_handler = self._rule_activator_state_did_change
            await activator.activate()
            return True

        if activator is None:
            activator = self._get_rule_activator(rule)
            await activator.activate()
            # stored only after a successful activation
            self.activators[rule] = activator
            return True

        raise RuntimeError(f'unexpected activator {activator!r} for rule {rule!r}')

    async def set_rules_activated_by_gateway(self, gateway, rules):
        """Synchronise which registered rules are handled by ``gateway``.

        Registered rules contained in ``rules`` are deactivated locally and
        handed over to the gateway (state ``READY``). Registered rules that
        the gateway handled before but that are no longer in ``rules`` are
        released and put back into the ``SETUP`` state. Entries of ``rules``
        that are not registered are ignored.
        """
        rules = set(rules)
        for rule in self.rules:
            if self.activators[rule] == gateway and rule not in rules:
                self.activators[rule] = None
                self._record_rule_state(
                    rule, (RuleState.SETUP, f'rule was deactivated by {gateway}'))
            elif rule in rules:
                await self.deactivate_rule(rule)
                self.activators[rule] = gateway
                message = f'{gateway} is taking care of this rule'
                self._record_rule_state(rule, (RuleState.READY, message))

    async def deactivate_rule(self, rule):
        """Deactivate a (registered) rule.

        Only rules with a local :class:`RuleActivator` are affected. The
        activator's state handler is replaced by a no-op before the
        deactivation, so state changes caused by it are not recorded.
        """
        assert rule in self.rules

        activator = self.activators[rule]
        if isinstance(activator, RuleActivator):
            activator.state_changed_handler = lambda _: None
            if activator.active:
                await activator.deactivate()

    async def activate_all(self):
        """Activate all registered rules concurrently.

        Failures of individual rules are logged (with traceback) and do not
        prevent the remaining rules from being activated.
        """
        rules = list(self.rules)
        # gather() accepts coroutines and an empty argument list, unlike
        # asyncio.wait() which rejects both on current Python versions.
        outcomes = await asyncio.gather(*(self.activate_rule(r) for r in rules),
                                        return_exceptions=True)
        failures = [(rule, outcome) for rule, outcome in zip(rules, outcomes)
                    if isinstance(outcome, BaseException)]
        for rule, error in failures:
            logger.error('Failed to activate rule: %r', rule, exc_info=error)

        level = logging.WARNING if failures else logging.INFO
        logger.log(level, 'Finished activation of %d rules (%d failed).', len(rules),
                   len(failures))

    async def deactivate(self, rules):
        """Deactivate the given rules (or rule, if not iterable) concurrently.

        Every rule must be registered. Failures of individual rules are
        logged and do not prevent the remaining rules from being deactivated.
        """
        rules = list(rules) if isinstance(rules, Iterable) else [rules]
        outcomes = await asyncio.gather(*(self.deactivate_rule(r) for r in rules),
                                        return_exceptions=True)
        failures = [(rule, outcome) for rule, outcome in zip(rules, outcomes)
                    if isinstance(outcome, BaseException)]
        for rule, error in failures:
            logger.error('Failed to deactivate rule: %r', rule, exc_info=error)

        level = logging.WARNING if failures else logging.INFO
        logger.log(level, 'Finished deactivation of %d rules (%d failed).', len(rules),
                   len(failures))

    def _rule_activator_state_did_change(self, activator):
        """Log and record the new state reported by a local activator."""
        warning_level = activator.state[0].warning_level
        log_level = logging.DEBUG if warning_level <= 1 else logging.INFO
        logger.log(log_level, 'Rule %r transitioned to %r',
                   activator.rule.name or activator.rule.identifier, activator.state)
        self._record_rule_state(activator.rule, activator.state)

    def _record_rule_state(self, rule, state):
        """Post ``state`` of ``rule`` as a metric and remember it.

        :param state: tuple ``(RuleState, message or None)``.
        """
        record_metric(
            'caspia-pan:rule-state',
            dict(value=state[0].name, warning=state[0].warning_level, message=state[1] or ''),
            dict(rule_identifier=rule.identifier,
                 rule_name=rule.name or f'[generated] Identifier: {rule.identifier}'))
        self.rule_states[rule] = state

    async def _post_metrics_runloop(self, interval=180.0):
        """Re-post the state of all rules every ``interval`` seconds, forever."""
        while True:
            try:
                self._post_states_of_all_rules()
            except Exception:
                # Nobody awaits this task, so an escaping exception would stop
                # the reporting without anyone noticing; log it and keep going.
                logger.exception('Failed to post states of rules')
            await asyncio.sleep(interval)

    def _post_states_of_all_rules(self):
        """Record the last known state of every registered rule."""
        for rule in self.rules:
            state = self.rule_states.get(rule, (RuleState.SETUP, '-no state-'))
            self._record_rule_state(rule, state)