Untitled
from pyspark.context import SparkContext from pyspark.sql.session import SparkSession from pyspark.sql.functions import * import pandas as pd from pyspark.sql.window import Window import pyspark.sql.functions as sf from pyspark.sql.functions import concat_ws from datetime import datetime, timedelta import os # spark = ( # SparkSession.builder.config("spark.driver.memory", "8g") # .config("spark.executor.cores", 8) # .getOrCreate() # ) # Path to your MySQL JDBC driver jdbc_driver_path = "/Users/admin/Downloads/Pyspark/spark-3.5.1-bin-hadoop3/jars/mysql-connector-j-8.3.0.jar" # Initialize Spark session with JDBC driver spark = ( SparkSession.builder.appName("MyApp") .config("spark.jars", jdbc_driver_path) .getOrCreate() ) def category_AppName(df): df = df.withColumn( "Type", when(col("AppName") == "CHANNEL", "Truyen Hinh") .when(col("AppName") == "RELAX", "Giai Tri") .when(col("AppName") == "CHILD", "Thieu Nhi") .when((col("AppName") == "FIMS") | (col("AppName") == "VOD"), "Phim Truyen") .when((col("AppName") == "KPLUS") | (col("AppName") == "SPORT"), "The Thao"), ) df = df.select("Contract", "Type", "TotalDuration") df = df.filter(df.Contract != "0") df = df.filter(df.Type != "Error") return df def most_watch(df): df = df.withColumn( "MostWatch", greatest( col("Giai Tri"), col("Phim Truyen"), col("The Thao"), col("Thieu Nhi"), col("Truyen Hinh"), col("Giai Tri"), ), ) df = df.withColumn( "MostWatch", when(col("MostWatch") == col("Truyen Hinh"), "Truyen Hinh") .when(col("MostWatch") == col("Phim Truyen"), "Phim Truyen") .when(col("MostWatch") == col("The Thao"), "The Thao") .when(col("MostWatch") == col("Thieu Nhi"), "Thieu Nhi") .when(col("MostWatch") == col("Giai Tri"), "Giai Tri"), ) return df def customer_taste(df): df = df.withColumn( "Taste", concat_ws( "-", when(col("Giai Tri").isNotNull(), lit("Giai Tri")), when(col("Phim Truyen").isNotNull(), lit("Phim Truyen")), when(col("The Thao").isNotNull(), lit("The Thao")), when(col("Thieu Nhi").isNotNull(), lit("Thieu Nhi")), when(col("Truyen Hinh").isNotNull(), lit("Truyen Hinh")), ), ) return df def convert_to_datevalue(string): date_value = datetime.strptime(string, "%Y%m%d").date() return date_value def convert_to_stringvalue(date): string_value = date.strftime("%Y%m%d") return string_value def date_range(start_date, end_date): date_list = [] current_date = start_date while current_date <= end_date: date_list.append(convert_to_stringvalue(current_date)) current_date += timedelta(days=1) return date_list def generate_range_date(start_date, end_date): start_date = convert_to_datevalue(start_date) end_date = convert_to_datevalue(end_date) date_list = date_range(start_date, end_date) return date_list def find_active(df): windowspec = Window.partitionBy("Contract") df = df.withColumn("Active", sf.count("Date").over(windowspec)) df = df.drop("Date") df = df.withColumn("Active", when(col("Active") > 4, "High").otherwise("Low")) df = df.groupBy("Contract").agg( sf.sum("Giai Tri").alias("Total_Giai_Tri"), sf.sum("Phim Truyen").alias("Total_Phim_Truyen"), sf.sum("The Thao").alias("Total_The_Thao"), sf.sum("Thieu Nhi").alias("Total_Thieu_Nhi"), sf.sum("Truyen Hinh").alias("Total_Truyen_Hinh"), sf.first("MostWatch").alias("MostWatch"), sf.first("Taste").alias("Taste"), sf.first("Active").alias("Active"), ) return df def ETL_1_DAY(path, path_day): print("------------------------") print("Read data from Json file") print("------------------------") df = spark.read.json(path + path_day + ".json") print("------------------------") print("Category AppName") print("------------------------") df = df.select("_source.*") df = category_AppName(df) print("-----------------------------") print("Pivoting data") print("-----------------------------") df = df.groupBy("Contract").pivot("Type").sum("TotalDuration") print("-----------------------------") print("Find most watch") print("-----------------------------") df = most_watch(df) print("-----------------------------") print("Find customer taste") print("-----------------------------") df = customer_taste(df) df = df.withColumn("Date", to_date(lit(path_day), "yyyyMMdd")) print("-----------------------------") print("Find avtive ") print("-----------------------------") df = find_active(df) return df def import_to_mysql(result): url = "jdbc:mysql://" + "localhost" + ":" + "3306" + "/" + "Olap_Output" driver = "com.mysql.cj.jdbc.Driver" user = "root" password = "TanBinh2301" result.write.format("jdbc").option("url", url).option("driver", driver).option( "dbtable", "etl_output" ).option("user", user).option("password", password).mode("append").save() return print("Data Import Successfully") def maintask(path, save_path): dir_list = os.listdir(path) # start_date=input("Nhap ngay bat dau: ") # end_date=input("Nhap ngay ket thuc: ") start_date = "20220401" end_date = "20220430" date_list = generate_range_date(start_date, end_date) print("ETL data file: " + date_list[0] + ".json") result = ETL_1_DAY(path, date_list[0]) # for x in date_list: # for y in dir_list: # if(y==dir_list[0]): # continue # if x in y: # print("ETL data file: "+y) # result=result.union(ETL_1_DAY(path,x)) for x in date_list[1:]: print("ETL data file: " + x + ".json") result = result.union(ETL_1_DAY(path, x)) print("-----------------------------") print("Showing data") print("-----------------------------") result.show(10) print("-----------------------------") print("Saving csv output") print("-----------------------------") result.repartition(1).write.csv(save_path, mode="overwrite", header=True) print("-----------------------------") print("Import result to mysql") print("-----------------------------") # import_to_mysql(result) print("Finished job") return result path = "/Users/admin/Documents/DE Course-Khoá Trước/Items Shared on 4-29-2023/Dataset/log_content/" save_path = "/Users/admin/Documents/DE_task" df = maintask(path, save_path)
Leave a Comment