Untitled
unknown
plain_text
2 years ago
3.8 kB
15
Indexable
import concurrent.futures
import heapq
import random
import string
from dataclasses import dataclass
from datetime import date
from functools import wraps
from time import perf_counter, time
from typing import List, Optional, Set
def timing(f):
    """Decorator that prints how long each call to *f* took.

    After a successful call, one line is printed containing the callable's
    name, its positional and keyword arguments, and the elapsed time in
    seconds (four decimals). If *f* raises, the exception propagates and
    nothing is printed.

    Args:
        f: The callable to wrap.

    Returns:
        A wrapper that forwards all arguments to *f* and returns its result.
    """
    # Not every callable has __name__ (e.g. functools.partial objects); fall
    # back to its repr so reporting never discards a computed result.
    name = getattr(f, '__name__', repr(f))

    @wraps(f)
    def wrap(*args, **kw):
        # perf_counter is monotonic, so the measured interval cannot be
        # skewed by system clock adjustments the way time() can.
        ts = perf_counter()
        result = f(*args, **kw)
        te = perf_counter()
        print('func:%r args:[%r, %r] took: %2.4f sec' % (name, args, kw, te - ts))
        return result
    return wrap
"""
The program takes 2 parameters:
1. A set of city IDs (e.g. TLV, NY, …)
2. An aggregation type (avg, max, median, ...)
The output:
Collection of top N cities with the aggregated temperature.
The program should calculate the Top N cities by requested temperature aggregation whose population is over 50K people.
Notes:
1. There is no need to implement the HTTP calls to the REST APIs; you can use the supplied WeatherAPI interface.
2. You can implement only one aggregator type, but note that it could be any additional aggregation type (avg, max, median,... )
3. Take into consideration efficiency (the number of cities could be very big)
"""
@dataclass
class City:
    """A city record.

    Identity is defined by ``id`` alone: two ``City`` objects with the same
    ``id`` compare equal and hash alike, whatever their name or population.
    """

    # Unique city identifier (the sole input to equality and hashing).
    id: str
    # Display name.
    name: str
    # Number of inhabitants.
    population: int

    def __repr__(self):
        """Return ``'<name> - <id> - <population>'``."""
        return f'{self.name} - {self.id} - {self.population}'

    def __eq__(self, other):
        """Compare by ``id``; defer to the other operand for foreign types."""
        if not isinstance(other, City):
            # NotImplemented (rather than False) lets Python try the
            # reflected comparison before falling back to "not equal".
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        """Hash on ``id`` so it stays consistent with ``__eq__``."""
        return hash(self.id)
@dataclass
class DailyTemp:
    """A single temperature reading tied to a calendar day."""

    # Day the reading belongs to.
    date: date
    # Temperature value for that day.
    temperature: float
class WeatherAPI:
    """Stub of the weather service interface that returns random data."""

    # @staticmethod must be the outermost decorator: with @timing on top,
    # timing() receives a staticmethod object, which is not callable before
    # Python 3.10, and the class ends up holding a plain function that would
    # receive the instance as its first argument when called on an instance.
    @staticmethod
    @timing
    def getAllCitiesByIds(city_ids: Set[str]) -> Set[City]:
        """Return a set of randomly generated cities.

        This stub does not consult ``city_ids``. It generates 10,000 cities
        with random four-letter upper-case ids, the name ``'XXXXX'`` and a
        random population in ``[0, 10_000_000)`` in steps of 20. Cities with
        the same id collapse in the set, so fewer than 10,000 may be returned.
        """
        return {
            City(
                ''.join(random.choice(string.ascii_uppercase) for _ in range(4)),
                'XXXXX',
                random.randrange(0, 10_000_000, 20),
            )
            for _ in range(10_000)
        }

    @staticmethod
    def getLastYearTemperature(city_id: str) -> List[DailyTemp]:
        """Return 365 randomly generated daily temperatures.

        This stub does not consult ``city_id``. Every reading is dated today
        and carries a random integer temperature in ``[-20, 35)``.
        """
        today = date.today()  # Same value for every reading; compute it once.
        return [DailyTemp(today, random.randrange(-20, 35)) for _ in range(365)]
def average(temps: List["DailyTemp"]) -> float:
    """Return the arithmetic mean of the ``temperature`` of each reading.

    Args:
        temps: Non-empty list of readings exposing a ``temperature`` attribute.

    Returns:
        The sum of all temperatures divided by the number of readings.

    Raises:
        ValueError: If ``temps`` is empty (previously this surfaced as an
            unexplained ``ZeroDivisionError``).
    """
    if not temps:
        raise ValueError('cannot average an empty list of temperatures')
    return sum(x.temperature for x in temps) / len(temps)
def switch_agg_func(agg_type):
    """Return the aggregation function registered for ``agg_type``.

    Args:
        agg_type: Aggregation identifier; currently only ``'AVG'``.

    Returns:
        A callable that reduces a list of daily temperatures to one value.

    Raises:
        ValueError: If ``agg_type`` is not supported. Failing here avoids
            returning ``None`` and a later "'NoneType' object is not
            callable" error at the call site.
    """
    if agg_type == 'AVG':
        return average
    # Further aggregation types (max, median, ...) plug in above this line.
    raise ValueError(f'Unsupported aggregation type: {agg_type!r}')
def aggregate_city(city, agg_type):
    """Aggregate one city's last-year temperatures.

    Fetches the readings for ``city.id`` from ``WeatherAPI``, looks up the
    aggregation function for ``agg_type`` and applies it to the readings.

    Returns:
        A ``(city.id, aggregated_value)`` tuple.
    """
    readings = WeatherAPI.getLastYearTemperature(city.id)
    reducer = switch_agg_func(agg_type)
    return city.id, reducer(readings)
class Aggregation:
    """Entry point for computing per-city temperature aggregates."""

    # @staticmethod must be the outermost decorator: with @timing on top,
    # timing() receives a staticmethod object, which is not callable before
    # Python 3.10, and the class ends up holding a plain function that would
    # receive the instance as its first argument when called on an instance.
    @staticmethod
    @timing
    def aggregate(city_ids: Set[str], agg_type: str, *,
                  top_n: Optional[int] = None, min_population: int = 50_000):
        """Aggregate last-year temperatures for every sufficiently large city.

        Cities are fetched through ``WeatherAPI.getAllCitiesByIds``, those
        whose population is not above ``min_population`` are dropped, and
        ``aggregate_city`` is run for each remaining city on a thread pool.
        A city whose aggregation raises is reported on stdout and skipped.

        Args:
            city_ids: Ids of the cities to consider.
            agg_type: Aggregation identifier passed on to ``aggregate_city``.
            top_n: If given, return only the ``top_n`` entries with the
                highest aggregated value, highest first. If ``None``
                (default), return every entry in task-completion order.
            min_population: A city is included only when its population is
                strictly greater than this value.

        Returns:
            A list of ``(city_id, aggregated_value)`` tuples.
        """
        all_cities = WeatherAPI.getAllCitiesByIds(city_ids)
        big_cities = (city for city in all_cities if city.population > min_population)
        res_list = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Map each future back to its city so failures can be attributed.
            future_to_city = {
                executor.submit(aggregate_city, city, agg_type): city
                for city in big_cities
            }
            for future in concurrent.futures.as_completed(future_to_city):
                city = future_to_city[future]
                try:
                    city_id, agg_result = future.result()
                    res_list.append((city_id, agg_result))
                except Exception as e:
                    # Best effort: one failing city must not abort the run.
                    print(f"Error occurred for city {city.id}: {e}")
        if top_n is None:
            return res_list
        # nlargest avoids fully sorting a potentially very large result list.
        return heapq.nlargest(top_n, res_list, key=lambda pair: pair[1])
if __name__ == '__main__':
    # Demo run: aggregate by average and show the results with their count.
    results = Aggregation.aggregate({'a', 'b'}, 'AVG')
    print(results, len(results))
Editor is loading...