Untitled

 avatar
user_3431143
plain_text
a year ago
3.6 kB
2
Indexable
Never
import scapy.all as scapy
import requests
import json
import time
import os
import socket

WEBHOOK_URL = "https://webhook.site/0b330127-63ac-4c42-bf70-2d1dff0d84a2"
JSON_FILE = "devices.json"
SCAN_INTERVAL = 60  # seconds


def get_device_name(ip):
    try:
        # Get the device name from the IP address
        device_name = socket.gethostbyaddr(ip)[0]
        return device_name
    except socket.herror:
        return None


def get_mac(ip):
    arp_request = scapy.ARP(pdst=ip)
    broadcast = scapy.Ether(dst="ff:ff:ff:ff:ff:ff")
    arp_request_broadcast = broadcast / arp_request
    answered_list = scapy.srp(arp_request_broadcast, timeout=1, verbose=False)[0]
    return answered_list[0][1].hwsrc if answered_list else None


def scan_network():
    devices = {}

    ip_range = "192.168.0.0/24"

    arp_request = scapy.ARP(pdst=ip_range)
    broadcast = scapy.Ether(dst="ff:ff:ff:ff:ff:ff")
    arp_request_broadcast = broadcast / arp_request
    answered_list = scapy.srp(arp_request_broadcast, timeout=1, verbose=False)[0]

    for item in answered_list:
        mac_address = item[1].hwsrc
        ip_address = item[1].psrc
        device_name = get_device_name(ip_address)
        device = {"name": device_name, "ip": ip_address, "mac": mac_address, "status": "online"}
        devices[ip_address] = device

    return devices


def send_webhook(devices):
    data = {"devices": devices}
    headers = {'Content-type': 'application/json'}

    response = requests.post(WEBHOOK_URL, data=json.dumps(data), headers=headers)

    if response.status_code == 200:
        print("Webhook sent successfully!")
    else:
        print(f"Failed to send webhook. Status code: {response.status_code}")


def save_devices(devices):
    with open(JSON_FILE, 'w') as f:
        json.dump(devices, f)


def load_devices():
    devices = {}
    if os.path.isfile(JSON_FILE):
        with open(JSON_FILE, 'r') as f:
            try:
                devices = json.load(f)
            except json.JSONDecodeError:
                print(f"JSON file '{JSON_FILE}' is corrupt or empty.")
    return devices


def get_device_changes(old_devices, new_devices):
    changed_devices = []

    for ip_address, new_device in new_devices.items():
        old_device = old_devices.get(ip_address)

        if not old_device:
            # New device found
            changed_devices.append(new_device)
            continue

        # Check for changes in status
        if old_device['status'] != new_device['status']:
            changed_devices.append(new_device)

    return changed_devices


def run():
    print("Starting network scanner...")

    # Load existing devices from JSON
    devices = load_devices()

    # If there are no existing devices, scan the network and save the results
    if not devices:
        print("No existing devices found. Scanning network...")
        devices = scan_network()
        save_devices(devices)
        send_webhook(list(devices.values()))
        print("Initial scan complete. Waiting for next scan...")
        time.sleep(SCAN_INTERVAL)

    while True:
        print("Performing network scan...")
        new_devices = scan_network()
        changed_devices = get_device_changes(devices, new_devices)

        if changed_devices:
            print("Device changes detected. Sending webhook...")
            send_webhook(changed_devices)

        save_devices(new_devices)
        devices = new_devices
        print("Network scan complete. Waiting for next scan...")
        time.sleep(SCAN_INTERVAL)

run()