Source code for caspia.meadow.metadata.store

import json
import logging
from copy import copy
from functools import partial
from pathlib import Path

from caspia import meadow
from caspia.reactive.disposables import AnonymousDisposable

logger = logging.getLogger(__name__)


[docs]class MetadataStore: def __init__(self, directory): self.directory = Path(directory) self.directory.mkdir(exist_ok=True, parents=True) self._event_subscribers = list() self._entries = dict()
[docs] def subscribe_events(self, fn): """ Subscribe to changes in the store. :param fn: Callable with signature `fn(name, old_value, new_value, **kwargs)`. """ self._event_subscribers.append(fn) def dispose(): self._event_subscribers.remove(fn) return AnonymousDisposable(dispose)
[docs] def get_path(self, name): return self.directory / f'{name}.json'
def __getitem__(self, name): meadow.name.split(name) # ensure the name is valid if name in self._entries: return self._entries[name] entry = ServiceEntry(self.get_path(name), partial(self._on_change, name)) self._entries[name] = entry return entry
[docs] def names(self): for file_name in self.directory.iterdir(): yield file_name.with_suffix('').name
def _on_change(self, name, old_value, new_value): for subscriber in self._event_subscribers: try: subscriber(name, old_value, new_value) except Exception: logger.exception('Failure in metadata subscriber')
[docs]class ServiceEntry: def __init__(self, path, on_change): self.path = Path(path) self._on_change = on_change self._data = None def _read(self): if self._data: return if self.path.exists(): with open(self.path) as f: self._data = json.load(f) else: self._data = {} def _write(self): if not self._data: if self.path.exists(): self.path.unlink() return with open(self.path, 'w') as f: json.dump(self._data, f)
[docs] def keys(self): self._read() return self._data.keys()
[docs] def read(self): self._read() return self._data
def __getitem__(self, key): self._read() return self._data.get(key, None) def __setitem__(self, key, value): # setting None is the same as deleting if value is None: del self[key] return # update self._read() previous = copy(self._data) if self._data.get(key, None) != value: self._data[key] = value self._write() self._did_change(previous, self._data) def __delitem__(self, key): self._read() previous = copy(self._data) if self._data.get(key, None) is not None: self._data.pop(key, None) self._write() self._did_change(previous, self._data) def _did_change(self, old, new): self._on_change(old, new)