Untitled
plain_text
a month ago
10 kB
1
Indexable
Never
import os print(os.getcwd()) import shutil import pandas as pd import numpy import glob import json from PyPDF2 import PdfReader from .ExtractExcel import excel from .ExtractEditablePdf import editable from .Helper import inputToInterim,find_pdf_type from .ExtractOtherPdf import scanned,scanned_image from PIL import Image from upload_api_helper import celery_app python_file_path = os.path.realpath(os.path.dirname(__file__)) # print(python_file_path) quality_detection = True evaluation_list = [] def extraction_json_format(path, hospital_id, docTrack): """ path: path to Raw Data folder """ output = [] print() for doc_num , doc_dir in enumerate(glob.glob(os.path.join(path, "*"))): doc_dirname = os.path.basename(doc_dir) for sub_folder in glob.glob(os.path.join(doc_dir, "*")): # Meta Data Raw Data temp if sub_folder.startswith("."): continue if "Raw Data" in sub_folder.split("/"): for file in glob.glob(os.path.join(sub_folder, "*")): if file.startswith("."): continue # for file in glob.glob(os.path.join(sub_folder, "*")): file_json = {"hospitalId": hospital_id} excel_data_df = pd.read_excel(file, sheet_name='Data',header=None) excel_bbox_df = pd.read_excel(file, sheet_name='Bbox',header=None) response = [] index = 1 import math import numpy for i in range(0, len(excel_data_df.index)): for j in range(0, len(excel_bbox_df.columns)): try: data = excel_data_df.values[i, j] if math.isnan(data) or numpy.isnan(data): data = "" except: data = "" try: bbox = excel_bbox_df.values[i, j] except: bbox =[0,0,0,0] #print(type(data),data) cell_desc = {"boundingBox": str(bbox), "code": "CELL", "confidence": 100, "confidenceColor": "GREEN", "fieldChanged": False, "fieldType": f"C{j}R{i}", "label": f"CELL{index}", "value": str(data)} index = index+1 response.append(cell_desc) filename = os.path.basename(file).rsplit(".",1)[0] page = filename.split("_page_")[1].split("_")[0] table = filename.split("_table_")[-1] ## try: doc_name = os.path.basename(doc_dir) document = docTrack.get(doc_name.strip()) except: print("document number not found, assigning default") document = str(doc_num + 99) pageViewURL = '' file_json["page"] = page file_json["table"] = table file_json["document"] = document file_json["pageViewURL"] = pageViewURL file_json["response"] = response output.append(file_json) else: # print("Raw Data folder not found. ", sub_folder) pass return output def process(pdf_dir , destination): global evaluation_list try: base_name = pdf_dir.rsplit("/",1)[1] except: base_name = pdf_dir for pdf_name in os.listdir(pdf_dir): file_extraction_info = ["", "", ""] if pdf_name.startswith("."): continue input_file_path = os.path.join(pdf_dir,pdf_name) file_extraction_info[0] = pdf_name current_working_directory = os.path.join(destination,pdf_name) for i in ["Inputs", "Raw Data", "Meta Data", "temp/images", "temp/jsons"]: if not os.path.exists(os.path.join(destination,pdf_name , i)): os.makedirs(os.path.join(destination, pdf_name, i)) shutil.copy(input_file_path,os.path.join(destination, pdf_name,"Inputs")) # Redirect code for [.xls, .pdf, .tif, .jpg ] if ".xls" in pdf_name.lower() and ".xlsb" not in pdf_name.lower() : file_extraction_info[1] = "Excel Handling" xls = pd.ExcelFile(input_file_path) sheetnames = xls.sheet_names meta_dict = {"Path": [], "Room Key": ""} lis = [] pno = 1 for name in sheetnames: try: no =0 tables = excel(input_file_path, name) # return a list of dataframe in that sheet for tab in tables: no=no+1 tab_bbox = [] for _,row in tab.iterrows(): v =[] for col in row: v.append([0,0,0,0]) tab_bbox.append(v) tab_bbox = pd.DataFrame(tab_bbox) with pd.ExcelWriter(os.path.join(current_working_directory,"Raw Data",f'_page_{pno}_table_{no}.xlsx')) as writer: tab.to_excel(writer, index=False, header=False, sheet_name="Data") tab_bbox.to_excel(writer, index=False, header=False, sheet_name="Bbox") lis.append(f'_page_{pno}_table_{no}.xlsx') pno = pno+1 except Exception as e: print([i, name, str(e)]) file_extraction_info[2] = len(lis) meta_dict["Path"] = lis meta = pd.DataFrame.from_dict(meta_dict) meta.to_excel(os.path.join(current_working_directory, "Meta Data", f'Meta.xlsx'),index=False) if ".pdf" in pdf_name.lower(): ty = find_pdf_type(input_file_path) if ty=="Scanned": file_extraction_info[1] = "Textract" scanned(input_file_path, current_working_directory) elif ty== "Editable": try: file_extraction_info[1] = "Camelot" edit = editable(input_file_path,current_working_directory,quality_detection=quality_detection) except Exception as e: print("line 73",e) edit = True file_extraction_info[1] = "Textract" scanned(input_file_path, current_working_directory) if edit == False: file_extraction_info[1] = "Textract" scanned(input_file_path, current_working_directory) # elif ".tif" in pdf_name.lower(): # file_extraction_info[1] = "Textract" # im = Image.open(pdf_name) # ext = pdf_name.split(0)[-1] # ny = pdf_name.replace(ext,".jpg") # im.save(input_file_path.replace(pdf_name,ny), 'JPEG') # scanned_image(input_file_path.replace(pdf_name,ny), current_working_directory) # elif ".jpg" in pdf_name.lower() or ".png" in pdf_name.lower() or ".jpeg" in pdf_name.lower(): # file_extraction_info[1] = "Textract" # scanned_image(input_file_path, current_working_directory) # else: # print("new Format file ",pdf_name) file_extraction_info = [base_name] + file_extraction_info print(file_extraction_info) evaluation_list.append(file_extraction_info) # shutil.rmtree(os.path.join(destination, pdf_name,"temp")) # comment if you want temp folders shutil.rmtree(os.path.join(destination, pdf_name, "Inputs")) def auto_extraction(path: str, docTrack: dict, s3URL= "") -> tuple: # def auto_extraction(self, path: str, docTrack: dict, s3URL= "") -> tuple: hid = path.split("/")[-1] interimPath = path outputpath = path + "/../../extraction_interim/" + hid # first run input_from_ids # if not os.path.exists(interimPath): # os.makedirs(interimPath) # inputToInterim(path, interimPath) if not os.path.exists(outputpath): os.makedirs(outputpath) try: print("we will process files in this folder "+ interimPath) print(os.listdir(interimPath)) process(interimPath, outputpath) #for i in os.listdir(interimPath): #if i == ".DS_Store": # continue #process(interimPath, outputpath, i) try: evaluation_dataframe = pd.DataFrame(evaluation_list,columns=["HID", "FileName", "ExtractionMode", "ExtractedTables"]) evaluation_dataframe.to_excel(path + ".xlsx", index=False) except Exception as e: print(f"Error in summary creation - {e}") print("**Extraction completed Creating Evaluation Dataframe**") except Exception as e: print(f"**Error - {e}, FORCE QUIT **") # shutil.rmtree(outputpath) # shutil.rmtree(interimPath) quit() # save json extraction response json_output = extraction_json_format(outputpath, hid, docTrack) return outputpath, json_output # auto_extraction("/home/devanshisukhija/Documents/dfg/code/uploads/INPUT/2",{'HEGIC-HS-60470-10-15.pdf': '1'}, "s3URL")