import asyncio
import logging
import sys
from pathlib import Path
from typing import Callable
import click
from scapy.layers.l2 import ARP
from scapy.sendrecv import sniff as sniff_scapy
from caspia.toolbox.arp.response import ArpResponse
logger = logging.getLogger(__name__)
[docs]def handle_packet(pkt):
if not pkt.haslayer(ARP):
return
if pkt[ARP] == 1: # request (who-has)
print('[request]', pkt[ARP].psrc, pkt[ARP].pdst, flush=True)
if pkt[ARP].op == 2: # is-at
print('[response]', pkt[ARP].hwsrc, pkt[ARP].psrc, flush=True)
@click.command()
def sniff_cmd():
sniff_scapy(prn=handle_packet, filter='arp', store=0)
[docs]async def sniff(on_response: Callable[[ArpResponse], None]):
executable = Path(sys.executable).with_name('caspia-arp-sniff')
process = await asyncio.create_subprocess_exec(str(executable), stdout=asyncio.subprocess.PIPE)
async def process_output():
while True:
line = await process.stdout.readline()
line = line.decode('utf-8').strip()
if line.startswith('[response]'):
_, mac_addr, ip_addr = line.split(' ')
response = ArpResponse(ip_addr=ip_addr, mac_addr=mac_addr, name=None)
try:
on_response(response)
except Exception:
logger.exception('Failure when processing packet')
elif line == '':
return
processing = asyncio.ensure_future(process_output())
try:
retval = await process.wait()
if retval:
raise RuntimeError(f'caspia-arp-sniff failed with code {retval}')
else:
raise RuntimeWarning('caspia-arp-sniff unexpectedly finished')
finally:
if process.returncode is None:
process.kill()
processing.cancel()
if __name__ == '__main__':
sniff_cmd()