# pylint: disable=too-many-instance-attributes
import asyncio
import functools
import logging
import caspia.meadow
from caspia.gateway import Configurator, Network, config
from caspia.gateway.cidmanagement import CidManagement
from caspia.gateway.monitor import AvailabilityMonitor, register_general_metrics
from caspia.gateway.rules import RulesFilter
from caspia.gateway.services import GatewayService
from caspia.meadow.client import Gateway, ServiceConsumerMixin
from caspia.meadow.lookup import Lookup
from caspia.meadow.rules import LightGroupAttachRule, OnDoRule, Rule
from caspia.node import Broadcast, Listener
from caspia.toolbox import monitor, storage
logger = logging.getLogger(__name__)
__all__ = ['CaspiaGateway']
[docs]class CaspiaGateway(Gateway):
"""Bridge between Caspia CAN Components and Meadow Services."""
def __init__(self,
             *,
             name,
             connection,
             config_path,
             storage_path,
             pollen_client,
             consumer_conn,
             rules_filter=RulesFilter.ALL):
    """Build the gateway and all helper objects it owns.

    All arguments are keyword-only.

    :param name: gateway name, handed to the ``Gateway`` base class.
    :param connection: connection handed to the ``Gateway`` base class.
    :param config_path: passed to ``GatewayConfig`` and reused as the
        configurator's ``work_dir``.
    :param storage_path: base location for persistent data; must support
        the ``/`` operator (``'data'`` and ``'canid_used.yaml'`` are
        derived from it).
    :param pollen_client: stored on the instance and passed to ``Network``.
    :param consumer_conn: stored on the instance as ``consumer_conn``.
    :param rules_filter: forwarded unchanged to the ``Configurator``.
    """
    super().__init__(name, connection=connection)

    # Plain references to the constructor arguments.
    self.config_path = config_path
    self.storage_path = storage_path
    self.consumer_conn = consumer_conn
    self.pollen_client = pollen_client

    # Configuration and persistent storage.
    self.config = config.GatewayConfig(config_path)
    data_location = str(storage_path / 'data')
    self.storage = storage.ShelveStorage(data_location)

    # Service lookup; every service goes through ``prepare_service``.
    self.lookup = Lookup(prepare=self.prepare_service)

    # CAN ID bookkeeping and the network built on top of it.
    cid_cache = storage_path / 'canid_used.yaml'
    self.cid_management = CidManagement(cache_path=cid_cache)
    self.network = Network(pollen_client, self.cid_management)

    # NOTE(review): ``self.loop`` is not assigned in this method; it is
    # presumably provided by the ``Gateway`` base class - confirm.
    configurator_options = dict(hwid_map=self.hwid_map,
                                network=self.network,
                                lookup=self.lookup,
                                loop=self.loop,
                                work_dir=config_path,
                                rules_filter=rules_filter)
    self.configurator = Configurator(**configurator_options)

    self.listener = Listener(self.on_component_event)
    self.component_dependencies = {}
    # Handle of the pending delayed configuration call, if any.
    self.configuration_request_handle = None

    self.init_monitors()
    self.busy = True
def init_monitors(self):
    """Create the availability monitor and register the general metrics.

    The monitor is stored as ``self._availability_monitor``; the general
    metrics are registered under this gateway's name.
    """
    monitor_options = dict(gateway_name=self.name,
                           pollen_client=self.pollen_client,
                           network=self.network,
                           loop=self.loop)
    self._availability_monitor = AvailabilityMonitor(**monitor_options)
    register_general_metrics(self.name)
@property
def hwid_map(self):
    """Return a new ``dict`` built from ``self.config.iterate_nodes()``.

    The mapping is rebuilt on every access.
    """
    node_entries = self.config.iterate_nodes()
    return dict(node_entries)
async def setup(self):
    """Run the gateway start-up sequence.

    Loads CAN ID assignments and configured services, prepares the nodes,
    subscribes to bus broadcasts, adds the services to the gateway, starts
    handling rules requests and finally schedules a delayed configuration.
    """
    # CAN ID assignment comes first
    self.configure_cidmanagement()

    # instantiate every service found in the configuration
    self.load_services()

    # get the nodes ready for configuration
    await self.configurator.prepare_nodes()

    # from now on receive events broadcast on the bus
    broadcast_handler = self.on_raw_broadcast
    self.pollen_client.listen_broadcast(broadcast_handler)

    # register the loaded services with the gateway
    loaded_services = list(self.lookup.services.values())
    self.add(loaded_services)

    # begin accepting rules requests
    rules_handler = self.handle_rules_request
    await self.connection.handle_rules_request(rules_handler)

    self.request_delayed_configuration()
def load_services(self):
    """Instantiate every configured service and add it to the lookup.

    The service class is resolved from the ``'type'`` entry of each service
    configuration (with the ``GatewayService`` mixin). Each instance gets a
    ``ProxyStorage`` over the gateway storage, prefixed with
    ``service.<name>.``.
    """
    resolve_class = caspia.meadow.services.ServiceBase.get_subclass
    for service_cfg in self.config.iterate_services():
        service_cls = resolve_class(service_cfg['type'], mixin=GatewayService)
        key_prefix = 'service.{}.'.format(service_cfg['name'])
        service_storage = storage.ProxyStorage(self.storage, key_prefix)
        instance = service_cls(config=service_cfg,
                               network=self.network,
                               storage=service_storage)
        self.lookup.add(instance)
async def load_states(self):
    """Load the state of every component of every node in the network.

    Components are processed one at a time. A failure is counted in the
    ``caspia-gateway:nodes-err`` metric and logged, and loading continues
    with the next component.
    """
    logger.info('Going to load state of all components')

    # Lazily walk all (node name, component) pairs in network order.
    located_components = ((node_name, comp)
                          for node_name, node in self.network.nodes.items()
                          for comp in node.components.values())

    for node_name, comp in located_components:
        try:
            await comp.load_state()
        except Exception:  # pylint: disable=broad-except
            monitor.record_metric('caspia-gateway:nodes-err', 1)
            logger.error('Failed to load state of %s component at %s[%d]',
                         type(comp).__name__, node_name, comp.identifier)

    logger.info('State of all components loaded')
def prepare_service(self, service):
    """Give consumer services access to the consumer connection.

    Services that are not ``ServiceConsumerMixin`` instances are left
    untouched.
    """
    if not isinstance(service, ServiceConsumerMixin):
        return
    service.meadow_connection = self.consumer_conn
def on_raw_broadcast(self, *args, **kwargs):
    """Wrap the raw arguments in a ``Broadcast`` and pass it to the listener."""
    process = self.listener.process_broadcast
    process(Broadcast(*args, **kwargs))
def on_component_event(self, node_cid, component_id, component_type, event):
    """Apply a component event and notify the services depending on it.

    Events are ignored until the configurator reports the nodes as
    configured, and when the node CAN ID cannot be resolved to a name.
    Otherwise the component state is updated from the event and each
    dependent service's ``on_component_event`` coroutine is scheduled.

    :raises RuntimeError: if ``component_type`` differs from the type of the
        component registered under ``component_id``.
    """
    if not self.configurator.nodes_configured:
        return

    node_name = self.cid_management.name_of(node_cid)
    if node_name is None:
        logger.debug('Received event from unknown node 0x{:03X}'.format(node_cid))
        return

    component = self.network.nodes[node_name].components[component_id]
    if component.type != component_type:
        mismatch = (component_type, component.type)
        msg = 'Event does not match expected component type (%r, expected %r)' % mismatch
        logger.error(msg)
        raise RuntimeError(msg)

    component.state.update_from_event(event)

    # One completion callback is enough; it only carries name and component.
    done_callback = functools.partial(self.on_component_event_processed,
                                      node_name, component)
    dependents = self.configurator.component_dependencies[component]
    for service in dependents:
        pending = asyncio.ensure_future(service.on_component_event(component, event))
        pending.add_done_callback(done_callback)
def on_component_event_processed(self, name, component, task):
    """Done-callback that logs a failure of a dependent service's handler.

    :param name: name of the node the event came from (used in the log).
    :param component: component the event belonged to (unused here, kept for
        the callback signature).
    :param task: the finished future returned by ``asyncio.ensure_future``.

    Cancelled tasks and tasks that finished without an exception are
    ignored.
    """
    # ``cancelled()`` has to be checked first: calling ``exception()`` on a
    # cancelled future raises ``CancelledError`` instead of returning it.
    if task.cancelled():
        return
    if task.exception() is None:
        return

    try:
        # Re-raise inside ``try`` so ``logger.exception`` gets the traceback.
        task.result()
    except Exception:
        msg = 'Failure when processing broadcast from %s: %s'
        logger.exception(msg, name, str(task.exception()))
async def handle_rules_request(self, rules):
    """Deserialize the requested rules and schedule a reconfiguration.

    :param rules: serialized rules; each item is indexed by ``'identifier'``
        and ``'type'``.

    Rules are processed in ``'identifier'`` order, all
    ``LightGroupAttachRule`` entries first and ``OnDoRule`` entries second;
    other rule types are ignored. A rule that refers to a service this
    gateway does not know is skipped with a debug message; any other
    non-``KeyError`` failure is logged as a warning. The resulting list
    replaces ``configurator.requested_rules``.

    :raises KeyError: a ``KeyError`` raised during deserialization that is
        not the "unknown service" one is propagated to the caller.
    """
    logger.info('Received rules activation request (%d rules)', len(rules))
    rules = sorted(rules, key=lambda rule: rule['identifier'])
    requested = []

    # ``type`` stays the parameter name: it is part of the callable handed
    # to ``Rule.deserialize`` and may be passed by keyword.
    def get_service(name, type=None):  # pylint: disable=redefined-builtin
        if type:
            return self.lookup.get(name, type=type)
        instance = self.lookup.find(name)
        if instance is None:
            # The spelling is matched by ``is_unknown_service`` below.
            raise KeyError('Uknown service %s' % name)
        return instance

    def is_unknown_service(error):
        # A KeyError may carry no arguments or a non-string key; only a
        # string message can be the "unknown service" marker.
        first_arg = error.args[0] if error.args else None
        return isinstance(first_arg, str) and first_arg.startswith('Uknown service')

    def deserialize_rule(serialized):
        try:
            requested.append(Rule.deserialize(serialized, dict(get_service=get_service)))
        except KeyError as e:
            if not is_unknown_service(e):
                raise
            logger.debug('Failed to deserialize rule: %s', str(e))
        except Exception as e:  # pylint: disable=broad-except
            logger.warning('Failed to deserialize rule %s: %s', serialized, str(e))

    # Two passes keep the original ordering: light group attachments are
    # deserialized before on-do rules.
    for rule_type in (LightGroupAttachRule.serialized_type, OnDoRule.serialized_type):
        for serialized in rules:
            if serialized['type'] == rule_type:
                deserialize_rule(serialized)

    self.configurator.requested_rules = requested
    self.request_delayed_configuration()
async def update_active_rules(self):
    """Report which of the requested rules are active on this gateway.

    Sends a mapping of rule identifier to ``True``/``False`` (whether the
    rule is among the configurator's active rules) through
    ``connection.update_rules``.
    """
    configurator = self.configurator
    activation = {
        rule.identifier: rule in configurator.active_rules
        for rule in configurator.requested_rules
    }
    active_count = len(configurator.active_rules)
    requested_count = len(configurator.requested_rules)
    logger.info('This gateway is taking care of %d rules (out of %d requested).',
                active_count, requested_count)
    await self.connection.update_rules(activation)
def request_delayed_configuration(self):
    """Schedule ``configure()`` to start 1.0 second from now.

    A previously scheduled request that has not fired yet is cancelled, so
    only the most recent request remains pending.
    """
    pending = self.configuration_request_handle
    if pending is not None:
        pending.cancel()

    def run_configuration():
        # The handle has fired; clear it before starting the coroutine.
        self.configuration_request_handle = None
        asyncio.ensure_future(self.configure(), loop=self.loop)

    logger.info('Scheduling delayed configuration ...')
    self.configuration_request_handle = self.loop.call_later(1.0, run_configuration)