# Paste-site page header (not part of the program):
# Untitled · avatar · unknown · python · 2 years ago · 14 kB · 7 · Indexable
import io
import json
import re
import secrets
from operator import itemgetter
from time import time

import fitz
import requests



_DEFAULT_PDF_URL = (
    "https://www.bain.com/globalassets/noindex/2023/"
    "bain_report_global_m_and_a_report_2023.pdf"
)


def fetch_pdf(pdf_url=_DEFAULT_PDF_URL, timeout=30):
    """Download a PDF over HTTP and return it as an in-memory binary stream.

    :param pdf_url: address of the PDF; defaults to the report that used to be
        hard-coded here, so existing zero-argument calls behave as before
    :type pdf_url: str
    :param timeout: seconds passed to ``requests.get`` so that a stalled
        server cannot block the caller forever
    :type timeout: float
    :rtype: io.BytesIO
    :return: the response body wrapped in a ``BytesIO``
    :raises Exception: when the server answers with a status other than 200
    """
    print('Fetching')
    start = time()
    r = requests.get(pdf_url, timeout=timeout)
    if r.status_code != 200:
        raise Exception('Could not load PDF')
    pdf_file = io.BytesIO(r.content)
    print(f'Loaded in {time() - start:.2f} seconds')
    return pdf_file
    

class MetaData:
    """Plain record holding the descriptive fields of one piece of text."""

    def __init__(
            self,
            document_id: str,
            section_idx: int,
            paragraph_idx: int,
            sentence_idx: int,
            text: str,
            title: str,
            url: str,
            doc_type: str,
            line_number: int,
            page_number: int,
            tokens: "list[int] | None" = None,
    ):
        """
        Store every argument unchanged as an attribute of the same name.

        :param document_id: identifier of the document the text belongs to
        :param section_idx: index of the section
        :param paragraph_idx: index of the paragraph
        :param sentence_idx: index of the sentence
        :param text: the text itself
        :param title: document title
        :param url: document location
        :param doc_type: kind of document (e.g. ``'pdf'``)
        :param line_number: line number of the text
        :param page_number: page number of the text
        :param tokens: token ids for the text; a fresh empty list is created
            when omitted so instances never share one default list
        """
        self.document_id = document_id
        self.section_idx = section_idx
        self.paragraph_idx = paragraph_idx
        self.sentence_idx = sentence_idx
        self.text = text
        self.title = title
        self.url = url
        self.doc_type = doc_type
        self.line_number = line_number
        self.page_number = page_number
        # A literal ``[]`` default would be one list shared by every instance.
        self.tokens = [] if tokens is None else tokens

    def to_json(self):
        """
        Serialise the object to a JSON string (sorted keys, 4-space indent).
        :return: JSON text
        """
        return json.dumps(self, default=lambda o: o.__dict__,
                          sort_keys=True, indent=4)

    def to_dict(self):
        """
        Returns dictionary of the object properties and values.

        The dictionary is a shallow copy, so adding or reassigning keys on it
        does not change this object.
        :return: dict mapping attribute names to their current values
        """
        return dict(vars(self))


class DocVec:
    """Pairs an embedding vector with its id and the metadata describing it."""

    def __init__(
            self,
            vector_id: str,
            emb_vec: list,
            metadata: "MetaData",
    ):
        """
        Store the three arguments unchanged as attributes of the same name.

        :param vector_id: identifier of the vector
        :param emb_vec: the embedding values
        :param metadata: metadata object attached to the vector
        """
        self.vector_id = vector_id
        self.emb_vec = emb_vec
        self.metadata = metadata

    def to_json(self):
        """
        Serialise the object to a JSON string (sorted keys, 4-space indent).

        Values that ``json`` cannot encode directly, such as the nested
        metadata object, are replaced by their ``__dict__``.
        :return: JSON text
        """
        return json.dumps(self, default=lambda o: o.__dict__,
                          sort_keys=True, indent=4)

    def to_dict(self):
        """
        Returns dictionary of the object properties and values.

        The dictionary is a shallow copy: reassigning its keys leaves this
        object untouched, while the values (including ``metadata``) are the
        same objects this instance holds.
        :return: dict with keys ``vector_id``, ``emb_vec`` and ``metadata``
        """
        return dict(vars(self))
        

def fonts(pdf_obj, granularity=False):
    """Count how often each font style occurs in the text spans of a PDF.

    :param pdf_obj: PDF document to iterate through
    :type pdf_obj: <class 'fitz.fitz.Document'>
    :param granularity: when true a style is keyed by size, flags, font and
        color together; otherwise by size alone
    :type granularity: bool
    :rtype: [(identifier, count), (identifier, count)], dict
    :return: style identifiers sorted by descending usage, and a dict that
        maps each identifier to the style fields of the last span seen with it
    :raises ValueError: when the document contains no text span at all
    """
    usage = {}
    style_by_id = {}

    for page in pdf_obj:
        for block in page.get_text("dict")["blocks"]:
            if block['type'] != 0:  # only type-0 blocks carry text
                continue
            for line in block["lines"]:
                for span in line["spans"]:
                    if granularity:
                        key = "{0}_{1}_{2}_{3}".format(
                            span['size'], span['flags'], span['font'], span['color'])
                        style_by_id[key] = {
                            'size': span['size'],
                            'flags': span['flags'],
                            'font': span['font'],
                            'color': span['color'],
                        }
                    else:
                        key = "{0}".format(span['size'])
                        style_by_id[key] = {'size': span['size'], 'font': span['font']}

                    usage[key] = usage.get(key, 0) + 1

    ranked = sorted(usage.items(), key=itemgetter(1), reverse=True)

    if not ranked:
        raise ValueError("Zero discriminating fonts found!")

    return ranked, style_by_id


def font_tags(font_counts, styles):
    """Returns dictionary with font sizes as keys and tags as value.

    The most frequent style is treated as body text and tagged ``<p>``.
    Larger sizes become ``<h1>``, ``<h2>``, ... from the largest down, and
    smaller sizes become ``<s1>``, ``<s2>``, ... from the largest down.

    :param font_counts: (identifier, count) for all styles occurring in the
        document, most used first
    :type font_counts: list
    :param styles: all styles found in the document, keyed by identifier
    :type styles: dict
    :rtype: dict
    :return: all element tags based on font-sizes, keyed by size as float
    :raises IndexError: when ``font_counts`` is empty
    """
    p_style = styles[font_counts[0][0]]  # style of the most used font (paragraph)
    p_size = p_style['size']

    # Read each size from its style entry rather than parsing the identifier:
    # with granular identifiers ("size_flags_font_color") float(identifier)
    # fails. Fall back to the identifier itself when no style entry exists.
    # A set keeps one entry per size so several styles of the same size do
    # not consume several heading numbers.
    sizes = set()
    for identifier, _count in font_counts:
        style = styles.get(identifier)
        sizes.add(float(style['size'] if style is not None else identifier))

    # Walk sizes from large to small so the counter yields h1, h2, ... and,
    # after being reset at the paragraph size, s1, s2, ...
    idx = 0
    size_tag = {}
    for size in sorted(sizes, reverse=True):
        idx += 1
        if size == p_size:
            idx = 0
            size_tag[size] = '<p>'
        elif size > p_size:
            size_tag[size] = '<h{0}>'.format(idx)
        else:
            size_tag[size] = '<s{0}>'.format(idx)

    return size_tag


def headers_para(pdf_obj, size_tag, start_page_idx, end_page_idx):
    """Scrapes headers & paragraphs from PDF and return texts with element tags.

    Text is collected block by block. A new string starts whenever the font
    size differs from the previous non-blank span, and each string begins with
    the tag registered for its size. A ``|`` is appended after every line of
    a block.

    :param pdf_obj: PDF document to iterate through
    :type pdf_obj: <class 'fitz.fitz.Document'>
    :param size_tag: textual element tags for each size
    :type size_tag: dict
    :param start_page_idx: index of first page to scrape (inclusive)
    :type start_page_idx: int
    :param end_page_idx: index of last page to scrape (inclusive)
    :type end_page_idx: int
    :rtype: (list, list)
    :return: texts with pre-pended element tags, and a list of the same
        length holding the 1-based page number each text was emitted on
    :raises KeyError: when a span's size has no entry in ``size_tag``
    """
    header_para = []  # list with headers and paragraphs
    page_numbers = []  # page_numbers[k] belongs to header_para[k]
    previous_size = None  # size of the last non-blank span; None before the first

    for i, page in enumerate(pdf_obj):
        if i < start_page_idx or i > end_page_idx:
            continue
        for b in page.get_text("dict")["blocks"]:
            if b['type'] != 0:  # only type-0 blocks carry text
                continue

            # REMEMBER: multiple fonts and sizes are possible IN one block
            block_string = ""  # text collected for the current run
            for l in b["lines"]:
                for s in l["spans"]:
                    text = s['text']
                    if not text.strip():  # skip whitespace-only spans
                        continue
                    size = s['size']

                    if previous_size is not None and size != previous_size:
                        # Size changed: close the running text (it may be
                        # empty at the start of a block) and record its page
                        # so both lists stay the same length.
                        header_para.append(block_string)
                        page_numbers.append(i + 1)
                        block_string = size_tag[size] + text
                    elif not block_string.strip("|"):
                        # Nothing but line markers collected so far: this span
                        # opens the text, so it carries the tag. The span is
                        # added exactly once.
                        block_string = size_tag[size] + text
                    else:  # same run, so concatenate strings
                        block_string += " " + text

                    previous_size = size

                # end of a line inside the block, indicated with a pipe
                block_string += "|"

            header_para.append(block_string)
            page_numbers.append(i + 1)

    return header_para, page_numbers


def _safe_string(mapping, key, default=''):
    """Return ``str(mapping[key])`` if the key holds a non-empty value, else *default*."""
    value = mapping.get(key) if mapping else None
    if value is None or value == '':
        return default
    return str(value)


def get_pdf_metadata(pdf_file_bytes, **kwargs) -> tuple[fitz.Document, str, MetaData]:
    """Open a PDF from a binary stream and build its document-level metadata.

    The title is taken from the PDF's own metadata, then from ``title`` in
    *kwargs*, then from the last path segment of ``url``, and finally falls
    back to ``'Untitled'``.

    :param pdf_file_bytes: readable binary stream holding the PDF
    :param kwargs: optional ``title`` and ``url`` of the document
    :rtype: (fitz.Document, str, MetaData)
    :return: the opened document, a random 12-hex-digit document id, and a
        ``MetaData`` whose index, line and page fields are all ``-1``
    """
    doc_id = secrets.token_hex(6)
    pdf_obj = fitz.open(stream=pdf_file_bytes.read(), filetype="pdf")

    url = _safe_string(kwargs, 'url', 'file')  # 'file' marks "no url given"
    title = _safe_string(
        pdf_obj.metadata, 'title',
        _safe_string(kwargs, 'title', '')
    )
    if title == '':
        if url != 'file':
            # a url ending in '/' has an empty last segment
            title = url.split('/')[-1] or 'Untitled'
        else:
            title = 'Untitled'

    doc_metadata = MetaData(
        document_id=doc_id,
        section_idx=-1,
        paragraph_idx=-1,
        sentence_idx=-1,
        text='document',
        title=title,
        url=url,
        line_number=-1,         # TODO
        page_number=-1,
        doc_type='pdf',
        tokens=[],              # TODO
    )
    return pdf_obj, doc_id, doc_metadata


def _section_text(header, paragraphs):
    """Join an upper-cased, tag-free header and its paragraphs into one string."""
    plain_header = re.sub(r'<[^>]+>', '', header)
    return plain_header.upper() + " ### " + " ".join(paragraphs)


def _chunk_metadata(base, text, section_idx, paragraph_idx, page_number):
    """Build the MetaData of one section or paragraph on top of document-level fields."""
    fields = dict(base)  # copy so the shared document-level dict is never edited
    fields.update(
        text=text,
        section_idx=section_idx,
        paragraph_idx=paragraph_idx,
        page_number=int(page_number),
        sentence_idx=-1,
        line_number=-1,
        doc_type='pdf',
        tokens=[],
    )
    return MetaData(**fields)


def parse_pdf(
        pdf_obj: fitz.Document,
        document_id: str,
        document_metadata: MetaData,
        return_sections: bool = True,
        return_paragraphs: bool = True,
        return_sentences: bool = False,
        **kwargs
) -> list[tuple[list[str], list[MetaData]]]:
    """Split a PDF into sections and paragraphs, each with an id and metadata.

    Font sizes decide what is a header and what is body text (see ``fonts``,
    ``font_tags`` and ``headers_para``). Only pages with index between 5% and
    95% of the page count are scanned. A section is the most recent header
    followed by all paragraphs up to the next header. Its text is
    ``"<HEADER> ### <paragraphs joined by spaces>"``.

    :param pdf_obj: opened PDF document
    :param document_id: prefix used for every generated id
    :param document_metadata: document-level metadata; its title, url and
        document_id are copied into every result and the object itself is
        left unchanged
    :param return_sections: accepted but currently not acted on; sections are
        always returned
    :param return_paragraphs: when true the paragraph pair is returned too
    :param return_sentences: accepted but currently not acted on
    :param kwargs: accepted but unused
    :return: ``[(section_ids, section_metadata)]``, followed by
        ``(paragraph_ids, paragraph_metadata)`` when ``return_paragraphs`` is
        true. Section ids are ``"<document_id>_<section>"`` and paragraph ids
        are ``"<document_id>_<section>_<paragraph>"``. A section's page number
        is the page of the item that closed it.
    """
    font_counts, styles = fonts(pdf_obj, granularity=False)
    size_tag = font_tags(font_counts, styles)
    num_pages = len(pdf_obj)
    header_para, page_numbers = headers_para(
        pdf_obj, size_tag,
        start_page_idx=int(num_pages * 0.05),
        end_page_idx=int(num_pages * 0.95)
    )

    # Work on a copy: to_dict() may hand out the object's own attribute dict,
    # and editing that would rewrite document_metadata itself.
    base = dict(document_metadata.to_dict())

    sections_metadata_list = []
    sections_ids_list = []
    paragraphs_metadata_list = []
    paragraphs_ids_list = []

    section_idx = 0
    header = ''      # header of the section being collected
    paragraphs = []  # paragraphs of the section being collected
    current_page_number = None

    def close_section(page_number):
        """Emit the section collected so far under its own header."""
        sections_metadata_list.append(_chunk_metadata(
            base,
            text=_section_text(header, paragraphs),
            section_idx=section_idx,
            paragraph_idx=-1,
            page_number=page_number,
        ))
        sections_ids_list.append(f"{document_id}_{section_idx}")

    # Page numbers from headers_para are already 1-based, so they are used
    # as they are.
    for x, page_number in zip(header_para, page_numbers):
        current_page_number = page_number

        if x.startswith("<h"):
            # A header after body text ends the running section. Close it
            # BEFORE storing the new header so the paragraphs stay attached
            # to the header that introduced them.
            if paragraphs:
                close_section(page_number)
                section_idx += 1
                paragraphs = []
            # most recent header wins when several headers follow each other
            header = x.replace("|", "")

        elif x.startswith("<p"):
            x = x.replace("<p>", " ").replace("|", "")
            paragraphs.append(x)
            paragraph_idx = len(paragraphs) - 1

            paragraphs_metadata_list.append(_chunk_metadata(
                base,
                text=x,
                section_idx=section_idx,
                paragraph_idx=paragraph_idx,
                page_number=page_number,
            ))
            paragraphs_ids_list.append(f"{document_id}_{section_idx}_{paragraph_idx}")

    # Add the last section
    if paragraphs:
        close_section(current_page_number)

    if return_paragraphs:
        return [
            (sections_ids_list, sections_metadata_list),
            (paragraphs_ids_list, paragraphs_metadata_list)
        ]
    return [(sections_ids_list, sections_metadata_list)]
# Editor is loading...  (paste-site page footer, not part of the program)