Untitled
unknown
plain_text
2 years ago
15 kB
4
Indexable
import json from datetime import date import logging import re from pathlib import Path from multiprocessing import Pool from sqlalchemy import text import pandas as pd import numpy as np from samesyslib.utils import sql_from_file from db import get_conn logging.basicConfig( encoding="utf-8", level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger(__name__) params = { "NUM_OF_THREADS": 8, "START_DATE": "2012-01-01", "STOP_DATE": f"{date.today().year+1}-12-31", "DAILY_SCHEMA": "daily", "KPI_SCHEMA": "kpi_data", "SCHEMA_SAMESYSTEM": "samesystem", "OPENING_HOURS_TABLE": "opening_hours", "TABLE_SHOP_LIST": "v1_shop_list", "TABLE_PUBLIC_HOLIDAYS": "public_holidays", "TABLE_SHOP_HOURS": "shop_hours", "SCHEMA_DAILY": "daily", "TABLE_CALENDAR_DAYS": "calendar_days", } SQL_PATH = Path(__file__).absolute().parent / Path("SQL") def read_shop_list(conn, shop_ids): shops = ", ".join(str(x) for x in shop_ids) shop_list = conn.get( text( f""" SELECT distinct client_id, shop_id, country_id FROM daily.v1_shop_list WHERE shop_id IN ({shops}); """ ) ).set_index("shop_id") return shop_list def shop_hours_calculate(data_dict): """ Logic to calculate working hours from the same system "compressed format" Method populates working hours according to dayofweek, taking into account exception setting, where each week of month could have different setting Input: data_dict['shop_hours'] -- columns shop_id, schedule_change_date, dayofweek, shop_open, shop_start_time, shop_end_time, exceptions_list data_dict['data'] -- columns date, dayofweek, year, month, weekofmonth, weekofmonth_max Output: pandas data frame with columns shop_id, shop_open, shop_start_time, shop_end_time """ data = data_dict["data"].copy() shop_hours = data_dict["shop_hours"] if shop_hours.shape[0] > 0: dates = np.append( shop_hours.schedule_change_date.sort_values().unique(), np.array(["2100-12-01T00:00:00.000000000"], dtype="datetime64[ns]"), ) data["schedule_change_date"] = pd.cut( data.date, bins=dates, right=False, labels=dates[:-1] ).astype("datetime64[ns]") data = data.merge( shop_hours, on=["schedule_change_date", "dayofweek"], how="left" ) for index, row in shop_hours[shop_hours.exceptions_list.notnull()].iterrows(): idx_period = (data.schedule_change_date == row["schedule_change_date"]) & ( data["dayofweek"] == row["dayofweek"] ) for e_dict in row["exceptions_list"]: week_of_month = int(e_dict["type"]) + 1 open_shop = 1 if e_dict["open_shop"] else 0 if week_of_month == 5: idx_max_wom = ( data["weekofmonth"] == data["weekofmonth_max"] ) & idx_period data.loc[idx_max_wom, "shop_open"] = open_shop data.loc[idx_max_wom, "shop_start_time"] = float( e_dict["start_time"] ) data.loc[idx_max_wom, "shop_end_time"] = float(e_dict["end_time"]) else: idx_wom = (data["weekofmonth"] == week_of_month) & idx_period data.loc[idx_wom, "shop_open"] = open_shop data.loc[idx_wom, "shop_start_time"] = float(e_dict["start_time"]) data.loc[idx_wom, "shop_end_time"] = float(e_dict["end_time"]) else: data["shop_id"] = data_dict["shop_id"] data["shop_open"] = np.nan data["shop_start_time"] = np.nan data["shop_end_time"] = np.nan return data[["shop_id", "date", "shop_open", "shop_start_time", "shop_end_time"]] def load_json(x): """ Small function to load json variables Used in pandas apply method """ if x: y = json.loads(x) if len(y) > 0: return y else: None else: return None def get_data_from_query( sql_dir, table, conn, params=None, batch=None, client_list=None, verbose=True ): sql_file = Path(sql_dir, f"{table}.sql") if batch is not None: shop_list = ",".join([str(s) for s in batch]) else: shop_list = None if client_list is not None: client_list = ",".join([str(s) for s in client_list]) query = sql_from_file(sql_file)[0].format( params=params, shop_list=shop_list, client_list=client_list ) df = conn.get(text(query)) return df def weekly_transformation( data, month="month", week_of_month="weekofmonth", max_week_of_month="max_weekofmonth", ): """ :param data: :param month: :param week_of_month: :param max_week_of_month: :return: transformed weekdays from range [1.0 -- 12.8] X.0 is first week of month, X.8 is last week of month, X is month number """ out = data[month] * 10 + (data[week_of_month] - 1) * 2 idx = data[week_of_month] == data[max_week_of_month] out[idx] = data.loc[idx, month] * 10 + 8 return out def add_date_info(data, date_name="date"): # should be called only for full months! # if month is not full, max_week of month is incorrect # lines covered by ifs should move into data pipeline phase if "dayofweek" not in data.columns: data["dayofweek"] = data[date_name].dt.dayofweek if "dayofmonth" not in data.columns: data["dayofmonth"] = data[date_name].dt.day if "weekofmonth" not in data.columns: data["weekofmonth"] = np.ceil(data[date_name].dt.day / 7).astype(int) if "weekofyear" not in data.columns: data["weekofyear"] = data[date_name].dt.isocalendar().week if "year" not in data.columns: data["year"] = data[date_name].dt.year if "month" not in data.columns: data["month"] = data[date_name].dt.month if "weekofmonth_max" not in data.columns: data = data.join( data.groupby(["year", "month", "dayofweek"]) .agg({"weekofmonth": "max"}) .astype(int) .rename(columns={"weekofmonth": "weekofmonth_max"}), on=["year", "month", "dayofweek"], how="left", ) if "wom_group" not in data.columns: data["wom_group"] = weekly_transformation( data, month="month", week_of_month="weekofmonth", max_week_of_month="weekofmonth_max", ).astype(int) return data run_id = pd.Timestamp.utcnow().to_pydatetime() shop_list = [15101] def _generate_future_ts(): logger.info("Generating ts features dataset") features = pd.DataFrame( { "date": pd.date_range( end=params["STOP_DATE"], start=params["START_DATE"], freq="D", ) } ) features = add_date_info(features) features["date_join_index"] = pd.NaT idx = features.year == pd.to_datetime(params["START_DATE"]).year features.loc[idx, "date_join_index"] = features.loc[idx, "date"] features.loc[idx, "month_adj"] = features.loc[idx, "month"] features.loc[idx, "dayofmonth_adj"] = features.loc[idx, "dayofmonth"] features.loc[~idx, "month_adj"] = np.nan features.loc[~idx, "dayofmonth_adj"] = np.nan for year in features.year.sort_values().unique()[1:]: idx = features.year == year features.loc[idx, "date_join_index"] = features.loc[ idx, "date" ] - pd.DateOffset(weeks=52) bins = np.arange( features.month_adj.last_valid_index() + 1, features.shape[0], 350 ).tolist() + [features.shape[0]] for bin1, bin2 in zip(bins[:-1], bins[1:]): block = features.iloc[bin1:bin2][["date_join_index"]].join( features.set_index("date")[["month_adj", "dayofmonth_adj"]], on="date_join_index", how="left", )[["month_adj", "dayofmonth_adj"]] features.loc[block.index, ["month_adj", "dayofmonth_adj"]] = block[ ["month_adj", "dayofmonth_adj"] ] features["month_adj"] = features["month_adj"].astype(int) features["dayofmonth_adj"] = features["dayofmonth_adj"].astype(int) features.drop(["date_join_index"], axis=1, inplace=True) future_ts = features.loc[ :, ["date", "dayofweek", "year", "month", "weekofmonth", "weekofmonth_max"] ] future_ts.date = pd.to_datetime(future_ts.date) return future_ts def run(conn, shop_ids, client_ids): future_ts = _generate_future_ts() shop_list = read_shop_list(conn, shop_ids) shop_ids_joined = ",".join([str(x) for x in shop_list.index.to_list()]) SQL = f"SELECT DISTINCT client_id FROM samesystem.shops WHERE id in ({shop_ids_joined})" client_batch = conn.get(text(SQL)).client_id.to_list() logger.info("Reading schedule information") shop_hours = get_data_from_query( sql_dir=SQL_PATH, table="shop_hours", conn=conn, params=params, batch=shop_ids, ) shop_hours.schedule_change_date = pd.to_datetime(shop_hours.schedule_change_date) # extract exceptiosn json into list of dicts shop_hours["exceptions_list"] = shop_hours.exceptions_json.apply(load_json) logger.info("Reading public holidays **shop** level information") public_holidays_shop = get_data_from_query( sql_dir=SQL_PATH, table="public_holidays_shop", conn=conn, params=params, batch=shop_ids, ) public_holidays_shop.date = pd.to_datetime( public_holidays_shop.date, errors="coerce" ) public_holidays_shop[ "shop_open_shop_list" ] = public_holidays_shop.shop_open_shops.apply( lambda x: re.findall("\d+", x) if x is not None else None ) public_holidays_shop.drop("shop_open_shops", axis=1, inplace=True) logger.info("Reading public holidays **client** level information") public_holidays_client = get_data_from_query( sql_dir=SQL_PATH, table="public_holidays_client", conn=conn, params=params, client_list=client_batch, ) public_holidays_client.date = pd.to_datetime( public_holidays_client.date, errors="coerce" ) public_holidays_client[ "client_open_shop_list" ] = public_holidays_client.client_open_shops.apply( lambda x: re.findall("\d+", x) if x is not None else None ) public_holidays_client.drop("client_open_shops", axis=1, inplace=True) logger.info("Reading calendar_days") calendar_days = get_data_from_query( sql_dir=SQL_PATH, table="calendar_days", conn=conn, params=params, batch=shop_ids, ) calendar_days.date = pd.to_datetime(calendar_days.date, errors="coerce") logger.info( f"Starting multithreaded shops hours calculation with NUM_OF_THREADS={params['NUM_OF_THREADS']}" ) # DEBUG # for shop_id in shop_ids: # shop_hours_calculate({ # "shop_id": shop_id, # "data": future_ts, # "shop_hours": shop_hours[shop_hours.shop_id == shop_id], # }) # return with Pool(params["NUM_OF_THREADS"]) as p: result = p.map( shop_hours_calculate, ( { "shop_id": shop_id, "data": future_ts, "shop_hours": shop_hours[shop_hours.shop_id == shop_id], } for shop_id in shop_ids ), ) sh_hr = pd.concat(result) logger.info("Merging calendar_days") sh_hr = sh_hr.merge(calendar_days, on=["shop_id", "date"], how="left") sh_hr = sh_hr.merge(shop_list.reset_index(), on=["shop_id"], how="left") logger.info("Merging public_holidays") sh_hr = sh_hr.merge(public_holidays_client, on=["client_id", "date"], how="left") sh_hr = sh_hr.merge(public_holidays_shop, on=["shop_id", "date"], how="left") logger.info("Identify shop exceptions") idx_client_lvl_exceptions = sh_hr[sh_hr.client_open_shop_list.notnull()].apply( lambda x: str(x["shop_id"]) in x["client_open_shop_list"], axis=1 ) sh_hr["client_active_after_exceptions"] = sh_hr.client_active if len(idx_client_lvl_exceptions) > 0: sh_hr.loc[ idx_client_lvl_exceptions[idx_client_lvl_exceptions].index, "client_active_after_exceptions", ] = np.NaN idx_shop_lvl_exceptions = sh_hr[sh_hr.shop_open_shop_list.notnull()].apply( lambda x: str(x["shop_id"]) in x["shop_open_shop_list"], axis=1 ) sh_hr["shop_active_after_exceptions"] = sh_hr.shop_active if len(idx_shop_lvl_exceptions) > 0: sh_hr.loc[ idx_shop_lvl_exceptions[idx_shop_lvl_exceptions].index, "shop_active_after_exceptions", ] = np.NaN logger.info( "Filling in opening/closing hours according to schedule and calendar days" ) sh_hr[["opening_hour", "closing_hour", "open"]] = sh_hr[ ["shop_start_time", "shop_end_time", "shop_open"] ] # calendar_lines takes priority over shop_hours idx = sh_hr.cl_open_shop.notna() sh_hr["open"] = np.where(idx, sh_hr.cl_open_shop, sh_hr.open) sh_hr.loc[idx, "opening_hour"] = sh_hr.loc[idx, "cl_start_time"] sh_hr.loc[idx, "closing_hour"] = sh_hr.loc[idx, "cl_end_time"] # public holidays takes priority over shop_hours and calendar_lines sh_hr.loc[(sh_hr.shop_active_after_exceptions == 1), "open"] = 0 sh_hr.loc[(sh_hr.client_active_after_exceptions == 1), "open"] = 0 opening_hours_table = sh_hr.loc[ :, ["shop_id", "date", "open", "opening_hour", "closing_hour"] ] opening_hours_table["date"] = pd.to_datetime(opening_hours_table["date"]) opening_hours_table["run_id"] = run_id opening_hours_table["last_updated"] = pd.Timestamp.utcnow().to_pydatetime() if len(opening_hours_table) > 0: logger.info("Upload data using upsert_table") opening_hours_table["shop_id"] = opening_hours_table["shop_id"].astype(int) cols_df = conn.get( text( f""" SELECT * FROM {params['KPI_SCHEMA']}.{params['OPENING_HOURS_TABLE']} LIMIT 0; """ ) ).columns conn.send_replace( opening_hours_table[cols_df], table=params['OPENING_HOURS_TABLE'], schema=params['KPI_SCHEMA'], if_exists="append", ) else: logger.info("No opening_hours data to update") success_shops = set(sh_hr.shop_id) logger.info( f"Script finished! Shops updated: {success_shops}" ) if __name__ == "__main__": conn = get_conn("ds", "shard1", schema="kpi_data") # run(conn=conn, shop_ids=[1019, 1020, 1021, 1036, 1037, 1038, 1039, 1040], client_ids=[]) run(conn=conn, shop_ids=[1019], client_ids=[])
Editor is loading...