mail@pastecode.io avatar
a year ago
3.8 kB
import random
import string
from dataclasses import dataclass
from typing import Set, List
from datetime import date
from functools import wraps
from time import time
import concurrent.futures

def timing(f):
    def wrap(*args, **kw):
        ts = time()
        result = f(*args, **kw)
        te = time()
        print('func:%r args:[%r, %r] took: %2.4f sec' % (f.__name__, args, kw, te-ts))
        return result
    return wrap
The program that takes 2 parameters:
1.	Set of city id - (TLV, NY…)
2.	Aggregation type (avg, max, median,... )  

The output: 
Collection of top N cities with the aggregated temperature.

The program should  calculate the Top N cities by requested temperature aggregation whose population is over 50K people.

1.	No need to implement the HTTP calls to the REST APIs, you can use the supplied interface above.
2.	You can implement only one aggregator type, but note that it could be any additional aggregation type (avg, max, median,... ) 
3.	Take into consideration efficiency (the number of cities could be very big)

class City:
    id: str
    name: str
    population: int

    def __repr__(self):
        return f'{self.name} - {self.id} - {self.population}'

    def __eq__(self, other):
        if not isinstance(other, City):
            return False
        return self.id == other.id

    def __hash__(self):
        return hash(self.id)

class DailyTemp:
    date: date
    temperature: float

class WeatherAPI:

    def getAllCitiesByIds(city_ids: Set[str]) -> Set[City]:
        # return cities
        return {City(
            'XXXXX', random.randrange(0, 10_000_000, 20)) for i in range(10_000)}

    def getLastYearTemperature(city_id: str) -> List[DailyTemp]:
        # return daily_temperatures
        return [DailyTemp(date.today(), random.randrange(-20, 35)) for i in range(365)]

def average(temps: List[DailyTemp]) -> float:
    return sum(x.temperature for x in temps) / len(temps)

def switch_agg_func(agg_type):
    if agg_type == 'AVG':
        return average
    # ...

def aggregate_city(city, agg_type):
    temps = WeatherAPI.getLastYearTemperature(city.id)
    agg_func = switch_agg_func(agg_type)
    agg_result = agg_func(temps)
    return city.id, agg_result

class Aggregation:
    def aggregate(city_ids: Set[str], agg_type: str):
        all_cities = WeatherAPI.getAllCitiesByIds(city_ids)
        default_threshold_population = 50_000
        big_cities = filter(lambda x: x.population > default_threshold_population, all_cities)
        res_list = []

        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Use list comprehension to submit tasks to the executor
            future_to_city = {executor.submit(aggregate_city, city, agg_type): city for city in big_cities}

            # Iterate through the completed futures
            for future in concurrent.futures.as_completed(future_to_city):
                city = future_to_city[future]
                    city_id, agg_result = future.result()
                    res_list.append((city_id, agg_result))
                except Exception as e:
                    print(f"Error occurred for city {city.id}: {e}")

        return res_list

if __name__ == '__main__':
    aggregation = Aggregation.aggregate({'a', 'b'}, 'AVG')
    print(aggregation, len(aggregation))