Untitled
unknown
python
a year ago
2.0 kB
11
Indexable
def selected_top_flip(cur_day, version="v1"):
    """Export the top selected flip posts per category as text lines on S3.

    Steps, in order:

    1. Query ``FLIP_SYSTEM.dbt_prod.selected_posts_pool`` through the Spark
       Snowflake connector for the partition ``dt = date_minus(cur_day, 1)``
       (presumably the day before ``cur_day`` -- confirm against
       ``date_minus``), keeping rows with ``CREATE_DAYS <= 180``,
       ``IS_CLIP = False`` and ``CATE1`` outside ``('Tops', 'null', 'Bottoms')``,
       and at most 5 rows per ``SPU_ID`` ordered by ``score_order`` descending.
    2. If the query returns fewer than 1000 rows, print a warning and return
       without touching S3.
    3. Group the posts by ``CATE1``, sort each group by score (descending) and
       render one line per category of the form
       ``<header>_<version>,<category>,"[<json>,<json>,...]",json`` where every
       ``<json>`` is ``{"id": ..., "score": ...}`` with its double quotes
       doubled (CSV-style escaping).
    4. Call ``s3_delete`` for the target location (assumed to clear any
       previous output for ``cur_day`` -- verify against its definition) and
       write the lines as a single-partition text file.

    Args:
        cur_day: Day identifier as a string; it is passed to ``date_minus``
            and appended to the S3 output path.
        version: Suffix appended to the retriever header and used as a
            sub-directory of the output path.

    Returns:
        None.
    """
    import json

    # NOTE: the SQL text is kept verbatim (flush-left) so the query string
    # sent to Snowflake is unchanged.
    sql_str = '''
select
post_id,
SCORE_ORDER,
SCORE_DURATION,
CATE1
from
(
select
SPU_ID,
post_id,
round(SCORE_ORDER,5) as SCORE_ORDER,
SCORE_DURATION,
CATE1,
ROW_NUMBER() OVER(PARTITION BY "SPU_ID" ORDER BY score_order DESC) AS rank
from FLIP_SYSTEM.dbt_prod.selected_posts_pool
where dt='{}' and CREATE_DAYS <=180 and IS_CLIP=False
and CATE1 not in ('Tops','null','Bottoms')
) where
rank<=5
'''.format(date_minus(cur_day, 1))

    top_flip_post_df = (
        spark.read.format("snowflake")
        .options(**sfparams)
        .option("query", sql_str)
        .load()
    )

    # Guard against publishing from an empty or partially loaded partition.
    if top_flip_post_df.count() < 1000:
        print("rcmd:: table result count is too small!")
        return

    # Pack id and score into one "<post_id>_<score>" string per row so both
    # survive the per-category collect_list.
    top_flip_posts_df = (
        top_flip_post_df.withColumn(
            "rank_info",
            F.expr("concat(POST_ID,'_',cast(SCORE_ORDER as string))"),
        )
        .groupby("CATE1")
        .agg(F.expr("collect_list(rank_info) as rank_infos"))
    )

    def sort_map(rank_infos):
        """Split "<post_id>_<score>" strings and sort them by score, descending."""
        # rsplit with maxsplit=1 cuts only at the separator added by the
        # concat above, so a post id that itself contains "_" stays intact.
        pairs = [info.rsplit("_", 1) for info in rank_infos]
        return sorted(pairs, key=lambda pair: float(pair[1]), reverse=True)

    def format_(row):
        """Render one (category, sorted [post_id, score] pairs) tuple as an output line."""
        category, ranked = row
        res = [
            # Double every quote so the JSON can sit inside a quoted CSV field.
            json.dumps({"id": post_id, "score": round(float(score), 5)}).replace('"', '""')
            for post_id, score in ranked
        ]
        return (
            UserSelectedTopFlipRetriever_header
            + "_{}".format(version)
            + ','
            + category
            + ','
            + '"' + '[' + ','.join(res) + ']' + '"'
            + ",json"
        )

    new_personalize_rdd = top_flip_posts_df.rdd.map(
        lambda x: (x[0], sort_map(x[1]))
    ).map(format_)

    output_prefix = (
        "ningman/recall/online/output/flip/UserSelectedTopFlipRetriever/" + version
    )
    s3_delete("flip-personalize-prod", output_prefix, cur_day)
    # repartition(1) makes Spark write a single part file.
    new_personalize_rdd.repartition(1).saveAsTextFile(
        "s3://flip-personalize-prod/" + output_prefix + "/" + cur_day
    )
Editor is loading...
Leave a Comment