Untitled

mail@pastecode.io avatar
unknown
python
18 days ago
2.5 kB
3
Indexable
Never
import logging
from eventlet.semaphore import Semaphore
from scapy.config import conf
from scapy.layers.inet import IP
from scapy.sendrecv import AsyncSniffer

# Point Scapy's configuration object at a local "manuf" file.
# NOTE(review): this is an absolute path inside one user's Desktop folder, so
# it is machine-specific — consider making it configurable before this module
# is run anywhere else.
conf.manuf_file = r"C:\Users\14434\Desktop\Programming Projects\networkMonitor\manuf"
import time
import binascii

class PacketCapture:
    """Sniff packets in the background and forward a summary of each one.

    Every captured packet is summarised into a dict, the IP addresses it
    mentions are remembered in ``pcs``, and the summary is emitted on the
    ``'new_packet'`` event of the supplied ``socketio`` object.
    """

    def __init__(self, socketio, interface=None, filter_expr="", emit_delay=0.5):
        """Store the capture configuration; no sniffing starts here.

        Args:
            socketio: Object exposing ``emit(event, data)``; receives one
                ``'new_packet'`` event per processed packet.
            interface: Passed to ``AsyncSniffer`` as ``iface``.
            filter_expr: Passed to ``AsyncSniffer`` as ``filter``.
            emit_delay: Seconds to pause after each successful emit. Defaults
                to 0.5, the value that used to be hard-coded. Values <= 0
                disable the pause.
        """
        self.socketio = socketio
        self.interface = interface
        self.filter_expr = filter_expr
        self.emit_delay = emit_delay
        self.pcs = {}            # ip address -> {"ip": ip address}
        self.lock = Semaphore()  # guards every access to self.pcs
        self.sniffer = None      # AsyncSniffer started by sniff_packets(), if any

        logging.info(
            "PacketCapture initialized on interface: %s with filter: '%s'",
            self.interface,
            self.filter_expr,
        )

    def sniff_packets(self):
        """Start an ``AsyncSniffer`` that feeds packets to ``process_packet``.

        A sniffer left over from an earlier call is stopped first so that it
        is not orphaned. Errors raised while creating or starting the sniffer
        are logged (with traceback) and not propagated.
        """
        logging.info("Starting packet sniffing...")
        self.stop_sniffing()
        try:
            sniffer = AsyncSniffer(
                iface=self.interface,
                filter=self.filter_expr,
                prn=self.process_packet,
                store=False,
            )
            sniffer.start()
        except Exception as e:
            logging.exception("Error during sniffing: %s", e)
        else:
            # Only remember a sniffer that was actually started.
            self.sniffer = sniffer

    def stop_sniffing(self):
        """Stop the sniffer started by ``sniff_packets``, if there is one.

        Best effort: does nothing when no sniffer is stored or when the stored
        sniffer does not report itself as running; errors raised by ``stop()``
        are logged and not propagated.
        """
        sniffer, self.sniffer = self.sniffer, None
        if sniffer is None or not getattr(sniffer, "running", False):
            return
        try:
            sniffer.stop()
        except Exception as e:
            logging.exception("Error stopping sniffer: %s", e)

    def process_packet(self, packet):
        """Summarise ``packet``, record its hosts, and emit the summary.

        Hosts are recorded *before* emitting so that a failing ``emit`` (or
        the post-emit pause) neither loses nor delays the host table update.
        Any exception is logged with its traceback and swallowed, so one bad
        packet does not propagate an error to the caller.
        """
        logging.info("Processing a new packet.")

        try:
            packet_info = self._describe_packet(packet)
            self._record_hosts(packet_info)

            self.socketio.emit('new_packet', packet_info)
            # Lazy %-formatting: the (potentially large) dict is only
            # rendered when INFO logging is actually enabled.
            logging.info("Emitted packet: %s", packet_info)

            # Throttle emits. This runs inside the sniffer's callback, so the
            # pause also delays handling of the next packet.
            if self.emit_delay > 0:
                time.sleep(self.emit_delay)
        except Exception as e:
            logging.exception("Error processing packet: %s", e)

    def _describe_packet(self, packet):
        """Build the summary dict that is emitted for ``packet``."""
        packet_info = {}

        if packet.haslayer(IP):
            ip = packet[IP]
            packet_info['ip_src'] = ip.src
            packet_info['ip_dst'] = ip.dst
            packet_info['ip_proto'] = ip.proto
            packet_info['ip_ttl'] = ip.ttl

        # Serialise once and reuse the bytes for both length and hex dump.
        raw = bytes(packet)
        packet_info['length'] = len(raw)
        packet_info['timestamp'] = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
        packet_info['payload'] = raw.hex()
        return packet_info

    def _record_hosts(self, packet_info):
        """Remember the source/destination addresses present in ``packet_info``."""
        with self.lock:
            for key in ('ip_src', 'ip_dst'):
                if key in packet_info:
                    address = packet_info[key]
                    self.pcs[address] = {"ip": address}

    def get_pcs(self):
        """Return the hosts seen so far.

        Returns:
            A list of ``{"id": n, "name": "PC-n", "ip": address}`` dicts,
            numbered from 1 in the order the addresses were first recorded.
        """
        with self.lock:
            addresses = list(self.pcs)

        pcs_list = [
            {"id": number, "name": f"PC-{number}", "ip": address}
            for number, address in enumerate(addresses, start=1)
        ]
        logging.info("Detected PCs: %s", pcs_list)
        return pcs_list
Leave a Comment