Untitled

 avatar
unknown
plain_text
a month ago
17 kB
3
Indexable
import socket 
import threading 
import hashlib 
import random 
import string 
import logging 
import time 
import struct 
import json 
import os 
from dataclasses import dataclass 
from typing import Dict, Optional, Tuple 
from datetime import datetime 
import binascii 
 
# Set up detailed logging 
LOG_FORMAT = '%(asctime)s.%(msecs)03d [%(threadName)s] %(levelname)s: %(message)s' 
DATE_FORMAT = '%Y-%m-%d %H:%M:%S' 
logging.basicConfig( 
    level=logging.DEBUG, 
    format=LOG_FORMAT, 
    datefmt=DATE_FORMAT, 
    handlers=[ 
        logging.FileHandler('gamespy_debug.log'), 
        logging.StreamHandler() 
    ] 
) 
logger = logging.getLogger(__name__) 
 
# PSP-specific constants 
PSP_GAMESPY_ID = 11  # PSP Platform ID for GameSpy 
GAMESPY_GAMENAME = "squarecom"  # Warhammer 40k Squad Command identifier 
GAMESPY_DETECTION = bytes([0xFE, 0xFD]) 
GAMESPY_ENCODING = "GameSpy3D" 
NATNEG_MAGIC = b'\xfd\xfc\x1e\x66\x6a\xb2' 
USER_DATABASE = "users.json" 
 
@dataclass 
class ClientState: 
    """Tracks state for connected clients""" 
    socket: socket.socket 
    address: Tuple[str, int] 
    session_key: Optional[str] = None 
    challenge: Optional[str] = None 
    client_challenge: Optional[str] = None 
    authenticated: bool = False 
    user_id: Optional[int] = None 
    profile_id: Optional[int] = None 
    nick: Optional[str] = None 
    email: Optional[str] = None 
    game_port: Optional[int] = None 
    local_ip: Optional[str] = None 
    last_heartbeat: float = time.time() 
     
    def __post_init__(self): 
        self.connect_time = datetime.now() 
        self.last_packet_time = time.time() 
class GameSpyPSPServer: 
    def __init__(self, host='0.0.0.0'): 
        self.host = host 
        self.servers = {} 
        self.clients: Dict[socket.socket, ClientState] = {} 
        self.running = True 
        self.heartbeat_interval = 10.0 
        self.next_user_id = 1000 
        self.next_profile_id = 1000 
        self.users = {} 
        self.load_users() 
         
        # Start heartbeat checker 
        self.heartbeat_thread = threading.Thread(target=self._check_heartbeats) 
        self.heartbeat_thread.daemon = True 
        self.heartbeat_thread.start() 
 
    def load_users(self): 
        """Load user database from JSON file""" 
        try: 
            if os.path.exists(USER_DATABASE): 
                with open(USER_DATABASE, 'r') as f: 
                    self.users = json.load(f) 
                    # Find highest user/profile ID for new registrations 
                    for user in self.users.values(): 
                        self.next_user_id = max(self.next_user_id, user['userid'] + 1) 
                        self.next_profile_id = max(self.next_profile_id, user['profileid'] + 1) 
            else: 
                self.users = {} 
        except Exception as e: 
            logger.error(f"Failed to load users: {e}") 
            self.users = {} 
 
    def save_users(self): 
        """Save user database to JSON file""" 
        try: 
            with open(USER_DATABASE, 'w') as f: 
                json.dump(self.users, f, indent=2) 
        except Exception as e: 
            logger.error(f"Failed to save users: {e}") 
 
    def handle_newuser(self, client_socket: socket.socket, data: Dict[str, str]): 
        """Handle new user registration""" 
        client_state = self.clients[client_socket] 
        logger.info(f"New user registration request from {client_state.address}") 
         
        try: 
            # Validate required fields 
            required_fields = ['uniquenick', 'passwordenc', 'email'] 
            if not all(field in data for field in required_fields): 
                raise ValueError("Missing required registration fields") 
                 
            nick = data['uniquenick'] 
            password = data['passwordenc']  # In real GameSpy this would be encrypted 
            email = data['email'] 
             
            # Check if username already exists 
            if any(user['uniquenick'] == nick for user in self.users.values()): 
                raise ValueError("Username already exists") 
                 
            # Create new user 
            user_id = self.next_user_id 
            profile_id = self.next_profile_id 
            self.next_user_id += 1 
            self.next_profile_id += 1 
             
            # Hash password (using SHA256 instead of MD5) 
            password_hash = hashlib.sha256(password.encode()).hexdigest() 
             
            # Store user data 
            self.users[nick] = { 
                'userid': user_id, 
                'profileid': profile_id, 
                'email': email, 
                'password': password_hash, 
                'uniquenick': nick, 
                'registered': datetime.now().isoformat() 
            } 
             
            self.save_users() 
             
            # Send success response 
            response = self.build_message({ 
                "nur": "", 
                "success": "1", 
                "id": "1" 
            }) 
             
            self.send_message(client_socket, response) 
            logger.info(f"Registration successful: {nick} (userid: {user_id})") 
             
        except Exception as e: 
            logger.error(f"Registration failed: {e}") 
            error_msg = self.build_message({ 
                "error": "", 
                "err": "0", 
                "fatal": "", 
                "errmsg": str(e), 
                "id": "1" 
            }) 
            self.send_message(client_socket, error_msg) 
 
    def handle_login(self, client_socket: socket.socket, data: Dict[str, str]): 
        """Process login request with proper authentication""" 
        client_state = self.clients[client_socket] 
        logger.info(f"Login request from {client_state.address}") 
         
        try: 
            if "uniquenick" not in data: 
                raise ValueError("Missing username") 
                 
            nick = data['uniquenick'] 
             
            # Check if user exists 
            if nick not in self.users: 
                raise ValueError("User not found") 
                 
            user = self.users[nick] 
             
            # In a real implementation, verify password hash here 
            # For demo purposes, we'll skip full challenge-response 
             
            # Set up session 
            session_key = random.randint(100000, 999999) 
            client_state.session_key = str(session_key) 
            client_state.authenticated = True 
            client_state.user_id = user['userid'] 
            client_state.profile_id = user['profileid'] 
            client_state.nick = nick 
            client_state.email = user['email'] 
             
            # Send success response 
            response = self.build_message({ 
                "lc": "2", 
                "sesskey": str(session_key), 
                "proof": "0123456789abcdef", 
                "userid": str(user['userid']), 
                "profileid": str(user['profileid']), 
                "uniquenick": nick, 
                "lt": self.generate_challenge(22), 
                "id": "1", 
                "final": "" 
            }) 
             
            self.send_message(client_socket, response) 
            logger.info(f"Login successful: {nick} (session: {session_key})") 
             
        except Exception as e: 
            logger.error(f"Login failed: {e}") 
            error_msg = self.build_message({ 
                "error": "", 
                "err": "0", 
                "fatal": "", 
                "errmsg": str(e), 
                "id": "1" 
            }) 
            self.send_message(client_socket, error_msg) 
    def _debug_packet(self, data: bytes, prefix: str = ""): 
        """Detailed packet debugging""" 
        hex_dump = ' '.join([f"{b:02x}" for b in data]) 
        ascii_dump = ''.join([chr(b) if 32 <= b <= 126 else '.' for b in data]) 
        logger.debug(f"{prefix}HEX: {hex_dump}") 
        logger.debug(f"{prefix}ASCII: {ascii_dump}") 
 
    def encode_message(self, data: bytes) -> bytes: 
        """XOR encode with GameSpy key""" 
        key = GAMESPY_ENCODING.encode('ascii') 
        result = bytearray() 
        for i, byte in enumerate(data): 
            result.append(byte ^ key[i % len(key)]) 
        return bytes(result) 
 
    def decode_message(self, data: bytes) -> bytes: 
        """XOR decode with GameSpy key""" 
        return self.encode_message(data)  # XOR is symmetric 
 
    def generate_challenge(self, length: int = 10) -> str: 
        """Generate GameSpy challenge string""" 
        return ''.join(random.choices(string.ascii_letters + string.digits, k=length)) 
 
    def parse_message(self, message: str) -> Dict[str, str]: 
        """Parse GameSpy message into key-value pairs""" 
        pairs = {} 
        parts = message.split('\\') 
        if not parts[0]: parts.pop(0) 
        for i in range(0, len(parts) - 1, 2): 
            if i + 1 < len(parts): 
                pairs[parts[i]] = parts[i + 1] 
        return pairs 
 
    def build_message(self, pairs: Dict[str, str]) -> str: 
        """Build GameSpy message from key-value pairs""" 
        return '\\' + '\\'.join(f"{k}\\{v}" for k, v in pairs.items()) + '\\' 
 
    def handle_presence(self, client_socket: socket.socket): 
        """Handle GPCM presence protocol""" 
        client_state = self.clients[client_socket] 
         
        # Initial challenge 
        challenge = self.generate_challenge() 
        client_state.challenge = challenge 
        message = self.build_message({ 
            "lc": "1", 
            "challenge": challenge, 
            "id": "1" 
        }) 
        self.send_message(client_socket, message) 
 
        while self.running: 
            data = self.receive_message(client_socket) 
            if not data: 
                break 
 
            message = data.decode('ascii', errors='ignore') 
            pairs = self.parse_message(message) 
             
            if "login" in pairs: 
                self.handle_login(client_socket, pairs) 
            elif "newuser" in pairs: 
                self.handle_newuser(client_socket, pairs) 
            elif "status" in pairs: 
                self.handle_status(client_socket, pairs) 
            elif "heartbeat" in pairs: 
                self.handle_heartbeat(client_socket) 
 
    def start_server(self, port: int, service_type: str): 
        """Start server on specified port""" 
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
        server.bind((self.host, port)) 
        server.listen(5) 
        self.servers[port] = server 
         
        logger.info(f"Started {service_type} server on {self.host}:{port}") 
         
        accept_thread = threading.Thread( 
            target=self.accept_connections, 
            args=(server, service_type), 
            name=f"{service_type}-acceptor" 
        ) 
        accept_thread.daemon = True 
        accept_thread.start() 
 
    def accept_connections(self, server: socket.socket, service_type: str): 
        """Accept incoming connections""" 
        while self.running: 
            try: 
                client_socket, address = server.accept() 
                logger.info(f"New {service_type} connection from {address}") 
                 
                self.clients[client_socket] = ClientState( 
                    socket=client_socket, 
                    address=address 
                ) 
                 
                client_thread = threading.Thread( 
                    target=self.handle_client, 
                    args=(client_socket, service_type), 
                    name=f"{service_type}-{address[0]}:{address[1]}" 
                ) 
                client_thread.daemon = True 
                client_thread.start() 
                 
            except Exception as e: 
                logger.error(f"Accept error: {e}", exc_info=True) 
 
    def handle_client(self, client_socket: socket.socket, service_type: str): 
        """Route client to appropriate handler""" 
        try: 
            if service_type == "presence": 
                self.handle_presence(client_socket) 
            elif service_type == "peerchat": 
                self.handle_peerchat(client_socket) 
            elif service_type == "natneg": 
                self.handle_natneg(client_socket) 
        except Exception as e: 
            logger.error(f"Client handler error: {e}", exc_info=True) 
        finally: 
            self.cleanup_client(client_socket) 
    def handle_heartbeat(self, client_socket: socket.socket): 
        """Update client heartbeat timestamp""" 
        client_state = self.clients[client_socket] 
        client_state.last_heartbeat = time.time() 
        logger.debug(f"Heartbeat from {client_state.nick} ({client_state.address})") 
 
    def _check_heartbeats(self): 
        """Monitor client heartbeats""" 
        while self.running: 
            current_time = time.time() 
            # Check each client's last heartbeat 
            for client_socket, state in list(self.clients.items()): 
                if current_time - state.last_heartbeat > self.heartbeat_interval: 
                    logger.warning(f"Client timeout: {state.nick} ({state.address})") 
                    self.cleanup_client(client_socket) 
            time.sleep(1) 
 
    def handle_status(self, client_socket: socket.socket, data: Dict[str, str]): 
        """Handle status updates from client""" 
        client_state = self.clients[client_socket] 
        if 'status' in data: 
            logger.info(f"Status update from {client_state.nick}: {data['status']}") 
        if 'locport' in data: 
            client_state.game_port = int(data['locport']) 
        if 'localip' in data: 
            client_state.local_ip = data['localip'] 
 
    def send_message(self, client_socket: socket.socket, message: str): 
        """Send encoded message""" 
        try: 
            data = message.encode('ascii', errors='ignore') 
            encoded = self.encode_message(data) 
            client_socket.send(encoded) 
            logger.debug(f"Sent to {self.clients[client_socket].address}:") 
            logger.debug(f"  Original: {message}") 
            self._debug_packet(encoded, "  Encoded: ") 
        except Exception as e: 
            logger.error(f"Send error: {e}", exc_info=True) 
            self.cleanup_client(client_socket) 
 
    def receive_message(self, client_socket: socket.socket) -> Optional[bytes]: 
        """Receive and decode message""" 
        try: 
            data = client_socket.recv(4096) 
            if not data: 
                return None 
                 
            decoded = self.decode_message(data) 
            logger.debug(f"Received from {self.clients[client_socket].address}:") 
            self._debug_packet(data, "  Raw: ") 
            self._debug_packet(decoded, "  Decoded: ") 
            return decoded 
             
        except Exception as e: 
            logger.error(f"Receive error: {e}", exc_info=True) 
            return None 
 
    def cleanup_client(self, client_socket: socket.socket): 
        """Clean up disconnected client""" 
        try: 
            client_state = self.clients.get(client_socket) 
            if client_state: 
                logger.info(f"Cleaning up client: {client_state.nick} ({client_state.address})") 
                del self.clients[client_socket] 
            client_socket.close() 
        except Exception as e: 
            logger.error(f"Cleanup error: {e}", exc_info=True) 
 
    def stop(self): 
        """Stop the server""" 
        logger.info("Stopping server...") 
        self.running = False 
        # Close all server sockets 
        for server in self.servers.values(): 
            try: 
                server.close() 
            except Exception as e: 
                logger.error(f"Stop error: {e}", exc_info=True) 
        # Save final user state 
        self.save_users() 
 
 
def main(): 
    server = GameSpyPSPServer() 
 
    try: 
        # Start different services 
        server.start_server(29900, "presence")    # GPCM 
        server.start_server(29901, "peerchat")    # PeerChat 
        server.start_server(27901, "natneg")      # NAT Negotiation 
 
        logger.info("GameSpy PSP server started. Press Ctrl+C to stop.") 
         
        while True: 
            time.sleep(1) 
    except KeyboardInterrupt: 
        logger.info("Shutting down...") 
        server.stop() 
 
if __name__ == "__main__": 
    main()
Leave a Comment