Source code for caspia.gateway.gateway

# pylint: disable=too-many-instance-attributes
import asyncio
import functools
import logging

import caspia.meadow
from caspia.gateway import Configurator, Network, config
from caspia.gateway.cidmanagement import CidManagement
from caspia.gateway.monitor import AvailabilityMonitor, register_general_metrics
from caspia.gateway.rules import RulesFilter
from caspia.gateway.services import GatewayService
from caspia.meadow.client import Gateway, ServiceConsumerMixin
from caspia.meadow.lookup import Lookup
from caspia.meadow.rules import LightGroupAttachRule, OnDoRule, Rule
from caspia.node import Broadcast, Listener
from caspia.toolbox import monitor, storage

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Names exported by ``from caspia.gateway.gateway import *``.
__all__ = ['CaspiaGateway']


class CaspiaGateway(Gateway):
    """Bridge between Caspia CAN Components and Meadow Services."""

    def __init__(self, *, name, connection, config_path, storage_path, pollen_client,
                 consumer_conn, rules_filter=RulesFilter.ALL):
        """Create the gateway and wire its collaborators together.

        All arguments are keyword-only.

        :param name: gateway name, passed to the ``Gateway`` base class and to the monitors
        :param connection: connection passed to the ``Gateway`` base class
        :param config_path: location of the gateway configuration; also used as the
            configurator's working directory
        :param storage_path: directory for persistent data; must support the ``/``
            operator (e.g. ``pathlib.Path``)
        :param pollen_client: client used by the network, the availability monitor and
            for listening to broadcasts
        :param consumer_conn: connection handed to services deriving from
            ``ServiceConsumerMixin`` (see :meth:`prepare_service`)
        :param rules_filter: filter forwarded to the ``Configurator``
        """
        super().__init__(name, connection=connection)
        self.config_path = config_path
        self.storage_path = storage_path
        self.config = config.GatewayConfig(config_path)
        self.consumer_conn = consumer_conn
        self.pollen_client = pollen_client

        self.storage = storage.ShelveStorage(str(storage_path / 'data'))
        self.lookup = Lookup(prepare=self.prepare_service)
        self.cid_management = CidManagement(cache_path=storage_path / 'canid_used.yaml')
        self.network = Network(pollen_client, self.cid_management)
        self.configurator = Configurator(hwid_map=self.hwid_map,
                                         network=self.network,
                                         lookup=self.lookup,
                                         loop=self.loop,
                                         work_dir=config_path,
                                         rules_filter=rules_filter)
        self.listener = Listener(self.on_component_event)
        self.component_dependencies = {}
        # Handle of the pending ``loop.call_later`` call, or None when nothing is scheduled.
        self.configuration_request_handle = None

        self.init_monitors()
        # Stays True until the first ``configure()`` run finishes.
        self.busy = True

    def init_monitors(self):
        """Create the availability monitor and register the general metrics."""
        self._availability_monitor = AvailabilityMonitor(gateway_name=self.name,
                                                         pollen_client=self.pollen_client,
                                                         network=self.network,
                                                         loop=self.loop)
        register_general_metrics(self.name)

    @property
    def hwid_map(self):
        """A fresh dict built from ``self.config.iterate_nodes()``."""
        return dict(self.config.iterate_nodes())

    async def setup(self):
        """Bring the gateway up and schedule the first (delayed) configuration."""
        # load CAN ID assignment
        self.configure_cidmanagement()
        # preload all configured services
        self.load_services()
        # prepare nodes for configuration
        await self.configurator.prepare_nodes()
        # listen for events on the bus
        self.pollen_client.listen_broadcast(self.on_raw_broadcast)
        # add services to the gateway
        self.add(list(self.lookup.services.values()))
        # start handling rules requests
        await self.connection.handle_rules_request(self.handle_rules_request)
        self.request_delayed_configuration()

    def load_services(self):
        """Instantiate every service from the configuration and add it to the lookup."""
        for cfg in self.config.iterate_services():
            service_cls = caspia.meadow.services.ServiceBase.get_subclass(cfg['type'],
                                                                          mixin=GatewayService)
            # Each service gets a proxy storage with its own key prefix.
            service_storage = storage.ProxyStorage(self.storage,
                                                   'service.{}.'.format(cfg['name']))
            self.lookup.add(service_cls(config=cfg,
                                        network=self.network,
                                        storage=service_storage))

    def configure_cidmanagement(self):
        """Configure CAN ID management with the CAN IDs allowed by the configuration."""
        self.cid_management.configure(self.config.allowed_canids)

    async def load_states(self):
        """Load the state of every component of every node in the network.

        A failure for one component is recorded as a metric and logged; loading then
        continues with the remaining components.
        """
        logger.info('Going to load state of all components')
        for name, node in self.network.nodes.items():
            for component in node.components.values():
                try:
                    await component.load_state()
                except Exception:  # pylint: disable=broad-except
                    monitor.record_metric('caspia-gateway:nodes-err', 1)
                    # exc_info keeps the cause of the failure visible in the log.
                    logger.error('Failed to load state of %s component at %s[%d]',
                                 type(component).__name__, name, component.identifier,
                                 exc_info=True)
        logger.info('State of all components loaded')

    def prepare_service(self, service):
        """Give ``ServiceConsumerMixin`` services the consumer connection."""
        if isinstance(service, ServiceConsumerMixin):
            service.meadow_connection = self.consumer_conn

    def on_raw_broadcast(self, *args, **kwargs):
        """Wrap the raw broadcast arguments in a ``Broadcast`` and pass it to the listener."""
        self.listener.process_broadcast(Broadcast(*args, **kwargs))

    def on_component_event(self, node_cid, component_id, component_type, event):
        """Apply a component event to its state and notify the dependent services.

        Events are ignored until the nodes are configured, and also when ``node_cid``
        does not map to a known node name.

        :raises RuntimeError: when ``component_type`` differs from the type of the
            component registered under ``component_id``
        """
        if not self.configurator.nodes_configured:
            return

        name = self.cid_management.name_of(node_cid)
        if name is None:
            logger.debug('Received event from unknown node 0x%03X', node_cid)
            return

        component = self.network.nodes[name].components[component_id]
        if component.type != component_type:
            msg = ('Event does not match expected component type (%r, expected %r)'
                   % (component_type, component.type))
            logger.error(msg)
            raise RuntimeError(msg)

        component.state.update_from_event(event)
        for dependent_service in self.configurator.component_dependencies[component]:
            task = asyncio.ensure_future(dependent_service.on_component_event(component, event))
            task.add_done_callback(
                functools.partial(self.on_component_event_processed, name, component))

    def on_component_event_processed(self, name, component, task):
        """Done-callback of a service's event handler task; logs its failure, if any.

        :param name: name of the node the event came from
        :param component: component the event belongs to (currently unused)
        :param task: the finished task
        """
        # ``task.exception()`` raises CancelledError on a cancelled task, so the
        # cancellation check has to come first.
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error('Failure when processing broadcast from %s: %s',
                         name, str(exc), exc_info=exc)

    async def handle_rules_request(self, rules):
        """Deserialize the requested rules and schedule a reconfiguration.

        Rules are processed in ``identifier`` order, all ``LightGroupAttachRule`` entries
        first and ``OnDoRule`` entries second. Rules that refer to a service not found by
        the lookup are skipped with a debug message; other deserialization failures are
        logged as warnings, except for ``KeyError``, which propagates.

        :param rules: serialized rules; each item is indexed by ``'identifier'`` and ``'type'``
        """
        logger.info('Received rules activation request (%d rules)', len(rules))
        rules = sorted(rules, key=lambda rule: rule['identifier'])
        requested = []

        def get_service(name, type=None):  # pylint: disable=redefined-builtin
            if type:
                return self.lookup.get(name, type=type)
            instance = self.lookup.find(name)
            if instance is None:
                raise KeyError('Uknown service %s' % name)
            return instance

        def deserialize_rule(serialized):
            try:
                requested.append(Rule.deserialize(serialized, dict(get_service=get_service)))
            except KeyError as e:
                # A KeyError may carry no args or a non-string key; only the marker
                # raised by ``get_service`` is treated as "service not found".
                reason = e.args[0] if e.args else None
                if isinstance(reason, str) and reason.startswith('Uknown service'):
                    logger.debug('Failed to deserialize rule: %s', str(e))
                else:
                    raise
            except Exception as e:  # pylint: disable=broad-except
                logger.warning('Failed to deserialize rule %s: %s', serialized, str(e))

        for rule_cls in (LightGroupAttachRule, OnDoRule):
            for serialized in rules:
                if serialized['type'] == rule_cls.serialized_type:
                    deserialize_rule(serialized)

        self.configurator.requested_rules = requested
        self.request_delayed_configuration()

    async def update_active_rules(self):
        """Report, for every requested rule, whether it is among the active rules."""
        active = self.configurator.active_rules
        requested = self.configurator.requested_rules
        rules = {rule.identifier: rule in active for rule in requested}
        logger.info('This gateway is taking care of %d rules (out of %d requested).',
                    len(active), len(requested))
        await self.connection.update_rules(rules)

    def request_delayed_configuration(self):
        """Schedule ``configure()`` to start in 1.0 second, replacing any pending request."""
        if self.configuration_request_handle is not None:
            self.configuration_request_handle.cancel()

        def start():
            self.configuration_request_handle = None
            asyncio.ensure_future(self.configure(), loop=self.loop)

        logger.info('Scheduling delayed configuration ...')
        self.configuration_request_handle = self.loop.call_later(1.0, start)

    async def configure(self):
        """Run the configurator, publish the active rules and load component states.

        ``self.busy`` is True while this runs and is reset to False afterwards, even
        when a step fails.
        """
        self.busy = True
        try:
            await self.configurator.configure()
            await self.update_active_rules()
            await self.load_states()

            # setup availability monitor, if needed
            if not self._availability_monitor.nodes:
                self._availability_monitor.nodes = set(self.network.nodes)
        finally:
            self.busy = False