Untitled

 avatar
unknown
plain_text
2 years ago
15 kB
4
Indexable
import json
from datetime import date
import logging
import re
from pathlib import Path
from multiprocessing import Pool

from sqlalchemy import text
import pandas as pd
import numpy as np

from samesyslib.utils import sql_from_file

from db import get_conn

# Configure root logging once at import time; the module-level logger below
# inherits this format and level.
logging.basicConfig(
    encoding="utf-8",
    level=logging.INFO,
    format="%(asctime)s  - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)

# Run configuration. The whole dict is also handed to the SQL templates via
# get_data_from_query(params=...), so keys not read directly in this file are
# presumably referenced from the .sql files -- verify before removing any.
params = {
    # Worker count passed to multiprocessing.Pool in run().
    "NUM_OF_THREADS": 8,
    # First and last calendar day generated by _generate_future_ts();
    # the end is always 31 December of next year.
    "START_DATE": "2012-01-01",
    "STOP_DATE": f"{date.today().year+1}-12-31",
    # NOTE(review): DAILY_SCHEMA and SCHEMA_DAILY hold the same value.
    "DAILY_SCHEMA": "daily",
    # Schema and table that run() reads the column list from and uploads to.
    "KPI_SCHEMA": "kpi_data",
    "SCHEMA_SAMESYSTEM": "samesystem",
    "OPENING_HOURS_TABLE": "opening_hours",
    "TABLE_SHOP_LIST": "v1_shop_list",
    "TABLE_PUBLIC_HOLIDAYS": "public_holidays",
    "TABLE_SHOP_HOURS": "shop_hours",
    "SCHEMA_DAILY": "daily",
    "TABLE_CALENDAR_DAYS": "calendar_days",
}
# Directory named "SQL" next to this file; get_data_from_query() looks up
# "<table>.sql" inside it.
SQL_PATH = Path(__file__).absolute().parent / Path("SQL")


def read_shop_list(conn, shop_ids):
    """
    Look up client and country for the given shops.

    Input:
        conn -- connection object exposing ``get(query)``; the result must
                support ``set_index`` (presumably a pandas data frame)
        shop_ids -- iterable of shop ids, rendered into the IN (...) clause
    Output: the query result with distinct client_id / country_id rows,
            indexed by shop_id
    """
    id_csv = ", ".join(map(str, shop_ids))
    query = text(
        f"""
        SELECT distinct client_id, shop_id, country_id
        FROM daily.v1_shop_list
        WHERE shop_id IN ({id_csv});
    """
    )
    return conn.get(query).set_index("shop_id")


def shop_hours_calculate(data_dict):
    """
    Logic to calculate working hours from the same system "compressed format"
    Method populates working hours according to dayofweek, taking into account
    exception setting, where each week of month could have different setting

    Every calendar date is assigned to the schedule version that was in force
    on that day (the latest schedule_change_date not after the date) and then
    receives that version's hours for its dayofweek. Exceptions override the
    hours for one week of the month; exception type 4 (the fifth slot) means
    "last occurrence of that weekday in the month".

    Dates that match no schedule row (for example dates before the first
    schedule_change_date) are kept with NaN hours and still carry the shop id,
    exactly like the rows produced when the shop has no schedule at all.

    Input:
        data_dict['shop_id'] -- id of the shop being processed
        data_dict['shop_hours'] -- columns
                shop_id,
                schedule_change_date,
                dayofweek,
                shop_open,
                shop_start_time,
                shop_end_time,
                exceptions_list
        data_dict['data'] -- columns
                date,
                dayofweek,
                year,
                month,
                weekofmonth,
                weekofmonth_max
    Output: pandas data frame with columns
     shop_id,
     date,
     shop_open,
     shop_start_time,
     shop_end_time
    """
    output_columns = [
        "shop_id",
        "date",
        "shop_open",
        "shop_start_time",
        "shop_end_time",
    ]

    # Work on a copy so the calendar shared between shops is never modified.
    data = data_dict["data"].copy()
    shop_hours = data_dict["shop_hours"]

    if shop_hours.shape[0] == 0:
        # No schedule at all: keep every date, hours unknown.
        data["shop_id"] = data_dict["shop_id"]
        data["shop_open"] = np.nan
        data["shop_start_time"] = np.nan
        data["shop_end_time"] = np.nan
        return data[output_columns]

    # Bin edges: every schedule change date plus a far-future sentinel that
    # closes the last interval.
    dates = np.append(
        shop_hours.schedule_change_date.sort_values().unique(),
        np.array(["2100-12-01T00:00:00.000000000"], dtype="datetime64[ns]"),
    )

    # Label each date with the left edge of the [change, next_change) interval
    # it falls in; dates before the first change become NaT.
    data["schedule_change_date"] = pd.cut(
        data.date, bins=dates, right=False, labels=dates[:-1]
    ).astype("datetime64[ns]")
    data = data.merge(
        shop_hours, on=["schedule_change_date", "dayofweek"], how="left"
    )

    # The left merge leaves shop_id empty on dates without a matching schedule
    # row; such rows could not be joined or cast to int downstream.
    if "shop_id" in data_dict:
        data["shop_id"] = data["shop_id"].fillna(data_dict["shop_id"])

    with_exceptions = shop_hours[shop_hours.exceptions_list.notnull()]
    for _, row in with_exceptions.iterrows():
        in_period = (data.schedule_change_date == row["schedule_change_date"]) & (
            data["dayofweek"] == row["dayofweek"]
        )
        for e_dict in row["exceptions_list"]:
            week_of_month = int(e_dict["type"]) + 1
            if week_of_month == 5:
                # Fifth slot targets the last occurrence of the weekday.
                in_week = data["weekofmonth"] == data["weekofmonth_max"]
            else:
                in_week = data["weekofmonth"] == week_of_month
            target = in_week & in_period

            data.loc[target, "shop_open"] = 1 if e_dict["open_shop"] else 0
            data.loc[target, "shop_start_time"] = float(e_dict["start_time"])
            data.loc[target, "shop_end_time"] = float(e_dict["end_time"])

    return data[output_columns]


def load_json(x):
    """
    Small function to load json variables
    Used in pandas apply method

    Returns the decoded JSON value, or None when the cell is missing
    (None, empty string, float NaN) or decodes to an empty container.
    """
    # pandas represents missing cells as float NaN, which is truthy and would
    # otherwise reach json.loads and raise TypeError. NaN is the only float
    # that is not equal to itself.
    if isinstance(x, float) and x != x:
        return None
    if not x:
        return None

    decoded = json.loads(x)
    return decoded if len(decoded) > 0 else None


def get_data_from_query(
    sql_dir, table, conn, params=None, batch=None, client_list=None, verbose=True
):
    """
    Render the SQL template "<table>.sql" from sql_dir and run it on conn.

    The first statement returned by sql_from_file is formatted with three
    named fields: ``params`` (passed through unchanged), ``shop_list`` (the
    comma-joined ``batch`` or None) and ``client_list`` (comma-joined, or None).
    ``verbose`` is accepted but not used.

    Output: whatever ``conn.get`` returns for the rendered query
    """
    template_path = Path(sql_dir, f"{table}.sql")

    shop_list = None if batch is None else ",".join(map(str, batch))
    if client_list is not None:
        client_list = ",".join(map(str, client_list))

    template = sql_from_file(template_path)[0]
    rendered = template.format(
        params=params, shop_list=shop_list, client_list=client_list
    )
    return conn.get(text(rendered))


def weekly_transformation(
    data,
    month="month",
    week_of_month="weekofmonth",
    max_week_of_month="max_weekofmonth",
):
    """
    Encode month and week-of-month into a single number per row.

    :param data: data frame holding the three columns named below
    :param month: name of the month column
    :param week_of_month: name of the week-of-month column (1-based)
    :param max_week_of_month: name of the column with the last week-of-month
    :return:
    series of ``month * 10 + (week - 1) * 2``; rows whose week equals the
    maximum week get ``month * 10 + 8`` instead, so the last week of every
    month always ends in 8
    """
    month_base = data[month] * 10
    is_last_week = data[week_of_month] == data[max_week_of_month]

    encoded = month_base + (data[week_of_month] - 1) * 2
    encoded[is_last_week] = month_base[is_last_week] + 8
    return encoded


def add_date_info(data, date_name="date"):
    """
    Add calendar feature columns derived from the datetime column date_name.

    Each of dayofweek, dayofmonth, weekofmonth, weekofyear, year, month,
    weekofmonth_max and wom_group is added only when it is not already
    present. The simple columns are written onto the frame that was passed
    in; when weekofmonth_max has to be computed the result is a new frame
    produced by a join, so callers must use the return value.
    """
    # Should be called only for full months: weekofmonth_max is the largest
    # week seen per (year, month, dayofweek), which is wrong for a partial
    # month. The derivations below belong in the data pipeline phase.
    simple_columns = (
        ("dayofweek", lambda stamps: stamps.dt.dayofweek),
        ("dayofmonth", lambda stamps: stamps.dt.day),
        ("weekofmonth", lambda stamps: np.ceil(stamps.dt.day / 7).astype(int)),
        ("weekofyear", lambda stamps: stamps.dt.isocalendar().week),
        ("year", lambda stamps: stamps.dt.year),
        ("month", lambda stamps: stamps.dt.month),
    )
    for column, derive in simple_columns:
        if column in data.columns:
            continue
        data[column] = derive(data[date_name])

    if "weekofmonth_max" not in data.columns:
        group_keys = ["year", "month", "dayofweek"]
        last_week = data.groupby(group_keys).agg({"weekofmonth": "max"}).astype(int)
        last_week = last_week.rename(columns={"weekofmonth": "weekofmonth_max"})
        data = data.join(last_week, on=group_keys, how="left")

    if "wom_group" not in data.columns:
        wom_codes = weekly_transformation(
            data,
            month="month",
            week_of_month="weekofmonth",
            max_week_of_month="weekofmonth_max",
        )
        data["wom_group"] = wom_codes.astype(int)

    return data


# UTC timestamp taken once at import time; run() writes it into the "run_id"
# column of every uploaded row.
run_id = pd.Timestamp.utcnow().to_pydatetime()
# NOTE(review): this module-level list is not read by any function visible in
# this file (run() binds its own local shop_list) -- confirm it can be removed.
shop_list = [15101]


def _generate_future_ts():
    """
    Build the daily calendar used as the base for the opening hours.

    Generates one row per day from params["START_DATE"] to
    params["STOP_DATE"] and enriches it with add_date_info().

    Output: pandas data frame with columns
     date,
     dayofweek,
     year,
     month,
     weekofmonth,
     weekofmonth_max
    """
    logger.info("Generating ts features dataset")
    features = pd.DataFrame(
        {
            "date": pd.date_range(
                end=params["STOP_DATE"],
                start=params["START_DATE"],
                freq="D",
            )
        }
    )
    features = add_date_info(features)

    # NOTE(review): everything from here down to the drop of date_join_index
    # only computes month_adj / dayofmonth_adj, and neither column is part of
    # the returned frame. Its only visible effect on the result is that the
    # astype(int) casts fail if a value is left as NaN -- confirm whether this
    # section is still needed.

    # First year: each date maps to itself and keeps its own month / day.
    features["date_join_index"] = pd.NaT
    idx = features.year == pd.to_datetime(params["START_DATE"]).year
    features.loc[idx, "date_join_index"] = features.loc[idx, "date"]
    features.loc[idx, "month_adj"] = features.loc[idx, "month"]
    features.loc[idx, "dayofmonth_adj"] = features.loc[idx, "dayofmonth"]
    features.loc[~idx, "month_adj"] = np.nan
    features.loc[~idx, "dayofmonth_adj"] = np.nan
    # Later years: each date points at the date exactly 52 weeks earlier.
    for year in features.year.sort_values().unique()[1:]:
        idx = features.year == year
        features.loc[idx, "date_join_index"] = features.loc[
            idx, "date"
        ] - pd.DateOffset(weeks=52)

    # Row positions of the still-empty part, cut into chunks of 350 rows.
    # A chunk is shorter than the 364-day look-back, so the rows it refers to
    # were already filled by the first year or by an earlier chunk.
    bins = np.arange(
        features.month_adj.last_valid_index() + 1, features.shape[0], 350
    ).tolist() + [features.shape[0]]

    for bin1, bin2 in zip(bins[:-1], bins[1:]):
        # Copy month_adj / dayofmonth_adj from the referenced earlier date.
        block = features.iloc[bin1:bin2][["date_join_index"]].join(
            features.set_index("date")[["month_adj", "dayofmonth_adj"]],
            on="date_join_index",
            how="left",
        )[["month_adj", "dayofmonth_adj"]]
        features.loc[block.index, ["month_adj", "dayofmonth_adj"]] = block[
            ["month_adj", "dayofmonth_adj"]
        ]
    features["month_adj"] = features["month_adj"].astype(int)
    features["dayofmonth_adj"] = features["dayofmonth_adj"].astype(int)
    features.drop(["date_join_index"], axis=1, inplace=True)

    # Only these columns are consumed by shop_hours_calculate().
    future_ts = features.loc[
        :, ["date", "dayofweek", "year", "month", "weekofmonth", "weekofmonth_max"]
    ]
    future_ts.date = pd.to_datetime(future_ts.date)

    return future_ts


def _find_shop_ids(raw):
    """Return every run of digits in raw as a list of strings; None stays None."""
    return re.findall(r"\d+", raw) if raw is not None else None


def run(conn, shop_ids, client_ids):
    """
    Recalculate opening hours for the given shops and upload them.

    Steps:
      1. build the daily calendar (_generate_future_ts)
      2. read the shop list, weekly schedules, shop and client level public
         holidays and calendar days for the shops
      3. expand the schedule of every shop onto the calendar in a process
         pool (shop_hours_calculate)
      4. merge everything; calendar days override the weekly schedule and
         public holidays override both, unless the shop is listed as an
         exception of the holiday
      5. send the result to KPI_SCHEMA.OPENING_HOURS_TABLE

    Input:
        conn -- connection object exposing ``get`` and ``send_replace``
        shop_ids -- sequence of shop ids to process
        client_ids -- not used; the clients are looked up from the shops
    Output: None
    """
    if len(shop_ids) == 0:
        # An empty id list would be rendered as "IN ()" in the queries below.
        logger.info("No shop ids supplied, nothing to do")
        return

    future_ts = _generate_future_ts()

    shop_list = read_shop_list(conn, shop_ids)
    shop_ids_joined = ",".join([str(x) for x in shop_list.index.to_list()])
    SQL = f"SELECT DISTINCT client_id FROM samesystem.shops WHERE id in ({shop_ids_joined})"
    client_batch = conn.get(text(SQL)).client_id.to_list()

    logger.info("Reading schedule information")
    shop_hours = get_data_from_query(
        sql_dir=SQL_PATH,
        table="shop_hours",
        conn=conn,
        params=params,
        batch=shop_ids,
    )
    shop_hours.schedule_change_date = pd.to_datetime(shop_hours.schedule_change_date)
    # extract exceptions json into list of dicts
    shop_hours["exceptions_list"] = shop_hours.exceptions_json.apply(load_json)

    logger.info("Reading public holidays **shop** level information")
    public_holidays_shop = get_data_from_query(
        sql_dir=SQL_PATH,
        table="public_holidays_shop",
        conn=conn,
        params=params,
        batch=shop_ids,
    )
    public_holidays_shop.date = pd.to_datetime(
        public_holidays_shop.date, errors="coerce"
    )
    # Ids of the shops that stay open on the holiday, as a list of strings.
    public_holidays_shop[
        "shop_open_shop_list"
    ] = public_holidays_shop.shop_open_shops.apply(_find_shop_ids)
    public_holidays_shop.drop("shop_open_shops", axis=1, inplace=True)

    logger.info("Reading public holidays **client** level information")
    public_holidays_client = get_data_from_query(
        sql_dir=SQL_PATH,
        table="public_holidays_client",
        conn=conn,
        params=params,
        client_list=client_batch,
    )
    public_holidays_client.date = pd.to_datetime(
        public_holidays_client.date, errors="coerce"
    )
    public_holidays_client[
        "client_open_shop_list"
    ] = public_holidays_client.client_open_shops.apply(_find_shop_ids)
    public_holidays_client.drop("client_open_shops", axis=1, inplace=True)

    logger.info("Reading calendar_days")
    calendar_days = get_data_from_query(
        sql_dir=SQL_PATH,
        table="calendar_days",
        conn=conn,
        params=params,
        batch=shop_ids,
    )
    calendar_days.date = pd.to_datetime(calendar_days.date, errors="coerce")

    logger.info(
        f"Starting multithreaded shops hours calculation with NUM_OF_THREADS={params['NUM_OF_THREADS']}"
    )

    # One task per shop; multiprocessing.Pool runs them in worker processes.
    with Pool(params["NUM_OF_THREADS"]) as p:
        result = p.map(
            shop_hours_calculate,
            (
                {
                    "shop_id": shop_id,
                    "data": future_ts,
                    "shop_hours": shop_hours[shop_hours.shop_id == shop_id],
                }
                for shop_id in shop_ids
            ),
        )
    sh_hr = pd.concat(result)

    logger.info("Merging calendar_days")
    sh_hr = sh_hr.merge(calendar_days, on=["shop_id", "date"], how="left")

    sh_hr = sh_hr.merge(shop_list.reset_index(), on=["shop_id"], how="left")

    logger.info("Merging public_holidays")
    sh_hr = sh_hr.merge(public_holidays_client, on=["client_id", "date"], how="left")

    sh_hr = sh_hr.merge(public_holidays_shop, on=["shop_id", "date"], how="left")

    logger.info("Identify shop exceptions")
    # True where the shop itself is listed among the shops open on a client
    # level holiday; for those rows the holiday flag is cleared.
    idx_client_lvl_exceptions = sh_hr[sh_hr.client_open_shop_list.notnull()].apply(
        lambda x: str(x["shop_id"]) in x["client_open_shop_list"], axis=1
    )
    sh_hr["client_active_after_exceptions"] = sh_hr.client_active
    if len(idx_client_lvl_exceptions) > 0:
        sh_hr.loc[
            idx_client_lvl_exceptions[idx_client_lvl_exceptions].index,
            "client_active_after_exceptions",
        ] = np.nan

    # Same for shop level holidays.
    idx_shop_lvl_exceptions = sh_hr[sh_hr.shop_open_shop_list.notnull()].apply(
        lambda x: str(x["shop_id"]) in x["shop_open_shop_list"], axis=1
    )
    sh_hr["shop_active_after_exceptions"] = sh_hr.shop_active
    if len(idx_shop_lvl_exceptions) > 0:
        sh_hr.loc[
            idx_shop_lvl_exceptions[idx_shop_lvl_exceptions].index,
            "shop_active_after_exceptions",
        ] = np.nan

    logger.info(
        "Filling in opening/closing hours according to schedule and calendar days"
    )

    sh_hr[["opening_hour", "closing_hour", "open"]] = sh_hr[
        ["shop_start_time", "shop_end_time", "shop_open"]
    ]

    # calendar_lines takes priority over shop_hours
    idx = sh_hr.cl_open_shop.notna()
    sh_hr["open"] = np.where(idx, sh_hr.cl_open_shop, sh_hr.open)
    sh_hr.loc[idx, "opening_hour"] = sh_hr.loc[idx, "cl_start_time"]
    sh_hr.loc[idx, "closing_hour"] = sh_hr.loc[idx, "cl_end_time"]

    # public holidays takes priority over shop_hours and calendar_lines
    sh_hr.loc[(sh_hr.shop_active_after_exceptions == 1), "open"] = 0
    sh_hr.loc[(sh_hr.client_active_after_exceptions == 1), "open"] = 0

    # Explicit copy: the columns added below must not be written through a
    # slice of sh_hr.
    opening_hours_table = sh_hr.loc[
        :, ["shop_id", "date", "open", "opening_hour", "closing_hour"]
    ].copy()
    opening_hours_table["date"] = pd.to_datetime(opening_hours_table["date"])
    opening_hours_table["run_id"] = run_id
    opening_hours_table["last_updated"] = pd.Timestamp.utcnow().to_pydatetime()

    if len(opening_hours_table) > 0:
        logger.info("Upload data using upsert_table")
        opening_hours_table["shop_id"] = opening_hours_table["shop_id"].astype(int)

        # Zero-row query, used only to learn the target table's column order.
        cols_df = conn.get(
            text(
                f"""
            SELECT *
            FROM {params['KPI_SCHEMA']}.{params['OPENING_HOURS_TABLE']}
            LIMIT 0;
        """
            )
        ).columns

        conn.send_replace(
            opening_hours_table[cols_df],
            table=params['OPENING_HOURS_TABLE'],
            schema=params['KPI_SCHEMA'],
            if_exists="append",
        )

    else:
        logger.info("No opening_hours data to update")

    success_shops = set(sh_hr.shop_id)

    logger.info(
        f"Script finished! Shops updated: {success_shops}"
    )


# Script entry point: open a connection through the project helper and run
# the recalculation for a hard-coded shop.
if __name__ == "__main__":
    conn = get_conn("ds", "shard1", schema="kpi_data")
    # Alternative multi-shop invocation kept for manual runs:
    # run(conn=conn, shop_ids=[1019, 1020, 1021, 1036, 1037, 1038, 1039, 1040], client_ids=[])
    run(conn=conn, shop_ids=[1019], client_ids=[])
Editor is loading...