Source code for caspia.toolbox.services.presence.sources.arp

# pylint: disable=too-many-locals
#
# Implementation of an ARP source, that scans given network
# and sends updates to `presence` service.
#
# Configuration format:
# 'FF:FF:FF:FF:FF:FE': { name: alan-iphone, user: alan }
import asyncio
import logging
import time
from typing import Iterable, Set

import click
import yaml

from caspia.meadow.client import ServiceBrowser
from caspia.meadow.errors import NotAvailableError
from caspia.toolbox import arp
from caspia.toolbox.cli import async_command, logging_options
from caspia.toolbox.monitor import record_metric, register_metric
from caspia.toolbox.ping import ping

logger = logging.getLogger(__name__)


[docs]class Device: def __init__(self, user, name, mac_addr): self.user = user self.name = name self.mac_addr = mac_addr self.last_seen = None self.ip_addr = None
[docs]def present_devices(devices: Iterable[Device], seen_after: float) -> Set[Device]: return devices - {d for d in devices if d.last_seen is None or d.last_seen <= seen_after}
[docs]def away_devices(devices: Iterable[Device], seen_before: float) -> Set[Device]: return devices - {d for d in devices if d.last_seen is None or d.last_seen >= seen_before}
@click.command() @click.argument('config', type=click.Path()) @click.argument('presence-service-name', type=str) @click.option('--broker-url', envvar='BROKER_URL', type=str, default='mqtt://localhost', help='Broker URL') @click.option('--name', default='presence-arp-source', help='Name for meadow connection') @click.option('--interval', '-i', default=15, help='How ofter arp-scan will be run') @click.option('--timeout', '-t', default=35) @click.option('--interface', help='Interface to run arp-scan on') @click.option('--arp-scan-path', help='Path to arp-scan executable') @logging_options() @async_command async def run(broker_url, name, presence_service_name, config, interval, timeout, interface, arp_scan_path): register_metric('presence-arp:user-reached', 'boolean', labels=dict(name=name)) register_metric('presence-arp:user-present', 'boolean', labels=dict(name=name)) browser = ServiceBrowser(name=name, host=broker_url) presence = browser.lookup.get(presence_service_name, 'presence') devices = { Device(name=data['name'], user=data['user'], mac_addr=mac_addr) for mac_addr, data in yaml.load(open(config, 'rb')).items() } startup_time = time.time() # start sniffing arp packets def on_arp_packet(arp_response: arp.ArpResponse): for device in devices: if device.mac_addr.lower() == arp_response.mac_addr.lower(): logger.info('Device %s is present (we\'ve received ARP response)', device.name) device.ip_addr = arp_response.ip_addr device.last_seen = time.time() record_metric('presence-arp:user-reached', True, labels=dict(user=device.user)) sniff_task = asyncio.ensure_future(arp.sniff(on_arp_packet)) @sniff_task.add_done_callback def on_sniffing_done(fut): # pylint: disable=unused-variable try: fut.result() except Exception: logger.exception('ARP sniffing failed') import sys sys.exit(1) while True: try: scan_start = time.time() present_after = scan_start - timeout # perform ARP scan await arp.scan(interface=interface) # wait for all the arp replies to get processed by the sniffer await asyncio.sleep(1.0) # perform PING for each unreached device devices_to_ping = {d for d in away_devices(devices, present_after) if d.ip_addr} ping_results = await asyncio.gather(*[ping(d.ip_addr) for d in devices_to_ping], return_exceptions=True) for result, device in zip(ping_results, devices_to_ping): if isinstance(result, Exception): logger.info('Failed to ping %s (%s)', device.name, repr(result)) else: msg = 'Device %s did not respond to ARP request but ping was successful' logger.debug(msg, device.name) device.last_seen = time.time() record_metric('presence-arp:user-reached', True, labels=dict(user=device.user)) all_users = {d.user for d in devices} present_users = {d.user for d in present_devices(devices, present_after)} away_users = {d.user for d in away_devices(devices, present_after)} - present_users logger.info('Present: %s, away: %s, unknown: %s', present_users, away_users, all_users - away_users - present_users) presence_update = { user: { 'timeout': interval + 2 * (time.time() - scan_start) + 10, 'source': 'arp-scan', 'state': 'present' if user in present_users else 'away' } for user in away_users | present_users } await presence.update.write(presence_update) for user in all_users: record_metric('presence-arp:user-present', user in present_users, labels=dict(user=user)) logger.debug('Scan complete') # leave startup period for device in devices: if device.last_seen is None and startup_time < present_after: device.last_seen = 0 except NotAvailableError as e: logger.error(str(e)) except Exception as e: logger.exception('Failure: %r', e) await asyncio.sleep(interval)