READ DESCRIPTION!!!
# You need to change the username and bearer token placeholders below!
"""Poll the queue position of the most recent Cursor composer request.

The script locates the most recently modified ``state.vscdb`` under the
workspace storage directory, reads the ``aiService.generations`` entries from
it, picks the newest entry of type ``composer`` and asks the
``CheckQueuePosition`` endpoint for that request's position.
"""
import json
import os
import sqlite3
import time
from datetime import datetime
from pathlib import Path
from typing import Optional

try:
    # rich's print renders the [bold ...] markup used in the messages below.
    from rich import print
except ImportError:
    # Without rich the builtin print is used and the markup is shown verbatim.
    pass

# --- Settings you must edit before running ---------------------------------
DEFAULT_WORKSPACE_PATH = r"C:\Users\your_username_here\AppData\Roaming\Cursor\User\workspaceStorage"
BEARER_TOKEN = "your bearer token here"

# --- Settings you normally leave alone -------------------------------------
QUEUE_POSITION_URL = "https://api2.cursor.sh/aiserver.v1.AiService/CheckQueuePosition"
STATE_DB_NAME = "state.vscdb"
REQUEST_TIMEOUT_SECONDS = 10
POLL_INTERVAL_SECONDS = 0.5


def decode_varint(buffer):
    """Decode a protobuf varint from the start of *buffer*.

    Each byte contributes its low 7 bits, least-significant group first; a
    byte without the high bit set terminates the value. Results above the
    signed 64-bit maximum are reinterpreted as negative (two's complement).

    Args:
        buffer: Iterable of byte values (e.g. ``bytes``).

    Returns:
        The decoded integer; ``0`` for an empty buffer.
    """
    value = 0
    shift = 0
    for byte in buffer:
        value |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            break
        shift += 7

    # Handle negative numbers (two's complement).
    if value > 0x7FFFFFFFFFFFFFFF:
        value -= 1 << 64
    return value


def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf varint.

    Raises:
        ValueError: If *value* is negative.
    """
    if value < 0:
        raise ValueError("Cannot encode a negative value as a length varint")
    encoded = bytearray()
    while True:
        low_bits = value & 0x7F
        value >>= 7
        if value:
            encoded.append(low_bits | 0x80)  # more groups follow
        else:
            encoded.append(low_bits)
            return bytes(encoded)


def decode_queue_position(response_bytes):
    """Decode the protobuf response for queue position.

    Args:
        response_bytes: Raw response body.

    Returns:
        The varint that follows the leading ``0x08`` tag byte.

    Raises:
        ValueError: If the body is shorter than two bytes or does not start
            with ``0x08`` (field 1, wire type 0).
    """
    if not response_bytes or len(response_bytes) < 2:
        raise ValueError("Invalid response bytes")
    if response_bytes[0] != 0x08:  # Field 1, wire type 0
        raise ValueError("Unexpected protobuf format")
    return decode_varint(response_bytes[1:])


def encode_queue_request(request_id: str, model: str) -> bytes:
    """Build the protobuf body sent to the queue-position endpoint.

    Layout (tag bytes taken from the request this script originally sent):
    field 1 = *request_id*; field 2 = a nested message holding field 1 =
    *model* and an empty field 4. Length prefixes are computed, so ids and
    model names of any length produce a well-formed message.
    """
    def length_delimited(tag: int, payload: bytes) -> bytes:
        return bytes([tag]) + encode_varint(len(payload)) + payload

    model_message = (
        length_delimited(0x0A, model.encode("utf-8"))
        + length_delimited(0x22, b"")
    )
    return (
        length_delimited(0x0A, request_id.encode("utf-8"))
        + length_delimited(0x12, model_message)
    )


def get_latest_edited_file(workspace_path: str = None) -> Optional[str]:
    """Return the most recently modified ``state.vscdb`` under *workspace_path*.

    Args:
        workspace_path: Directory to search recursively; defaults to
            ``DEFAULT_WORKSPACE_PATH``.

    Returns:
        Path of the newest matching file, or ``None`` (after printing a
        message) when none is found or the directory cannot be walked.
    """
    if workspace_path is None:
        workspace_path = DEFAULT_WORKSPACE_PATH

    try:
        latest_time = 0
        latest_file = None
        for root, _dirs, files in os.walk(workspace_path):
            if STATE_DB_NAME not in files:
                continue
            file_path = os.path.join(root, STATE_DB_NAME)
            try:
                mtime = os.path.getmtime(file_path)
            except OSError:
                # File vanished or is unreadable; ignore it.
                continue
            if mtime > latest_time:
                latest_time = mtime
                latest_file = file_path
    except Exception as e:
        print(f"[bold red]Error accessing workspace storage:[/bold red] {str(e)}")
        return None

    if latest_file is None:
        print("[bold red]No state.vscdb files found in workspace storage[/bold red]")
    return latest_file


def get_ai_generations(db_path: str = None):
    """Read all ``aiService.generations*`` entries from a state database.

    Args:
        db_path: SQLite file to read; defaults to the result of
            ``get_latest_edited_file()``.

    Returns:
        A list of ``{'key': ..., 'data': ...}`` dicts (``data`` is the parsed
        JSON value), possibly empty, or ``None`` when no database is available
        or it cannot be read.
    """
    if db_path is None:
        db_path = get_latest_edited_file()
    if not db_path:
        return None

    conn = None
    try:
        conn = sqlite3.connect(db_path)
        rows = conn.execute(
            "SELECT key, value FROM ItemTable WHERE key LIKE 'aiService.generations%'"
        ).fetchall()

        generations = []
        for key, value in rows:
            try:
                gen_data = json.loads(value)
            except (ValueError, TypeError):
                # Skip only the unparsable row rather than abandoning the read.
                print(f"[bold yellow]Warning: Could not parse JSON for key {key}[/bold yellow]")
                continue
            generations.append({'key': key, 'data': gen_data})

        if not generations:
            print("[bold yellow]No AI generations found in the database[/bold yellow]")
        return generations
    except sqlite3.Error as e:
        print(f"[bold red]SQLite error:[/bold red] {str(e)}")
        return None
    except Exception as e:
        print(f"[bold red]Error reading generations:[/bold red] {str(e)}")
        return None
    finally:
        if conn is not None:
            conn.close()


def queuePosition(request_id: str, model: str) -> Optional[int]:
    """Ask the server for the queue position of *request_id*.

    Args:
        request_id: Generation UUID to look up.
        model: Model name placed in the request body.

    Returns:
        The decoded queue position, or ``None`` (after printing a message) on
        a non-200 status, unexpected content type, empty body, network error
        or undecodable response.
    """
    # Imported here so the pure encode/decode helpers stay importable on
    # machines that do not have requests installed.
    import requests

    headers = {
        "accept-encoding": "gzip",
        "authorization": f"Bearer {BEARER_TOKEN}",
        "connect-protocol-version": "1",
        "content-type": "application/proto",
        "user-agent": "connect-es/1.6.1",
        "x-ghost-mode": "true"
    }
    payload = encode_queue_request(request_id, model)

    try:
        # A timeout keeps the polling loop from hanging on a stalled connection.
        response = requests.post(
            QUEUE_POSITION_URL,
            headers=headers,
            data=payload,
            timeout=REQUEST_TIMEOUT_SECONDS,
        )

        if response.status_code != 200:
            print(f"[bold red]Error: HTTP {response.status_code}[/bold red]")
            return None

        if response.headers.get("content-type") != "application/proto":
            print("[bold red]Error: Unexpected content type[/bold red]")
            return None

        if not response.content:
            print("Request processing...")
            return None

        return decode_queue_position(response.content)
    except requests.RequestException as e:
        print(f"[bold red]Network error:[/bold red] {str(e)}")
        return None
    except ValueError as e:
        print(f"[bold red]Decoding error:[/bold red] {str(e)}")
        return None
    except Exception as e:
        print(f"[bold red]Unexpected error:[/bold red] {str(e)}")
        return None


def get_latest_composer_uuid(db_path: str = None) -> Optional[str]:
    """Return the ``generationUUID`` of the newest ``composer`` generation.

    "Newest" means the largest ``unixMs`` value among entries whose ``type``
    is ``'composer'``. Each stored value may be a single dict or a list of
    dicts.

    Returns:
        The UUID, or ``None`` when no generations could be read or none of
        them is a composer entry.
    """
    generations = get_ai_generations(db_path)
    if not generations:
        return None

    latest_time = 0
    latest_uuid = None
    for gen in generations:
        try:
            data = gen['data']
            # Treat a lone dict like a one-element list so both shapes share
            # the same selection logic.
            items = data if isinstance(data, list) else [data]
            for item in items:
                if not isinstance(item, dict) or item.get('type') != 'composer':
                    continue
                unix_ms = int(item.get('unixMs', 0))
                if unix_ms > latest_time:
                    latest_time = unix_ms
                    latest_uuid = item.get('generationUUID')
        except (KeyError, ValueError, TypeError) as e:
            print(f"[bold yellow]Warning: Error processing generation: {str(e)}[/bold yellow]")
            continue

    if not latest_uuid:
        print("[bold yellow]No composer generations found[/bold yellow]")
        return None
    return latest_uuid


def main() -> None:
    """Clear the console, then print the queue position twice a second."""
    os.system('cls' if os.name == 'nt' else 'clear')
    model = "deepseek-r1"  # model name is not important for the queue position

    while True:
        latest_uuid = get_latest_composer_uuid()
        if latest_uuid:
            position = queuePosition(latest_uuid, model)
            if position is not None:
                print(f"[bold green]Queue Position:[/bold green] {position}")
        else:
            print("[bold yellow]No composer generations found[/bold yellow]")
        time.sleep(POLL_INTERVAL_SECONDS)


# Example usage
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl+C is the only way to stop the loop; exit without a traceback.
        pass
Leave a Comment