import asyncio
import logging
from caspia.meadow.client import ConsumerConnection, ServiceBrowser
from .bridge import Bridge
from .config import Config
logger = logging.getLogger(__name__)
[docs]class HAPManager:
def __init__(self, consumer_conn: ConsumerConnection, config: Config, loop=None):
self.loop = loop or asyncio.get_event_loop() # type: asyncio.AbstractEventLoop
self.config = config
self.consumer_conn = consumer_conn
self.browser = ServiceBrowser(connection=consumer_conn, loop=self.loop)
self.browser.add_services_update_hook(self._on_meadow_services_update)
self._bridges = set()
self._pending_update_handle = None
self._update_lock = asyncio.Lock(loop=self.loop)
self._published_services = set() # services pending to be bridged
self._republished_services = set() # republished services which require state refetch
self._known_services = set()
self._prepare_bridges()
self._schedule_update()
async def _on_meadow_services_update(self, added, removed):
unknown = added - self._known_services
known = added - unknown
if len(unknown) or len(known):
logger.info('Discovered %s new and %s known services. Will schedule update',
len(unknown), len(known))
self._published_services |= unknown
self._republished_services |= known
self._known_services |= unknown
self._schedule_update()
def _schedule_update(self):
if self._pending_update_handle:
self._pending_update_handle.cancel()
logger.info('Cancelling delayed update')
def do_update():
self._pending_update_handle = None
asyncio.ensure_future(self.update(), loop=self.loop)
logger.info('Scheduling delayed update')
self._pending_update_handle = self.loop.call_later(1.0, do_update)
def _prepare_bridges(self):
for cfg in self.config.bridges:
logger.info('Setting up new bridge %r', cfg.name)
bridge = Bridge(cfg.name, self.config, storage=cfg.storage(), loop=self.loop)
services = bridge.load_known_services(self.browser.lookup)
logger.info('Bridge %r was preloaded with %s known services', cfg.name, len(services))
self._known_services |= set(services)
self._bridges.add(bridge)
def _bridge_service(self, md_service):
for bridge in self._bridges:
if bridge.add(md_service):
logger.info('Service %s bridged by %s', md_service, bridge)
return True
return False
[docs] async def update(self):
with await self._update_lock:
logger.info('Starting update')
for md_service in self._published_services:
try:
if not Bridge.is_bridgable(md_service):
logger.debug('Skipping service %s as it is not bridgable', md_service)
continue
else:
self._bridge_service(md_service)
except Exception:
logger.exception('Failure when adding service %s', str(md_service))
for bridge in self._bridges:
await bridge.deploy()
for service in self._published_services | self._republished_services:
for bridge in self._bridges:
bridge.set_needs_update(service)
self._published_services = set()
logger.info('Update completed')