Untitled
unknown
plain_text
8 days ago
3.7 kB
3
Indexable
"""FastAPI service that merges an uploaded claims file with a members file.

Endpoints:
    POST /start-merge      Upload both files, merge them, store the result.
    GET  /status/{job_id}  Download the stored result for a job.
"""
import io
import logging
import os
import time
import uuid
from typing import Union, Tuple

import pandas as pd
from fastapi import FastAPI, UploadFile, File, Form, Request
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import JSONResponse, FileResponse
from pandas.errors import EmptyDataError, ParserError

app = FastAPI()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Directory where merged workbooks are stored, one "<job_id>.xlsx" per job.
JOBS_DIR = "jobs"
os.makedirs(JOBS_DIR, exist_ok=True)


def read_uploaded_file(
    file: UploadFile, sheet_name: Union[str, None] = None
) -> Tuple[str, Union[pd.DataFrame, str]]:
    """Parse an uploaded Excel or CSV file into a DataFrame.

    Files whose name ends in ``.xlsx``/``.xls`` (case-insensitive) are read
    with ``pd.read_excel``; everything else is decoded as UTF-8 CSV.

    Args:
        file: The uploaded file.
        sheet_name: Excel sheet to read; the first sheet is used when falsy.
            Ignored for CSV input.

    Returns:
        ``(filename, DataFrame)`` on success, or ``(filename, error_message)``
        when the file could not be read. This function does not raise; the
        caller tells the two cases apart by checking for a ``str``.
    """
    # An upload may arrive without a filename; use a placeholder so the
    # extension check and the error messages still work.
    filename = file.filename or "<unnamed upload>"
    try:
        content = file.file.read()
        if filename.lower().endswith((".xlsx", ".xls")):
            df = pd.read_excel(io.BytesIO(content), sheet_name=sheet_name or 0)
        else:
            df = pd.read_csv(io.StringIO(content.decode("utf-8")))
        return filename, df
    except (EmptyDataError, ParserError, UnicodeDecodeError, ValueError) as e:
        return filename, f"{filename} read error: {e}"
    except Exception as e:
        # Deliberate best-effort boundary: report the failure to the client,
        # but keep the traceback in the server log.
        logger.exception("Unexpected error while reading %s", filename)
        return filename, f"{filename} unexpected error: {e}"


def validate_required_columns(
    df: pd.DataFrame, required_col: str, file_label: str
) -> Union[None, str]:
    """Return an error message if ``required_col`` is absent from ``df``.

    Args:
        df: Frame to inspect.
        required_col: Column name that must be present.
        file_label: Name used for the file in the error message.

    Returns:
        ``None`` when the column exists, otherwise a human-readable message.
    """
    if required_col not in df.columns:
        return f"'{file_label}' missing required column: '{required_col}'"
    return None


def merge_claim_member(claim_df: pd.DataFrame, member_df: pd.DataFrame) -> pd.DataFrame:
    """Inner-join claims to members on ``mem_id`` == ``member_id``.

    Only the ``member_id`` and ``Comments`` columns of ``member_df`` are
    carried into the result.

    Raises:
        KeyError: If ``member_df`` lacks ``member_id`` or ``Comments``.
        ValueError: If pandas cannot merge the two key columns (for example
            because their dtypes are incompatible).
    """
    return claim_df.merge(
        member_df[["member_id", "Comments"]],
        left_on="mem_id",
        right_on="member_id",
        how="inner",
    )


def save_merged_file(df: pd.DataFrame, job_id: str) -> str:
    """Write ``df`` to ``JOBS_DIR/<job_id>.xlsx`` and return that path.

    The workbook is written to a temporary file in the same directory and
    then moved into place with ``os.replace``, so a reader never sees a
    partially written file under the final name.
    """
    path = os.path.join(JOBS_DIR, f"{job_id}.xlsx")
    # Keep the ".xlsx" suffix on the temporary name: pandas validates the
    # extension of the path it is given.
    tmp_path = os.path.join(JOBS_DIR, f".{job_id}.partial.xlsx")
    try:
        with pd.ExcelWriter(tmp_path, engine="xlsxwriter") as writer:
            df.to_excel(writer, index=False, sheet_name="MergedData")
        os.replace(tmp_path, path)
    finally:
        # After a successful replace the temp file is gone; this only cleans
        # up when writing or moving failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    return path


@app.post("/start-merge")
async def start_merge(
    request: Request,
    claim_data: UploadFile = File(...),
    member_data: UploadFile = File(...),
    claim_sheet: Union[str, None] = Form(None),
    member_sheet: Union[str, None] = Form(None),
):
    """Merge the uploaded claim and member files and store the result.

    Returns a JSON body with the new ``job_id`` and row counts on success,
    or a 400 response with ``{"job_id", "errors"}`` when a file cannot be
    read, a required column is missing, or the merge itself fails.
    """
    job_id = str(uuid.uuid4())
    # request.client can be None, so guard before reading the host.
    client_host = request.client.host if request.client else "unknown"
    logger.info("[%s] Merge started from %s", job_id, client_host)

    # File parsing and pandas work are blocking; run them in the threadpool
    # so the event loop stays free for other requests.
    claim_name, claim_df = await run_in_threadpool(
        read_uploaded_file, claim_data, claim_sheet
    )
    member_name, member_df = await run_in_threadpool(
        read_uploaded_file, member_data, member_sheet
    )

    # read_uploaded_file signals failure by returning a message string.
    read_errors = [res for res in (claim_df, member_df) if isinstance(res, str)]
    if read_errors:
        return JSONResponse(
            status_code=400, content={"job_id": job_id, "errors": read_errors}
        )

    errors = []
    if err := validate_required_columns(claim_df, "mem_id", claim_name):
        errors.append(err)
    if err := validate_required_columns(member_df, "member_id", member_name):
        errors.append(err)
    if "Comments" not in member_df.columns:
        errors.append("Missing 'Comments' column in member file.")
    if errors:
        return JSONResponse(
            status_code=400, content={"job_id": job_id, "errors": errors}
        )

    try:
        merged_df = await run_in_threadpool(merge_claim_member, claim_df, member_df)
    except ValueError as exc:
        # Report a failed merge in the same shape as the other input errors
        # instead of letting it surface as a 500.
        return JSONResponse(
            status_code=400,
            content={
                "job_id": job_id,
                "errors": [f"Could not merge on 'mem_id'/'member_id': {exc}"],
            },
        )

    await run_in_threadpool(save_merged_file, merged_df, job_id)

    # NOTE: the merge has already completed at this point; the message text
    # is kept as-is because clients may depend on it.
    return {
        "job_id": job_id,
        "message": "Files received successfully. Processing started.",
        "rows": {
            "claims": len(claim_df),
            "members": len(member_df),
            "merged": len(merged_df),
        },
    }


@app.get("/status/{job_id}")
async def check_status(job_id: str):
    """Return the merged workbook for ``job_id`` if it exists.

    Responds with the ``.xlsx`` file when present, 202 when no file exists
    for the id, and 400 when ``job_id`` is not a valid UUID.
    """
    # job_id comes from the URL and ends up in a filesystem path: accept only
    # UUIDs and use the canonical form that start_merge generates.
    try:
        canonical_id = str(uuid.UUID(job_id))
    except ValueError:
        return JSONResponse(status_code=400, content={"message": "Invalid job id."})

    file_path = os.path.join(JOBS_DIR, f"{canonical_id}.xlsx")
    if os.path.isfile(file_path):
        return FileResponse(
            file_path,
            media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            filename=f"{canonical_id}.xlsx",
        )
    return JSONResponse(
        status_code=202,
        content={"message": "File is still processing or not found. Please check back later."},
    )
Editor is loading...
Leave a Comment