Untitled
unknown
plain_text
a year ago
19 kB
8
Indexable
import json
import os
import threading
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Set
# Define thresholds as constants
SORTING_EXPERT_THRESHOLDS = [100, 500, 1000]
ECO_WARRIOR_THRESHOLDS = [10, 50, 100]
class MetricsHandler:
# Environmental impact constants
IMPACT_FACTORS = {
'Paper': {
'co2_per_kg': 0.9, # kg CO2 saved per kg paper recycled
'trees_per_kg': 0.017, # trees saved per kg paper recycled
'avg_item_weight': 0.1 # kg per paper item (estimated average)
},
'Recycling': {
'co2_per_kg': 1.6, # kg CO2 saved per kg plastic recycled
'avg_item_weight': 0.05 # kg per plastic item (estimated average)
},
'Organic': {
'co2_per_kg': 0.5, # kg CO2 saved per kg organic waste composted
'avg_item_weight': 0.2 # kg per organic waste item (estimated average)
},
'Residual': {
'co2_per_kg': 0.0, # No direct savings for residual waste
'avg_item_weight': 0.15 # kg per residual item (estimated average)
}
}
ACHIEVEMENTS = {
'first_sort': {
'id': 'first_sort',
'name': 'First Steps',
'description': 'Sort your first item',
'condition': lambda metrics: metrics['basic_metrics']['total_items_sorted'] >= 1
},
'sorting_expert': {
'id': 'sorting_expert',
'tiers': ['bronze', 'silver', 'gold'],
'name': 'Sorting Expert',
'description': 'Sort items with high accuracy',
'thresholds': SORTING_EXPERT_THRESHOLDS,
'condition': lambda metrics, tier_idx: metrics['basic_metrics']['total_items_sorted'] >= SORTING_EXPERT_THRESHOLDS[tier_idx]
},
'perfect_balance': {
'id': 'perfect_balance',
'name': 'Perfect Balance',
'description': 'Keep all bins within 20% fill level of each other for 7 days',
'condition': lambda metrics: metrics.get('balance_tracking', {}).get('days_within_threshold', 0) >= 7
},
'time_lord': {
'id': 'time_lord',
'name': 'Time Lord',
'description': 'Sort items in every hour of the day',
'condition': lambda metrics: len(set(metrics['hour_coverage']['hours_sorted'])) == 24
},
'weekend_warrior': {
'id': 'weekend_warrior',
'name': 'Weekend Warrior',
'description': 'Sort items on 4 consecutive weekends',
'condition': lambda metrics: metrics['hour_coverage']['weekend_streak']['count'] >= 4
},
'seasonal_master': {
'id': 'seasonal_master',
'name': 'Seasonal Master',
'description': 'Sort items in both summer and winter',
'condition': lambda metrics: all(metrics['seasonal_tracking']['seasonal_counts'][season] > 0
for season in ['summer', 'winter'])
},
'eco_warrior': {
'id': 'eco_warrior',
'tiers': ['bronze', 'silver', 'gold'],
'name': 'Eco Warrior',
'description': 'Save CO2 through proper recycling',
'thresholds': ECO_WARRIOR_THRESHOLDS,
'condition': lambda metrics, tier_idx: metrics['environmental_impact']['co2_saved'] >= ECO_WARRIOR_THRESHOLDS[tier_idx]
}
}
def __init__(self, metrics_file='metrics.json'):
self.metrics_file = metrics_file
self.metrics = self._load_metrics()
def _load_metrics(self) -> Dict:
"""Load metrics from JSON file or create default structure"""
if os.path.exists(self.metrics_file):
try:
with open(self.metrics_file, 'r') as f:
return json.load(f)
except json.JSONDecodeError:
return self._create_default_metrics()
return self._create_default_metrics()
def _create_default_metrics(self) -> Dict:
"""Create default metrics structure"""
current_time = datetime.now()
month = current_time.month
current_season = 'summer' if 5 <= month <= 8 else 'winter'
return {
'basic_metrics': {
'total_items_sorted': 0,
'items_sorted_per_bin': {},
'classification_results': [],
'bin_emptying_counts': {},
'api_vs_local_usage': {'api': 0, 'local': 0}
},
'time_metrics': {
'daily_usage_counts': {},
'weekly_usage_counts': {},
'monthly_usage_counts': {},
'time_of_day_patterns': [0] * 24,
'daily_weekly_monthly_streaks': {
'daily': 0,
'weekly': 0,
'monthly': 0
},
'fill_rate_per_bin': {}
},
'short_term_tracking': {
'items_last_5_minutes': {
'count': 0,
'timestamps': []
},
'current_sorting_streak': {
'start_date': None,
'days': 0,
'last_sort_date': None
}
},
'hour_coverage': {
'hours_sorted': [],
'weekend_streak': {
'count': 0,
'dates': []
}
},
'seasonal_tracking': {
'seasonal_counts': {
'summer': 0,
'winter': 0,
},
'current_season': current_season,
'season_start': current_time.isoformat()
},
'environmental_impact': {
'co2_saved': 0.0,
'trees_saved': 0.0,
'paper_weight_recycled': 0.0,
'plastic_weight_recycled': 0.0,
'organic_weight_processed': 0.0
},
'fill_level_history': {},
'achievements': {},
'bin_specialization': {}
}
def record_sort(self, bin_id: str, classification_result: str, fill_level: float, classification_method: str):
"""Record a sorting event"""
timestamp = datetime.now().isoformat()
# Update basic metrics immediately
self.metrics['basic_metrics']['total_items_sorted'] += 1
self.metrics['basic_metrics']['items_sorted_per_bin'][bin_id] = \
self.metrics['basic_metrics']['items_sorted_per_bin'].get(bin_id, 0) + 1
self.metrics['basic_metrics']['api_vs_local_usage'][classification_method] += 1
# Schedule delayed fill level recording
def record_delayed_metrics():
try:
# Read updated fill level after delay
with open('fill_levels.json', 'r') as f:
fill_levels = json.load(f)
current_fill_level = fill_levels.get(bin_id, 0.0)
# Record sort details
sort_record = {
'timestamp': timestamp,
'bin_id': bin_id,
'classification_result': classification_result,
'fill_level': current_fill_level,
'method': classification_method
}
self.metrics['basic_metrics']['classification_results'].append(sort_record)
# Update environmental impact
self._update_environmental_impact(classification_result)
# Update time-based metrics
self._update_time_metrics(timestamp)
# Update seasonal tracking
self._update_seasonal_tracking(timestamp)
# Update bin specialization
self._update_bin_specialization(bin_id, classification_result)
# Update fill rates
self._update_fill_rates(bin_id, current_fill_level)
# Update achievements
self._check_achievements()
# Save all updates
self.save_metrics()
except Exception as e:
print(f"Error recording delayed metrics: {e}")
# Start delayed recording thread
thread = threading.Thread(target=lambda: [time.sleep(6), record_delayed_metrics()])
thread.daemon = True
thread.start()
def _update_environmental_impact(self, classification_result: str):
"""Update environmental impact calculations"""
impact_factor = self.IMPACT_FACTORS.get(classification_result)
if impact_factor:
# Calculate environmental impact
item_weight = impact_factor['avg_item_weight']
if classification_result == 'Paper':
self.metrics['environmental_impact']['paper_weight_recycled'] += item_weight
self.metrics['environmental_impact']['trees_saved'] += item_weight * impact_factor['trees_per_kg']
elif classification_result == 'Recycling':
self.metrics['environmental_impact']['plastic_weight_recycled'] += item_weight
elif classification_result == 'Organic':
self.metrics['environmental_impact']['organic_weight_processed'] += item_weight
# Calculate CO2 savings
co2_saved = item_weight * impact_factor['co2_per_kg']
self.metrics['environmental_impact']['co2_saved'] += co2_saved
def _update_time_metrics(self, timestamp: str):
"""Update time-based metrics"""
current_time = datetime.fromisoformat(timestamp)
# Update daily/weekly/monthly counts
date_str = current_time.date().isoformat()
week_str = f"{current_time.year}-W{current_time.isocalendar()[1]}"
month_str = f"{current_time.year}-{current_time.month:02d}"
self.metrics['time_metrics']['daily_usage_counts'][date_str] = \
self.metrics['time_metrics']['daily_usage_counts'].get(date_str, 0) + 1
self.metrics['time_metrics']['weekly_usage_counts'][week_str] = \
self.metrics['time_metrics']['weekly_usage_counts'].get(week_str, 0) + 1
self.metrics['time_metrics']['monthly_usage_counts'][month_str] = \
self.metrics['time_metrics']['monthly_usage_counts'].get(month_str, 0) + 1
# Update time of day patterns
hour = current_time.hour
self.metrics['time_metrics']['time_of_day_patterns'][hour] += 1
# Update hour coverage
if hour not in self.metrics['hour_coverage']['hours_sorted']:
self.metrics['hour_coverage']['hours_sorted'].append(hour)
# Update weekend tracking
if current_time.weekday() >= 5: # Saturday = 5, Sunday = 6
weekend_dates = self.metrics['hour_coverage']['weekend_streak']['dates']
# Only add if it's a new weekend
if not weekend_dates or \
(current_time.date() - datetime.fromisoformat(weekend_dates[-1]).date()).days > 1:
weekend_dates.append(timestamp)
# Check for consecutive weekends
if len(weekend_dates) >= 2:
prev_weekend = datetime.fromisoformat(weekend_dates[-2]).date()
this_weekend = current_time.date()
weeks_diff = (this_weekend - prev_weekend).days // 7
if weeks_diff == 1: # Consecutive weekends
self.metrics['hour_coverage']['weekend_streak']['count'] += 1
elif weeks_diff > 1: # Streak broken
self.metrics['hour_coverage']['weekend_streak']['count'] = 1
def _update_fill_rates(self, bin_id: str, current_fill_level: float):
"""Update fill rates per bin"""
classification_results = self.metrics['basic_metrics']['classification_results']
if len(classification_results) >= 2:
# Find last fill level for this bin
for result in reversed(classification_results[:-1]): # Skip current result
if result['bin_id'] == bin_id:
last_time = datetime.fromisoformat(result['timestamp'])
current_time = datetime.now()
time_diff = (current_time - last_time).total_seconds() / 3600 # hours
if time_diff > 0:
level_diff = current_fill_level - result['fill_level']
fill_rate = level_diff / time_diff # percent per hour
if bin_id not in self.metrics['time_metrics']['fill_rate_per_bin']:
self.metrics['time_metrics']['fill_rate_per_bin'][bin_id] = []
self.metrics['time_metrics']['fill_rate_per_bin'][bin_id].append({
'timestamp': current_time.isoformat(),
'rate': fill_rate
})
break
def _update_bin_specialization(self, bin_id: str, classification_result: str):
"""Update bin specialization metrics"""
if bin_id not in self.metrics['bin_specialization']:
self.metrics['bin_specialization'][bin_id] = {
'total_items': 0,
'items_by_type': {},
'most_common_type': None,
'type_accuracy': 0.0
}
spec = self.metrics['bin_specialization'][bin_id]
spec['total_items'] += 1
spec['items_by_type'][classification_result] = \
spec['items_by_type'].get(classification_result, 0) + 1
# Update most common type
most_common = max(spec['items_by_type'].items(), key=lambda x: x[1])
spec['most_common_type'] = most_common[0]
spec['type_accuracy'] = (most_common[1] / spec['total_items']) * 100
def _update_seasonal_tracking(self, timestamp: str):
"""Update seasonal tracking"""
current_time = datetime.fromisoformat(timestamp)
month = current_time.month
# Simple season determination (Northern Hemisphere)
current_season = 'summer' if 5 <= month <= 8 else 'winter'
# Update season if changed
if self.metrics['seasonal_tracking']['current_season'] != current_season:
self.metrics['seasonal_tracking']['current_season'] = current_season
self.metrics['seasonal_tracking']['season_start'] = timestamp
# Increment seasonal count
self.metrics['seasonal_tracking']['seasonal_counts'][current_season] += 1
def _check_achievements(self):
"""Check and update achievements"""
if 'achievements' not in self.metrics:
self.metrics['achievements'] = {}
# Check each achievement
for achievement_id, achievement in self.ACHIEVEMENTS.items():
if 'tiers' in achievement:
# Multi-tier achievement
for tier_idx, tier in enumerate(achievement['tiers']):
achievement_key = f"{achievement_id}_{tier}"
if achievement_key not in self.metrics['achievements']:
if achievement['condition'](self.metrics, tier_idx):
self.metrics['achievements'][achievement_key] = {
'status': 'completed',
'unlock_date': datetime.now().isoformat(),
'tier': tier
}
else:
# Single-tier achievement
if achievement_id not in self.metrics['achievements']:
if achievement['condition'](self.metrics):
self.metrics['achievements'][achievement_id] = {
'status': 'completed',
'unlock_date': datetime.now().isoformat()
}
def record_bin_empty(self, bin_id: str):
"""Record when a bin is emptied"""
timestamp = datetime.now().isoformat()
# Get the fill level before emptying
try:
with open('fill_levels.json', 'r') as f:
fill_levels = json.load(f)
previous_level = fill_levels.get(bin_id, 0.0)
except Exception:
previous_level = 0.0
# Update emptying counts
self.metrics['basic_metrics']['bin_emptying_counts'][bin_id] = \
self.metrics['basic_metrics']['bin_emptying_counts'].get(bin_id, 0) + 1
# Update fill level history
if bin_id not in self.metrics['fill_level_history']:
self.metrics['fill_level_history'][bin_id] = {
'emptying_timestamps': [],
'fill_levels_at_empty': []
}
self.metrics['fill_level_history'][bin_id]['emptying_timestamps'].append(timestamp)
self.metrics['fill_level_history'][bin_id]['fill_levels_at_empty'].append(previous_level)
self.save_metrics()
def check_bin_balance(self):
"""Check if bins are balanced (within 20% of each other)"""
# Get current fill levels from file
try:
with open('fill_levels.json', 'r') as f:
fill_levels = json.load(f)
levels = list(fill_levels.values())
if not levels:
return False
max_level = max(levels)
min_level = min(levels)
return (max_level - min_level) <= 20.0
except Exception:
return False
def save_metrics(self):
"""Save metrics to JSON file"""
with open(self.metrics_file, 'w') as f:
json.dump(self.metrics, f, indent=2)
def get_metrics(self) -> Dict:
"""Get all metrics"""
return self.metrics
def get_metric(self, category: str, metric: str) -> any:
"""Get a specific metric"""
return self.metrics.get(category, {}).get(metric, None)Editor is loading...
Leave a Comment