Untitled

 avatar
unknown
plain_text
a year ago
6.7 kB
111
Indexable
Here's your code beautified:

```python
import argparse
from datetime import datetime
from typing import Optional

from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.functions import broadcast, col, desc, explode, input_file_name, lit, log, mean, regexp_extract, regexp_replace, row_number, split, sum, to_timestamp, exp
from pyspark.sql import functions as f


class VisitModelFeaturizer:
    """Assemble visit-model features from several Spark data sources.

    Each ``get_*`` / ``compute_*`` method performs one step and returns a
    cached ``DataFrame``. Inputs a step needs from an earlier step are passed
    in as arguments; where an argument is optional, the step recomputes the
    input itself when it is omitted.
    """

    customer_settlement_path: str
    glp_scores_path: str
    apay_rewards_path: str
    ww_proficiency_path: str
    _spark: Optional[SparkSession]

    # Promotion metadata stamped as literal columns onto every backbone row.
    GL_ARRAY = '["263,328,196,60,201,79,86,200"]'
    # NOTE(review): min (500) is larger than max (150); values are kept as
    # found -- confirm they are not swapped.
    MIN_ORDER_AMT = 500
    MAX_ORDER_AMT = 150
    PERCENT_VALUE = 15

    # Snapshot used to fill the date placeholder of ``glp_scores_path``.
    GLP_SNAPSHOT_DATETIME = datetime(2024, 3, 17, 0, 0)

    # Surrogate features are read for one snapshot day and re-stamped with
    # the hit day they are joined on.
    SURROGATE_SNAPSHOT_DAY = "2024-03-16 00:00:00"
    SURROGATE_HIT_DAY = "2024-03-18 00:00:00"

    # Columns kept after joining the surrogate features onto the dataset.
    SURROGATE_FEATURE_COLUMNS = [
        "customer_id", "hit_day", "weeks", "segment", "orders_static_mtd",
        "units_static_mtd", "ops_static_mtd", "orders_static_qtd",
        "units_static_qtd", "ops_static_qtd", "orders_static_ytd",
        "units_static_ytd", "ops_static_ytd", "orders_static_t12m",
        "units_static_t12m", "ops_static_t12m", "weeks_ytd",
    ]

    def __init__(
        self,
        customer_settlement_path: str,
        glp_scores_path: str,
        apay_rewards_path: str,
        ww_proficiency_path: str,
        spark: Optional[SparkSession] = None,
    ):
        """Store the input locations and, optionally, the Spark session.

        Args:
            customer_settlement_path: Parquet location of the backbone data.
            glp_scores_path: Parquet path template; its placeholder is
                filled with the GLP snapshot date formatted as ``%Y/%m/%d``.
            apay_rewards_path: Parquet location of the surrogate features.
            ww_proficiency_path: CSV path template; its placeholder is
                filled with the ``month`` given to ``get_proficiency_data``.
            spark: Session used for all reads. When omitted, the session is
                obtained lazily with ``SparkSession.builder.getOrCreate()``.
        """
        self.customer_settlement_path = customer_settlement_path
        self.glp_scores_path = glp_scores_path
        self.apay_rewards_path = apay_rewards_path
        self.ww_proficiency_path = ww_proficiency_path
        self._spark = spark

    @property
    def spark(self) -> SparkSession:
        """Return the Spark session, creating or reusing one on first use."""
        if self._spark is None:
            # Resolved lazily so that constructing the featurizer does not
            # start Spark; the methods no longer rely on a global ``spark``.
            self._spark = SparkSession.builder.getOrCreate()
        return self._spark

    def get_backbone_data(self) -> DataFrame:
        """Read the backbone and attach the promotion metadata columns.

        Returns:
            The cached backbone with the literal columns ``gl_array``,
            ``min_order_amt``, ``max_order_amt`` and ``percent_value`` added.
        """
        backbone_df = (
            self.spark.read.parquet(self.customer_settlement_path)
            .withColumn("gl_array", lit(self.GL_ARRAY))
            .withColumn("min_order_amt", lit(self.MIN_ORDER_AMT))
            .withColumn("max_order_amt", lit(self.MAX_ORDER_AMT))
            .withColumn("percent_value", lit(self.PERCENT_VALUE))
        )
        return backbone_df.cache()

    def get_backbone_with_glp(self, backbone_df: DataFrame) -> DataFrame:
        """Explode the backbone per GL and left-join the GLP scores.

        Args:
            backbone_df: Backbone carrying a ``gl_array`` string column.

        Returns:
            A cached frame with one row per backbone row and GL, where a
            missing ``probability`` is filled with 0 and
            ``log_1_minus_probability`` is added.
        """
        # Strip the leading "[", the trailing "]" and every double quote,
        # then split on commas and emit one row per GL code.
        flat_gl_df = backbone_df.withColumn(
            "gl",
            explode(split(regexp_replace("gl_array", r'^\[|"|\]$', ""), ",")),
        ).drop("gl_array")

        snapshot_path = self.glp_scores_path.format(
            self.GLP_SNAPSHOT_DATETIME.strftime("%Y/%m/%d")
        )
        glp_df = (
            self.spark.read.parquet(snapshot_path)
            .withColumn("gl", regexp_replace("modelalias", "^permille_glerr_", ""))
            .drop("modelalias")
            .withColumnRenamed("customerid", "customer_id")
        )

        backbone_with_glp = (
            flat_gl_df.join(glp_df, on=["customer_id", "gl"], how="leftouter")
            .na.fill({"probability": 0})
            # The constant is slightly above 1 so that probability == 1 does
            # not produce log(0).
            .withColumn(
                "log_1_minus_probability", log(1.0000000001 - col("probability"))
            )
        )
        return backbone_with_glp.cache()

    def get_backbone_with_match_score_on_cid(
        self,
        backbone_with_glp: DataFrame,
        backbone_df: Optional[DataFrame] = None,
    ) -> DataFrame:
        """Aggregate the per-GL scores into one match score per customer.

        Summing ``log(1 - p)`` per customer and exponentiating gives the
        product of ``(1 - p)``; ``match_score`` is one minus that product.

        Args:
            backbone_with_glp: Output of ``get_backbone_with_glp``.
            backbone_df: Backbone to attach the score to. When omitted it is
                rebuilt with ``get_backbone_data``.

        Returns:
            The cached backbone inner-joined with the per-customer
            ``log_product_1_minus_probability`` and ``match_score``.
        """
        if backbone_df is None:
            # Previously this method read a ``backbone_df`` name that was
            # never defined in its scope.
            backbone_df = self.get_backbone_data()

        match_score_df = (
            backbone_with_glp.groupBy(["customer_id"])
            .agg(
                sum("log_1_minus_probability").alias(
                    "log_product_1_minus_probability"
                )
            )
            .withColumn(
                "match_score", 1 - exp(col("log_product_1_minus_probability"))
            )
            .cache()
        )

        dataset_df = backbone_df.join(match_score_df, on=["customer_id"])
        return dataset_df.cache()

    def get_surrogate_feature(self) -> DataFrame:
        """Read the surrogate features for the configured snapshot day.

        Returns:
            The cached rows whose ``snapshot_day`` equals
            ``SURROGATE_SNAPSHOT_DAY``, with that column renamed to
            ``hit_day`` and overwritten with ``SURROGATE_HIT_DAY``.
        """
        surrogate_df = (
            self.spark.read.parquet(self.apay_rewards_path)
            # Filter before renaming: once the column is called ``hit_day``
            # there is no ``snapshot_day`` left to filter on.
            .filter(
                col("snapshot_day")
                == to_timestamp(lit(self.SURROGATE_SNAPSHOT_DAY))
            )
            .withColumnRenamed("snapshot_day", "hit_day")
            .withColumn(
                "hit_day", to_timestamp(lit(self.SURROGATE_HIT_DAY))
            )  # FIXME
        )
        return surrogate_df.cache()

    def get_backbone_with_surrogate_out_of_the_box_feature(
        self,
        dataset_df: DataFrame,
        surrogate_out_of_the_box_feature: Optional[DataFrame] = None,
    ) -> DataFrame:
        """Attach the surrogate features to the dataset.

        Args:
            dataset_df: Dataset keyed by ``customer_id`` and ``hit_day``.
            surrogate_out_of_the_box_feature: Surrogate features; when
                omitted they are loaded with ``get_surrogate_feature``.

        Returns:
            The cached dataset inner-joined with the selected surrogate
            feature columns on ``customer_id`` and ``hit_day``.
        """
        if surrogate_out_of_the_box_feature is None:
            surrogate_out_of_the_box_feature = self.get_surrogate_feature()

        join_keys = ["customer_id", "hit_day"]

        # Right outer join keeps every dataset row, including those that
        # have no surrogate features.
        feature_df = (
            surrogate_out_of_the_box_feature.join(
                dataset_df, on=join_keys, how="rightouter"
            )
            .select(self.SURROGATE_FEATURE_COLUMNS)
            .cache()
        )

        dataset_df = dataset_df.join(feature_df, on=join_keys)
        return dataset_df.cache()

    def get_proficiency_data(
        self,
        month: str = "*",
        customer: Optional[DataFrame] = None,
    ) -> DataFrame:
        """Read the proficiency CSVs and keep the rows of known customers.

        Args:
            month: Value substituted into ``ww_proficiency_path``.
                NOTE(review): the default ``"*"`` assumes the placeholder is
                a path component that can be globbed -- confirm.
            customer: Frame with a ``customer_id`` column used as a
                broadcast inner-join filter. When omitted, the distinct
                ``customer_id`` values of the backbone are used
                (NOTE(review): confirm this is the intended customer set).

        Returns:
            The cached proficiency rows with ``filename`` and an integer
            ``month`` parsed from the first ``/NN/`` segment of the file
            name.
        """
        if customer is None:
            customer = self.get_backbone_data().select("customer_id").distinct()

        proficiency_df = (
            self.spark.read.csv(
                self.ww_proficiency_path.format(month), header=True
            )
            # Capture the file name before repartitioning; it is expected
            # to be empty once the rows have been shuffled.
            .withColumn("filename", input_file_name())
            .repartition(500)
            .join(broadcast(customer), on="customer_id", how="inner")
            .withColumn(
                "month", regexp_extract("filename", r"/(\d{2})/", 1).cast("int")
            )
        )
        return proficiency_df.cache()

    def get_proficiency_latest_snapshot(
        self, proficiency_df: DataFrame
    ) -> DataFrame:
        """Keep each customer's proficiency row from the highest month.

        Args:
            proficiency_df: Output of ``get_proficiency_data``.

        Returns:
            A cached frame with one row per ``customer_id`` holding
            ``is_non_proficient`` and ``degree_of_non_proficiency``.
        """
        latest_first = Window.partitionBy("customer_id").orderBy(
            col("month").desc()
        )

        proficiency_latest_snapshot = (
            proficiency_df.withColumn("row", row_number().over(latest_first))
            .filter(col("row") == 1)
            .drop("row")
            .select(
                ["customer_id", "is_non_proficient", "degree_of_non_proficiency"]
            )
        )
        return proficiency_latest_snapshot.cache()

    def compute_backbone_with_glp_proficiency(
        self,
        backbone_with_glp: DataFrame,
        dataset_df: DataFrame,
        proficiency_latest_snapshot: Optional[DataFrame] = None,
    ) -> DataFrame:
        """Join the proficiency snapshot onto the GLP backbone and dataset.

        Args:
            backbone_with_glp: Output of ``get_backbone_with_glp``.
            dataset_df: Dataset to extend, keyed by ``customer_id``.
            proficiency_latest_snapshot: Latest proficiency per customer;
                when omitted it is built with ``get_proficiency_data`` and
                ``get_proficiency_latest_snapshot``.

        Returns:
            The cached dataset inner-joined with the proficiency-enriched
            GLP backbone on ``customer_id``.
        """
        if proficiency_latest_snapshot is None:
            proficiency_latest_snapshot = self.get_proficiency_latest_snapshot(
                self.get_proficiency_data()
            )

        backbone_with_glp_proficiency = backbone_with_glp.join(
            proficiency_latest_snapshot, on=["customer_id"], how="leftouter"
        ).cache()

        # The join result is now assigned to the frame that is returned;
        # before, it went to an unbound local and the input came back as is.
        # NOTE(review): ``backbone_with_glp`` has one row per GL, so this
        # join repeats each dataset row per GL -- confirm that is intended.
        dataset_df = dataset_df.join(
            backbone_with_glp_proficiency, on=["customer_id"]
        )
        return dataset_df.cache()


def visit_model_featurize(self) -> None:
    backbone_df = self.get_backbone_data()
    backbone_with_glp = self.get_backbone_with_glp(backbone_df)
    backbone_with_match_score_on_cid = self.get_backbone_with_match_score_on_cid(
        backbone_with_glp
    )
    surrogate_feature = self.get_surrogate_feature()
    backbone_with_surrogate_out_of_the_box_feature = self.get_backbone
Editor is loading...
Leave a Comment