Untitled
unknown
plain_text
10 months ago
4.3 kB
8
Indexable
import json
import logging
import pandas as pd
from typing import Dict, List, Set
import re
# Configure the root logger: INFO and above, each record prefixed with its
# timestamp and severity.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger shared by every function in this script.
logger = logging.getLogger(__name__)
def load_create_policy_numbers(filename: str) -> Set[str]:
    """Collect policy numbers announced by "Start create new policy" log lines.

    The file is read with pandas as a header-less CSV and the ``message``
    column is scanned. Every text cell containing
    ``Start create new policy: <number>`` contributes ``<number>`` (a run of
    word characters) to the result.

    Args:
        filename: Path to the CSV export of the create-policy log.

    Returns:
        The set of distinct policy numbers found; empty when the file holds
        no data or no matching lines.
    """
    logger.info(f"Loading create policy numbers from {filename}")
    try:
        df = pd.read_csv(filename, header=None, names=['message'])
    except pd.errors.EmptyDataError:
        # pandas may raise this for a file with no data; treat it as
        # "no messages" instead of aborting the whole run.
        logger.warning(f"{filename} is empty; no create policy numbers loaded")
        return set()

    # Compiled once, outside the row loop.
    pattern = re.compile(r'Start create new policy: (\w+)')
    policy_numbers: Set[str] = set()
    for message in df['message']:
        # Cells that pandas parsed as NaN or as numbers are not text; passing
        # them to the regex would raise TypeError.
        if not isinstance(message, str):
            continue
        match = pattern.search(message)
        if match:
            policy_numbers.add(match.group(1))

    logger.info(f"Loaded {len(policy_numbers)} create policy numbers")
    return policy_numbers
def _parse_event_body(raw_body: str) -> Dict:
    """Decode a JSON body captured from a log line, un-escaping only if needed.

    The body is tried as-is first, so valid JSON (including empty string
    values and legitimate backslash escapes) is never damaged. Only when that
    fails are the legacy clean-ups applied: collapsing doubled quotes, then
    additionally stripping every backslash.

    Raises:
        json.JSONDecodeError: If no variant of the body is valid JSON; the
            error reported is the one from the most aggressively cleaned
            variant.
    """
    undoubled = raw_body.replace('""', '"')
    candidates = (raw_body, undoubled, undoubled.replace('\\', ''))
    last_error = None
    for candidate in candidates:
        try:
            return json.loads(candidate)
        except json.JSONDecodeError as exc:
            last_error = exc
    raise last_error


def extract_events_from_logs(filename: str) -> List[Dict]:
    """Extract parked-message events from a CSV export of log lines.

    The file is read with pandas as a header-less CSV and the ``message``
    column is scanned. For every text cell containing
    ``Received parked message with body: {...} properties:`` the JSON body is
    decoded into a dict. When the line also carries ``__TypeId__=<name>``,
    that name is stored on the event under the ``_type`` key.

    Lines whose body cannot be located or decoded are logged as warnings and
    skipped; they never abort the run.

    Args:
        filename: Path to the CSV export of the log.

    Returns:
        The decoded events, in file order.
    """
    logger.info(f"Extracting events from {filename}")
    try:
        df = pd.read_csv(filename, header=None, names=['message'])
    except pd.errors.EmptyDataError:
        # pandas may raise this for a file with no data; nothing to extract.
        logger.warning(f"{filename} is empty; no events extracted")
        return []

    marker = 'Received parked message with body:'
    # Capture the JSON between 'body:' and 'properties:'.
    body_re = re.compile(r'Received parked message with body: ({.*?}) properties:')
    # The event type is taken from the __TypeId__ entry of the message properties.
    type_re = re.compile(r'__TypeId__=([\w\.]+)')

    events = []
    for message in df['message']:
        # NaN / numeric cells are not log text; skip them quietly instead of
        # reporting an error for each one.
        if not isinstance(message, str) or marker not in message:
            continue

        body_match = body_re.search(message)
        if body_match is None:
            logger.warning(f"Could not locate JSON body in parked message: {message}")
            continue

        try:
            event = _parse_event_body(body_match.group(1))
        except json.JSONDecodeError as e:
            logger.warning(f"JSON decode error: {str(e)}\nMessage: {message}")
            continue

        type_match = type_re.search(message)
        if type_match:
            event['_type'] = type_match.group(1)
        events.append(event)
        logger.debug(f"Successfully parsed event: {event.get('policyNumber', 'unknown')}")

    logger.info(f"Extracted {len(events)} events")
    return events
def group_events(events: List[Dict], create_policy_numbers: Set[str]) -> Dict[str, List[Dict]]:
    """Bucket events by the listener that should handle them.

    Routing is based on substrings of each event's ``_type`` value:

    * ``PolicyPurchasedEvent`` goes to ``PolicyCreateListener`` when the
      event's ``policyNumber`` is in ``create_policy_numbers``, otherwise to
      ``PolicyPurchasedListener``.
    * ``PaymentConfirmedEvent``, ``PolicyCashEvent`` and ``PolicyUpdateEvent``
      go to the listener of the same name.

    Events with a missing or unrecognized ``_type`` are left out of the
    result; their count is logged as a warning.

    Args:
        events: Decoded events, each optionally carrying ``_type`` and
            ``policyNumber`` keys.
        create_policy_numbers: Policy numbers (as strings) known to belong to
            policy creation.

    Returns:
        A dict with one list per listener name; all five keys are always
        present, in a fixed order.
    """
    logger.info("Grouping events by listener type")
    grouped_events: Dict[str, List[Dict]] = {
        "PolicyCreateListener": [],
        "PolicyPurchasedListener": [],
        "PaymentConfirmedListener": [],
        "PolicyCashListener": [],
        "PolicyUpdateListener": []
    }
    # Checked in order; the first marker contained in the event type wins.
    listener_by_marker = (
        ("PaymentConfirmedEvent", "PaymentConfirmedListener"),
        ("PolicyCashEvent", "PolicyCashListener"),
        ("PolicyUpdateEvent", "PolicyUpdateListener"),
    )

    unmatched = 0
    for event in events:
        event_type = event.get("_type")
        if not isinstance(event_type, str):
            event_type = ""

        if "PolicyPurchasedEvent" in event_type:
            policy_number = event.get("policyNumber")
            # Compare as text: the known numbers are strings, while the JSON
            # body may carry the number as an int (or another non-hashable
            # value that would crash a direct set lookup).
            is_create = (
                policy_number is not None
                and str(policy_number) in create_policy_numbers
            )
            listener = "PolicyCreateListener" if is_create else "PolicyPurchasedListener"
        else:
            listener = next(
                (name for marker, name in listener_by_marker if marker in event_type),
                None,
            )

        if listener is None:
            unmatched += 1
            continue
        grouped_events[listener].append(event)

    for listener, events_list in grouped_events.items():
        logger.info(f"{listener}: {len(events_list)} events")
    if unmatched:
        logger.warning(f"{unmatched} events had no recognized type and were skipped")
    return grouped_events
def save_grouped_events(grouped_events: Dict[str, List[Dict]], output_dir: str = ".") -> None:
    """Write each listener's events to ``<listener>_events.json``.

    Files are UTF-8 encoded, indented by two spaces, and keep non-ASCII
    characters unescaped.

    Args:
        grouped_events: Mapping of listener name to the events routed to it.
        output_dir: Directory to write the files into; created if missing.
            Defaults to the current working directory, matching the previous
            hard-coded behavior.
    """
    # Imported locally so this function does not rely on the file's import block.
    from pathlib import Path

    logger.info("Saving grouped events to JSON files")
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    for listener, events in grouped_events.items():
        # With the default output_dir this renders as the bare file name.
        filename = target_dir / f"{listener}_events.json"
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(events, f, indent=2, ensure_ascii=False)
        logger.info(f"Saved {filename}")
def main():
    """Run the whole pipeline: load, extract, group, then save.

    Reads the known create-policy numbers from ``MOTO8406_create_policy.csv``
    and the parked-message events from ``MOTO8406.csv``, groups the events by
    listener, and writes the groups out as JSON files.
    """
    logger.info("Starting event processing")

    known_create_numbers = load_create_policy_numbers("MOTO8406_create_policy.csv")
    parsed_events = extract_events_from_logs("MOTO8406.csv")
    save_grouped_events(group_events(parsed_events, known_create_numbers))

    logger.info("Event processing completed")
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Leave a Comment