Untitled
unknown
plain_text
7 months ago
2.9 kB
2
Indexable
Never
# %load -s task_3 assignment2.py
def task_3(data_io, product_data):
    """Summarise the price and size of each product's ``also_viewed`` list.

    Two per-product values are derived, grouping by ``asin``:

    * ``meanPriceAlsoViewed`` -- the average ``price`` of the products listed
      in ``related.also_viewed``, looked up by ``asin`` in ``product_data``.
      ``avg`` skips null prices, so listed products that are absent from
      ``product_data`` or have no price do not contribute. The value is null
      when none of the listed products has a price (including when the list
      itself is null or empty).
    * ``countAlsoViewed`` -- the number of non-null entries in
      ``related.also_viewed``. A product whose list is null or empty gets 0.

    That per-product table is reduced to one row of summary statistics. The
    row holds the number of products, plus the mean, variance and null count
    of each derived column. It is saved with ``data_io.save(res, 'task_3')``
    and returned.

    Args:
        data_io: Object exposing ``save(res, name)``, used to persist results.
        product_data: Spark DataFrame with at least an ``asin`` column, a
            ``price`` column and a ``related`` struct holding an
            ``also_viewed`` array of asins (the columns referenced below).

    Returns:
        dict: Keys ``count_total`` and, for each of ``meanPriceAlsoViewed``
        and ``countAlsoViewed``, ``mean_<col>``, ``variance_<col>`` and
        ``numNulls_<col>``.
    """
    # -----------------------------Column names--------------------------------
    # Inputs:
    asin_column = 'asin'
    price_column = 'price'
    attribute = 'also_viewed'
    related_column = 'related'
    # Outputs:
    meanPriceAlsoViewed_column = 'meanPriceAlsoViewed'
    countAlsoViewed_column = 'countAlsoViewed'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    # Intermediate column names, used only inside this function.
    viewed_asin = 'also_viewed_product'
    viewed_price = 'also_viewed_price'
    lookup_asin = 'product_asin'

    also_viewed = col(f'pd.{related_column}.{attribute}')

    # One row per (product, also_viewed entry). A null or empty list becomes
    # [null], so explode still emits a single row for that product instead of
    # dropping it. That null row adds nothing to avg/count below.
    exploded_df = product_data.alias('pd').withColumn(
        viewed_asin,
        explode(
            when(also_viewed.isNotNull() & (size(also_viewed) > 0), also_viewed)
            .otherwise(array(lit(None)))
        ),
    )

    # asin -> price lookup. The columns are renamed so that the join condition
    # and the aggregations cannot clash with the left side's columns.
    # NOTE(review): if an asin can appear more than once in product_data, the
    # left join below fans out and inflates both aggregates. Confirm that asin
    # is unique.
    product_prices = product_data.alias('pp').select(
        col(f'pp.{asin_column}').alias(lookup_asin),
        col(f'pp.{price_column}').alias(viewed_price),
    )

    # A left join keeps also_viewed entries that have no matching product;
    # their price is null.
    joined_df = exploded_df.join(
        product_prices,
        exploded_df[viewed_asin] == product_prices[lookup_asin],
        'left',
    )

    # NOTE(review): count() ignores nulls and is never null itself. Products
    # with a null or empty also_viewed list therefore get countAlsoViewed == 0,
    # and numNulls_countAlsoViewed is always 0. Confirm this against the task
    # spec.
    agg_df = joined_df.groupBy(f'pd.{asin_column}').agg(
        avg(viewed_price).alias(meanPriceAlsoViewed_column),
        count(viewed_asin).alias(countAlsoViewed_column),
    )

    # Build the same three statistics for each output column. The aliases
    # double as the keys of res.
    summary_exprs = [count('*').alias('count_total')]
    for name in (meanPriceAlsoViewed_column, countAlsoViewed_column):
        summary_exprs += [
            mean(name).alias(f'mean_{name}'),
            variance(name).alias(f'variance_{name}'),
            count(when(col(name).isNull(), True)).alias(f'numNulls_{name}'),
        ]

    # A global aggregation (no groupBy) always yields exactly one row.
    stats = agg_df.select(*summary_exprs).collect()[0]
    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = stats.asDict()
    # Modify res:

    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_3')
    return res
    # -------------------------------------------------------------------------
Leave a Comment