Source code for caspia.meadow.metadata.provider
import asyncio
import logging
from functools import partial
from caspia import meadow
from caspia.meadow.client import Gateway, GatewayConnection, ServiceGatewayMixin
from caspia.meadow.services import MetadataProviderBase, ServiceBase
from .store import MetadataStore
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class MetadataProviderService(ServiceGatewayMixin, MetadataProviderBase):
    """Service that exposes the contents of a provider's metadata store.

    The service is created under the name ``'global'``. Its ``entries``
    characteristic is kept equal to the sorted names in
    ``provider.store``, and a write to its ``create`` characteristic flags
    the named store entry as manually created.

    NOTE(review): ``entries`` and ``create`` are presumably characteristics
    declared by ``MetadataProviderBase`` -- confirm against that class.
    """

    def __init__(self, provider: 'MetadataProvider'):
        """Bind the service to *provider* and start mirroring its store.

        :param provider: owner whose ``store`` attribute supplies the entry
            names and the change events.
        """
        super().__init__('global')
        self.provider = provider
        # Publish the current names right away, then refresh on every event.
        self._update_entries()
        self.provider.store.subscribe_events(self._handle_store_event)

    def _update_entries(self):
        """Set ``entries`` to the sorted list of names currently in the store."""
        self.entries.value = sorted(self.provider.store.names())

    def _handle_store_event(self, *args, **kwargs):
        """Store-event callback; the event payload is ignored."""
        self._update_entries()

    async def characteristic_write(self, characteristic, value, **kwargs):
        """Handle a write to one of this service's characteristics.

        A write to ``create`` looks up ``provider.store[value]`` and sets
        its ``'manually-created'`` key to ``True``. Every other write is
        delegated to the parent implementation.

        :param characteristic: the characteristic being written.
        :param value: written value; for ``create`` it is used as the store
            key of the entry to flag.
        :returns: ``None`` for ``create``; otherwise whatever the parent
            implementation returns.
        """
        if characteristic is self.create:
            entry = self.provider.store[value]
            entry['manually-created'] = True
            return None
        # The parent call must be awaited: this method is a coroutine, and
        # calling the parent without ``await`` only creates a coroutine
        # object that never runs, so the write would be silently dropped.
        # NOTE(review): assumes the parent method is a coroutine as well,
        # which this ``async def`` override implies -- confirm.
        return await super().characteristic_write(characteristic, value, **kwargs)
class MetadataProvider:
    """Serve ``$metadata`` for services out of a :class:`MetadataStore`.

    The provider registers read and write handlers for the ``$metadata``
    characteristic on a gateway connection, sends a retained notification
    whenever the store reports a change, and hosts a gateway service for
    every store entry whose service type is listed in
    :attr:`metadata_only_services`.
    """

    # Service types that the provider hosts itself, based on store entries.
    metadata_only_services = ['group', 'room']

    def __init__(self, broker_url, store, gateway_conn=None):
        """Wire the store to the gateway and start hosting services.

        :param broker_url: broker URL used to create a new
            ``GatewayConnection``; only used when *gateway_conn* is falsy.
        :param store: metadata store backing this provider.
        :param gateway_conn: optional existing connection to reuse. When it
            is not given, a connection named ``'metadata-provider'`` is
            created and its ``run_forever()`` is scheduled on the event loop.
        """
        self.store: MetadataStore = store
        self.hosted_services = dict()
        # Strong references to fire-and-forget tasks. The event loop itself
        # only keeps weak references, so an otherwise unreferenced task may
        # be garbage-collected before it completes.
        self._background_tasks = set()

        if not gateway_conn:
            self.gateway_conn = GatewayConnection(broker_url=broker_url, name='metadata-provider')
            self._track_task(asyncio.ensure_future(self.gateway_conn.run_forever()))
        else:
            self.gateway_conn = gateway_conn

        # NOTE(review): '+' is presumably a match-any-service wildcard and
        # ``handle_dolar`` opts in to '$'-prefixed characteristics -- confirm
        # against GatewayConnection.
        self.gateway_conn.handle_read('+',
                                      '$metadata',
                                      self.handle_metadata_read,
                                      handle_dolar=True)
        self.gateway_conn.handle_write('+',
                                       '$metadata',
                                       self.handle_metadata_write,
                                       handle_dolar=True)

        self.gateway = Gateway('metadata-provider',
                               connection=self.gateway_conn,
                               services=[MetadataProviderService(self)])
        self.store.subscribe_events(self.handle_store_event)
        self._setup_services()

    def _track_task(self, task):
        """Keep *task* referenced until it is done, then return it."""
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    def handle_store_event(self, name, old_value, new_value):
        """React to a store change for the entry *name*.

        For a metadata-only service type, hosting starts when the value goes
        from falsy to truthy and stops when it goes from truthy to falsy.
        A ``$metadata`` notification is sent in every case.

        :param name: name of the changed entry.
        :param old_value: value before the change.
        :param new_value: value after the change.
        """
        stype, _ = meadow.name.split(name, strict=False)
        if not old_value and new_value and stype in self.metadata_only_services:
            self._start_hosting_metadata_service(name)
        elif old_value and not new_value and stype in self.metadata_only_services:
            self._stop_hosting_metadata_service(name)
        self.notify(name)

    def handle_metadata_read(self, service, characteristic, **kwargs):
        """Return the stored metadata of *service*.

        :param service: name of the service whose metadata is requested.
        :param characteristic: always expected to be ``'$metadata'``.
        :returns: the result of ``self.store[service].read()``.
        """
        assert characteristic == '$metadata'
        return self.store[service].read()

    def handle_metadata_write(self, value, extra, service, characteristic, **kwargs):
        """Copy every key/value pair of *value* into the entry of *service*.

        :param value: mapping of metadata keys to their new values.
        :param extra: not used by this handler.
        :param service: name of the service whose metadata is written.
        :param characteristic: always expected to be ``'$metadata'``.
        """
        assert characteristic == '$metadata'
        entry = self.store[service]
        for key, item in value.items():
            entry[key] = item

    def notify(self, service):
        """Schedule a retained ``$metadata`` notification for *service*.

        The notification is sent in the background; a failure is logged by
        :meth:`_on_notification_sent` rather than raised here.

        :param service: name of the service whose metadata is published.
        """
        ntf = self.gateway_conn.notify(service,
                                       '$metadata',
                                       self.store[service].read(),
                                       retain=True)
        task = self._track_task(asyncio.ensure_future(ntf))
        task.add_done_callback(partial(self._on_notification_sent, service))

    def _on_notification_sent(self, service, task):
        """Done-callback for a notification task; logs a failed send.

        :param service: name of the service the notification was for.
        :param task: the finished task or future.
        """
        # ``exception()`` raises CancelledError on a cancelled task, which
        # would escape from this callback; a cancellation is not a failed
        # send, so there is nothing to report.
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error('failed to send metadata notification for %s: %r', service,
                         error)

    def _setup_services(self):
        """Start hosting every metadata-only service already in the store."""
        for name in self.store.names():
            stype, _ = meadow.name.split(name, strict=False)
            if stype in self.metadata_only_services:
                self._start_hosting_metadata_service(name)

    def _start_hosting_metadata_service(self, name):
        """Create a service instance for *name* and add it to the gateway.

        Logs a warning and does nothing when *name* is already hosted or
        when a strict split of the name raises ``ValueError``.
        """
        if name in self.hosted_services:
            logger.warning('service %s is already being hosted by metadata-provider', name)
            return
        try:
            # Strict split here (no ``strict=False``): an invalid name is
            # rejected instead of being hosted.
            stype, _ = meadow.name.split(name)
        except ValueError:
            logger.warning('service %s has invalid name', name)
            return
        cls = ServiceBase.get_subclass(stype, mixin=ServiceGatewayMixin)
        instance = cls(name)
        self.hosted_services[name] = instance
        self.gateway.add(instance)

    def _stop_hosting_metadata_service(self, name):
        """Remove the hosted service *name* from the gateway.

        Logs a warning and does nothing when *name* is not hosted.
        """
        if name not in self.hosted_services:
            logger.warning('service %s is already not being hosted by metadata-provider', name)
            return
        instance = self.hosted_services.pop(name)
        self.gateway.remove(instance)