Untitled

mail@pastecode.io avatar
unknown
plain_text
7 months ago
2.9 kB
2
Indexable
Never
# %load -s task_3 assignment2.py
def task_3(data_io, product_data):
    # -----------------------------Column names--------------------------------
    # Inputs:
    asin_column = 'asin'
    price_column = 'price'
    attribute = 'also_viewed'
    related_column = 'related'
    # Outputs:
    meanPriceAlsoViewed_column = 'meanPriceAlsoViewed'
    countAlsoViewed_column = 'countAlsoViewed'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    exploded_df = product_data.alias("pd").withColumn(
        "also_viewed_product", 
        explode(
            when(
                (col("pd.related.also_viewed").isNotNull()) & (size(col("pd.related.also_viewed")) > 0), 
                col("pd.related.also_viewed")
            ).otherwise(array(lit(None)))
        )
    )
    
    product_prices = product_data.alias("pp").select(
        col("pp.asin").alias("product_asin"), 
        col("pp.price").alias("also_viewed_price")
    )
    
    joined_df = exploded_df.join(
        product_prices, 
        exploded_df.also_viewed_product == product_prices.product_asin, 
        "left"
    )

    agg_df = joined_df.groupBy("pd.asin").agg(
        avg("also_viewed_price").alias("meanPriceAlsoViewed"),
        count("also_viewed_product").alias("countAlsoViewed")
    )
    
    stats = agg_df.select(
        count('*').alias('count_total'),
        mean('meanPriceAlsoViewed').alias('mean_meanPriceAlsoViewed'),
        variance('meanPriceAlsoViewed').alias('variance_meanPriceAlsoViewed'),
        count(when(col('meanPriceAlsoViewed').isNull(), True)).alias('numNulls_meanPriceAlsoViewed'),
        mean('countAlsoViewed').alias('mean_countAlsoViewed'),
        variance('countAlsoViewed').alias('variance_countAlsoViewed'),
        count(when(col('countAlsoViewed').isNull(), True)).alias('numNulls_countAlsoViewed')
    ).collect()[0]

    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': stats['count_total'],
        'mean_meanPriceAlsoViewed': stats['mean_meanPriceAlsoViewed'],
        'variance_meanPriceAlsoViewed': stats['variance_meanPriceAlsoViewed'],
        'numNulls_meanPriceAlsoViewed': stats['numNulls_meanPriceAlsoViewed'],
        'mean_countAlsoViewed': stats['mean_countAlsoViewed'],
        'variance_countAlsoViewed': stats['variance_countAlsoViewed'],
        'numNulls_countAlsoViewed': stats['numNulls_countAlsoViewed']
    }
    # Modify res:




    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_3')
    return res
    # -------------------------------------------------------------------------
Leave a Comment