READ DESCRIPTON!!!

you need to change username and bearer token areas!
 avatar
unknown
python
a day ago
7.3 kB
38
No Index
from rich import print
import requests
import os
from datetime import datetime
from pathlib import Path
import sqlite3
import json
import time
def decode_varint(buffer):
    """Decode a protobuf varint to integer."""
    value = 0
    shift = 0
    
    for byte in buffer:
        value |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            break
        shift += 7
    
    # Handle negative numbers (two's complement)
    if value > 0x7FFFFFFFFFFFFFFF:
        value = value - (1 << 64)
        
    return value

def decode_queue_position(response_bytes):
    """Decode the protobuf response for queue position."""
    if not response_bytes or len(response_bytes) < 2:
        raise ValueError("Invalid response bytes")
        
    if response_bytes[0] != 0x08:  # Field 1, wire type 0
        raise ValueError("Unexpected protobuf format")
    return decode_varint(response_bytes[1:])

def get_latest_edited_file(workspace_path: str = None) -> str:
    if workspace_path is None:
        workspace_path = r"C:\Users\your_username_here\AppData\Roaming\Cursor\User\workspaceStorage"
    
    try:
        # Get all files recursively
        latest_time = 0
        latest_file = None
        target_file = "state.vscdb"
        
        for root, dirs, files in os.walk(workspace_path):
            if target_file in files:
                file_path = os.path.join(root, target_file)
                try:
                    # Get the last modified time
                    mtime = os.path.getmtime(file_path)
                    if mtime > latest_time:
                        latest_time = mtime
                        latest_file = file_path
                except OSError:
                    continue
        
        if latest_file:
            return latest_file
        else:
            print("[bold red]No state.vscdb files found in workspace storage[/bold red]")
            return None
            
    except Exception as e:
        print(f"[bold red]Error accessing workspace storage:[/bold red] {str(e)}")
        return None

def get_ai_generations(db_path: str = None):
    if db_path is None:
        db_path = get_latest_edited_file()
        if not db_path:
            return None
    
    try:
        # Connect to the database
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        
        # Query for aiService.generations
        cursor.execute("SELECT key, value FROM ItemTable WHERE key LIKE 'aiService.generations%'")
        rows = cursor.fetchall()
        
        generations = []
        for row in rows:
            key, value = row
            try:
                # Parse the JSON value
                gen_data = json.loads(value)
                generations.append({
                    'key': key,
                    'data': gen_data
                })
            except json.JSONDecodeError:
                print(f"[bold yellow]Warning: Could not parse JSON for key {key}[/bold yellow]")
                continue
        
        # Print the generations in a readable format
        if generations:
            pass
        else:
            print("[bold yellow]No AI generations found in the database[/bold yellow]")
            
        return generations
        
    except sqlite3.Error as e:
        print(f"[bold red]SQLite error:[/bold red] {str(e)}")
        return None
    except Exception as e:
        print(f"[bold red]Error reading generations:[/bold red] {str(e)}")
        return None
    finally:
        if 'conn' in locals():
            conn.close()

def queuePosition(request_id: str, model: str) -> int:
    url = "https://api2.cursor.sh/aiserver.v1.AiService/CheckQueuePosition"
    
    headers = {
        "accept-encoding": "gzip",
        "authorization": "Bearer your bearer token here",
        "connect-protocol-version": "1",
        "content-type": "application/proto",
        "user-agent": "connect-es/1.6.1",
        "x-ghost-mode": "true"
    }
    
    # Protobuf request data
    data = f"\n${request_id}\u0012\u000f\n\u000b{model}\"\u0000"
    
    try:
        response = requests.post(url, headers=headers, data=data)
        
        # Check response status
        if response.status_code != 200:
            print(f"[bold red]Error: HTTP {response.status_code}[/bold red]")
            return None
            
        # Verify content type
        if response.headers.get("content-type") != "application/proto":
            print("[bold red]Error: Unexpected content type[/bold red]")
            return None
            
        # Verify response content
        if not response.content:
            print("Request processing...")
            return None
            
        return decode_queue_position(response.content)
        
    except requests.RequestException as e:
        print(f"[bold red]Network error:[/bold red] {str(e)}")
        return None
    except ValueError as e:
        print(f"[bold red]Decoding error:[/bold red] {str(e)}")
        return None
    except Exception as e:
        print(f"[bold red]Unexpected error:[/bold red] {str(e)}")
        return None

def get_latest_composer_uuid(db_path: str = None) -> str:
    generations = get_ai_generations(db_path)
    if not generations:
        return None
        
    latest_time = 0
    latest_uuid = None
    
    for gen in generations:
        try:
            data = gen['data']
            # Handle case where data is a list
            if isinstance(data, list):
                for item in data:
                    if isinstance(item, dict) and item.get('type') == 'composer':
                        unix_ms = int(item.get('unixMs', 0))
                        if unix_ms > latest_time:
                            latest_time = unix_ms
                            latest_uuid = item.get('generationUUID')
            # Handle case where data is a dict
            elif isinstance(data, dict) and data.get('type') == 'composer':
                unix_ms = int(data.get('unixMs', 0))
                if unix_ms > latest_time:
                    latest_time = unix_ms
                    latest_uuid = data.get('generationUUID')
        except (KeyError, ValueError, TypeError) as e:
            print(f"[bold yellow]Warning: Error processing generation: {str(e)}[/bold yellow]")
            continue
    
    if latest_uuid:
        timestamp = datetime.fromtimestamp(latest_time / 1000).strftime('%Y-%m-%d %H:%M:%S')
        return latest_uuid
    else:
        print("[bold yellow]No composer generations found[/bold yellow]")
        return None

# Example usage
if __name__ == "__main__":
    #clear the console
    os.system('cls' if os.name == 'nt' else 'clear')
    while True:
        generations = get_ai_generations()
        latest_uuid = get_latest_composer_uuid()
        # Test queue position
        if latest_uuid:
            request_id = latest_uuid
            model = "deepseek-r1" # model name is not important for the queue position
            
            position = queuePosition(request_id, model)
            if position is not None:
                print(f"[bold green]Queue Position:[/bold green] {position}")
        else:
            print("[bold yellow]No composer generations found[/bold yellow]")
        time.sleep(0.5)
Leave a Comment