Untitled
unknown
plain_text
2 years ago
3.8 kB
12
Indexable
"""Top-N cities by aggregated temperature.

The program takes 2 parameters:
    1. Set of city ids - (TLV, NY, ...)
    2. Aggregation type (avg, max, median, ...)

The output: collection of top N cities with the aggregated temperature.

The program should calculate the top N cities by the requested temperature
aggregation whose population is over 50K people.

Notes:
    1. No need to implement the HTTP calls to the REST APIs; the supplied
       ``WeatherAPI`` interface is used instead.
    2. Only one aggregator type is required, but any additional aggregation
       type (avg, max, median, ...) must be easy to plug in.
    3. Take efficiency into consideration (the number of cities could be
       very big).
"""
import concurrent.futures
import heapq
import random
import statistics
import string
from dataclasses import dataclass
from datetime import date
from functools import wraps
from time import perf_counter, time  # `time` kept for backward compatibility
from typing import Callable, Dict, List, Optional, Set, Tuple


def timing(f):
    """Decorate *f* so that every successful call prints its duration.

    The printed line contains the function name, the positional and keyword
    arguments, and the elapsed seconds. Nothing is printed when *f* raises.
    """
    @wraps(f)
    def wrap(*args, **kw):
        # perf_counter is monotonic, so the measured interval cannot be
        # skewed by system clock adjustments the way time() can.
        ts = perf_counter()
        result = f(*args, **kw)
        te = perf_counter()
        print('func:%r args:[%r, %r] took: %2.4f sec' % (f.__name__, args, kw, te - ts))
        return result
    return wrap


@dataclass
class City:
    """A city record; identity (equality and hash) is defined by ``id`` only."""

    id: str
    name: str
    population: int

    def __repr__(self):
        return f'{self.name} - {self.id} - {self.population}'

    def __eq__(self, other):
        if not isinstance(other, City):
            # Let Python try the reflected comparison before falling back
            # to "not equal".
            return NotImplemented
        return self.id == other.id

    def __hash__(self):
        return hash(self.id)


@dataclass
class DailyTemp:
    """One temperature reading for a single day."""

    date: date
    temperature: float


class WeatherAPI:
    """Stand-in for the remote weather REST API; it returns random data."""

    # @staticmethod must be the outermost decorator: wrapping a staticmethod
    # object with `timing` fails before Python 3.10 (the object is not
    # callable) and, on any version, leaves a plain function in the class
    # that would receive `self` when called through an instance.
    @staticmethod
    @timing
    def getAllCitiesByIds(city_ids: Set[str]) -> Set[City]:
        """Return a set of cities.

        This mock ignores ``city_ids`` and generates 10,000 random cities
        (fewer may remain, because cities with equal ids collapse in the set).
        """
        letters = string.ascii_uppercase
        return {
            City(
                ''.join(random.choice(letters) for _ in range(4)),
                'XXXXX',
                random.randrange(0, 10_000_000, 20),
            )
            for _ in range(10_000)
        }

    @staticmethod
    def getLastYearTemperature(city_id: str) -> List[DailyTemp]:
        """Return 365 daily readings; this mock ignores ``city_id``."""
        return [DailyTemp(date.today(), random.randrange(-20, 35)) for _ in range(365)]


def average(temps: List[DailyTemp]) -> float:
    """Return the arithmetic mean temperature.

    Raises:
        ZeroDivisionError: if ``temps`` is empty.
    """
    return sum(x.temperature for x in temps) / len(temps)


def maximum(temps: List[DailyTemp]) -> float:
    """Return the highest temperature.

    Raises:
        ValueError: if ``temps`` is empty.
    """
    return max(x.temperature for x in temps)


def median(temps: List[DailyTemp]) -> float:
    """Return the median temperature.

    Raises:
        statistics.StatisticsError: if ``temps`` is empty.
    """
    return statistics.median(x.temperature for x in temps)


# Dispatch table: supporting a new aggregation type only needs a new entry.
_AGGREGATORS: Dict[str, Callable[[List[DailyTemp]], float]] = {
    'AVG': average,
    'MAX': maximum,
    'MEDIAN': median,
}


def switch_agg_func(agg_type):
    """Return the aggregation function registered under ``agg_type``.

    Raises:
        ValueError: if ``agg_type`` is not a supported aggregation type.
    """
    try:
        return _AGGREGATORS[agg_type]
    except KeyError:
        supported = ', '.join(sorted(_AGGREGATORS))
        raise ValueError(
            f'Unsupported aggregation type {agg_type!r}; expected one of: {supported}'
        ) from None


def aggregate_city(city, agg_type):
    """Fetch last year's temperatures for ``city`` and aggregate them.

    Returns:
        A ``(city.id, aggregated_value)`` tuple.
    """
    agg_func = switch_agg_func(agg_type)
    temps = WeatherAPI.getLastYearTemperature(city.id)
    return city.id, agg_func(temps)


class Aggregation:
    """Entry point for ranking cities by aggregated temperature."""

    @staticmethod
    @timing
    def aggregate(
        city_ids: Set[str],
        agg_type: str,
        top_n: Optional[int] = None,
        min_population: int = 50_000,
    ) -> List[Tuple[str, float]]:
        """Aggregate temperatures of the big cities and rank them.

        Args:
            city_ids: ids of the cities to consider.
            agg_type: aggregation type key, e.g. ``'AVG'``.
            top_n: how many of the highest-ranked cities to return;
                ``None`` returns every city.
            min_population: only cities whose population is strictly
                greater than this value are aggregated.

        Returns:
            ``(city_id, aggregated_value)`` tuples ordered from the highest
            aggregated value to the lowest. Cities whose aggregation failed
            are reported on stdout and left out.

        Raises:
            ValueError: if ``agg_type`` is unsupported or ``top_n`` is negative.
        """
        # Validate up front so a caller mistake fails once, instead of being
        # swallowed and printed once per city inside the worker loop.
        switch_agg_func(agg_type)
        if top_n is not None and top_n < 0:
            raise ValueError(f'top_n must be non-negative, got {top_n!r}')

        all_cities = WeatherAPI.getAllCitiesByIds(city_ids)
        big_cities = (city for city in all_cities if city.population > min_population)

        res_list = []
        # Threads let the per-city fetches overlap while they wait on the API.
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_city = {
                executor.submit(aggregate_city, city, agg_type): city
                for city in big_cities
            }
            for future in concurrent.futures.as_completed(future_to_city):
                city = future_to_city[future]
                try:
                    city_id, agg_result = future.result()
                except Exception as e:
                    # Best effort: one failing city must not abort the run.
                    print(f"Error occurred for city {city.id}: {e}")
                else:
                    res_list.append((city_id, agg_result))

        def by_value(item):
            return item[1]

        if top_n is None:
            return sorted(res_list, key=by_value, reverse=True)
        # nlargest avoids fully sorting a very large result list when only
        # a few top entries are wanted.
        return heapq.nlargest(top_n, res_list, key=by_value)


if __name__ == '__main__':
    aggregation = Aggregation.aggregate({'a', 'b'}, 'AVG')
    print(aggregation, len(aggregation))
Editor is loading...