# pylint: disable=too-many-instance-attributes
import asyncio
import functools
import logging
from collections import defaultdict
import caspia.meadow.rules.handler
from caspia.meadow.client import (ConsumerConnection, GatewayConnection, ServiceBrowser,
ServiceConsumerMixin)
from caspia.meadow.client.gateway import Gateway
from caspia.meadow.lookup import Lookup
from caspia.pan import automator, builder
from caspia.pan import services as pan_services
from caspia.pan.rules_manager import RulesManager
__all__ = ['get_global_pan', 'set_global_pan', 'Pan']
_global_pan: 'Pan' = None
logger = logging.getLogger(__name__)
[docs]def set_global_pan(pan: 'Pan'):
global _global_pan
_global_pan = pan
[docs]def get_global_pan() -> 'Pan':
return _global_pan
[docs]class Pan:
def __init__(self, broker_url, storage, loop=None):
self.loop = loop or asyncio.get_event_loop()
self.name = 'pan'
# create gateway
gateway_conn = GatewayConnection(broker_url, name=self.name, loop=self.loop)
self.gateway = Gateway(self.name, connection=gateway_conn)
# prepare consumer connection
self.consumer_conn = ConsumerConnection(broker_url,
name=self.name + '-consumer',
loop=self.loop)
self.run_forever_tasks = [
gateway_conn.run_forever(),
self.consumer_conn.run_forever(),
self.check_unavailable_services()
]
self.lookup = Lookup(prepare=self.prepare_service)
self.browser = None
self.storage = storage
self.builder = builder.Builder(self.lookup, loop=self.loop, storage=self.storage)
self.loop = loop
self.prepared = False
self.requested_rules = defaultdict(dict) # {gateway_name: rules}
self.automator = automator.Automator()
self.rules_manager = RulesManager()
caspia.meadow.rules.handler.rule_handler = self.rules_manager.add
self.subscribed_gateways = set()
[docs] def prepare_service(self, service):
if isinstance(service, pan_services.PANService):
# publish locally create service
self.gateway.add(service)
service.attach_consumer(self.consumer_conn)
[docs] async def prepare(self):
self.prepared = True
self.browser = ServiceBrowser(connection=self.consumer_conn, lookup=self.lookup)
self.browser.add_services_update_hook(self.on_services_change)
# load all rules
logger.info('Loaded %d rules.', len(self.rules_manager.rules))
await self.rules_manager.activate_all()
# start optimizations
await self.request_gateway_activations()
[docs] async def on_services_change(self, added, removed):
logger.debug('Services update.')
# enable notifications on the services
subscribes = []
for service in added:
if isinstance(service, caspia.meadow.client.ServiceConsumerMixin):
subscribes.append(service.enable_notifications())
results = await asyncio.gather(*subscribes, return_exceptions=False)
failed = len([r for r in results if isinstance(r, Exception)])
if failed:
logger.error('Failed to subscribe for notifications (%s failured out of %s)', failed,
len(results))
else:
logger.info('Successfully subscribed for notifications of %s services', len(results))
# request rules
await self.request_gateway_activations()
[docs] async def run_forever(self):
if not self.prepared:
await self.prepare()
while True:
await asyncio.gather(*self.run_forever_tasks, loop=self.loop)
[docs] async def check_unavailable_services(self):
interval = 2
last_message_hash = None
while True:
await asyncio.sleep(interval)
interval = min(interval + 2, 10)
all_services = set(s for s in self.lookup.services.values()
if isinstance(s, ServiceConsumerMixin))
found_services = set(self.browser.services)
missing_services = all_services.difference(found_services)
current_hash = hash(
tuple(s.name for s in sorted(missing_services, key=lambda s: s.name)))
if current_hash == last_message_hash:
continue
else:
last_message_hash = current_hash
if len(missing_services) == 0:
logger.info('Service Availability Check: All services are available')
else:
fmt = 'Service Availability Check: Some mentioned services are not available: \n%r'
logger.warning(fmt, [str(s) for s in missing_services])
#
# Rules Activation
#
[docs] async def request_gateway_activations(self):
for gateway in self.browser.gateways:
if gateway == 'pan':
continue
serialized_rules = []
for rule in self.rules_manager.rules:
try:
serialized = rule.serialize()
if serialized:
serialized_rules.append(serialized)
except NotImplementedError:
pass
handler = functools.partial(self.on_gateway_active_rules_update, gateway)
if gateway not in self.subscribed_gateways:
self.subscribed_gateways.add(gateway)
await self.consumer_conn.subscribe_rules_update(gateway, handler)
await self.consumer_conn.request_rules(gateway, serialized_rules)
self.requested_rules[gateway] = serialized_rules[:]
[docs] async def on_gateway_active_rules_update(self, gateway, rules):
activated = [r for r in self.rules_manager.rules if rules.get(r.identifier, False)]
logger.info('Gateway %r took care of %d rules.', gateway, len(activated))
await self.rules_manager.set_rules_activated_by_gateway(gateway, activated)
await self.rules_manager.activate_all()