Untitled
unknown
plain_text
2 days ago
4.1 kB
3
Indexable
# Claim/member merge service.
#
# Accepts two uploaded tabular files (CSV or Excel), inner-joins them on
# claim.mem_id == member.member_id in a background task, and exposes
# endpoints to poll the job status and download the merged workbook.

import io
import logging
import os
import uuid
from typing import Union

import pandas as pd
from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
from pandas.errors import EmptyDataError, ParserError

# Directory holding uploaded inputs and merged outputs.
TEMP_DIR = "temp"

# Uploads are copied to disk in chunks of this size to bound memory use.
_UPLOAD_CHUNK_SIZE = 1024 * 1024

_XLSX_MEDIA_TYPE = (
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)

# Create temp folder if not exists
os.makedirs(TEMP_DIR, exist_ok=True)

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

# In-memory job tracker: job_id -> status payload returned by /check-status.
# Entries live only in this process and are never evicted.
job_status = {}


def read_file(path: str, sheet_name: Union[str, None] = None) -> pd.DataFrame:
    """Load a tabular file into a DataFrame.

    Files ending in ``.xlsx`` / ``.xls`` (case-insensitive) are read as Excel;
    everything else is read as CSV.

    Args:
        path: Location of the file on disk.
        sheet_name: Excel sheet to read; the first sheet is used when this is
            ``None`` or empty. Ignored for CSV input.

    Returns:
        The parsed DataFrame.
    """
    # Compare case-insensitively so "DATA.XLSX" is not mis-parsed as CSV.
    if path.lower().endswith((".xlsx", ".xls")):
        return pd.read_excel(path, sheet_name=sheet_name or 0)
    return pd.read_csv(path)


def merge_claim_member(claim_df: pd.DataFrame, member_df: pd.DataFrame) -> pd.DataFrame:
    """Inner-join claims to members on ``mem_id`` == ``member_id``."""
    return claim_df.merge(
        member_df, left_on="mem_id", right_on="member_id", how="inner"
    )


def generate_excel(df: pd.DataFrame, sheet_name: str = "MergedData") -> io.BytesIO:
    """Serialize *df* to an in-memory .xlsx workbook.

    Args:
        df: Data to write (the index is not written).
        sheet_name: Name of the single worksheet.

    Returns:
        A ``BytesIO`` positioned at offset 0 containing the workbook.
    """
    output = io.BytesIO()
    with pd.ExcelWriter(output, engine="xlsxwriter") as writer:
        df.to_excel(writer, index=False, sheet_name=sheet_name)
    output.seek(0)
    return output


def _remove_quietly(path: str) -> None:
    """Delete *path* if present; log (but do not raise) on other OS errors."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass
    except OSError as exc:
        logger.warning("Could not remove temp file %s: %s", path, exc)


def _mark_failed(job_id: str, error: str) -> None:
    """Record a failed job with the given error message."""
    job_status[job_id] = {"status": "failed", "error": error}


def process_merge_job(
    job_id: str,
    claim_path: str,
    member_path: str,
    claim_sheet: Union[str, None],
    member_sheet: Union[str, None],
):
    """Run one merge job and record its outcome in ``job_status``.

    Reads both input files, verifies the join columns exist, merges them,
    writes ``temp/<job_id>_merged.xlsx`` and stores a "completed" status with
    row counts and the download URL. Any failure is recorded as a "failed"
    status instead of being raised, because this runs as a background task
    with no caller to receive the exception.

    Args:
        job_id: Identifier under which the status is stored.
        claim_path: Path of the saved claim upload.
        member_path: Path of the saved member upload.
        claim_sheet: Optional Excel sheet name for the claim file.
        member_sheet: Optional Excel sheet name for the member file.
    """
    try:
        claim_df = read_file(claim_path, claim_sheet)
        member_df = read_file(member_path, member_sheet)

        if "mem_id" not in claim_df.columns or "member_id" not in member_df.columns:
            _mark_failed(job_id, "Required columns missing.")
            return

        merged_df = merge_claim_member(claim_df, member_df)
        excel_file = generate_excel(merged_df)

        merged_path = f"{TEMP_DIR}/{job_id}_merged.xlsx"
        with open(merged_path, "wb") as f:
            f.write(excel_file.getvalue())

        job_status[job_id] = {
            "status": "completed",
            "claim_rows": len(claim_df),
            "member_rows": len(member_df),
            "merged_rows": len(merged_df),
            "download_url": f"/download/{job_id}",
        }
        logger.info("Job %s completed successfully.", job_id)
    except (EmptyDataError, ParserError, ValueError) as exc:
        # Expected for malformed or empty input: no traceback needed.
        logger.error("Job %s failed: %s", job_id, exc)
        _mark_failed(job_id, str(exc))
    except Exception as exc:
        # Top-level boundary of the background task: keep the traceback.
        logger.exception("Job %s failed: %s", job_id, exc)
        _mark_failed(job_id, str(exc))
    finally:
        # The uploaded inputs are not needed once the job has finished.
        _remove_quietly(claim_path)
        _remove_quietly(member_path)


def _upload_path(job_id: str, role: str, filename: Union[str, None]) -> str:
    """Build the temp path for an upload, keeping only its extension.

    A missing filename yields no extension, so the file is later read as CSV.
    """
    extension = os.path.splitext(filename or "")[1]
    return f"{TEMP_DIR}/{job_id}_{role}{extension}"


async def _save_upload(upload: UploadFile, dest_path: str) -> None:
    """Copy an uploaded file to *dest_path* in fixed-size chunks."""
    with open(dest_path, "wb") as out:
        while True:
            chunk = await upload.read(_UPLOAD_CHUNK_SIZE)
            if not chunk:
                break
            out.write(chunk)


@app.post("/start-merge-job")
async def start_merge_job(
    background_tasks: BackgroundTasks,
    claim_data: UploadFile = File(...),
    member_data: UploadFile = File(...),
    claim_sheet: Union[str, None] = Form(None),
    member_sheet: Union[str, None] = Form(None),
):
    """Save both uploads and schedule the merge as a background task.

    Returns:
        A dict with the new ``job_id`` and a hint on how to poll for status.
    """
    job_id = str(uuid.uuid4())
    claim_path = _upload_path(job_id, "claim", claim_data.filename)
    member_path = _upload_path(job_id, "member", member_data.filename)

    try:
        await _save_upload(claim_data, claim_path)
        await _save_upload(member_data, member_path)
    except Exception:
        # Do not leave partial uploads behind when saving fails.
        _remove_quietly(claim_path)
        _remove_quietly(member_path)
        raise

    job_status[job_id] = {"status": "in progress"}
    background_tasks.add_task(
        process_merge_job, job_id, claim_path, member_path, claim_sheet, member_sheet
    )

    # NOTE(review): this is a plain string, so "{job_id}" is sent literally
    # as a URL template rather than the actual id -- confirm that is intended.
    return {
        "job_id": job_id,
        "message": "Job started. Use /check-status/{job_id} to track progress.",
    }


@app.get("/check-status/{job_id}")
async def check_status(job_id: str):
    """Return the stored status payload for *job_id*, or 404 if unknown."""
    status = job_status.get(job_id)
    if status is None:
        return JSONResponse(status_code=404, content={"message": "Invalid job ID."})
    return status


def _is_valid_job_id(job_id: str) -> bool:
    """Return True only for canonical UUID strings, as issued by this service."""
    try:
        return str(uuid.UUID(job_id)) == job_id
    except ValueError:
        return False


@app.get("/download/{job_id}")
async def download_file(job_id: str):
    """Send the merged workbook for *job_id* as an attachment.

    Responds with 404 when the id is not a well-formed job id or the merged
    file does not exist (job still running, failed, or unknown).
    """
    not_ready = JSONResponse(
        status_code=404, content={"message": "File not ready or job failed."}
    )

    # The id becomes part of a filesystem path, so accept only the UUID
    # format this service generates.
    if not _is_valid_job_id(job_id):
        return not_ready

    merged_path = f"{TEMP_DIR}/{job_id}_merged.xlsx"
    if not os.path.isfile(merged_path):
        return not_ready

    # FileResponse opens and closes the file itself; the previous
    # StreamingResponse(open(...)) never closed the handle.
    return FileResponse(
        merged_path,
        media_type=_XLSX_MEDIA_TYPE,
        headers={"Content-Disposition": f"attachment; filename=merged_{job_id}.xlsx"},
    )
Editor is loading...
Leave a Comment