Untitled
unknown
plain_text
2 years ago
6.7 kB
113
Indexable
Here's your code beautified:
```python
import argparse
from datetime import datetime
from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql.functions import broadcast, col, desc, explode, input_file_name, lit, log, mean, regexp_extract, regexp_replace, row_number, split, sum, to_timestamp, exp
from pyspark.sql import functions as f
class VisitModelFeaturizer:
customer_settlement_path: str
glp_scores_path: str
apay_rewards_path: str
ww_proficiency_path: str
def __init__(
    self,
    customer_settlement_path: str,
    glp_scores_path: str,
    apay_rewards_path: str,
    ww_proficiency_path: str,
):
    """Store the four input locations used by the feature-building methods.

    Args:
        customer_settlement_path: Parquet location read by ``get_backbone_data``.
        glp_scores_path: Path template (filled via ``str.format`` with a
            ``YYYY/MM/DD`` date) read by ``get_backbone_with_glp``.
        apay_rewards_path: Parquet location read by ``get_surrogate_feature``.
        ww_proficiency_path: Path template (filled via ``str.format``) for the
            CSV files read by ``get_proficiency_data``.
    """
    (
        self.customer_settlement_path,
        self.glp_scores_path,
        self.apay_rewards_path,
        self.ww_proficiency_path,
    ) = (
        customer_settlement_path,
        glp_scores_path,
        apay_rewards_path,
        ww_proficiency_path,
    )
def get_backbone_data(self) -> DataFrame:
    """Load the customer settlement data and attach constant promotion columns.

    Reads parquet from ``self.customer_settlement_path`` using the ``spark``
    session (expected to exist at module level) and adds four literal columns.

    Returns:
        The cached DataFrame with ``gl_array``, ``min_order_amt``,
        ``max_order_amt`` and ``percent_value`` added to every row.
    """
    # Literal columns, listed in the order they are appended.
    promotion_constants = (
        ("gl_array", '["263,328,196,60,201,79,86,200"]'),
        ("min_order_amt", 500),
        # NOTE(review): max (150) is lower than min (500) — confirm the values.
        ("max_order_amt", 150),
        ("percent_value", 15),
    )

    settlement_df = spark.read.parquet(self.customer_settlement_path)
    for column_name, constant in promotion_constants:
        settlement_df = settlement_df.withColumn(column_name, lit(constant))
    return settlement_df.cache()
def get_backbone_with_glp(self, backbone_df) -> DataFrame:
    """Explode the GL list to one row per GL and attach GLP probabilities.

    Args:
        backbone_df: DataFrame carrying ``customer_id`` and a ``gl_array``
            string column such as ``["263,328"]``.

    Returns:
        A cached DataFrame with one row per (backbone row, gl). ``gl_array`` is
        replaced by ``gl``; the GLP columns are left-joined on
        ``customer_id``/``gl``; missing ``probability`` is filled with 0; and
        ``log_1_minus_probability`` is added.
    """
    # Raw string: the previous non-raw literal relied on the invalid escapes
    # "\[" and "\]", which newer Python versions warn about. The pattern value
    # is unchanged: strip a leading "[", every double quote, and a trailing "]".
    gl_csv = regexp_replace("gl_array", r'^\[|"|\]$', "")
    flat_gl_df = backbone_df.withColumn("gl", explode(split(gl_csv, ","))).drop(
        "gl_array"
    )

    # Hard-coded snapshot date that is substituted into the GLP path template.
    GLP_SNAPSHOT_DATETIME = datetime(2024, 3, 17, 0, 0)
    glp_path = self.glp_scores_path.format(
        GLP_SNAPSHOT_DATETIME.strftime("%Y/%m/%d")
    )
    glp = (
        spark.read.parquet(glp_path)
        # The GL id is whatever follows the "permille_glerr_" prefix.
        .withColumn("gl", regexp_replace("modelalias", "^permille_glerr_", ""))
        .drop("modelalias")
        .withColumnRenamed("customerid", "customer_id")
    )

    backbone_with_glp = (
        flat_gl_df.join(glp, on=["customer_id", "gl"], how="leftouter")
        .na.fill({"probability": 0})
        # The tiny offset keeps the log argument positive when probability is 1.
        .withColumn(
            "log_1_minus_probability", log(1.0000000001 - col("probability"))
        )
    )
    return backbone_with_glp.cache()
def get_backbone_with_match_score_on_cid(
    self, backbone_with_glp, backbone_df=None
) -> DataFrame:
    """Aggregate per-GL probabilities into one match score per customer.

    The score is ``1 - exp(sum(log_1_minus_probability))`` per ``customer_id``,
    i.e. one minus the product of the per-GL ``(1 - probability)`` terms.

    Args:
        backbone_with_glp: DataFrame with ``customer_id`` and
            ``log_1_minus_probability`` columns.
        backbone_df: Backbone DataFrame to join the scores onto. The original
            body referenced this name without defining it; it is now an
            optional argument and is reloaded through ``get_backbone_data``
            when omitted, so existing single-argument callers keep working.

    Returns:
        The cached inner join of ``backbone_df`` with the per-customer
        ``log_product_1_minus_probability`` and ``match_score`` columns.
    """
    if backbone_df is None:
        backbone_df = self.get_backbone_data()

    match_score_df = (
        backbone_with_glp.groupBy(["customer_id"])
        # ``sum`` here is the pyspark.sql.functions aggregate imported by the
        # file, not the Python builtin.
        .agg(sum("log_1_minus_probability").alias("log_product_1_minus_probability"))
        .withColumn("match_score", 1 - exp(col("log_product_1_minus_probability")))
        .cache()
    )
    return backbone_df.join(match_score_df, on=["customer_id"]).cache()
def get_surrogate_feature(self) -> DataFrame:
    """Load the surrogate features for the fixed 2024-03-16 snapshot.

    Reads parquet from ``self.apay_rewards_path``, keeps the rows whose
    ``snapshot_day`` equals 2024-03-16, renames that column to ``hit_day`` and
    overwrites it with the constant 2024-03-18 timestamp.

    Returns:
        The cached, filtered DataFrame keyed by the constant ``hit_day``.
    """
    surrogate_df = (
        spark.read.parquet(self.apay_rewards_path)
        # Filter BEFORE renaming: the previous order renamed snapshot_day away
        # and then filtered on it, which fails to resolve the column.
        .filter(col("snapshot_day") == to_timestamp(lit("2024-03-16 00:00:00")))
        .withColumnRenamed("snapshot_day", "hit_day")
        # FIXME (carried over from the original): hit_day is hard-coded.
        .withColumn("hit_day", to_timestamp(lit("2024-03-18 00:00:00")))
    )
    return surrogate_df.cache()
def get_backbone_with_surrogate_out_of_the_box_feature(
    self, dataset_df, surrogate_out_of_the_box_feature=None
) -> DataFrame:
    """Attach the surrogate order/unit/OPS features to the dataset.

    Args:
        dataset_df: Dataset to enrich. It is joined on ``customer_id`` and
            ``hit_day`` — assumes both columns are present; TODO confirm
            against the caller.
        surrogate_out_of_the_box_feature: Surrogate feature DataFrame. The
            original body used this name (and ``dataset``) without defining
            them; it is now an optional argument and is loaded through
            ``get_surrogate_feature`` when omitted.

    Returns:
        The cached inner join of ``dataset_df`` with the selected surrogate
        columns on ``customer_id`` and ``hit_day``.
    """
    if surrogate_out_of_the_box_feature is None:
        surrogate_out_of_the_box_feature = self.get_surrogate_feature()

    join_keys = ["customer_id", "hit_day"]
    surrogate_columns = [
        "customer_id", "hit_day", "weeks", "segment", "orders_static_mtd",
        "units_static_mtd", "ops_static_mtd", "orders_static_qtd",
        "units_static_qtd", "ops_static_qtd", "orders_static_ytd",
        "units_static_ytd", "ops_static_ytd", "orders_static_t12m",
        "units_static_t12m", "ops_static_t12m", "weeks_ytd",
    ]

    # Right outer join keeps every dataset row even without surrogate features.
    backbone_with_surrogate = (
        surrogate_out_of_the_box_feature.join(
            dataset_df, on=join_keys, how="rightouter"
        )
        .select(surrogate_columns)
        .cache()
    )
    return dataset_df.join(backbone_with_surrogate, on=join_keys).cache()
def get_proficiency_data(self, month="*", customer=None) -> DataFrame:
    """Load the proficiency CSVs, restricted to a set of customers.

    The original body referenced ``month`` and ``customer`` without defining
    them; both are now optional arguments so zero-argument callers still work.

    Args:
        month: Value substituted into ``self.ww_proficiency_path`` via
            ``str.format``. Defaults to ``"*"`` so every month directory is
            read — NOTE(review): assumes the template's placeholder is the
            month path segment; confirm.
        customer: DataFrame with a ``customer_id`` column, broadcast and
            inner-joined to restrict the rows. When omitted, the distinct
            ``customer_id`` values of ``get_backbone_data`` are used —
            NOTE(review): presumed intent; confirm.

    Returns:
        The cached DataFrame with ``filename`` (source file path) and ``month``
        (two-digit path segment cast to int) columns added.
    """
    if customer is None:
        customer = self.get_backbone_data().select("customer_id").distinct()

    proficiency_df = (
        spark.read.csv(self.ww_proficiency_path.format(month), header=True)
        # Capture the source file name while still in the scan stage; after a
        # repartition (shuffle) input_file_name() no longer sees the file.
        .withColumn("filename", input_file_name())
        .repartition(500)
        .join(broadcast(customer), on="customer_id", how="inner")
        # Month is the first two-digit path segment of the file name.
        .withColumn("month", regexp_extract("filename", r"/(\d{2})/", 1).cast("int"))
    )
    return proficiency_df.cache()
def get_proficiency_latest_snapshot(self, proficiency_df) -> DataFrame:
    """Keep, for each customer, the proficiency row with the highest ``month``.

    Args:
        proficiency_df: DataFrame with ``customer_id``, ``month``,
            ``is_non_proficient`` and ``degree_of_non_proficiency`` columns.

    Returns:
        A cached DataFrame with one row per ``customer_id`` holding only
        ``customer_id``, ``is_non_proficient`` and
        ``degree_of_non_proficiency``.
    """
    kept_columns = ["customer_id", "is_non_proficient", "degree_of_non_proficiency"]

    # Number each customer's rows from the largest month downwards.
    newest_first = Window.partitionBy("customer_id").orderBy(col("month").desc())
    ranked_df = proficiency_df.withColumn("row", row_number().over(newest_first))

    latest_df = ranked_df.filter(col("row") == 1).drop("row")
    return latest_df.select(kept_columns).cache()
def compute_backbone_with_glp_proficiency(
    self, backbone_with_glp, dataset_df, proficiency_latest_snapshot=None
) -> DataFrame:
    """Join the latest proficiency snapshot into the dataset via the GLP backbone.

    Args:
        backbone_with_glp: DataFrame with a ``customer_id`` column that
            receives the proficiency columns through a left outer join.
        dataset_df: Dataset to enrich; joined on ``customer_id``.
        proficiency_latest_snapshot: Per-customer proficiency DataFrame. The
            original body used this name without defining it; it is now an
            optional argument and is built from ``get_proficiency_data`` and
            ``get_proficiency_latest_snapshot`` when omitted.

    Returns:
        The cached inner join of ``dataset_df`` with the proficiency-enriched
        backbone on ``customer_id``.
    """
    if proficiency_latest_snapshot is None:
        proficiency_latest_snapshot = self.get_proficiency_latest_snapshot(
            self.get_proficiency_data()
        )

    backbone_with_glp_proficiency = backbone_with_glp.join(
        proficiency_latest_snapshot, on=["customer_id"], how="leftouter"
    ).cache()

    # The original assigned this join to an undefined ``dataset`` name and then
    # returned ``dataset_df`` unchanged; the join result is now what is returned.
    # NOTE(review): if backbone_with_glp has several rows per customer, this
    # join multiplies dataset rows and duplicates shared column names — confirm
    # this is intended.
    return dataset_df.join(backbone_with_glp_proficiency, on=["customer_id"]).cache()
def visit_model_featurize(self) -> None:
backbone_df = self.get_backbone_data()
backbone_with_glp = self.get_backbone_with_glp(backbone_df)
backbone_with_match_score_on_cid = self.get_backbone_with_match_score_on_cid(
backbone_with_glp
)
surrogate_feature = self.get_surrogate_feature()
backbone_with_surrogate_out_of_the_box_feature = self.get_backboneEditor is loading...
Leave a Comment