READ DESCRIPTION!!!
You need to change the username and bearer token areas!unknown
python
9 months ago
7.3 kB
112
No Index
from rich import print
import requests
import os
from datetime import datetime
from pathlib import Path
import sqlite3
import json
import time
def decode_varint(buffer):
    """Decode a protobuf base-128 varint into a signed 64-bit integer.

    Args:
        buffer: Iterable of byte values (for example ``bytes``) that starts
            with the varint. Anything after the terminating byte is ignored.

    Returns:
        The decoded integer. Values above ``2**63 - 1`` are reinterpreted as
        negative numbers (64-bit two's complement).

    Raises:
        ValueError: If the buffer ends before a terminating byte (one without
            the continuation bit) is seen, or if the varint is longer than
            the 10 bytes a 64-bit value can occupy.
    """
    value = 0
    for index, byte in enumerate(buffer):
        if index >= 10:
            raise ValueError("Varint is longer than 10 bytes")
        # Each byte contributes its low 7 bits, least-significant group first.
        value |= (byte & 0x7F) << (7 * index)
        if not byte & 0x80:
            break
    else:
        # Loop ran out of bytes (or had none) without seeing a final byte.
        raise ValueError("Truncated varint")

    # The tenth byte can carry bits past bit 63; drop them before sign handling.
    value &= 0xFFFFFFFFFFFFFFFF
    # Handle negative numbers (two's complement)
    if value > 0x7FFFFFFFFFFFFFFF:
        value -= 1 << 64
    return value
def decode_queue_position(response_bytes):
    """Extract the queue position from a protobuf-encoded response body.

    The body must be at least two bytes long and begin with the tag byte
    ``0x08`` (field 1, wire type 0); the bytes after the tag are handed to
    ``decode_varint``.

    Raises:
        ValueError: If the body is missing, shorter than two bytes, or does
            not start with the expected tag byte.
    """
    expected_tag = 0x08  # Field 1, wire type 0

    body_too_short = not response_bytes or len(response_bytes) < 2
    if body_too_short:
        raise ValueError("Invalid response bytes")

    leading_tag = response_bytes[0]
    if leading_tag != expected_tag:
        raise ValueError("Unexpected protobuf format")

    varint_bytes = response_bytes[1:]
    return decode_varint(varint_bytes)
def get_latest_edited_file(workspace_path: str = None) -> str:
    """Return the path of the most recently modified ``state.vscdb`` file.

    Args:
        workspace_path: Directory to search recursively. When omitted, the
            Cursor workspace storage directory is derived from the
            ``APPDATA`` environment variable
            (``<APPDATA>/Cursor/User/workspaceStorage``); if that variable is
            not set, the original hard-coded placeholder path is used.

    Returns:
        The path of the ``state.vscdb`` file with the newest modification
        time, or ``None`` when no such file is found or the search fails.
    """
    if workspace_path is None:
        appdata = os.environ.get("APPDATA")
        if appdata:
            # Avoids having to hand-edit a user name into the source.
            workspace_path = os.path.join(appdata, "Cursor", "User", "workspaceStorage")
        else:
            workspace_path = r"C:\Users\your_username_here\AppData\Roaming\Cursor\User\workspaceStorage"

    target_file = "state.vscdb"
    latest_time = None
    latest_file = None
    try:
        for root, _dirs, files in os.walk(workspace_path):
            if target_file not in files:
                continue
            file_path = os.path.join(root, target_file)
            try:
                mtime = os.path.getmtime(file_path)
            except OSError:
                # File vanished or is unreadable; ignore it and keep looking.
                continue
            # ``None`` sentinel so the first candidate always wins, whatever
            # its timestamp is.
            if latest_time is None or mtime > latest_time:
                latest_time = mtime
                latest_file = file_path
    except Exception as e:
        print(f"[bold red]Error accessing workspace storage:[/bold red] {str(e)}")
        return None

    if not latest_file:
        print("[bold red]No state.vscdb files found in workspace storage[/bold red]")
        return None
    return latest_file
def get_ai_generations(db_path: str = None):
    """Load the ``aiService.generations*`` entries from a ``state.vscdb`` file.

    Args:
        db_path: Path of the SQLite database to read. When omitted, the file
            returned by ``get_latest_edited_file()`` is used.

    Returns:
        A list of ``{'key': <row key>, 'data': <parsed JSON value>}`` dicts
        (possibly empty), or ``None`` when no database path is available or
        the database cannot be read. Rows whose value is not valid JSON are
        skipped with a warning.
    """
    if db_path is None:
        db_path = get_latest_edited_file()
    if not db_path:
        return None

    conn = None
    try:
        # Connect to the database
        conn = sqlite3.connect(db_path)
        # Query for aiService.generations
        rows = conn.execute(
            "SELECT key, value FROM ItemTable WHERE key LIKE 'aiService.generations%'"
        ).fetchall()

        generations = []
        for key, value in rows:
            try:
                gen_data = json.loads(value)
            except (ValueError, TypeError):
                # ValueError covers JSONDecodeError and undecodable bytes;
                # TypeError covers NULL / non-text values. One bad row must
                # not throw away every other generation.
                print(f"[bold yellow]Warning: Could not parse JSON for key {key}[/bold yellow]")
                continue
            generations.append({
                'key': key,
                'data': gen_data,
            })

        if not generations:
            print("[bold yellow]No AI generations found in the database[/bold yellow]")
        return generations
    except sqlite3.Error as e:
        print(f"[bold red]SQLite error:[/bold red] {str(e)}")
        return None
    except Exception as e:
        print(f"[bold red]Error reading generations:[/bold red] {str(e)}")
        return None
    finally:
        # ``conn`` stays None when connect() itself failed.
        if conn is not None:
            conn.close()
def queuePosition(request_id: str, model: str) -> int:
    """Ask the CheckQueuePosition endpoint for the position of a request.

    Args:
        request_id: Identifier of the generation to look up.
        model: Model name to send with the request.

    Returns:
        The decoded queue position, or ``None`` when the HTTP request fails,
        the response has an unexpected status or content type, the body is
        empty, or the body cannot be decoded. Errors are printed, not raised.
    """
    url = "https://api2.cursor.sh/aiserver.v1.AiService/CheckQueuePosition"
    headers = {
        "accept-encoding": "gzip",
        "authorization": "Bearer your bearer token here",
        "connect-protocol-version": "1",
        "content-type": "application/proto",
        "user-agent": "connect-es/1.6.1",
        "x-ghost-mode": "true"
    }

    def length_delimited(tag: int, payload: bytes) -> bytes:
        """Return ``tag`` + varint-encoded ``len(payload)`` + ``payload``."""
        size = len(payload)
        prefix = bytearray([tag])
        while size > 0x7F:
            prefix.append((size & 0x7F) | 0x80)
            size >>= 7
        prefix.append(size)
        return bytes(prefix) + payload

    # Protobuf request data. The length prefixes are computed instead of
    # hard-coded, so the message stays well-formed for any id/model length:
    #   field 1 (tag 0x0A): request id
    #   field 2 (tag 0x12): nested message
    #       field 1 (tag 0x0A): model name
    #       field 4 (tag 0x22): empty payload
    nested = length_delimited(0x0A, model.encode("utf-8")) + length_delimited(0x22, b"")
    data = length_delimited(0x0A, request_id.encode("utf-8")) + length_delimited(0x12, nested)

    try:
        # A timeout keeps a stalled connection from blocking the caller
        # forever; requests raises a RequestException subclass when it fires.
        response = requests.post(url, headers=headers, data=data, timeout=10)
        # Check response status
        if response.status_code != 200:
            print(f"[bold red]Error: HTTP {response.status_code}[/bold red]")
            return None
        # Verify content type
        if response.headers.get("content-type") != "application/proto":
            print("[bold red]Error: Unexpected content type[/bold red]")
            return None
        # Verify response content
        if not response.content:
            print("Request processing...")
            return None
        return decode_queue_position(response.content)
    except requests.RequestException as e:
        print(f"[bold red]Network error:[/bold red] {str(e)}")
        return None
    except ValueError as e:
        print(f"[bold red]Decoding error:[/bold red] {str(e)}")
        return None
    except Exception as e:
        print(f"[bold red]Unexpected error:[/bold red] {str(e)}")
        return None
def get_latest_composer_uuid(db_path: str = None) -> str:
    """Return the ``generationUUID`` of the newest ``composer`` generation.

    Args:
        db_path: Forwarded to ``get_ai_generations``.

    Returns:
        The ``generationUUID`` of the entry with ``type == 'composer'`` and
        the largest ``unixMs`` value, or ``None`` when no generations could
        be loaded or none of them qualifies.
    """
    generations = get_ai_generations(db_path)
    if not generations:
        return None

    latest_time = 0
    latest_uuid = None
    for gen in generations:
        try:
            data = gen['data']
        except (KeyError, TypeError) as e:
            print(f"[bold yellow]Warning: Error processing generation: {str(e)}[/bold yellow]")
            continue

        # A stored value is either a list of entries or a single entry;
        # treat both the same way.
        items = data if isinstance(data, list) else [data]
        for item in items:
            if not isinstance(item, dict) or item.get('type') != 'composer':
                continue
            try:
                unix_ms = int(item.get('unixMs', 0))
            except (ValueError, TypeError) as e:
                # Skip only the malformed entry, not the rest of the list.
                print(f"[bold yellow]Warning: Error processing generation: {str(e)}[/bold yellow]")
                continue
            generation_uuid = item.get('generationUUID')
            # An entry without a UUID cannot be queried, so it must not
            # shadow an older entry that has one.
            if generation_uuid and unix_ms > latest_time:
                latest_time = unix_ms
                latest_uuid = generation_uuid

    if latest_uuid:
        return latest_uuid
    print("[bold yellow]No composer generations found[/bold yellow]")
    return None
# Example usage: poll the queue position of the latest composer generation
# twice a second until interrupted with Ctrl+C.
if __name__ == "__main__":
    # Clear the console
    os.system('cls' if os.name == 'nt' else 'clear')
    # Original author's note: the model name is not important for the queue
    # position.
    model = "deepseek-r1"
    try:
        while True:
            # get_latest_composer_uuid() loads the generations itself, so no
            # separate get_ai_generations() call is needed per iteration.
            latest_uuid = get_latest_composer_uuid()
            if latest_uuid:
                position = queuePosition(latest_uuid, model)
                if position is not None:
                    print(f"[bold green]Queue Position:[/bold green] {position}")
            else:
                print("[bold yellow]No composer generations found[/bold yellow]")
            time.sleep(0.5)
    except KeyboardInterrupt:
        # Ctrl+C is the only way out of the loop; exit without a traceback.
        pass
Editor is loading...
Leave a Comment