# Source code for caspia.node.components.base

# pylint: disable=abstract-class-instantiated
import collections
import functools
from abc import ABCMeta, abstractmethod

import aiopollen

import caspia
from caspia.node import events, pollen_action
from caspia.node.config import Config
from caspia.toolbox.subclasses import all_subclasses


class Component(metaclass=ABCMeta):
    """Abstract Component Class.

    Each component should derive from this class.

    (Class) attributes to be defined by every subclass:
        type: int -- numeric component type; it is the first byte of every
            request prefix and the key used by :meth:`component_classes`.
            Subclasses that leave it as ``None`` are not registered.
        State -- nested :class:`Component.State` subclass; instantiated
            without arguments in ``__init__``.
        Config -- nested :class:`Component.Config` subclass; used by
            :meth:`component_class_for_config_class` to find the component.
        Event -- nested :class:`Component.Event` subclass(es) describing the
            broadcasts the component makes.
    """

    type: int = None

    class State(metaclass=ABCMeta):
        """Abstract container for the last known state of a component."""

        @abstractmethod
        def update_from_bytes(self, data):
            """Update the state from the raw response to a state request."""
            raise NotImplementedError

        @abstractmethod
        def update_from_event(self, event):
            """Update the state from an event."""
            raise NotImplementedError

        def update_from_error(self, error_code, data):
            """Update the state from an error response.

            Not abstract: subclasses only override this when they can handle
            errors belonging to their own component type; the default raises
            ``NotImplementedError``.
            """
            raise NotImplementedError

    class Event(events.Event):
        """An Event.

        Every type of a broadcast made by a component should be represented
        as subclass of this class. Subclasses must provide `can_id` and
        `component_id` attributes.
        """

    class Config(Config):
        """Base configuration class for components.

        The base name in the class statement resolves to the module-level
        ``Config`` import, because the nested name is not bound yet while the
        class statement is being evaluated.
        """

    def __init__(self, node, identifier: int):
        """Create a component with the given identifier on ``node``.

        The configuration starts as ``None`` and the state as a fresh
        ``self.State()`` instance.
        """
        self.identifier = identifier
        self.node = node
        self.config = None
        self.state = self.State()

    @classmethod
    @abstractmethod
    def parse_broadcast(cls, broadcast):
        """Return Event subclass for specific broadcast."""
        raise NotImplementedError

    @staticmethod
    def component_classes():
        """Return dictionary [type:cls] with all registered comp. classes.

        Only subclasses whose ``type`` is not ``None`` are included.
        """
        return {
            cls.type: cls
            for cls in all_subclasses(Component)
            if cls.type is not None
        }

    @staticmethod
    def component_type_to_cls(types, raise_on_unknown=True):
        """Translate component type number(s) to Component subclass(es).

        Args:
            types: a single component type, or an iterable of them.
            raise_on_unknown: raise ``ValueError`` for a type that has no
                registered class; when false such types are skipped.

        Returns:
            For an iterable, a list of classes in input order (unknown types
            omitted when ``raise_on_unknown`` is false). For a single type,
            the matching class, or ``None`` when it is unknown and
            ``raise_on_unknown`` is false.

        Raises:
            ValueError: for an unknown type when ``raise_on_unknown`` is true.
        """
        # ``collections.Iterable`` was removed in Python 3.10; the ABC lives
        # in ``collections.abc``. Imported locally so this block does not
        # depend on the module-level import list.
        from collections.abc import Iterable

        if not isinstance(types, Iterable):
            matches = Component.component_type_to_cls(
                [types], raise_on_unknown=raise_on_unknown)
            # With raise_on_unknown=False the list may be empty; indexing it
            # would raise IndexError even though the caller asked for no
            # exception.
            return matches[0] if matches else None

        result = []
        classes = Component.component_classes()
        for tp in types:
            if tp in classes:
                result.append(classes[tp])
            elif raise_on_unknown:
                raise ValueError(f'unknown component type {tp}')
        return result

    @staticmethod
    def component_class_for_config_class(config_cls):
        """Get a Component subclass for a given Config subclass.

        Raises:
            KeyError: when no registered component uses ``config_cls`` as its
                nested ``Config`` class.
        """
        for component_cls in Component.component_classes().values():
            if component_cls.Config == config_cls:
                return component_cls
        raise KeyError(f'component for config cls {config_cls} not found.')

    async def request(self, data: bytes, *args, timeout=5.0, **kwargs) -> bytes:
        """Make pollen request on behalf of the component.

        ``data`` is prefixed with two bytes, ``[self.type, self.identifier]``,
        and sent to ``self.node.can_id`` through ``self.node.client``.
        ``timeout`` is forwarded both as ``timeout`` and as ``inner_timeout``;
        extra positional and keyword arguments are passed through unchanged.
        """
        prefix = bytes([self.type, self.identifier])
        return await self.node.client.request(
            self.node.can_id, prefix + data, *args,
            timeout=timeout, inner_timeout=timeout, **kwargs)

    @staticmethod
    def get_error_code_domain(error_code) -> int:
        """Return the type of component the error_code is related to.

        The domain is everything above the low 8 bits of ``error_code``.
        """
        return error_code >> 8

    @classmethod
    def does_request(cls, builder):
        """Decorator factory for methods that issue one component request.

        ``builder`` is called with the arguments of the decorated call
        (without ``self``) and must return the request payload as bytes.
        The decorated coroutine ``f`` is then awaited as ``f(self, response)``
        with the response to that request; the original call arguments are
        not passed on to ``f``.
        """
        def decorator(f):
            def builder_(self, *args, **kwargs):
                # Same two-byte prefix that ``request`` puts before the payload.
                prefix = bytes([self.type, self.identifier])
                return self.node.can_id, prefix + builder(*args, **kwargs)

            # NOTE(review): presumably registers ``builder_`` so the request
            # can be built without sending it -- confirm in pollen_action.
            @pollen_action.request(builder_)
            @functools.wraps(f)
            async def wrapped(self, *args, **kwargs):
                data = builder(*args, **kwargs)
                response = await self.request(data)
                return await f(self, response)

            return wrapped

        return decorator

    async def load_state(self):
        """Request the component state and update ``self.state`` with it.

        Sends the single byte ``0x00`` as the request payload. An error
        response whose domain equals this component's ``type`` is handed to
        ``state.update_from_error``; any other error response is re-raised.

        Returns:
            The updated ``self.state``.
        """
        try:
            data = await self.request(b'\x00')
            self.state.update_from_bytes(data)
        except aiopollen.errors.ResponseError as e:
            if self.get_error_code_domain(e.error_code) == self.type:
                self.state.update_from_error(e.error_code, e.data)
            else:
                raise
        return self.state

    def __repr_fields__(self):
        """Return the fields shown in the component's representation."""
        return {
            'type': type(self).__name__,
            'config': caspia.node.reprf(self.config),
            'state': caspia.node.reprf(self.state),
        }