import asyncio
import logging
from typing import Awaitable, Callable, Dict, Iterable, List, Set, Union

import caspia.meadow.client.connection
from caspia.meadow import Lookup
from caspia.toolbox.managed_task import managed_task
logger = logging.getLogger(__name__)

# Alias for the service base class, used in the type hints of this module.
Service = caspia.meadow.services.ServiceBase
# Plain-string aliases that make the service -> gateway mapping self-describing.
ServiceName = str
GatewayName = str
# A hook is called with (added, removed) service sets and its result is
# awaited by the browser, so it has to return an awaitable (typically it is a
# coroutine function) rather than plain ``None``.
ServicesUpdateHook = Callable[[Set[Service], Set[Service]], Awaitable[None]]
class ServiceBrowser:
    """Keep track of the services broadcast by gateways on a meadow connection.

    The browser subscribes to gateway updates on its connection, resolves
    every advertised service name through a ``Lookup`` and stores the
    resulting set of services together with a service-name -> gateway-name
    map. Registered hooks are awaited whenever the set of services changes.
    """

    # Delay handed to the managed task's ``schedule`` before a received
    # gateways update is processed (presumably seconds -- confirm against
    # ``managed_task``).
    UPDATE_DELAY = 0.1

    def __init__(self, host=None, connection=None, loop=None, lookup=None, name=None):
        """Create the browser and schedule the gateway-update subscription.

        :param host: forwarded to ``ConsumerConnection`` when no
            ``connection`` is supplied.
        :param connection: existing connection to use. When given, ``host``,
            ``loop`` and ``name`` are not used and ``connection.loop``
            becomes the browser's loop.
        :param loop: event loop for a newly created connection; defaults to
            ``asyncio.get_event_loop()``.
        :param lookup: ``Lookup`` used to resolve service names; a new one
            (with ``prepare=self._prepare_service``) is created when omitted.
        :param name: forwarded to ``ConsumerConnection`` when no
            ``connection`` is supplied.
        """
        if connection is None:
            self.loop = loop or asyncio.get_event_loop()  # type: asyncio.AbstractEventLoop
            self._conn = caspia.meadow.client.ConsumerConnection(host, loop=self.loop, name=name)
            # The browser owns this connection, so it also keeps it running.
            asyncio.ensure_future(self._conn.run_forever(), loop=self.loop)
        else:
            self._conn = connection
            self.loop = connection.loop

        self._lookup = lookup or Lookup(prepare=self._prepare_service)
        self._services = set()  # type: Set[Service]
        self._services_update_hooks = [self._services_updated]  # type: List[ServicesUpdateHook]
        self._gateway_map = dict()  # type: Dict[ServiceName, GatewayName]
        # Latest gateways payload waiting to be processed; ``None`` means
        # there is nothing pending.
        self._pending_gateways_update = None
        # Scheduled last so that every attribute exists before ``_setup``
        # (and any update it triggers) can run.
        self._prepare_task = asyncio.ensure_future(self._setup(), loop=self.loop)

    async def prepare(self):
        """Wait until the gateway-update subscription task has finished.

        Returns immediately when the task is already done.
        """
        if not self._prepare_task.done():
            await self._prepare_task

    async def _setup(self):
        """Subscribe ``_handle_gateways_update`` to gateway updates."""
        await self._conn.subscribe_gateways_update(self._handle_gateways_update)

    async def _services_updated(self, added, removed):
        """Default update hook: log how many services were added and removed."""
        logger.info('Gateway update: %d services added, %d services removed', len(added),
                    len(removed))

    async def _handle_gateways_update(self, gateways):
        """Remember the newest gateways payload and schedule its processing.

        Only the most recent payload is kept; an older, not yet processed one
        is overwritten.
        """
        # pylint: disable=no-member
        self._pending_gateways_update = gateways
        self._process_gateways_update.schedule(delay=self.UPDATE_DELAY)

    @managed_task()
    async def _process_gateways_update(self):
        """Rebuild the service set and gateway map from the pending payload.

        Each gateway registration is expected to provide a ``'services'``
        list whose entries carry a ``'name'`` key. A service that fails to
        load is logged and skipped. When the resulting set of services
        differs from the previous one, every registered hook is awaited with
        ``(added, removed)``.
        """
        if self._pending_gateways_update is None:
            return
        # Take ownership of the payload so a newer one can be queued meanwhile.
        gateways, self._pending_gateways_update = self._pending_gateways_update, None

        logger.debug('Going to process gateways update.')
        start_time = asyncio.get_event_loop().time()

        services, gateway_map = set(), dict()
        for gw_name, gw_registration in gateways.items():
            for serialized in gw_registration['services']:
                try:
                    instance = self.lookup.get(serialized['name'])
                    # NOTE(review): the original indentation was lost; both
                    # statements below are assumed to apply to consumer
                    # services only -- confirm against the upstream source.
                    if isinstance(instance, caspia.meadow.client.ServiceConsumerMixin):
                        instance.meadow_connection = self._conn
                        await instance.on_gateways_update(gateways)
                    services.add(instance)
                    gateway_map[instance.name] = gw_name
                except Exception:
                    # Best effort: one broken entry must not abort the whole
                    # update. Guard the name lookup so a malformed
                    # (non-dict) entry cannot raise again from in here.
                    if isinstance(serialized, dict):
                        name = serialized.get('name', '--unknown--')
                    else:
                        name = '--unknown--'
                    logger.exception('Failed to load service %s of gateway "%s"', name, gw_name)

        added = services.difference(self._services)
        removed = self._services.difference(services)
        self._services, self._gateway_map = services, gateway_map

        logger.debug('Gateways update processed (took %0.02f sec)',
                     asyncio.get_event_loop().time() - start_time)

        if added or removed:
            for hook in self._services_update_hooks:
                await hook(added, removed)

    def add_services_update_hook(self, func: ServicesUpdateHook):
        """
        Register given hook to be called on every services update.

        The hook is awaited with two sets: the added and the removed
        services.

        Example::

            async def my_func(added_services, removed_services):
                print(added_services, removed_services)

            browser.add_services_update_hook(my_func)
        """
        self._services_update_hooks.append(func)

    def _prepare_service(self, service):
        """Callback given to ``Lookup`` as ``prepare``; does nothing here."""
        pass

    @property
    def services(self) -> Iterable[Service]:
        """Broadcasted services within meadow."""
        return self._services

    @property
    def gateways(self):
        """Broadcasted gateways within meadow."""
        return set(self._gateway_map.values())

    def gateway_of(self, service) -> Union[GatewayName, None]:
        """Name of gateway broadcasting given service (or None)."""
        return self._gateway_map.get(service.name)

    @property
    def lookup(self) -> Lookup:
        """Lookup object holding all known services."""
        return self._lookup