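# NOTE: the lines below are page chrome captured from pastecode.io together
# with the paste. They are not part of the program and are kept only as
# comments so that the file parses as Python.
#
# Untitled
#
# mail@pastecode.io avatar
# unknown
# plain_text
# a month ago
# 18 kB
# 2
# Indexable
# Never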
import csv
import json
import hashlib
import sys
from typing import Any, Dict, List, Optional, Union, Tuple
from openpyxl.drawing.image import Image
from openpyxl.cell.rich_text import CellRichText
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.cell.cell import Cell
from openpyxl import load_workbook
from concurrent.futures import ThreadPoolExecutor, as_completed
import datetime
import logging

# Configure the root logger at import time so that the INFO-level messages
# emitted by this module (for example "Files are identical.") are shown.
logging.basicConfig(level=logging.INFO)


class CellAndImageDetails:
    """Flatten openpyxl cells and worksheet images into plain dictionaries."""

    # Keys of the record built by ``get_cell_details``; reused for the
    # all-``None`` fallback so both shapes always stay in sync.
    _CELL_DETAIL_KEYS = (
        'value', 'data_type', 'font', 'border', 'fill', 'number_format',
        'alignment', 'protection', 'hyperlink', 'comment', 'rich_text', 'formula',
    )

    @staticmethod
    def get_cell_details(cell: Cell) -> Dict[str, Any]:
        """Return the value, formula and style attributes of ``cell`` as a dict.

        ``datetime.datetime`` values are converted to ISO-8601 strings.
        ``rich_text`` is populated only for ``CellRichText`` values and
        ``formula`` only when the cell's data type is ``'f'``.

        Returns:
            A dict with the keys listed in ``_CELL_DETAIL_KEYS``.  If the
            object lacks any attribute read here, every key maps to ``None``.
        """
        try:
            value = cell.value
            data_type = cell.data_type
            return {
                'value': value.isoformat() if isinstance(value, datetime.datetime) else value,
                'data_type': data_type,
                # Style objects are captured as their attribute dictionaries.
                'font': cell.font.__dict__,
                'border': cell.border.__dict__,
                'fill': cell.fill.__dict__,
                'number_format': cell.number_format,
                'alignment': cell.alignment.__dict__,
                'protection': cell.protection.__dict__,
                'hyperlink': cell.hyperlink,
                'comment': cell.comment,
                'rich_text': value if isinstance(value, CellRichText) else None,
                'formula': value if data_type == 'f' else None,
            }
        except AttributeError:
            return dict.fromkeys(CellAndImageDetails._CELL_DETAIL_KEYS)

    @staticmethod
    def get_image_details(sheet: Worksheet) -> List[Dict[str, Optional[Union[str, tuple]]]]:
        """Return name, size and anchor position of every image on ``sheet``.

        Each entry has the keys ``name`` (the image ``path`` or ``None``),
        ``size`` (``(width, height)``) and ``position`` (``(col, row)`` taken
        from the anchor's ``_from`` marker).

        An image whose attributes cannot be read (for example an anchor
        without a ``_from`` marker) is skipped with a warning.  Previously a
        single such image aborted the scan and silently dropped every image
        after it.

        Returns:
            A list of image records; empty when the sheet exposes no images.
        """
        images = []
        # ``_images`` is a private openpyxl attribute; tolerate its absence.
        for drawing in getattr(sheet, '_images', None) or []:
            if not isinstance(drawing, Image):
                continue
            try:
                marker = drawing.anchor._from
                record = {
                    'name': getattr(drawing, 'path', None),
                    'size': (drawing.width, drawing.height),
                    'position': (marker.col, marker.row),
                }
            except AttributeError as exc:
                logging.warning(
                    "Skipping image with unreadable anchor on sheet %r: %s",
                    getattr(sheet, 'title', None), exc,
                )
                continue
            images.append(record)
        return images


class Comparator:
    """Stateless helpers that diff cell details, images and whole worksheets.

    The boolean ``compare_*`` helpers return ``True`` when the two inputs
    DIFFER, not when they match.
    """

    # (details key, message) pairs examined by ``check_format_mismatches``.
    _FORMAT_CHECKS = (
        ('font', "Font mismatch"),
        ('border', "Border mismatch"),
        ('fill', "Fill mismatch"),
        ('alignment', "Alignment mismatch"),
    )

    @staticmethod
    def compare_values(old_value: Any, new_value: Any) -> bool:
        """Return ``True`` when two cell values differ.

        Two ``datetime.datetime`` values are compared directly; anything else
        is compared by its string form, so ``1`` and ``"1"`` count as equal.
        """
        if isinstance(old_value, datetime.datetime) and isinstance(new_value, datetime.datetime):
            return old_value != new_value
        return str(old_value) != str(new_value)

    @staticmethod
    def compare_formulas(old_formula: Any, new_formula: Any) -> bool:
        """Return ``True`` when two cell formulas differ."""
        return old_formula != new_formula

    @staticmethod
    def compare_cell_content(old_details: Dict[str, Any], new_details: Dict[str, Any]) -> bool:
        """Return ``True`` when either the value or the formula of two cells differs."""
        return (
            Comparator.compare_values(old_details['value'], new_details['value'])
            or Comparator.compare_formulas(old_details['formula'], new_details['formula'])
        )

    @staticmethod
    def compare_format(old_details: Dict[str, Any], new_details: Dict[str, Any]) -> bool:
        """Return ``True`` when font, border, fill or alignment of two cells differs."""
        return any(
            old_details[key] != new_details[key]
            for key, _ in Comparator._FORMAT_CHECKS
        )

    @staticmethod
    def compare_images(images_old: List[Dict[str, Any]], images_new: List[Dict[str, Any]], sheet_name: str) -> List[
        Dict[str, Any]]:
        """Return mismatch records for images added, removed or resized.

        Images are identified by their ``(name, position)`` pair.
        """
        old_keys = {(img['name'], img['position']) for img in images_old}
        new_keys = {(img['name'], img['position']) for img in images_new}
        return list(
            Comparator._handle_image_differences(sheet_name, old_keys, new_keys, images_old, images_new)
        )

    @staticmethod
    def _handle_image_differences(sheet_name: str, old_image_set: set, new_image_set: set,
                                  images_old: List[Dict[str, Any]], images_new: List[Dict[str, Any]]) -> List[
        Dict[str, Any]]:
        """Find added and removed images and compare the ones present in both.

        Records are emitted in a stable order: added images (in
        ``images_new`` order), removed images (in ``images_old`` order), then
        changes to shared images (in ``images_old`` order).  When several
        images share one ``(name, position)`` key the first one is used.
        """

        def index_first(images: List[Dict[str, Any]]) -> Dict[tuple, Dict[str, Any]]:
            # One pass per list instead of a linear scan per key; ``setdefault``
            # keeps the first image for a duplicated key.
            lookup: Dict[tuple, Dict[str, Any]] = {}
            for img in images:
                lookup.setdefault((img['name'], img['position']), img)
            return lookup

        old_by_key = index_first(images_old)
        new_by_key = index_first(images_new)
        mismatches = []

        for key, img_new in new_by_key.items():
            if key in new_image_set and key not in old_image_set:
                mismatches.append(Comparator._create_image_mismatch_dict(sheet_name, img_new, 'Image Added'))

        for key, img_old in old_by_key.items():
            if key in old_image_set and key not in new_image_set:
                mismatches.append(Comparator._create_image_mismatch_dict(sheet_name, img_old, 'Image Removed'))

        for key, img_old in old_by_key.items():
            if key in old_image_set and key in new_image_set and key in new_by_key:
                mismatches.extend(Comparator._compare_individual_image(img_old, new_by_key[key], sheet_name))

        return mismatches

    @staticmethod
    def _create_image_mismatch_dict(sheet_name: str, image: Dict[str, Any], difference_type: str) -> Dict[str, Any]:
        """Build the mismatch record for an added or removed image.

        The image name goes into ``old_value`` for ``'Image Removed'`` and
        into ``new_value`` for ``'Image Added'``; the other side is ``'N/A'``.
        """
        return {
            'sheet_name': sheet_name,
            'cell_reference': f"Image at {image['position']}",
            'difference_type': difference_type,
            'old_value': image['name'] if difference_type == 'Image Removed' else 'N/A',
            'new_value': image['name'] if difference_type == 'Image Added' else 'N/A',
            'details': f"Size: {image['size']}; Position: {image['position']}"
        }

    @staticmethod
    def _compare_individual_image(img_old: Dict[str, Any], img_new: Dict[str, Any], sheet_name: str) -> List[
        Dict[str, Any]]:
        """Return mismatch records for position and size changes between two images."""
        mismatches = []
        if img_old['position'] != img_new['position']:
            mismatches.append({
                'sheet_name': sheet_name,
                'cell_reference': f"Image at {img_old['position']} to {img_new['position']}",
                'difference_type': 'Image Position Changed',
                'old_value': img_old['position'],
                'new_value': img_new['position'],
                'details': f"Image: {img_old['name']}; Old position: {img_old['position']}; New position: {img_new['position']}"
            })
        if img_old['size'] != img_new['size']:
            mismatches.append({
                'sheet_name': sheet_name,
                'cell_reference': f"Image at {img_old['position']}",
                'difference_type': 'Image Size Changed',
                'old_value': img_old['size'],
                'new_value': img_new['size'],
                'details': f"Image: {img_old['name']}; Old size: {img_old['size']}; New size: {img_new['size']}"
            })
        return mismatches

    @staticmethod
    def compare_rich_text_objects(old_rich_text: Any, new_rich_text: Any) -> bool:
        """Return ``True`` when the plain text of two rich-text values differs.

        ``None`` counts as the empty string.  Objects with a ``text``
        attribute contribute that attribute and strings contribute
        themselves.  Any other object is compared by ``str()``; previously
        such objects all collapsed to ``""`` and therefore always compared
        as equal.
        """

        def extract_text(rich_text: Any) -> str:
            if rich_text is None:
                return ""
            if hasattr(rich_text, 'text'):
                return rich_text.text
            if isinstance(rich_text, str):
                return rich_text
            return str(rich_text)

        return extract_text(old_rich_text) != extract_text(new_rich_text)

    @staticmethod
    def sheet_checker(sheet_name: str, ws_new: Worksheet, ws_old: Worksheet, comparison_type: str) -> List[
        Dict[str, Any]]:
        """Compare two worksheets cell by cell.

        Args:
            sheet_name: Name recorded in every mismatch entry.
            ws_new: Worksheet from the new workbook.
            ws_old: Worksheet from the old workbook.
            comparison_type: ``"content"`` or ``"format"``.  ``"format"``
                additionally compares the images of both sheets.

        Returns:
            The list of mismatch records; empty for an unknown
            ``comparison_type`` (a warning is logged in that case).
        """
        checkers = {
            "content": Comparator.check_content_mismatches,
            "format": Comparator.check_format_mismatches,
        }
        checker = checkers.get(comparison_type)
        if checker is None:
            logging.warning(
                "Unknown comparison type %r; sheet %r was not compared", comparison_type, sheet_name
            )
            return []

        output_list = []
        # Cover the union of both used ranges so cells that exist in only one
        # sheet are compared against an empty counterpart.
        max_row = max(ws_new.max_row, ws_old.max_row)
        max_column = max(ws_new.max_column, ws_old.max_column)

        for row in range(1, max_row + 1):
            for col in range(1, max_column + 1):
                cell_new = ws_new.cell(row=row, column=col)
                cell_old = ws_old.cell(row=row, column=col)

                old_details = CellAndImageDetails.get_cell_details(cell_old)
                new_details = CellAndImageDetails.get_cell_details(cell_new)

                output_list.extend(checker(old_details, new_details, sheet_name, cell_old))

        if comparison_type == "format":
            images_old = CellAndImageDetails.get_image_details(ws_old)
            images_new = CellAndImageDetails.get_image_details(ws_new)
            output_list.extend(Comparator.compare_images(images_old, images_new, sheet_name))

        return output_list

    @staticmethod
    def check_content_mismatches(old_details: Dict[str, Any], new_details: Dict[str, Any], sheet_name: str,
                                 cell: Cell) -> List[Dict[str, Any]]:
        """Return a one-element list describing a content mismatch, or ``[]``.

        ``cell`` is only used for its ``coordinate``.
        """
        differences = []

        if Comparator.compare_values(old_details['value'], new_details['value']):
            differences.append(f"Value mismatch: old_value={old_details['value']}, new_value={new_details['value']}")
        if Comparator.compare_formulas(old_details['formula'], new_details['formula']):
            differences.append(
                f"Formula mismatch: old_formula={old_details['formula']}, new_formula={new_details['formula']}")

        if not differences:
            return []

        return [{
            'sheet_name': sheet_name,
            'cell_reference': str(cell.coordinate),
            'difference_type': 'Content Mismatch',
            'old_value': old_details['value'],
            'new_value': new_details['value'],
            'details': "; ".join(differences)
        }]

    @staticmethod
    def check_format_mismatches(old_details: Dict[str, Any], new_details: Dict[str, Any], sheet_name: str,
                                cell: Cell) -> List[Dict[str, Any]]:
        """Return a one-element list describing a format mismatch, or ``[]``.

        Cells whose content differs are skipped here because the content pass
        reports them.  The skip test is ``compare_cell_content``, the same
        rule the content pass uses, so every cell is either reported as a
        content mismatch or examined for format changes.  With the earlier
        raw ``!=`` test a cell such as ``1`` vs ``"1"`` was handled by
        neither pass.
        """
        if Comparator.compare_cell_content(old_details, new_details):
            return []

        differences = [
            message
            for key, message in Comparator._FORMAT_CHECKS
            if old_details[key] != new_details[key]
        ]
        if not differences:
            return []

        return [{
            'sheet_name': sheet_name,
            'cell_reference': str(cell.coordinate),
            'difference_type': 'Format Mismatch',
            'old_value': old_details['value'],
            'new_value': new_details['value'],
            'details': "; ".join(differences)
        }]


class FileUtils:
    """File helpers: MD5 checksums and JSON/CSV report writing.

    Each helper logs an error and terminates the process with exit status 1
    when its I/O fails.
    """

    @staticmethod
    def md5(fname: str) -> str:
        """Return the hexadecimal MD5 digest of the file at ``fname``.

        The file is read in 4096-byte chunks.  A missing file is logged and
        ends the process via ``sys.exit(1)``.
        """
        hash_md5 = hashlib.md5()
        try:
            with open(fname, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_md5.update(chunk)
        except FileNotFoundError:
            logging.error(f"File not found: {fname}")
            sys.exit(1)
        return hash_md5.hexdigest()

    @staticmethod
    def write_json(output_list: Union[List[Dict[str, Any]], Dict[str, Any]], output_file: str) -> None:
        """Serialise ``output_list`` (a list of records or a result dict) to JSON.

        Values that ``json`` cannot encode natively are written as ``str(value)``.
        A write failure is logged and ends the process via ``sys.exit(1)``.
        """
        try:
            with open(output_file, 'w') as jsonfile:
                json.dump(output_list, jsonfile, indent=4, default=str)
        except IOError:
            logging.error(f"Could not write to file: {output_file}")
            sys.exit(1)

    @staticmethod
    def json_to_csv(json_file: str, csv_file: str) -> None:
        """Convert a JSON report to CSV.

        The JSON document is either a list of row dicts or a dict whose
        ``'details'`` entry is such a list.  The CSV header is the union of
        all row keys in first-seen order, so rows with differing keys are
        written with blanks for the missing columns.

        An empty row list (the normal outcome when no mismatches were found)
        produces an empty CSV file.  Previously this case raised on the
        ``[0]`` lookup and terminated the process.  Any other JSON shape
        writes nothing.  Errors are logged and end the process via
        ``sys.exit(1)``.
        """
        try:
            with open(json_file, 'r') as f:
                data = json.load(f)

            if isinstance(data, list):
                rows = data
            elif isinstance(data, dict):
                rows = data['details']
            else:
                return

            # ``dict.fromkeys`` de-duplicates while preserving first-seen order.
            fieldnames = list(dict.fromkeys(key for row in rows for key in row))

            with open(csv_file, 'w', newline='') as output_file:
                if not fieldnames:
                    logging.info(f"No rows to write; created empty CSV: {csv_file}")
                    return
                dict_writer = csv.DictWriter(output_file, fieldnames=fieldnames)
                dict_writer.writeheader()
                dict_writer.writerows(rows)
        except Exception as e:
            logging.error(f"Error converting JSON to CSV: {e}")
            sys.exit(1)


class ExcelComparator:
    """Compare two Excel workbooks and write the differences to JSON reports."""

    def run_comparisons(self, new_file: str, old_file: str, comparison_type: str) -> List[Dict[str, Any]]:
        """Compare the sheets that both workbooks share.

        Files with identical MD5 checksums are not opened at all.  Sheets that
        exist in only one workbook are not compared; they are named in a
        logged warning so the omission is visible.

        Args:
            new_file: Path to the new workbook.
            old_file: Path to the old workbook.
            comparison_type: ``"content"`` or ``"format"``.

        Returns:
            The list of mismatch records (empty when the files are identical).
        """
        if FileUtils.md5(new_file) == FileUtils.md5(old_file):
            logging.info("Files are identical.")
            return []

        wb_new, wb_old = self._load_workbooks(new_file, old_file)
        new_names = set(wb_new.sheetnames)
        old_names = set(wb_old.sheetnames)

        unmatched = sorted(new_names ^ old_names)
        if unmatched:
            logging.warning("Sheets present in only one workbook were not compared: %s", unmatched)

        return self._compare_sheets(new_names & old_names, wb_new, wb_old, comparison_type)

    def _load_workbooks(self, new_file: str, old_file: str) -> Tuple[Any, Any]:
        """Load both workbooks with formulas kept (``data_only=False``) and rich text enabled.

        Any loading error is logged and ends the process via ``sys.exit(1)``.
        """
        try:
            wb_new = load_workbook(new_file, data_only=False, rich_text=True)
            wb_old = load_workbook(old_file, data_only=False, rich_text=True)
            return wb_new, wb_old
        except Exception as e:
            logging.error(f"Error loading workbooks: {e}")
            sys.exit(1)

    def _compare_sheets(self, common_sheets: set, wb_new: Any, wb_old: Any, comparison_type: str) -> List[
        Dict[str, Any]]:
        """Run ``Comparator.sheet_checker`` for every shared sheet on a thread pool.

        Sheets are submitted in the order they appear in ``wb_new`` and their
        results are collected in that same order, so the report is stable
        across runs.  Iterating the ``common_sheets`` set and gathering with
        ``as_completed`` made the order vary from run to run.
        """
        ordered_names = [name for name in wb_new.sheetnames if name in common_sheets]
        output_list = []
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(Comparator.sheet_checker, sheet_name, wb_new[sheet_name], wb_old[sheet_name],
                                comparison_type)
                for sheet_name in ordered_names
            ]
            # Walk the futures in submission order; ``result()`` blocks until
            # each one is done and re-raises any exception from the worker.
            for future in futures:
                output_list.extend(future.result())
        return output_list

    def do_content_compare(self, new_file: str, old_file: str) -> Dict[str, Any]:
        """Run the content comparison and write ``content_comparison.json``."""
        mismatches = self.run_comparisons(new_file, old_file, "content")
        return self._generate_result(mismatches, "content_comparison.json", "Content")

    def do_format_compare(self, new_file: str, old_file: str) -> Dict[str, Any]:
        """Run the format comparison and write ``format_comparison.json``."""
        mismatches = self.run_comparisons(new_file, old_file, "format")
        return self._generate_result(mismatches, "format_comparison.json", "Format")

    def _generate_result(self, mismatches: List[Dict[str, Any]], filename: str, comparison_type: str) -> Dict[str, Any]:
        """Wrap ``mismatches`` in a result dict, write it to ``filename`` and return it.

        ``overall_status`` is ``True`` when at least one mismatch was found.
        """
        result = {
            "overall_status": bool(mismatches),
            f"{comparison_type.lower()}_comparison": True,
            "details": mismatches
        }
        FileUtils.write_json(result, filename)
        return result


def main(new_file: str = "data_1.xlsx", old_file: str = "data_2.xlsx") -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Run the content and format comparisons and export both reports to CSV.

    Args:
        new_file: Path to the new workbook; defaults to the previously
            hard-coded ``"data_1.xlsx"``.
        old_file: Path to the old workbook; defaults to the previously
            hard-coded ``"data_2.xlsx"``.

    Returns:
        ``(content_result, format_result)`` as returned by the comparator.
    """
    comparator = ExcelComparator()
    content_result = comparator.do_content_compare(new_file, old_file)
    format_result = comparator.do_format_compare(new_file, old_file)

    # The comparator wrote these JSON files; mirror each one as CSV.
    FileUtils.json_to_csv("content_comparison.json", "content_comparison.csv")
    FileUtils.json_to_csv("format_comparison.json", "format_comparison.csv")

    return content_result, format_result


if __name__ == "__main__":
    # Script entry point: run both comparisons, then dump each result dict
    # to stdout as indented JSON (non-JSON values are stringified).
    content_result, format_result = main()
    for report in (content_result, format_result):
        print(json.dumps(report, indent=4, default=str))
# Leave a Comment  (pastecode.io page chrome, not part of the program)