Untitled
unknown
plain_text
9 months ago
9.3 kB
8
Indexable
# Matches the (possibly still unterminated) JSON string that follows a
# "content" key.  Escaped characters such as \" are accepted inside the value
# so the capture does not stop at the first escaped quote; an incomplete
# trailing escape (lone backslash or short \uXXXX) is left out of the capture.
_PARTIAL_CONTENT_RE = re.compile(
    r'"content"\s*:\s*"((?:[^"\\]|\\u[0-9a-fA-F]{4}|\\[^u])*)'
)


class _StreamState:
    """Mutable bookkeeping shared by the chunk-processing helpers."""

    __slots__ = ("final_output", "active_field_path", "streaming_value")

    def __init__(self):
        # Structured JSON collected so far; replaced on every "parsed" dict.
        self.final_output = {}
        # Nested key path currently being streamed, e.g. ["product", "<key>"].
        self.active_field_path = []
        # Latest known value of the active field (normally a string).
        self.streaming_value = ""


def _text_delta(previous: object, current: str) -> str:
    """Return the part of *current* that extends *previous*.

    When *previous* is not a string, or *current* does not start with it, the
    whole of *current* is returned (the value is treated as restarted).
    """
    if isinstance(previous, str) and current.startswith(previous):
        return current[len(previous):]
    return current


def _emit_update(path: list, value: object) -> None:
    """Print one ``{"key": ..., "value": ...}`` update line as JSON."""
    print(json.dumps({"key": dot_path(path), "value": value}, ensure_ascii=False))


def _extract_message(snapshot: object):
    """Return ``snapshot["choices"][0]["message"]`` if it is a dict, else None."""
    if not isinstance(snapshot, dict):
        return None
    choices = snapshot.get("choices")
    if not isinstance(choices, list) or not choices:
        return None
    first_choice = choices[0]
    if not isinstance(first_choice, dict):
        return None
    message = first_choice.get("message")
    return message if isinstance(message, dict) else None


def _extract_partial_content(buffer: str):
    """Pull the text of an incomplete ``"content"`` string out of *buffer*.

    Returns the JSON-unescaped text when it can be decoded, the raw captured
    text when it cannot, and None when *buffer* holds no "content" key.
    """
    match = _PARTIAL_CONTENT_RE.search(buffer)
    if match is None:
        return None
    raw_token = match.group(1)
    try:
        # strict=False tolerates literal control characters inside the string.
        return json.loads('"' + raw_token + '"', strict=False)
    except json.JSONDecodeError:
        return raw_token


def _apply_parsed(state: "_StreamState", parsed_value: object) -> None:
    """Fold a message's "parsed" value into *state* and emit any change."""
    if parsed_value is None:
        # Nothing parsed yet; str(None) must not be emitted as field text.
        return

    if not isinstance(parsed_value, dict):
        # Fallback for non-dict parsed values: stream their string form.
        parsed_text = str(parsed_value)
        delta = _text_delta(state.streaming_value, parsed_text)
        if delta:
            _emit_update(state.active_field_path, delta)
        state.streaming_value = parsed_text
        return

    delta_structure = compute_structure_delta(state.final_output, parsed_value)
    if delta_structure and "product" in delta_structure:
        # The design assumes one main "product" object; the last changed key
        # inside it becomes the active field.
        changes = delta_structure["product"]
        if isinstance(changes, dict) and changes:
            active_key = list(changes)[-1]
            state.active_field_path = ["product", active_key]
            state.streaming_value = parsed_value["product"].get(active_key, "")
            _emit_update(state.active_field_path, state.streaming_value)
    state.final_output = parsed_value


def _apply_content(state: "_StreamState", content: object) -> None:
    """Fold a "content" value into *state* and emit the new text, if any."""
    if not isinstance(content, str):
        # "content" may be absent or null; there is nothing to stream then.
        return
    delta = _text_delta(state.streaming_value, content)
    # Always track the full content: when it is not an extension of the old
    # value it replaces it instead of being appended to it.
    state.streaming_value = content
    if not delta:
        return
    if state.active_field_path:
        # NOTE(review): the whole "content" text is stored under the active
        # field, as in the original flow - confirm this is the intended mapping.
        set_nested_value(state.final_output, state.active_field_path, content)
    _emit_update(state.active_field_path, delta)


async def call():
    """Stream an LLM response for a hard-coded product and print updates.

    Builds an ``OpenAiAdapter`` for "gpt-4o", awaits ``adapter.callLlm`` and
    iterates the returned stream synchronously.  Each chunk's "snapshot" is
    either a string fragment (accumulated until it decodes as JSON) or an
    already-decoded object.  For every processed snapshot the "parsed" and
    "content" entries of the first choice's message are folded into the
    running output, incremental ``{"key", "value"}`` lines are printed, and
    the combined JSON is printed after each processed chunk and at the end.
    """
    adapter = OpenAiAdapter(
        OpenAI(api_key=os.getenv('OPENAI_API_KEY'), max_retries=2),
        "gpt-4o",
    )
    details = ProductDetails()
    language = ReportLanguage(code='pl')
    details.product_name = 'Dyson Purifier Cool Formaldehyde TP09'
    details.product_website_content = ''

    state = _StreamState()
    # Buffer for accumulating partial JSON text across chunks.
    acc_buffer = ""

    stream = await adapter.callLlm(
        product=details,
        recent_informations='',
        language=language,
    )

    # The stream is consumed with a synchronous context manager and iterator.
    with stream as s:
        for chunk in s:
            snapshot = chunk.to_dict().get("snapshot", {})

            if isinstance(snapshot, str):
                acc_buffer += snapshot
                try:
                    snapshot = json.loads(acc_buffer)
                except json.JSONDecodeError:
                    # JSON still incomplete: surface whatever "content" text
                    # is already visible, then wait for more data.
                    _apply_content(state, _extract_partial_content(acc_buffer))
                    continue
                # A complete JSON document was decoded; start a fresh buffer.
                acc_buffer = ""

            message = _extract_message(snapshot)
            if message is not None:
                if "parsed" in message:
                    _apply_parsed(state, message["parsed"])
                if "content" in message:
                    _apply_content(state, message["content"])

            # For demonstration, print the combined final JSON so far.
            print(
                "Current final JSON output:",
                json.dumps(state.final_output, ensure_ascii=False),
            )

    print(
        "Final collected response (JSON):",
        json.dumps(state.final_output, ensure_ascii=False),
    )
# Script entry point: run the streaming demo coroutine to completion.
if __name__ == '__main__':
    asyncio.run(call())
Leave a Comment