Source code for caspia.hapbridge.manager

import asyncio
import logging

from caspia.meadow.client import ConsumerConnection, ServiceBrowser

from .bridge import Bridge
from .config import Config

logger = logging.getLogger(__name__)


[docs]class HAPManager: def __init__(self, consumer_conn: ConsumerConnection, config: Config, loop=None): self.loop = loop or asyncio.get_event_loop() # type: asyncio.AbstractEventLoop self.config = config self.consumer_conn = consumer_conn self.browser = ServiceBrowser(connection=consumer_conn, loop=self.loop) self.browser.add_services_update_hook(self._on_meadow_services_update) self._bridges = set() self._pending_update_handle = None self._update_lock = asyncio.Lock(loop=self.loop) self._published_services = set() # services pending to be bridged self._republished_services = set() # republished services which require state refetch self._known_services = set() self._prepare_bridges() self._schedule_update() async def _on_meadow_services_update(self, added, removed): unknown = added - self._known_services known = added - unknown if len(unknown) or len(known): logger.info('Discovered %s new and %s known services. Will schedule update', len(unknown), len(known)) self._published_services |= unknown self._republished_services |= known self._known_services |= unknown self._schedule_update() def _schedule_update(self): if self._pending_update_handle: self._pending_update_handle.cancel() logger.info('Cancelling delayed update') def do_update(): self._pending_update_handle = None asyncio.ensure_future(self.update(), loop=self.loop) logger.info('Scheduling delayed update') self._pending_update_handle = self.loop.call_later(1.0, do_update) def _prepare_bridges(self): for cfg in self.config.bridges: logger.info('Setting up new bridge %r', cfg.name) bridge = Bridge(cfg.name, self.config, storage=cfg.storage(), loop=self.loop) services = bridge.load_known_services(self.browser.lookup) logger.info('Bridge %r was preloaded with %s known services', cfg.name, len(services)) self._known_services |= set(services) self._bridges.add(bridge) def _bridge_service(self, md_service): for bridge in self._bridges: if bridge.add(md_service): logger.info('Service %s bridged by %s', md_service, bridge) return True return False
[docs] async def update(self): with await self._update_lock: logger.info('Starting update') for md_service in self._published_services: try: if not Bridge.is_bridgable(md_service): logger.debug('Skipping service %s as it is not bridgable', md_service) continue else: self._bridge_service(md_service) except Exception: logger.exception('Failure when adding service %s', str(md_service)) for bridge in self._bridges: await bridge.deploy() for service in self._published_services | self._republished_services: for bridge in self._bridges: bridge.set_needs_update(service) self._published_services = set() logger.info('Update completed')