Untitled
unknown
plain_text
a year ago
6.7 kB
111
Indexable
import argparse
from datetime import datetime
from typing import Optional

from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.functions import (
    broadcast,
    col,
    desc,
    explode,
    input_file_name,
    lit,
    log,
    mean,
    regexp_extract,
    regexp_replace,
    row_number,
    split,
    sum,  # NOTE: shadows the builtin ``sum`` for the whole module.
    to_timestamp,
    exp,
)
from pyspark.sql import functions as f


class VisitModelFeaturizer:
    """Assemble the visit-model feature dataset from several Spark inputs.

    NOTE(review): every reader below uses a module-level ``spark`` session
    that is not defined in the visible part of this file; it must exist
    before any method is called.
    """

    customer_settlement_path: str
    glp_scores_path: str
    apay_rewards_path: str
    ww_proficiency_path: str

    def __init__(
        self,
        customer_settlement_path: str,
        glp_scores_path: str,
        apay_rewards_path: str,
        ww_proficiency_path: str,
    ):
        """Store the input locations.

        Args:
            customer_settlement_path: Parquet location of the backbone data.
            glp_scores_path: Parquet path template; formatted with a
                ``YYYY/MM/DD`` snapshot date.
            apay_rewards_path: Parquet location of the surrogate features.
            ww_proficiency_path: CSV path template; formatted with a month.
        """
        self.customer_settlement_path = customer_settlement_path
        self.glp_scores_path = glp_scores_path
        self.apay_rewards_path = apay_rewards_path
        self.ww_proficiency_path = ww_proficiency_path

    def get_backbone_data(self) -> DataFrame:
        """Read the settlement parquet and attach four constant columns.

        Returns:
            The cached backbone DataFrame with ``gl_array``,
            ``min_order_amt``, ``max_order_amt`` and ``percent_value`` added.
        """
        # NOTE(review): min_order_amt (500) is larger than max_order_amt (150);
        # values are kept as found - confirm they are intended.
        backbone_df = (
            spark.read.parquet(self.customer_settlement_path)
            .withColumn("gl_array", lit('["263,328,196,60,201,79,86,200"]'))
            .withColumn("min_order_amt", lit(500))
            .withColumn("max_order_amt", lit(150))
            .withColumn("percent_value", lit(15))
        )
        return backbone_df.cache()

    def get_backbone_with_glp(self, backbone_df) -> DataFrame:
        """Explode ``gl_array`` to one row per GL and left-join GLP scores.

        Args:
            backbone_df: DataFrame carrying ``customer_id`` and ``gl_array``.

        Returns:
            Cached DataFrame with one row per (row of ``backbone_df``, gl),
            ``probability`` filled with 0 where no score matched, and
            ``log_1_minus_probability`` added.
        """
        # Raw string: the original non-raw literal relied on the invalid
        # escapes ``\[`` / ``\]``. The pattern value itself is unchanged.
        stripped_gl_list = regexp_replace("gl_array", r'^\[|"|\]$', "")
        flat_gl_df = backbone_df.withColumn(
            "gl", explode(split(stripped_gl_list, ","))
        ).drop("gl_array")

        GLP_SNAPSHOT_DATETIME = datetime(2024, 3, 17, 0, 0)
        glp_path = self.glp_scores_path.format(
            GLP_SNAPSHOT_DATETIME.strftime("%Y/%m/%d")
        )
        glp = (
            spark.read.parquet(glp_path)
            .withColumn("gl", regexp_replace("modelalias", "^permille_glerr_", ""))
            .drop("modelalias")
            .withColumnRenamed("customerid", "customer_id")
        )

        backbone_with_glp = (
            flat_gl_df.join(glp, on=["customer_id", "gl"], how="leftouter")
            .na.fill({"probability": 0})
            # The tiny offset keeps the log argument positive when
            # probability equals 1.
            .withColumn(
                "log_1_minus_probability", log(1.0000000001 - col("probability"))
            )
        )
        return backbone_with_glp.cache()

    def get_backbone_with_match_score_on_cid(
        self, backbone_with_glp, backbone_df: Optional[DataFrame] = None
    ) -> DataFrame:
        """Aggregate per-GL probabilities into one match score per customer.

        ``match_score`` is ``1 - exp(sum(log_1_minus_probability))`` per
        ``customer_id``, then joined back onto the backbone.

        Args:
            backbone_with_glp: Output of :meth:`get_backbone_with_glp`.
            backbone_df: Backbone to join the scores onto. The original code
                read an undefined ``backbone_df`` here; when omitted it is
                now rebuilt with :meth:`get_backbone_data`.

        Returns:
            Cached backbone DataFrame with ``log_product_1_minus_probability``
            and ``match_score`` columns.
        """
        if backbone_df is None:
            backbone_df = self.get_backbone_data()

        backbone_with_match_score = (
            backbone_with_glp.groupBy(["customer_id"])
            .agg(
                f.sum("log_1_minus_probability").alias(
                    "log_product_1_minus_probability"
                )
            )
            .withColumn(
                "match_score", 1 - exp(col("log_product_1_minus_probability"))
            )
            .cache()
        )
        dataset_df = backbone_df.join(backbone_with_match_score, on=["customer_id"])
        return dataset_df.cache()

    def get_surrogate_feature(self) -> DataFrame:
        """Read the surrogate features for the 2024-03-16 snapshot.

        Returns:
            Cached DataFrame of the rows whose ``snapshot_day`` equals
            2024-03-16, with that column renamed to ``hit_day`` and its
            value overwritten with 2024-03-18.
        """
        surrogate_out_of_the_box_feature_df = (
            spark.read.parquet(self.apay_rewards_path)
            # Filter BEFORE renaming: the original renamed first and then
            # filtered on "snapshot_day", which no longer existed.
            .filter(col("snapshot_day") == to_timestamp(lit("2024-03-16 00:00:00")))
            .withColumnRenamed("snapshot_day", "hit_day")
            .withColumn("hit_day", to_timestamp(lit("2024-03-18 00:00:00")))  # FIXME
        )
        return surrogate_out_of_the_box_feature_df.cache()

    def get_backbone_with_surrogate_out_of_the_box_feature(
        self,
        dataset_df,
        surrogate_out_of_the_box_feature: Optional[DataFrame] = None,
    ) -> DataFrame:
        """Attach the surrogate feature columns to ``dataset_df``.

        Args:
            dataset_df: Dataset keyed by ``customer_id`` and ``hit_day``.
            surrogate_out_of_the_box_feature: Output of
                :meth:`get_surrogate_feature`. The original code read an
                undefined name here; when omitted the features are now
                loaded with :meth:`get_surrogate_feature`.

        Returns:
            Cached ``dataset_df`` joined with the selected surrogate columns
            on ``customer_id`` and ``hit_day``.
        """
        if surrogate_out_of_the_box_feature is None:
            surrogate_out_of_the_box_feature = self.get_surrogate_feature()

        feature_columns = [
            "customer_id",
            "hit_day",
            "weeks",
            "segment",
            "orders_static_mtd",
            "units_static_mtd",
            "ops_static_mtd",
            "orders_static_qtd",
            "units_static_qtd",
            "ops_static_qtd",
            "orders_static_ytd",
            "units_static_ytd",
            "ops_static_ytd",
            "orders_static_t12m",
            "units_static_t12m",
            "ops_static_t12m",
            "weeks_ytd",
        ]
        # Right-outer keeps every dataset row even without surrogate features.
        # (The original joined against an undefined ``dataset``.)
        backbone_with_surrogate_out_of_the_box_feature = (
            surrogate_out_of_the_box_feature.join(
                dataset_df, on=["customer_id", "hit_day"], how="rightouter"
            )
            .select(feature_columns)
            .cache()
        )
        dataset_df = dataset_df.join(
            backbone_with_surrogate_out_of_the_box_feature,
            on=["customer_id", "hit_day"],
        )
        return dataset_df.cache()

    def get_proficiency_data(
        self, month: str = "*", customer: Optional[DataFrame] = None
    ) -> DataFrame:
        """Read the proficiency CSVs and tag each row with its month.

        The month is parsed from the first two-digit path segment
        (``/NN/``) of each row's source file name.

        Args:
            month: Value substituted into ``ww_proficiency_path``.
                NOTE(review): the original read an undefined ``month``; the
                ``"*"`` default assumes the template accepts a glob - confirm.
            customer: DataFrame with ``customer_id`` used to restrict rows
                via a broadcast inner join.
                NOTE(review): the original read an undefined ``customer``;
                when omitted no restriction is applied - confirm.

        Returns:
            Cached DataFrame with ``filename`` and integer ``month`` columns.
        """
        proficiency_df = (
            spark.read.csv(self.ww_proficiency_path.format(month), header=True)
            .repartition(500)
            .withColumn("filename", input_file_name())
        )
        if customer is not None:
            proficiency_df = proficiency_df.join(
                broadcast(customer), on="customer_id", how="inner"
            )
        proficiency_df = proficiency_df.withColumn(
            "month", regexp_extract("filename", r"/(\d{2})/", 1).cast("int")
        )
        return proficiency_df.cache()

    def get_proficiency_latest_snapshot(self, proficiency_df) -> DataFrame:
        """Keep one row per customer: the one with the highest ``month``.

        Args:
            proficiency_df: Output of :meth:`get_proficiency_data`.

        Returns:
            Cached DataFrame with ``customer_id``, ``is_non_proficient`` and
            ``degree_of_non_proficiency``.
        """
        newest_first = Window.partitionBy("customer_id").orderBy(col("month").desc())
        proficiency_latest_snapshot = (
            proficiency_df.withColumn("row", row_number().over(newest_first))
            .filter(col("row") == 1)
            .drop("row")
            .select(["customer_id", "is_non_proficient", "degree_of_non_proficiency"])
        )
        return proficiency_latest_snapshot.cache()

    def compute_backbone_with_glp_proficiency(
        self,
        backbone_with_glp,
        dataset_df,
        proficiency_latest_snapshot: Optional[DataFrame] = None,
    ) -> DataFrame:
        """Join proficiency onto the GLP backbone, then onto ``dataset_df``.

        Args:
            backbone_with_glp: Output of :meth:`get_backbone_with_glp`.
            dataset_df: Dataset to enrich, keyed by ``customer_id``.
            proficiency_latest_snapshot: Output of
                :meth:`get_proficiency_latest_snapshot`. The original code
                read an undefined name here; when omitted it is now built
                from :meth:`get_proficiency_data`.

        Returns:
            Cached ``dataset_df`` joined with the proficiency-enriched GLP
            backbone on ``customer_id``.
        """
        if proficiency_latest_snapshot is None:
            proficiency_latest_snapshot = self.get_proficiency_latest_snapshot(
                self.get_proficiency_data()
            )

        backbone_with_glp_proficiency = backbone_with_glp.join(
            proficiency_latest_snapshot, on=["customer_id"], how="leftouter"
        ).cache()
        # The original assigned the join to an unbound local ``dataset`` and
        # returned the un-joined ``dataset_df``; the joined result is returned.
        # NOTE(review): backbone_with_glp has one row per GL, so this join can
        # multiply rows per customer - confirm that is intended.
        dataset_df = dataset_df.join(backbone_with_glp_proficiency, on=["customer_id"])
        return dataset_df.cache()

    def visit_model_featurize(self) -> None:
        # NOTE(review): the visible source ends mid-statement inside this
        # method. The statements below are reproduced exactly as found,
        # including the cut-off last one; the remainder is unknown.
        backbone_df = self.get_backbone_data()
        backbone_with_glp = self.get_backbone_with_glp(backbone_df)
        backbone_with_match_score_on_cid = self.get_backbone_with_match_score_on_cid(
            backbone_with_glp
        )
        surrogate_feature = self.get_surrogate_feature()
        backbone_with_surrogate_out_of_the_box_feature = self.get_backbone
Editor is loading...
Leave a Comment