Untitled
unknown
plain_text
a month ago
3.9 kB
7
Indexable
"""
Manager WebSocket test.
Run with: python tests/ai/ai_review/test_manager_websocket.py
Mirrors test_websocket.py — same checks, different endpoint.
Fill in FULL_URL before running.
"""
import sys
import os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "../../..")))
import asyncio
import json
import websockets
# ── PASTE YOUR FULL URL HERE ─────────────────────────────────────────────────
# Note: endpoint is /manager-realtime not /openai-realtime
# employee_id here is the employee being reviewed by the manager
FULL_URL = "ws://localhost:8000/ws/v1/manager-realtime?employee_id=YOUR_EMPLOYEE_ID&session_id=test-manager-001&token=YOUR_JWT"
# ─────────────────────────────────────────────────────────────────────────────
async def test_manager_websocket():
print(f"Connecting to: {FULL_URL}")
print("-" * 60)
try:
async with websockets.connect(FULL_URL) as ws:
print("✅ WebSocket connected")
async def listen():
async for message in ws:
if isinstance(message, bytes):
print(f"📦 Binary audio: {len(message)} bytes")
else:
try:
data = json.loads(message)
msg_type = data.get("type", "unknown")
if msg_type == "status":
print(f"✅ Status: {data.get('message')}")
elif msg_type == "error":
print(f"❌ Error: {data.get('message')}")
elif msg_type == "session.created":
print(f"✅ Session created with OpenAI")
elif msg_type == "session.updated":
session = data.get("session", {})
instructions = session.get("instructions", "") or ""
tools = session.get("tools", []) or []
print(f"✅ Session updated:")
print(f" instructions length : {len(instructions)} chars")
print(f" tools registered : {[t.get('name') for t in tools]}")
elif msg_type == "review_progress":
print(f"📊 Progress: {data.get('percentage')}%")
elif msg_type == "heartbeat":
print(f"💓 Heartbeat")
else:
print(f"📨 {msg_type}")
except json.JSONDecodeError:
print(f"📨 Raw: {message[:100]}")
try:
await asyncio.wait_for(listen(), timeout=15.0)
except asyncio.TimeoutError:
print("\n⏱ 15 second test complete")
except Exception as e:
print(f"❌ Connection failed: {e}")
print("\n── Expected ────────────────────────────────────────────")
print("✅ WebSocket connected")
print("✅ Status: Connected to proxy server")
print("✅ Status: Connected to OpenAI")
print("✅ Session created with OpenAI")
print("✅ Session updated: instructions + tools")
print("📨 Audio events (greeting from Archie)")
if __name__ == "__main__":
asyncio.run(test_manager_websocket())Editor is loading...
Leave a Comment