import logging
import os
import sys
from datetime import datetime, timedelta
from typing import Dict

import awswrangler as ar
import boto3
import sagemaker
from awsglue.utils import getResolvedOptions
from sagemaker.feature_store.feature_group import FeatureGroup

logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
logger.addHandler(handler)

logger.info("Starting Nielsen data ingestion job.")
logger.info(f"{os.path.basename(__file__)}: Starting Job")
logger.info(f"Args passed: {','.join(sys.argv)}")

boto_session = boto3.Session()

# Convert args to a dictionary for easy retrieval
args = getResolvedOptions(sys.argv, ["JOB_NAME", "env", "event_date", "lookback_days", "all"])
env = args["env"]  # dev, nonprod, prod

config = {
    "temp_table_database": "default",  # database used for temp tables
    "nielsen_sales_data_temp_table": "nielsen_sales_fct_partitioned",  # table containing Nielsen sales data
    "nielsen_sales_data_temp_s3_path": f"s3://twc-{env}-ml-weather-data/Nielsen/nielsen_db/raw/fct_partitioned/nielsen_sales_fct_partitioned",  # location where Nielsen sales data will be stored
    "database": "nielsen",  # database for the final Nielsen and weather combined table
    "nielsen_w_weather_iceberg_table": "nielsen_sales_w_weather_data",  # final Nielsen and weather combined table
    "partitioned_by": ["year"],  # partition criteria
    "weather_data_table": "smm",  # table containing weather data
}


def get_table_info(env: str) -> Dict:
    """
    Get information about the feature group table.

    Args:
        env (str): Environment.

    Returns:
        Dict: Dictionary containing information about the feature group table.
    """
    sagemaker_ses = sagemaker.Session(boto_session=boto_session)
    fg_name = "hdat_weekly"
    logger.info(f"Getting table info for feature group {fg_name}")
    feature_group = FeatureGroup(name=fg_name, sagemaker_session=sagemaker_ses)
    offline_store_config = feature_group.describe()["OfflineStoreConfig"]
    return {
        "database": "sagemaker_featurestore",
        "table": offline_store_config["DataCatalogConfig"]["TableName"],
        "s3_path": offline_store_config["S3StorageConfig"]["ResolvedOutputS3Uri"],
        "tmp_path": f"s3://twc-{env}-ml-weather-data/hdat/tmp_weekly_feat/",
    }


class NielsenDataPreparation:
    def __init__(self, event_date, lookback_days, backfill_entire_date):
        """Prepares the data by combining Nielsen sales, product category and weather data,
        which is used for model training and inference.

        Args:
            event_date (str): End date (the date up to which data is processed).
            lookback_days (str): Number of previous days used to calculate the start date.
            backfill_entire_date (str): Boolean flag; when true, the entire Nielsen and weather history is processed.
        """
        self.temp_table_database = config["temp_table_database"]
        self.nielsen_sales_data_temp_table = config["nielsen_sales_data_temp_table"]
        self.nielsen_sales_data_temp_s3_path = config["nielsen_sales_data_temp_s3_path"]
        self.database = config["database"]
        self.nielsen_w_weather_iceberg_table = config["nielsen_w_weather_iceberg_table"]
        self.end_date = event_date
        table_info = get_table_info(env=env)
        self.smm_table = table_info["table"]
        self.smm_database = table_info["database"]
        logger.info(f"Found SMM table name {self.smm_table} and database {self.smm_database}")
        self.lookback_days = int(lookback_days)
        self.start_date = (
            datetime.strptime(self.end_date, "%Y-%m-%d") - timedelta(days=self.lookback_days)
        ).strftime("%Y-%m-%d")
        self.smm = config["weather_data_table"]
        logger.info(f"Processing data from {self.start_date} to {self.end_date}")
        self.partitioned_by = config["partitioned_by"]
        # Glue passes job arguments as strings, and bool() of any non-empty string is True,
        # so the flag value has to be compared as text.
        if str(backfill_entire_date).lower() in ("true", "1", "yes"):
            self.start_date = "2020-10-31"

    def calculate_date_range(self, year):
        """
        Calculates the start and end dates for a given year within the overall range.

        Args:
            year (int): The year for which the range is calculated.

        Returns:
            tuple: A tuple containing the start and end dates for the year.
        """
        year_start_date = (
            f"{year}-01-01" if year > datetime.strptime(self.start_date, "%Y-%m-%d").year else self.start_date
        )
        year_end_date = f"{year}-12-31" if year < datetime.strptime(self.end_date, "%Y-%m-%d").year else self.end_date
        return year_start_date, year_end_date

    def nielsen_sales_data_queries(self):
        """
        Constructs SQL queries for unloading sales data by year, accounting for partial years.

        Returns:
            list of tuples: Each tuple contains the year and its corresponding SQL query.
            str: An error message if the start or end date cannot be parsed.
        """
        try:
            start_year = datetime.strptime(self.start_date, "%Y-%m-%d").year
            end_year = datetime.strptime(self.end_date, "%Y-%m-%d").year
        except ValueError as e:
            return f"Error parsing dates: {e}"
        queries = []
        for year in range(start_year, end_year + 1):
            year_start_date, year_end_date = self.calculate_date_range(year)
            sql = f"""
                SELECT *
                FROM (
                    SELECT
                        ROW_NUMBER() OVER (PARTITION BY smm_cleaned, upc, week ORDER BY batch_id DESC) AS order_rank,
                        *
                    FROM (
                        SELECT
                            fct.product_key,
                            fct.market_key,
                            fct.period_key,
                            fct.acv,
                            fct.dollar,
                            fct.base_dollar,
                            fct.base_units,
                            fct.units,
                            fct.disp_wo_feat_acv,
                            fct.feat_wo_disp_acv,
                            fct.feat_and_disp_acv,
                            fct.price_decr_only_acv,
                            fct.disp_wo_feat_dollar,
                            fct.feat_and_disp_dollar,
                            fct.feat_wo_disp_dollar,
                            fct.price_decr_only_dollar,
                            fct.price_decr_only_units,
                            fct.disp_wo_feat_units,
                            fct.feat_wo_disp_units,
                            fct.disp_wo_feat_unit_price_disc,
                            fct.feat_wo_disp_unit_price_disc,
                            fct.feat_and_disp_unit_price_disc,
                            fct.price_decr_only_unit_price_disc,
                            prd.period_description AS period_description_short,
                            replace(cast(cast(date_parse(substr(prd.period_description, 7), '%m/%d/%y') AS date) AS varchar), '-', '') AS week,
                            DATE_FORMAT(date_parse(SUBSTRING(period_description, POSITION(' w/e ' IN prd.period_description) + 5), '%m/%d/%y'), '%Y%m') AS month,
                            mrkt.market_description AS market_display_name,
                            translate(replace(lower(mrkt.market_description), ' smm xaoc', ''), '/. ', '___') AS smm_cleaned,
                            prdc.super_category,
                            prdc.upc,
                            prdc.department,
                            prdc.category,
                            prdc.sub_category,
                            prdc.segment,
                            prdc.base_size,
                            prdc.brand,
                            prdc.brand_low,
                            prdc.manufacturer,
                            prdc.flavor,
                            prdc.form,
                            prdc.outer_pack_size,
                            regexp_extract(regexp_extract(fct.file_path, 'fct_[0-9]+[._]', 0), '[0-9]+', 0) AS batch_id,
                            EXTRACT(YEAR FROM date_parse(substr(prd.period_description, 7), '%m/%d/%y')) AS year
                        FROM bo_prod_refinedz_nielsen.nielsen_fact_sales fct
                        JOIN bo_prod_refinedz_nielsen.nielsen_dim_sales_prd_ref prd
                            ON fct.period_key = prd.period_key
                            AND date_parse(substr(prd.period_description, 7), '%m/%d/%y')
                                BETWEEN cast('{year_start_date}' AS date) AND cast('{year_end_date}' AS date)
                        JOIN bo_prod_refinedz_nielsen.nielsen_dim_sales_mrkt_ref mrkt
                            ON fct.market_key = mrkt.market_key
                        JOIN bo_prod_refinedz_nielsen.nielsen_dim_sales_prdc_ref prdc
                            ON fct.product_key = prdc.product_key
                    )
                ) AS ranked_data
                WHERE order_rank = 1
                ORDER BY week
            """.strip()
            queries.append((year, sql))
        return queries

    def create_nielsen_sales_temp_table(self):
        """
        Creates the Nielsen sales temporary table, which contains the final joined Nielsen
        dimension and fact sales data.

        Returns:
            None
        """
        logger.info(
            f"Creating temp table {self.nielsen_sales_data_temp_table} in database: {self.temp_table_database}"
        )
        ar.catalog.create_parquet_table(
            database=self.temp_table_database,
            table=self.nielsen_sales_data_temp_table,
            path=self.nielsen_sales_data_temp_s3_path,
            columns_types={
                "order_rank": "bigint",
                "product_key": "int",
                "market_key": "int",
                "period_key": "int",
                "acv": "double",
                "dollar": "double",
                "base_dollar": "double",
                "base_units": "double",
                "units": "double",
                "disp_wo_feat_acv": "double",
                "feat_wo_disp_acv": "double",
                "feat_and_disp_acv": "double",
                "price_decr_only_acv": "double",
                "disp_wo_feat_dollar": "double",
                "feat_and_disp_dollar": "double",
                "feat_wo_disp_dollar": "double",
                "price_decr_only_dollar": "double",
                "price_decr_only_units": "double",
                "disp_wo_feat_units": "double",
                "feat_wo_disp_units": "double",
                "period_description_short": "string",
                "week": "string",
                "month": "string",
                "market_display_name": "string",
                "smm_cleaned": "string",
                "super_category": "string",
                "upc": "string",
                "department": "string",
                "category": "string",
                "sub_category": "string",
                "segment": "string",
                "base_size": "string",
                "brand": "string",
                "brand_low": "string",
                "manufacturer": "string",
                "flavor": "string",
                "form": "string",
                "outer_pack_size": "string",
                "batch_id": "string",
            },
            partitions_types={"year": "bigint"},
        )
        logger.info(
            f"Created temp table {self.nielsen_sales_data_temp_table} successfully in database: {self.temp_table_database}"
        )

    def delete_nielsen_w_weather_data_between_dates(self):
        """
        Deletes Nielsen sales and weather data from the Athena table between the specified start
        and end dates, converting them from 'YYYY-MM-DD' to 'YYYYMMDD' format. This helps remove
        duplicate records from the final Iceberg table.

        Returns:
            None
        """
        # Convert dates from 'YYYY-MM-DD' to 'YYYYMMDD'
        start_date_formatted = self.start_date.replace("-", "")
        end_date_formatted = self.end_date.replace("-", "")
        logger.info(
            f"Deleting data from {self.database}.{self.nielsen_w_weather_iceberg_table} "
            f"between {start_date_formatted} and {end_date_formatted}"
        )
        # Construct the SQL statement to delete data
        sql = f"""
            DELETE FROM {self.database}.{self.nielsen_w_weather_iceberg_table}
            WHERE start_date >= '{start_date_formatted}' AND end_date <= '{end_date_formatted}'
        """
        # Execute the SQL query using awswrangler
        response = ar.athena.start_query_execution(sql=sql, database=self.database, wait=True)
        logger.info(
            f"Data deletion initiated. Query execution ID: {response['QueryExecutionId']} and status: {response['Status']}"
        )

    def insert_nielsen_w_weather_query(self, table, nielsen_sales_data_temp_table):
        """
        Constructs and returns the SQL query for inserting aggregated Nielsen sales data,
        combined with category data and weather data, into the specified table.

        The query aggregates sales data by 'smm_cleaned', 'week', and 'custom_category' and then
        joins the aggregated results with the corresponding weather data. The aggregated data
        includes total units, dollars, base units, and base dollars. The inserted rows also carry
        the configured 'end_date' as 'event_date'.

        Args:
            table (str): The name of the table into which the data will be inserted.
            nielsen_sales_data_temp_table (str): The temp table holding the joined Nielsen sales data.

        Returns:
            str: A SQL string that can be executed to insert aggregated data into the specified Nielsen database table.
        """
        sql = f"""
            INSERT INTO {self.database}.{table}
            SELECT
                ag.custom_category,
                ag.sum_units,
                ag.sum_dollar,
                ag.sum_base_units,
                ag.sum_base_dollar,
                wd.*,
                '{self.end_date}' AS event_date
            FROM (
                SELECT
                    sc.smm_cleaned,
                    sc.week,
                    sc.custom_category,
                    SUM(sc.units) AS sum_units,
                    SUM(sc.dollar) AS sum_dollar,
                    SUM(sc.base_units) AS sum_base_units,
                    SUM(sc.base_dollar) AS sum_base_dollar
                FROM (
                    SELECT
                        sd.smm_cleaned,
                        sd.week,
                        cc.custom_category,
                        sd.units,
                        sd.dollar,
                        sd.base_units,
                        sd.base_dollar
                    FROM {self.temp_table_database}.{nielsen_sales_data_temp_table} sd
                    JOIN custom_category cc
                        ON sd.department = cc.department
                        AND sd.super_category = cc.super_category
                        AND sd.category = cc.category
                        AND sd.sub_category = cc.sub_category
                        AND sd.brand = cc.brand
                ) sc
                GROUP BY sc.smm_cleaned, sc.week, sc.custom_category
            ) ag
            JOIN {self.smm_database}.{self.smm_table} wd
                ON ag.smm_cleaned = wd.smm_cleaned AND ag.week = wd.week
        """
        return sql

    def prepare_neilsen_w_weather_data(self):
        """
        Orchestrates the entire process of managing Nielsen data, including data unloading, table
        creation and repair, data insertion, and cleanup based on the defined business logic.

        Steps:
            1. Constructs and unloads SQL queries for each year in the configured range, then
               creates a temp Nielsen sales data table over the unloaded data.
            2. Repairs the created table to ensure data integrity and structure.
            3. Checks whether the final table exists.
            4. If the table exists, deletes data from it for the date range so it can be reinserted.
            5. If the table doesn't exist, lets the pipeline fail after logging an error message.
            6. Inserts aggregated data into the final table.

        All operations log their progress and status for debugging and operational tracking.
        """
        # Processing Nielsen sales data and storing it in the temp table
        unload_queries = self.nielsen_sales_data_queries()
        if isinstance(unload_queries, str):
            # A string return value indicates a date-parsing error; stop before iterating over it
            logger.error(unload_queries)
            return

        # Repairing the smm table to fix and optimize the table structure
        query_final_state = ar.athena.repair_table(table=self.smm, database=self.database)
        logger.info(f"Repaired Table: {self.smm}")

        for index, query in enumerate(unload_queries):
            logger.info(f"Processed query for index: {index}")
            logger.info(f"Unloading data in s3 path: {self.nielsen_sales_data_temp_s3_path}")
            unload_sql = (
                f"UNLOAD ({query[1]}) TO '{self.nielsen_sales_data_temp_s3_path}' "
                f"WITH (format='PARQUET', partitioned_by = ARRAY{self.partitioned_by})"
            )
            # Executing the query to unload the data to the s3 location
            response = ar.athena.start_query_execution(sql=unload_sql, database=self.database, wait=True)
        logger.info("Temp table creation completed successfully.")

        # Creating the temp table
        self.create_nielsen_sales_temp_table()

        # Repairing the table to fix and optimize the table structure
        query_final_state = ar.athena.repair_table(
            table=self.nielsen_sales_data_temp_table, database=self.temp_table_database
        )
        logger.info(f"Table repair status: {str(query_final_state)}")
        logger.info(f"Repaired Table: {self.nielsen_sales_data_temp_table}")

        # Combining and aggregating Nielsen + weather data and storing it in an Iceberg table.
        # Check if the insert table exists and act accordingly.
        result = ar.catalog.does_table_exist(database=self.database, table=self.nielsen_w_weather_iceberg_table)
        if not result:
            logger.error(
                f"Pipeline failed due to unavailability of {self.nielsen_w_weather_iceberg_table} table "
                f"in {self.database} database"
            )
        else:
            logger.info(
                f"Table {self.nielsen_w_weather_iceberg_table} already exists within database: {self.database}"
            )
            # If the table exists, remove data for the specified date range from the nielsen_w_weather Iceberg table
            self.delete_nielsen_w_weather_data_between_dates()
            logger.info(f"Deleted data between {self.start_date} and {self.end_date}")

        # Insert aggregated Nielsen and weather data into the final Iceberg table
        logger.info(f"Inserting Aggregated data in {self.nielsen_w_weather_iceberg_table}")
        insert_sql = self.insert_nielsen_w_weather_query(
            self.nielsen_w_weather_iceberg_table, self.nielsen_sales_data_temp_table
        )
        response = ar.athena.start_query_execution(sql=insert_sql, database=self.database, wait=True)
        logger.info(
            f"Aggregated nielsen and weather data inserted successfully in the iceberg table. "
            f"Query execution ID: {response['QueryExecutionId']} and status: {response['Status']}"
        )
        logger.info("Inserted data into the iceberg table")

    def delete_temp_resources(self):
        """Deletes the temporary resources: the temp S3 data and the temp Glue table."""
        # Additional cleanup operations could be added here
        ar.s3.delete_objects(self.nielsen_sales_data_temp_s3_path)
        logger.info(f"Removed temporary s3 path data: {self.nielsen_sales_data_temp_s3_path}")
        ar.catalog.delete_table_if_exists(
            database=self.temp_table_database, table=self.nielsen_sales_data_temp_table
        )
        logger.info(f"Removed temporary table: {self.temp_table_database}.{self.nielsen_sales_data_temp_table}")
        logger.info("Removed temporary s3 path and tables")


if __name__ == "__main__":
    # Creating an object of the NielsenDataPreparation class
    processor = NielsenDataPreparation(
        event_date=args["event_date"], lookback_days=args["lookback_days"], backfill_entire_date=args["all"]
    )
    # Removing any leftover temp resources before processing
    processor.delete_temp_resources()
    processor.prepare_neilsen_w_weather_data()
    # Removing temp resources after processing
    processor.delete_temp_resources()
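
For reference, below is a minimal sketch of how this script could be triggered once it is deployed as an AWS Glue job. The job name is hypothetical and the argument values are examples only; getResolvedOptions requires each argument key to be passed with a leading "--".

import boto3

glue = boto3.client("glue")
run = glue.start_job_run(
    JobName="nielsen-data-ingestion",  # hypothetical job name; substitute the actual deployed Glue job name
    Arguments={
        "--env": "dev",                # dev, nonprod or prod
        "--event_date": "2024-03-30",  # end of the processing window (YYYY-MM-DD)
        "--lookback_days": "7",        # days before event_date used to derive the start date
        "--all": "false",              # "true" backfills the full history from 2020-10-31
    },
)
print(f"Started Glue job run: {run['JobRunId']}")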