Source code for caspia.meadow.metadata.store
import json
import logging
from copy import copy
from functools import partial
from pathlib import Path
from caspia import meadow
from caspia.reactive.disposables import AnonymousDisposable
logger = logging.getLogger(__name__)
class MetadataStore:
    """Directory-backed collection of per-name metadata entries.

    Each name maps to one :class:`ServiceEntry` stored as a file inside
    ``directory``. Entries are created lazily on first lookup and cached, so
    repeated lookups of the same name return the same object.
    """

    def __init__(self, directory):
        """
        :param directory: Directory holding the entry files. It is created
            (including parents) if it does not exist yet.
        """
        self.directory = Path(directory)
        self.directory.mkdir(exist_ok=True, parents=True)
        self._event_subscribers = list()
        self._entries = dict()

    def subscribe_events(self, fn):
        """
        Subscribe to changes in the store.

        :param fn: Callable with signature `fn(name, old_value, new_value, **kwargs)`.
        :return: Disposable that removes ``fn`` from the subscribers again.
        """
        self._event_subscribers.append(fn)

        def dispose():
            self._event_subscribers.remove(fn)

        return AnonymousDisposable(dispose)

    def get_path(self, name):
        """Return the path of the file that backs the entry called ``name``.

        :param name: Entry name.
        :return: ``pathlib.Path`` inside the store directory.
        """
        # NOTE(review): the '.json' suffix is an assumption -- entries are
        # stored as JSON and `names()` strips exactly one suffix; confirm it
        # matches files already on disk. The suffix is appended rather than
        # set via `with_suffix()` so that dotted names keep all components.
        return self.directory / (name + '.json')

    def __getitem__(self, name):
        """Return the (cached) entry for ``name``, creating it on first use."""
        meadow.name.split(name)  # ensure the name is valid
        if name in self._entries:
            return self._entries[name]
        entry = ServiceEntry(self.get_path(name), partial(self._on_change, name))
        self._entries[name] = entry
        return entry

    def names(self):
        """Yield the name of every entry that has a file in the directory."""
        for file_name in self.directory.iterdir():
            yield file_name.with_suffix('').name

    def _on_change(self, name, old_value, new_value):
        """Notify every subscriber that the entry ``name`` has changed.

        A failing subscriber is logged and does not prevent the remaining
        subscribers from being notified.
        """
        # Iterate over a snapshot: a subscriber may dispose its subscription
        # from inside the callback, and mutating the list while iterating it
        # would make the loop skip the following subscriber.
        for subscriber in list(self._event_subscribers):
            try:
                subscriber(name, old_value, new_value)
            except Exception:
                logger.exception('Failure in metadata subscriber')
class ServiceEntry:
    """Dictionary-like view of one JSON file.

    The file is loaded lazily on first access and rewritten after every
    effective change. Missing keys read as ``None``, and assigning ``None``
    deletes the key. When the last key is removed the file itself is removed.
    """

    def __init__(self, path, on_change):
        """
        :param path: Location of the JSON file backing this entry.
        :param on_change: Callable ``on_change(old, new)`` invoked after each
            effective change with the data before and after it.
        """
        self.path = Path(path)
        self._on_change = on_change
        self._data = None  # None means "not loaded yet"

    def _read(self):
        """Load the data from disk unless it has been loaded already."""
        # Compare against None rather than using truthiness: an empty dict is
        # a valid loaded state and must not trigger another disk access.
        if self._data is not None:
            return
        if self.path.exists():
            with open(self.path, encoding='utf-8') as f:
                self._data = json.load(f)
        else:
            self._data = {}

    def _write(self):
        """Persist the data, removing the file when there is nothing to store."""
        if not self._data:
            if self.path.exists():
                self.path.unlink()
            return
        # Serialise before opening: opening with 'w' truncates the file, so a
        # serialisation error afterwards would destroy the stored data.
        payload = json.dumps(self._data)
        with open(self.path, 'w', encoding='utf-8') as f:
            f.write(payload)

    def __getitem__(self, key):
        """Return the value stored under ``key``, or ``None`` if absent."""
        self._read()
        return self._data.get(key, None)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key``; no-op if the value is unchanged."""
        # setting None is the same as deleting
        if value is None:
            del self[key]
            return
        # update
        self._read()
        if self._data.get(key, None) == value:
            return
        previous = copy(self._data)
        self._data[key] = value
        self._write()
        self._did_change(previous, copy(self._data))

    def __delitem__(self, key):
        """Remove ``key``; no-op if it is absent (or stored as ``None``)."""
        self._read()
        if self._data.get(key, None) is None:
            return
        previous = copy(self._data)
        self._data.pop(key, None)
        self._write()
        self._did_change(previous, copy(self._data))

    def _did_change(self, old, new):
        """Forward a change notification to the ``on_change`` callback.

        Callers pass copies of the data, so the callback never receives the
        internal dict: snapshots it keeps stay stable and it cannot modify
        the stored state.
        """
        self._on_change(old, new)