Source code for caspia.pan.pan

# pylint: disable=too-many-instance-attributes
import asyncio
import functools
import logging
from collections import defaultdict

import caspia.meadow.rules.handler
from caspia.meadow.client import (ConsumerConnection, GatewayConnection, ServiceBrowser,
                                  ServiceConsumerMixin)
from caspia.meadow.client.gateway import Gateway
from caspia.meadow.lookup import Lookup
from caspia.pan import automator, builder
from caspia.pan import services as pan_services
from caspia.pan.rules_manager import RulesManager

__all__ = ['get_global_pan', 'set_global_pan', 'Pan']
_global_pan: 'Pan' = None
logger = logging.getLogger(__name__)


[docs]def set_global_pan(pan: 'Pan'): global _global_pan _global_pan = pan
[docs]def get_global_pan() -> 'Pan': return _global_pan
[docs]class Pan: def __init__(self, broker_url, storage, loop=None): self.loop = loop or asyncio.get_event_loop() self.name = 'pan' # create gateway gateway_conn = GatewayConnection(broker_url, name=self.name, loop=self.loop) self.gateway = Gateway(self.name, connection=gateway_conn) # prepare consumer connection self.consumer_conn = ConsumerConnection(broker_url, name=self.name + '-consumer', loop=self.loop) self.run_forever_tasks = [ gateway_conn.run_forever(), self.consumer_conn.run_forever(), self.check_unavailable_services() ] self.lookup = Lookup(prepare=self.prepare_service) self.browser = None self.storage = storage self.builder = builder.Builder(self.lookup, loop=self.loop, storage=self.storage) self.loop = loop self.prepared = False self.requested_rules = defaultdict(dict) # {gateway_name: rules} self.automator = automator.Automator() self.rules_manager = RulesManager() caspia.meadow.rules.handler.rule_handler = self.rules_manager.add self.subscribed_gateways = set()
[docs] def prepare_service(self, service): if isinstance(service, pan_services.PANService): # publish locally create service self.gateway.add(service) service.attach_consumer(self.consumer_conn)
[docs] async def prepare(self): self.prepared = True self.browser = ServiceBrowser(connection=self.consumer_conn, lookup=self.lookup) self.browser.add_services_update_hook(self.on_services_change) # load all rules logger.info('Loaded %d rules.', len(self.rules_manager.rules)) await self.rules_manager.activate_all() # start optimizations await self.request_gateway_activations()
[docs] async def on_services_change(self, added, removed): logger.debug('Services update.') # enable notifications on the services subscribes = [] for service in added: if isinstance(service, caspia.meadow.client.ServiceConsumerMixin): subscribes.append(service.enable_notifications()) results = await asyncio.gather(*subscribes, return_exceptions=False) failed = len([r for r in results if isinstance(r, Exception)]) if failed: logger.error('Failed to subscribe for notifications (%s failured out of %s)', failed, len(results)) else: logger.info('Successfully subscribed for notifications of %s services', len(results)) # request rules await self.request_gateway_activations()
[docs] async def run_forever(self): if not self.prepared: await self.prepare() while True: await asyncio.gather(*self.run_forever_tasks, loop=self.loop)
[docs] async def check_unavailable_services(self): interval = 2 last_message_hash = None while True: await asyncio.sleep(interval) interval = min(interval + 2, 10) all_services = set(s for s in self.lookup.services.values() if isinstance(s, ServiceConsumerMixin)) found_services = set(self.browser.services) missing_services = all_services.difference(found_services) current_hash = hash( tuple(s.name for s in sorted(missing_services, key=lambda s: s.name))) if current_hash == last_message_hash: continue else: last_message_hash = current_hash if len(missing_services) == 0: logger.info('Service Availability Check: All services are available') else: fmt = 'Service Availability Check: Some mentioned services are not available: \n%r' logger.warning(fmt, [str(s) for s in missing_services])
# # Rules Activation #
[docs] async def request_gateway_activations(self): for gateway in self.browser.gateways: if gateway == 'pan': continue serialized_rules = [] for rule in self.rules_manager.rules: try: serialized = rule.serialize() if serialized: serialized_rules.append(serialized) except NotImplementedError: pass handler = functools.partial(self.on_gateway_active_rules_update, gateway) if gateway not in self.subscribed_gateways: self.subscribed_gateways.add(gateway) await self.consumer_conn.subscribe_rules_update(gateway, handler) await self.consumer_conn.request_rules(gateway, serialized_rules) self.requested_rules[gateway] = serialized_rules[:]
[docs] async def on_gateway_active_rules_update(self, gateway, rules): activated = [r for r in self.rules_manager.rules if rules.get(r.identifier, False)] logger.info('Gateway %r took care of %d rules.', gateway, len(activated)) await self.rules_manager.set_rules_activated_by_gateway(gateway, activated) await self.rules_manager.activate_all()