Untitled
unknown
python
a year ago
2.5 kB
15
Indexable
import logging
from eventlet.semaphore import Semaphore
from scapy.config import conf
from scapy.layers.inet import IP
from scapy.sendrecv import AsyncSniffer
conf.manuf_file = r"C:\Users\14434\Desktop\Programming Projects\networkMonitor\manuf"
import time
import binascii
class PacketCapture:
def __init__(self, socketio, interface=None, filter_expr=""):
self.socketio = socketio
self.interface = interface
self.filter_expr = filter_expr
self.pcs = {}
self.lock = Semaphore()
logging.info(f"PacketCapture initialized on interface: {self.interface} with filter: '{self.filter_expr}'")
def sniff_packets(self):
logging.info("Starting packet sniffing...")
try:
self.sniffer = AsyncSniffer(
iface=self.interface,
filter=self.filter_expr,
prn=self.process_packet,
store=False
)
self.sniffer.start()
except Exception as e:
logging.error(f"Error during sniffing: {e}")
def process_packet(self, packet):
packet_info = {}
logging.info("Processing a new packet.")
try:
if packet.haslayer(IP):
ip = packet[IP]
packet_info['ip_src'] = ip.src
packet_info['ip_dst'] = ip.dst
packet_info['ip_proto'] = ip.proto
packet_info['ip_ttl'] = ip.ttl
packet_info['length'] = len(packet)
packet_info['timestamp'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
payload = bytes(packet)
packet_info['payload'] = binascii.hexlify(payload).decode('utf-8')
self.socketio.emit('new_packet', packet_info)
logging.info(f"Emitted packet: {packet_info}")
time.sleep(.5)
with self.lock:
if 'ip_src' in packet_info:
self.pcs[packet_info['ip_src']] = {"ip": packet_info['ip_src']}
if 'ip_dst' in packet_info:
self.pcs[packet_info['ip_dst']] = {"ip": packet_info['ip_dst']}
except Exception as e:
logging.error(f"Error processing packet: {e}")
def get_pcs(self):
with self.lock:
pcs_list = [{"id": idx + 1, "name": f"PC-{idx + 1}", "ip": ip} for idx, ip in enumerate(self.pcs.keys())]
logging.info(f"Detected PCs: {pcs_list}")
return pcs_list
Editor is loading...
Leave a Comment