/usr/lib/spark/bin/spark-submit --master yarn --deploy-mode cluster verified_tags_candidates.py 2022-05-31 5 300 /user/{studentusername}/data/events /user/master/data/snapshots/tags_verified/actual /user/{studentusername}/5.2.4/analytics/verified_tags_candidates_d5
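For reference, the positional arguments above map to the script's sys.argv reads below, in order: the processing date (2022-05-31), the lookback depth in days (5), the minimum distinct-user cutoff per tag (300), the partitioned events input path, the verified-tags snapshot path, and the output path. The {studentusername} placeholder is left as-is.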
import sys
import datetime

from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
import pyspark.sql.functions as F


def main():
    # Positional arguments (see the spark-submit example above).
    date = sys.argv[1]                    # processing date, YYYY-MM-DD
    days_count = int(sys.argv[2])         # how many days of events to read
    suggested_cutoff = int(sys.argv[3])   # minimum distinct users per tag
    base_input_path = sys.argv[4]         # partitioned events directory
    verified_tags_path = sys.argv[5]      # snapshot of already verified tags
    base_output_path = sys.argv[6]        # where to write the candidates

    conf = SparkConf().setAppName(
        f"VerifiedTagsCandidatesJob-{date}-d{days_count}-cut{suggested_cutoff}"
    )
    sc = SparkContext(conf=conf)
    sql = SQLContext(sc)

    # Build one input path per day of the lookback window, counting back from `date`.
    dt = datetime.datetime.strptime(date, '%Y-%m-%d')
    paths = [
        f"{base_input_path}/date={(dt - datetime.timedelta(days=x)).strftime('%Y-%m-%d')}/event_type=message"
        for x in range(days_count)
    ]

    messages = sql.read.parquet(*paths)
    verified_tags = sql.read.parquet(verified_tags_path)

    candidates = find_candidates(messages, verified_tags, suggested_cutoff)
    candidates.write.parquet(f"{base_output_path}/date={date}")


def find_candidates(messages, verified_tags, suggested_cutoff):
    # Count distinct authors per tag across channel messages, keep tags that
    # reach the cutoff, then drop tags that are already verified (left_anti join).
    all_tags = messages \
        .where("event.message_channel_to is not null") \
        .selectExpr("event.message_from as user", "explode(event.tags) as tag") \
        .groupBy("tag") \
        .agg(F.countDistinct("user").alias("suggested_count")) \
        .where(F.col("suggested_count") >= suggested_cutoff)
    return all_tags.join(verified_tags, "tag", "left_anti")


if __name__ == "__main__":
    main()
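A minimal local sketch of how find_candidates behaves, useful for checking the logic without a cluster. It assumes a local SparkSession and hand-built rows that mimic the event struct fields referenced above (event.message_from, event.message_channel_to, event.tags); it is an illustration, not part of the job, and reuses find_candidates from this module.

# Local test sketch (assumes the module above is on the path / same file).
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("find_candidates_sketch").getOrCreate()

# Two users tag "spark" in channel messages; one also tags "etl";
# the row with a null message_channel_to is filtered out by find_candidates.
messages = spark.createDataFrame(
    [
        ((101, 11, ["spark", "etl"]),),
        ((102, 11, ["spark"]),),
        ((103, None, ["ignored"]),),
    ],
    "event struct<message_from: long, message_channel_to: long, tags: array<string>>",
)
verified_tags = spark.createDataFrame([("etl",)], "tag string")

# With suggested_cutoff=2, only "spark" reaches the distinct-user threshold,
# and it survives the left_anti join because it is not already verified.
find_candidates(messages, verified_tags, 2).show()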