Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
3.8 kB
9
Indexable
Never
import random
import string
from dataclasses import dataclass
from typing import Set, List
from datetime import date
from functools import wraps
from time import time
import concurrent.futures


def timing(f):
    @wraps(f)
    def wrap(*args, **kw):
        ts = time()
        result = f(*args, **kw)
        te = time()
        print('func:%r args:[%r, %r] took: %2.4f sec' % (f.__name__, args, kw, te-ts))
        return result
    return wrap
"""
The program that takes 2 parameters:
1.	Set of city id - (TLV, NY…)
2.	Aggregation type (avg, max, median,... )  

The output: 
Collection of top N cities with the aggregated temperature.


The program should  calculate the Top N cities by requested temperature aggregation whose population is over 50K people.

Notes:
1.	No need to implement the HTTP calls to the REST APIs, you can use the supplied interface above.
2.	You can implement only one aggregator type, but note that it could be any additional aggregation type (avg, max, median,... ) 
3.	Take into consideration efficiency (the number of cities could be very big)
"""


@dataclass
class City:
    id: str
    name: str
    population: int

    def __repr__(self):
        return f'{self.name} - {self.id} - {self.population}'

    def __eq__(self, other):
        if not isinstance(other, City):
            return False
        return self.id == other.id

    def __hash__(self):
        return hash(self.id)


@dataclass
class DailyTemp:
    date: date
    temperature: float


class WeatherAPI:

    @timing
    @staticmethod
    def getAllCitiesByIds(city_ids: Set[str]) -> Set[City]:
        # return cities
        return {City(
            f'{random.choice(string.ascii_uppercase)}{random.choice(string.ascii_uppercase)}{random.choice(string.ascii_uppercase)}{random.choice(string.ascii_uppercase)}',
            'XXXXX', random.randrange(0, 10_000_000, 20)) for i in range(10_000)}

    @staticmethod
    def getLastYearTemperature(city_id: str) -> List[DailyTemp]:
        # return daily_temperatures
        return [DailyTemp(date.today(), random.randrange(-20, 35)) for i in range(365)]


def average(temps: List[DailyTemp]) -> float:
    return sum(x.temperature for x in temps) / len(temps)


def switch_agg_func(agg_type):
    if agg_type == 'AVG':
        return average
    # ...


def aggregate_city(city, agg_type):
    temps = WeatherAPI.getLastYearTemperature(city.id)
    agg_func = switch_agg_func(agg_type)
    agg_result = agg_func(temps)
    return city.id, agg_result


class Aggregation:
    @timing
    @staticmethod
    def aggregate(city_ids: Set[str], agg_type: str):
        all_cities = WeatherAPI.getAllCitiesByIds(city_ids)
        default_threshold_population = 50_000
        big_cities = filter(lambda x: x.population > default_threshold_population, all_cities)
        res_list = []

        with concurrent.futures.ThreadPoolExecutor() as executor:
            # Use list comprehension to submit tasks to the executor
            future_to_city = {executor.submit(aggregate_city, city, agg_type): city for city in big_cities}

            # Iterate through the completed futures
            for future in concurrent.futures.as_completed(future_to_city):
                city = future_to_city[future]
                try:
                    city_id, agg_result = future.result()
                    res_list.append((city_id, agg_result))
                except Exception as e:
                    print(f"Error occurred for city {city.id}: {e}")

        return res_list


if __name__ == '__main__':
    aggregation = Aggregation.aggregate({'a', 'b'}, 'AVG')
    print(aggregation, len(aggregation))