Source code for caspia.meadow.metadata.provider

import asyncio
import logging
from functools import partial

from caspia import meadow
from caspia.meadow.client import Gateway, GatewayConnection, ServiceGatewayMixin
from caspia.meadow.services import MetadataProviderBase, ServiceBase

from .store import MetadataStore

logger = logging.getLogger(__name__)


[docs]class MetadataProviderService(ServiceGatewayMixin, MetadataProviderBase): def __init__(self, provider: 'MetadataProvider'): super().__init__('global') self.provider = provider self._update_entries() self.provider.store.subscribe_events(self._handle_store_event) def _update_entries(self): self.entries.value = sorted(self.provider.store.names()) def _handle_store_event(self, *args, **kwargs): self._update_entries()
[docs] async def characteristic_write(self, characteristic, value, **kwargs): if characteristic is self.create: entry = self.provider.store[value] entry['manually-created'] = True else: super().characteristic_write(characteristic, value, **kwargs)
[docs]class MetadataProvider: metadata_only_services = ['group', 'room'] def __init__(self, broker_url, store, gateway_conn=None): self.store: MetadataStore = store self.hosted_services = dict() if not gateway_conn: self.gateway_conn = GatewayConnection(broker_url=broker_url, name='metadata-provider') asyncio.ensure_future(self.gateway_conn.run_forever()) else: self.gateway_conn = gateway_conn self.gateway_conn.handle_read('+', '$metadata', self.handle_metadata_read, handle_dolar=True) self.gateway_conn.handle_write('+', '$metadata', self.handle_metadata_write, handle_dolar=True) self.gateway = Gateway('metadata-provider', connection=self.gateway_conn, services=[MetadataProviderService(self)]) self.store.subscribe_events(self.handle_store_event) self._setup_services()
[docs] def handle_store_event(self, name, old_value, new_value): stype, _ = meadow.name.split(name, strict=False) if not old_value and new_value and stype in self.metadata_only_services: self._start_hosting_metadata_service(name) elif old_value and not new_value and stype in self.metadata_only_services: self._stop_hosting_metadata_service(name) self.notify(name)
[docs] def handle_metadata_read(self, service, characteristic, **kwargs): assert characteristic == '$metadata' return self.store[service].read()
[docs] def handle_metadata_write(self, value, extra, service, characteristic, **kwargs): assert characteristic == '$metadata' entry = self.store[service] for k, v in value.items(): entry[k] = v
[docs] def notify(self, service): ntf = self.gateway_conn.notify(service, '$metadata', self.store[service].read(), retain=True) task = asyncio.ensure_future(ntf) task.add_done_callback(partial(self._on_notification_sent, service))
def _on_notification_sent(self, service, task): if task.exception(): logger.error('failed to send metadata notification for %s: %r', service, task.exception()) def _setup_services(self): for name in self.store.names(): stype, _ = meadow.name.split(name, strict=False) if stype in self.metadata_only_services: self._start_hosting_metadata_service(name) def _start_hosting_metadata_service(self, name): if name in self.hosted_services: logger.warning('service %s is already being hosted by metadata-provider', name) return try: stype, _ = meadow.name.split(name) except ValueError: logger.warning('service %s has invalid name', name) return cls = ServiceBase.get_subclass(stype, mixin=ServiceGatewayMixin) instance = cls(name) self.hosted_services[name] = instance self.gateway.add(instance) def _stop_hosting_metadata_service(self, name): if name not in self.hosted_services: logger.warning('service %s is already not being hosted by metadata-provider', name) return instance = self.hosted_services.pop(name) self.gateway.remove(instance)