Untitled

 avatar
unknown
plain_text
a month ago
13 kB
2
Indexable
import socket
import struct
import threading
import logging
from typing import Optional, Tuple, Dict, List, Union, ByteString
from dataclasses import dataclass
from dnslib import DNSRecord, QTYPE, RR
from dnslib.server import DNSServer, BaseResolver
from datetime import datetime
import binascii
import hashlib

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('gamespy_analyzer.log'),
        logging.StreamHandler()
    ]
)

class GameSpyConstants:
    """Constants used in GameSpy protocol"""
    # Standard ports
    MASTER_PORT = 27900
    QUERY_PORT = 27901
    VERIFY_PORT = 27902
    SEARCH_PORT = 27903
    DEFAULT_GAME_PORT = 29910

    # Common game IDs
    GAMES = {
        'gmtest': 'GameSpy Test',
        'gslive': 'GameSpy Live',
        'doom3': 'Doom 3',
        'ut2004': 'Unreal Tournament 2004',
        'bf1942': 'Battlefield 1942',
        'bf2': 'Battlefield 2',
        'swbf': 'Star Wars Battlefront',
        'swbf2': 'Star Wars Battlefront 2'
    }

    # Query types
    QUERY_TYPES = {
        'basic': 0x01,
        'info': 0x02,
        'rules': 0x03,
        'players': 0x04,
        'status': 0x05
    }

    # Common server fields
    SERVER_FIELDS = [
        'hostname',
        'gamename',
        'gamever',
        'mapname',
        'gametype',
        'numplayers',
        'maxplayers',
        'hostport',
        'password'
    ]

@dataclass
class GameSpyPacket:
    """Data class to represent a GameSpy packet"""
    raw_data: bytes
    decrypted_data: bytes
    packet_type: int
    timestamp: datetime
    game_id: Optional[str] = None
    source_addr: Optional[Tuple[str, int]] = None

class GameSpyUtils:
    """Utility functions for GameSpy protocol operations"""
    
    @staticmethod
    def generate_challenge() -> str:
        """Generates a GameSpy challenge string"""
        return hashlib.md5(str(datetime.now().timestamp()).encode()).hexdigest()[:10]

    @staticmethod
    def validate_challenge_response(challenge: str, response: str, game_id: str) -> bool:
        """Validates a GameSpy challenge response"""
        expected = hashlib.md5(f"{challenge}GameSpy3D{game_id}".encode()).hexdigest()
        return response.lower() == expected.lower()

    @staticmethod
    def encode_gamespy_string(text: str) -> bytes:
        """Encodes a string in GameSpy format (null-terminated)"""
        return text.encode('utf-8') + b'\x00'

    @staticmethod
    def decode_gamespy_string(data: bytes) -> str:
        """Decodes a GameSpy null-terminated string"""
        try:
            return data.split(b'\x00')[0].decode('utf-8')
        except:
            return ''

    @staticmethod
    def parse_gamespy_key_values(data: bytes) -> Dict[str, str]:
        """Parses GameSpy key-value format (key\\value\\key\\value)"""
        result = {}
        try:
            parts = data.split(b'\\')[1:]  # Skip first empty part
            for i in range(0, len(parts), 2):
                if i + 1 < len(parts):
                    key = parts[i].decode('utf-8', errors='ignore')
                    value = parts[i + 1].decode('utf-8', errors='ignore')
                    result[key] = value
        except Exception as e:
            logging.error(f"Error parsing key-values: {e}")
        return result

class GameSpyQuery:
    """Handles construction and parsing of GameSpy query packets"""
    
    @staticmethod
    def create_server_query(query_type: int, game_id: str) -> bytes:
        """Creates a server query packet"""
        packet = bytearray()
        packet.extend(b'\\basic\\')  # Query header
        packet.extend(GameSpyUtils.encode_gamespy_string(game_id))
        packet.append(query_type)
        return bytes(packet)

    @staticmethod
    def create_challenge_request(game_id: str) -> bytes:
        """Creates a challenge request packet"""
        packet = bytearray([0x01, 0x00, 0x00, 0x00, 0x01])  # Header
        packet.extend(GameSpyUtils.encode_gamespy_string(game_id))
        return bytes(packet)

class GameSpyResponse:
    """Handles parsing and validation of GameSpy server responses"""

    @staticmethod
    def parse_server_info(data: bytes) -> Dict[str, Union[str, int]]:
        """Parses server info response"""
        info = GameSpyUtils.parse_gamespy_key_values(data)
        
        # Convert numeric fields
        numeric_fields = ['numplayers', 'maxplayers', 'hostport']
        for field in numeric_fields:
            if field in info:
                try:
                    info[field] = int(info[field])
                except ValueError:
                    pass
        
        return info

    @staticmethod
    def parse_player_info(data: bytes) -> List[Dict[str, str]]:
        """Parses player info from server response"""
        players = []
        try:
            kvp = GameSpyUtils.parse_gamespy_key_values(data)
            player_count = int(kvp.get('numplayers', 0))
            
            for i in range(player_count):
                player = {
                    'name': kvp.get(f'player_{i}', ''),
                    'score': kvp.get(f'score_{i}', '0'),
                    'ping': kvp.get(f'ping_{i}', '0'),
                    'team': kvp.get(f'team_{i}', '')
                }
                players.append(player)
        except Exception as e:
            logging.error(f"Error parsing player info: {e}")
        
        return players

class GameSpyPacketAnalyzer:
    """Enhanced analyzer for GameSpy packets"""
    PACKET_TYPES = {
        0x01: "Challenge Request",
        0x02: "Challenge Response",
        0x03: "Heartbeat",
        0x04: "Player Status",
        0x05: "Keep Alive",
        0x06: "Server List Request",
        0x07: "Server Info Response",
        0x08: "Player Info Request",
        0x09: "Player Info Response",
        0x0A: "Rules Request",
        0x0B: "Rules Response"
    }

    def __init__(self):
        self.challenges = {}  # Store active challenges
        self.servers = {}     # Store known servers
        self.utils = GameSpyUtils()

    def analyze_packet(self, packet: GameSpyPacket) -> Dict:
        """Analyzes a GameSpy packet and returns detailed information"""
        analysis = {
            "timestamp": packet.timestamp.isoformat(),
            "packet_info": {
                "length": len(packet.decrypted_data),
                "type": packet.packet_type,
                "type_name": self.PACKET_TYPES.get(packet.packet_type, "Unknown"),
                "game_id": packet.game_id,
                "game_name": GameSpyConstants.GAMES.get(packet.game_id, "Unknown Game")
            },
            "hex_dump": {
                "raw": packet.raw_data.hex(),
                "decrypted": packet.decrypted_data.hex()
            },
            "parsed_data": self._parse_packet_content(packet),
            "possible_responses": self._get_possible_responses(packet)
        }

        if packet.source_addr:
            analysis["source"] = {
                "ip": packet.source_addr[0],
                "port": packet.source_addr[1]
            }

        return analysis

    def _parse_packet_content(self, packet: GameSpyPacket) -> Dict:
        """Parses the content of different packet types"""
        try:
            if packet.packet_type == 0x01:  # Challenge Request
                return self._parse_challenge_request(packet)
            elif packet.packet_type == 0x02:  # Challenge Response
                return self._parse_challenge_response(packet)
            elif packet.packet_type == 0x03:  # Heartbeat
                return self._parse_heartbeat(packet)
            elif packet.packet_type in [0x07, 0x09]:  # Info Response
                return GameSpyResponse.parse_server_info(packet.decrypted_data)
            else:
                return {"raw_content": packet.decrypted_data[4:].hex()}
        except Exception as e:
            return {"error": f"Failed to parse packet: {str(e)}"}

    def _get_possible_responses(self, packet: GameSpyPacket) -> Dict:
        """Generates example responses for the packet type"""
        if packet.packet_type == 0x01:  # Challenge Request
            challenge = self.utils.generate_challenge()
            return {
                "type": "Challenge Response",
                "binary": self._create_challenge_response(challenge),
                "description": "Server should respond with a challenge string"
            }
        elif packet.packet_type == 0x06:  # Server List Request
            return {
                "type": "Server Info Response",
                "format": "\\hostname\\Server Name\\gamename\\Game ID\\mapname\\Map01\\numplayers\\16\\maxplayers\\32",
                "description": "Server should respond with basic server information"
            }
        return {}

    def _create_challenge_response(self, challenge: str) -> bytes:
        """Creates a challenge response packet"""
        packet = bytearray([0x02, 0x00, 0x00, 0x00])
        packet.extend(challenge.encode())
        return bytes(packet)

class GameSpyServerEmulator:
    """Basic GameSpy server emulator for testing"""
    
    def __init__(self, game_id: str, port: int = GameSpyConstants.DEFAULT_GAME_PORT):
        self.game_id = game_id
        self.port = port
        self.analyzer = GameSpyPacketAnalyzer()
        self.utils = GameSpyUtils()
        self.running = False

    def start(self):
        """Starts the server emulator"""
        self.running = True
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind(('0.0.0.0', self.port))
        
        while self.running:
            try:
                data, addr = self.socket.recvfrom(4096)
                self._handle_packet(data, addr)
            except Exception as e:
                logging.error(f"Server error: {e}")

    def _handle_packet(self, data: bytes, addr: Tuple[str, int]):
        """Handles incoming packets"""
        try:
            packet = GameSpyPacket(
                raw_data=data,
                decrypted_data=XORCipher.decrypt(data),
                packet_type=data[0] if data else 0,
                timestamp=datetime.now(),
                game_id=self.game_id,
                source_addr=addr
            )

            analysis = self.analyzer.analyze_packet(packet)
            logging.info(f"Received packet: {analysis}")

            # Send appropriate response
            response = self._generate_response(packet)
            if response:
                self.socket.sendto(response, addr)

        except Exception as e:
            logging.error(f"Error handling packet: {e}")

    def _generate_response(self, packet: GameSpyPacket) -> Optional[bytes]:
        """Generates appropriate response for different packet types"""
        if packet.packet_type == 0x01:  # Challenge Request
            challenge = self.utils.generate_challenge()
            return self.analyzer._create_challenge_response(challenge)
        elif packet.packet_type == 0x06:  # Server List Request
            return self._create_server_info_response()
        return None

    def _create_server_info_response(self) -> bytes:
        """Creates a basic server info response"""
        info = {
            'hostname': 'Test Server',
            'gamename': self.game_id,
            'mapname': 'TestMap',
            'numplayers': '0',
            'maxplayers': '16',
            'gametype': 'dm',
            'password': '0'
        }
        
        response = bytearray([0x07, 0x00, 0x00, 0x00])  # Info Response header
        for key, value in info.items():
            response.extend(f"\\{key}\\{value}".encode())
        return bytes(response)

def main():
    """Main function to start the GameSpy packet analyzer and server emulator"""
    try:
        # Start DNS server
        resolver = GameSpyDNSResolver()
        dns_server = DNSServer(resolver, port=53)
        dns_thread = threading.Thread(target=dns_server.start)
        dns_thread.daemon = True
        dns_thread.start()
        logging.info("DNS server started on port 53")

        # Start UDP listener
        udp_thread = threading.Thread(target=start_udp_listener)
        udp_thread.daemon = True
        udp_thread.start()
        
        # Start server emulator (optional)
        emulator = GameSpyServerEmulator('gmtest')
        emulator_thread = threading.Thread(target=emulator.start)
        emulator_thread.daemon = True
        emulator_thread.start()
        logging.info(f"Server emulator started on port {GameSpyConstants.DEFAULT_GAME_PORT}")
        
        # Keep main thread running
        while True:
            pass
            
    except Exception as e:
        logging.error(f"Error in main function: {str(e)}")
        raise

if __name__ == "__main__":
    main()
Leave a Comment