Untitled

 avatar
unknown
plain_text
15 days ago
4.5 kB
4
Indexable
from fastapi import FastAPI, UploadFile, File, Form, Request
from fastapi.responses import StreamingResponse, JSONResponse
from pandas.errors import EmptyDataError, ParserError
from typing import Union, Tuple
import pandas as pd
import io
import logging
import time

# Setup logging
# Configures the root logger at import time: INFO and above, with each record
# formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Application object; the endpoint below registers itself on it via @app.post.
app = FastAPI()


def read_uploaded_file(file: UploadFile, sheet_name: Union[str, None] = None) -> Tuple[str, Union[pd.DataFrame, str]]:
    """Read an uploaded CSV or Excel file into a pandas DataFrame.

    Failures are reported through the return value instead of being raised,
    so the caller can collect errors from several uploads before responding.

    Args:
        file: The upload. Its ``filename`` selects the parser (``.xlsx`` /
            ``.xls`` -> Excel, anything else -> CSV) and its ``file``
            attribute is read in full.
        sheet_name: Excel sheet to load. The first sheet is used when this
            is ``None`` or empty. Not used for CSV input.

    Returns:
        ``(filename, DataFrame)`` on success, or ``(filename, message)``
        where ``message`` is a human-readable error string on failure.
        ``filename`` is ``"<unnamed>"`` when the upload carries no name.
    """
    # An upload may arrive without a filename; use a placeholder so the
    # extension check and the error messages below stay well-defined.
    filename = file.filename or "<unnamed>"
    try:
        content = file.file.read()
        # Compare the extension case-insensitively so "REPORT.XLSX" is not
        # mistakenly parsed as CSV.
        if filename.lower().endswith((".xlsx", ".xls")):
            df = pd.read_excel(io.BytesIO(content), sheet_name=sheet_name or 0)
        else:
            # "utf-8-sig" decodes ordinary UTF-8 and also drops a leading
            # byte-order mark, so the first column name is never polluted.
            df = pd.read_csv(io.StringIO(content.decode("utf-8-sig")))
    except (EmptyDataError, ParserError, UnicodeDecodeError, ValueError) as e:
        return filename, f"{filename} read error: {e}"
    except Exception as e:
        # Still reported to the caller as a string, but keep the traceback in
        # the log so unexpected failures are diagnosable.
        logging.exception("Unexpected error while reading %s", filename)
        return filename, f"{filename} unexpected error: {e}"
    return filename, df


def validate_required_columns(df: pd.DataFrame, required_col: str, file_label: str) -> Union[None, str]:
    """Report whether ``required_col`` is missing from ``df``.

    Returns ``None`` when the column exists; otherwise returns an error
    message that names the file (``file_label``) and the missing column.
    """
    column_present = required_col in df.columns
    if column_present:
        return None
    return f"'{file_label}' missing required column: '{required_col}'"


def merge_claim_member(claim_df: pd.DataFrame, member_df: pd.DataFrame) -> pd.DataFrame:
    """Inner-join claims to members, matching claim ``mem_id`` to member ``member_id``.

    Only rows whose key appears in both frames are kept.
    """
    join_spec = {"left_on": "mem_id", "right_on": "member_id", "how": "inner"}
    return pd.merge(claim_df, member_df, **join_spec)


def generate_excel(df: pd.DataFrame, sheet_name: str = "MergedData") -> io.BytesIO:
    """Serialize ``df`` to an in-memory Excel workbook.

    The frame is written without its index to a single sheet named
    ``sheet_name`` using the xlsxwriter engine. The returned buffer is
    rewound to the start so it can be read or streamed immediately.
    """
    buffer = io.BytesIO()
    workbook_writer = pd.ExcelWriter(buffer, engine="xlsxwriter")
    # Leaving the context saves the workbook into the buffer.
    with workbook_writer:
        df.to_excel(workbook_writer, sheet_name=sheet_name, index=False)
    buffer.seek(0)
    return buffer


@app.post("/merge-files-download")
async def merge_files_download(
    request: Request,
    claim_data: UploadFile = File(..., description="Claim data file (CSV/Excel) with 'mem_id' column"),
    member_data: UploadFile = File(..., description="Member data file (CSV/Excel) with 'member_id' column"),
    claim_sheet: Union[str, None] = Form(None),
    member_sheet: Union[str, None] = Form(None)
):
    """Merge uploaded claim and member files and return the result as Excel.

    Args:
        request: Incoming request; used only to log the client address.
        claim_data: Claim file (CSV/Excel) that must contain a ``mem_id`` column.
        member_data: Member file (CSV/Excel) that must contain a ``member_id`` column.
        claim_sheet: Optional Excel sheet name for the claim file.
        member_sheet: Optional Excel sheet name for the member file.

    Returns:
        A streamed ``.xlsx`` attachment with the inner join of the two files,
        plus ``X-Claim-Rows`` / ``X-Member-Rows`` / ``X-Merged-Rows`` /
        ``X-Processing-Time`` headers. A 400 JSON response is returned when a
        file cannot be read, a required column is missing, or the merge
        itself fails.
    """
    # Function-scope import keeps this handler self-contained; fastapi is
    # already a dependency of this module.
    from fastapi.concurrency import run_in_threadpool

    # perf_counter is monotonic, so the measured duration cannot go negative
    # if the wall clock is adjusted mid-request.
    start_time = time.perf_counter()
    # request.client may be None (e.g. some test clients or transports).
    client_ip = request.client.host if request.client else "unknown"
    logging.info(
        "Request received from %s: claim_file=%s, member_file=%s",
        client_ip, claim_data.filename, member_data.filename,
    )

    # File reads and pandas parsing are blocking; run them in the threadpool
    # so the event loop stays free to serve other requests.
    claim_name, claim_df = await run_in_threadpool(read_uploaded_file, claim_data, claim_sheet)
    member_name, member_df = await run_in_threadpool(read_uploaded_file, member_data, member_sheet)

    # read_uploaded_file signals failure by returning a message string in
    # place of the DataFrame.
    read_errors = [result for result in (claim_df, member_df) if isinstance(result, str)]
    if read_errors:
        logging.error("File read errors: %s", read_errors)
        return JSONResponse(status_code=400, content={
            "message": "One or both files failed to read.",
            "errors": read_errors,
            "file_names": [claim_name, member_name]
        })

    errors = []
    if err := validate_required_columns(claim_df, "mem_id", claim_name):
        errors.append(err)
    if err := validate_required_columns(member_df, "member_id", member_name):
        errors.append(err)
    if errors:
        logging.warning("Validation failed: %s", errors)
        return JSONResponse(status_code=400, content={"message": "Validation failed.", "errors": errors})

    try:
        merged_df = await run_in_threadpool(merge_claim_member, claim_df, member_df)
    except ValueError as exc:
        # pandas raises ValueError when the join cannot be performed, e.g.
        # when the key columns have incompatible dtypes. That is a problem
        # with the uploaded data, so report it as a client error.
        logging.warning("Merge failed: %s", exc)
        return JSONResponse(status_code=400, content={"message": "Merge failed.", "errors": [str(exc)]})

    excel_file = await run_in_threadpool(generate_excel, merged_df)

    duration = round(time.perf_counter() - start_time, 2)
    logging.info(
        "Merge completed in %ss. Rows: %s (claim), %s (member), %s (merged)",
        duration, len(claim_df), len(member_df), len(merged_df),
    )

    headers = {
        "Content-Disposition": "attachment; filename=merged_data.xlsx",
        "X-Claim-Rows": str(len(claim_df)),
        "X-Member-Rows": str(len(member_df)),
        "X-Merged-Rows": str(len(merged_df)),
        "X-Processing-Time": f"{duration}s"
    }

    return StreamingResponse(
        excel_file,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        headers=headers
    )
Editor is loading...
Leave a Comment