Untitled
user_3431143
plain_text
a year ago
3.6 kB
2
Indexable
Never
import scapy.all as scapy import requests import json import time import os import socket WEBHOOK_URL = "https://webhook.site/0b330127-63ac-4c42-bf70-2d1dff0d84a2" JSON_FILE = "devices.json" SCAN_INTERVAL = 60 # seconds def get_device_name(ip): try: # Get the device name from the IP address device_name = socket.gethostbyaddr(ip)[0] return device_name except socket.herror: return None def get_mac(ip): arp_request = scapy.ARP(pdst=ip) broadcast = scapy.Ether(dst="ff:ff:ff:ff:ff:ff") arp_request_broadcast = broadcast / arp_request answered_list = scapy.srp(arp_request_broadcast, timeout=1, verbose=False)[0] return answered_list[0][1].hwsrc if answered_list else None def scan_network(): devices = {} ip_range = "192.168.0.0/24" arp_request = scapy.ARP(pdst=ip_range) broadcast = scapy.Ether(dst="ff:ff:ff:ff:ff:ff") arp_request_broadcast = broadcast / arp_request answered_list = scapy.srp(arp_request_broadcast, timeout=1, verbose=False)[0] for item in answered_list: mac_address = item[1].hwsrc ip_address = item[1].psrc device_name = get_device_name(ip_address) device = {"name": device_name, "ip": ip_address, "mac": mac_address, "status": "online"} devices[ip_address] = device return devices def send_webhook(devices): data = {"devices": devices} headers = {'Content-type': 'application/json'} response = requests.post(WEBHOOK_URL, data=json.dumps(data), headers=headers) if response.status_code == 200: print("Webhook sent successfully!") else: print(f"Failed to send webhook. Status code: {response.status_code}") def save_devices(devices): with open(JSON_FILE, 'w') as f: json.dump(devices, f) def load_devices(): devices = {} if os.path.isfile(JSON_FILE): with open(JSON_FILE, 'r') as f: try: devices = json.load(f) except json.JSONDecodeError: print(f"JSON file '{JSON_FILE}' is corrupt or empty.") return devices def get_device_changes(old_devices, new_devices): changed_devices = [] for ip_address, new_device in new_devices.items(): old_device = old_devices.get(ip_address) if not old_device: # New device found changed_devices.append(new_device) continue # Check for changes in status if old_device['status'] != new_device['status']: changed_devices.append(new_device) return changed_devices def run(): print("Starting network scanner...") # Load existing devices from JSON devices = load_devices() # If there are no existing devices, scan the network and save the results if not devices: print("No existing devices found. Scanning network...") devices = scan_network() save_devices(devices) send_webhook(list(devices.values())) print("Initial scan complete. Waiting for next scan...") time.sleep(SCAN_INTERVAL) while True: print("Performing network scan...") new_devices = scan_network() changed_devices = get_device_changes(devices, new_devices) if changed_devices: print("Device changes detected. Sending webhook...") send_webhook(changed_devices) save_devices(new_devices) devices = new_devices print("Network scan complete. Waiting for next scan...") time.sleep(SCAN_INTERVAL) run()