import json
import logging
import re
from datetime import date
from multiprocessing import Pool
from pathlib import Path

import numpy as np
import pandas as pd
from sqlalchemy import text

from db import get_conn
from samesyslib.utils import sql_from_file
logging.basicConfig(
encoding="utf-8",
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
params = {
"NUM_OF_THREADS": 8,
"START_DATE": "2012-01-01",
"STOP_DATE": f"{date.today().year+1}-12-31",
"DAILY_SCHEMA": "daily",
"KPI_SCHEMA": "kpi_data",
"SCHEMA_SAMESYSTEM": "samesystem",
"OPENING_HOURS_TABLE": "opening_hours",
"TABLE_SHOP_LIST": "v1_shop_list",
"TABLE_PUBLIC_HOLIDAYS": "public_holidays",
"TABLE_SHOP_HOURS": "shop_hours",
"SCHEMA_DAILY": "daily",
"TABLE_CALENDAR_DAYS": "calendar_days",
}
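# The SQL templates in the SQL/ directory next to this script are rendered
# with the params above via str.format (see get_data_from_query).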
SQL_PATH = Path(__file__).absolute().parent / Path("SQL")
def read_shop_list(conn, shop_ids):
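    """Fetch the client/country mapping for the given shop ids, indexed by shop_id."""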
shops = ", ".join(str(x) for x in shop_ids)
shop_list = conn.get(
text(
f"""
SELECT distinct client_id, shop_id, country_id
FROM daily.v1_shop_list
WHERE shop_id IN ({shops});
"""
)
).set_index("shop_id")
return shop_list
def shop_hours_calculate(data_dict):
"""
Logic to calculate working hours from the same system "compressed format"
Method populates working hours according to dayofweek, taking into account
exception setting, where each week of month could have different setting
Input:
data_dict['shop_hours'] -- columns
shop_id,
schedule_change_date,
dayofweek,
shop_open,
shop_start_time,
shop_end_time,
exceptions_list
data_dict['data'] -- columns
date,
dayofweek,
year,
month,
weekofmonth,
weekofmonth_max
Output: pandas data frame with columns
shop_id,
shop_open,
shop_start_time,
shop_end_time
"""
data = data_dict["data"].copy()
shop_hours = data_dict["shop_hours"]
if shop_hours.shape[0] > 0:
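        # Bin each calendar date into the schedule period in force on that
        # date: bin edges are the sorted schedule change dates plus a
        # far-future sentinel, and right=False makes each bin
        # [change_date, next_change_date).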
dates = np.append(
shop_hours.schedule_change_date.sort_values().unique(),
np.array(["2100-12-01T00:00:00.000000000"], dtype="datetime64[ns]"),
)
data["schedule_change_date"] = pd.cut(
data.date, bins=dates, right=False, labels=dates[:-1]
).astype("datetime64[ns]")
data = data.merge(
shop_hours, on=["schedule_change_date", "dayofweek"], how="left"
)
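        # Apply per-week-of-month exceptions: e_dict["type"] is the 0-based
        # week of the month, where week 5 (type 4) means the last occurrence
        # of that weekday in the month.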
for index, row in shop_hours[shop_hours.exceptions_list.notnull()].iterrows():
idx_period = (data.schedule_change_date == row["schedule_change_date"]) & (
data["dayofweek"] == row["dayofweek"]
)
for e_dict in row["exceptions_list"]:
week_of_month = int(e_dict["type"]) + 1
open_shop = 1 if e_dict["open_shop"] else 0
if week_of_month == 5:
idx_max_wom = (
data["weekofmonth"] == data["weekofmonth_max"]
) & idx_period
data.loc[idx_max_wom, "shop_open"] = open_shop
data.loc[idx_max_wom, "shop_start_time"] = float(
e_dict["start_time"]
)
data.loc[idx_max_wom, "shop_end_time"] = float(e_dict["end_time"])
else:
idx_wom = (data["weekofmonth"] == week_of_month) & idx_period
data.loc[idx_wom, "shop_open"] = open_shop
data.loc[idx_wom, "shop_start_time"] = float(e_dict["start_time"])
data.loc[idx_wom, "shop_end_time"] = float(e_dict["end_time"])
else:
data["shop_id"] = data_dict["shop_id"]
data["shop_open"] = np.nan
data["shop_start_time"] = np.nan
data["shop_end_time"] = np.nan
return data[["shop_id", "date", "shop_open", "shop_start_time", "shop_end_time"]]
def load_json(x):
    """
    Parse a JSON string, returning None for empty or missing input.
    Used in a pandas apply call.
    """
    if x:
        y = json.loads(x)
        if len(y) > 0:
            return y
        return None
    return None
def get_data_from_query(
sql_dir, table, conn, params=None, batch=None, client_list=None, verbose=True
):
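    """
    Render the SQL template <sql_dir>/<table>.sql via str.format with the
    given params and optional shop/client id lists, execute it, and return
    the result as a DataFrame.
    """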
sql_file = Path(sql_dir, f"{table}.sql")
if batch is not None:
shop_list = ",".join([str(s) for s in batch])
else:
shop_list = None
if client_list is not None:
client_list = ",".join([str(s) for s in client_list])
query = sql_from_file(sql_file)[0].format(
params=params, shop_list=shop_list, client_list=client_list
)
df = conn.get(text(query))
return df
def weekly_transformation(
data,
month="month",
week_of_month="weekofmonth",
max_week_of_month="max_weekofmonth",
):
"""
:param data:
:param month:
:param week_of_month:
:param max_week_of_month:
:return:
transformed weekdays from range [1.0 -- 12.8]
X.0 is first week of month, X.8 is last week of month, X is month number
"""
out = data[month] * 10 + (data[week_of_month] - 1) * 2
idx = data[week_of_month] == data[max_week_of_month]
out[idx] = data.loc[idx, month] * 10 + 8
return out
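# For example, month=3 with weekofmonth=1 yields wom_group 30, weekofmonth=2
# yields 32, and the month's last week always yields 38, so "last week" lines
# up across 4- and 5-week months.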
def add_date_info(data, date_name="date"):
    # Should be called only for full months! If a month is truncated,
    # weekofmonth_max is incorrect. The columns guarded by the `if` checks
    # below should eventually move into the data pipeline phase.
if "dayofweek" not in data.columns:
data["dayofweek"] = data[date_name].dt.dayofweek
if "dayofmonth" not in data.columns:
data["dayofmonth"] = data[date_name].dt.day
if "weekofmonth" not in data.columns:
data["weekofmonth"] = np.ceil(data[date_name].dt.day / 7).astype(int)
if "weekofyear" not in data.columns:
data["weekofyear"] = data[date_name].dt.isocalendar().week
if "year" not in data.columns:
data["year"] = data[date_name].dt.year
if "month" not in data.columns:
data["month"] = data[date_name].dt.month
if "weekofmonth_max" not in data.columns:
data = data.join(
data.groupby(["year", "month", "dayofweek"])
.agg({"weekofmonth": "max"})
.astype(int)
.rename(columns={"weekofmonth": "weekofmonth_max"}),
on=["year", "month", "dayofweek"],
how="left",
)
if "wom_group" not in data.columns:
data["wom_group"] = weekly_transformation(
data,
month="month",
week_of_month="weekofmonth",
max_week_of_month="weekofmonth_max",
).astype(int)
return data
run_id = pd.Timestamp.utcnow().to_pydatetime()
shop_list = [15101]
def _generate_future_ts():
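    """
    Build a daily calendar frame spanning START_DATE..STOP_DATE and return the
    columns shop_hours_calculate expects (date, dayofweek, year, month,
    weekofmonth, weekofmonth_max). Intermediate month_adj/dayofmonth_adj
    columns align each date with the one 52 weeks earlier.
    """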
logger.info("Generating ts features dataset")
features = pd.DataFrame(
{
"date": pd.date_range(
end=params["STOP_DATE"],
start=params["START_DATE"],
freq="D",
)
}
)
features = add_date_info(features)
features["date_join_index"] = pd.NaT
idx = features.year == pd.to_datetime(params["START_DATE"]).year
features.loc[idx, "date_join_index"] = features.loc[idx, "date"]
features.loc[idx, "month_adj"] = features.loc[idx, "month"]
features.loc[idx, "dayofmonth_adj"] = features.loc[idx, "dayofmonth"]
features.loc[~idx, "month_adj"] = np.nan
features.loc[~idx, "dayofmonth_adj"] = np.nan
for year in features.year.sort_values().unique()[1:]:
idx = features.year == year
features.loc[idx, "date_join_index"] = features.loc[
idx, "date"
] - pd.DateOffset(weeks=52)
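    # Fill month_adj/dayofmonth_adj in chunks of 350 rows (less than the
    # 364-day lookback) so each chunk's join against features.set_index("date")
    # only reads dates already resolved by the start year or earlier chunks.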
bins = np.arange(
features.month_adj.last_valid_index() + 1, features.shape[0], 350
).tolist() + [features.shape[0]]
for bin1, bin2 in zip(bins[:-1], bins[1:]):
block = features.iloc[bin1:bin2][["date_join_index"]].join(
features.set_index("date")[["month_adj", "dayofmonth_adj"]],
on="date_join_index",
how="left",
)[["month_adj", "dayofmonth_adj"]]
features.loc[block.index, ["month_adj", "dayofmonth_adj"]] = block[
["month_adj", "dayofmonth_adj"]
]
features["month_adj"] = features["month_adj"].astype(int)
features["dayofmonth_adj"] = features["dayofmonth_adj"].astype(int)
features.drop(["date_join_index"], axis=1, inplace=True)
    future_ts = features.loc[
        :, ["date", "dayofweek", "year", "month", "weekofmonth", "weekofmonth_max"]
    ].copy()
future_ts.date = pd.to_datetime(future_ts.date)
return future_ts
def run(conn, shop_ids, client_ids):
future_ts = _generate_future_ts()
shop_list = read_shop_list(conn, shop_ids)
shop_ids_joined = ",".join([str(x) for x in shop_list.index.to_list()])
SQL = f"SELECT DISTINCT client_id FROM samesystem.shops WHERE id in ({shop_ids_joined})"
client_batch = conn.get(text(SQL)).client_id.to_list()
logger.info("Reading schedule information")
shop_hours = get_data_from_query(
sql_dir=SQL_PATH,
table="shop_hours",
conn=conn,
params=params,
batch=shop_ids,
)
shop_hours.schedule_change_date = pd.to_datetime(shop_hours.schedule_change_date)
    # extract exceptions JSON into a list of dicts
shop_hours["exceptions_list"] = shop_hours.exceptions_json.apply(load_json)
logger.info("Reading public holidays **shop** level information")
public_holidays_shop = get_data_from_query(
sql_dir=SQL_PATH,
table="public_holidays_shop",
conn=conn,
params=params,
batch=shop_ids,
)
public_holidays_shop.date = pd.to_datetime(
public_holidays_shop.date, errors="coerce"
)
    public_holidays_shop[
        "shop_open_shop_list"
    ] = public_holidays_shop.shop_open_shops.apply(
        lambda x: re.findall(r"\d+", x) if x is not None else None
    )
public_holidays_shop.drop("shop_open_shops", axis=1, inplace=True)
logger.info("Reading public holidays **client** level information")
public_holidays_client = get_data_from_query(
sql_dir=SQL_PATH,
table="public_holidays_client",
conn=conn,
params=params,
client_list=client_batch,
)
public_holidays_client.date = pd.to_datetime(
public_holidays_client.date, errors="coerce"
)
    public_holidays_client[
        "client_open_shop_list"
    ] = public_holidays_client.client_open_shops.apply(
        lambda x: re.findall(r"\d+", x) if x is not None else None
    )
public_holidays_client.drop("client_open_shops", axis=1, inplace=True)
logger.info("Reading calendar_days")
calendar_days = get_data_from_query(
sql_dir=SQL_PATH,
table="calendar_days",
conn=conn,
params=params,
batch=shop_ids,
)
calendar_days.date = pd.to_datetime(calendar_days.date, errors="coerce")
    logger.info(
        f"Starting parallel shop hours calculation with NUM_OF_THREADS={params['NUM_OF_THREADS']}"
    )
# DEBUG
# for shop_id in shop_ids:
# shop_hours_calculate({
# "shop_id": shop_id,
# "data": future_ts,
# "shop_hours": shop_hours[shop_hours.shop_id == shop_id],
# })
# return
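    # Each worker process gets the full future_ts frame plus only that shop's
    # schedule rows; per-shop results are concatenated below.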
with Pool(params["NUM_OF_THREADS"]) as p:
result = p.map(
shop_hours_calculate,
(
{
"shop_id": shop_id,
"data": future_ts,
"shop_hours": shop_hours[shop_hours.shop_id == shop_id],
}
for shop_id in shop_ids
),
)
sh_hr = pd.concat(result)
logger.info("Merging calendar_days")
sh_hr = sh_hr.merge(calendar_days, on=["shop_id", "date"], how="left")
sh_hr = sh_hr.merge(shop_list.reset_index(), on=["shop_id"], how="left")
logger.info("Merging public_holidays")
sh_hr = sh_hr.merge(public_holidays_client, on=["client_id", "date"], how="left")
sh_hr = sh_hr.merge(public_holidays_shop, on=["shop_id", "date"], how="left")
logger.info("Identify shop exceptions")
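    # client_open_shop_list / shop_open_shop_list hold ids of shops that stay
    # open despite the holiday; for those rows the "active" holiday flag is
    # cleared so the holiday closure further below does not apply.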
idx_client_lvl_exceptions = sh_hr[sh_hr.client_open_shop_list.notnull()].apply(
lambda x: str(x["shop_id"]) in x["client_open_shop_list"], axis=1
)
sh_hr["client_active_after_exceptions"] = sh_hr.client_active
if len(idx_client_lvl_exceptions) > 0:
sh_hr.loc[
idx_client_lvl_exceptions[idx_client_lvl_exceptions].index,
"client_active_after_exceptions",
        ] = np.nan
idx_shop_lvl_exceptions = sh_hr[sh_hr.shop_open_shop_list.notnull()].apply(
lambda x: str(x["shop_id"]) in x["shop_open_shop_list"], axis=1
)
sh_hr["shop_active_after_exceptions"] = sh_hr.shop_active
if len(idx_shop_lvl_exceptions) > 0:
sh_hr.loc[
idx_shop_lvl_exceptions[idx_shop_lvl_exceptions].index,
"shop_active_after_exceptions",
        ] = np.nan
logger.info(
"Filling in opening/closing hours according to schedule and calendar days"
)
sh_hr[["opening_hour", "closing_hour", "open"]] = sh_hr[
["shop_start_time", "shop_end_time", "shop_open"]
]
# calendar_lines takes priority over shop_hours
idx = sh_hr.cl_open_shop.notna()
sh_hr["open"] = np.where(idx, sh_hr.cl_open_shop, sh_hr.open)
sh_hr.loc[idx, "opening_hour"] = sh_hr.loc[idx, "cl_start_time"]
sh_hr.loc[idx, "closing_hour"] = sh_hr.loc[idx, "cl_end_time"]
    # public holidays take priority over shop_hours and calendar_lines
sh_hr.loc[(sh_hr.shop_active_after_exceptions == 1), "open"] = 0
sh_hr.loc[(sh_hr.client_active_after_exceptions == 1), "open"] = 0
opening_hours_table = sh_hr.loc[
:, ["shop_id", "date", "open", "opening_hour", "closing_hour"]
]
opening_hours_table["date"] = pd.to_datetime(opening_hours_table["date"])
opening_hours_table["run_id"] = run_id
opening_hours_table["last_updated"] = pd.Timestamp.utcnow().to_pydatetime()
if len(opening_hours_table) > 0:
        logger.info("Uploading opening_hours data")
opening_hours_table["shop_id"] = opening_hours_table["shop_id"].astype(int)
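        # Read the target table's column set with a LIMIT 0 query so the
        # uploaded frame matches the table schema and column order.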
cols_df = conn.get(
text(
f"""
SELECT *
FROM {params['KPI_SCHEMA']}.{params['OPENING_HOURS_TABLE']}
LIMIT 0;
"""
)
).columns
conn.send_replace(
opening_hours_table[cols_df],
table=params['OPENING_HOURS_TABLE'],
schema=params['KPI_SCHEMA'],
if_exists="append",
)
else:
logger.info("No opening_hours data to update")
success_shops = set(sh_hr.shop_id)
    logger.info(f"Script finished! Shops updated: {success_shops}")
if __name__ == "__main__":
conn = get_conn("ds", "shard1", schema="kpi_data")
# run(conn=conn, shop_ids=[1019, 1020, 1021, 1036, 1037, 1038, 1039, 1040], client_ids=[])
run(conn=conn, shop_ids=[1019], client_ids=[])