Untitled

 avatar
user_3431143
plain_text
a year ago
3.1 kB
1
Indexable
Never
import json
import logging
import os.path
import socket
import time

import requests
import scapy.all as scapy

# Endpoint that send_webhook() POSTs the JSON device list to.
WEBHOOK_URL = "https://webhook.site/898bae18-a5de-4100-8acb-1db15a02081e"
# File that save_devices() writes and load_devices() reads back; the path is
# relative, so it resolves against the process's current working directory.
JSON_FILE = "devices.json"
# Log file name handed to logging.basicConfig() in run().
LOG_FILE = "network_scanner.log"

def get_mac(ip):
    """Return the hardware address that answers an ARP request for *ip*.

    A single ARP query wrapped in an Ethernet broadcast frame is sent with a
    one-second timeout. The ``hwsrc`` field of the first reply is returned,
    or ``None`` when nothing answered.
    """
    query = scapy.ARP(pdst=ip)
    frame = scapy.Ether(dst="ff:ff:ff:ff:ff:ff") / query
    # srp() returns (answered, unanswered); only the answered pairs matter.
    replies = scapy.srp(frame, timeout=1, verbose=False)[0]
    if not replies:
        return None
    # Each answered entry is a (sent, received) pair; index 1 is the reply.
    return replies[0][1].hwsrc

def scan_network(ip_range="192.168.0.0/24", timeout=1):
    """Sweep a subnet with ARP and describe every host that replied.

    Args:
        ip_range: Target address or CIDR range passed to the ARP request as
            ``pdst``. Defaults to the previously hard-coded
            ``"192.168.0.0/24"`` so existing callers are unaffected.
        timeout: Seconds to wait for replies, passed to ``scapy.srp``.

    Returns:
        A list with one dict per responding host, with keys ``"ip"``,
        ``"mac"``, ``"status"`` (always ``"online"``, since only hosts that
        answered are listed) and ``"name"`` (from ``get_device_name``).
    """
    arp_request = scapy.ARP(pdst=ip_range)
    broadcast = scapy.Ether(dst="ff:ff:ff:ff:ff:ff")
    # srp() returns (answered, unanswered); only the answered pairs matter.
    answered = scapy.srp(broadcast / arp_request, timeout=timeout, verbose=False)[0]

    # Each answered entry is a (sent, received) pair; index 1 is the reply.
    return [
        {
            "ip": entry[1].psrc,
            "mac": entry[1].hwsrc,
            "status": "online",
            "name": get_device_name(entry[1].psrc),
        }
        for entry in answered
    ]

def get_device_name(ip):
    """Return a short host name for *ip*, or ``"Unknown"`` if none resolves.

    The name is the first dot-separated label of the fully qualified domain
    name reported by ``socket.getfqdn``. This is a best-effort lookup: any
    resolver failure is logged at debug level and reported as ``"Unknown"``
    instead of being raised.

    Args:
        ip: Address (string) of the device to look up.

    Returns:
        The first label of the resolved host name, or ``"Unknown"``.
    """
    # NOTE(review): the previous code called scapy.getfqdn, which scapy.all
    # does not appear to export - confirm. The stdlib resolver is used instead.
    try:
        hostname = socket.getfqdn(ip)
    except Exception:
        # Best-effort boundary, but no longer a bare ``except`` that would
        # also swallow KeyboardInterrupt / SystemExit.
        logging.debug("Reverse lookup failed for %s", ip, exc_info=True)
        return "Unknown"

    # getfqdn() hands the input back unchanged when nothing resolves; without
    # this check the first octet (e.g. "192") would be reported as the name.
    if not hostname or hostname == str(ip).strip():
        return "Unknown"

    return hostname.split('.')[0] or "Unknown"

def send_webhook(devices, timeout=10):
    """POST the device list to ``WEBHOOK_URL`` as a JSON document.

    The request body is ``{"devices": devices}``. Failures are logged, not
    raised, so that an unreachable endpoint does not stop the caller.

    Args:
        devices: JSON-serialisable list of device dicts.
        timeout: Seconds to wait for the endpoint before giving up. Without
            a timeout ``requests`` may block indefinitely.
    """
    payload = json.dumps({"devices": devices})
    headers = {'Content-type': 'application/json'}

    try:
        response = requests.post(
            WEBHOOK_URL, data=payload, headers=headers, timeout=timeout
        )
    except requests.RequestException as exc:
        # Connection errors, DNS failures and timeouts all land here.
        logging.error(f"Failed to send webhook: {exc}")
        return

    # Any 2xx reply is a success; webhook receivers commonly answer
    # 201/202/204 rather than exactly 200.
    if 200 <= response.status_code < 300:
        logging.info("Webhook sent successfully!")
    else:
        logging.error(f"Failed to send webhook. Status code: {response.status_code}")

def save_devices(devices):
    """Write *devices* to ``JSON_FILE`` as JSON, replacing any old content."""
    with open(JSON_FILE, mode='w') as handle:
        json.dump(devices, handle)

def load_devices():
    """Read the device list previously stored in ``JSON_FILE``.

    Returns:
        The decoded JSON content, or an empty list when the file does not
        exist or cannot be parsed (the parse failure is logged as an error).
    """
    if not os.path.isfile(JSON_FILE):
        return []

    with open(JSON_FILE, 'r') as handle:
        try:
            return json.load(handle)
        except json.JSONDecodeError:
            logging.error(f"JSON file '{JSON_FILE}' is corrupt or empty.")

    # Reached only after a decode failure: fall back to "no known devices".
    return []

def check_device_status(old_device, new_device):
    """Tell whether one device flipped between online and offline.

    Both arguments are device dicts with ``'ip'``, ``'mac'`` and ``'status'``
    keys. The result is ``True`` only when both records share the same IP and
    MAC and the status went from ``'online'`` to ``'offline'`` or the other
    way round; every other combination gives ``False``.
    """
    # Different IP or MAC: the two records are not the same device.
    if old_device['ip'] != new_device['ip'] or old_device['mac'] != new_device['mac']:
        return False

    previous = old_device['status']
    if previous == 'online':
        return new_device['status'] == 'offline'
    if previous == 'offline':
        return new_device['status'] == 'online'
    # Any other previous status is not treated as a transition.
    return False

def run():
    logging.basicConfig(filename=LOG_FILE, level=logging.DEBUG)

    devices = scan_network()
    save_devices(devices)
    send_webhook(devices)
    logging.info(f"Initial network scan complete. Found {len(devices)} devices.")
    
    while True:
        old_devices = load_devices()
        new_devices = scan_network()

        updated_devices = []

        for new_device in new_devices:
            for old_device in old_devices:
                if check_device_status(old_device, new_device):
                    updated