Untitled
unknown
plain_text
2 years ago
2.9 kB
10
Indexable
# %load -s task_3 assignment2.py
def task_3(data_io, product_data):
    """Summarise the "also viewed" prices and counts of every product.

    Each product's ``related.also_viewed`` list is exploded into one row per
    listed ASIN and left-joined back onto ``product_data`` to look up the
    price of the listed product.  Two per-product columns are then derived:

    * ``meanPriceAlsoViewed`` -- average price of the listed products.
      Listed ASINs with no matching row, or with a null price, do not
      contribute; the value is null when no price is available at all.
    * ``countAlsoViewed`` -- number of ASINs in the list, or null when the
      list is missing or empty (so missing data is not reported as zero).

    Args:
        data_io: Object providing ``save(res, 'task_3')``; called once with
            the result dict before returning.
        product_data: DataFrame exposing the ``asin``, ``price`` and
            ``related.also_viewed`` columns read below.

    Returns:
        dict with the keys ``count_total``, ``mean_meanPriceAlsoViewed``,
        ``variance_meanPriceAlsoViewed``, ``numNulls_meanPriceAlsoViewed``,
        ``mean_countAlsoViewed``, ``variance_countAlsoViewed`` and
        ``numNulls_countAlsoViewed``, computed over the per-product table.
    """
    # -----------------------------Column names--------------------------------
    # Inputs:
    asin_column = 'asin'
    price_column = 'price'
    attribute = 'also_viewed'
    related_column = 'related'
    # Outputs:
    meanPriceAlsoViewed_column = 'meanPriceAlsoViewed'
    countAlsoViewed_column = 'countAlsoViewed'
    # -------------------------------------------------------------------------

    # ---------------------- Your implementation begins------------------------
    # Resolves to "pd.related.also_viewed" on the aliased frame below.
    also_viewed = col('pd.' + related_column + '.' + attribute)

    # A plain explode() drops rows whose list is null or empty.  Substituting
    # a one-element [null] list keeps exactly one row for such products so
    # they still appear in the per-product aggregate.
    exploded_df = product_data.alias('pd').withColumn(
        'also_viewed_product',
        explode(
            when(
                also_viewed.isNotNull() & (size(also_viewed) > 0),
                also_viewed,
            ).otherwise(array(lit(None)))
        ),
    )

    # Second view of the same data, used purely as an asin -> price lookup.
    product_prices = product_data.alias('pp').select(
        col('pp.' + asin_column).alias('product_asin'),
        col('pp.' + price_column).alias('also_viewed_price'),
    )

    # Left join: listed ASINs without a matching product keep a null price.
    joined_df = exploded_df.join(
        product_prices,
        exploded_df.also_viewed_product == product_prices.product_asin,
        'left',
    )

    # count() skips the null placeholder, so it is 0 for products without a
    # list.  Map that 0 to null (when() without otherwise()) so the count is
    # consistent with the mean price and numNulls_countAlsoViewed can be
    # non-zero.
    listed_count = count('also_viewed_product')
    agg_df = joined_df.groupBy('pd.' + asin_column).agg(
        avg('also_viewed_price').alias(meanPriceAlsoViewed_column),
        when(listed_count > 0, listed_count).alias(countAlsoViewed_column),
    )

    # count(when(isNull, True)) counts only the rows where the column is null.
    stats = agg_df.select(
        count('*').alias('count_total'),
        mean(meanPriceAlsoViewed_column).alias('mean_meanPriceAlsoViewed'),
        variance(meanPriceAlsoViewed_column).alias(
            'variance_meanPriceAlsoViewed'
        ),
        count(
            when(col(meanPriceAlsoViewed_column).isNull(), True)
        ).alias('numNulls_meanPriceAlsoViewed'),
        mean(countAlsoViewed_column).alias('mean_countAlsoViewed'),
        variance(countAlsoViewed_column).alias('variance_countAlsoViewed'),
        count(
            when(col(countAlsoViewed_column).isNull(), True)
        ).alias('numNulls_countAlsoViewed'),
    ).collect()[0]
    # -------------------------------------------------------------------------

    # ---------------------- Put results in res dict --------------------------
    res = {
        'count_total': stats['count_total'],
        'mean_meanPriceAlsoViewed': stats['mean_meanPriceAlsoViewed'],
        'variance_meanPriceAlsoViewed': stats['variance_meanPriceAlsoViewed'],
        'numNulls_meanPriceAlsoViewed': stats['numNulls_meanPriceAlsoViewed'],
        'mean_countAlsoViewed': stats['mean_countAlsoViewed'],
        'variance_countAlsoViewed': stats['variance_countAlsoViewed'],
        'numNulls_countAlsoViewed': stats['numNulls_countAlsoViewed'],
    }
    # Modify res:
    # -------------------------------------------------------------------------

    # ----------------------------- Do not change -----------------------------
    data_io.save(res, 'task_3')
    return res
    # -------------------------------------------------------------------------
Editor is loading...
Leave a Comment