# pylint: disable=consider-using-enumerate
import struct
import caspia.node
from caspia.node import binary, errors
from caspia.toolbox import reprf
CONFIG_LENGTH = 1024
[docs]class Config(reprf.ReprFields):
def __post_init__(self):
pass
[docs] def get_bytes(self):
raise NotImplementedError
[docs] @classmethod
def from_bytes(cls, data):
raise NotImplementedError
def __hash__(self):
return hash(self.get_bytes())
def __eq__(self, other):
if isinstance(other, type(self)):
return hash(self) == hash(other)
else:
return False
def __repr_fields__(self):
""" Each subclass might override this and return some
dictionary which will be used to construct __str__.
"""
return {}
[docs]def build_node_config(components):
configurations = []
for comp_id in range(0, len(components)):
comp = components[comp_id]
if comp.config is None:
msg = 'Component {} does not have configuration'.format(comp)
raise errors.InvalidConfigurationError(msg)
comp_config = comp.config.get_bytes()
entry = struct.pack('<B', comp.type) + comp_config
configurations.append(entry)
data = binary.encode_list(configurations)
return data
[docs]def parse_node_config(data):
entries, _ = binary.decode_list(data)
configurations = []
component_classes = caspia.node.components.Component.component_classes()
for entry in entries:
if len(entry) == 0:
raise ValueError('Configuration entry has no data')
component_type = entry[0]
if component_type not in component_classes:
raise errors.InvalidConfigurationError(f'Unknown component type {component_type}')
config_cls = component_classes[component_type].Config
configurations.append(config_cls.from_bytes(entry[1:]))
return dict(enumerate(configurations))
[docs]def calculate_config_hash(data, max_length=CONFIG_LENGTH):
data = data + b'\x00' * (max_length - len(data))
hsh = 0
for i in range(0, len(data)):
hsh = (((hsh << 5) - hsh)) + data[i]
hsh = hsh & 0xFFFFFFFF
return hsh