# Source: pastecode.io paste ("Untitled", plain_text, ~6.8 kB) — site metadata
# converted to a comment so the file parses as Python.
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import *
import pandas as pd
from pyspark.sql.window import Window
import pyspark.sql.functions as sf
from pyspark.sql.functions import concat_ws
from datetime import datetime, timedelta
import os

# Earlier local-tuning configuration, kept for reference:
# spark = (
#     SparkSession.builder.config("spark.driver.memory", "8g")
#     .config("spark.executor.cores", 8)
#     .getOrCreate()
# )

# Path to your MySQL JDBC driver
jdbc_driver_path = "/Users/admin/Downloads/Pyspark/spark-3.5.1-bin-hadoop3/jars/mysql-connector-j-8.3.0.jar"

# Initialize Spark session with JDBC driver
# The connector jar is registered via spark.jars so the JDBC write in
# import_to_mysql() can load com.mysql.cj.jdbc.Driver at runtime.
spark = (
    SparkSession.builder.appName("MyApp")
    .config("spark.jars", jdbc_driver_path)
    .getOrCreate()
)


def category_AppName(df):
    """Map raw AppName codes to human-readable content categories.

    Adds a "Type" column derived from AppName, keeps only the columns the
    downstream pivot needs, and drops rows without a real contract or a
    known category.

    Parameters:
        df: DataFrame with at least AppName, Contract, TotalDuration columns.

    Returns:
        DataFrame with columns Contract, Type, TotalDuration.
    """
    df = df.withColumn(
        "Type",
        when(col("AppName") == "CHANNEL", "Truyen Hinh")
        .when(col("AppName") == "RELAX", "Giai Tri")
        .when(col("AppName") == "CHILD", "Thieu Nhi")
        .when(col("AppName").isin("FIMS", "VOD"), "Phim Truyen")
        .when(col("AppName").isin("KPLUS", "SPORT"), "The Thao"),
    )
    df = df.select("Contract", "Type", "TotalDuration")
    df = df.filter(col("Contract") != "0")
    # Unmapped AppNames leave Type NULL.  The original `Type != "Error"`
    # filter relied on NULL comparisons evaluating to NULL (and being
    # dropped) -- no branch ever produced "Error".  Express that intent
    # directly: keep only rows with a recognised category.
    df = df.filter(col("Type").isNotNull())
    return df


def most_watch(df):
    """Label each contract row with its single most-watched category.

    First computes the row-wise maximum duration across the five category
    columns, then replaces it with the name of the category that achieved
    that maximum.  Ties resolve in the fixed order Truyen Hinh > Phim
    Truyen > The Thao > Thieu Nhi > Giai Tri (first matching branch wins).

    Parameters:
        df: DataFrame with duration columns "Giai Tri", "Phim Truyen",
            "The Thao", "Thieu Nhi", "Truyen Hinh" (values may be NULL).

    Returns:
        The input DataFrame with an added "MostWatch" string column.
    """
    df = df.withColumn(
        "MostWatch",
        # Fix: the original passed col("Giai Tri") twice; the duplicate
        # argument was redundant and has been removed.
        greatest(
            col("Giai Tri"),
            col("Phim Truyen"),
            col("The Thao"),
            col("Thieu Nhi"),
            col("Truyen Hinh"),
        ),
    )
    df = df.withColumn(
        "MostWatch",
        when(col("MostWatch") == col("Truyen Hinh"), "Truyen Hinh")
        .when(col("MostWatch") == col("Phim Truyen"), "Phim Truyen")
        .when(col("MostWatch") == col("The Thao"), "The Thao")
        .when(col("MostWatch") == col("Thieu Nhi"), "Thieu Nhi")
        .when(col("MostWatch") == col("Giai Tri"), "Giai Tri"),
    )
    return df


def customer_taste(df):
    """Build a "Taste" profile string listing every category a contract watched.

    Each category name is included when its duration column is non-NULL;
    concat_ws skips NULL pieces, so the result is a "-"-joined list such
    as "Giai Tri-The Thao".
    """
    categories = ["Giai Tri", "Phim Truyen", "The Thao", "Thieu Nhi", "Truyen Hinh"]
    pieces = [when(col(name).isNotNull(), lit(name)) for name in categories]
    return df.withColumn("Taste", concat_ws("-", *pieces))


def convert_to_datevalue(string):
    """Parse a compact yyyyMMdd string (e.g. "20220401") into a datetime.date."""
    return datetime.strptime(string, "%Y%m%d").date()


def convert_to_stringvalue(date):
    """Format a date back into its compact yyyyMMdd string representation."""
    return f"{date:%Y%m%d}"


def date_range(start_date, end_date):
    """Return every day from start_date to end_date (inclusive) as yyyyMMdd strings.

    Parameters:
        start_date: datetime.date of the first day.
        end_date: datetime.date of the last day.

    Returns:
        list[str]: one "yyyyMMdd" string per day, in chronological order;
        empty when end_date < start_date.
    """
    # Comprehension replaces the original manual while/append loop; the +1
    # keeps the range inclusive of end_date, matching the old `<=` test.
    total_days = (end_date - start_date).days + 1
    return [
        (start_date + timedelta(days=offset)).strftime("%Y%m%d")
        for offset in range(total_days)
    ]


def generate_range_date(start_date, end_date):
    """Expand two yyyyMMdd boundary strings into the full inclusive list of day strings."""
    return date_range(
        convert_to_datevalue(start_date), convert_to_datevalue(end_date)
    )


def find_active(df):
    """Collapse per-day rows into one summary row per Contract.

    A contract is labelled "High" activity when it has more than 4 day-rows
    in the input, otherwise "Low".  Category durations are summed across
    all days; MostWatch, Taste and Active are carried over with first().

    Parameters:
        df: DataFrame with one row per (Contract, Date) carrying the five
            category duration columns plus MostWatch and Taste.

    Returns:
        One row per Contract with Total_* sums, MostWatch, Taste, Active.
    """
    # Count each contract's rows via a window; count("Date") skips NULL dates.
    windowspec = Window.partitionBy("Contract")
    df = df.withColumn("Active", sf.count("Date").over(windowspec))
    df = df.drop("Date")
    # Threshold of 4 rows separates "High" from "Low" activity.
    df = df.withColumn("Active", when(col("Active") > 4, "High").otherwise("Low"))
    # NOTE(review): first() on MostWatch/Taste is order-dependent -- when a
    # contract's values differ across days, an arbitrary day's value is
    # kept.  Active is constant per contract, so first() is safe for it.
    df = df.groupBy("Contract").agg(
        sf.sum("Giai Tri").alias("Total_Giai_Tri"),
        sf.sum("Phim Truyen").alias("Total_Phim_Truyen"),
        sf.sum("The Thao").alias("Total_The_Thao"),
        sf.sum("Thieu Nhi").alias("Total_Thieu_Nhi"),
        sf.sum("Truyen Hinh").alias("Total_Truyen_Hinh"),
        sf.first("MostWatch").alias("MostWatch"),
        sf.first("Taste").alias("Taste"),
        sf.first("Active").alias("Active"),
    )
    return df


def ETL_1_DAY(path, path_day):
    """Run the full transform pipeline for a single day's log file.

    Reads `<path><path_day>.json`, categorises AppName values, pivots to
    one row per contract with a column per category, then derives the
    MostWatch, Taste, Date and activity columns.

    Parameters:
        path: directory containing the daily JSON log files (trailing "/").
        path_day: day key in yyyyMMdd form; also stamped into the Date column.

    Returns:
        Aggregated one-row-per-contract DataFrame for that day.
    """
    print("------------------------")
    print("Read data from Json file")
    print("------------------------")
    df = spark.read.json(path + path_day + ".json")
    print("------------------------")
    print("Category AppName")
    print("------------------------")
    # Log records are nested under the Elasticsearch-style _source field.
    df = df.select("_source.*")
    df = category_AppName(df)
    print("-----------------------------")
    print("Pivoting data")
    print("-----------------------------")
    df = df.groupBy("Contract").pivot("Type").sum("TotalDuration")
    print("-----------------------------")
    print("Find most watch")
    print("-----------------------------")
    df = most_watch(df)
    print("-----------------------------")
    print("Find customer taste")
    print("-----------------------------")
    df = customer_taste(df)
    df = df.withColumn("Date", to_date(lit(path_day), "yyyyMMdd"))
    print("-----------------------------")
    # Fix: log message typo "avtive" corrected.
    print("Find active")
    print("-----------------------------")
    df = find_active(df)
    return df


def import_to_mysql(result):
    """Append the result DataFrame to the MySQL table Olap_Output.etl_output.

    Parameters:
        result: DataFrame to persist through the JDBC connector.

    Returns:
        None.
    """
    url = "jdbc:mysql://" + "localhost" + ":" + "3306" + "/" + "Olap_Output"
    driver = "com.mysql.cj.jdbc.Driver"
    # SECURITY: credentials are hard-coded; move them to environment
    # variables or a secrets store before sharing or deploying this script.
    user = "root"
    password = "TanBinh2301"
    (
        result.write.format("jdbc")
        .option("url", url)
        .option("driver", driver)
        .option("dbtable", "etl_output")
        .option("user", user)
        .option("password", password)
        .mode("append")
        .save()
    )
    # Fix: was `return print(...)` -- print returns None, so returning it
    # only obscured intent.  Log, then fall through to an implicit None.
    print("Data Import Successfully")


def maintask(path, save_path):
    """ETL every daily log in the configured date range and save the result.

    Runs ETL_1_DAY for each day, unions the per-day results, previews the
    data, and writes a single CSV file.  The MySQL load is left disabled.

    Parameters:
        path: directory holding the <yyyyMMdd>.json log files.
        save_path: output directory for the single-part CSV result.

    Returns:
        The unioned DataFrame across all processed days.
    """
    # Fixed processing window; restore the input() prompts for interactive use.
    # start_date=input("Nhap ngay bat dau: ")
    # end_date=input("Nhap ngay ket thuc: ")
    start_date = "20220401"
    end_date = "20220430"
    date_list = generate_range_date(start_date, end_date)
    # Seed with the first day, then union each remaining day onto it.
    # (Removed: unused os.listdir() call and a dead commented-out loop.)
    print("ETL data file: " + date_list[0] + ".json")
    result = ETL_1_DAY(path, date_list[0])
    for day in date_list[1:]:
        print("ETL data file: " + day + ".json")
        result = result.union(ETL_1_DAY(path, day))
    print("-----------------------------")
    print("Showing data")
    print("-----------------------------")
    result.show(10)
    print("-----------------------------")
    print("Saving csv output")
    print("-----------------------------")
    # repartition(1) forces a single CSV part file for easy hand-off.
    result.repartition(1).write.csv(save_path, mode="overwrite", header=True)
    print("-----------------------------")
    print("Import result to mysql")
    print("-----------------------------")
    # import_to_mysql(result)
    print("Finished job")
    return result


# Local input/output locations; the whole job runs at import time.
# NOTE(review): consider wrapping the maintask() call in
# `if __name__ == "__main__":` so importing this module does not trigger the ETL.
path = "/Users/admin/Documents/DE Course-Khoá Trước/Items Shared on 4-29-2023/Dataset/log_content/"
save_path = "/Users/admin/Documents/DE_task"
df = maintask(path, save_path)
# (end of paste — pastecode.io footer removed)