Untitled
unknown
plain_text
3 months ago
9.3 kB
6
Indexable
# Matches the (possibly unterminated) string value of a "content" key inside
# an incomplete JSON buffer. Backslash escapes are skipped as a unit so an
# escaped quote (\") inside the text does not end the capture early.
_PARTIAL_CONTENT_RE = re.compile(r'"content"\s*:\s*"((?:[^"\\]|\\.)*)')


class _StreamState:
    """Mutable accumulator shared by the streaming helpers below."""

    def __init__(self):
        # Structured JSON collected so far; rebound on every dict "parsed" value.
        self.final_output = {}
        # Nested key path currently being streamed, e.g. ["product", "productSummary"].
        self.active_field_path = []
        # Latest full value seen for the active field (not always a str: it is
        # taken from the parsed structure after a structural change).
        self.streaming_value = ""


def _text_delta(previous, current):
    """Return the part of *current* that extends *previous*.

    If *previous* is not a string, or *current* does not start with it, the
    whole of *current* is treated as new.
    """
    if isinstance(previous, str) and current.startswith(previous):
        return current[len(previous):]
    return current


def _emit_update(path, value):
    """Print one ``{"key": ..., "value": ...}`` update message as JSON."""
    update_msg = {"key": dot_path(path), "value": value}
    print(json.dumps(update_msg, ensure_ascii=False))


def _apply_text(state, text):
    """Record *text* as the active field's full value and emit its new part."""
    delta = _text_delta(state.streaming_value, text)
    # Always replace rather than append: when *text* is not a continuation of
    # the previous value, appending would splice two unrelated strings.
    state.streaming_value = text
    if not delta:
        return
    if state.active_field_path:
        set_nested_value(state.final_output, state.active_field_path, text)
    _emit_update(state.active_field_path, delta)


def _apply_parsed(state, parsed_value):
    """Handle the "parsed" field of a message: a structure or a scalar."""
    if parsed_value is None:
        # Nothing parsed yet; do not emit the literal text "None".
        return

    if not isinstance(parsed_value, dict):
        # Fallback for non-dict parsed values: stream their string form.
        new_text = str(parsed_value)
        delta = _text_delta(state.streaming_value, new_text)
        if delta:
            _emit_update(state.active_field_path, delta)
        state.streaming_value = new_text
        return

    delta_structure = compute_structure_delta(state.final_output, parsed_value)
    # This design assumes the model outputs one main object ("product") with
    # further keys inside it; the last changed key becomes the active field.
    if delta_structure and "product" in delta_structure:
        changes = delta_structure["product"]
        if isinstance(changes, dict) and changes:
            active_key = list(changes)[-1]
            state.active_field_path = ["product", active_key]
            # Reset the streaming value to the content already present.
            state.streaming_value = parsed_value["product"].get(active_key, "")
            _emit_update(state.active_field_path, state.streaming_value)
    state.final_output = parsed_value


def _apply_snapshot(state, snapshot):
    """Process the first choice's message of a decoded snapshot object."""
    if not isinstance(snapshot, dict):
        return
    choices = snapshot.get("choices", [{}])
    if not choices:
        return
    message = choices[0].get("message", {})
    if not isinstance(message, dict):
        return

    # Structure updates come first so that content is attributed to the
    # field the structure update has just made active.
    if "parsed" in message:
        _apply_parsed(state, message["parsed"])

    content = message.get("content")
    if isinstance(content, str):
        _apply_text(state, content)


async def call():
    """Stream one LLM response and print incremental key/value updates.

    Builds an ``OpenAiAdapter`` for "gpt-4o", requests a report for a
    hard-coded product in Polish, and walks the returned stream. For each
    chunk it prints a JSON update message whenever the active field changes or
    grows, then prints the structured output collected so far. After the
    stream ends it prints the final collected output.
    """
    adapter = OpenAiAdapter(
        OpenAI(api_key=os.getenv('OPENAI_API_KEY'), max_retries=2),
        "gpt-4o",
    )
    details = ProductDetails()
    language = ReportLanguage(code='pl')
    details.product_name = 'Dyson Purifier Cool Formaldehyde TP09'
    details.product_website_content = ''

    state = _StreamState()
    # Buffer for accumulating partial JSON text.
    # NOTE(review): assumes string snapshots are successive fragments of one
    # JSON document; if the adapter delivers cumulative snapshots instead,
    # this buffer would duplicate text -- confirm against callLlm.
    acc_buffer: str = ""

    stream = await adapter.callLlm(
        product=details,
        recent_informations='',
        language=language,
    )

    # Use a synchronous context manager and iterate over the stream.
    with stream as s:
        for chunk in s:
            snapshot_raw = chunk.to_dict().get("snapshot", {})

            if isinstance(snapshot_raw, str):
                acc_buffer += snapshot_raw
                try:
                    # Attempt to decode a complete JSON object from the buffer.
                    snapshot = json.loads(acc_buffer)
                except json.JSONDecodeError:
                    # JSON is incomplete: extract whatever part of the
                    # "content" string has arrived, then wait for more data.
                    partial = _PARTIAL_CONTENT_RE.search(acc_buffer)
                    if partial:
                        _apply_text(state, partial.group(1))
                    continue
                # A complete JSON document was decoded; start a fresh buffer.
                acc_buffer = ""
            else:
                # snapshot_raw is already a decoded object.
                snapshot = snapshot_raw

            _apply_snapshot(state, snapshot)

            # For demonstration, print the combined final JSON so far.
            print("Current final JSON output:", json.dumps(state.final_output, ensure_ascii=False))

    print("Final collected response (JSON):", json.dumps(state.final_output, ensure_ascii=False))


if __name__ == '__main__':
    asyncio.run(call())
Editor is loading...
Leave a Comment