Untitled

mail@pastecode.io avatar
unknown
plain_text
25 days ago
8.4 kB
2
Indexable
Never
import datetime
import sklearn
import typing as tp
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

X_type = tp.NewType("X_type", np.ndarray)
X_row_type = tp.NewType("X_row_type", np.ndarray)
Y_type = tp.NewType("Y_type", np.ndarray)
TS_type = tp.NewType("TS_type", pd.Series)
Model_type = tp.TypeVar("Model_type")


def read_timeseries(path_to_df: str = "train.csv") -> TS_type:
    """Функция для чтения данных и получения обучающей и тестовой выборок"""
    df = pd.read_csv(path_to_df)
    df = df[(df['store'] == 1) & (df['item'] == 1)]
    df["date"] = pd.to_datetime(df["date"])
    df = df.set_index("date")
    ts = df["sales"]
    train_ts = ts[:-365]
    test_ts = ts[-365:]
    return train_ts, test_ts


def extract_hybrid_strategy_features(
    timeseries: TS_type,
    model_idx: int,
    window_size: int = 7
) -> X_row_type:
    """
    Функция для получения вектора фичей согласно гибридной схеме. На вход подаётся временной ряд
    до момента T, функция выделяет из него фичи, необходимые модели под номером model_idx для
    прогноза на момент времени T
    
    Args:
        timeseries --- временной ряд до момента времени T (не включительно), pd.Series с датой 
                       в качестве индекса
        model_idx --- индекс модели, то есть номер шага прогноза, 
                      для которого нужно получить признаки, нумерация с нуля
        window_size --- количество последних значений ряда, используемых для прогноза 
                        (без учёта количества прогнозов с предыдущих этапов)

    Returns:
        Одномерный вектор фичей для модели с индексом model_idx (np.array), 
        чтобы сделать прогноз для момента времени T
    """
    feature_window = window_size + model_idx
    return timeseries[-feature_window:].values


def build_datasets(
    timeseries: TS_type,
    extract_features: tp.Callable[..., X_row_type],
    window_size: int,
    model_count: int
) -> tp.List[tp.Tuple[X_type, Y_type]]:
    """
    Функция для получения обучающих датасетов согласно гибридной схеме
    
    Args:
        timeseries --- временной ряд
        extract_features --- функция для генерации вектора фичей
        window_size --- количество последних значений ряда, используемых для прогноза
        model_count --- количество моделей, используемых для получения предскзаний 

    Returns:
        Список из model_count датасетов, i-й датасет используется для обучения i-й модели 
        и представляет собой пару из двумерного массива фичей и одномерного массива таргетов
    """
    datasets = []
    
    for model_idx in range(model_count):
        X, y = [], timeseries[model_idx + window_size: ].values
        for i in range(len(y)):
            X.append(extract_features(timeseries[:i + model_idx + window_size], model_idx, window_size))
        datasets.append((np.array(X), np.array(y)))
    
    assert len(datasets) == model_count
    return datasets

def predict(
    timeseries: TS_type,
    models: tp.List[Model_type],
    extract_features: tp.Callable[..., X_row_type] = extract_hybrid_strategy_features
):
    """
    Функция для получения прогноза len(models) следующих значений временного ряда
    
    Args:
        timeseries --- временной ряд, по которому необходимо сделать прогноз на следующие даты
        models --- список обученных моделей, i-я модель используется для получения i-го прогноза
        extract_features --- функция для генерации вектора фичей. Если вы реализуете свою функцию 
                             извлечения фичей для конечной модели, передавайте этим аргументом.
                             Внутри функции predict функцию extract_features нужно вызывать только
                             с аргументами timeseries и model_idx, остальные должны быть со значениями
                             по умолчанию

    Returns:
        Прогноз len(models) следующих значений временного ряда
    """
    predictions, prediction_inds = [], []
    current_day = timeseries.index.max()
    
    for model_idx, model in enumerate(models):
        if model_idx == 0:
            features = extract_features(timeseries, model_idx)
        else:
            features = extract_features(
                pd.concat([timeseries, pd.Series(index=prediction_inds, data=predictions)]),
                model_idx
            )
        prediction = model.predict([features])#[0]
        predictions = np.append(predictions, prediction)
        prediction_inds = np.append(prediction_inds, current_day + pd.Timedelta(days=1))
        current_day = prediction_inds[-1]

    return predictions # pd.Series(predictions, index=pd.date_range(start=timeseries.index[-1], periods=len(predictions), freq='D'))


def train_models(
    train_timeseries: TS_type,
    model_count: int
) -> tp.List[Model_type]:
    """
    Функция для получения обученных моделей
    
    Args:
        train_timeseries --- обучающий временной ряд
        model_count --- количество моделей для обучения согласно гибридной схеме.
                        Прогнозирование должно выполняться на model_count дней вперёд

    Returns:
        Список из len(datasets) обученных моделей
    """
    models = []
    datasets = build_datasets(train_timeseries, extract_hybrid_strategy_features, window_size=7, model_count=model_count)
    
    for X, y in datasets:
        model = LinearRegression()
        model.fit(X, y)
        models.append(model)
    
    assert len(models) == len(datasets)
    return models


def score_models(
    train_ts: TS_type,
    test_ts: TS_type, 
    models: tp.List[Model_type],
    predict: tp.Callable[[TS_type, tp.List[Model_type]], TS_type] = predict
):
    """
    Функция для оценки качества обученных моделей по метрике MSE
    
    Args:
        train_ts --- обучающий временной ряд
        test_ts --- тестовый временной ряд
        models --- список обученных моделей
        predict --- функция для получения прогноза временного ряда

    Returns:
        Усредненное MSE для прогноза моделей по всей тестовой выборке
    """
    predict_len = len(models)
    predictions = []
    targets = []

    for i in range(len(test_ts) - predict_len + 1):
        predictions.extend(list(predict(train_ts, models)))
        targets.extend(list(test_ts[i:i+predict_len]))
        train_ts = pd.concat((train_ts, test_ts[i:i+1]), axis=0) 

    return sklearn.metrics.mean_squared_error(targets, predictions)


# train_ts, test_ts = read_timeseries()
# models = train_models(train_ts, 7)
# score_models(train_ts, test_ts, models)
Leave a Comment