import scapy.all as scapy
import requests
import json
import time
import os
WEBHOOK_URL = "https://webhook.site/898bae18-a5de-4100-8acb-1db15a02081e"
JSON_FILE = "devices.json"
SCAN_INTERVAL = 60 # seconds
def get_device_name(ip):
try:
# Send an ARP request for the IP and get the device name from the response
result = scapy.arping(ip, verbose=False)
return result[0][1].psrc
except:
return None
def get_mac(ip):
arp_request = scapy.ARP(pdst=ip)
broadcast = scapy.Ether(dst="ff:ff:ff:ff:ff:ff")
arp_request_broadcast = broadcast / arp_request
answered_list = scapy.srp(arp_request_broadcast, timeout=1, verbose=False)[0]
return answered_list[0][1].hwsrc if answered_list else None
def scan_network():
devices = {}
ip_range = "192.168.0.0/24"
arp_request = scapy.ARP(pdst=ip_range)
broadcast = scapy.Ether(dst="ff:ff:ff:ff:ff:ff")
arp_request_broadcast = broadcast / arp_request
answered_list = scapy.srp(arp_request_broadcast, timeout=1, verbose=False)[0]
for item in answered_list:
mac_address = item[1].hwsrc
ip_address = item[1].psrc
device_name = get_device_name(ip_address)
device = {"name": device_name, "ip": ip_address, "mac": mac_address, "status": "online"}
devices[ip_address] = device
return devices
def send_webhook(devices):
data = {"devices": list(devices.values())}
headers = {'Content-type': 'application/json'}
response = requests.post(WEBHOOK_URL, data=json.dumps(data), headers=headers)
if response.status_code == 200:
print("Webhook sent successfully!")
else:
print(f"Failed to send webhook. Status code: {response.status_code}")
def save_devices(devices):
with open(JSON_FILE, 'w') as f:
json.dump(devices, f)
def load_devices():
devices = {}
if os.path.isfile(JSON_FILE):
with open(JSON_FILE, 'r') as f:
try:
devices = json.load(f)
except json.JSONDecodeError:
print(f"JSON file '{JSON_FILE}' is corrupt or empty.")
return devices
def get_device_changes(old_devices, new_devices):
changed_devices = []
for ip_address, new_device in new_devices.items():
old_device = old_devices.get(ip_address)
if not old_device:
# New device found
changed_devices.append(new_device)
continue
# Check for changes in status
if old_device['status'] != new_device['status']:
changed_devices.append(new_device)
return changed_devices
def run():
# Load existing devices from JSON
devices = load_devices()
# If there are no existing devices, scan the network and save the results
if not devices:
print("No existing devices found. Scanning network...")
devices = scan_network()
save_devices(devices)
send_webhook(devices)
print("Initial scan complete. Waiting for next scan...")
time.sleep(SCAN_INTERVAL)
while True:
# Scan the network