import os
print(os.getcwd())
import shutil
import pandas as pd
import numpy
import glob
import json
from PyPDF2 import PdfReader

from .ExtractExcel import excel
from .ExtractEditablePdf import editable
from .Helper import inputToInterim,find_pdf_type
from .ExtractOtherPdf import scanned,scanned_image

from PIL import Image
from upload_api_helper import celery_app
python_file_path = os.path.realpath(os.path.dirname(__file__))
# print(python_file_path)
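# quality_detection toggles the quality check passed to the editable-PDF
# extractor; evaluation_list collects one summary row per processed file.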
quality_detection = True
evaluation_list = []



def extraction_json_format(path, hospital_id, docTrack):
    """
    path: extraction output root; one sub-folder per document, each holding a "Raw Data" folder of extracted table workbooks
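    hospital_id: value written into every record's "hospitalId" field
    docTrack: mapping of document folder name -> document number
    Returns a list of dicts, one per extracted table workbook.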
    """
    output = []
    print()

    for doc_num, doc_dir in enumerate(glob.glob(os.path.join(path, "*"))):
        doc_dirname = os.path.basename(doc_dir)
        for sub_folder in glob.glob(os.path.join(doc_dir, "*")): # Meta Data Raw Data temp
            # glob returns full paths, so check the basename for hidden entries
            if os.path.basename(sub_folder).startswith("."):
                continue
            if os.path.basename(sub_folder) == "Raw Data":
                for file in glob.glob(os.path.join(sub_folder, "*")):
                    if os.path.basename(file).startswith("."):
                        continue

                    file_json = {"hospitalId": hospital_id}

                    excel_data_df = pd.read_excel(file, sheet_name='Data',header=None)
                    excel_bbox_df = pd.read_excel(file, sheet_name='Bbox',header=None)
                    response = []
                    index = 1
                    for i in range(0, len(excel_data_df.index)):
                        for j in range(0, len(excel_bbox_df.columns)):
                            try:
                                data = excel_data_df.values[i, j]
                                # pd.isna covers NaN without raising on text cells,
                                # so string values are no longer blanked out
                                if pd.isna(data):
                                    data = ""
                            except Exception:
                                data = ""
                            try:
                                bbox = excel_bbox_df.values[i, j]
                            except Exception:
                                bbox = [0, 0, 0, 0]
                            #print(type(data),data)
                            cell_desc = {"boundingBox": str(bbox),
                                "code": "CELL",
                                "confidence": 100,
                                "confidenceColor": "GREEN",
                                "fieldChanged": False,
                                "fieldType": f"C{j}R{i}",
                                "label": f"CELL{index}",
                                "value": str(data)}
                            index = index+1
                            response.append(cell_desc)

                    filename = os.path.basename(file).rsplit(".",1)[0]
                    page = filename.split("_page_")[1].split("_")[0]
                    table = filename.split("_table_")[-1]
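                    # Raw Data workbooks are expected to be named
                    # *_page_{p}_table_{t}.xlsx, so the page and table numbers
                    # are recovered from the file name.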


                    doc_name = os.path.basename(doc_dir)
                    # dict.get never raises for a missing key, so check for None
                    document = docTrack.get(doc_name.strip())
                    if document is None:
                        print("document number not found, assigning default")
                        document = str(doc_num + 99)

                    pageViewURL = ''
                    file_json["page"] = page
                    file_json["table"] = table
                    file_json["document"] = document
                    file_json["pageViewURL"] = pageViewURL
                    file_json["response"] = response

                    output.append(file_json)

            else:
                # print("Raw Data folder not found. ", sub_folder)
                pass

    return output


def process(pdf_dir, destination):
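    """
    pdf_dir: folder of input files to extract (one file per document).
    destination: output root; a sub-folder per input file is created with
    Inputs / Raw Data / Meta Data / temp sub-folders. Extracted tables end up
    under Raw Data as Data/Bbox workbooks (the PDF extractors are expected to
    write the same layout).
    """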
    global evaluation_list
    base_name = os.path.basename(os.path.normpath(pdf_dir)) or pdf_dir
    for pdf_name in os.listdir(pdf_dir):
        file_extraction_info = ["", "", ""]
        if pdf_name.startswith("."):
            continue

        input_file_path = os.path.join(pdf_dir,pdf_name)
        file_extraction_info[0] = pdf_name
        current_working_directory = os.path.join(destination,pdf_name)

        for i in ["Inputs", "Raw Data", "Meta Data", "temp/images", "temp/jsons"]:
            if not os.path.exists(os.path.join(destination,pdf_name , i)):
                os.makedirs(os.path.join(destination, pdf_name, i))
        shutil.copy(input_file_path,os.path.join(destination, pdf_name,"Inputs"))

        # Redirect code for [.xls, .pdf, .tif, .jpg ]
        if ".xls" in pdf_name.lower() and ".xlsb" not in pdf_name.lower() :
            file_extraction_info[1] = "Excel Handling"
            xls = pd.ExcelFile(input_file_path)
            sheetnames = xls.sheet_names
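            # Meta.xlsx lists the per-table workbooks written for this file;
            # "Room Key" is left empty here.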
            meta_dict = {"Path": [], "Room Key": ""}
            lis = []
            pno = 1
            for name in sheetnames:
                try:
                    no =0
                    tables = excel(input_file_path, name) # return a list of dataframe in that sheet
                    for tab in tables:
                        no=no+1
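                        # Excel sources carry no layout coordinates, so every
                        # cell gets a placeholder [0,0,0,0] bounding box to keep
                        # the Bbox sheet aligned with the Data sheet.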
                        tab_bbox = []
                        for _,row in tab.iterrows():
                            v =[]
                            for col in row:
                                v.append([0,0,0,0])
                            tab_bbox.append(v)
                        tab_bbox = pd.DataFrame(tab_bbox)
                        with pd.ExcelWriter(os.path.join(current_working_directory,"Raw Data",f'_page_{pno}_table_{no}.xlsx')) as writer:
                            tab.to_excel(writer, index=False, header=False, sheet_name="Data")
                            tab_bbox.to_excel(writer, index=False, header=False, sheet_name="Bbox")
                        lis.append(f'_page_{pno}_table_{no}.xlsx')
                    pno = pno+1

                except Exception as e:
                    print([pdf_name, name, str(e)])
            file_extraction_info[2] = len(lis)
            meta_dict["Path"] = lis
            meta = pd.DataFrame.from_dict(meta_dict)
            meta.to_excel(os.path.join(current_working_directory, "Meta Data", 'Meta.xlsx'), index=False)

        if ".pdf" in pdf_name.lower():
            ty = find_pdf_type(input_file_path)
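            # Scanned PDFs go to the Textract-based extractor; editable PDFs
            # try Camelot first and fall back to the Textract path when
            # editable() raises or returns False.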
            if ty=="Scanned":
                file_extraction_info[1] = "Textract"
                scanned(input_file_path, current_working_directory)
            elif ty== "Editable":
                try:
                    file_extraction_info[1] = "Camelot"
                    edit = editable(input_file_path,current_working_directory,quality_detection=quality_detection)
                except Exception as e:
                    print("editable extraction failed, falling back to Textract:", e)
                    edit = True
                    file_extraction_info[1] = "Textract"
                    scanned(input_file_path, current_working_directory)

                if edit == False:
                    file_extraction_info[1] = "Textract"
                    scanned(input_file_path, current_working_directory)

        # elif ".tif" in pdf_name.lower():
        #     file_extraction_info[1] = "Textract"
        #     im = Image.open(pdf_name)
        #     ext = pdf_name.split(0)[-1]
        #     ny = pdf_name.replace(ext,".jpg")
        #     im.save(input_file_path.replace(pdf_name,ny), 'JPEG')
        #     scanned_image(input_file_path.replace(pdf_name,ny), current_working_directory)

        # elif ".jpg" in pdf_name.lower() or ".png" in pdf_name.lower() or ".jpeg" in pdf_name.lower():
        #     file_extraction_info[1] = "Textract"
        #     scanned_image(input_file_path, current_working_directory)
        # else:
        #     print("new Format file ",pdf_name)

        file_extraction_info = [base_name] + file_extraction_info
        print(file_extraction_info)
        evaluation_list.append(file_extraction_info)
        # shutil.rmtree(os.path.join(destination, pdf_name,"temp")) # comment if you want temp folders
        shutil.rmtree(os.path.join(destination, pdf_name, "Inputs"))





def auto_extraction(path: str, docTrack: dict, s3URL="") -> tuple:
# def auto_extraction(self, path: str, docTrack: dict, s3URL= "") -> tuple:
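    """
    path: input folder for one hospital; the last path component is used as
    the hospital id (hid).
    docTrack: mapping of input file name -> document number.
    s3URL: currently unused.
    Returns (outputpath, json_output): the extraction_interim folder and the
    per-table JSON records built by extraction_json_format.
    """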
    hid = path.split("/")[-1]
    interimPath = path
    outputpath = path + "/../../extraction_interim/" + hid  # first run input_from_ids
    # if not os.path.exists(interimPath):
    #     os.makedirs(interimPath)
        # inputToInterim(path, interimPath)
    if not os.path.exists(outputpath):
        os.makedirs(outputpath)

    try:
        print("we will process files in this folder "+ interimPath)
        print(os.listdir(interimPath))
        process(interimPath, outputpath)
        #for i in os.listdir(interimPath):
            #if i == ".DS_Store":
             #   continue
            #process(interimPath, outputpath, i)

        try:
            evaluation_dataframe = pd.DataFrame(evaluation_list,columns=["HID", "FileName", "ExtractionMode", "ExtractedTables"])
            evaluation_dataframe.to_excel(path + ".xlsx", index=False)

        except Exception as e:
            print(f"Error in summary creation - {e}")
        print("**Extraction completed, evaluation dataframe created**")

    except Exception as e:
        print(f"**Error - {e}, FORCE QUIT **")
        # shutil.rmtree(outputpath)
        # shutil.rmtree(interimPath)
        quit()

    # save json extraction response
    json_output = extraction_json_format(outputpath, hid, docTrack)


    return outputpath, json_output

# auto_extraction("/home/devanshisukhija/Documents/dfg/code/uploads/INPUT/2",{'HEGIC-HS-60470-10-15.pdf': '1'}, "s3URL")