Untitled
unknown
plain_text
a month ago
18 kB
2
Indexable
Never
import csv
import datetime
import hashlib
import json
import logging
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Optional, Tuple, Union

from openpyxl import load_workbook
from openpyxl.cell.cell import Cell
from openpyxl.cell.rich_text import CellRichText
from openpyxl.drawing.image import Image
from openpyxl.worksheet.worksheet import Worksheet

logging.basicConfig(level=logging.INFO)

# Keys of the per-cell snapshot produced by CellAndImageDetails.get_cell_details.
_CELL_DETAIL_KEYS = (
    'value', 'data_type', 'font', 'border', 'fill', 'number_format',
    'alignment', 'protection', 'hyperlink', 'comment', 'rich_text', 'formula',
)

# Keys of every mismatch record emitted by Comparator; used as the CSV header
# when a report contains no rows to infer the columns from.
_REPORT_COLUMNS = (
    'sheet_name', 'cell_reference', 'difference_type',
    'old_value', 'new_value', 'details',
)


class CellAndImageDetails:
    """Extract comparable snapshots of cells and embedded images."""

    @staticmethod
    def get_cell_details(cell: Cell) -> Dict[str, Any]:
        """Return a dictionary snapshot of a cell's value and styling.

        ``datetime`` values are stored as ISO-8601 strings. ``font``,
        ``border``, ``fill``, ``alignment`` and ``protection`` are the
        ``__dict__`` of the corresponding style objects. ``rich_text`` is the
        value when it is a ``CellRichText`` and ``formula`` is the value when
        the cell's ``data_type`` is ``'f'``; otherwise both are ``None``.

        If the cell lacks any of the accessed attributes, every key maps to
        ``None``.
        """
        try:
            value = cell.value
            return {
                'value': value.isoformat() if isinstance(value, datetime.datetime) else value,
                'data_type': cell.data_type,
                'font': cell.font.__dict__,
                'border': cell.border.__dict__,
                'fill': cell.fill.__dict__,
                'number_format': cell.number_format,
                'alignment': cell.alignment.__dict__,
                'protection': cell.protection.__dict__,
                'hyperlink': cell.hyperlink,
                'comment': cell.comment,
                'rich_text': value if isinstance(value, CellRichText) else None,
                'formula': value if cell.data_type == 'f' else None,
            }
        except AttributeError:
            return dict.fromkeys(_CELL_DETAIL_KEYS)

    @staticmethod
    def get_image_details(sheet: Worksheet) -> List[Dict[str, Optional[Union[str, tuple]]]]:
        """Return name, size and anchor position for each image in a sheet.

        Each entry has ``name`` (the image's ``path`` attribute, or ``None``),
        ``size`` as ``(width, height)`` and ``position`` as ``(col, row)`` read
        from ``anchor._from``. Images whose anchor does not expose those
        attributes are skipped with a warning so that one unusual image does
        not hide the others.
        """
        images = []
        # ``_images`` is a private openpyxl attribute; tolerate its absence.
        for drawing in getattr(sheet, '_images', []):
            if not isinstance(drawing, Image):
                continue
            try:
                marker = drawing.anchor._from
                images.append({
                    'name': getattr(drawing, 'path', None),
                    'size': (drawing.width, drawing.height),
                    'position': (marker.col, marker.row),
                })
            except AttributeError as exc:
                logging.warning(
                    "Skipping image with unsupported anchor on sheet %r: %s",
                    getattr(sheet, 'title', None), exc,
                )
        return images


class Comparator:
    """Comparison helpers; every ``compare_*`` returns True when inputs DIFFER."""

    @staticmethod
    def compare_values(old_value: Any, new_value: Any) -> bool:
        """Return True when two cell values differ.

        Two ``datetime`` objects are compared directly; everything else is
        compared by its ``str()`` form, so ``1`` and ``"1"`` count as equal.
        """
        if isinstance(old_value, datetime.datetime) and isinstance(new_value, datetime.datetime):
            return old_value != new_value
        return str(old_value) != str(new_value)

    @staticmethod
    def compare_formulas(old_formula: Any, new_formula: Any) -> bool:
        """Return True when two formulas differ."""
        return old_formula != new_formula

    @staticmethod
    def compare_cell_content(old_details: Dict[str, Any], new_details: Dict[str, Any]) -> bool:
        """Return True when either the value or the formula differs."""
        return (
            Comparator.compare_values(old_details['value'], new_details['value'])
            or Comparator.compare_formulas(old_details['formula'], new_details['formula'])
        )

    @staticmethod
    def compare_format(old_details: Dict[str, Any], new_details: Dict[str, Any]) -> bool:
        """Return True when font, border, fill or alignment differs."""
        return any(
            old_details[key] != new_details[key]
            for key in ('font', 'border', 'fill', 'alignment')
        )

    @staticmethod
    def compare_images(images_old: List[Dict[str, Any]], images_new: List[Dict[str, Any]],
                       sheet_name: str) -> List[Dict[str, Any]]:
        """Return mismatch records for images added, removed or resized.

        Images are matched between the two sheets by ``(name, position)``.
        """
        old_keys = {(img['name'], img['position']) for img in images_old}
        new_keys = {(img['name'], img['position']) for img in images_new}
        return Comparator._handle_image_differences(
            sheet_name, old_keys, new_keys, images_old, images_new)

    @staticmethod
    def _handle_image_differences(sheet_name: str, old_image_set: set, new_image_set: set,
                                  images_old: List[Dict[str, Any]],
                                  images_new: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Report added and removed images and compare the ones present in both.

        ``old_image_set`` / ``new_image_set`` hold ``(name, position)`` keys
        and decide membership; the image lists supply the full records.
        Records are emitted in the order images appear in the lists (added,
        then removed, then changed), which keeps the report deterministic.
        """
        def index(images: List[Dict[str, Any]]) -> Dict[tuple, Dict[str, Any]]:
            # First image wins for a duplicated key, matching a linear search.
            lookup: Dict[tuple, Dict[str, Any]] = {}
            for img in images:
                lookup.setdefault((img['name'], img['position']), img)
            return lookup

        old_by_key = index(images_old)
        new_by_key = index(images_new)
        added = new_image_set - old_image_set
        removed = old_image_set - new_image_set
        shared = old_image_set & new_image_set

        mismatches = []
        for key, img in new_by_key.items():
            if key in added:
                mismatches.append(
                    Comparator._create_image_mismatch_dict(sheet_name, img, 'Image Added'))
        for key, img in old_by_key.items():
            if key in removed:
                mismatches.append(
                    Comparator._create_image_mismatch_dict(sheet_name, img, 'Image Removed'))
        for key, img_old in old_by_key.items():
            if key in shared and key in new_by_key:
                mismatches.extend(
                    Comparator._compare_individual_image(img_old, new_by_key[key], sheet_name))
        return mismatches

    @staticmethod
    def _create_image_mismatch_dict(sheet_name: str, image: Dict[str, Any],
                                    difference_type: str) -> Dict[str, Any]:
        """Build the mismatch record for an added or removed image."""
        return {
            'sheet_name': sheet_name,
            'cell_reference': f"Image at {image['position']}",
            'difference_type': difference_type,
            'old_value': image['name'] if difference_type == 'Image Removed' else 'N/A',
            'new_value': image['name'] if difference_type == 'Image Added' else 'N/A',
            'details': f"Size: {image['size']}; Position: {image['position']}"
        }

    @staticmethod
    def _compare_individual_image(img_old: Dict[str, Any], img_new: Dict[str, Any],
                                  sheet_name: str) -> List[Dict[str, Any]]:
        """Return records for position and/or size changes between two images.

        When called through ``compare_images`` the two images already share a
        position (it is part of the matching key), so only the size check can
        fire on that path.
        """
        mismatches = []
        if img_old['position'] != img_new['position']:
            mismatches.append({
                'sheet_name': sheet_name,
                'cell_reference': f"Image at {img_old['position']} to {img_new['position']}",
                'difference_type': 'Image Position Changed',
                'old_value': img_old['position'],
                'new_value': img_new['position'],
                'details': f"Image: {img_old['name']}; Old position: {img_old['position']}; New position: {img_new['position']}"
            })
        if img_old['size'] != img_new['size']:
            mismatches.append({
                'sheet_name': sheet_name,
                'cell_reference': f"Image at {img_old['position']}",
                'difference_type': 'Image Size Changed',
                'old_value': img_old['size'],
                'new_value': img_new['size'],
                'details': f"Image: {img_old['name']}; Old size: {img_old['size']}; New size: {img_new['size']}"
            })
        return mismatches

    @staticmethod
    def compare_rich_text_objects(old_rich_text: Any, new_rich_text: Any) -> bool:
        """Return True when the plain text of two rich-text values differs.

        Text is taken from a ``text`` attribute when present, from the object
        itself when it is a ``str``, and is ``""`` otherwise.
        """
        def extract_text(rich_text: Any) -> str:
            if hasattr(rich_text, 'text'):
                return rich_text.text
            if isinstance(rich_text, str):
                return rich_text
            return ""

        return extract_text(old_rich_text) != extract_text(new_rich_text)

    @staticmethod
    def sheet_checker(sheet_name: str, ws_new: Worksheet, ws_old: Worksheet,
                      comparison_type: str) -> List[Dict[str, Any]]:
        """Compare two worksheets cell by cell.

        Args:
            sheet_name: Name recorded in every mismatch entry.
            ws_new: Worksheet from the new workbook.
            ws_old: Worksheet from the old workbook.
            comparison_type: ``"content"`` or ``"format"``. ``"format"`` also
                compares embedded images. Any other value yields ``[]``.

        Returns:
            Mismatch records in row-major order, followed by image records.
        """
        checkers = {
            "content": Comparator.check_content_mismatches,
            "format": Comparator.check_format_mismatches,
        }
        check = checkers.get(comparison_type)
        if check is None:
            logging.warning("Unknown comparison type: %r", comparison_type)
            return []

        output_list = []
        # Walk the union of both used ranges so cells present in only one
        # sheet are still compared (against an empty cell).
        max_row = max(ws_new.max_row, ws_old.max_row)
        max_column = max(ws_new.max_column, ws_old.max_column)
        for row in range(1, max_row + 1):
            for col in range(1, max_column + 1):
                cell_old = ws_old.cell(row=row, column=col)
                cell_new = ws_new.cell(row=row, column=col)
                old_details = CellAndImageDetails.get_cell_details(cell_old)
                new_details = CellAndImageDetails.get_cell_details(cell_new)
                output_list.extend(check(old_details, new_details, sheet_name, cell_old))

        if comparison_type == "format":
            images_old = CellAndImageDetails.get_image_details(ws_old)
            images_new = CellAndImageDetails.get_image_details(ws_new)
            output_list.extend(Comparator.compare_images(images_old, images_new, sheet_name))
        return output_list

    @staticmethod
    def check_content_mismatches(old_details: Dict[str, Any], new_details: Dict[str, Any],
                                 sheet_name: str, cell: Cell) -> List[Dict[str, Any]]:
        """Return a one-element list describing a content mismatch, or ``[]``."""
        differences = []
        if Comparator.compare_values(old_details['value'], new_details['value']):
            differences.append(
                f"Value mismatch: old_value={old_details['value']}, new_value={new_details['value']}")
        if Comparator.compare_formulas(old_details['formula'], new_details['formula']):
            differences.append(
                f"Formula mismatch: old_formula={old_details['formula']}, new_formula={new_details['formula']}")
        if not differences:
            return []
        return [{
            'sheet_name': sheet_name,
            'cell_reference': str(cell.coordinate),
            'difference_type': 'Content Mismatch',
            'old_value': old_details['value'],
            'new_value': new_details['value'],
            'details': "; ".join(differences)
        }]

    @staticmethod
    def check_format_mismatches(old_details: Dict[str, Any], new_details: Dict[str, Any],
                                sheet_name: str, cell: Cell) -> List[Dict[str, Any]]:
        """Return a one-element list describing a format mismatch, or ``[]``.

        Cells whose value or formula differs are skipped: they are reported
        by the content comparison instead.
        """
        if (old_details['value'] != new_details['value']
                or old_details['formula'] != new_details['formula']):
            return []

        labelled_keys = (
            ('font', "Font mismatch"),
            ('border', "Border mismatch"),
            ('fill', "Fill mismatch"),
            ('alignment', "Alignment mismatch"),
        )
        differences = [
            label for key, label in labelled_keys
            if old_details[key] != new_details[key]
        ]
        if not differences:
            return []
        return [{
            'sheet_name': sheet_name,
            'cell_reference': str(cell.coordinate),
            'difference_type': 'Format Mismatch',
            'old_value': old_details['value'],
            'new_value': new_details['value'],
            'details': "; ".join(differences)
        }]


class FileUtils:
    """File helpers; each one logs and exits with status 1 on I/O failure."""

    @staticmethod
    def md5(fname: str) -> str:
        """Return the hex MD5 digest of a file, read in 4 KiB chunks.

        Used only to detect byte-identical files, not for security. Logs an
        error and exits with status 1 if the file does not exist.
        """
        hash_md5 = hashlib.md5()
        try:
            with open(fname, "rb") as f:
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_md5.update(chunk)
        except FileNotFoundError:
            logging.error("File not found: %s", fname)
            sys.exit(1)
        return hash_md5.hexdigest()

    @staticmethod
    def write_json(output_list: Union[List[Dict[str, Any]], Dict[str, Any]],
                   output_file: str) -> None:
        """Serialize a result (list or dict) to ``output_file`` as JSON.

        Objects that ``json`` cannot encode are written via ``str()``. Logs
        an error and exits with status 1 if the file cannot be written.
        """
        try:
            with open(output_file, 'w') as jsonfile:
                json.dump(output_list, jsonfile, indent=4, default=str)
        except OSError:
            logging.error("Could not write to file: %s", output_file)
            sys.exit(1)

    @staticmethod
    def json_to_csv(json_file: str, csv_file: str) -> None:
        """Convert a JSON report to CSV.

        The JSON document may be a list of row dictionaries, or a dictionary
        whose ``details`` key holds such a list. Columns come from the first
        row; when there are no rows, a header-only CSV with the standard
        report columns is written instead of failing. Any other top-level
        JSON type is ignored with a warning.

        Logs an error and exits with status 1 on any failure.
        """
        try:
            with open(json_file, 'r') as f:
                data = json.load(f)

            if isinstance(data, list):
                rows = data
            elif isinstance(data, dict):
                rows = data['details']
            else:
                logging.warning("Nothing to convert in %s: unsupported JSON structure", json_file)
                return

            # An empty report is a valid outcome (no mismatches), so fall back
            # to the known column set rather than indexing rows[0].
            fieldnames = list(rows[0].keys()) if rows else list(_REPORT_COLUMNS)
            with open(csv_file, 'w', newline='') as output_file:
                dict_writer = csv.DictWriter(output_file, fieldnames=fieldnames)
                dict_writer.writeheader()
                dict_writer.writerows(rows)
        except Exception as e:
            logging.error(f"Error converting JSON to CSV: {e}")
            sys.exit(1)


class ExcelComparator:
    """Compare two Excel workbooks by content or by formatting."""

    def __init__(self):
        pass

    def run_comparisons(self, new_file: str, old_file: str,
                        comparison_type: str) -> List[Dict[str, Any]]:
        """Compare the sheets common to both files and return the mismatches.

        Returns ``[]`` without loading the workbooks when the files have the
        same MD5 digest. Sheets present in only one workbook are not compared;
        their names are logged.
        """
        if FileUtils.md5(new_file) == FileUtils.md5(old_file):
            logging.info("Files are identical.")
            return []

        wb_new, wb_old = self._load_workbooks(new_file, old_file)
        new_names = set(wb_new.sheetnames)
        old_names = set(wb_old.sheetnames)
        unmatched = new_names ^ old_names
        if unmatched:
            logging.info("Sheets present in only one workbook are skipped: %s", sorted(unmatched))
        return self._compare_sheets(new_names & old_names, wb_new, wb_old, comparison_type)

    def _load_workbooks(self, new_file: str, old_file: str) -> Tuple[Any, Any]:
        """Load both workbooks with formulas and rich text preserved.

        Logs an error and exits with status 1 if either file fails to load.
        """
        try:
            wb_new = load_workbook(new_file, data_only=False, rich_text=True)
            wb_old = load_workbook(old_file, data_only=False, rich_text=True)
            return wb_new, wb_old
        except Exception as e:
            logging.error(f"Error loading workbooks: {e}")
            sys.exit(1)

    def _compare_sheets(self, common_sheets: set, wb_new: Any, wb_old: Any,
                        comparison_type: str) -> List[Dict[str, Any]]:
        """Compare every common sheet on a thread pool.

        Results are concatenated in sorted sheet-name order so the report is
        the same from run to run regardless of which worker finishes first.
        An exception raised while comparing a sheet propagates to the caller.
        """
        ordered_names = sorted(common_sheets)
        results: Dict[str, List[Dict[str, Any]]] = {}
        with ThreadPoolExecutor() as executor:
            future_to_name = {
                executor.submit(Comparator.sheet_checker, name, wb_new[name],
                                wb_old[name], comparison_type): name
                for name in ordered_names
            }
            for future in as_completed(future_to_name):
                results[future_to_name[future]] = future.result()

        output_list = []
        for name in ordered_names:
            output_list.extend(results[name])
        return output_list

    def do_content_compare(self, new_file: str, old_file: str) -> Dict[str, Any]:
        """Run the content comparison and write ``content_comparison.json``."""
        mismatches = self.run_comparisons(new_file, old_file, "content")
        return self._generate_result(mismatches, "content_comparison.json", "Content")

    def do_format_compare(self, new_file: str, old_file: str) -> Dict[str, Any]:
        """Run the format comparison and write ``format_comparison.json``."""
        mismatches = self.run_comparisons(new_file, old_file, "format")
        return self._generate_result(mismatches, "format_comparison.json", "Format")

    def _generate_result(self, mismatches: List[Dict[str, Any]], filename: str,
                         comparison_type: str) -> Dict[str, Any]:
        """Wrap mismatches in a result dictionary and write it to ``filename``.

        ``overall_status`` is True when at least one mismatch was found.
        """
        result = {
            "overall_status": bool(mismatches),
            f"{comparison_type.lower()}_comparison": True,
            "details": mismatches
        }
        FileUtils.write_json(result, filename)
        return result


def main():
    """Compare ``data_1.xlsx`` with ``data_2.xlsx`` and write JSON/CSV reports.

    Returns:
        A ``(content_result, format_result)`` tuple of result dictionaries.
    """
    new_file = "data_1.xlsx"
    old_file = "data_2.xlsx"

    comparator = ExcelComparator()
    content_result = comparator.do_content_compare(new_file, old_file)
    format_result = comparator.do_format_compare(new_file, old_file)

    # Convert JSON results to CSV
    FileUtils.json_to_csv("content_comparison.json", "content_comparison.csv")
    FileUtils.json_to_csv("format_comparison.json", "format_comparison.csv")

    return content_result, format_result


if __name__ == "__main__":
    content_result, format_result = main()
    print(json.dumps(content_result, indent=4, default=str))
    print(json.dumps(format_result, indent=4, default=str))
Leave a Comment