Untitled
import json
import logging
import re
from pathlib import Path
from typing import Dict, List, Set

import pandas as pd

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Patterns are compiled once instead of being looked up for every log row.
_CREATE_POLICY_RE = re.compile(r'Start create new policy: (\w+)')
_PARKED_BODY_MARKER = 'Received parked message with body:'
_PARKED_BODY_RE = re.compile(r'Received parked message with body: ({.*?}) properties:')
_TYPE_ID_RE = re.compile(r'__TypeId__=([\w\.]+)')


def _read_log_messages(filename: str) -> List[str]:
    """Return every row of a header-less, single-column CSV as a string.

    ``dtype=str`` together with ``keep_default_na=False`` keeps rows such as
    ``null``, ``NA`` or ``None`` as literal text. With pandas' defaults they
    become float NaN, and the regex/substring checks on them raise TypeError.
    """
    df = pd.read_csv(
        filename,
        header=None,
        names=['message'],
        dtype=str,
        keep_default_na=False,
    )
    return df['message'].tolist()


def load_create_policy_numbers(filename: str) -> Set[str]:
    """Collect policy numbers from ``Start create new policy: <id>`` log rows.

    Args:
        filename: Path to a header-less, single-column CSV of log messages.

    Returns:
        The distinct ``<id>`` values (runs of word characters) that were found.
    """
    logger.info(f"Loading create policy numbers from {filename}")
    policy_numbers = set()
    for message in _read_log_messages(filename):
        match = _CREATE_POLICY_RE.search(message)
        if match:
            policy_numbers.add(match.group(1))
    logger.info(f"Loaded {len(policy_numbers)} create policy numbers")
    return policy_numbers


def _parse_event_json(json_str: str) -> Dict:
    r"""Decode a message body that may carry CSV or backslash quote escaping.

    Candidates are tried from least to most destructive. Bodies that are
    already valid JSON therefore keep escape sequences such as \n or \u00f3
    and empty-string values intact:

    1. the body as-is;
    2. with doubled quotes ("") collapsed to a single quote (CSV escaping);
    3. additionally with every backslash removed. This handles bodies whose
       quotes are backslash-escaped. It is lossy and used only as a last resort.

    Raises:
        json.JSONDecodeError: if no candidate parses (error from the last try).
    """
    collapsed = json_str.replace('""', '"')
    last_error = None
    for candidate in (json_str, collapsed, collapsed.replace('\\', '')):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError as e:
            last_error = e
    raise last_error


def extract_events_from_logs(filename: str) -> List[Dict]:
    """Decode the JSON events embedded in "parked message" log rows.

    For every row containing ``Received parked message with body:``, the JSON
    object between ``body:`` and ``properties:`` is decoded. If the row also
    contains ``__TypeId__=<name>``, that name is stored on the event under the
    ``_type`` key. Rows that cannot be decoded are logged and skipped.

    Args:
        filename: Path to a header-less, single-column CSV of log messages.

    Returns:
        The decoded events, in file order.
    """
    logger.info(f"Extracting events from {filename}")
    events = []
    for message in _read_log_messages(filename):
        if _PARKED_BODY_MARKER not in message:
            continue
        try:
            # Extract the JSON object between 'body:' and 'properties:'.
            body_match = _PARKED_BODY_RE.search(message)
            if not body_match:
                continue
            event = _parse_event_json(body_match.group(1))
            # The concrete event class comes from the AMQP MessageProperties.
            type_match = _TYPE_ID_RE.search(message)
            if type_match:
                event['_type'] = type_match.group(1)
            events.append(event)
            logger.debug(f"Successfully parsed event: {event.get('policyNumber', 'unknown')}")
        except json.JSONDecodeError as e:
            logger.warning(f"JSON decode error: {str(e)}\nMessage: {message}")
        except Exception as e:
            # Per-row safety net: one bad record must not abort the whole run.
            logger.error(f"Error processing message: {str(e)}\nMessage: {message}", exc_info=True)
    logger.info(f"Extracted {len(events)} events")
    return events


def group_events(events: List[Dict], create_policy_numbers: Set[str]) -> Dict[str, List[Dict]]:
    """Bucket events by the listener that should handle them.

    The bucket is chosen by substring match on the event's ``_type``:

    - ``PolicyPurchasedEvent`` goes to ``PolicyCreateListener`` when its
      ``policyNumber`` is in *create_policy_numbers*, and to
      ``PolicyPurchasedListener`` otherwise.
    - ``PaymentConfirmedEvent``, ``PolicyCashEvent`` and ``PolicyUpdateEvent``
      go to their namesake listeners.

    Events matching none of these are left out; their count is logged.

    Returns:
        A dict keyed by all five listener names (lists may be empty).
    """
    logger.info("Grouping events by listener type")
    grouped_events = {
        "PolicyCreateListener": [],
        "PolicyPurchasedListener": [],
        "PaymentConfirmedListener": [],
        "PolicyCashListener": [],
        "PolicyUpdateListener": [],
    }
    unmatched = 0
    for event in events:
        event_type = event.get("_type", "")
        if "PolicyPurchasedEvent" in event_type:
            policy_number = event.get("policyNumber")
            if policy_number in create_policy_numbers:
                grouped_events["PolicyCreateListener"].append(event)
            else:
                grouped_events["PolicyPurchasedListener"].append(event)
        elif "PaymentConfirmedEvent" in event_type:
            grouped_events["PaymentConfirmedListener"].append(event)
        elif "PolicyCashEvent" in event_type:
            grouped_events["PolicyCashListener"].append(event)
        elif "PolicyUpdateEvent" in event_type:
            grouped_events["PolicyUpdateListener"].append(event)
        else:
            unmatched += 1

    for listener, events_list in grouped_events.items():
        logger.info(f"{listener}: {len(events_list)} events")
    if unmatched:
        logger.warning(f"Skipped {unmatched} events with missing or unrecognised _type")
    return grouped_events


def save_grouped_events(grouped_events: Dict[str, List[Dict]], output_dir: str = ".") -> None:
    """Write each listener's events to ``<output_dir>/<listener>_events.json``.

    Files are UTF-8, pretty-printed, and overwritten if they already exist.
    *output_dir* is created if missing; it defaults to the working directory.
    """
    logger.info("Saving grouped events to JSON files")
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    for listener, events in grouped_events.items():
        path = out_dir / f"{listener}_events.json"
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(events, f, indent=2, ensure_ascii=False)
        logger.info(f"Saved {path}")


def main(
    create_policy_file: str = "MOTO8406_create_policy.csv",
    log_file: str = "MOTO8406.csv",
    output_dir: str = ".",
) -> None:
    """Run the pipeline: load created policies, extract, group, save.

    The defaults reproduce the original hard-coded inputs and outputs.
    """
    logger.info("Starting event processing")
    create_policy_numbers = load_create_policy_numbers(create_policy_file)
    events = extract_events_from_logs(log_file)
    grouped_events = group_events(events, create_policy_numbers)
    save_grouped_events(grouped_events, output_dir)
    logger.info("Event processing completed")


if __name__ == "__main__":
    main()
Leave a Comment