Untitled
def selected_top_flip(cur_day, version="v1"):
    """Build per-category top-post recall data and publish it to S3.

    Reads the previous day's selected-posts pool from Snowflake, keeping the
    top 5 posts per SPU_ID by SCORE_ORDER (Tops/Bottoms/null categories
    excluded, posts older than 180 days and clips excluded). Groups the posts
    by CATE1, serializes each category's posts as a CSV row whose payload is a
    CSV-escaped JSON array of {"id", "score"} objects, and overwrites the
    day's output under
    s3://flip-personalize-prod/ningman/recall/online/output/flip/UserSelectedTopFlipRetriever/<version>/<cur_day>.

    Relies on names defined elsewhere in this module: ``spark``, ``sfparams``,
    ``F`` (pyspark.sql.functions), ``date_minus``, ``s3_delete`` and
    ``UserSelectedTopFlipRetriever_header``.

    :param cur_day: run-date string; source data is read for cur_day - 1 day.
    :param version: retriever version suffix used in the output path and
        in each emitted row's header field.
    :return: None. Returns early (nothing deleted or written) if the
        Snowflake query yields fewer than 1000 rows.
    """
    import json  # stdlib; kept function-local to leave the module header untouched

    sql_str = '''
        select post_id, SCORE_ORDER, SCORE_DURATION, CATE1
        from (
            select SPU_ID, post_id, round(SCORE_ORDER,5) as SCORE_ORDER,
                   SCORE_DURATION, CATE1,
                   ROW_NUMBER() OVER(PARTITION BY "SPU_ID" ORDER BY score_order DESC) AS rank
            from FLIP_SYSTEM.dbt_prod.selected_posts_pool
            where dt='{}' and CREATE_DAYS <=180 and IS_CLIP=False
                and CATE1 not in ('Tops','null','Bottoms')
        )
        where rank<=5
    '''.format(date_minus(cur_day, 1))

    top_flip_post_df = (
        spark.read.format("snowflake")
        .options(**sfparams)
        .option("query", sql_str)
        .load()
    )

    # Guard: refuse to publish a suspiciously small result rather than
    # clobbering yesterday's good output on S3.
    if top_flip_post_df.count() < 1000:
        print("rcmd:: table result count is too small!")
        return

    # One row per category: CATE1 -> list of "POST_ID_SCORE" strings.
    grouped_df = (
        top_flip_post_df
        .withColumn("rank_info", F.expr("concat(POST_ID,'_',cast(SCORE_ORDER as string))"))
        .groupby("CATE1")
        .agg(F.expr("collect_list(rank_info) as rank_infos"))
    )

    def sort_map(rank_infos):
        # Split each "POST_ID_SCORE" pair and sort by score, highest first.
        # NOTE(review): assumes POST_ID itself contains no '_' — confirm upstream.
        pairs = [info.split("_") for info in rank_infos]
        return sorted(pairs, key=lambda pair: float(pair[1]), reverse=True)

    def format_(row):
        # Emit one CSV line: <header>_<version>,<category>,"[{...},...]",json
        # Inner double quotes are doubled ('""') per CSV quoting rules.
        category = row[0]
        res = [
            json.dumps({"id": pair[0], "score": round(float(pair[1]), 5)}).replace('"', '""')
            for pair in row[1]
        ]
        return (
            UserSelectedTopFlipRetriever_header + "_{}".format(version)
            + ',' + category + ','
            + '"' + '[' + ','.join(res) + ']' + '"'
            + ",json"
        )

    new_personalize_rdd = (
        grouped_df.rdd
        .map(lambda row: (row[0], sort_map(row[1])))
        .map(format_)
    )

    out_prefix = "ningman/recall/online/output/flip/UserSelectedTopFlipRetriever/" + version
    # Remove any existing output for this version/day before writing
    # (presumably because saveAsTextFile fails on an existing path — verify s3_delete's contract).
    s3_delete("flip-personalize-prod", out_prefix, cur_day)
    new_personalize_rdd.repartition(1).saveAsTextFile(
        "s3://flip-personalize-prod/" + out_prefix + "/" + cur_day
    )
Leave a Comment