Source code for caspia.toolbox.upnp

import asyncio
import logging
import re
from datetime import datetime

from async_timeout import timeout

LISTEN_PORT = 65507
logger = logging.getLogger(__name__)


[docs]def utcnow(): return datetime.utcnow().timestamp()
[docs]class MSResponse: def __init__(self, addr, msg): self.src_ip = addr[0] self.src_port = addr[1] data = dict(re.findall(r'(?P<name>.*?): (?P<value>.*?)\r\n', msg)) self.st = data.get('ST') self.usn = data.get('USN') self.server = data.get('SERVER') self.location = data.get('LOCATION') self.date = data.get('DATE') self.cache_control = data.get('CACHE-CONTROL') self.data = data
[docs]async def msearch(search_target='upnp:rootdevice', max_wait=2, loop=None): class MSearchClientProtocol: def __init__(self, search_target, max_wait, loop): self.transport = None self.msg = \ 'M-SEARCH * HTTP/1.1\r\n' \ 'HOST:239.255.255.250:1900\r\n' \ 'ST:{st}\r\n' \ 'MX:{mx}\r\n' \ 'MAN:"ssdp:discover"\r\n' \ '\r\n'.format(st=search_target, mx=max_wait) self.responses = asyncio.Queue(loop=loop) self.start_time = None self.max_wait = max_wait self.ip = '239.255.255.250' self.port = 1900 def connection_made(self, transport): self.transport = transport self.transport.sendto(self.msg.encode(), addr=(self.ip, self.port)) self.start_time = utcnow() def datagram_received(self, data, addr): self.responses.put_nowait(MSResponse(addr, data.decode())) def error_received(self, exc): print('error received:', exc) def connection_lost(self, exc): if exc: print('connection lost:', exc) def close(self): self.transport.close() def timeout(self): return self.remaining <= 0 @property def remaining(self): return self.start_time + self.max_wait - utcnow() assert 1 <= max_wait <= 120 loop = loop or asyncio.get_event_loop() cp = MSearchClientProtocol(search_target, max_wait, loop) await loop.create_datagram_endpoint(lambda: cp, local_addr=('0.0.0.0', LISTEN_PORT), reuse_port=True) assert cp.start_time try: async with timeout(cp.remaining, loop=loop): while not cp.timeout(): yield await cp.responses.get() except asyncio.TimeoutError: pass except Exception: # pylint: disable=broad-except logger.exception('Exception in m-search') finally: assert cp.timeout cp.close()
[docs]async def msearch_first(search_target='upnp:rootdevice', max_wait=2, loop=None): async for resp in msearch(search_target, max_wait, loop): return resp