Untitled

 avatar
user_3431143
plain_text
a year ago
2.8 kB
2
Indexable
Never
import json
import time
import requests

from scapy.all import ARP, Ether, srp


DEVICE_WEBHOOK_URL = "https://webhook.site/898bae18-a5de-4100-8acb-1db15a02081e"
LOG_WEBHOOK_URL = "https://webhook.site/2ac7238e-81ab-40d4-b2f4-6be4a7afc066"
NETWORK_IP_RANGE = "192.168.0.0/24"
DEVICE_JSON_FILE = "devices.json"


def get_network_devices():
    # create ARP request packet
    arp = ARP(pdst=NETWORK_IP_RANGE)
    ether = Ether(dst="ff:ff:ff:ff:ff:ff")
    packet = ether/arp

    # send the packet and capture response
    result = srp(packet, timeout=3, verbose=0)[0]

    # create a list of network devices
    devices = []
    for sent, received in result:
        devices.append({'ip_address': received.psrc, 'mac_address': received.hwsrc})

    return devices


def is_device_online(ip):
    # create ARP request packet for specific IP address
    arp = ARP(pdst=ip)
    ether = Ether(dst="ff:ff:ff:ff:ff:ff")
    packet = ether/arp

    # send the packet and capture response
    result = srp(packet, timeout=3, verbose=0)[0]

    # return True if response is received, indicating device is online
    return bool(result)


def load_devices():
    try:
        with open(DEVICE_JSON_FILE, "r") as f:
            devices = json.load(f)
    except FileNotFoundError:
        devices = []
    return devices


def save_devices(devices):
    with open(DEVICE_JSON_FILE, "w") as f:
        json.dump(devices, f)


def send_devices(devices):
    payload = {"devices": devices}
    response = requests.post(DEVICE_WEBHOOK_URL, json=payload)
    return response.status_code


def send_log_message(message):
    payload = {"message": message}
    response = requests.post(LOG_WEBHOOK_URL, json=payload)
    return response.status_code


def check_devices(devices):
    updated_devices = []
    for device in devices:
        is_online = is_device_online(device['ip_address'])
        if device['online'] != is_online:
            device['online'] = is_online
            updated_devices.append(device)
    return updated_devices


def run():
    # load devices from file
    devices = load_devices()

    # initial network scan
    if not devices:
        devices = get_network_devices()
        for device in devices:
            device['online'] = is_device_online(device['ip_address'])
        save_devices(devices)
        send_devices(devices)

    while True:
        # check for device changes
        updated_devices = check_devices(devices)
        if updated_devices:
            save_devices(devices)
            send_devices(updated_devices)

        # sleep for 60 seconds
        time.sleep(60)


if __name__ == "__main__":
    # initialize log file
    send_log_message("Network scanner started.")

    try:
        run()
    except Exception as e:
        send_log_message(f"Network scanner error: {str(e)}")