Source code for caspia.toolbox.arp.sniffer

import asyncio
import logging
import sys
from pathlib import Path
from typing import Callable

import click
from scapy.layers.l2 import ARP
from scapy.sendrecv import sniff as sniff_scapy

from caspia.toolbox.arp.response import ArpResponse

logger = logging.getLogger(__name__)


def handle_packet(pkt):
    """Print a one-line summary of an ARP packet to stdout.

    Packets without an ARP layer are ignored.  For ARP packets the line
    depends on the opcode:

    * ``op == 1`` (who-has): ``[request] <sender ip> <target ip>``
    * ``op == 2`` (is-at): ``[response] <sender mac> <sender ip>``

    Any other opcode prints nothing.  Every line is flushed immediately so
    that a process reading this output through a pipe sees it without
    buffering delay.

    Args:
        pkt: A scapy packet, as passed to the ``prn`` callback of scapy's
            ``sniff``.
    """
    if not pkt.haslayer(ARP):
        return

    arp = pkt[ARP]
    # Compare the opcode field, not the layer object itself: the layer never
    # equals an integer, so requests would otherwise never be reported.
    if arp.op == 1:  # request (who-has)
        print('[request]', arp.psrc, arp.pdst, flush=True)
    elif arp.op == 2:  # response (is-at)
        print('[response]', arp.hwsrc, arp.psrc, flush=True)
@click.command()
def sniff_cmd():
    # Command-line entry point: sniff ARP traffic with scapy and hand every
    # captured packet to handle_packet.
    #
    # Documented with comments rather than a docstring on purpose: click
    # would display a docstring as the command's --help text.
    capture_options = {
        'prn': handle_packet,  # callback invoked for each captured packet
        'filter': 'arp',       # capture filter: ARP traffic only
        'store': 0,            # do not accumulate captured packets
    }
    sniff_scapy(**capture_options)
async def sniff(on_response: Callable[[ArpResponse], None]):
    """Run the ``caspia-arp-sniff`` helper and report each ARP response.

    The helper executable is looked up next to the running Python
    interpreter and started as a subprocess.  Its stdout is read line by
    line; every ``[response] <mac> <ip>`` line is converted to an
    ``ArpResponse`` (with ``name=None``) and passed to ``on_response``.
    All other lines are ignored.

    This coroutine does not return normally: it waits for the helper to
    exit and then raises.  If it is interrupted first (for example by
    cancellation), the helper is killed and the reader task is cancelled.

    Args:
        on_response: Synchronous callback invoked for each parsed response.
            Exceptions it raises are logged and do not stop the sniffing.

    Raises:
        RuntimeError: The helper exited with a non-zero return code.
        RuntimeWarning: The helper exited with return code 0.
    """
    executable = Path(sys.executable).with_name('caspia-arp-sniff')
    process = await asyncio.create_subprocess_exec(
        str(executable), stdout=asyncio.subprocess.PIPE)

    async def process_output():
        while True:
            raw_line = await process.stdout.readline()
            if not raw_line:
                # readline() yields empty bytes only at EOF.  Checking before
                # stripping keeps a blank output line from being mistaken
                # for the end of the stream.
                return

            # Undecodable bytes must not kill the reader task.
            line = raw_line.decode('utf-8', errors='replace').strip()
            if not line.startswith('[response]'):
                continue

            fields = line.split()
            if len(fields) != 3:
                # A malformed line would otherwise raise on unpacking and
                # silently end this task, since nothing awaits it.
                logger.warning('Ignoring malformed sniffer output: %r', line)
                continue
            _, mac_addr, ip_addr = fields

            try:
                # Construction is guarded too, so that a rejected value
                # cannot stop the processing of later lines.
                response = ArpResponse(ip_addr=ip_addr, mac_addr=mac_addr, name=None)
                on_response(response)
            except Exception:
                logger.exception('Failure when processing packet')

    processing = asyncio.ensure_future(process_output())
    try:
        retval = await process.wait()
        if retval:
            raise RuntimeError(f'caspia-arp-sniff failed with code {retval}')
        raise RuntimeWarning('caspia-arp-sniff unexpectedly finished')
    finally:
        # Still running means we are leaving because of cancellation or an
        # unexpected error, so the helper must not be left behind.
        if process.returncode is None:
            process.kill()
        processing.cancel()
# Run the sniffer command when this module is executed directly as a script.
if __name__ == '__main__':
    sniff_cmd()