# pylint: disable=redefined-builtin
import logging
import os.path
import marshmallow
import yaml
import caspia.node.utils
from caspia.gateway import errors
from .schema.service import ServiceSchema
logger = logging.getLogger(__name__)
[docs]def iterate_definitions(data, namespace=None, type='group', hwid=False):
"""Generate service/node definitions within a document.
:yields: (namespace, type, data)
"""
if isinstance(data, list):
for item in data:
yield from iterate_definitions(item, namespace=namespace, type=type, hwid=hwid)
return
current = data.get('type', type) if isinstance(data, dict) else type
if current == 'group':
namespace = '' if namespace is None else namespace + '.'
subtype = 'hwid' if hwid else type
for path in data:
yield from iterate_definitions(data[path], namespace=namespace + path, type=subtype)
else:
namespace = namespace or ''
yield (namespace, current, data)
[docs]def load_hwid(work_dir):
hwid_map = dict()
for fpath in work_dir.iterdir():
if not fpath.is_file() or not fpath.name.endswith('hwid.yaml'):
continue
with open(fpath, 'r') as f:
data = yaml.load(f, ConfigLoader)
for name, _, hwidstr in iterate_definitions(data, hwid=True):
hwid_map[name] = caspia.node.utils.hwid_read(hwidstr)
return hwid_map
[docs]def deserialize_service_config(namespace, sdata, context=None):
context = context or {}
try:
stype = sdata['type']
schema = ServiceSchema.get(stype)(strict=True)
except KeyError:
raise errors.ConfigurationSemanticError(f'invalid service type "{stype}"')
schema.context['namespace'] = namespace
schema.context.update(context)
try:
config, _ = schema.load(sdata)
except marshmallow.exceptions.ValidationError as e:
name_components = [namespace]
if 'name' in sdata:
name_components.append(sdata['name'])
name = '.'.join(name_components)
messages = [f'Invalid configuration ({name})']
messages += list(_flatten_validation_messages(e.messages))
print(messages)
raise errors.ConfigurationSemanticError(*messages)
return config
[docs]def iterate_service_configs(work_dir, nodes):
context = {'nodes': nodes}
try:
for fpath in work_dir.iterdir():
if not fpath.is_file() or not fpath.name.endswith('services.yaml'):
continue
with open(fpath, 'r') as f:
data = yaml.load(f, ConfigLoader)
for name, _, sdata in iterate_definitions(data):
config = deserialize_service_config(name, sdata, context)
yield config
except yaml.YAMLError as error:
raise errors.ConfigurationSyntaxError(str(error))
[docs]def get_config_value(key, work_dir, default=None):
path = work_dir / 'config.yaml'
if not path.exists():
return default
with open(path, 'r') as f:
data = yaml.load(f, ConfigLoader)
if data is None:
return default
else:
return data.get(key, default)
[docs]class ConfigLoader(yaml.Loader): # pylint: disable=too-many-ancestors
def __init__(self, stream):
self._root = os.path.split(stream.name)[0]
super().__init__(stream)
[docs] def include(self, node):
filename = os.path.join(self._root, self.construct_scalar(node))
with open(filename, 'r') as f:
return yaml.load(f, ConfigLoader)
[docs] def import_module(self, node):
filename = os.path.join(self._root, self.construct_scalar(node))
from caspia.toolbox.module import import_module
module = import_module(filename)
return module
[docs] def import_object(self, node):
spec = os.path.join(self._root, self.construct_scalar(node))
filename, attribute = spec.split(':')
from caspia.toolbox.module import import_module
module = import_module(filename)
return getattr(module, attribute)
def _flatten_validation_messages(messages, prefix=None):
if isinstance(messages, str):
yield messages if not prefix else f'{prefix}: {messages}'
elif isinstance(messages, list):
for m in messages:
yield from _flatten_validation_messages(m, prefix)
elif isinstance(messages, dict):
for key, value in messages.items():
current_prefix = key if not prefix else '.'.join([prefix, key])
yield from _flatten_validation_messages(value, current_prefix)
else:
yield str(messages)
ConfigLoader.add_constructor('!include', ConfigLoader.include)
ConfigLoader.add_constructor('!import_module', ConfigLoader.import_module)
ConfigLoader.add_constructor('!import', ConfigLoader.import_object)