Untitled
unknown
python
2 years ago
14 kB
7
Indexable
import io
import json
import re
import secrets
from operator import itemgetter
from time import time
from typing import Optional

import fitz
import requests

# Report downloaded by fetch_pdf() when no URL is supplied.
_DEFAULT_PDF_URL = "https://www.bain.com/globalassets/noindex/2023/bain_report_global_m_and_a_report_2023.pdf"


def fetch_pdf(pdf_url=_DEFAULT_PDF_URL, timeout=60):
    """Download a PDF and return its content as an in-memory binary stream.

    :param pdf_url: URL of the PDF to download
    :type pdf_url: str
    :param timeout: seconds passed to ``requests.get`` as its timeout
    :type timeout: float
    :rtype: io.BytesIO
    :return: the response body wrapped in a ``BytesIO``
    :raises Exception: if the server does not answer with HTTP 200
    """
    print('Fetching')
    start = time()
    # Without a timeout an unresponsive server would block this call forever.
    r = requests.get(pdf_url, timeout=timeout)
    if r.status_code != 200:
        raise Exception('Could not load PDF')
    pdf_file = io.BytesIO(r.content)
    print(f'Loaded in {time() - start:.2f} seconds')
    return pdf_file


class MetaData:
    """Metadata record describing a document, section or paragraph."""

    def __init__(
            self,
            document_id: str,
            section_idx: int,
            paragraph_idx: int,
            sentence_idx: int,
            text: str,
            title: str,
            url: str,
            doc_type: str,
            line_number: int,
            page_number: int,
            tokens: Optional[list[int]] = None,
    ):
        """
        :param document_id: identifier of the document this record belongs to
        :param section_idx: section index (this module uses -1 for document-level records)
        :param paragraph_idx: paragraph index (this module uses -1 for section/document records)
        :param sentence_idx: sentence index (this module always stores -1)
        :param text: text content of the record
        :param title: document title
        :param url: document URL
        :param doc_type: document type label (this module stores 'pdf')
        :param line_number: line number (this module always stores -1)
        :param page_number: page number the record is attributed to
        :param tokens: token ids; a fresh empty list is used when omitted
        """
        self.document_id = document_id
        self.section_idx = section_idx
        self.paragraph_idx = paragraph_idx
        self.sentence_idx = sentence_idx
        self.text = text
        self.title = title
        self.url = url
        self.doc_type = doc_type
        self.line_number = line_number
        self.page_number = page_number
        # A `[]` default would be one list shared by every instance.
        self.tokens = [] if tokens is None else tokens

    def to_json(self):
        """Serialize the object to an indented JSON string with sorted keys."""
        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)

    def to_dict(self):
        """
        Returns dictionary of the object properties and values

        The result is a shallow copy, so mutating it does not alter the object.

        :return: dict mapping attribute names to values
        """
        return dict(vars(self))


class DocVec:
    """An embedding vector paired with its id and the metadata it describes."""

    def __init__(
            self,
            vector_id: str,
            emb_vec: list,
            metadata: MetaData,
    ):
        """
        :param vector_id: identifier of the vector
        :param emb_vec: the embedding vector
        :param metadata: metadata record associated with the vector
        """
        self.vector_id = vector_id
        self.emb_vec = emb_vec
        self.metadata = metadata

    def to_json(self):
        """Serialize the object (including nested metadata) to an indented JSON string."""
        return json.dumps(self, default=lambda o: o.__dict__, sort_keys=True, indent=4)

    def to_dict(self):
        """
        Returns dictionary of the object properties and values

        The result is a shallow copy; ``metadata`` is still the same MetaData object.

        :return: dict mapping attribute names to values
        """
        return dict(vars(self))


def fonts(pdf_obj, granularity=False):
    """Extracts fonts and their usage in PDF documents.

    :param pdf_obj: PDF document to iterate through
    :type pdf_obj: <class 'fitz.fitz.Document'>
    :param granularity: also use 'font', 'flags' and 'color' to discriminate text
    :type granularity: bool
    :rtype: [(identifier, count), ...], dict
    :return: most used fonts sorted by count, font style information
    :raises ValueError: if the document contains no text spans
    """
    styles = {}
    font_counts = {}

    for page in pdf_obj:
        blocks = page.get_text("dict")["blocks"]
        for b in blocks:  # iterate through the text blocks
            if b['type'] != 0:  # only type 0 blocks contain text
                continue
            for line in b["lines"]:  # iterate through the text lines
                for s in line["spans"]:  # iterate through the text spans
                    if granularity:
                        identifier = "{0}_{1}_{2}_{3}".format(s['size'], s['flags'], s['font'], s['color'])
                        styles[identifier] = {'size': s['size'], 'flags': s['flags'], 'font': s['font'],
                                              'color': s['color']}
                    else:
                        identifier = "{0}".format(s['size'])
                        styles[identifier] = {'size': s['size'], 'font': s['font']}

                    font_counts[identifier] = font_counts.get(identifier, 0) + 1  # count the fonts usage

    font_counts = sorted(font_counts.items(), key=itemgetter(1), reverse=True)

    if not font_counts:
        raise ValueError("Zero discriminating fonts found!")

    return font_counts, styles


def font_tags(font_counts, styles):
    """Returns dictionary with font sizes as keys and tags as value.

    The most frequent size is tagged ``<p>``; larger sizes get ``<h1>``, ``<h2>``, ...
    (largest first) and smaller sizes get ``<s1>``, ``<s2>``, ... (largest first).
    Each identifier in ``font_counts`` is converted with ``float()``, so it must be a
    plain size string as produced by ``fonts(..., granularity=False)``.

    :param font_counts: (font_size, count) for all fonts occurring in document
    :type font_counts: list
    :param styles: all styles found in the document
    :type styles: dict
    :rtype: dict
    :return: all element tags based on font-sizes
    """
    # style of the most used font by count is taken to be the paragraph style
    p_size = styles[font_counts[0][0]]['size']

    # sorting the font sizes high to low, so that we can append the right integer to each tag
    font_sizes = sorted((float(font_size) for font_size, _ in font_counts), reverse=True)

    # aggregating the tags for each font size
    idx = 0
    size_tag = {}
    for size in font_sizes:
        idx += 1
        if size == p_size:
            idx = 0  # restart numbering so sizes below the paragraph begin at <s1>
            size_tag[size] = '<p>'
        elif size > p_size:
            size_tag[size] = '<h{0}>'.format(idx)
        else:
            size_tag[size] = '<s{0}>'.format(idx)

    return size_tag


def headers_para(pdf_obj, size_tag, start_page_idx, end_page_idx):
    """Scrapes headers & paragraphs from PDF and return texts with element tags.

    ``header_para`` and ``page_numbers`` are always appended together, so entry ``k``
    of one describes entry ``k`` of the other.

    :param pdf_obj: PDF document to iterate through
    :type pdf_obj: <class 'fitz.fitz.Document'>
    :param size_tag: textual element tags for each size
    :type size_tag: dict
    :param start_page_idx: index of first page to scrape
    :type start_page_idx: int
    :param end_page_idx: index of last page to scrape
    :type end_page_idx: int
    :rtype: list, list
    :return: texts with pre-pended element tags, and the 1-based page number of each text
    """
    header_para = []  # list with headers and paragraphs
    page_numbers = []  # page number for each entry of header_para
    first = True  # boolean operator for first header
    previous_s = {}  # previous span

    for i, page in enumerate(pdf_obj):
        if i < start_page_idx or i > end_page_idx:
            continue
        page_number = i + 1  # enumerate is 0-based, reported pages are 1-based
        blocks = page.get_text("dict")["blocks"]
        for b in blocks:  # iterate through the text blocks
            if b['type'] != 0:  # only type 0 blocks contain text
                continue

            # REMEMBER: multiple fonts and sizes are possible IN one block
            block_string = ""  # text found in block
            for line in b["lines"]:  # iterate through the text lines
                for s in line["spans"]:  # iterate through the text spans
                    if not s['text'].strip():  # skip whitespace-only spans
                        continue

                    tagged_text = size_tag[s['size']] + s['text']
                    if first:
                        first = False
                        block_string = tagged_text
                    elif s['size'] == previous_s['size']:
                        if block_string == "" or all(c == "|" for c in block_string):
                            # Nothing but line separators collected so far: start the
                            # text with its size tag. These cases must be exclusive with
                            # the concatenation below, otherwise the span text is added
                            # twice.
                            block_string = tagged_text
                        else:  # in the same block, so concatenate strings
                            block_string += " " + s['text']
                    else:
                        # Font size changed: emit what was collected so far. The page
                        # number is recorded here too so both lists stay aligned.
                        header_para.append(block_string)
                        page_numbers.append(page_number)
                        block_string = tagged_text

                    previous_s = s

                # end of a text line, indicating with a pipe
                block_string += "|"

            header_para.append(block_string)
            page_numbers.append(page_number)

    return header_para, page_numbers


def _safe_string(mapping, key, default=''):
    """Return ``str(mapping[key])``, or ``default`` if the key is missing, None or ''."""
    if not mapping:
        return default
    value = mapping.get(key)
    if value is None or value == '':
        return default
    return str(value)


def get_pdf_metadata(pdf_file_bytes, **kwargs) -> tuple[fitz.Document, str, MetaData]:
    """Open a PDF from a binary stream and build its document-level metadata.

    The title is taken from the PDF's own metadata, then from ``kwargs['title']``,
    then from the last path segment of ``kwargs['url']``, and finally 'Untitled'.

    :param pdf_file_bytes: readable binary stream holding the PDF (``.read()`` is called on it)
    :param kwargs: optional ``title`` and ``url`` strings
    :return: the opened document, a random 12-hex-character document id, and the
        document-level MetaData record
    """
    doc_id = str(secrets.token_hex(6))
    pdf_obj = fitz.open(stream=pdf_file_bytes.read(), filetype="pdf")
    doc_data = {
        'document_id': doc_id,
        'section_idx': -1,
        'paragraph_idx': -1,
        'sentence_idx': -1,
        'text': 'document',
        'title': _safe_string(
            pdf_obj.metadata,
            'title',
            _safe_string(kwargs, 'title', '')
        ),
        'url': _safe_string(kwargs, 'url', 'file'),
        'line_number': -1,  # TODO
        'page_number': -1,
        'doc_type': 'pdf',
        'tokens': []  # TODO
    }
    if doc_data['title'] == '':
        if doc_data['url'] != 'file':
            doc_data['title'] = doc_data['url'].split('/')[-1]
        else:
            doc_data['title'] = 'Untitled'
    doc_metadata = MetaData(**doc_data)
    return pdf_obj, doc_id, doc_metadata


def _derive_metadata(base, text, section_idx, paragraph_idx, page_number):
    """Build a MetaData from a copy of ``base`` with the section/paragraph fields overridden."""
    fields = dict(base)  # copy: the document-level values must not be mutated
    fields.update(
        text=text,
        section_idx=section_idx,
        paragraph_idx=paragraph_idx,
        page_number=int(page_number),
        sentence_idx=-1,
        tokens=[],
        line_number=-1,
        doc_type='pdf',
    )
    return MetaData(**fields)


def _section_text(header, paragraphs):
    """Join an upper-cased, tag-stripped header and its paragraphs into one section text."""
    plain_header = re.sub(r'<[^>]+>', '', header)
    return plain_header.upper() + " ### " + " ".join(paragraphs)


def parse_pdf(
        pdf_obj: fitz.Document,
        document_id: str,
        document_metadata: MetaData,
        return_sections: bool = True,
        return_paragraphs: bool = True,
        return_sentences: bool = False,
        **kwargs
) -> list[tuple[list[str], list[MetaData]]]:
    """Split a PDF into sections and paragraphs with ids and metadata.

    Pages before index ``int(num_pages * 0.05)`` and after index
    ``int(num_pages * 0.95)`` are skipped. A section is the most recent header
    plus the paragraphs that follow it, and is closed when the next header is
    met. Section ids are ``<document_id>_<section_idx>``; paragraph ids are
    ``<document_id>_<section_idx>_<paragraph_idx>``. A section's page number is
    the page of the entry being processed when the section is closed.

    :param pdf_obj: opened PDF document
    :param document_id: id used as prefix of every vector id
    :param document_metadata: document-level metadata; its title, url and
        document_id are copied into each record (the object itself is not modified)
    :param return_sections: currently unused; sections are always returned
    :param return_paragraphs: when true, also return the paragraph ids/metadata
    :param return_sentences: currently unused
    :param kwargs: currently unused
    :return: ``[(section_ids, section_metadata)]``, followed by
        ``(paragraph_ids, paragraph_metadata)`` when ``return_paragraphs`` is true
    """
    font_counts, styles = fonts(pdf_obj, granularity=False)
    size_tag = font_tags(font_counts, styles)
    num_pages = len(pdf_obj)
    start_idx = int(num_pages * 0.05)
    end_idx = int(num_pages * 0.95)
    header_para, page_numbers = headers_para(
        pdf_obj,
        size_tag,
        start_page_idx=start_idx,
        end_page_idx=end_idx
    )

    base_metadata = document_metadata.to_dict()

    section_idx = 0
    section_header = ''
    section_paragraphs = []

    sections_metadata_list = []
    sections_ids_list = []
    paragraphs_metadata_list = []
    paragraphs_ids_list = []

    def close_section(page_number):
        # Record the section collected so far under the current section_idx.
        sections_metadata_list.append(_derive_metadata(
            base_metadata,
            text=_section_text(section_header, section_paragraphs),
            section_idx=section_idx,
            paragraph_idx=-1,
            page_number=page_number,
        ))
        sections_ids_list.append(document_id + '_' + str(section_idx))

    current_page_number = None
    for x, page_number in zip(header_para, page_numbers):
        current_page_number = page_number

        if x.startswith("<h"):
            # A header after at least one paragraph ends the running section. Close it
            # BEFORE adopting the new header, so the paragraphs stay attached to the
            # header that preceded them.
            if section_paragraphs:
                close_section(page_number)
                section_paragraphs = []
                section_idx += 1
            # most recent header wins when several headers follow each other
            section_header = x.replace("|", "")

        elif x.startswith("<p"):
            paragraph_text = x.replace("<p>", " ").replace("|", "")
            section_paragraphs.append(paragraph_text)
            paragraph_idx = len(section_paragraphs) - 1

            # page_number from headers_para is already 1-based; no further offset
            paragraphs_metadata_list.append(_derive_metadata(
                base_metadata,
                text=paragraph_text,
                section_idx=section_idx,
                paragraph_idx=paragraph_idx,
                page_number=page_number,
            ))
            paragraphs_ids_list.append(
                document_id + '_' + str(section_idx) + '_' + str(paragraph_idx)
            )

    # Add the last section
    if section_paragraphs:
        close_section(current_page_number)

    if return_paragraphs:
        return [
            (sections_ids_list, sections_metadata_list),
            (paragraphs_ids_list, paragraphs_metadata_list)
        ]
    return [(sections_ids_list, sections_metadata_list)]
Editor is loading...