# pylint: disable=too-many-locals
#
# Implementation of an ARP source that scans the given network
# and sends updates to the `presence` service.
#
# Configuration format:
# 'FF:FF:FF:FF:FF:FE': { name: alan-iphone, user: alan }
import asyncio
import logging
import time
from typing import Iterable, Set
import click
import yaml
from caspia.meadow.client import ServiceBrowser
from caspia.meadow.errors import NotAvailableError
from caspia.toolbox import arp
from caspia.toolbox.cli import async_command, logging_options
from caspia.toolbox.monitor import record_metric, register_metric
from caspia.toolbox.ping import ping
logger = logging.getLogger(__name__)
class Device:
    """A network device that is tracked on behalf of a user.

    Equality and hashing are intentionally left at the default (identity based),
    so instances can be stored in sets while their attributes are mutated.
    """

    def __init__(self, user: str, name: str, mac_addr: str):
        """Create a device that has not been observed yet.

        :param user: identifier of the user the device belongs to
        :param name: human readable device name (used in log messages)
        :param mac_addr: MAC address of the device
        """
        self.user = user
        self.name = name
        self.mac_addr = mac_addr
        # Timestamp of the last time the device was observed; ``None`` until
        # the device is seen for the first time.
        self.last_seen = None
        # Last known IP address of the device; ``None`` until it is learned.
        self.ip_addr = None

    def __repr__(self):
        return '%s(user=%r, name=%r, mac_addr=%r, ip_addr=%r, last_seen=%r)' % (
            type(self).__name__, self.user, self.name, self.mac_addr, self.ip_addr,
            self.last_seen)
def present_devices(devices: Iterable["Device"], seen_after: float) -> Set["Device"]:
    """Return the devices that were last seen strictly after ``seen_after``.

    Devices whose ``last_seen`` is ``None`` (never observed) are not included.

    :param devices: any iterable of devices (set, list, generator, ...)
    :param seen_after: timestamp threshold; ``last_seen`` must be greater than it
    :return: a new set with the matching devices
    """
    return {d for d in devices if d.last_seen is not None and d.last_seen > seen_after}
def away_devices(devices: Iterable["Device"], seen_before: float) -> Set["Device"]:
    """Return the devices that were last seen strictly before ``seen_before``.

    Devices whose ``last_seen`` is ``None`` (never observed) are not included.

    :param devices: any iterable of devices (set, list, generator, ...)
    :param seen_before: timestamp threshold; ``last_seen`` must be lower than it
    :return: a new set with the matching devices
    """
    return {d for d in devices if d.last_seen is not None and d.last_seen < seen_before}
@click.command()
@click.argument('config', type=click.Path())
@click.argument('presence-service-name', type=str)
@click.option('--broker-url',
              envvar='BROKER_URL',
              type=str,
              default='mqtt://localhost',
              help='Broker URL')
@click.option('--name', default='presence-arp-source', help='Name for meadow connection')
@click.option('--interval', '-i', default=15, help='How often arp-scan will be run')
@click.option('--timeout', '-t', default=35)
@click.option('--interface', help='Interface to run arp-scan on')
@click.option('--arp-scan-path', help='Path to arp-scan executable')
@logging_options()
@async_command
async def run(broker_url, name, presence_service_name, config, interval, timeout, interface,
              arp_scan_path):
    """Periodically scan the network and report user presence.

    Loads the tracked devices from the YAML file ``config``, starts an ARP
    sniffer and then loops forever: it triggers an ARP scan, pings devices that
    did not answer but have a known IP address, and writes the resulting
    per-user state to the presence service.

    :param broker_url: URL passed to ``ServiceBrowser`` as ``host``
    :param name: name of the meadow connection (also used as a metric label)
    :param presence_service_name: name of the presence service to look up
    :param config: path to the YAML file mapping MAC address to
        ``{name: ..., user: ...}``
    :param interval: seconds to sleep between two scan rounds
    :param timeout: seconds since the last sighting after which a device is no
        longer considered present
    :param interface: network interface handed to ``arp.scan``
    :param arp_scan_path: accepted from the command line but not used here
    """
    # NOTE(review): `arp_scan_path` is never forwarded to `arp.scan`; confirm
    # whether the scanner is expected to receive it.
    register_metric('presence-arp:user-reached', 'boolean', labels=dict(name=name))
    register_metric('presence-arp:user-present', 'boolean', labels=dict(name=name))

    browser = ServiceBrowser(name=name, host=broker_url)
    presence = browser.lookup.get(presence_service_name, 'presence')

    # `safe_load` is used because the file only holds plain mappings and
    # `yaml.load` without an explicit Loader is rejected by recent PyYAML.
    # The context manager makes sure the file handle is closed.
    with open(config, 'rb') as config_file:
        device_config = yaml.safe_load(config_file) or {}
    devices = {
        Device(name=data['name'], user=data['user'], mac_addr=mac_addr)
        for mac_addr, data in device_config.items()
    }
    startup_time = time.time()

    # start sniffing arp packets
    def on_arp_packet(arp_response: arp.ArpResponse):
        for device in devices:
            if device.mac_addr.lower() == arp_response.mac_addr.lower():
                logger.info('Device %s is present (we\'ve received ARP response)', device.name)
                device.ip_addr = arp_response.ip_addr
                device.last_seen = time.time()
                record_metric('presence-arp:user-reached', True, labels=dict(user=device.user))

    sniff_task = asyncio.ensure_future(arp.sniff(on_arp_packet))

    @sniff_task.add_done_callback
    def on_sniffing_done(fut):  # pylint: disable=unused-variable
        # Without the sniffer no ARP reply would ever be processed, so a
        # failure of the sniffing task terminates the whole process.
        try:
            fut.result()
        except Exception:
            logger.exception('ARP sniffing failed')
            import sys
            sys.exit(1)

    while True:
        try:
            scan_start = time.time()
            present_after = scan_start - timeout

            # perform ARP scan
            await arp.scan(interface=interface)

            # wait for all the arp replies to get processed by the sniffer
            await asyncio.sleep(1.0)

            # perform PING for each unreached device; a list keeps the order
            # stable so results can be paired with devices below
            devices_to_ping = [d for d in away_devices(devices, present_after) if d.ip_addr]
            ping_results = await asyncio.gather(*[ping(d.ip_addr) for d in devices_to_ping],
                                                return_exceptions=True)
            for result, device in zip(ping_results, devices_to_ping):
                if isinstance(result, Exception):
                    logger.info('Failed to ping %s (%s)', device.name, repr(result))
                else:
                    msg = 'Device %s did not respond to ARP request but ping was successful'
                    logger.debug(msg, device.name)
                    device.last_seen = time.time()
                    record_metric('presence-arp:user-reached', True,
                                  labels=dict(user=device.user))

            all_users = {d.user for d in devices}
            present_users = {d.user for d in present_devices(devices, present_after)}
            # a user with at least one present device is never reported as away
            away_users = {d.user for d in away_devices(devices, present_after)} - present_users
            logger.info('Present: %s, away: %s, unknown: %s', present_users, away_users,
                        all_users - away_users - present_users)

            # computed once so every user in this update gets the same timeout
            update_timeout = interval + 2 * (time.time() - scan_start) + 10
            presence_update = {
                user: {
                    'timeout': update_timeout,
                    'source': 'arp-scan',
                    'state': 'present' if user in present_users else 'away'
                }
                for user in away_users | present_users
            }
            await presence.update.write(presence_update)

            for user in all_users:
                record_metric('presence-arp:user-present',
                              user in present_users,
                              labels=dict(user=user))
            logger.debug('Scan complete')

            # leave startup period: once the process has been running for longer
            # than `timeout`, devices that were never seen get `last_seen = 0`
            # so that they are reported as away instead of unknown
            if startup_time < present_after:
                for device in devices:
                    if device.last_seen is None:
                        device.last_seen = 0
        except NotAvailableError as e:
            logger.error(str(e))
        except Exception as e:
            logger.exception('Failure: %r', e)

        await asyncio.sleep(interval)