Untitled
# GameSpy-style presence/login server emulator (PSP, "squarecom" game name).
import socket
import threading
import hashlib
import random
import string
import logging
import time
import struct
import json
import os
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple
from datetime import datetime
import binascii

# Set up detailed logging
LOG_FORMAT = '%(asctime)s.%(msecs)03d [%(threadName)s] %(levelname)s: %(message)s'
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
logging.basicConfig(
    level=logging.DEBUG,
    format=LOG_FORMAT,
    datefmt=DATE_FORMAT,
    handlers=[
        logging.FileHandler('gamespy_debug.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# PSP-specific constants
PSP_GAMESPY_ID = 11  # PSP Platform ID for GameSpy
GAMESPY_GAMENAME = "squarecom"  # Warhammer 40k Squad Command identifier
GAMESPY_DETECTION = bytes([0xFE, 0xFD])
GAMESPY_ENCODING = "GameSpy3D"
NATNEG_MAGIC = b'\xfd\xfc\x1e\x66\x6a\xb2'
USER_DATABASE = "users.json"


@dataclass
class ClientState:
    """Tracks state for connected clients"""
    socket: socket.socket
    address: Tuple[str, int]
    session_key: Optional[str] = None
    challenge: Optional[str] = None
    client_challenge: Optional[str] = None
    authenticated: bool = False
    user_id: Optional[int] = None
    profile_id: Optional[int] = None
    nick: Optional[str] = None
    email: Optional[str] = None
    game_port: Optional[int] = None
    local_ip: Optional[str] = None
    # Must be stamped per instance. A plain ``= time.time()`` default is
    # evaluated once when the class is defined, so every client would start
    # with the import-time timestamp and be timed out by the heartbeat
    # checker as soon as the server had been up longer than the interval.
    last_heartbeat: float = field(default_factory=time.time)

    def __post_init__(self):
        self.connect_time = datetime.now()
        self.last_packet_time = time.time()


class GameSpyPSPServer:
    """Threaded TCP server that speaks a backslash-delimited key/value protocol.

    One acceptor thread runs per listening port and one handler thread per
    client. A background thread drops clients whose last heartbeat is older
    than ``heartbeat_interval`` seconds. Registered users are persisted to
    ``USER_DATABASE`` as JSON.
    """

    # Maps a service type to the name of the method that serves it. Only the
    # presence handler is defined in this file; the others are resolved at
    # call time so a subclass (or later addition) can provide them.
    _SERVICE_HANDLERS = {
        "presence": "handle_presence",
        "peerchat": "handle_peerchat",
        "natneg": "handle_natneg",
    }

    def __init__(self, host='0.0.0.0'):
        """Initialise state, load the user database and start the heartbeat checker.

        Args:
            host: Interface address every listening socket binds to.
        """
        self.host = host
        self.servers = {}
        self.clients: Dict[socket.socket, ClientState] = {}
        self.running = True
        self.heartbeat_interval = 10.0
        self.next_user_id = 1000
        self.next_profile_id = 1000
        self.users = {}
        self.load_users()

        # Start heartbeat checker
        self.heartbeat_thread = threading.Thread(target=self._check_heartbeats)
        self.heartbeat_thread.daemon = True
        self.heartbeat_thread.start()

    def load_users(self):
        """Load user database from JSON file.

        A missing file yields an empty database. Any other failure is logged
        and also leaves the database empty. ``next_user_id`` and
        ``next_profile_id`` are advanced past the highest stored IDs.
        """
        try:
            with open(USER_DATABASE, 'r') as f:
                self.users = json.load(f)
            # Find highest user/profile ID for new registrations
            for user in self.users.values():
                self.next_user_id = max(self.next_user_id, user['userid'] + 1)
                self.next_profile_id = max(self.next_profile_id, user['profileid'] + 1)
        except FileNotFoundError:
            # First run: nothing registered yet.
            self.users = {}
        except Exception as e:
            logger.error(f"Failed to load users: {e}")
            self.users = {}

    def save_users(self):
        """Save user database to JSON file; failures are logged, not raised."""
        try:
            with open(USER_DATABASE, 'w') as f:
                json.dump(self.users, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save users: {e}")

    def _send_error(self, client_socket: socket.socket, error: Exception):
        """Send the protocol error reply carrying ``str(error)`` as ``errmsg``."""
        error_msg = self.build_message({
            "error": "",
            "err": "0",
            "fatal": "",
            "errmsg": str(error),
            "id": "1"
        })
        self.send_message(client_socket, error_msg)

    def handle_newuser(self, client_socket: socket.socket, data: Dict[str, str]):
        """Handle new user registration.

        Requires ``uniquenick``, ``passwordenc`` and ``email`` in ``data``.
        On success the user is stored, the database is saved and a ``nur``
        reply is sent; on any failure an error reply is sent instead.
        """
        client_state = self.clients[client_socket]
        logger.info(f"New user registration request from {client_state.address}")

        try:
            # Validate required fields
            required_fields = ['uniquenick', 'passwordenc', 'email']
            if not all(field_name in data for field_name in required_fields):
                raise ValueError("Missing required registration fields")

            nick = data['uniquenick']
            password = data['passwordenc']  # In real GameSpy this would be encrypted
            email = data['email']

            # Check if username already exists
            if any(user['uniquenick'] == nick for user in self.users.values()):
                raise ValueError("Username already exists")

            # Create new user
            user_id = self.next_user_id
            profile_id = self.next_profile_id
            self.next_user_id += 1
            self.next_profile_id += 1

            # Hash password (using SHA256 instead of MD5)
            password_hash = hashlib.sha256(password.encode()).hexdigest()

            # Store user data
            self.users[nick] = {
                'userid': user_id,
                'profileid': profile_id,
                'email': email,
                'password': password_hash,
                'uniquenick': nick,
                'registered': datetime.now().isoformat()
            }
            self.save_users()

            # Send success response
            response = self.build_message({
                "nur": "",
                "success": "1",
                "id": "1"
            })
            self.send_message(client_socket, response)
            logger.info(f"Registration successful: {nick} (userid: {user_id})")

        except Exception as e:
            logger.error(f"Registration failed: {e}")
            self._send_error(client_socket, e)

    def handle_login(self, client_socket: socket.socket, data: Dict[str, str]):
        """Process a login request for an existing ``uniquenick``.

        NOTE(review): no password or challenge-response check is performed;
        any client that names an existing user is marked authenticated. The
        session key also comes from ``random``, which is not suitable for
        security-sensitive tokens. Both are kept as in the original demo
        behaviour and should be revisited before real use.
        """
        client_state = self.clients[client_socket]
        logger.info(f"Login request from {client_state.address}")

        try:
            if "uniquenick" not in data:
                raise ValueError("Missing username")

            nick = data['uniquenick']

            # Check if user exists
            if nick not in self.users:
                raise ValueError("User not found")

            user = self.users[nick]

            # In a real implementation, verify password hash here
            # For demo purposes, we'll skip full challenge-response

            # Set up session
            session_key = random.randint(100000, 999999)
            client_state.session_key = str(session_key)
            client_state.authenticated = True
            client_state.user_id = user['userid']
            client_state.profile_id = user['profileid']
            client_state.nick = nick
            client_state.email = user['email']

            # Send success response
            response = self.build_message({
                "lc": "2",
                "sesskey": str(session_key),
                "proof": "0123456789abcdef",
                "userid": str(user['userid']),
                "profileid": str(user['profileid']),
                "uniquenick": nick,
                "lt": self.generate_challenge(22),
                "id": "1",
                "final": ""
            })
            self.send_message(client_socket, response)
            logger.info(f"Login successful: {nick} (session: {session_key})")

        except Exception as e:
            logger.error(f"Login failed: {e}")
            self._send_error(client_socket, e)

    def _debug_packet(self, data: bytes, prefix: str = ""):
        """Log ``data`` at DEBUG level as a hex dump and as printable ASCII."""
        hex_dump = ' '.join(f"{b:02x}" for b in data)
        ascii_dump = ''.join(chr(b) if 32 <= b <= 126 else '.' for b in data)
        logger.debug(f"{prefix}HEX: {hex_dump}")
        logger.debug(f"{prefix}ASCII: {ascii_dump}")

    def encode_message(self, data: bytes) -> bytes:
        """XOR ``data`` with the repeating ``GAMESPY_ENCODING`` key."""
        key = GAMESPY_ENCODING.encode('ascii')
        key_len = len(key)
        return bytes(byte ^ key[i % key_len] for i, byte in enumerate(data))

    def decode_message(self, data: bytes) -> bytes:
        """XOR decode with GameSpy key"""
        return self.encode_message(data)  # XOR is symmetric

    def generate_challenge(self, length: int = 10) -> str:
        """Return ``length`` random ASCII letters/digits for use as a challenge."""
        return ''.join(random.choices(string.ascii_letters + string.digits, k=length))

    def parse_message(self, message: str) -> Dict[str, str]:
        """Parse a ``\\key\\value\\...`` message into a dict.

        A leading empty segment (from the initial backslash) is dropped.
        Segments are then paired in order; an unpaired trailing segment is
        ignored, and a repeated key keeps its last value.
        """
        parts = message.split('\\')
        if not parts[0]:
            parts.pop(0)
        # zip() stops at the shorter slice, which discards a dangling key.
        return dict(zip(parts[0::2], parts[1::2]))

    def build_message(self, pairs: Dict[str, str]) -> str:
        """Build GameSpy message from key-value pairs"""
        return '\\' + '\\'.join(f"{k}\\{v}" for k, v in pairs.items()) + '\\'

    def handle_presence(self, client_socket: socket.socket):
        """Serve one presence client: send a challenge, then dispatch requests.

        The loop ends when the server stops or ``receive_message`` returns
        nothing (peer closed or receive error).
        """
        client_state = self.clients[client_socket]

        # Initial challenge
        challenge = self.generate_challenge()
        client_state.challenge = challenge

        message = self.build_message({
            "lc": "1",
            "challenge": challenge,
            "id": "1"
        })
        self.send_message(client_socket, message)

        while self.running:
            data = self.receive_message(client_socket)
            if not data:
                break

            message = data.decode('ascii', errors='ignore')
            pairs = self.parse_message(message)

            if "login" in pairs:
                self.handle_login(client_socket, pairs)
            elif "newuser" in pairs:
                self.handle_newuser(client_socket, pairs)
            elif "status" in pairs:
                self.handle_status(client_socket, pairs)
            elif "heartbeat" in pairs:
                self.handle_heartbeat(client_socket)

    def start_server(self, port: int, service_type: str):
        """Bind a listening TCP socket on ``port`` and start its acceptor thread."""
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((self.host, port))
        server.listen(5)

        self.servers[port] = server
        logger.info(f"Started {service_type} server on {self.host}:{port}")

        accept_thread = threading.Thread(
            target=self.accept_connections,
            args=(server, service_type),
            name=f"{service_type}-acceptor"
        )
        accept_thread.daemon = True
        accept_thread.start()

    def accept_connections(self, server: socket.socket, service_type: str):
        """Accept connections on ``server`` and start one handler thread each."""
        while self.running:
            try:
                client_socket, address = server.accept()
                logger.info(f"New {service_type} connection from {address}")

                self.clients[client_socket] = ClientState(
                    socket=client_socket,
                    address=address
                )

                client_thread = threading.Thread(
                    target=self.handle_client,
                    args=(client_socket, service_type),
                    name=f"{service_type}-{address[0]}:{address[1]}"
                )
                client_thread.daemon = True
                client_thread.start()

            except Exception as e:
                if not self.running:
                    # stop() closed the listening socket; not an error.
                    break
                logger.error(f"Accept error: {e}", exc_info=True)

    def handle_client(self, client_socket: socket.socket, service_type: str):
        """Route a client to the handler for ``service_type``, then clean it up.

        A service type with no handler method available is logged and the
        connection is closed, instead of failing with ``AttributeError``.
        """
        try:
            handler_name = self._SERVICE_HANDLERS.get(service_type)
            handler = getattr(self, handler_name, None) if handler_name else None
            if handler is None:
                logger.warning(f"No handler implemented for service type: {service_type}")
            else:
                handler(client_socket)
        except Exception as e:
            logger.error(f"Client handler error: {e}", exc_info=True)
        finally:
            self.cleanup_client(client_socket)

    def handle_heartbeat(self, client_socket: socket.socket):
        """Update client heartbeat timestamp"""
        client_state = self.clients[client_socket]
        client_state.last_heartbeat = time.time()
        logger.debug(f"Heartbeat from {client_state.nick} ({client_state.address})")

    def _check_heartbeats(self):
        """Once per second, drop clients whose heartbeat is older than the interval."""
        while self.running:
            current_time = time.time()

            # Iterate over a snapshot: cleanup_client mutates self.clients.
            for client_socket, state in list(self.clients.items()):
                if current_time - state.last_heartbeat > self.heartbeat_interval:
                    logger.warning(f"Client timeout: {state.nick} ({state.address})")
                    self.cleanup_client(client_socket)

            time.sleep(1)

    def handle_status(self, client_socket: socket.socket, data: Dict[str, str]):
        """Record ``locport`` / ``localip`` from a status update.

        A non-numeric ``locport`` is logged and ignored rather than raised,
        so one malformed field does not terminate the connection.
        """
        client_state = self.clients[client_socket]

        if 'status' in data:
            logger.info(f"Status update from {client_state.nick}: {data['status']}")

        if 'locport' in data:
            try:
                client_state.game_port = int(data['locport'])
            except ValueError:
                logger.warning(f"Ignoring invalid locport from {client_state.address}: {data['locport']!r}")

        if 'localip' in data:
            client_state.local_ip = data['localip']

    def _client_address(self, client_socket: socket.socket):
        """Return the client's address for logging, or ``None`` if already removed."""
        state = self.clients.get(client_socket)
        return state.address if state is not None else None

    def send_message(self, client_socket: socket.socket, message: str):
        """Encode ``message`` and send all of it; on failure clean up the client."""
        try:
            data = message.encode('ascii', errors='ignore')
            encoded = self.encode_message(data)
            # sendall: plain send() may transmit only part of the buffer.
            client_socket.sendall(encoded)

            logger.debug(f"Sent to {self._client_address(client_socket)}:")
            logger.debug(f"  Original: {message}")
            self._debug_packet(encoded, "  Encoded: ")

        except Exception as e:
            logger.error(f"Send error: {e}", exc_info=True)
            self.cleanup_client(client_socket)

    def receive_message(self, client_socket: socket.socket) -> Optional[bytes]:
        """Receive up to 4096 bytes and return them decoded.

        Returns ``None`` when the peer has closed the connection or the
        receive fails (the failure is logged).
        """
        try:
            data = client_socket.recv(4096)
            if not data:
                return None

            decoded = self.decode_message(data)

            logger.debug(f"Received from {self._client_address(client_socket)}:")
            self._debug_packet(data, "  Raw: ")
            self._debug_packet(decoded, "  Decoded: ")

            return decoded

        except Exception as e:
            logger.error(f"Receive error: {e}", exc_info=True)
            return None

    def cleanup_client(self, client_socket: socket.socket):
        """Forget ``client_socket`` and close it; safe to call more than once."""
        try:
            # Single pop instead of get-then-del: the heartbeat thread and the
            # handler thread can both reach this for the same socket.
            client_state = self.clients.pop(client_socket, None)
            if client_state is not None:
                logger.info(f"Cleaning up client: {client_state.nick} ({client_state.address})")
            client_socket.close()
        except Exception as e:
            logger.error(f"Cleanup error: {e}", exc_info=True)

    def stop(self):
        """Stop the server: close listening and client sockets, then save users."""
        logger.info("Stopping server...")
        self.running = False

        # Close all server sockets
        for server in self.servers.values():
            try:
                server.close()
            except Exception as e:
                logger.error(f"Stop error: {e}", exc_info=True)

        # Close any clients still connected (snapshot: cleanup mutates the dict).
        for client_socket in list(self.clients):
            self.cleanup_client(client_socket)

        # Save final user state
        self.save_users()


def main():
    """Start the three services and run until interrupted with Ctrl+C."""
    server = GameSpyPSPServer()

    try:
        # Start different services
        server.start_server(29900, "presence")  # GPCM
        server.start_server(29901, "peerchat")  # PeerChat
        server.start_server(27901, "natneg")    # NAT Negotiation

        logger.info("GameSpy PSP server started. Press Ctrl+C to stop.")

        while True:
            time.sleep(1)

    except KeyboardInterrupt:
        logger.info("Shutting down...")
        server.stop()


if __name__ == "__main__":
    main()
Leave a Comment