Source code for caspia.node.node

import logging
from typing import Dict, Tuple, Type, TypeVar, cast

import aiopollen

from caspia.node import config, errors
from caspia.node.components import Component, System

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Type variable standing for the component class handed to
# Node.register_component, so the returned instance keeps that type.
C = TypeVar('C')


class Node:
    """A node addressed by its CAN id, with the components registered on it.

    Components live in ``self.components`` keyed by their integer identifier;
    identifier 0 is always the system component.
    """

    # Number of component slots; valid identifiers are 0..MAX_COMPONENTS - 1.
    MAX_COMPONENTS = 16

    def __init__(self, pollen_client: aiopollen.Client, can_id: int):
        """
        Initialize a new node.

        :param pollen_client: Client used for communication with the node.
        :param can_id: Pollen identifier of the node.
        """
        self.can_id = can_id
        self.components: Dict[int, Component] = {}
        # The mapping is still empty here, so System receives identifier 0.
        self.register_component(System)
        self.client = pollen_client
        self.loop = pollen_client.loop

    @property
    def system(self):
        """Return system component (always present component with identifier 0)."""
        return cast(System, self.components[0])

    def get_component(self, identifier) -> Component:
        """Get component with given identifier.

        :raises KeyError: if no component is registered under ``identifier``.
        """
        return self.components[identifier]

    def register_component(self, component_cls: Type[C], identifier=None) -> Tuple[int, C]:
        """Instantiate ``component_cls`` and register the instance on this node.

        :param component_cls: The class to be registered; it is called as
            ``component_cls(node, identifier)``.
        :param identifier: Optionally the identifier the new component should
            have. When omitted, the lowest free identifier is used. An
            explicitly given identifier that is already taken replaces the
            component stored there.
        :returns: tuple with assigned identifier and the new component instance
        :raises errors.CaspiaError: if every identifier is already taken or the
            requested identifier is above the supported range.
        """
        if identifier is None:
            identifier = next(
                (candidate for candidate in range(self.MAX_COMPONENTS)
                 if candidate not in self.components),
                None)

        # Test for None first: ordering None against an int raises TypeError,
        # which would hide the intended error when every slot is occupied.
        if identifier is None or identifier >= self.MAX_COMPONENTS:
            raise errors.CaspiaError(
                f'Node is limited to {self.MAX_COMPONENTS} components.')

        component = component_cls(self, identifier)
        self.components[identifier] = component
        return identifier, component

    def drop_components(self):
        """Remove all components except system component."""
        for identifier in range(1, self.MAX_COMPONENTS):
            self.components.pop(identifier, None)

    async def configure(self, force=False, restart=True):
        """Configure the node to its current configuration.

        The original contract states that an incomplete or invalid
        configuration raises errors.InvalidConfigurationError; this method does
        not raise it itself, so it presumably originates from
        ``config.build_node_config`` (TODO confirm).

        :param force: True if the configuration should be uploaded to the board
            even if the config hashes are the same.
        :param restart: True if the node should be restarted after the
            configuration is done.
        :returns: False when the upload was skipped because the hashes match
            and ``force`` is not set, True once the configuration was flashed.
        :raises errors.CaspiaError: if the node does not support a required
            component type, or the hash read back after writing differs from
            the hash of the data that was written.
        """
        buildinfo = await self.system.read_buildinfo()

        # check the node supports all required components
        # (only components with identifier > 0 are considered)
        required_component_types = {
            component.type
            for component in self.components.values()
            if component.identifier > 0
        }
        unsupported_components = (
            required_component_types - set(buildinfo.supported_component_types))
        if unsupported_components:
            names = ','.join(
                cls.__name__
                for cls in Component.component_type_to_cls(unsupported_components))
            raise errors.CaspiaError(f'node does not support {names}')

        # build the configuration
        data = config.build_node_config(self.components)
        await self.system.config_reset()  # reset the buffer to FLASH content

        # do we need to update?
        new_hash = config.calculate_config_hash(data)
        current_hash = await self.system.config_hash_read()
        if not force and new_hash == current_hash:
            logger.debug('0x%03X - no need for configuration', self.can_id)
            return False

        # update
        await self.system.config_clear()
        await self.system.config_write(data)
        # read the hash back to verify the written data before flashing it
        current_hash = await self.system.config_hash_read()
        if current_hash != new_hash:
            raise errors.CaspiaError('configuration failed (hash check)')

        await self.system.config_flash()
        if restart:
            await self.system.restart()
        return True

    async def load_configuration(self):
        """Read the node's current configuration and mirror it in ``components``.

        For every parsed component configuration the matching component class
        is looked up; a component is (re)created when the identifier is unknown
        or currently holds a different class, and the parsed configuration is
        assigned to it. Components other than the system component (identifier
        0) that do not appear in the parsed configuration are removed.
        """
        data = await self.system.config_current_read()
        configurations = config.parse_node_config(data)

        # Every non-system component is a removal candidate until the parsed
        # configuration mentions it. Excluding identifier 0 explicitly avoids
        # relying on the insertion order of the mapping.
        stale_ids = set(self.components) - {0}

        for comp_id, comp_config in configurations.items():
            component_cls = Component.component_class_for_config_class(type(comp_config))
            # The exact class matters here, a subclass instance is replaced too.
            # pylint: disable=unidiomatic-typecheck
            if (comp_id not in self.components
                    or type(self.components[comp_id]) is not component_cls):
                self.components[comp_id] = component_cls(self, comp_id)
            self.components[comp_id].config = comp_config
            stale_ids.discard(comp_id)

        for comp_id in stale_ids:
            del self.components[comp_id]

    @staticmethod
    async def request_heartbeats(client, set_online=False):
        """
        Send broadcast requesting heartbeat from all nodes.

        :param client: The client to send the heartbeats over.
        :param set_online: If True, nodes will set themselves to Online state.
        """
        # Only the lowest bit of set_online ends up in the single payload byte.
        flags = set_online & 0x01
        await client.broadcast(0x010, bytes([flags]))

    def __repr_fields__(self):
        """Return the fields describing this node: hex CAN id and components."""
        return {
            'can_id': hex(self.can_id),
            'components': self.components,
        }