Source code for caspia.meadow.client.browser

import asyncio
import logging
from typing import Awaitable, Callable, Dict, Iterable, List, Set, Union

import caspia.meadow.client.connection
from caspia.meadow import Lookup
from caspia.toolbox.managed_task import managed_task

logger = logging.getLogger(__name__)

# Type aliases used by the annotations and type comments in this module.
Service = caspia.meadow.services.ServiceBase
ServiceName = str
GatewayName = str
# A hook receives the sets of (added, removed) services.  Hooks are awaited
# by the browser, so they have to be coroutine functions (or otherwise
# return an awaitable) rather than plain functions returning None.
ServicesUpdateHook = Callable[[Set[Service], Set[Service]], Awaitable[None]]


class ServiceBrowser:
    """Keep track of the services broadcasted by gateways.

    The browser subscribes to gateways updates on its connection.  Each
    update is remembered as the *pending* update and its processing is
    scheduled ``UPDATE_DELAY`` seconds later; when several updates arrive
    before processing starts, only the most recent one is processed.

    Processing resolves every announced service through :attr:`lookup`,
    rebuilds the set of known services together with the service-name to
    gateway-name map, and awaits the registered update hooks whenever
    services were added or removed.
    """

    # Delay (in seconds) passed to ``schedule()`` when a gateways update
    # arrives.
    UPDATE_DELAY = 0.1

    def __init__(self, host=None, connection=None, loop=None, lookup=None, name=None):
        """Create the browser and start subscribing to gateways updates.

        :param host: passed to ``ConsumerConnection`` when no ``connection``
            is given; ignored otherwise.
        :param connection: existing connection to use.  When given, its
            ``loop`` attribute is used and ``host``, ``loop`` and ``name``
            are ignored.
        :param loop: event loop for a newly created connection (defaults to
            ``asyncio.get_event_loop()``).
        :param lookup: lookup object used to resolve services; a new
            ``Lookup`` is created when omitted.
        :param name: passed to ``ConsumerConnection`` when no ``connection``
            is given.
        """
        if connection is None:
            self.loop = loop or asyncio.get_event_loop()  # type: asyncio.AbstractEventLoop
            self._conn = caspia.meadow.client.ConsumerConnection(host, loop=self.loop, name=name)
            # Keep a reference to the task: the event loop itself only holds
            # weak references to tasks, so an unreferenced one may be
            # garbage collected before it finishes.
            self._conn_task = asyncio.ensure_future(self._conn.run_forever(), loop=self.loop)
        else:
            self._conn = connection
            self.loop = connection.loop
            # The caller owns the connection and is responsible for running it.
            self._conn_task = None

        # Test against None explicitly so that a caller-supplied lookup is
        # never replaced just because it evaluates as falsy.
        self._lookup = lookup if lookup is not None else Lookup(prepare=self._prepare_service)
        self._services = set()  # type: Set[Service]
        self._gateway_map = dict()  # type: Dict[ServiceName, GatewayName]
        self._services_update_hooks = [self._services_updated]  # type: List[ServicesUpdateHook]
        # Latest gateways update that has not been processed yet.  It must
        # exist before `_process_gateways_update` can run, otherwise that
        # method fails with AttributeError.
        self._pending_gateways_update = None
        self._prepare_task = asyncio.ensure_future(self._setup(), loop=self.loop)

    async def prepare(self):
        """Wait until the subscription to gateways updates is set up.

        Returns immediately when the setup task has already finished.
        """
        if not self._prepare_task.done():
            await self._prepare_task

    async def _setup(self):
        """Subscribe to gateways updates on the connection."""
        await self._conn.subscribe_gateways_update(self._handle_gateways_update)

    async def _services_updated(self, added, removed):
        """Default update hook: log how many services were added/removed."""
        logger.info('Gateway update: %d services added, %d services removed',
                    len(added), len(removed))

    async def _handle_gateways_update(self, gateways):
        """Remember ``gateways`` as the pending update and schedule processing."""
        # pylint: disable=no-member
        self._pending_gateways_update = gateways
        self._process_gateways_update.schedule(delay=self.UPDATE_DELAY)

    @managed_task()
    async def _process_gateways_update(self):
        """Process the pending gateways update, if there is one.

        Rebuilds the set of services and the gateway map from the update,
        then awaits every registered hook with the sets of added and
        removed services (only when at least one of them is non-empty).
        A service that fails to load is logged and skipped.
        """
        if self._pending_gateways_update is None:
            return
        # Take the update and clear the slot, so that a run scheduled by an
        # earlier (now superseded) update becomes a no-op.
        gateways, self._pending_gateways_update = self._pending_gateways_update, None

        logger.debug('Going to process gateways update.')
        start_time = asyncio.get_event_loop().time()

        services, gateway_map = set(), dict()
        for gw_name, gw_registration in gateways.items():
            for serialized in gw_registration['services']:
                try:
                    instance = self.lookup.get(serialized['name'])
                    if isinstance(instance, caspia.meadow.client.ServiceConsumerMixin):
                        instance.meadow_connection = self._conn
                        await instance.on_gateways_update(gateways)
                    services.add(instance)
                    gateway_map[instance.name] = gw_name
                except Exception:
                    # Best effort: one broken service must not prevent the
                    # remaining services from being loaded.
                    name = serialized.get('name', '--unknown--')
                    logger.exception('Failed to load service %s of gateway "%s"', name, gw_name)

        added = services.difference(self._services)
        removed = self._services.difference(services)
        self._services, self._gateway_map = services, gateway_map

        logger.debug('Gateways update processed (took %0.02f sec)',
                     asyncio.get_event_loop().time() - start_time)

        if added or removed:
            for hook in self._services_update_hooks:
                await hook(added, removed)

    def add_services_update_hook(self, func: ServicesUpdateHook):
        """
        Register given hook to be called on every services update.

        The hook is awaited with the set of added and the set of removed
        services, so it has to be a coroutine function.

        Example::

            async def my_func(added_services, removed_services):
                print(added_services, removed_services)

            browser.add_services_update_hook(my_func)
        """
        self._services_update_hooks.append(func)

    def _prepare_service(self, service):
        """Callback given to the default ``Lookup`` as ``prepare``; does nothing."""
        pass

    @property
    def services(self) -> Iterable[Service]:
        """Broadcasted services within meadow."""
        return self._services

    @property
    def gateways(self):
        """Broadcasted gateways within meadow."""
        return set(self._gateway_map.values())

    def gateway_of(self, service) -> Union[GatewayName, None]:
        """Name of gateway broadcasting given service (or None)."""
        return self._gateway_map.get(service.name)

    @property
    def lookup(self) -> Lookup:
        """Lookup object holding all known services."""
        return self._lookup