import logging
from typing import Dict, Tuple, Type, TypeVar, cast
import aiopollen
from caspia.node import config, errors
from caspia.node.components import Component, System
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Type variable linking the component class passed to
# ``Node.register_component`` with the type of the instance it returns.
C = TypeVar('C')
class Node:
    """A single node together with the components registered on it.

    Components are stored in :attr:`components`, keyed by their integer
    identifier. A node holds at most 16 components (identifiers 0-15); the
    system component is registered during construction and gets identifier 0.
    """

    # Upper bound on the number of components; valid identifiers are
    # 0 .. _MAX_COMPONENTS - 1.
    _MAX_COMPONENTS = 16
    # Identifier under which the system component is stored.
    _SYSTEM_COMPONENT_ID = 0

    def __init__(self, pollen_client: aiopollen.Client, can_id: int):
        """
        Initialize a new node.

        :param pollen_client: Client used for communication with the node.
        :param can_id: Pollen identifier of the node.
        """
        self.can_id = can_id
        self.components: Dict[int, Component] = {}
        # The component map is still empty here, so the system component is
        # assigned the first free identifier, i.e. 0.
        self.register_component(System)

        self.client = pollen_client
        self.loop = pollen_client.loop

    @property
    def system(self):
        """Return system component (always present component with identifier 0)."""
        return cast(System, self.components[self._SYSTEM_COMPONENT_ID])

    def get_component(self, identifier) -> Component:
        """Get component with given identifier.

        :param identifier: Identifier of the requested component.
        :returns: the component stored under ``identifier``
        :raises KeyError: if no component is registered under ``identifier``
        """
        return self.components[identifier]

    def register_component(self, component_cls: Type[C], identifier=None) -> Tuple[int, C]:
        """Instantiate ``component_cls`` and register it on this node.

        :param component_cls: The class to be registered. It is called as
            ``component_cls(node, identifier)``.
        :param identifier: Optionally the identifier the new component should
            have. When omitted, the lowest free identifier is used. An
            already registered component with the same identifier is replaced.
        :returns: tuple with assigned identifier and the new component instance
        :raises errors.CaspiaError: if no identifier is free or the requested
            identifier is above 15
        """
        if identifier is None:
            # Pick the lowest identifier that is not taken yet; stays None
            # when every slot is occupied.
            identifier = next(
                (candidate for candidate in range(self._MAX_COMPONENTS)
                 if candidate not in self.components),
                None,
            )

        # Test for None first: comparing None with an int raises TypeError,
        # which would mask the intended CaspiaError on a full node.
        if identifier is None or identifier > self._MAX_COMPONENTS - 1:
            raise errors.CaspiaError('Node is limited to 16 components.')

        self.components[identifier] = component_cls(self, identifier)
        return identifier, cast(C, self.components[identifier])

    def drop_components(self):
        """Remove all components except system component."""
        for idx in range(1, self._MAX_COMPONENTS):
            self.components.pop(idx, None)

    async def load_configuration(self):
        """Read the node's current configuration and sync components with it.

        The configuration is read through the system component and parsed
        with :func:`config.parse_node_config`. For each configured component
        the matching component instance is created (or replaced when the
        existing one has a different class) and its ``config`` attribute is
        set. Components that were registered before but do not appear in the
        configuration are removed; the system component is always kept.
        """
        data = await self.system.config_current_read()
        configurations = config.parse_node_config(data)

        # Every component except the system one is a removal candidate until
        # the configuration mentions it. Excluding identifier 0 explicitly
        # avoids relying on the insertion order of the dict.
        stale = {
            key for key in self.components if key != self._SYSTEM_COMPONENT_ID
        }

        for comp_id, comp_config in configurations.items():
            component_cls = Component.component_class_for_config_class(type(comp_config))
            # An exact class match is required here, subclasses do not count.
            # pylint: disable=unidiomatic-typecheck
            if comp_id not in self.components or type(self.components[comp_id]) != component_cls:
                self.components[comp_id] = component_cls(self, comp_id)
            self.components[comp_id].config = comp_config
            stale.discard(comp_id)

        for key in stale:
            del self.components[key]

    @staticmethod
    async def request_heartbeats(client, set_online=False):
        """
        Send broadcast requesting heartbeat from all nodes.

        The broadcast uses message identifier ``0x010`` and a one-byte payload
        whose lowest bit carries the ``set_online`` flag.

        :param client: The client to send the heartbeats over.
        :param set_online: If True, nodes will set themselves to Online state.
        """
        # Use truthiness instead of a bitwise AND so that any truthy value
        # sets the flag (``2 & 0x01`` would silently yield 0).
        flags = 0x01 if set_online else 0x00
        await client.broadcast(0x010, bytes([flags]))

    def __repr_fields__(self):
        """Return the fields describing this node: hex ``can_id`` and components."""
        return {
            'can_id': hex(self.can_id),
            'components': self.components,
        }