# Untitled
# unknown
# plain_text
# 10 months ago
# 13 kB
# 4
# Indexable
import socket
import struct
import threading
import logging
from typing import Optional, Tuple, Dict, List, Union, ByteString
from dataclasses import dataclass
from dnslib import DNSRecord, QTYPE, RR
from dnslib.server import DNSServer, BaseResolver
from datetime import datetime
import binascii
import hashlib
# Configure the root logger once at import time: INFO and above is written
# both to 'gamespy_analyzer.log' and to a default StreamHandler.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=[
        logging.FileHandler('gamespy_analyzer.log'),
        logging.StreamHandler(),
    ],
)
class GameSpyConstants:
    """Namespace of fixed values used when speaking the GameSpy protocol."""

    # Standard ports
    MASTER_PORT = 27900
    QUERY_PORT = 27901
    VERIFY_PORT = 27902
    SEARCH_PORT = 27903
    DEFAULT_GAME_PORT = 29910

    # Common game IDs, mapped to a human-readable title
    GAMES = dict(
        gmtest='GameSpy Test',
        gslive='GameSpy Live',
        doom3='Doom 3',
        ut2004='Unreal Tournament 2004',
        bf1942='Battlefield 1942',
        bf2='Battlefield 2',
        swbf='Star Wars Battlefront',
        swbf2='Star Wars Battlefront 2',
    )

    # Query types, mapped to their one-byte numeric code
    QUERY_TYPES = dict(
        basic=0x01,
        info=0x02,
        rules=0x03,
        players=0x04,
        status=0x05,
    )

    # Common server fields
    SERVER_FIELDS = [
        'hostname', 'gamename', 'gamever',
        'mapname', 'gametype',
        'numplayers', 'maxplayers',
        'hostport', 'password',
    ]
@dataclass
class GameSpyPacket:
    """One GameSpy packet plus the metadata recorded alongside it.

    The first four fields are required and positional; ``game_id`` and
    ``source_addr`` are optional and default to ``None``.
    """

    # Payload bytes before any decryption is applied.
    raw_data: bytes
    # Payload bytes after decryption.
    decrypted_data: bytes
    # Numeric packet type code.
    packet_type: int
    # Moment associated with the packet.
    timestamp: datetime
    # Short game identifier, when known.
    game_id: Optional[str] = None
    # (ip, port) of the sender, when known.
    source_addr: Optional[Tuple[str, int]] = None
class GameSpyUtils:
    """Stateless helpers for GameSpy strings, challenges and key/value data."""

    @staticmethod
    def generate_challenge() -> str:
        """Return a fresh 10-character lowercase hexadecimal challenge.

        The value is drawn from the ``secrets`` CSPRNG rather than derived
        from the wall clock, so it is not predictable and two calls made
        within the same clock tick do not produce the same challenge.
        """
        import secrets  # local import keeps this class self-contained

        # 5 random bytes render as exactly 10 hex digits.
        return secrets.token_hex(5)

    @staticmethod
    def validate_challenge_response(challenge: str, response: str, game_id: str) -> bool:
        """Check a challenge response, ignoring hex-digit case.

        The expected value is the MD5 hex digest of
        ``challenge + "GameSpy3D" + game_id``.

        Returns:
            True when ``response`` matches the expected digest.
        """
        expected = hashlib.md5(f"{challenge}GameSpy3D{game_id}".encode()).hexdigest()
        return response.lower() == expected.lower()

    @staticmethod
    def encode_gamespy_string(text: str) -> bytes:
        """Encode ``text`` as UTF-8 followed by a single NUL terminator."""
        return text.encode('utf-8') + b'\x00'

    @staticmethod
    def decode_gamespy_string(data: bytes) -> str:
        """Decode the UTF-8 text that precedes the first NUL byte in ``data``.

        Returns:
            The decoded text, or ``''`` when the bytes are not valid UTF-8 or
            ``data`` is not a bytes-like object supporting ``split``.
        """
        try:
            return data.split(b'\x00', 1)[0].decode('utf-8')
        except (UnicodeDecodeError, AttributeError, TypeError):
            # Best-effort decode: malformed input yields an empty string, but
            # unrelated exceptions (e.g. KeyboardInterrupt) are no longer hidden.
            return ''

    @staticmethod
    def parse_gamespy_key_values(data: bytes) -> Dict[str, str]:
        """Parse backslash-delimited ``\\key\\value\\key\\value`` data.

        Everything before the first backslash is discarded. A trailing key
        with no value is dropped, and a repeated key keeps its last value.
        Undecodable bytes inside keys and values are ignored.

        Returns:
            A dict of decoded keys to decoded values; empty (after logging the
            error) when ``data`` cannot be split.
        """
        result: Dict[str, str] = {}
        try:
            parts = data.split(b'\\')[1:]  # skip whatever precedes the first backslash
            # zip() stops at the shorter slice, so an unpaired final key is skipped.
            for raw_key, raw_value in zip(parts[::2], parts[1::2]):
                key = raw_key.decode('utf-8', errors='ignore')
                result[key] = raw_value.decode('utf-8', errors='ignore')
        except Exception as e:
            logging.error("Error parsing key-values: %s", e)
        return result
class GameSpyQuery:
    """Builders for outgoing GameSpy query packets."""

    @staticmethod
    def create_server_query(query_type: int, game_id: str) -> bytes:
        """Build a server query packet.

        Layout: the literal ``\\basic\\`` header, the NUL-terminated game id,
        then ``query_type`` as a single trailing byte.

        Raises:
            ValueError: if ``query_type`` does not fit in one byte (0-255).
        """
        header = b'\\basic\\'  # Query header
        encoded_game = GameSpyUtils.encode_gamespy_string(game_id)
        type_byte = bytes((query_type,))
        return header + encoded_game + type_byte

    @staticmethod
    def create_challenge_request(game_id: str) -> bytes:
        """Build a challenge request: a fixed 5-byte header plus the NUL-terminated game id."""
        header = b'\x01\x00\x00\x00\x01'
        return header + GameSpyUtils.encode_gamespy_string(game_id)
class GameSpyResponse:
    """Parsers for key/value payloads found in GameSpy server responses."""

    @staticmethod
    def parse_server_info(data: bytes) -> Dict[str, Union[str, int]]:
        """Parse a server info payload into a dict.

        The ``numplayers``, ``maxplayers`` and ``hostport`` entries are
        converted to ``int`` when present and numeric; any value that does
        not parse as an integer is left as the original string.
        """
        parsed = GameSpyUtils.parse_gamespy_key_values(data)
        for name in ('numplayers', 'maxplayers', 'hostport'):
            if name not in parsed:
                continue
            try:
                parsed[name] = int(parsed[name])
            except ValueError:
                # Keep the textual value when it is not a valid integer.
                continue
        return parsed

    @staticmethod
    def parse_player_info(data: bytes) -> List[Dict[str, str]]:
        """Extract one record per player from a server response.

        ``numplayers`` decides how many records are produced; record *i* is
        filled from ``player_i``, ``score_i``, ``ping_i`` and ``team_i``, with
        defaults for any key that is missing. Any failure is logged and the
        records collected so far are returned.
        """
        roster = []
        try:
            fields = GameSpyUtils.parse_gamespy_key_values(data)
            total = int(fields.get('numplayers', 0))
            for idx in range(total):
                roster.append(dict(
                    name=fields.get(f'player_{idx}', ''),
                    score=fields.get(f'score_{idx}', '0'),
                    ping=fields.get(f'ping_{idx}', '0'),
                    team=fields.get(f'team_{idx}', ''),
                ))
        except Exception as e:
            logging.error(f"Error parsing player info: {e}")
        return roster
class GameSpyPacketAnalyzer:
    """Turns GameSpy packets into structured, loggable analysis dicts.

    ``analyze_packet`` is the entry point. The remaining methods are helpers
    that decode the payload for a given packet type or build an example
    reply for it.
    """

    PACKET_TYPES = {
        0x01: "Challenge Request",
        0x02: "Challenge Response",
        0x03: "Heartbeat",
        0x04: "Player Status",
        0x05: "Keep Alive",
        0x06: "Server List Request",
        0x07: "Server Info Response",
        0x08: "Player Info Request",
        0x09: "Player Info Response",
        0x0A: "Rules Request",
        0x0B: "Rules Response"
    }

    # Header sizes mirror the packet builders in this module: challenge
    # requests are built with a 5-byte header, responses with a 4-byte one.
    _CHALLENGE_REQUEST_HEADER_LEN = 5
    _DEFAULT_HEADER_LEN = 4

    def __init__(self):
        self.challenges = {}  # Store active challenges
        self.servers = {}  # Store known servers
        self.utils = GameSpyUtils()

    def analyze_packet(self, packet: "GameSpyPacket") -> Dict:
        """Analyze a packet and return a detailed report.

        Args:
            packet: The packet to describe.

        Returns:
            A dict with ``timestamp``, ``packet_info``, ``hex_dump``,
            ``parsed_data`` and ``possible_responses`` entries, plus a
            ``source`` entry when the packet carries a source address.
        """
        analysis = {
            "timestamp": packet.timestamp.isoformat(),
            "packet_info": {
                "length": len(packet.decrypted_data),
                "type": packet.packet_type,
                "type_name": self.PACKET_TYPES.get(packet.packet_type, "Unknown"),
                "game_id": packet.game_id,
                "game_name": GameSpyConstants.GAMES.get(packet.game_id, "Unknown Game")
            },
            "hex_dump": {
                "raw": packet.raw_data.hex(),
                "decrypted": packet.decrypted_data.hex()
            },
            "parsed_data": self._parse_packet_content(packet),
            "possible_responses": self._get_possible_responses(packet)
        }
        if packet.source_addr:
            analysis["source"] = {
                "ip": packet.source_addr[0],
                "port": packet.source_addr[1]
            }
        return analysis

    def _parse_packet_content(self, packet: "GameSpyPacket") -> Dict:
        """Decode the payload according to the packet type.

        Parsing is best-effort: any failure is reported as an ``error`` entry
        in the returned dict instead of being raised.
        """
        try:
            if packet.packet_type == 0x01:  # Challenge Request
                return self._parse_challenge_request(packet)
            elif packet.packet_type == 0x02:  # Challenge Response
                return self._parse_challenge_response(packet)
            elif packet.packet_type == 0x03:  # Heartbeat
                return self._parse_heartbeat(packet)
            elif packet.packet_type in [0x07, 0x09]:  # Info Response
                return GameSpyResponse.parse_server_info(packet.decrypted_data)
            else:
                return {"raw_content": packet.decrypted_data[self._DEFAULT_HEADER_LEN:].hex()}
        except Exception as e:
            return {"error": f"Failed to parse packet: {str(e)}"}

    def _parse_challenge_request(self, packet: "GameSpyPacket") -> Dict:
        """Extract the NUL-terminated game id that follows the 5-byte header."""
        body = packet.decrypted_data[self._CHALLENGE_REQUEST_HEADER_LEN:]
        game_id = body.split(b'\x00', 1)[0].decode('utf-8', errors='replace')
        return {"game_id": game_id}

    def _parse_challenge_response(self, packet: "GameSpyPacket") -> Dict:
        """Extract the challenge text that follows the 4-byte header."""
        body = packet.decrypted_data[self._DEFAULT_HEADER_LEN:]
        challenge = body.split(b'\x00', 1)[0].decode('utf-8', errors='replace')
        return {"challenge": challenge}

    def _parse_heartbeat(self, packet: "GameSpyPacket") -> Dict:
        """Best-effort heartbeat decode.

        NOTE(review): the heartbeat layout is not defined anywhere in this
        file; this assumes backslash-delimited key/value pairs after the
        header and also returns the raw payload so nothing is lost - confirm
        against real captures.
        """
        return {
            "fields": self.utils.parse_gamespy_key_values(packet.decrypted_data),
            "raw_content": packet.decrypted_data[self._DEFAULT_HEADER_LEN:].hex()
        }

    def _get_possible_responses(self, packet: "GameSpyPacket") -> Dict:
        """Return an example reply for the packet type, or ``{}`` if none applies."""
        if packet.packet_type == 0x01:  # Challenge Request
            challenge = self.utils.generate_challenge()
            return {
                "type": "Challenge Response",
                "binary": self._create_challenge_response(challenge),
                "description": "Server should respond with a challenge string"
            }
        elif packet.packet_type == 0x06:  # Server List Request
            return {
                "type": "Server Info Response",
                "format": "\\hostname\\Server Name\\gamename\\Game ID\\mapname\\Map01\\numplayers\\16\\maxplayers\\32",
                "description": "Server should respond with basic server information"
            }
        return {}

    def _create_challenge_response(self, challenge: str) -> bytes:
        """Build a challenge response: a 4-byte header followed by the encoded challenge."""
        packet = bytearray([0x02, 0x00, 0x00, 0x00])
        packet.extend(challenge.encode())
        return bytes(packet)
class GameSpyServerEmulator:
    """Basic GameSpy server emulator for testing.

    ``start`` binds a UDP socket and answers packets until ``stop`` is called
    (or ``running`` is set to False). The socket is always closed when the
    loop ends.
    """

    # Longest time recvfrom() may block before the loop re-checks ``running``.
    _POLL_INTERVAL_SECONDS = 1.0

    def __init__(self, game_id: str, port: int = GameSpyConstants.DEFAULT_GAME_PORT):
        self.game_id = game_id
        self.port = port
        self.analyzer = GameSpyPacketAnalyzer()
        self.utils = GameSpyUtils()
        self.running = False
        self.socket = None  # bound UDP socket while start() is running

    def start(self):
        """Run the receive loop on the calling thread until stopped.

        Raises:
            OSError: if the socket cannot be created or bound.
        """
        self.running = True
        # The context manager closes the socket on every exit path,
        # including a failed bind.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            self.socket = sock
            try:
                sock.bind(('0.0.0.0', self.port))
                # Without a timeout recvfrom() blocks forever and the
                # ``running`` flag could never end the loop.
                sock.settimeout(self._POLL_INTERVAL_SECONDS)
                while self.running:
                    try:
                        data, addr = sock.recvfrom(4096)
                        self._handle_packet(data, addr)
                    except socket.timeout:
                        continue  # nothing received; re-check the flag
                    except Exception as e:
                        logging.error(f"Server error: {e}")
            finally:
                self.running = False
                self.socket = None

    def stop(self):
        """Ask the receive loop to exit; it notices within one poll interval."""
        self.running = False

    def _handle_packet(self, data: bytes, addr: Tuple[str, int]):
        """Wrap one datagram in a GameSpyPacket, log its analysis and reply.

        Failures are logged rather than raised so one bad packet cannot stop
        the server loop.
        """
        try:
            # NOTE(review): XORCipher is not defined in the visible part of
            # this file - confirm it is provided elsewhere; otherwise every
            # packet ends up in the except branch below.
            packet = GameSpyPacket(
                raw_data=data,
                decrypted_data=XORCipher.decrypt(data),
                packet_type=data[0] if data else 0,  # first byte is the type code
                timestamp=datetime.now(),
                game_id=self.game_id,
                source_addr=addr
            )
            analysis = self.analyzer.analyze_packet(packet)
            logging.info(f"Received packet: {analysis}")
            # Send appropriate response
            response = self._generate_response(packet)
            if response:
                self.socket.sendto(response, addr)
        except Exception as e:
            logging.error(f"Error handling packet: {e}")

    def _generate_response(self, packet: "GameSpyPacket") -> Optional[bytes]:
        """Return the reply bytes for a packet, or None when no reply applies."""
        if packet.packet_type == 0x01:  # Challenge Request
            challenge = self.utils.generate_challenge()
            return self.analyzer._create_challenge_response(challenge)
        elif packet.packet_type == 0x06:  # Server List Request
            return self._create_server_info_response()
        return None

    def _create_server_info_response(self) -> bytes:
        """Build a basic server info reply: 4-byte header plus ``\\key\\value`` pairs."""
        info = {
            'hostname': 'Test Server',
            'gamename': self.game_id,
            'mapname': 'TestMap',
            'numplayers': '0',
            'maxplayers': '16',
            'gametype': 'dm',
            'password': '0'
        }
        header = bytes([0x07, 0x00, 0x00, 0x00])  # Info Response header
        body = ''.join(f"\\{key}\\{value}" for key, value in info.items())
        return header + body.encode()
def main():
    """Start the DNS server, UDP listener and server emulator, then idle.

    Each service runs on its own daemon thread, so they end when the main
    thread does. The main thread parks in a timed wait instead of a busy
    loop, and Ctrl-C ends the program with a log line.

    Raises:
        Exception: any startup error is logged and re-raised.
    """
    try:
        # Start DNS server
        # NOTE(review): GameSpyDNSResolver and start_udp_listener are not
        # defined in the visible part of this file - confirm they exist.
        resolver = GameSpyDNSResolver()
        dns_server = DNSServer(resolver, port=53)
        dns_thread = threading.Thread(target=dns_server.start, daemon=True)
        dns_thread.start()
        logging.info("DNS server started on port 53")

        # Start UDP listener
        udp_thread = threading.Thread(target=start_udp_listener, daemon=True)
        udp_thread.start()

        # Start server emulator (optional)
        emulator = GameSpyServerEmulator('gmtest')
        emulator_thread = threading.Thread(target=emulator.start, daemon=True)
        emulator_thread.start()
        logging.info(f"Server emulator started on port {GameSpyConstants.DEFAULT_GAME_PORT}")

        # Keep the main thread alive without spinning a CPU core. The timed
        # wait lets KeyboardInterrupt be delivered between waits.
        idle = threading.Event()
        while True:
            idle.wait(timeout=1.0)
    except KeyboardInterrupt:
        logging.info("Interrupted, shutting down")
    except Exception as e:
        logging.error(f"Error in main function: {str(e)}")
        raise
# Run the analyzer only when this file is executed as a script.
if __name__ == "__main__":
    main()
# Leave a Comment