# Source code for caspia.gateway.config.loading

# pylint: disable=redefined-builtin
import logging
import os.path

import marshmallow
import yaml

import caspia.node.utils
from caspia.gateway import errors

from .schema.service import ServiceSchema

logger = logging.getLogger(__name__)


def iterate_definitions(data, namespace=None, type='group', hwid=False):
    """Generate service/node definitions within a document.

    The document is walked recursively: lists are flattened, mappings whose
    effective type is ``'group'`` are descended into (each key extends the
    dotted namespace), and everything else is yielded as a leaf.

    :param data: parsed document fragment (mapping, list or leaf value).
    :param namespace: dotted prefix accumulated so far; ``None`` at the root.
    :param type: type assumed for ``data`` when it is not a mapping or has no
        ``'type'`` key.
    :param hwid: when true, children of this group default to type ``'hwid'``
        instead of inheriting ``type``.
    :yields: (namespace, type, data)
    """
    if isinstance(data, list):
        for item in data:
            yield from iterate_definitions(item, namespace=namespace, type=type, hwid=hwid)
        return

    current = data.get('type', type) if isinstance(data, dict) else type
    if current != 'group':
        yield (namespace or '', current, data)
        return

    if data is None:
        # An empty document or an empty group contains no definitions.
        return

    prefix = '' if namespace is None else namespace + '.'
    subtype = 'hwid' if hwid else type
    for path in data:
        if path == 'type':
            # An explicit ``type: group`` marker describes the group itself;
            # it is not a child definition.
            continue
        yield from iterate_definitions(data[path], namespace=prefix + path, type=subtype)
def load_hwid(work_dir):
    """Collect hardware ids from every ``*hwid.yaml`` file in ``work_dir``.

    Each regular file whose name ends with ``hwid.yaml`` is parsed with
    ``ConfigLoader``; every definition found in it is converted with
    ``caspia.node.utils.hwid_read`` and stored under its namespaced name.

    :param work_dir: directory object providing ``iterdir()``.
    :returns: dict mapping definition name to the value returned by
        ``hwid_read``.
    """
    result = {}
    candidates = (
        entry for entry in work_dir.iterdir()
        if entry.is_file() and entry.name.endswith('hwid.yaml')
    )
    for entry in candidates:
        with open(entry, 'r') as stream:
            document = yaml.load(stream, ConfigLoader)
        for name, _, hwidstr in iterate_definitions(document, hwid=True):
            result[name] = caspia.node.utils.hwid_read(hwidstr)
    return result
def deserialize_service_config(namespace, sdata, context=None):
    """Validate and deserialize a single service definition.

    :param namespace: dotted namespace of the definition; stored in the
        schema context under ``'namespace'``.
    :param sdata: raw service mapping; its ``'type'`` entry selects the schema
        class via ``ServiceSchema.get``.
    :param context: optional mapping merged into the schema context.
    :returns: the data produced by the schema's ``load``.
    :raises errors.ConfigurationSemanticError: if the service type is missing
        or unknown, or if schema validation fails.
    """
    # Look the type up separately so a missing key is reported as a
    # configuration error instead of failing on an unbound ``stype``.
    try:
        stype = sdata['type']
    except KeyError:
        raise errors.ConfigurationSemanticError(
            f'missing service type ({namespace})') from None

    try:
        schema = ServiceSchema.get(stype)(strict=True)
    except KeyError:
        raise errors.ConfigurationSemanticError(
            f'invalid service type "{stype}"') from None

    schema.context['namespace'] = namespace
    schema.context.update(context or {})

    try:
        config, _ = schema.load(sdata)
    except marshmallow.exceptions.ValidationError as e:
        name_components = [namespace]
        if 'name' in sdata:
            name_components.append(sdata['name'])
        # Components are stringified so a non-string name cannot break the
        # error report itself.
        name = '.'.join(str(component) for component in name_components)
        messages = [f'Invalid configuration ({name})']
        messages.extend(_flatten_validation_messages(e.messages))
        logger.error('%s', '; '.join(messages))
        raise errors.ConfigurationSemanticError(*messages) from e
    return config
def iterate_service_configs(work_dir, nodes):
    """Yield deserialized service configs from ``*services.yaml`` files.

    Every regular file in ``work_dir`` whose name ends with ``services.yaml``
    is parsed with ``ConfigLoader``; each definition found is passed through
    ``deserialize_service_config`` with ``nodes`` available in the context.

    :param work_dir: directory object providing ``iterdir()``.
    :param nodes: value exposed to the schemas as ``context['nodes']``.
    :raises errors.ConfigurationSyntaxError: when YAML parsing fails.
    """
    shared_context = {'nodes': nodes}
    try:
        for entry in work_dir.iterdir():
            if entry.is_file() and entry.name.endswith('services.yaml'):
                with open(entry, 'r') as stream:
                    document = yaml.load(stream, ConfigLoader)
                for name, _, sdata in iterate_definitions(document):
                    yield deserialize_service_config(name, sdata, shared_context)
    except yaml.YAMLError as error:
        raise errors.ConfigurationSyntaxError(str(error))
def get_config_value(key, work_dir, default=None):
    """Look up a top-level key in ``work_dir / 'config.yaml'``.

    :param key: key to read from the parsed document.
    :param work_dir: directory path object supporting the ``/`` operator.
    :param default: value returned when the file does not exist, the
        document is empty, or the key is absent.
    :returns: the configured value or ``default``.
    """
    config_file = work_dir / 'config.yaml'
    if config_file.exists():
        with open(config_file, 'r') as stream:
            document = yaml.load(stream, ConfigLoader)
        if document is not None:
            return document.get(key, default)
    return default
class ConfigLoader(yaml.Loader):  # pylint: disable=too-many-ancestors
    """YAML loader supporting file inclusion and Python imports.

    The methods below are constructor callbacks; relative paths given to
    them are resolved against the directory of the stream being loaded.

    NOTE(review): this derives from ``yaml.Loader`` (not ``SafeLoader``) and
    can import Python code on request, so it must only be used on trusted
    configuration files.
    """

    def __init__(self, stream):
        # Streams without a usable ``name`` (plain strings, in-memory
        # buffers, fd-backed files) resolve relative paths against the
        # current working directory instead of failing.
        try:
            self._root = os.path.split(getattr(stream, 'name', None))[0]
        except TypeError:
            self._root = ''
        super().__init__(stream)

    def include(self, node):
        """Load another YAML file and return its parsed content.

        The included file is parsed with a fresh ``ConfigLoader``.
        """
        filename = os.path.join(self._root, self.construct_scalar(node))
        with open(filename, 'r') as f:
            return yaml.load(f, ConfigLoader)

    def import_module(self, node):
        """Import the Python file named by the scalar and return the module."""
        filename = os.path.join(self._root, self.construct_scalar(node))
        # Imported at call time, as in the original code.
        from caspia.toolbox.module import import_module
        return import_module(filename)

    def import_object(self, node):
        """Import ``<file>:<attribute>`` and return the named attribute.

        :raises ValueError: if the scalar contains no ``:`` separator.
        """
        # Split on the LAST colon, before joining with the root, so colons
        # in the directory part (e.g. a drive letter) are not mistaken for
        # the separator.
        relative, separator, attribute = self.construct_scalar(node).rpartition(':')
        if not separator:
            raise ValueError('expected "<file>:<attribute>" in !import tag')
        filename = os.path.join(self._root, relative)
        from caspia.toolbox.module import import_module
        module = import_module(filename)
        return getattr(module, attribute)
def _flatten_validation_messages(messages, prefix=None):
    """Flatten a nested validation-error structure into plain strings.

    Strings are yielded as ``"<prefix>: <message>"`` (or bare when there is
    no prefix), lists are flattened under the same prefix, and mapping keys
    extend the prefix with a dot.

    :param messages: str, list, dict or any other object describing errors.
    :param prefix: dotted path accumulated so far, or ``None`` at the root.
    :yields: one formatted string per leaf message.
    """
    if isinstance(messages, str):
        yield f'{prefix}: {messages}' if prefix else messages
    elif isinstance(messages, list):
        for item in messages:
            yield from _flatten_validation_messages(item, prefix)
    elif isinstance(messages, dict):
        for key, value in messages.items():
            # Keys can be integers (list indices); stringify them so index 0
            # is not treated as "no prefix" and joining never fails.
            key = str(key)
            current_prefix = f'{prefix}.{key}' if prefix else key
            yield from _flatten_validation_messages(value, current_prefix)
    else:
        text = str(messages)
        yield f'{prefix}: {text}' if prefix else text


# Register the custom YAML tags handled by ConfigLoader.
ConfigLoader.add_constructor('!include', ConfigLoader.include)
ConfigLoader.add_constructor('!import_module', ConfigLoader.import_module)
ConfigLoader.add_constructor('!import', ConfigLoader.import_object)