import asyncio
import logging
import re
from datetime import datetime, timezone

from async_timeout import timeout
# Local UDP port the M-SEARCH client socket is bound to (used as the
# ``local_addr`` port when the datagram endpoint is created in ``msearch``).
LISTEN_PORT = 65507
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
def utcnow():
    """Return the current time as a POSIX timestamp (seconds, float).

    A timezone-aware ``datetime`` is used on purpose: calling
    ``.timestamp()`` on the naive value returned by ``datetime.utcnow()``
    interprets it as *local* time, which shifts the result by the host's
    UTC offset and makes it jump across DST transitions.
    """
    return datetime.now(timezone.utc).timestamp()
class MSResponse:
    """Parsed reply to an M-SEARCH request.

    Args:
        addr: Sender address; ``addr[0]`` is stored as ``src_ip`` and
            ``addr[1]`` as ``src_port``.
        msg: The decoded response text (status line plus header lines).

    Attributes:
        src_ip, src_port: Taken from ``addr``.
        st, usn, server, location, date, cache_control: Values of the
            ``ST``, ``USN``, ``SERVER``, ``LOCATION``, ``DATE`` and
            ``CACHE-CONTROL`` headers, or ``None`` when the header is absent.
            Header names are matched case-insensitively.
        data: Dict of every parsed header, keyed by the name exactly as it
            appeared in the message (last occurrence wins on duplicates).
    """

    # field-name ":" optional-whitespace field-value, one header per line.
    # The status line has no colon directly after a name and is skipped.
    _HEADER_RE = re.compile(r'^([^:\r\n]+):[ \t]*(.*?)[ \t]*\r?$', re.MULTILINE)

    def __init__(self, addr, msg):
        self.src_ip = addr[0]
        self.src_port = addr[1]

        data = {
            name.strip(): value
            for name, value in self._HEADER_RE.findall(msg)
        }
        # Header field names are case-insensitive, so devices may send
        # "Location" or "location" instead of "LOCATION"; look the well-known
        # fields up through an upper-cased view while keeping ``data`` as
        # received.
        by_upper = {name.upper(): value for name, value in data.items()}

        self.st = by_upper.get('ST')
        self.usn = by_upper.get('USN')
        self.server = by_upper.get('SERVER')
        self.location = by_upper.get('LOCATION')
        self.date = by_upper.get('DATE')
        self.cache_control = by_upper.get('CACHE-CONTROL')
        self.data = data
async def msearch(search_target='upnp:rootdevice', max_wait=2, loop=None):
    """Send an M-SEARCH request and yield the responses as they arrive.

    This is an async generator. It binds a UDP endpoint on
    ``('0.0.0.0', LISTEN_PORT)``, sends a single M-SEARCH datagram to
    ``239.255.255.250:1900`` and yields one ``MSResponse`` per datagram
    received until ``max_wait`` seconds have elapsed since the request was
    sent. The transport is closed when the generator finishes or is closed.

    Args:
        search_target: Value placed in the ``ST`` header of the request.
        max_wait: Value placed in the ``MX`` header; also the number of
            seconds to keep listening. Must satisfy ``1 <= max_wait <= 120``.
        loop: Event loop used to create the datagram endpoint. Defaults to
            ``asyncio.get_event_loop()``.

    Yields:
        MSResponse: One object per received datagram.

    Raises:
        AssertionError: If ``max_wait`` is outside ``1..120``.

    Exceptions raised while waiting for responses are logged and end the
    iteration instead of propagating.
    """

    class MSearchClientProtocol(asyncio.DatagramProtocol):
        """Datagram protocol that sends one M-SEARCH and queues the replies."""

        def __init__(self, search_target, max_wait):
            self.transport = None
            self.msg = (
                'M-SEARCH * HTTP/1.1\r\n'
                'HOST:239.255.255.250:1900\r\n'
                'ST:{st}\r\n'
                'MX:{mx}\r\n'
                'MAN:"ssdp:discover"\r\n'
                '\r\n'
            ).format(st=search_target, mx=max_wait)
            # No ``loop=`` argument: it was removed from asyncio.Queue in
            # Python 3.10 and passing it raises TypeError there.
            self.responses = asyncio.Queue()
            self.start_time = None
            self.max_wait = max_wait
            self.ip = '239.255.255.250'
            self.port = 1900

        def connection_made(self, transport):
            # Send the request as soon as the socket is ready and start the
            # clock that bounds how long responses are collected.
            self.transport = transport
            self.transport.sendto(self.msg.encode(), addr=(self.ip, self.port))
            self.start_time = utcnow()

        def datagram_received(self, data, addr):
            # Any host can send to this socket; never let undecodable bytes
            # raise inside the event-loop callback.
            text = data.decode('utf-8', errors='replace')
            self.responses.put_nowait(MSResponse(addr, text))

        def error_received(self, exc):
            logger.warning('error received: %s', exc)

        def connection_lost(self, exc):
            if exc:
                logger.warning('connection lost: %s', exc)

        def close(self):
            if self.transport is not None:
                self.transport.close()

        @property
        def remaining(self):
            """Seconds left before the listening window ends (may be <= 0)."""
            return self.start_time + self.max_wait - utcnow()

    # NOTE: kept as asserts so callers see the same exception type as before;
    # be aware that they are stripped when Python runs with -O.
    assert 1 <= max_wait <= 120
    loop = loop or asyncio.get_event_loop()
    cp = MSearchClientProtocol(search_target, max_wait)
    await loop.create_datagram_endpoint(lambda: cp,
                                        local_addr=('0.0.0.0', LISTEN_PORT),
                                        reuse_port=True)
    assert cp.start_time
    try:
        while True:
            remaining = cp.remaining
            if remaining <= 0:
                break
            # Bound each wait individually instead of wrapping the ``yield``
            # in a timeout context: a timeout scope that spans a ``yield``
            # cancels the consumer's task while it is running code outside
            # this generator.
            try:
                response = await asyncio.wait_for(cp.responses.get(), remaining)
            except asyncio.TimeoutError:
                break
            yield response
    except Exception:  # pylint: disable=broad-except
        logger.exception('Exception in m-search')
    finally:
        cp.close()
async def msearch_first(search_target='upnp:rootdevice', max_wait=2, loop=None):
    """Return the first response to an M-SEARCH request, or ``None``.

    Args:
        search_target: Passed through to ``msearch``.
        max_wait: Passed through to ``msearch``.
        loop: Passed through to ``msearch``.

    Returns:
        The first object yielded by ``msearch``, or ``None`` if it finishes
        without yielding anything.
    """
    responses = msearch(search_target, max_wait, loop)
    try:
        async for resp in responses:
            return resp
        return None
    finally:
        # Returning from inside ``async for`` leaves the generator suspended;
        # close it explicitly so its cleanup (closing the UDP transport) runs
        # now rather than whenever the generator is garbage-collected.
        await responses.aclose()