Untitled
unknown
plain_text
2 years ago
4.9 kB
7
Indexable
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import HTMLResponse

from game import Game, State
from connection_manager import ConnectionManager

app = FastAPI()
manager = ConnectionManager()
games = []

# Two-byte frame sent to the player whose turn it currently is.
_TURN_FRAME = (1).to_bytes(length=2, byteorder="big")

# Values packed into the four-byte result frame once a game is over.
_WHITE_WINS_CODE = 1111
_BLACK_WINS_CODE = 0  # the original literal ``0000`` is simply the integer 0


def _pack_int16s(values):
    """Concatenate ``values`` as signed big-endian 16-bit integers.

    ``WebSocket.send_bytes`` takes a single ``bytes`` object, so the
    per-value chunks are joined here instead of being sent as a list.
    """
    return b"".join(
        value.to_bytes(length=2, byteorder="big", signed=True)
        for value in values
    )


def _board_codes(game):
    """Return the board of ``game`` as a list of small integers.

    Each position becomes 0 for ``State.WHITE``, 1 for ``State.BLACK`` and
    2 for anything else.
    """
    codes = []
    # NOTE(review): positions are read from 1 to len(board) inclusive, which
    # presumes the board is keyed from 1 (e.g. a dict); a plain list would
    # raise IndexError on the last step. Confirm against the Game class.
    for position in range(1, len(game.board) + 1):
        state = game.board[position]
        if state == State.WHITE:
            codes.append(0)
        elif state == State.BLACK:
            codes.append(1)
        else:
            codes.append(2)
    return codes


async def _send_if_connected(socket, payload):
    """Send ``payload`` on ``socket`` unless that seat is empty (``None``)."""
    if socket is not None:
        await socket.send_bytes(payload)


@app.get("/")
async def get():
    """Serve the contents of ``index.html`` as an HTML response."""
    with open("index.html", 'r') as index:
        index_string = index.read()
    return HTMLResponse(index_string)


@app.websocket("/lobby")
async def websocket_lobby(websocket: WebSocket):
    """Assign a connecting player to a game and reply with the game id.

    The client sends its player id as UTF-16 bytes. Three outcomes:

    * the id already belongs to a game: the board snapshot and then the game
      id are sent, and the socket is closed;
    * the number of connections tracked by ``manager`` is odd: a new game is
      created with this player as white;
    * otherwise the player becomes black in the first game whose
      ``black_websocket`` is still ``None``.

    If no game has an open seat, the socket stays open and the handler waits
    for the client to send its id again.
    """
    # NOTE(review): presumably ``manager.connect`` accepts the socket and
    # records it in ``active_connections``; nothing here removes it again, and
    # the parity rule below relies on that count. Verify in ConnectionManager.
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_bytes()
            player_id = data.decode(encoding="utf-16")

            # Returning player: replay the board of the game they belong to.
            for current_game in games:
                if player_id in (current_game.white_id, current_game.black_id):
                    snapshot = _pack_int16s(_board_codes(current_game))
                    await websocket.send_bytes(snapshot)
                    await websocket.send_bytes(
                        current_game.id.encode(encoding="utf-16")
                    )
                    await websocket.close()
                    return None

            if (len(manager.active_connections) % 2) != 0:
                # Odd connection count: open a new game with this player as white.
                game = Game()
                games.append(game)
                game.white_id = player_id
                await websocket.send_bytes(game.id.encode(encoding="utf-16"))
                await websocket.close()
                # The socket is closed; do not loop back into receive_bytes().
                return None

            for game in games:
                # NOTE(review): a seat counts as open while no black socket is
                # attached, so a black player who has not yet opened the game
                # socket can still be replaced here. Confirm this is intended.
                if game.black_websocket is None:
                    game.black_id = player_id
                    await websocket.send_bytes(game.id.encode(encoding="utf-16"))
                    await websocket.close()
                    # Stop at the first open seat: continuing would overwrite
                    # black_id on other games and send on a closed socket.
                    return None
    except WebSocketDisconnect:
        print("WebSocketDisconnect")


@app.websocket("/{game_id}")
async def websocket_game(websocket: WebSocket, game_id: str):
    """Relay moves between the two players of the game ``game_id``.

    The first socket to arrive takes the white seat and the next one the
    black seat. Each loop iteration notifies the player to move, reads a
    four-byte message (two unsigned big-endian 16-bit coordinates), applies
    it to the game and broadcasts the resulting move to both seats. When the
    game is over a four-byte result frame is sent and both sockets are closed.
    """
    await websocket.accept()

    # Look the game up once; fall back to None instead of reusing whatever
    # the loop variable happened to hold last.
    game = next((candidate for candidate in games if candidate.id == game_id), None)
    if game is None:
        # Unknown game id: there is nothing to play on this socket.
        await websocket.close()
        return

    if game.white_websocket is None:
        game.white_websocket = websocket
    elif game.black_websocket is None:
        game.black_websocket = websocket

    try:
        while True:
            # Tell the side to move that it is their turn; the seat may be
            # empty if that player has not connected or has dropped.
            if game.white_turn:
                await _send_if_connected(game.white_websocket, _TURN_FRAME)
            else:
                await _send_if_connected(game.black_websocket, _TURN_FRAME)

            data = await websocket.receive_bytes()
            x_raw = data[0:2]
            y_raw = data[2:4]
            coordinates = [
                int.from_bytes(x_raw, byteorder="big"),
                int.from_bytes(y_raw, byteorder="big"),
            ]

            object_index = game.is_interactive(coordinates)
            if object_index is None:
                continue

            move = game.add_interactions(object_index)
            if move:
                payload = _pack_int16s(move)
                await _send_if_connected(game.white_websocket, payload)
                await _send_if_connected(game.black_websocket, payload)

            if game.game_over:
                if game.white_wins:
                    winner = _WHITE_WINS_CODE
                elif game.black_wins:
                    winner = _BLACK_WINS_CODE
                else:
                    # Neither flag is set; no result code is defined for this
                    # case, so no result frame is sent.
                    winner = None

                if winner is not None:
                    result = winner.to_bytes(byteorder="big", length=4, signed=True)
                    await _send_if_connected(game.white_websocket, result)
                    await _send_if_connected(game.black_websocket, result)

                for seat in (game.white_websocket, game.black_websocket):
                    if seat is not None:
                        await seat.close()
                # NOTE(review): the finished game stays in ``games``, so the
                # lobby will keep routing these player ids back to it.
                return
    except WebSocketDisconnect:
        # Free the seat this socket occupied so another socket can take it.
        if game.white_websocket == websocket:
            game.white_websocket = None
        elif game.black_websocket == websocket:
            game.black_websocket = None
        # NOTE(review): this socket was accepted directly and never passed to
        # ``manager.connect``; confirm ``disconnect`` tolerates unknown sockets.
        manager.disconnect(websocket)
Editor is loading...