Untitled

 avatar
unknown
plain_text
a month ago
4.3 kB
6
Indexable
import json
import logging
import pandas as pd
from typing import Dict, List, Set
import re

# Configure the root logger once at import time: INFO and above, with each
# record rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)

def load_create_policy_numbers(filename: str) -> Set[str]:
    """Collect the policy numbers announced by "Start create new policy" rows.

    The file is read with pandas as a headerless CSV whose column is named
    ``message``. Every text cell is searched for the marker
    ``Start create new policy: <number>`` and the word-character token that
    follows the marker is recorded.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        The distinct policy numbers found. The set is empty when the file is
        empty or when no row contains the marker.
    """
    logger.info(f"Loading create policy numbers from {filename}")
    try:
        df = pd.read_csv(filename, header=None, names=['message'])
    except pd.errors.EmptyDataError:
        # A zero-byte file simply means "nothing was created"; do not crash.
        logger.warning(f"{filename} is empty; no create policy numbers loaded")
        return set()

    # Compiled once instead of being looked up again for every row.
    create_marker = re.compile(r'Start create new policy: (\w+)')
    policy_numbers = set()

    for message in df['message']:
        # pandas yields NaN (a float) for empty cells and numbers for
        # all-numeric columns; re.search would raise TypeError on those.
        if not isinstance(message, str):
            continue
        match = create_marker.search(message)
        if match:
            policy_numbers.add(match.group(1))

    logger.info(f"Loaded {len(policy_numbers)} create policy numbers")
    return policy_numbers

def extract_events_from_logs(filename: str) -> List[Dict]:
    """Parse "Received parked message" rows of a log CSV into event dicts.

    The file is read with pandas as a headerless CSV whose column is named
    ``message``. For every text cell containing
    ``Received parked message with body:`` the JSON object located between
    ``body:`` and ``properties:`` is decoded. When the same message carries a
    ``__TypeId__=<dotted.name>`` token, that name is stored on the event under
    the ``_type`` key.

    The body is first decoded exactly as written. Only if that fails is the
    clean-up applied (doubled quotes collapsed, backslashes removed) and the
    decode retried, so bodies that are already valid JSON keep their empty
    strings and escape sequences intact.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        The decoded events in file order. Rows that are not text, do not
        match the expected layout, or cannot be decoded are skipped; decode
        failures are logged as warnings.
    """
    logger.info(f"Extracting events from {filename}")
    try:
        df = pd.read_csv(filename, header=None, names=['message'])
    except pd.errors.EmptyDataError:
        # A zero-byte file contains no events; report it instead of crashing.
        logger.warning(f"{filename} is empty; no events extracted")
        return []

    # Loop invariants: built once rather than on every row.
    marker = 'Received parked message with body:'
    body_pattern = re.compile(r'Received parked message with body: ({.*?}) properties:')
    type_pattern = re.compile(r'__TypeId__=([\w\.]+)')
    events = []

    for message in df['message']:
        # Empty cells arrive as NaN (float); they cannot hold an event.
        if not isinstance(message, str) or marker not in message:
            continue

        # Extract the JSON found between 'body:' and 'properties:'.
        body_match = body_pattern.search(message)
        if not body_match:
            continue
        raw_json = body_match.group(1)

        try:
            event = json.loads(raw_json)
        except json.JSONDecodeError:
            # Fallback for escaped log output: collapse doubled double quotes
            # into a single one, then drop the escape (backslash) characters.
            cleaned = raw_json.replace('""', '"').replace('\\', '')
            try:
                event = json.loads(cleaned)
            except json.JSONDecodeError as e:
                logger.warning(f"JSON decode error: {str(e)}\nMessage: {message}")
                continue

        # Extract event type from MessageProperties
        type_match = type_pattern.search(message)
        if type_match:
            event['_type'] = type_match.group(1)

        events.append(event)
        logger.debug(f"Successfully parsed event: {event.get('policyNumber', 'unknown')}")

    logger.info(f"Extracted {len(events)} events")
    return events

def group_events(events: List[Dict], create_policy_numbers: Set[str]) -> Dict[str, List[Dict]]:
    """Bucket events under the listener that corresponds to their ``_type``.

    An event whose ``_type`` contains ``PolicyPurchasedEvent`` goes to
    ``PolicyCreateListener`` when its ``policyNumber`` is a member of
    ``create_policy_numbers`` and to ``PolicyPurchasedListener`` otherwise.
    The remaining types are matched by substring, first match wins. Events
    that match no rule are left out of the result.

    Args:
        events: Event dicts; the ``_type`` and ``policyNumber`` keys are read.
        create_policy_numbers: Policy numbers that mark a purchase as a create.

    Returns:
        A dict holding one list per listener name, always containing all five
        listener keys, with events kept in their input order.
    """
    logger.info("Grouping events by listener type")

    # Ordered (type substring, listener) rules for everything except the
    # purchased event, which needs the extra policy-number lookup.
    routing = (
        ("PaymentConfirmedEvent", "PaymentConfirmedListener"),
        ("PolicyCashEvent", "PolicyCashListener"),
        ("PolicyUpdateEvent", "PolicyUpdateListener"),
    )
    listener_names = (
        "PolicyCreateListener",
        "PolicyPurchasedListener",
        "PaymentConfirmedListener",
        "PolicyCashListener",
        "PolicyUpdateListener",
    )
    buckets = {name: [] for name in listener_names}

    for item in events:
        type_id = item.get("_type", "")

        if "PolicyPurchasedEvent" in type_id:
            is_create = item.get("policyNumber") in create_policy_numbers
            target = "PolicyCreateListener" if is_create else "PolicyPurchasedListener"
        else:
            target = next(
                (listener for fragment, listener in routing if fragment in type_id),
                None,
            )
            if target is None:
                continue

        buckets[target].append(item)

    for name, bucket in buckets.items():
        logger.info(f"{name}: {len(bucket)} events")

    return buckets

def save_grouped_events(grouped_events: Dict[str, List[Dict]], output_dir: str = ""):
    """Write each listener's events to ``<listener>_events.json``.

    Files are UTF-8 encoded JSON, indented by two spaces, with non-ASCII
    characters written as-is rather than escaped.

    Args:
        grouped_events: Mapping of listener name to its list of events.
        output_dir: Directory that receives the files. The default (empty
            string) keeps the previous behaviour of writing relative to the
            current working directory. A non-empty directory is created if
            it does not exist yet.
    """
    import os  # local import: not imported in the visible top-of-file imports

    logger.info("Saving grouped events to JSON files")

    if output_dir:
        os.makedirs(output_dir, exist_ok=True)

    for listener, events in grouped_events.items():
        # os.path.join("", name) == name, so the default path is unchanged.
        filename = os.path.join(output_dir, f"{listener}_events.json")
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(events, f, indent=2, ensure_ascii=False)
        logger.info(f"Saved {filename}")

def main():
    """Run the pipeline: load create numbers, parse events, group, save.

    Reads ``MOTO8406_create_policy.csv`` and ``MOTO8406.csv`` and hands the
    grouped result to ``save_grouped_events``.
    """
    logger.info("Starting event processing")

    known_creates = load_create_policy_numbers("MOTO8406_create_policy.csv")
    parsed_events = extract_events_from_logs("MOTO8406.csv")
    save_grouped_events(group_events(parsed_events, known_creates))

    logger.info("Event processing completed")

# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Leave a Comment