Untitled
unknown
plain_text
a year ago
20 kB
8
Indexable
import logging
import os
import sys
from datetime import datetime, timedelta
import awswrangler as ar
from awsglue.utils import getResolvedOptions
logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info("Stating Nielsen data Ingestion Job.")
logger.info(f"{os.path.basename(__file__)}: Starting Job")
logger.info(f"Args passed: {','.join(sys.argv)}")
# Convert args to a dictionary for easy retrieval
args = getResolvedOptions(sys.argv, ["JOB_NAME", "env", "event_date", "lookback_days", "all"])
env = args["env"] # dev,nonprod,prod
config = {
"temp_table_database": "default", # database used for temp tables
"nielsen_sales_data_temp_table": "nielsen_sales_fct_partitioned", # table contains nielsen sales data
"nielsen_sales_data_temp_s3_path": f"s3://twc-{env}-ml-weather-data/Nielsen/nielsen_db/raw/fct_partitioned/nielsen_sales_fct_partitioned", # location where nielsen sales data will be stored
"database": "nielsen", # database for final nielsen and weather combined table,
"database_weather": "sagemaker_featurestore", # database for weather data
"nielsen_w_weather_iceberg_table": "nielsen_sales_w_weather_data", # final nielsen and weather combined table
"partitioned_by": ["year"], # partition criteria
"weather_data_table": "smm", # table containing weather data #
}
class NielsenDataPreparation:
def __init__(self, event_date, lookback_days, backfill_entire_date):
"""Preparing the data by combining nielsen sales, products category and weather data which will be used for model training and inferencing
Args:
event_date (str): end date (till which date we are processing the data)
lookback_days (str): previous days information which is used to calculate start date
backfill_entire_date (str): contains boolean value used as a flag to process entire data of nielsen and weither
"""
self.temp_table_database = config["temp_table_database"]
self.nielsen_sales_data_temp_table = config["nielsen_sales_data_temp_table"]
self.nielsen_sales_data_temp_s3_path = config["nielsen_sales_data_temp_s3_path"]
self.database = config["database"]
self.nielsen_w_weather_iceberg_table = config["nielsen_w_weather_iceberg_table"]
self.end_date = event_date
self.lookback_days = int(lookback_days)
self.start_date = (datetime.strptime(self.end_date, "%Y-%m-%d") - timedelta(days=self.lookback_days)).strftime(
"%Y-%m-%d"
)
self.smm = config["weather_data_table"]
logger.info(f"Processing data from {self.start_date} to {self.end_date}")
self.partitioned_by = config["partitioned_by"]
if bool(backfill_entire_date):
self.start_date = "2020-10-31"
def calculate_date_range(self, year):
"""
Calculates the start and end dates for a given year within the overall range.
Args:
year (int): The year for which the range is calculated.
Returns:
tuple: A tuple containing the year start and end dates for the year.
"""
year_start_date = (
f"{year}-01-01" if year > datetime.strptime(self.start_date, "%Y-%m-%d").year else self.start_date
)
year_end_date = f"{year}-12-31" if year < datetime.strptime(self.end_date, "%Y-%m-%d").year else self.end_date
return year_start_date, year_end_date
def nielsen_sales_data_queries(self):
"""
Constructs SQL queries for unloading sales data by year, accounting for partial years.
Args:
start_date (str): The start date in 'YYYY-MM-DD' format.
end_date (str): The end date in 'YYYY-MM-DD' format.
Returns:
list of tuples: Each tuple contains the year and its corresponding SQL query.
"""
try:
start_year = datetime.strptime(self.start_date, "%Y-%m-%d").year
end_year = datetime.strptime(self.end_date, "%Y-%m-%d").year
except ValueError as e:
return f"Error parsing dates: {e}"
queries = []
for year in range(start_year, end_year + 1):
year_start_date, year_end_date = self.calculate_date_range(year)
sql = f"""
SELECT
*
FROM (
SELECT
ROW_NUMBER() OVER (PARTITION BY smm_cleaned, upc, week ORDER BY batch_id DESC) AS order_rank,
*
FROM (
SELECT
fct.product_key,
fct.market_key,
fct.period_key,
fct.acv,
fct.dollar,
fct.base_dollar,
fct.base_units,
fct.units,
fct.disp_wo_feat_acv,
fct.feat_wo_disp_acv,
fct.feat_and_disp_acv,
fct.price_decr_only_acv,
fct.disp_wo_feat_dollar,
fct.feat_and_disp_dollar,
fct.feat_wo_disp_dollar,
fct.price_decr_only_dollar,
fct.price_decr_only_units,
fct.disp_wo_feat_units,
fct.feat_wo_disp_units,
fct.disp_wo_feat_unit_price_disc,
fct.feat_wo_disp_unit_price_disc,
fct.feat_and_disp_unit_price_disc,
fct.price_decr_only_unit_price_disc,
prd.period_description as period_description_short,
replace(cast(cast(date_parse(substr(prd.period_description, 7), '%m/%d/%y') as date) as varchar),
'-',
''
) AS week,
DATE_FORMAT(date_parse(SUBSTRING(period_description, POSITION(' w/e ' IN prd.period_description) + 5), '%m/%d/%y'), '%Y%m') AS month,
mrkt.market_description as market_display_name,
translate(
replace(lower(mrkt.market_description), ' smm xaoc', ''),
'/. ',
'___'
) AS smm_cleaned,
prdc.super_category,
prdc.upc,
prdc.department,
prdc.category,
prdc.sub_category,
prdc.segment,
prdc.base_size,
prdc.brand,
prdc.brand_low,
prdc.manufacturer,
prdc.flavor,
prdc.form,
prdc.outer_pack_size,
regexp_extract(regexp_extract(fct.file_path, 'fct_[0-9]+[._]', 0), '[0-9]+',0) as batch_id,
EXTRACT(YEAR FROM date_parse(substr(prd.period_description, 7), '%m/%d/%y')) AS year
FROM bo_prod_refinedz_nielsen.nielsen_fact_sales fct
JOIN bo_prod_refinedz_nielsen.nielsen_dim_sales_prd_ref prd
ON fct.period_key = prd.period_key
AND date_parse(substr(prd.period_description, 7), '%m/%d/%y') BETWEEN cast('{year_start_date}' as date) AND cast('{year_end_date}' as date)
JOIN bo_prod_refinedz_nielsen.nielsen_dim_sales_mrkt_ref mrkt ON fct.market_key = mrkt.market_key
JOIN bo_prod_refinedz_nielsen.nielsen_dim_sales_prdc_ref prdc ON fct.product_key = prdc.product_key
)
) AS ranked_data
WHERE order_rank = 1
ORDER BY week
""".strip()
queries.append((year, sql))
return queries
def create_nielsen_sales_temp_table(self):
"""
Constructs SQL queries for creating nielsen sales temporary table which will contain the final joined nielsen dim and fact sales data.
Args: None
Returns: None
"""
logger.info(f"Creating temp table {self.nielsen_sales_data_temp_table} in database: {self.temp_table_database}")
ar.catalog.create_parquet_table(
database=self.temp_table_database,
table=self.nielsen_sales_data_temp_table,
path=self.nielsen_sales_data_temp_s3_path,
columns_types={
"order_rank": "bigint",
"product_key": "int",
"market_key": "int",
"period_key": "int",
"acv": "double",
"dollar": "double",
"base_dollar": "double",
"base_units": "double",
"units": "double",
"disp_wo_feat_acv": "double",
"feat_wo_disp_acv": "double",
"feat_and_disp_acv": "double",
"price_decr_only_acv": "double",
"disp_wo_feat_dollar": "double",
"feat_and_disp_dollar": "double",
"feat_wo_disp_dollar": "double",
"price_decr_only_dollar": "double",
"price_decr_only_units": "double",
"disp_wo_feat_units": "double",
"feat_wo_disp_units": "double",
"period_description_short": "string",
"week": "string",
"month": "string",
"market_display_name": "string",
"smm_cleaned": "string",
"super_category": "string",
"upc": "string",
"department": "string",
"category": "string",
"sub_category": "string",
"segment": "string",
"base_size": "string",
"brand": "string",
"brand_low": "string",
"manufacturer": "string",
"flavor": "string",
"form": "string",
"outer_pack_size": "string",
"batch_id": "string",
},
partitions_types={"year": "bigint"},
)
logger.info(
f"Created temp table {self.nielsen_sales_data_temp_table} sucessfully in database: {self.temp_table_database}"
)
def delete_nielsen_w_weather_data_between_dates(self):
"""
Deletes nielsen sales and weather data from an Athena table between specified start and end dates, converting from 'YYYY-MM-DD' to 'YYYYMMDD' format. This will help in removing duplicate records from the final iceberg table
Returns: None
"""
# Convert dates from 'YYYY-MM-DD' to 'YYYYMMDD'
start_date_formatted = self.start_date.replace("-", "")
end_date_formatted = self.end_date.replace("-", "")
logger.info(
f"Deleting data from {self.database}.{self.nielsen_w_weather_iceberg_table} between {start_date_formatted} and {end_date_formatted}"
)
# Construct the SQL statement to delete data
sql = f"""
DELETE FROM {self.database}.{self.nielsen_w_weather_iceberg_table}
WHERE start_date >= '{start_date_formatted}' AND end_date <= '{end_date_formatted}'
"""
# Execute the SQL query using AWS Data Wrangler
response = ar.athena.start_query_execution(sql=sql, database=self.database, wait=True)
logger.info(
f"Data deletion initiated. Query execution ID: {response['QueryExecutionId']} and status: {response['Status']}"
)
def insert_nielsen_w_weather_query(self, table, nielsen_sales_data_temp_table):
"""
Constructs and returns the SQL query for inserting aggregated sales data from Nielsen,
incorporating category data and weather data, into a specified table.
This query aggregates sales data by 'smm_cleaned', 'week', and 'custom_category' and then joins
the aggregated results with corresponding weather data. The aggregated data includes total units, dollars,
base units, and base dollars. The final insertion also includes the specified 'end_date' as 'event_date'.
Args:
table (str): The name of the table into which the data will be inserted.
Returns:
str: A SQL string that can be executed to insert aggregated data into the specified Nielsen database table.
"""
sql = f"""
INSERT INTO {self.database}.{table}
SELECT
ag.custom_category,
ag.sum_units,
ag.sum_dollar,
ag.sum_base_units,
ag.sum_base_dollar,
wd.*,
'{self.end_date}' as event_date
FROM (
SELECT
sc.smm_cleaned,
sc.week,
sc.custom_category,
SUM(sc.units) AS sum_units,
SUM(sc.dollar) AS sum_dollar,
SUM(sc.base_units) AS sum_base_units,
SUM(sc.base_dollar) AS sum_base_dollar
FROM (
SELECT
sd.smm_cleaned,
sd.week,
cc.custom_category,
sd.units,
sd.dollar,
sd.base_units,
sd.base_dollar
FROM
{self.temp_table_database}.{nielsen_sales_data_temp_table} sd
JOIN
custom_category cc
ON
sd.department = cc.department AND
sd.super_category = cc.super_category AND
sd.category = cc.category AND
sd.sub_category = cc.sub_category AND
sd.brand = cc.brand
) sc
GROUP BY
sc.smm_cleaned,
sc.week,
sc.custom_category
) ag
JOIN
smm wd # parameters for weather data table
ON
ag.smm_cleaned = wd.smm_cleaned AND
ag.week = wd.week
"""
return sql
def prepare_neilsen_w_weather_data(self):
"""
Orchestrates the entire process of managing Nielsen data, including data unloading,
table creation and repair, data insertion, and cleanup based on defined business logic.
Steps:
1. Constructs and unloads SQL queries for each year based on the defined range,
starting with creating a temp nielsen sales data table and unloading the data into it.
2. Repairs the created table to ensure data integrity and structure.
3. Checks if the final table exists and creates it if it does not.
4. If the table exists, deletes data from it for reinsertion.
5. If the table doesn't exist, letting the pipeline fail by adding an error msg
6. Inserts aggregated data into the final table.
All operations log their progress and status for debugging and operational tracking.
"""
# Processing nielsen sales data and storing it to temp table
unload_queries = self.nielsen_sales_data_queries()
if isinstance(unload_queries, str): # Check if the return value indicates an error
logger.info(unload_queries)
# Repairing smm table to fix and optimize table structure
query_final_state = ar.athena.repair_table(table=self.smm, database=self.database)
logger.info(f"Repaired Table: {self.smm}")
for index, query in enumerate(unload_queries):
logger.info(f"Processed query for index: {index}")
# Subsequent data are unloaded normally
logger.info(f"Unloading data in s3 path: {self.nielsen_sales_data_temp_s3_path}")
unload_sql = f"UNLOAD ({query[1]}) TO '{self.nielsen_sales_data_temp_s3_path}' WITH(format='PARQUET', partitioned_by = ARRAY{self.partitioned_by})"
# Executing the query to upload data to s3 location
response = ar.athena.start_query_execution(sql=unload_sql, database=self.database, wait=True)
logger.info("Temp table creation completed sucessfully.")
# Creating temp table
self.create_nielsen_sales_temp_table()
# Repairing table to fix and optimize table structure
query_final_state = ar.athena.repair_table(
table=self.nielsen_sales_data_temp_table, database=self.temp_table_database
)
logging.info(f"Table repair status: {str(query_final_state)}")
logger.info(f"Repaired Table: {self.nielsen_sales_data_temp_table}")
# Combining and aggregating nielsen + weather data and storing it to an iceberg table
# Check if the insert table exists and act accordingly
result = ar.catalog.does_table_exist(database=self.database, table=self.nielsen_w_weather_iceberg_table)
if not result:
logger.error(
"Pipeline failed due to unavailabality of {self.nielsen_w_weather_iceberg_table} table in {self.database} database"
)
else:
logger.info(f"Table {self.nielsen_w_weather_iceberg_table} already exist within database: {self.database}")
# If table exists, removing data for the specified date range from nielsen_w_weather iceberg table
self.delete_nielsen_w_weather_data_between_dates()
logger.info(f"Deleted data between {self.start_date} and {self.end_date}")
# Insert aggregated nielsen and weather data into the final iceberg table
logger.info(f"Inserting Aggregated data in {self.nielsen_w_weather_iceberg_table}")
insert_sql = self.insert_nielsen_w_weather_query(
self.nielsen_w_weather_iceberg_table, self.nielsen_sales_data_temp_table
)
response = ar.athena.start_query_execution(sql=insert_sql, database=self.database, wait=True)
logger.info(
f"Aggregated nielsen and weather data inserted successfully in the iceberg table. Query execution ID: {response['QueryExecutionId']} and status: {response['Status']}"
)
logger.info("Inserted data into the iceberg table")
def delete_temp_resources(self):
"""Deleting temporary resources
Args:
database (string): database name for table deletion
table (string): table name which needs to be deleted
"""
# Additional cleanup operations could be added here
ar.s3.delete_objects(self.nielsen_sales_data_temp_s3_path)
logger.info(f"Removed temporary s3 path data: {self.nielsen_sales_data_temp_s3_path}")
ar.catalog.delete_table_if_exists(database=self.temp_table_database, table=self.nielsen_sales_data_temp_table)
logger.info(f"Removed temporary s3 path data: {self.temp_table_database}.{self.nielsen_sales_data_temp_table}")
logger.info("Removed temporary s3 path and tables")
if __name__ == "__main__":
# creating object of NielsenDataPreparation class
processor = NielsenDataPreparation(
event_date=args["event_date"], lookback_days=args["lookback_days"], backfill_entire_date=args["all"]
)
# Removing temp resources
processor.delete_temp_resources()
processor.prepare_neilsen_w_weather_data()
# Removing temp resources
processor.delete_temp_resources()
Editor is loading...
Leave a Comment