Untitled
unknown
python
3 years ago
14 kB
12
Indexable
import io
import json
import re
import secrets
from operator import itemgetter
from time import time

import fitz
import requests
def fetch_pdf(
    pdf_url="https://www.bain.com/globalassets/noindex/2023/bain_report_global_m_and_a_report_2023.pdf",
    timeout=30,
):
    """Download a PDF and return its content as an in-memory binary stream.

    :param pdf_url: address of the PDF to download; defaults to the report
        that was previously hard-coded in this function
    :type pdf_url: str
    :param timeout: seconds to wait for the server before giving up; without
        a timeout ``requests.get`` may block indefinitely
    :type timeout: float
    :rtype: io.BytesIO
    :return: stream positioned at the start of the downloaded bytes
    :raises Exception: if the server answers with a status other than 200
    """
    print('Fetching')
    start = time()
    response = requests.get(pdf_url, timeout=timeout)
    if response.status_code != 200:
        raise Exception('Could not load PDF')
    pdf_file = io.BytesIO(response.content)
    print(f'Loaded in {time() - start:.2f} seconds')
    return pdf_file
class MetaData:
    """Descriptive record attached to a document, section, paragraph or sentence."""

    def __init__(
        self,
        document_id: str,
        section_idx: int,
        paragraph_idx: int,
        sentence_idx: int,
        text: str,
        title: str,
        url: str,
        doc_type: str,
        line_number: int,
        page_number: int,
        tokens: "list[int] | None" = None,
    ):
        """
        :param document_id: identifier of the document the text belongs to
        :param section_idx: index of the section
        :param paragraph_idx: index of the paragraph
        :param sentence_idx: index of the sentence
        :param text: the text this record describes
        :param title: document title
        :param url: document location
        :param doc_type: kind of document (for example ``'pdf'``)
        :param line_number: line number of the text
        :param page_number: page number of the text
        :param tokens: token ids of the text; a new empty list is created
            when omitted
        """
        self.document_id = document_id
        self.section_idx = section_idx
        self.paragraph_idx = paragraph_idx
        self.sentence_idx = sentence_idx
        self.text = text
        self.title = title
        self.url = url
        self.doc_type = doc_type
        self.line_number = line_number
        self.page_number = page_number
        # A literal ``[]`` default would be one list object shared by every
        # instance created without ``tokens``; build a fresh one instead.
        self.tokens = [] if tokens is None else tokens

    def to_json(self):
        """
        Serialises the object to a JSON string with sorted keys and an
        indent of 4; values that are not JSON types are encoded through
        their ``__dict__``
        :return: JSON text
        """
        return json.dumps(self, default=lambda o: o.__dict__,
                          sort_keys=True, indent=4)

    def to_dict(self):
        """
        Returns dictionary of the object properties and values.
        This is the instance's own attribute dictionary (``vars(self)``),
        not a copy, so changes to it change the object
        :return: attribute name -> value mapping
        """
        return vars(self)
class DocVec:
    """Pairs an embedding vector with its id and the metadata of its text."""

    def __init__(
        self,
        vector_id: str,
        emb_vec: list,
        metadata: MetaData,
    ):
        """
        :param vector_id: identifier of the vector
        :param emb_vec: the embedding values
        :param metadata: record describing the embedded text
        """
        self.vector_id, self.emb_vec, self.metadata = vector_id, emb_vec, metadata

    def to_json(self):
        """
        Serialises the object to a JSON string with sorted keys and an
        indent of 4; values that are not JSON types (such as the nested
        metadata) are encoded through their ``__dict__``
        :return: JSON text
        """
        def attributes_of(obj):
            return obj.__dict__

        return json.dumps(self, default=attributes_of, sort_keys=True, indent=4)

    def to_dict(self):
        """
        Returns the instance's own attribute dictionary (not a copy)
        :return: attribute name -> value mapping
        """
        return self.__dict__
def fonts(pdf_obj, granularity=False):
    """Collects the font styles used in a PDF and how often each occurs.

    Every text span is counted once under an identifier: the span size
    alone, or ``size_flags_font_color`` when ``granularity`` is set. When
    several spans share an identifier, the style stored for it is the one
    of the last such span.

    :param pdf_obj: PDF document to iterate through
    :type pdf_obj: <class 'fitz.fitz.Document'>
    :param granularity: also use 'font', 'flags' and 'color' to discriminate text
    :type granularity: bool
    :rtype: [(identifier, count), (identifier, count)], dict
    :return: identifiers sorted by descending count, style info per identifier
    :raises ValueError: if the document contains no text spans
    """
    usage = {}
    style_by_key = {}
    for page in pdf_obj:
        for block in page.get_text("dict")["blocks"]:
            if block['type'] != 0:  # only type 0 blocks hold text
                continue
            for line in block["lines"]:
                for span in line["spans"]:
                    if granularity:
                        key = f"{span['size']}_{span['flags']}_{span['font']}_{span['color']}"
                        style = {
                            'size': span['size'],
                            'flags': span['flags'],
                            'font': span['font'],
                            'color': span['color'],
                        }
                    else:
                        key = f"{span['size']}"
                        style = {'size': span['size'], 'font': span['font']}
                    style_by_key[key] = style
                    usage[key] = usage.get(key, 0) + 1
    if not usage:
        raise ValueError("Zero discriminating fonts found!")
    ranked = sorted(usage.items(), key=itemgetter(1), reverse=True)
    return ranked, style_by_key
def font_tags(font_counts, styles):
    """Maps every font size to an element tag relative to the paragraph size.

    The size of the first entry in ``font_counts`` (the most used one) is
    taken as the paragraph size and tagged ``<p>``. Larger sizes become
    ``<h1>``, ``<h2>``, ... from largest downwards; smaller sizes become
    ``<s1>``, ``<s2>``, ... from the largest of them downwards.

    :param font_counts: (font_size, count) for all fonts occurring in document
    :type font_counts: list
    :param styles: all styles found in the document
    :type styles: dict
    :rtype: dict
    :return: float font size -> tag string
    """
    body_size = styles[font_counts[0][0]]['size']

    # largest first, so the running rank numbers headers from the biggest down
    descending = sorted((float(label) for label, _count in font_counts), reverse=True)

    tags = {}
    rank = 0
    for size in descending:
        rank += 1
        if size == body_size:
            rank = 0  # numbering restarts for the sizes below the paragraph
            tags[size] = '<p>'
        elif size > body_size:
            tags[size] = f'<h{rank}>'
        elif size < body_size:
            tags[size] = f'<s{rank}>'
    return tags
def headers_para(pdf_obj, size_tag, start_page_idx, end_page_idx):
    """Scrapes headers & paragraphs from PDF and return texts with element tags.

    Text spans are concatenated per block for as long as the font size stays
    the same; a size change closes the current text and starts a new one.
    Each line end inside a block is marked with a pipe character.

    :param pdf_obj: PDF document to iterate through
    :type pdf_obj: <class 'fitz.fitz.Document'>
    :param size_tag: textual element tags for each size
    :type size_tag: dict
    :param start_page_idx: index of first page to scrape
    :type start_page_idx: int
    :param end_page_idx: index of last page to scrape
    :type end_page_idx: int
    :rtype: (list, list)
    :return: texts with pre-pended element tags, and for every text the
        1-based number of the page it was found on; both lists always have
        the same length
    """
    header_para = []  # list with headers and paragraphs
    page_numbers = []  # page_numbers[k] is the page of header_para[k]
    previous_size = None  # size of the last non-blank span; None until the first one
    for page_idx, page in enumerate(pdf_obj):
        if page_idx < start_page_idx or page_idx > end_page_idx:
            continue
        for block in page.get_text("dict")["blocks"]:
            if block['type'] != 0:  # only type 0 blocks contain text
                continue
            # REMEMBER: multiple fonts and sizes are possible IN one block
            block_string = ""  # text found in block
            for line in block["lines"]:
                for span in line["spans"]:
                    text = span['text']
                    if not text.strip():  # ignore whitespace-only spans
                        continue
                    size = span['size']
                    if previous_size is None:
                        # very first span of the scraped range
                        block_string = size_tag[size] + text
                    elif size == previous_size:
                        if not block_string.strip("|"):
                            # nothing but line markers (or nothing at all) so
                            # far: this span starts the text, so tag it once
                            block_string = size_tag[size] + text
                        else:  # in the same block, so concatenate strings
                            block_string += " " + text
                    else:
                        # size changed: close the current text. Record its
                        # page too, otherwise the two lists drift apart.
                        header_para.append(block_string)
                        page_numbers.append(page_idx + 1)
                        block_string = size_tag[size] + text
                    previous_size = size
                # line finished, indicating with a pipe
                block_string += "|"
            header_para.append(block_string)
            page_numbers.append(page_idx + 1)
    return header_para, page_numbers
def _safe_string(mapping, key, default=''):
    """Return ``str(mapping[key])``, or ``default`` if the key is missing or its value is ``None`` or ``''``."""
    value = mapping.get(key) if mapping else None
    if value is None or value == '':
        return default
    return str(value)


def get_pdf_metadata(pdf_file_bytes, **kwargs) -> tuple[fitz.Document, str, MetaData]:
    """Open a PDF from a binary stream and build its document-level metadata.

    The title is taken from the PDF's own metadata, then from the ``title``
    keyword, then from the last path component of the ``url`` keyword, and
    is ``'Untitled'`` when none of those yields a non-empty value.

    :param pdf_file_bytes: readable binary stream holding the PDF; it is
        consumed with ``read()``
    :param kwargs: optional ``title`` and ``url`` of the document; ``url``
        defaults to ``'file'``
    :return: the opened document, a newly generated random document id
        (12 hex characters) and the document-level ``MetaData``
    """
    doc_id = secrets.token_hex(6)
    pdf_obj = fitz.open(stream=pdf_file_bytes.read(), filetype="pdf")

    url = _safe_string(kwargs, 'url', 'file')
    title = _safe_string(
        pdf_obj.metadata, 'title',
        _safe_string(kwargs, 'title', ''),
    )
    if title == '':
        if url != 'file':
            # a URL ending in '/' has an empty last component
            title = url.split('/')[-1] or 'Untitled'
        else:
            title = 'Untitled'

    doc_metadata = MetaData(
        document_id=doc_id,
        section_idx=-1,
        paragraph_idx=-1,
        sentence_idx=-1,
        text='document',
        title=title,
        url=url,
        line_number=-1,  # TODO
        page_number=-1,
        doc_type='pdf',
        tokens=[],  # TODO
    )
    return pdf_obj, doc_id, doc_metadata
def parse_pdf(
    pdf_obj: fitz.Document,
    document_id: str,
    document_metadata: MetaData,
    return_sections: bool = True,
    return_paragraphs: bool = True,
    return_sentences: bool = False,
    **kwargs
) -> list[tuple[list[str], list[MetaData]]]:
    """Split a PDF into sections and paragraphs, each with its own metadata.

    Headers and paragraphs are recognised by font size (see ``fonts``,
    ``font_tags`` and ``headers_para``). Only the pages between 5% and 95%
    of the document are scraped. A section is the most recent header
    followed by all paragraphs up to the next header; its text is
    ``"<HEADER IN CAPITALS> ### <paragraphs joined by spaces>"``.

    :param pdf_obj: the opened PDF document
    :param document_id: prefix for the generated ids: ``<id>_<section>``
        for sections and ``<id>_<section>_<paragraph>`` for paragraphs
    :param document_metadata: document-level record; its values are copied
        into every produced record and the object itself is left unchanged
    :param return_sections: accepted but currently not used; sections are
        always returned
    :param return_paragraphs: when true the paragraphs are returned as a
        second tuple after the sections
    :param return_sentences: accepted but currently not used
    :param kwargs: accepted but currently not used
    :return: ``[(section_ids, section_metadata)]``, followed by
        ``(paragraph_ids, paragraph_metadata)`` when ``return_paragraphs``
    """
    font_counts, styles = fonts(pdf_obj, granularity=False)
    size_tag = font_tags(font_counts, styles)
    num_pages = len(pdf_obj)
    header_para, page_numbers = headers_para(
        pdf_obj, size_tag,
        start_page_idx=int(num_pages * 0.05),
        end_page_idx=int(num_pages * 0.95),
    )

    # to_dict() hands out the live attribute dict of document_metadata;
    # work on a copy so the caller's object is not overwritten.
    base_fields = dict(document_metadata.to_dict())

    def make_record(text, sec_idx, para_idx, page_number):
        # One independent MetaData per produced text.
        fields = dict(base_fields)
        fields.update({
            'section_idx': sec_idx,
            'paragraph_idx': para_idx,
            'sentence_idx': -1,
            'text': text,
            # headers_para already reports 1-based page numbers
            'page_number': int(page_number),
            'line_number': -1,
            'doc_type': 'pdf',
            'tokens': [],
        })
        return MetaData(**fields)

    sections_metadata_list = []
    sections_ids_list = []
    paragraphs_metadata_list = []
    paragraphs_ids_list = []

    def close_section(header, paragraphs, sec_idx, page_number):
        # Emit the section made of `header` and `paragraphs`.
        capital_header = re.sub(r'<[^>]+>', '', header).upper()
        text = capital_header + " ### " + " ".join(paragraphs)
        sections_metadata_list.append(make_record(text, sec_idx, -1, page_number))
        sections_ids_list.append(f"{document_id}_{sec_idx}")

    section_idx = 0
    section_header = ''
    section_paragraphs = []
    previous_kind = ''  # 'header' or 'paragraph': kind of the last tagged text
    current_page_number = None
    for x, page_number in zip(header_para, page_numbers):
        current_page_number = page_number
        if x.startswith("<h"):
            if previous_kind == 'paragraph':
                # A header after paragraphs ends the running section. Close
                # it BEFORE storing the new header, so that it keeps its own
                # header and the new one opens the next section.
                close_section(section_header, section_paragraphs, section_idx, page_number)
                section_paragraphs = []
                section_idx += 1
            # most recent header wins when several headers follow each other
            section_header = x.replace("|", "")
            previous_kind = 'header'
        elif x.startswith("<p"):
            x = x.replace("<p>", " ").replace("|", "")
            section_paragraphs.append(x)
            paragraph_idx = len(section_paragraphs) - 1
            paragraphs_metadata_list.append(
                make_record(x, section_idx, paragraph_idx, page_number)
            )
            paragraphs_ids_list.append(f"{document_id}_{section_idx}_{paragraph_idx}")
            previous_kind = 'paragraph'

    # Add the last section
    if section_paragraphs:
        close_section(section_header, section_paragraphs, section_idx, current_page_number)

    if return_paragraphs:
        return [
            (sections_ids_list, sections_metadata_list),
            (paragraphs_ids_list, paragraphs_metadata_list)
        ]
    return [(sections_ids_list, sections_metadata_list)]
Editor is loading...