# Untitled
# unknown
# plain_text
# a year ago
# 18 kB
# 9
# Indexable
import csv
import json
import hashlib
import sys
from typing import Any, Dict, List, Optional, Union, Tuple
from openpyxl.drawing.image import Image
from openpyxl.cell.rich_text import CellRichText
from openpyxl.worksheet.worksheet import Worksheet
from openpyxl.cell.cell import Cell
from openpyxl import load_workbook
from concurrent.futures import ThreadPoolExecutor, as_completed
import datetime
import logging
# Configure the root logger once at import time so the logging.info /
# logging.error calls made throughout this module are emitted at INFO level and above.
logging.basicConfig(level=logging.INFO)
class CellAndImageDetails:
    """Build plain-dictionary snapshots of worksheet cells and embedded images."""

    # Keys present in every dictionary returned by ``get_cell_details``.
    _CELL_DETAIL_KEYS = (
        'value', 'data_type', 'font', 'border', 'fill', 'number_format',
        'alignment', 'protection', 'hyperlink', 'comment', 'rich_text', 'formula',
    )

    @staticmethod
    def get_cell_details(cell: Cell) -> Dict[str, Any]:
        """Retrieve detailed information about a cell.

        Args:
            cell: The cell to inspect.

        Returns:
            A dictionary keyed by ``_CELL_DETAIL_KEYS``. ``value`` is the
            cell value, with ``datetime.datetime`` values converted to their
            ISO-8601 string. The style entries (``font``, ``border``,
            ``fill``, ``alignment``, ``protection``) are the ``__dict__`` of
            the corresponding style object. ``rich_text`` is the value only
            when it is a ``CellRichText``; ``formula`` is the value only when
            ``data_type`` is ``'f'``. If any attribute is missing on the
            object, every key maps to ``None``.
        """
        try:
            value = cell.value
            return {
                'value': value.isoformat() if isinstance(value, datetime.datetime) else value,
                'data_type': cell.data_type,
                'font': cell.font.__dict__,
                'border': cell.border.__dict__,
                'fill': cell.fill.__dict__,
                'number_format': cell.number_format,
                'alignment': cell.alignment.__dict__,
                'protection': cell.protection.__dict__,
                'hyperlink': cell.hyperlink,
                'comment': cell.comment,
                'rich_text': value if isinstance(value, CellRichText) else None,
                'formula': value if cell.data_type == 'f' else None,
            }
        except AttributeError:
            # Best effort: an object lacking cell attributes yields an all-None snapshot.
            return dict.fromkeys(CellAndImageDetails._CELL_DETAIL_KEYS)

    @staticmethod
    def get_image_details(sheet: Worksheet) -> List[Dict[str, Optional[Union[str, tuple]]]]:
        """Retrieve details of all images in a worksheet.

        Args:
            sheet: The worksheet whose images are listed.

        Returns:
            One dictionary per ``Image`` found in ``sheet._images`` with the
            keys ``name`` (``drawing.path`` or ``None``), ``size``
            (``(width, height)``) and ``position`` (``(col, row)`` taken from
            ``drawing.anchor._from``). Images whose attributes cannot be read
            are skipped and logged.
        """
        images = []
        # ``_images`` is a private attribute; treat its absence as "no images".
        for drawing in getattr(sheet, '_images', []):
            if not isinstance(drawing, Image):
                continue
            # The guard is per image: previously a single AttributeError aborted
            # the whole loop and silently dropped every image that followed.
            try:
                anchor_start = drawing.anchor._from
                details = {
                    'name': getattr(drawing, 'path', None),
                    'size': (drawing.width, drawing.height),
                    'position': (anchor_start.col, anchor_start.row),
                }
            except AttributeError as exc:
                logging.warning(
                    "Skipping image on sheet %r: %s", getattr(sheet, 'title', None), exc
                )
                continue
            images.append(details)
        return images
class Comparator:
    """Stateless helpers that report content, format and image differences between worksheets.

    Every mismatch is a dictionary with the keys ``sheet_name``,
    ``cell_reference``, ``difference_type``, ``old_value``, ``new_value``
    and ``details``.
    """

    # (details key, message) pairs checked by ``check_format_mismatches``.
    _FORMAT_CHECKS = (
        ('font', "Font mismatch"),
        ('border', "Border mismatch"),
        ('fill', "Fill mismatch"),
        ('alignment', "Alignment mismatch"),
    )

    @staticmethod
    def compare_values(old_value: Any, new_value: Any) -> bool:
        """Return True when two cell values differ.

        Two ``datetime.datetime`` values are compared directly. Any other
        pair is compared by string representation, so ``1`` and ``"1"`` are
        treated as equal.
        """
        if isinstance(old_value, datetime.datetime) and isinstance(new_value, datetime.datetime):
            return old_value != new_value
        return str(old_value) != str(new_value)

    @staticmethod
    def compare_formulas(old_formula: Any, new_formula: Any) -> bool:
        """Return True when two cell formulas differ."""
        return old_formula != new_formula

    @staticmethod
    def compare_cell_content(old_details: Dict[str, Any], new_details: Dict[str, Any]) -> bool:
        """Return True when the value or the formula of two cell snapshots differs."""
        return (
            Comparator.compare_values(old_details['value'], new_details['value'])
            or Comparator.compare_formulas(old_details['formula'], new_details['formula'])
        )

    @staticmethod
    def compare_format(old_details: Dict[str, Any], new_details: Dict[str, Any]) -> bool:
        """Return True when font, border, fill or alignment of two cell snapshots differs."""
        return any(old_details[key] != new_details[key] for key, _ in Comparator._FORMAT_CHECKS)

    @staticmethod
    def compare_images(images_old: List[Dict[str, Any]], images_new: List[Dict[str, Any]],
                       sheet_name: str) -> List[Dict[str, Any]]:
        """Compare images between two worksheets.

        Args:
            images_old: Image dictionaries (``name``, ``size``, ``position``) of the old sheet.
            images_new: Image dictionaries of the new sheet.
            sheet_name: Sheet name recorded in each mismatch.

        Returns:
            Mismatch dictionaries for added, removed, moved and resized images.
        """
        old_keys = {(img['name'], img['position']) for img in images_old}
        new_keys = {(img['name'], img['position']) for img in images_new}
        return Comparator._handle_image_differences(sheet_name, old_keys, new_keys, images_old, images_new)

    @staticmethod
    def _image_sort_key(key: Tuple[Any, Any]) -> Tuple[str, Any]:
        """Ordering for ``(name, position)`` keys that tolerates a ``None`` name."""
        name, position = key
        return (str(name), position)

    @staticmethod
    def _index_images(images: List[Dict[str, Any]]) -> Dict[Tuple[Any, Any], Dict[str, Any]]:
        """Map each ``(name, position)`` key to the first image carrying it."""
        lookup: Dict[Tuple[Any, Any], Dict[str, Any]] = {}
        for img in images:
            lookup.setdefault((img['name'], img['position']), img)
        return lookup

    @staticmethod
    def _handle_image_differences(sheet_name: str, old_image_set: set, new_image_set: set,
                                  images_old: List[Dict[str, Any]],
                                  images_new: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Find added, removed, moved and resized images.

        Images are first matched on the full ``(name, position)`` key. An
        unmatched old image and an unmatched new image that share the same
        non-``None`` name are then paired and reported as a position change
        rather than as one removal plus one addition. Without that pairing
        the position-change report could never be produced, because exactly
        matched keys always have equal positions.

        Results are ordered: added, removed, moved, then images whose key is
        present on both sides; each group is sorted by name and position.
        """
        sort_key = Comparator._image_sort_key
        old_lookup = Comparator._index_images(images_old)
        new_lookup = Comparator._index_images(images_new)

        added_keys = sorted(new_image_set - old_image_set, key=sort_key)
        removed_keys = sorted(old_image_set - new_image_set, key=sort_key)

        # Removed images grouped by name are the candidates for "moved" pairs.
        # Unnamed images are indistinguishable from one another, so never pair them.
        removed_by_name: Dict[Any, List[Tuple[Any, Any]]] = {}
        for key in removed_keys:
            if key[0] is not None:
                removed_by_name.setdefault(key[0], []).append(key)

        moved_pairs = []
        still_added = []
        for key in added_keys:
            candidates = removed_by_name.get(key[0])
            if candidates:
                moved_pairs.append((candidates.pop(0), key))
            else:
                still_added.append(key)
        moved_old_keys = {old_key for old_key, _ in moved_pairs}
        still_removed = [key for key in removed_keys if key not in moved_old_keys]

        mismatches = []
        for key in still_added:
            mismatches.append(Comparator._create_image_mismatch_dict(sheet_name, new_lookup[key], 'Image Added'))
        for key in still_removed:
            mismatches.append(Comparator._create_image_mismatch_dict(sheet_name, old_lookup[key], 'Image Removed'))
        for old_key, new_key in moved_pairs:
            mismatches.extend(
                Comparator._compare_individual_image(old_lookup[old_key], new_lookup[new_key], sheet_name))
        for key in sorted(old_image_set & new_image_set, key=sort_key):
            mismatches.extend(
                Comparator._compare_individual_image(old_lookup[key], new_lookup[key], sheet_name))
        return mismatches

    @staticmethod
    def _create_image_mismatch_dict(sheet_name: str, image: Dict[str, Any], difference_type: str) -> Dict[str, Any]:
        """Create the mismatch entry for an added or removed image.

        The image name is placed in ``old_value`` for ``'Image Removed'`` and
        in ``new_value`` for ``'Image Added'``; the other side is ``'N/A'``.
        """
        return {
            'sheet_name': sheet_name,
            'cell_reference': f"Image at {image['position']}",
            'difference_type': difference_type,
            'old_value': image['name'] if difference_type == 'Image Removed' else 'N/A',
            'new_value': image['name'] if difference_type == 'Image Added' else 'N/A',
            'details': f"Size: {image['size']}; Position: {image['position']}"
        }

    @staticmethod
    def _compare_individual_image(img_old: Dict[str, Any], img_new: Dict[str, Any],
                                  sheet_name: str) -> List[Dict[str, Any]]:
        """Report position and size differences between two paired images."""
        mismatches = []
        if img_old['position'] != img_new['position']:
            mismatches.append({
                'sheet_name': sheet_name,
                'cell_reference': f"Image at {img_old['position']} to {img_new['position']}",
                'difference_type': 'Image Position Changed',
                'old_value': img_old['position'],
                'new_value': img_new['position'],
                'details': f"Image: {img_old['name']}; Old position: {img_old['position']}; New position: {img_new['position']}"
            })
        if img_old['size'] != img_new['size']:
            mismatches.append({
                'sheet_name': sheet_name,
                'cell_reference': f"Image at {img_old['position']}",
                'difference_type': 'Image Size Changed',
                'old_value': img_old['size'],
                'new_value': img_new['size'],
                'details': f"Image: {img_old['name']}; Old size: {img_old['size']}; New size: {img_new['size']}"
            })
        return mismatches

    @staticmethod
    def compare_rich_text_objects(old_rich_text: Any, new_rich_text: Any) -> bool:
        """Return True when the text of two rich text objects differs.

        The text is the object's ``text`` attribute when it has one, the
        object itself when it is a ``str``, and the empty string otherwise.
        """
        def extract_text(rich_text: Any) -> str:
            if hasattr(rich_text, 'text'):
                return rich_text.text
            if isinstance(rich_text, str):
                return rich_text
            return ""

        return extract_text(old_rich_text) != extract_text(new_rich_text)

    @staticmethod
    def sheet_checker(sheet_name: str, ws_new: "Worksheet", ws_old: "Worksheet",
                      comparison_type: str) -> List[Dict[str, Any]]:
        """Check differences in a sheet based on comparison type.

        Args:
            sheet_name: Name recorded in each mismatch.
            ws_new: Worksheet from the new workbook.
            ws_old: Worksheet from the old workbook.
            comparison_type: ``"content"`` or ``"format"``. ``"format"`` also
                compares the images of both sheets. Any other value yields an
                empty list.

        Returns:
            Mismatch dictionaries, cells in row-major order followed by images.
        """
        # The worksheet annotations are quoted so they are not evaluated when the class is defined.
        cell_checks = {
            "content": Comparator.check_content_mismatches,
            "format": Comparator.check_format_mismatches,
        }
        check_cell = cell_checks.get(comparison_type)
        if check_cell is None:
            logging.warning("Unknown comparison type: %s", comparison_type)
            return []

        output_list = []
        # Cover the larger extent of the two sheets so trailing rows/columns are compared too.
        max_row = max(ws_new.max_row, ws_old.max_row)
        max_column = max(ws_new.max_column, ws_old.max_column)
        for row in range(1, max_row + 1):
            for col in range(1, max_column + 1):
                cell_new = ws_new.cell(row=row, column=col)
                cell_old = ws_old.cell(row=row, column=col)
                old_details = CellAndImageDetails.get_cell_details(cell_old)
                new_details = CellAndImageDetails.get_cell_details(cell_new)
                # The old cell supplies the coordinate reported for the mismatch.
                output_list.extend(check_cell(old_details, new_details, sheet_name, cell_old))

        if comparison_type == "format":
            images_old = CellAndImageDetails.get_image_details(ws_old)
            images_new = CellAndImageDetails.get_image_details(ws_new)
            output_list.extend(Comparator.compare_images(images_old, images_new, sheet_name))
        return output_list

    @staticmethod
    def check_content_mismatches(old_details: Dict[str, Any], new_details: Dict[str, Any], sheet_name: str,
                                 cell: "Cell") -> List[Dict[str, Any]]:
        """Check mismatches in cell content.

        Returns:
            A single-element list holding a ``'Content Mismatch'`` entry when
            the value and/or the formula differs, otherwise an empty list.
        """
        differences = []
        if Comparator.compare_values(old_details['value'], new_details['value']):
            differences.append(f"Value mismatch: old_value={old_details['value']}, new_value={new_details['value']}")
        if Comparator.compare_formulas(old_details['formula'], new_details['formula']):
            differences.append(
                f"Formula mismatch: old_formula={old_details['formula']}, new_formula={new_details['formula']}")
        if not differences:
            return []
        return [{
            'sheet_name': sheet_name,
            'cell_reference': str(cell.coordinate),
            'difference_type': 'Content Mismatch',
            'old_value': old_details['value'],
            'new_value': new_details['value'],
            'details': "; ".join(differences)
        }]

    @staticmethod
    def check_format_mismatches(old_details: Dict[str, Any], new_details: Dict[str, Any], sheet_name: str,
                                cell: "Cell") -> List[Dict[str, Any]]:
        """Check mismatches in cell format.

        Cells whose content differs are skipped. The skip test is
        ``compare_cell_content``, i.e. the same rule the content comparison
        uses, so a cell that the content comparison does not report (for
        example ``1`` versus ``"1"``) is still checked for format changes.

        Returns:
            A single-element list holding a ``'Format Mismatch'`` entry when
            font, border, fill or alignment differs, otherwise an empty list.
        """
        if Comparator.compare_cell_content(old_details, new_details):
            return []
        differences = [
            message
            for key, message in Comparator._FORMAT_CHECKS
            if old_details[key] != new_details[key]
        ]
        if not differences:
            return []
        return [{
            'sheet_name': sheet_name,
            'cell_reference': str(cell.coordinate),
            'difference_type': 'Format Mismatch',
            'old_value': old_details['value'],
            'new_value': new_details['value'],
            'details': "; ".join(differences)
        }]
class FileUtils:
    """File helpers: checksum calculation and JSON/CSV report output.

    Each helper logs an error and terminates the process with exit status 1
    when its file operation fails.
    """

    @staticmethod
    def md5(fname: str) -> str:
        """Calculate MD5 checksum of a file.

        Args:
            fname: Path of the file to hash.

        Returns:
            The hexadecimal MD5 digest of the file contents.

        Raises:
            SystemExit: With status 1 when the file does not exist.
        """
        hash_md5 = hashlib.md5()
        try:
            with open(fname, "rb") as f:
                # Read in 4 KiB chunks so large files are never loaded whole.
                for chunk in iter(lambda: f.read(4096), b""):
                    hash_md5.update(chunk)
        except FileNotFoundError:
            logging.error(f"File not found: {fname}")
            sys.exit(1)
        return hash_md5.hexdigest()

    @staticmethod
    def write_json(output_list: Union[List[Dict[str, Any]], Dict[str, Any]], output_file: str) -> None:
        """Write a list of dictionaries, or a single result dictionary, to a JSON file.

        Values that ``json`` cannot serialise are written as ``str(value)``.

        Raises:
            SystemExit: With status 1 when the file cannot be written.
        """
        try:
            with open(output_file, 'w', encoding='utf-8') as jsonfile:
                json.dump(output_list, jsonfile, indent=4, default=str)
        except IOError:
            logging.error(f"Could not write to file: {output_file}")
            sys.exit(1)

    @staticmethod
    def json_to_csv(json_file: str, csv_file: str) -> None:
        """Convert a JSON file to a CSV file.

        The JSON document is either a list of row dictionaries or a
        dictionary whose ``'details'`` entry is such a list. The CSV header
        is taken from the keys of the first row. When there are no rows an
        empty CSV file is written; previously that case raised ``IndexError``
        and ended the process. Any other JSON type writes nothing.

        Raises:
            SystemExit: With status 1 when reading, parsing or writing fails.
        """
        try:
            with open(json_file, 'r', encoding='utf-8') as f:
                data = json.load(f)

            if isinstance(data, list):
                rows = data
            elif isinstance(data, dict):
                rows = data['details']
            else:
                logging.warning(f"Unsupported JSON content in {json_file}; no CSV written")
                return

            with open(csv_file, 'w', newline='', encoding='utf-8') as output_file:
                if not rows:
                    # No rows means no known column names: leave the file empty.
                    logging.info(f"No rows in {json_file}; wrote empty CSV: {csv_file}")
                    return
                dict_writer = csv.DictWriter(output_file, fieldnames=list(rows[0].keys()))
                dict_writer.writeheader()
                dict_writer.writerows(rows)
        except Exception as e:
            logging.error(f"Error converting JSON to CSV: {e}")
            sys.exit(1)
class ExcelComparator:
    """Compare two Excel workbooks and write the findings to JSON reports."""

    def run_comparisons(self, new_file: str, old_file: str, comparison_type: str) -> List[Dict[str, Any]]:
        """Run comparisons between two Excel files based on the specified type.

        Files with equal MD5 checksums are reported as identical without
        being opened. Otherwise every sheet name present in both workbooks
        is compared; sheets present in only one workbook are logged and
        skipped.

        Args:
            new_file: Path of the new workbook.
            old_file: Path of the old workbook.
            comparison_type: Passed through to ``Comparator.sheet_checker``.

        Returns:
            The mismatch dictionaries of all compared sheets.
        """
        if FileUtils.md5(new_file) == FileUtils.md5(old_file):
            logging.info("Files are identical.")
            return []

        wb_new, wb_old = self._load_workbooks(new_file, old_file)
        new_names = set(wb_new.sheetnames)
        old_names = set(wb_old.sheetnames)
        unmatched = new_names ^ old_names
        if unmatched:
            logging.warning("Sheets present in only one workbook are not compared: %s", sorted(unmatched))
        return self._compare_sheets(new_names & old_names, wb_new, wb_old, comparison_type)

    def _load_workbooks(self, new_file: str, old_file: str) -> Tuple[Any, Any]:
        """Load both workbooks with formulas kept as formulas and rich text preserved.

        Returns:
            ``(wb_new, wb_old)``.

        Raises:
            SystemExit: With status 1 when either workbook fails to load.
        """
        try:
            wb_new = load_workbook(new_file, data_only=False, rich_text=True)
            wb_old = load_workbook(old_file, data_only=False, rich_text=True)
            return wb_new, wb_old
        except Exception as e:
            logging.error(f"Error loading workbooks: {e}")
            sys.exit(1)

    def _compare_sheets(self, common_sheets: set, wb_new: Any, wb_old: Any,
                        comparison_type: str) -> List[Dict[str, Any]]:
        """Compare all sheets in the workbooks.

        One ``Comparator.sheet_checker`` task is submitted per sheet. Sheets
        are processed in the order they appear in the new workbook and the
        results are gathered in that same order, so the report layout is
        the same on every run (iterating the set directly, or collecting
        futures as they complete, made the order vary between runs).
        """
        ordered_sheets = [name for name in wb_new.sheetnames if name in common_sheets]
        output_list = []
        with ThreadPoolExecutor() as executor:
            futures = [
                executor.submit(Comparator.sheet_checker, sheet_name, wb_new[sheet_name], wb_old[sheet_name],
                                comparison_type)
                for sheet_name in ordered_sheets
            ]
            # Tasks still run concurrently; only the collection is ordered.
            for future in futures:
                output_list.extend(future.result())
        return output_list

    def do_content_compare(self, new_file: str, old_file: str) -> Dict[str, Any]:
        """Perform content comparison and write ``content_comparison.json``."""
        mismatches = self.run_comparisons(new_file, old_file, "content")
        return self._generate_result(mismatches, "content_comparison.json", "Content")

    def do_format_compare(self, new_file: str, old_file: str) -> Dict[str, Any]:
        """Perform format comparison and write ``format_comparison.json``."""
        mismatches = self.run_comparisons(new_file, old_file, "format")
        return self._generate_result(mismatches, "format_comparison.json", "Format")

    def _generate_result(self, mismatches: List[Dict[str, Any]], filename: str, comparison_type: str) -> Dict[str, Any]:
        """Generate result dictionary and write to a JSON file.

        Returns:
            A dictionary with ``overall_status`` (``True`` when at least one
            mismatch was found), a ``"<type>_comparison": True`` marker and
            the mismatches under ``details``.
        """
        result = {
            "overall_status": bool(mismatches),
            f"{comparison_type.lower()}_comparison": True,
            "details": mismatches
        }
        FileUtils.write_json(result, filename)
        return result
def main(new_file: str = "data_1.xlsx", old_file: str = "data_2.xlsx") -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """Run the content and format comparisons and export both reports.

    Args:
        new_file: Path of the new workbook. Defaults to ``"data_1.xlsx"``.
        old_file: Path of the old workbook. Defaults to ``"data_2.xlsx"``.

    Returns:
        ``(content_result, format_result)`` as returned by
        ``ExcelComparator.do_content_compare`` and ``do_format_compare``.
    """
    comparator = ExcelComparator()
    content_result = comparator.do_content_compare(new_file, old_file)
    format_result = comparator.do_format_compare(new_file, old_file)

    # Convert JSON results to CSV
    for report_name in ("content_comparison", "format_comparison"):
        FileUtils.json_to_csv(f"{report_name}.json", f"{report_name}.csv")
    return content_result, format_result
if __name__ == "__main__":
    # Script entry point: run both comparisons, then echo each report as
    # indented JSON (non-serialisable values are rendered with str()).
    content_result, format_result = main()
    for report in (content_result, format_result):
        print(json.dumps(report, indent=4, default=str))
# Editor is loading...
# Leave a Comment