Untitled
unknown
plain_text
10 months ago
17 kB
5
Indexable
import socket
import threading
import hashlib
import random
import string
import logging
import time
import struct
import json
import os
from dataclasses import dataclass
from typing import Dict, Optional, Tuple
from datetime import datetime
import binascii
# Set up detailed logging
LOG_FORMAT = '%(asctime)s.%(msecs)03d [%(threadName)s] %(levelname)s: %(message)s'
DATE_FORMAT = '%Y-%m-%d %H:%M:%S'
logging.basicConfig(
level=logging.DEBUG,
format=LOG_FORMAT,
datefmt=DATE_FORMAT,
handlers=[
logging.FileHandler('gamespy_debug.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# PSP-specific constants
PSP_GAMESPY_ID = 11 # PSP Platform ID for GameSpy
GAMESPY_GAMENAME = "squarecom" # Warhammer 40k Squad Command identifier
GAMESPY_DETECTION = bytes([0xFE, 0xFD])
GAMESPY_ENCODING = "GameSpy3D"
NATNEG_MAGIC = b'\xfd\xfc\x1e\x66\x6a\xb2'
USER_DATABASE = "users.json"
@dataclass
class ClientState:
"""Tracks state for connected clients"""
socket: socket.socket
address: Tuple[str, int]
session_key: Optional[str] = None
challenge: Optional[str] = None
client_challenge: Optional[str] = None
authenticated: bool = False
user_id: Optional[int] = None
profile_id: Optional[int] = None
nick: Optional[str] = None
email: Optional[str] = None
game_port: Optional[int] = None
local_ip: Optional[str] = None
last_heartbeat: float = time.time()
def __post_init__(self):
self.connect_time = datetime.now()
self.last_packet_time = time.time()
class GameSpyPSPServer:
def __init__(self, host='0.0.0.0'):
self.host = host
self.servers = {}
self.clients: Dict[socket.socket, ClientState] = {}
self.running = True
self.heartbeat_interval = 10.0
self.next_user_id = 1000
self.next_profile_id = 1000
self.users = {}
self.load_users()
# Start heartbeat checker
self.heartbeat_thread = threading.Thread(target=self._check_heartbeats)
self.heartbeat_thread.daemon = True
self.heartbeat_thread.start()
def load_users(self):
"""Load user database from JSON file"""
try:
if os.path.exists(USER_DATABASE):
with open(USER_DATABASE, 'r') as f:
self.users = json.load(f)
# Find highest user/profile ID for new registrations
for user in self.users.values():
self.next_user_id = max(self.next_user_id, user['userid'] + 1)
self.next_profile_id = max(self.next_profile_id, user['profileid'] + 1)
else:
self.users = {}
except Exception as e:
logger.error(f"Failed to load users: {e}")
self.users = {}
def save_users(self):
"""Save user database to JSON file"""
try:
with open(USER_DATABASE, 'w') as f:
json.dump(self.users, f, indent=2)
except Exception as e:
logger.error(f"Failed to save users: {e}")
def handle_newuser(self, client_socket: socket.socket, data: Dict[str, str]):
"""Handle new user registration"""
client_state = self.clients[client_socket]
logger.info(f"New user registration request from {client_state.address}")
try:
# Validate required fields
required_fields = ['uniquenick', 'passwordenc', 'email']
if not all(field in data for field in required_fields):
raise ValueError("Missing required registration fields")
nick = data['uniquenick']
password = data['passwordenc'] # In real GameSpy this would be encrypted
email = data['email']
# Check if username already exists
if any(user['uniquenick'] == nick for user in self.users.values()):
raise ValueError("Username already exists")
# Create new user
user_id = self.next_user_id
profile_id = self.next_profile_id
self.next_user_id += 1
self.next_profile_id += 1
# Hash password (using SHA256 instead of MD5)
password_hash = hashlib.sha256(password.encode()).hexdigest()
# Store user data
self.users[nick] = {
'userid': user_id,
'profileid': profile_id,
'email': email,
'password': password_hash,
'uniquenick': nick,
'registered': datetime.now().isoformat()
}
self.save_users()
# Send success response
response = self.build_message({
"nur": "",
"success": "1",
"id": "1"
})
self.send_message(client_socket, response)
logger.info(f"Registration successful: {nick} (userid: {user_id})")
except Exception as e:
logger.error(f"Registration failed: {e}")
error_msg = self.build_message({
"error": "",
"err": "0",
"fatal": "",
"errmsg": str(e),
"id": "1"
})
self.send_message(client_socket, error_msg)
def handle_login(self, client_socket: socket.socket, data: Dict[str, str]):
"""Process login request with proper authentication"""
client_state = self.clients[client_socket]
logger.info(f"Login request from {client_state.address}")
try:
if "uniquenick" not in data:
raise ValueError("Missing username")
nick = data['uniquenick']
# Check if user exists
if nick not in self.users:
raise ValueError("User not found")
user = self.users[nick]
# In a real implementation, verify password hash here
# For demo purposes, we'll skip full challenge-response
# Set up session
session_key = random.randint(100000, 999999)
client_state.session_key = str(session_key)
client_state.authenticated = True
client_state.user_id = user['userid']
client_state.profile_id = user['profileid']
client_state.nick = nick
client_state.email = user['email']
# Send success response
response = self.build_message({
"lc": "2",
"sesskey": str(session_key),
"proof": "0123456789abcdef",
"userid": str(user['userid']),
"profileid": str(user['profileid']),
"uniquenick": nick,
"lt": self.generate_challenge(22),
"id": "1",
"final": ""
})
self.send_message(client_socket, response)
logger.info(f"Login successful: {nick} (session: {session_key})")
except Exception as e:
logger.error(f"Login failed: {e}")
error_msg = self.build_message({
"error": "",
"err": "0",
"fatal": "",
"errmsg": str(e),
"id": "1"
})
self.send_message(client_socket, error_msg)
def _debug_packet(self, data: bytes, prefix: str = ""):
"""Detailed packet debugging"""
hex_dump = ' '.join([f"{b:02x}" for b in data])
ascii_dump = ''.join([chr(b) if 32 <= b <= 126 else '.' for b in data])
logger.debug(f"{prefix}HEX: {hex_dump}")
logger.debug(f"{prefix}ASCII: {ascii_dump}")
def encode_message(self, data: bytes) -> bytes:
"""XOR encode with GameSpy key"""
key = GAMESPY_ENCODING.encode('ascii')
result = bytearray()
for i, byte in enumerate(data):
result.append(byte ^ key[i % len(key)])
return bytes(result)
def decode_message(self, data: bytes) -> bytes:
"""XOR decode with GameSpy key"""
return self.encode_message(data) # XOR is symmetric
def generate_challenge(self, length: int = 10) -> str:
"""Generate GameSpy challenge string"""
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def parse_message(self, message: str) -> Dict[str, str]:
"""Parse GameSpy message into key-value pairs"""
pairs = {}
parts = message.split('\\')
if not parts[0]: parts.pop(0)
for i in range(0, len(parts) - 1, 2):
if i + 1 < len(parts):
pairs[parts[i]] = parts[i + 1]
return pairs
def build_message(self, pairs: Dict[str, str]) -> str:
"""Build GameSpy message from key-value pairs"""
return '\\' + '\\'.join(f"{k}\\{v}" for k, v in pairs.items()) + '\\'
def handle_presence(self, client_socket: socket.socket):
"""Handle GPCM presence protocol"""
client_state = self.clients[client_socket]
# Initial challenge
challenge = self.generate_challenge()
client_state.challenge = challenge
message = self.build_message({
"lc": "1",
"challenge": challenge,
"id": "1"
})
self.send_message(client_socket, message)
while self.running:
data = self.receive_message(client_socket)
if not data:
break
message = data.decode('ascii', errors='ignore')
pairs = self.parse_message(message)
if "login" in pairs:
self.handle_login(client_socket, pairs)
elif "newuser" in pairs:
self.handle_newuser(client_socket, pairs)
elif "status" in pairs:
self.handle_status(client_socket, pairs)
elif "heartbeat" in pairs:
self.handle_heartbeat(client_socket)
def start_server(self, port: int, service_type: str):
"""Start server on specified port"""
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((self.host, port))
server.listen(5)
self.servers[port] = server
logger.info(f"Started {service_type} server on {self.host}:{port}")
accept_thread = threading.Thread(
target=self.accept_connections,
args=(server, service_type),
name=f"{service_type}-acceptor"
)
accept_thread.daemon = True
accept_thread.start()
def accept_connections(self, server: socket.socket, service_type: str):
"""Accept incoming connections"""
while self.running:
try:
client_socket, address = server.accept()
logger.info(f"New {service_type} connection from {address}")
self.clients[client_socket] = ClientState(
socket=client_socket,
address=address
)
client_thread = threading.Thread(
target=self.handle_client,
args=(client_socket, service_type),
name=f"{service_type}-{address[0]}:{address[1]}"
)
client_thread.daemon = True
client_thread.start()
except Exception as e:
logger.error(f"Accept error: {e}", exc_info=True)
def handle_client(self, client_socket: socket.socket, service_type: str):
"""Route client to appropriate handler"""
try:
if service_type == "presence":
self.handle_presence(client_socket)
elif service_type == "peerchat":
self.handle_peerchat(client_socket)
elif service_type == "natneg":
self.handle_natneg(client_socket)
except Exception as e:
logger.error(f"Client handler error: {e}", exc_info=True)
finally:
self.cleanup_client(client_socket)
def handle_heartbeat(self, client_socket: socket.socket):
"""Update client heartbeat timestamp"""
client_state = self.clients[client_socket]
client_state.last_heartbeat = time.time()
logger.debug(f"Heartbeat from {client_state.nick} ({client_state.address})")
def _check_heartbeats(self):
"""Monitor client heartbeats"""
while self.running:
current_time = time.time()
# Check each client's last heartbeat
for client_socket, state in list(self.clients.items()):
if current_time - state.last_heartbeat > self.heartbeat_interval:
logger.warning(f"Client timeout: {state.nick} ({state.address})")
self.cleanup_client(client_socket)
time.sleep(1)
def handle_status(self, client_socket: socket.socket, data: Dict[str, str]):
"""Handle status updates from client"""
client_state = self.clients[client_socket]
if 'status' in data:
logger.info(f"Status update from {client_state.nick}: {data['status']}")
if 'locport' in data:
client_state.game_port = int(data['locport'])
if 'localip' in data:
client_state.local_ip = data['localip']
def send_message(self, client_socket: socket.socket, message: str):
"""Send encoded message"""
try:
data = message.encode('ascii', errors='ignore')
encoded = self.encode_message(data)
client_socket.send(encoded)
logger.debug(f"Sent to {self.clients[client_socket].address}:")
logger.debug(f" Original: {message}")
self._debug_packet(encoded, " Encoded: ")
except Exception as e:
logger.error(f"Send error: {e}", exc_info=True)
self.cleanup_client(client_socket)
def receive_message(self, client_socket: socket.socket) -> Optional[bytes]:
"""Receive and decode message"""
try:
data = client_socket.recv(4096)
if not data:
return None
decoded = self.decode_message(data)
logger.debug(f"Received from {self.clients[client_socket].address}:")
self._debug_packet(data, " Raw: ")
self._debug_packet(decoded, " Decoded: ")
return decoded
except Exception as e:
logger.error(f"Receive error: {e}", exc_info=True)
return None
def cleanup_client(self, client_socket: socket.socket):
"""Clean up disconnected client"""
try:
client_state = self.clients.get(client_socket)
if client_state:
logger.info(f"Cleaning up client: {client_state.nick} ({client_state.address})")
del self.clients[client_socket]
client_socket.close()
except Exception as e:
logger.error(f"Cleanup error: {e}", exc_info=True)
def stop(self):
"""Stop the server"""
logger.info("Stopping server...")
self.running = False
# Close all server sockets
for server in self.servers.values():
try:
server.close()
except Exception as e:
logger.error(f"Stop error: {e}", exc_info=True)
# Save final user state
self.save_users()
def main():
server = GameSpyPSPServer()
try:
# Start different services
server.start_server(29900, "presence") # GPCM
server.start_server(29901, "peerchat") # PeerChat
server.start_server(27901, "natneg") # NAT Negotiation
logger.info("GameSpy PSP server started. Press Ctrl+C to stop.")
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("Shutting down...")
server.stop()
if __name__ == "__main__":
main()Editor is loading...
Leave a Comment