import logging
import os
import sys
from datetime import datetime, timedelta
from typing import Dict

import awswrangler as ar
import boto3
import sagemaker
from awsglue.utils import getResolvedOptions
from sagemaker.feature_store.feature_group import FeatureGroup

logger = logging.getLogger()
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logging.INFO)
logger.addHandler(handler)
logger.info("Stating Nielsen data Ingestion Job.")

logger.info(f"{os.path.basename(__file__)}: Starting Job")
logger.info(f"Args passed: {','.join(sys.argv)}")
boto_session = boto3.Session()
# Convert args to a dictionary for easy retrieval
args = getResolvedOptions(sys.argv, ["JOB_NAME", "env", "event_date", "lookback_days", "all"])
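
# Example of the resolved arguments (illustrative values only; real values are
# supplied by the Glue job trigger):
#   {"JOB_NAME": "nielsen-data-ingestion", "env": "dev",
#    "event_date": "2024-01-31", "lookback_days": "30", "all": "false"}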

env = args["env"]  # dev,nonprod,prod

config = {
    "temp_table_database": "default",  # database used for temp tables
    "nielsen_sales_data_temp_table": "nielsen_sales_fct_partitioned",  # table contains nielsen sales data
    "nielsen_sales_data_temp_s3_path": f"s3://twc-{env}-ml-weather-data/Nielsen/nielsen_db/raw/fct_partitioned/nielsen_sales_fct_partitioned",  # location where nielsen sales data will be stored
    "database": "nielsen",  # database for final nielsen and weather combined table
    "nielsen_w_weather_iceberg_table": "nielsen_sales_w_weather_data",  # final nielsen and weather combined table
    "partitioned_by": ["year"],  # partition criteria
    "weather_data_table": "smm",  # table containing weather data
}
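
# For example, with env="dev" the temp table path resolves to:
#   s3://twc-dev-ml-weather-data/Nielsen/nielsen_db/raw/fct_partitioned/nielsen_sales_fct_partitioned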


def get_table_info(env: str) -> Dict:
    """
    Get information about the feature group table.

    Args:
    env (str): Environment.

    Returns:
    Dict: Dictionary containing information about the feature group table.
    """
    sagemaker_ses = sagemaker.Session(boto_session=boto_session)

    fg_name = "hdat_weekly"
    logger.info(f"Getting table info for feature group {fg_name}")
    feature_group = FeatureGroup(name=fg_name, sagemaker_session=sagemaker_ses)
    # Describe the feature group once and reuse the response rather than
    # calling the API twice
    fg_description = feature_group.describe()

    return {
        'database': 'sagemaker_featurestore',
        'table': fg_description["OfflineStoreConfig"]["DataCatalogConfig"]["TableName"],
        's3_path': fg_description["OfflineStoreConfig"]["S3StorageConfig"]["ResolvedOutputS3Uri"],
        'tmp_path': f"s3://twc-{env}-ml-weather-data/hdat/tmp_weekly_feat/"
    }

class NielsenDataPreparation:
    def __init__(self, event_date, lookback_days, backfill_entire_date):
        """Preparing the data by combining nielsen sales, products category and weather data which will be used for model training and inferencing

        Args:
            event_date (str): end date (till which date we are processing the data)
            lookback_days (str): previous days information which is used to calculate start date
            backfill_entire_date (str): contains boolean value used as a flag to process entire data of nielsen and weither
        """
        self.temp_table_database = config["temp_table_database"]
        self.nielsen_sales_data_temp_table = config["nielsen_sales_data_temp_table"]
        self.nielsen_sales_data_temp_s3_path = config["nielsen_sales_data_temp_s3_path"]
        self.database = config["database"]
        self.nielsen_w_weather_iceberg_table = config["nielsen_w_weather_iceberg_table"]
        self.end_date = event_date
        table_info = get_table_info(env=env)
        self.smm_table = table_info['table']
        self.smm_database = table_info['database']
        
        logger.info(f"Found SMM table name {self.smm_table} and database {self.smm_database}")
        
        self.lookback_days = int(lookback_days)
        self.start_date = (datetime.strptime(self.end_date, "%Y-%m-%d") - timedelta(days=self.lookback_days)).strftime(
            "%Y-%m-%d"
        )
        self.smm = config["weather_data_table"]
        logger.info(f"Processing data from {self.start_date} to {self.end_date}")
        self.partitioned_by = config["partitioned_by"]
        # Glue arguments arrive as strings, and bool() is True for any
        # non-empty string, so compare the text explicitly.
        if str(backfill_entire_date).lower() == "true":
            # Presumably the earliest date for which Nielsen data is available
            self.start_date = "2020-10-31"

    def calculate_date_range(self, year):
        """
        Calculates the start and end dates for a given year within the overall range.

        Args:
            year (int): The year for which the range is calculated.

        Returns:
            tuple: A tuple containing the year start and end dates for the year.
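
        Example (assuming self.start_date = "2023-06-15" and self.end_date = "2024-03-01"):
            calculate_date_range(2023) -> ("2023-06-15", "2023-12-31")
            calculate_date_range(2024) -> ("2024-01-01", "2024-03-01")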
        """
        year_start_date = (
            f"{year}-01-01" if year > datetime.strptime(self.start_date, "%Y-%m-%d").year else self.start_date
        )
        year_end_date = f"{year}-12-31" if year < datetime.strptime(self.end_date, "%Y-%m-%d").year else self.end_date
        return year_start_date, year_end_date

    def nielsen_sales_data_queries(self):
        """
        Constructs SQL queries for unloading sales data by year, accounting for partial years.

        Uses self.start_date and self.end_date, both in 'YYYY-MM-DD' format.

        Returns:
            list of tuples: Each tuple contains the year and its corresponding SQL query.
            If the dates cannot be parsed, an error string is returned instead.
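
        Example return value (illustrative):
            [(2023, "SELECT ... "), (2024, "SELECT ... ")]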
        """
        try:
            start_year = datetime.strptime(self.start_date, "%Y-%m-%d").year
            end_year = datetime.strptime(self.end_date, "%Y-%m-%d").year
        except ValueError as e:
            return f"Error parsing dates: {e}"

        queries = []
        for year in range(start_year, end_year + 1):
            year_start_date, year_end_date = self.calculate_date_range(year)
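            # The query below joins the Nielsen fact table with its dimension
            # tables, then keeps only the row from the most recent batch
            # (highest batch_id) for each (smm_cleaned, upc, week) combination,
            # deduplicating data delivered across multiple batches.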
            sql = f"""
            SELECT 
                * 
                FROM (
                    SELECT 
                        ROW_NUMBER() OVER (PARTITION BY smm_cleaned, upc, week ORDER BY batch_id DESC) AS order_rank,
                        *
                    FROM (
                        SELECT
                            fct.product_key,
                            fct.market_key,
                            fct.period_key,
                            fct.acv,
                            fct.dollar,
                            fct.base_dollar,
                            fct.base_units,
                            fct.units,
                            fct.disp_wo_feat_acv,
                            fct.feat_wo_disp_acv,
                            fct.feat_and_disp_acv,
                            fct.price_decr_only_acv,
                            fct.disp_wo_feat_dollar,
                            fct.feat_and_disp_dollar,
                            fct.feat_wo_disp_dollar,
                            fct.price_decr_only_dollar,
                            fct.price_decr_only_units,
                            fct.disp_wo_feat_units,
                            fct.feat_wo_disp_units,
                            fct.disp_wo_feat_unit_price_disc,
                            fct.feat_wo_disp_unit_price_disc,
                            fct.feat_and_disp_unit_price_disc,
                            fct.price_decr_only_unit_price_disc,
                            prd.period_description as period_description_short,
                            replace(cast(cast(date_parse(substr(prd.period_description, 7), '%m/%d/%y') as date) as varchar),
                                    '-',
                                    ''
                            ) AS week,
                            DATE_FORMAT(date_parse(SUBSTRING(period_description, POSITION(' w/e ' IN prd.period_description) + 5), '%m/%d/%y'), '%Y%m') AS month,
                            mrkt.market_description as market_display_name,
                            translate(
                                    replace(lower(mrkt.market_description), ' smm xaoc', ''),
                                    '/. ',
                                    '___'
                            ) AS smm_cleaned,
                            prdc.super_category,
                            prdc.upc,
                            prdc.department,
                            prdc.category,
                            prdc.sub_category,
                            prdc.segment,
                            prdc.base_size,
                            prdc.brand,
                            prdc.brand_low,
                            prdc.manufacturer,
                            prdc.flavor,
                            prdc.form,
                            prdc.outer_pack_size,
                            regexp_extract(regexp_extract(fct.file_path, 'fct_[0-9]+[._]', 0), '[0-9]+',0) as batch_id,
                            EXTRACT(YEAR FROM date_parse(substr(prd.period_description, 7), '%m/%d/%y')) AS year
                        FROM bo_prod_refinedz_nielsen.nielsen_fact_sales fct
                        JOIN bo_prod_refinedz_nielsen.nielsen_dim_sales_prd_ref prd 
                        ON fct.period_key = prd.period_key 
                        AND date_parse(substr(prd.period_description, 7), '%m/%d/%y') BETWEEN cast('{year_start_date}' as date) AND cast('{year_end_date}' as date)
                        JOIN bo_prod_refinedz_nielsen.nielsen_dim_sales_mrkt_ref mrkt ON fct.market_key = mrkt.market_key
                        JOIN bo_prod_refinedz_nielsen.nielsen_dim_sales_prdc_ref prdc ON fct.product_key = prdc.product_key
                    )
                ) AS ranked_data
            WHERE order_rank = 1
            ORDER BY week
            """.strip()

            queries.append((year, sql))

        return queries

    def create_nielsen_sales_temp_table(self):
        """
        Constructs SQL queries for creating nielsen sales temporary table which will contain the final joined nielsen dim and fact sales data.

        Args: None

        Returns: None
        """
        logger.info(f"Creating temp table {self.nielsen_sales_data_temp_table} in database: {self.temp_table_database}")
        ar.catalog.create_parquet_table(
            database=self.temp_table_database,
            table=self.nielsen_sales_data_temp_table,
            path=self.nielsen_sales_data_temp_s3_path,
            columns_types={
                "order_rank": "bigint",
                "product_key": "int",
                "market_key": "int",
                "period_key": "int",
                "acv": "double",
                "dollar": "double",
                "base_dollar": "double",
                "base_units": "double",
                "units": "double",
                "disp_wo_feat_acv": "double",
                "feat_wo_disp_acv": "double",
                "feat_and_disp_acv": "double",
                "price_decr_only_acv": "double",
                "disp_wo_feat_dollar": "double",
                "feat_and_disp_dollar": "double",
                "feat_wo_disp_dollar": "double",
                "price_decr_only_dollar": "double",
                "price_decr_only_units": "double",
                "disp_wo_feat_units": "double",
                "feat_wo_disp_units": "double",
                "period_description_short": "string",
                "week": "string",
                "month": "string",
                "market_display_name": "string",
                "smm_cleaned": "string",
                "super_category": "string",
                "upc": "string",
                "department": "string",
                "category": "string",
                "sub_category": "string",
                "segment": "string",
                "base_size": "string",
                "brand": "string",
                "brand_low": "string",
                "manufacturer": "string",
                "flavor": "string",
                "form": "string",
                "outer_pack_size": "string",
                "batch_id": "string",
            },
            partitions_types={"year": "bigint"},
        )
        logger.info(
            f"Created temp table {self.nielsen_sales_data_temp_table} successfully in database: {self.temp_table_database}"
        )

    def delete_nielsen_w_weather_data_between_dates(self):
        """
        Deletes nielsen sales and weather data from an Athena table between specified start and end dates, converting from 'YYYY-MM-DD' to 'YYYYMMDD' format. This will help in removing duplicate records from the final iceberg table

        Returns: None
        """
        # Convert dates from 'YYYY-MM-DD' to 'YYYYMMDD'
        start_date_formatted = self.start_date.replace("-", "")
        end_date_formatted = self.end_date.replace("-", "")
        logger.info(
            f"Deleting data from {self.database}.{self.nielsen_w_weather_iceberg_table} between {start_date_formatted} and {end_date_formatted}"
        )

        # Construct the SQL statement to delete data
        sql = f"""
        DELETE FROM {self.database}.{self.nielsen_w_weather_iceberg_table}
        WHERE start_date >= '{start_date_formatted}' AND end_date <= '{end_date_formatted}'
        """
        # Execute the SQL query using AWS Data Wrangler
        response = ar.athena.start_query_execution(sql=sql, database=self.database, wait=True)
        logger.info(
            f"Data deletion initiated. Query execution ID: {response['QueryExecutionId']} and status: {response['Status']}"
        )

    def insert_nielsen_w_weather_query(self, table, nielsen_sales_data_temp_table):
        """
        Constructs and returns the SQL query for inserting aggregated sales data from Nielsen,
        incorporating category data and weather data, into a specified table.

        This query aggregates sales data by 'smm_cleaned', 'week', and 'custom_category' and then joins
        the aggregated results with corresponding weather data. The aggregated data includes total units, dollars,
        base units, and base dollars. The final insertion also includes the specified 'end_date' as 'event_date'.

        Args:
            table (str): The name of the table into which the data will be inserted.
            nielsen_sales_data_temp_table (str): The temp table containing the joined Nielsen sales data.

        Returns:
            str: A SQL string that can be executed to insert aggregated data into the specified Nielsen database table.
        """
        sql = f"""
        INSERT INTO {self.database}.{table}
        SELECT 
            ag.custom_category,
            ag.sum_units,
            ag.sum_dollar,
            ag.sum_base_units,
            ag.sum_base_dollar,
            wd.*,
            '{self.end_date}' as event_date
        FROM (
            SELECT 
                sc.smm_cleaned,
                sc.week,
                sc.custom_category,
                SUM(sc.units) AS sum_units,
                SUM(sc.dollar) AS sum_dollar,
                SUM(sc.base_units) AS sum_base_units,
                SUM(sc.base_dollar) AS sum_base_dollar
            FROM (
                SELECT 
                    sd.smm_cleaned,
                    sd.week,
                    cc.custom_category,
                    sd.units,
                    sd.dollar,
                    sd.base_units,
                    sd.base_dollar
                FROM 
                    {self.temp_table_database}.{nielsen_sales_data_temp_table} sd
                JOIN 
                    custom_category cc 
                ON 
                    sd.department = cc.department AND 
                    sd.super_category = cc.super_category AND
                    sd.category = cc.category AND 
                    sd.sub_category = cc.sub_category AND 
                    sd.brand = cc.brand
            ) sc
            GROUP BY 
                sc.smm_cleaned,
                sc.week,
                sc.custom_category
        ) ag
        JOIN 
            {self.smm_database}.{self.smm_table} wd 
        ON 
            ag.smm_cleaned = wd.smm_cleaned AND 
            ag.week = wd.week
        """
        return sql

    def prepare_neilsen_w_weather_data(self):
        """
        Orchestrates the entire process of managing Nielsen data, including data unloading,
        table creation and repair, data insertion, and cleanup based on defined business logic.

        Steps:
        1. Constructs SQL queries for each year in the defined range, unloads the
        results to s3, and creates a temp Nielsen sales data table on top of them.
        2. Repairs the temp table to register the unloaded partitions.
        3. Checks if the final table exists.
        4. If the table exists, deletes data for the date range before reinsertion.
        5. If the table doesn't exist, fails the pipeline with an error message.
        6. Inserts aggregated data into the final table.

        All operations log their progress and status for debugging and operational tracking.
        """
        # Processing nielsen sales data and storing it to temp table
        unload_queries = self.nielsen_sales_data_queries()
        if isinstance(unload_queries, str):  # A string return value indicates a date parsing error
            logger.error(unload_queries)
            raise ValueError(unload_queries)

        # Repair the weather (smm) table so Athena picks up any new partitions
        query_final_state = ar.athena.repair_table(table=self.smm, database=self.database)
        logger.info(f"Repaired Table: {self.smm} (status: {query_final_state})")

        for year, year_sql in unload_queries:
            logger.info(f"Unloading {year} data to s3 path: {self.nielsen_sales_data_temp_s3_path}")
            unload_sql = f"UNLOAD ({year_sql}) TO '{self.nielsen_sales_data_temp_s3_path}' WITH (format='PARQUET', partitioned_by = ARRAY{self.partitioned_by})"

            # Execute the UNLOAD query to write partitioned Parquet files to s3
            response = ar.athena.start_query_execution(sql=unload_sql, database=self.database, wait=True)
            logger.info(f"Unloaded {year} data successfully.")

        # Creating temp table
        self.create_nielsen_sales_temp_table()

        # Repair the temp table so Athena registers the newly unloaded partitions
        query_final_state = ar.athena.repair_table(
            table=self.nielsen_sales_data_temp_table, database=self.temp_table_database
        )
        logging.info(f"Table repair status: {str(query_final_state)}")
        logger.info(f"Repaired Table: {self.nielsen_sales_data_temp_table}")

        # Combining and aggregating nielsen + weather data and storing it to an iceberg table
        # Check if the insert table exists and act accordingly
        result = ar.catalog.does_table_exist(database=self.database, table=self.nielsen_w_weather_iceberg_table)

        if not result:
            error_msg = f"Pipeline failed due to unavailability of {self.nielsen_w_weather_iceberg_table} table in {self.database} database"
            logger.error(error_msg)
            raise RuntimeError(error_msg)

        logger.info(f"Table {self.nielsen_w_weather_iceberg_table} already exists within database: {self.database}")
        # The table exists, so remove data for the specified date range from the
        # nielsen_w_weather iceberg table to avoid duplicates on reinsertion
        self.delete_nielsen_w_weather_data_between_dates()
        logger.info(f"Deleted data between {self.start_date} and {self.end_date}")

        # Insert aggregated nielsen and weather data into the final iceberg table
        logger.info(f"Inserting Aggregated data in {self.nielsen_w_weather_iceberg_table}")
        insert_sql = self.insert_nielsen_w_weather_query(
            self.nielsen_w_weather_iceberg_table, self.nielsen_sales_data_temp_table
        )

        response = ar.athena.start_query_execution(sql=insert_sql, database=self.database, wait=True)
        logger.info(
            f"Aggregated nielsen and weather data inserted successfully in the iceberg table. Query execution ID: {response['QueryExecutionId']} and status: {response['Status']}"
        )
        logger.info("Inserted data into the iceberg table")

    def delete_temp_resources(self):
        """Deleting temporary resources

        Args:
            database (string): database name for table deletion
            table (string): table name which needs to be deleted
        """
        # Additional cleanup operations could be added here
        ar.s3.delete_objects(self.nielsen_sales_data_temp_s3_path)
        logger.info(f"Removed temporary s3 path data: {self.nielsen_sales_data_temp_s3_path}")
        ar.catalog.delete_table_if_exists(database=self.temp_table_database, table=self.nielsen_sales_data_temp_table)
        logger.info(f"Removed temporary s3 path data: {self.temp_table_database}.{self.nielsen_sales_data_temp_table}")
        logger.info("Removed temporary s3 path and tables")


if __name__ == "__main__":
    # creating object of NielsenDataPreparation class
    processor = NielsenDataPreparation(
        event_date=args["event_date"], lookback_days=args["lookback_days"], backfill_entire_date=args["all"]
    )

    # Remove any temp resources left over from a previous run before processing
    processor.delete_temp_resources()
    processor.prepare_neilsen_w_weather_data()

    # Clean up temp resources created during this run
    processor.delete_temp_resources()