Untitled
unknown
python
18 days ago
2.5 kB
3
Indexable
Never
import binascii
import logging
import time

from eventlet.semaphore import Semaphore
from scapy.config import conf
from scapy.layers.inet import IP
from scapy.sendrecv import AsyncSniffer

# NOTE(review): machine-specific absolute path to a "manuf" file; this only
# resolves on the original author's Windows machine. Consider making it
# configurable (environment variable / relative path) — confirm with the owner.
conf.manuf_file = r"C:\Users\14434\Desktop\Programming Projects\networkMonitor\manuf"


class PacketCapture:
    """Sniff packets with Scapy, push them to a Socket.IO client, and track hosts.

    Each captured packet is summarised into a dict and emitted as a
    ``'new_packet'`` event through ``socketio.emit``. Source and destination
    addresses of packets carrying an IP layer are remembered so that
    ``get_pcs`` can list every address seen so far.
    """

    def __init__(self, socketio, interface=None, filter_expr="", emit_delay=0.5):
        """Store the capture configuration; no sniffing starts here.

        Args:
            socketio: Object exposing ``emit(event, data)``; used to publish
                ``'new_packet'`` events.
            interface: Passed to ``AsyncSniffer`` as ``iface``. ``None`` leaves
                the choice of interface to Scapy.
            filter_expr: Passed to ``AsyncSniffer`` as ``filter`` (a BPF
                expression according to the Scapy documentation).
            emit_delay: Seconds to pause after emitting each packet. Defaults
                to the 0.5 s that was previously hard-coded; values <= 0
                disable the pause.
        """
        self.socketio = socketio
        self.interface = interface
        self.filter_expr = filter_expr
        self.emit_delay = emit_delay
        # Maps IP address -> {"ip": address}; guarded by self.lock.
        self.pcs = {}
        self.lock = Semaphore()
        # Set by sniff_packets(); defined here so the attribute always exists.
        self.sniffer = None
        logging.info(
            "PacketCapture initialized on interface: %s with filter: '%s'",
            self.interface,
            self.filter_expr,
        )

    def sniff_packets(self):
        """Create and start the background ``AsyncSniffer``.

        Errors raised while creating or starting the sniffer are logged (with
        traceback) and not re-raised. If a sniffer started by a previous call
        still reports itself as running, the call is ignored so the existing
        sniffer is not orphaned.
        """
        if self.sniffer is not None and getattr(self.sniffer, "running", False):
            logging.warning("Packet sniffing is already running; ignoring start request.")
            return

        logging.info("Starting packet sniffing...")
        try:
            self.sniffer = AsyncSniffer(
                iface=self.interface,
                filter=self.filter_expr,
                prn=self.process_packet,
                store=False,
            )
            self.sniffer.start()
        except Exception as e:
            # Best-effort start: keep the original "log and continue" contract,
            # but include the traceback for diagnosis.
            logging.exception("Error during sniffing: %s", e)

    def process_packet(self, packet):
        """Summarise one captured packet, record its hosts, and emit it.

        The emitted dict always contains ``length``, ``timestamp`` (local
        time) and ``payload`` (hex of the whole packet's bytes); the ``ip_*``
        keys are present only when the packet has an IP layer. Any exception
        is logged and swallowed so one bad packet does not propagate into the
        sniffer callback.

        Args:
            packet: Scapy packet handed over by the sniffer's ``prn`` callback.
        """
        logging.info("Processing a new packet.")
        try:
            packet_info = {}
            if packet.haslayer(IP):
                ip = packet[IP]
                packet_info['ip_src'] = ip.src
                packet_info['ip_dst'] = ip.dst
                packet_info['ip_proto'] = ip.proto
                packet_info['ip_ttl'] = ip.ttl
                # Record hosts before emitting/throttling so get_pcs() is not
                # delayed by the per-packet pause below.
                self._remember_hosts(ip.src, ip.dst)

            packet_info['length'] = len(packet)
            packet_info['timestamp'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            packet_info['payload'] = binascii.hexlify(bytes(packet)).decode('utf-8')

            self.socketio.emit('new_packet', packet_info)
            logging.info("Emitted packet: %s", packet_info)

            # Throttle the event stream. NOTE(review): this blocks the sniffer
            # callback for emit_delay seconds per packet; presumably packets
            # arriving meanwhile are queued or dropped — confirm whether the
            # throttle is still wanted.
            if self.emit_delay > 0:
                time.sleep(self.emit_delay)
        except Exception as e:
            logging.exception("Error processing packet: %s", e)

    def _remember_hosts(self, *addresses):
        """Add each address to ``self.pcs`` while holding ``self.lock``."""
        with self.lock:
            for address in addresses:
                self.pcs[address] = {"ip": address}

    def get_pcs(self):
        """Return the addresses seen so far as a list of dicts.

        Returns:
            A list of ``{"id": n, "name": "PC-n", "ip": address}`` entries,
            numbered from 1 in the order the addresses were first recorded.
        """
        # Hold the lock only long enough to snapshot the keys.
        with self.lock:
            known_ips = list(self.pcs)
        pcs_list = [
            {"id": number, "name": f"PC-{number}", "ip": ip}
            for number, ip in enumerate(known_ips, start=1)
        ]
        logging.info("Detected PCs: %s", pcs_list)
        return pcs_list
Leave a Comment