Untitled

 avatar
unknown
plain_text
3 months ago
9.3 kB
6
Indexable
def _emit_update(path: "List[str]", value: "Any") -> None:
    """Print one ``{"key": <dotted path>, "value": <value>}`` update as a JSON line."""
    print(json.dumps({"key": dot_path(path), "value": value}, ensure_ascii=False))


def _text_delta(new_text: str, old_text: str) -> str:
    """Return the part of *new_text* that is not already in *old_text*.

    When *new_text* extends *old_text*, only the new suffix is returned.
    Otherwise the value is treated as restarted and *new_text* is returned whole.
    """
    if new_text.startswith(old_text):
        return new_text[len(old_text):]
    return new_text


class _StreamState:
    """Running state for one streamed structured-output response.

    Each ``apply_*`` method folds one piece of a stream snapshot into the
    state and prints every resulting change as a JSON update line.
    """

    def __init__(self) -> None:
        # Structured JSON built so far; replaced wholesale by each "parsed" dict.
        self.final_output: Dict[str, Any] = {}
        # Nested key path of the field currently receiving text,
        # e.g. ["product", "productSummary"]; empty until a field is detected.
        self.active_field_path: List[str] = []
        # Text seen so far for the active field. Always kept as a str so the
        # startswith()/slicing in _text_delta never receives a dict/list/number.
        self.streaming_value: str = ""

    def apply_snapshot(self, snapshot: "Any") -> None:
        """Apply ``snapshot["choices"][0]["message"]``; silently ignore any other shape."""
        if not isinstance(snapshot, dict):
            return
        choices = snapshot.get("choices", [{}])
        if not isinstance(choices, list) or not choices or not isinstance(choices[0], dict):
            return
        message = choices[0].get("message", {})
        if not isinstance(message, dict):
            return
        # Order matters: a "parsed" update may switch the active field before
        # the "content" text is attributed to it.
        if "parsed" in message:
            self.apply_parsed(message["parsed"])
        if "content" in message:
            self.apply_content(message["content"])

    def apply_parsed(self, parsed_value: "Any") -> None:
        """Fold a message's "parsed" value into the state.

        A dict replaces ``final_output``. If ``compute_structure_delta`` reports
        a non-empty dict of changes under "product", the last key in it becomes
        the active field and that field's current value is emitted. Any other
        non-None value is treated as text and only its new suffix is emitted.
        A None value (nothing parsed yet) is ignored.
        """
        if parsed_value is None:
            # str(None) would otherwise be emitted as the text "None".
            return
        if not isinstance(parsed_value, dict):
            new_text = str(parsed_value)
            delta = _text_delta(new_text, self.streaming_value)
            if delta:
                _emit_update(self.active_field_path, delta)
            self.streaming_value = new_text
            return

        delta_structure = compute_structure_delta(self.final_output, parsed_value)
        # Assumes the model emits a single top-level "product" object; the last
        # changed key inside it is taken as the field currently being streamed.
        changes = (
            delta_structure["product"]
            if delta_structure and "product" in delta_structure
            else None
        )
        if isinstance(changes, dict) and changes:
            active_key = list(changes)[-1]
            self.active_field_path = ["product", active_key]
            product = parsed_value.get("product")
            field_value = product.get(active_key, "") if isinstance(product, dict) else ""
            _emit_update(self.active_field_path, field_value)
            self.streaming_value = field_value if isinstance(field_value, str) else ""
        self.final_output = parsed_value

    def apply_content(self, content: "Any") -> None:
        """Emit the new part of a "content" string and store it on the active field.

        Non-string content (e.g. None) is ignored. The new update is only
        emitted/stored once an active field path is known.
        """
        if not isinstance(content, str):
            return
        # NOTE(review): if "content" holds the whole raw JSON of the message
        # rather than just the active field's text, this delta against a field
        # value will be the entire content every time -- confirm what the
        # snapshots returned by callLlm actually contain.
        delta = _text_delta(content, self.streaming_value)
        # Replace rather than append: when the text restarted (delta is the whole
        # content), appending would concatenate the old and new values.
        self.streaming_value = content
        if delta and self.active_field_path:
            set_nested_value(self.final_output, self.active_field_path, self.streaming_value)
            _emit_update(self.active_field_path, delta)


async def call(
    product_name: str = 'Dyson Purifier Cool Formaldehyde TP09',
    language_code: str = 'pl',
    model: str = "gpt-4o",
) -> None:
    """Stream a product report from the LLM and print incremental field updates.

    Every change is printed as a ``{"key": <dotted path>, "value": ...}`` JSON
    line, followed by the structured JSON accumulated so far. The final JSON is
    printed once the stream is exhausted.

    Args:
        product_name: Product to report on (defaults to the original demo product).
        language_code: Code passed to ``ReportLanguage``.
        model: Model name handed to ``OpenAiAdapter``.
    """
    adapter = OpenAiAdapter(
        OpenAI(api_key=os.getenv('OPENAI_API_KEY'), max_retries=2),
        model,
    )
    details = ProductDetails()
    language = ReportLanguage(code=language_code)
    details.product_name = product_name
    details.product_website_content = ''

    state = _StreamState()
    # Raw text of string snapshots that has not yet parsed as one complete JSON document.
    acc_buffer = ""
    # Captures the (possibly unterminated) string value of a "content" key in a
    # partial buffer. The captured text is still JSON-escaped, and an escaped
    # quote inside the value ends the match early.
    partial_content_re = re.compile(r'"content"\s*:\s*"([^"]*)')

    stream = await adapter.callLlm(
        product=details,
        recent_informations='',
        language=language,
    )

    # The stream is a synchronous context manager/iterator: the event loop is
    # blocked while waiting for each chunk.
    with stream as s:
        for chunk in s:
            chunk_data = chunk.to_dict()
            snapshot_raw = chunk_data.get("snapshot", {})

            if isinstance(snapshot_raw, str):
                # NOTE(review): if the SDK's string snapshot is already cumulative
                # (the whole text so far), appending it duplicates earlier text and
                # the buffer may never parse -- confirm against callLlm's stream type.
                acc_buffer += snapshot_raw
                try:
                    snapshot = json.loads(acc_buffer)
                except json.JSONDecodeError:
                    # Incomplete JSON: salvage any partial "content" string seen so far.
                    content_match = partial_content_re.search(acc_buffer)
                    if content_match:
                        state.apply_content(content_match.group(1))
                    # Wait for more data; the per-chunk progress print is skipped.
                    continue
                state.apply_snapshot(snapshot)
                # Start a fresh buffer after a complete JSON document.
                acc_buffer = ""
            else:
                # Snapshot already arrived as a decoded object.
                state.apply_snapshot(snapshot_raw)

            # For demonstration, print the combined final JSON so far.
            print("Current final JSON output:", json.dumps(state.final_output, ensure_ascii=False))

    print("Final collected response (JSON):", json.dumps(state.final_output, ensure_ascii=False))

if __name__ == '__main__':
    # Run the streaming demo only when executed as a script, not on import.
    demo = call()
    asyncio.run(demo)
Editor is loading...
Leave a Comment