# Untitled
"""GameSpy protocol packet analyzer and minimal UDP server emulator.

The module offers:

* ``GameSpyUtils`` / ``GameSpyQuery`` / ``GameSpyResponse`` -- stateless helpers
  that build and parse the backslash-delimited GameSpy wire format.
* ``GameSpyPacketAnalyzer`` -- turns a ``GameSpyPacket`` into a descriptive dict.
* ``GameSpyServerEmulator`` -- a blocking UDP loop answering a few packet types.
* ``main`` -- starts the DNS server, UDP listener and emulator threads.
"""

import binascii
import hashlib
import logging
import secrets
import socket
import struct
import threading
from dataclasses import dataclass
from datetime import datetime
from typing import Optional, Tuple, Dict, List, Union, ByteString

try:
    from dnslib import DNSRecord, QTYPE, RR
    from dnslib.server import DNSServer, BaseResolver
except ImportError:
    # dnslib is only used by main() to run the DNS server.  Tolerating its
    # absence keeps the stdlib-only analyzer/emulator classes importable;
    # main() raises a clear error if the package is really missing.
    DNSRecord = QTYPE = RR = DNSServer = BaseResolver = None

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('gamespy_analyzer.log'),
        logging.StreamHandler()
    ]
)


class GameSpyConstants:
    """Constants used in GameSpy protocol"""

    # Standard ports
    MASTER_PORT = 27900
    QUERY_PORT = 27901
    VERIFY_PORT = 27902
    SEARCH_PORT = 27903
    DEFAULT_GAME_PORT = 29910

    # Common game IDs
    GAMES = {
        'gmtest': 'GameSpy Test',
        'gslive': 'GameSpy Live',
        'doom3': 'Doom 3',
        'ut2004': 'Unreal Tournament 2004',
        'bf1942': 'Battlefield 1942',
        'bf2': 'Battlefield 2',
        'swbf': 'Star Wars Battlefront',
        'swbf2': 'Star Wars Battlefront 2'
    }

    # Query types
    QUERY_TYPES = {
        'basic': 0x01,
        'info': 0x02,
        'rules': 0x03,
        'players': 0x04,
        'status': 0x05
    }

    # Common server fields
    SERVER_FIELDS = [
        'hostname',
        'gamename',
        'gamever',
        'mapname',
        'gametype',
        'numplayers',
        'maxplayers',
        'hostport',
        'password'
    ]


@dataclass
class GameSpyPacket:
    """Data class to represent a GameSpy packet"""

    raw_data: bytes        # bytes exactly as received
    decrypted_data: bytes  # payload after decryption; parsers read this
    packet_type: int       # key into GameSpyPacketAnalyzer.PACKET_TYPES
    timestamp: datetime    # when the packet was captured
    game_id: Optional[str] = None                  # key into GameSpyConstants.GAMES
    source_addr: Optional[Tuple[str, int]] = None  # (ip, port) of the sender


class GameSpyUtils:
    """Utility functions for GameSpy protocol operations"""

    @staticmethod
    def generate_challenge() -> str:
        """Generate a fresh 10-character lowercase hex challenge string.

        Uses the ``secrets`` CSPRNG: a challenge derived from the wall clock
        (as an MD5 of ``datetime.now()``) can be predicted by a peer.
        """
        return secrets.token_hex(5)

    @staticmethod
    def validate_challenge_response(challenge: str, response: str, game_id: str) -> bool:
        """Return True if *response* is the expected digest for *challenge*.

        The expected value is the MD5 hex digest of
        ``f"{challenge}GameSpy3D{game_id}"``; comparison is case-insensitive.
        """
        expected = hashlib.md5(f"{challenge}GameSpy3D{game_id}".encode()).hexdigest()
        # hexdigest() is already lowercase, so only the response needs folding.
        return response.lower() == expected

    @staticmethod
    def encode_gamespy_string(text: str) -> bytes:
        """Encodes a string in GameSpy format (null-terminated)"""
        return text.encode('utf-8') + b'\x00'

    @staticmethod
    def decode_gamespy_string(data: bytes) -> str:
        """Decode the text before the first NUL byte of *data*.

        Returns ``''`` when the bytes are not valid UTF-8 or *data* is not a
        bytes-like object (best-effort, as before), but no longer swallows
        ``KeyboardInterrupt``/``SystemExit`` the way a bare ``except`` did.
        """
        try:
            return data.split(b'\x00')[0].decode('utf-8')
        except (UnicodeDecodeError, AttributeError, TypeError):
            return ''

    @staticmethod
    def parse_gamespy_key_values(data: bytes) -> Dict[str, str]:
        """Parses GameSpy key-value format (key\\value\\key\\value).

        Everything before the first backslash (empty text or a binary header)
        is skipped.  A trailing key without a value is dropped; a repeated key
        keeps its last value.  Undecodable bytes are ignored.  Returns ``{}``
        and logs an error when *data* cannot be split as bytes.
        """
        try:
            parts = data.split(b'\\')[1:]  # Skip whatever precedes the first '\'
        except (AttributeError, TypeError) as e:
            logging.error("Error parsing key-values: %s", e)
            return {}
        # zip() stops at the shorter slice, which drops an unpaired trailing key.
        return {
            key.decode('utf-8', errors='ignore'): value.decode('utf-8', errors='ignore')
            for key, value in zip(parts[::2], parts[1::2])
        }


class GameSpyQuery:
    """Handles construction and parsing of GameSpy query packets"""

    @staticmethod
    def create_server_query(query_type: int, game_id: str) -> bytes:
        """Build a server query: ``\\basic\\`` + NUL-terminated game id + type byte.

        Raises:
            ValueError: if *query_type* does not fit in one byte (0-255).
        """
        packet = bytearray(b'\\basic\\')  # Query header
        packet.extend(GameSpyUtils.encode_gamespy_string(game_id))
        packet.append(query_type)
        return bytes(packet)

    @staticmethod
    def create_challenge_request(game_id: str) -> bytes:
        """Build a challenge request: 5-byte header + NUL-terminated game id."""
        packet = bytearray([0x01, 0x00, 0x00, 0x00, 0x01])  # Header
        packet.extend(GameSpyUtils.encode_gamespy_string(game_id))
        return bytes(packet)


class GameSpyResponse:
    """Handles parsing and validation of GameSpy server responses"""

    @staticmethod
    def parse_server_info(data: bytes) -> Dict[str, Union[str, int]]:
        """Parse a server info response into a dict.

        ``numplayers``, ``maxplayers`` and ``hostport`` are converted to ``int``
        when they hold a valid integer; otherwise they stay as strings.
        """
        info: Dict[str, Union[str, int]] = dict(GameSpyUtils.parse_gamespy_key_values(data))

        # Convert numeric fields
        for field in ('numplayers', 'maxplayers', 'hostport'):
            if field in info:
                try:
                    info[field] = int(info[field])
                except ValueError:
                    pass  # leave the original text in place
        return info

    @staticmethod
    def parse_player_info(data: bytes) -> List[Dict[str, str]]:
        """Parse per-player entries from a server response.

        Reads ``numplayers`` and then ``player_<i>``, ``score_<i>``,
        ``ping_<i>`` and ``team_<i>`` for each index, filling defaults for
        missing keys.  Returns ``[]`` (and logs) if ``numplayers`` is not an
        integer.
        """
        kvp = GameSpyUtils.parse_gamespy_key_values(data)
        try:
            player_count = int(kvp.get('numplayers', 0))
        except ValueError as e:
            logging.error("Error parsing player info: %s", e)
            return []
        return [
            {
                'name': kvp.get(f'player_{i}', ''),
                'score': kvp.get(f'score_{i}', '0'),
                'ping': kvp.get(f'ping_{i}', '0'),
                'team': kvp.get(f'team_{i}', '')
            }
            for i in range(player_count)
        ]


class GameSpyPacketAnalyzer:
    """Enhanced analyzer for GameSpy packets"""

    PACKET_TYPES = {
        0x01: "Challenge Request",
        0x02: "Challenge Response",
        0x03: "Heartbeat",
        0x04: "Player Status",
        0x05: "Keep Alive",
        0x06: "Server List Request",
        0x07: "Server Info Response",
        0x08: "Player Info Request",
        0x09: "Player Info Response",
        0x0A: "Rules Request",
        0x0B: "Rules Response"
    }

    def __init__(self):
        self.challenges = {}  # Store active challenges
        self.servers = {}  # Store known servers
        self.utils = GameSpyUtils()

    def analyze_packet(self, packet: GameSpyPacket) -> Dict:
        """Return a descriptive dict for *packet*.

        Keys: ``timestamp`` (ISO format), ``packet_info``, ``hex_dump``,
        ``parsed_data``, ``possible_responses`` and, when the packet has a
        ``source_addr``, ``source``.
        """
        analysis = {
            "timestamp": packet.timestamp.isoformat(),
            "packet_info": {
                "length": len(packet.decrypted_data),
                "type": packet.packet_type,
                "type_name": self.PACKET_TYPES.get(packet.packet_type, "Unknown"),
                "game_id": packet.game_id,
                "game_name": GameSpyConstants.GAMES.get(packet.game_id, "Unknown Game")
            },
            "hex_dump": {
                "raw": packet.raw_data.hex(),
                "decrypted": packet.decrypted_data.hex()
            },
            "parsed_data": self._parse_packet_content(packet),
            "possible_responses": self._get_possible_responses(packet)
        }

        if packet.source_addr:
            analysis["source"] = {
                "ip": packet.source_addr[0],
                "port": packet.source_addr[1]
            }

        return analysis

    def _parse_packet_content(self, packet: GameSpyPacket) -> Dict:
        """Parse the payload according to ``packet.packet_type``.

        Never raises: any parsing failure is reported as ``{"error": ...}``.
        """
        try:
            if packet.packet_type == 0x01:  # Challenge Request
                return self._parse_challenge_request(packet)
            elif packet.packet_type == 0x02:  # Challenge Response
                return self._parse_challenge_response(packet)
            elif packet.packet_type == 0x03:  # Heartbeat
                return self._parse_heartbeat(packet)
            elif packet.packet_type in [0x07, 0x09]:  # Info Response
                return GameSpyResponse.parse_server_info(packet.decrypted_data)
            else:
                return {"raw_content": packet.decrypted_data[4:].hex()}
        except Exception as e:
            return {"error": f"Failed to parse packet: {str(e)}"}

    def _parse_challenge_request(self, packet: GameSpyPacket) -> Dict:
        """Extract the game id from a challenge request.

        Mirrors ``GameSpyQuery.create_challenge_request``: a 5-byte header
        followed by the NUL-terminated game id.
        """
        return {"game_id": GameSpyUtils.decode_gamespy_string(packet.decrypted_data[5:])}

    def _parse_challenge_response(self, packet: GameSpyPacket) -> Dict:
        """Extract the challenge string from a challenge response.

        Mirrors ``_create_challenge_response``: a 4-byte header followed by
        the challenge text.
        """
        return {"challenge": packet.decrypted_data[4:].decode('utf-8', errors='replace')}

    def _parse_heartbeat(self, packet: GameSpyPacket) -> Dict:
        """Parse a heartbeat payload as backslash-delimited key/value pairs.

        NOTE(review): no heartbeat layout is defined in this file; this assumes
        the same key/value payload as the info responses -- confirm against a
        real capture.
        """
        return GameSpyUtils.parse_gamespy_key_values(packet.decrypted_data)

    def _get_possible_responses(self, packet: GameSpyPacket) -> Dict:
        """Return an example reply for the packet type, or ``{}`` if none applies."""
        if packet.packet_type == 0x01:  # Challenge Request
            challenge = self.utils.generate_challenge()
            return {
                "type": "Challenge Response",
                "binary": self._create_challenge_response(challenge),
                "description": "Server should respond with a challenge string"
            }
        elif packet.packet_type == 0x06:  # Server List Request
            return {
                "type": "Server Info Response",
                "format": "\\hostname\\Server Name\\gamename\\Game ID\\mapname\\Map01\\numplayers\\16\\maxplayers\\32",
                "description": "Server should respond with basic server information"
            }
        return {}

    def _create_challenge_response(self, challenge: str) -> bytes:
        """Build a challenge response: 4-byte header + encoded challenge text."""
        packet = bytearray([0x02, 0x00, 0x00, 0x00])
        packet.extend(challenge.encode())
        return bytes(packet)


class GameSpyServerEmulator:
    """Basic GameSpy server emulator for testing"""

    # Seconds recvfrom() may block before the loop re-checks ``running``.
    _RECV_TIMEOUT = 1.0

    def __init__(self, game_id: str, port: int = GameSpyConstants.DEFAULT_GAME_PORT):
        self.game_id = game_id
        self.port = port
        self.analyzer = GameSpyPacketAnalyzer()
        self.utils = GameSpyUtils()
        self.running = False
        self.socket = None  # created by start()

    def start(self):
        """Bind a UDP socket on ``self.port`` and serve until ``stop()`` is called.

        Blocks the calling thread.  The socket is always closed on exit, and a
        short receive timeout lets the loop notice ``stop()`` promptly.

        Raises:
            OSError: if the socket cannot be created or bound.
        """
        self.running = True
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.socket.bind(('0.0.0.0', self.port))
            self.socket.settimeout(self._RECV_TIMEOUT)
            while self.running:
                try:
                    data, addr = self.socket.recvfrom(4096)
                    self._handle_packet(data, addr)
                except socket.timeout:
                    continue  # idle period: just re-check self.running
                except Exception as e:
                    logging.error(f"Server error: {e}")
        finally:
            self.running = False
            self.socket.close()

    def stop(self):
        """Ask the serving loop started by ``start()`` to exit."""
        self.running = False

    def _handle_packet(self, data: bytes, addr: Tuple[str, int]):
        """Analyze one datagram, log the analysis and send a reply if any.

        Errors are logged, never raised, so one bad packet cannot stop the loop.
        """
        try:
            # NOTE(review): XORCipher is not defined in the visible source.  If
            # it is not provided elsewhere, this raises NameError, which is
            # logged below and means no packet is ever answered.
            packet = GameSpyPacket(
                raw_data=data,
                decrypted_data=XORCipher.decrypt(data),
                packet_type=data[0] if data else 0,
                timestamp=datetime.now(),
                game_id=self.game_id,
                source_addr=addr
            )

            analysis = self.analyzer.analyze_packet(packet)
            logging.info(f"Received packet: {analysis}")

            # Send appropriate response
            response = self._generate_response(packet)
            if response:
                self.socket.sendto(response, addr)

        except Exception as e:
            logging.error(f"Error handling packet: {e}")

    def _generate_response(self, packet: GameSpyPacket) -> Optional[bytes]:
        """Return the reply bytes for *packet*, or ``None`` if no reply is due."""
        if packet.packet_type == 0x01:  # Challenge Request
            challenge = self.utils.generate_challenge()
            return self.analyzer._create_challenge_response(challenge)
        elif packet.packet_type == 0x06:  # Server List Request
            return self._create_server_info_response()
        return None

    def _create_server_info_response(self) -> bytes:
        """Build an info response: 4-byte header + ``\\key\\value`` pairs."""
        info = {
            'hostname': 'Test Server',
            'gamename': self.game_id,
            'mapname': 'TestMap',
            'numplayers': '0',
            'maxplayers': '16',
            'gametype': 'dm',
            'password': '0'
        }

        header = bytes([0x07, 0x00, 0x00, 0x00])  # Info Response header
        body = "".join(f"\\{key}\\{value}" for key, value in info.items())
        return header + body.encode()


def main():
    """Main function to start the GameSpy packet analyzer and server emulator.

    Starts the DNS server, the UDP listener and the server emulator on daemon
    threads, then blocks until the process is interrupted.  Any startup error
    is logged and re-raised.
    """
    try:
        if DNSServer is None:
            raise RuntimeError("dnslib is required to run the DNS server (pip install dnslib)")

        # NOTE(review): GameSpyDNSResolver and start_udp_listener are not
        # defined in the visible source; they must be provided elsewhere or
        # startup fails with NameError.

        # Start DNS server
        resolver = GameSpyDNSResolver()
        dns_server = DNSServer(resolver, port=53)
        dns_thread = threading.Thread(target=dns_server.start, daemon=True)
        dns_thread.start()
        logging.info("DNS server started on port 53")

        # Start UDP listener
        udp_thread = threading.Thread(target=start_udp_listener, daemon=True)
        udp_thread.start()

        # Start server emulator (optional)
        emulator = GameSpyServerEmulator('gmtest')
        emulator_thread = threading.Thread(target=emulator.start, daemon=True)
        emulator_thread.start()
        logging.info(f"Server emulator started on port {GameSpyConstants.DEFAULT_GAME_PORT}")

        # Keep the main thread alive without spinning a CPU core; the event is
        # never set, so this sleeps until the process is interrupted.
        threading.Event().wait()

    except Exception as e:
        logging.error(f"Error in main function: {str(e)}")
        raise


if __name__ == "__main__":
    main()
# Leave a Comment