Untitled

 avatar
user_9572470
plain_text
a year ago
7.0 kB
6
Indexable
from zipfile import ZipFile
import pandas as pd
from typing import Union
from google.cloud import bigquery
import ftplib
from google.auth import load_credentials_from_file

import pandas_gbq

def downloadZipfile(zip_file_path: str, csv_filename: str) -> None:
    """
    Downloads the daily order-feed zip from the FTP server.

    Args:
    - zip_file_path (str): Local path the zip file is written to. The
      remote file name is taken from the last path component.
    - csv_filename (str): Unused here; accepted so the existing call site
      downloadZipfile(zip_file_path, csv_filename) keeps working.

    Returns:
    - None
    """
    # NOTE(review): credentials are hard-coded in source — move them to a
    # config file or environment variables before distributing this script.
    FTP_HOST = "asdsadasdsadsada.com"
    FTP_USER = "fssafdsf"
    FTP_PASS = "8v%u2$v7zi2!"

    # Remote file name = basename of the target path (handles the Windows
    # backslash paths this script uses, on any platform).
    remote_name = zip_file_path.replace("\\", "/").rsplit("/", 1)[-1]

    # Context manager guarantees the control connection is closed even on
    # error (the original leaked it).
    with ftplib.FTP(FTP_HOST, FTP_USER, FTP_PASS) as ftp:
        print("Connected to FTP server")
        ftp.cwd('GCP')
        ftp.cwd('LAT')

        # Download directly to the caller-supplied path instead of the
        # current working directory (the caller reads folder+name later).
        with open(zip_file_path, 'wb') as f:
            ftp.retrbinary('RETR ' + remote_name, f.write)

def extract_csv_from_zip(zip_file_path: str, csv_filename: str) -> pd.DataFrame:
    """
    Extracts a CSV file from a zip archive and returns the line-item rows
    reshaped to the BigQuery SHOPIFY_IMPORT schema.

    Args:
    - zip_file_path (str): Path to the zip file
    - csv_filename (str): Name of the CSV file to extract (extracted into
      the current working directory, as before)

    Returns:
    - pd.DataFrame: line-item rows with columns renamed and ordered to
      match the BigQuery table
    """
    with ZipFile(zip_file_path, 'r') as zip_object:
        zip_object.extract(csv_filename)

    print(csv_filename + " File Extracted")

    # Read the CSV file into a Pandas DataFrame and keep only product rows
    # (the feed also contains shipping/refund/etc. rows).
    df = pd.read_csv(csv_filename)
    df = df[df["Line: Type"] == "Line Item"].copy()  # .copy() avoids SettingWithCopy warnings below

    required_columns = ['Created At', 'Name', 'Line: SKU', 'Line: Price',
                        'Line: Quantity', 'Line: Pre Tax Price',
                        'Line: Properties', 'Tags']
    df = df[required_columns]

    df["DROPSHIPMODE"] = ""
    df["COST"] = 0
    df["Name"] = df["Name"].astype(str)
    df["TAGS_ORDER_NAME"] = df["Tags"]
    df["TAGS_SKU"] = df["Line: Properties"]
    # Unit price -> extended (gross) price; discount = gross - pre-tax.
    df['Line: Price'] = df["Line: Price"] * df["Line: Quantity"]
    df['DISCOUNTS'] = df["Line: Price"] - df["Line: Pre Tax Price"]

    df["Line: SKU"] = df["Line: SKU"].astype(str)
    df["Line: Quantity"] = df["Line: Quantity"].astype(int)
    df["COST"] = df["COST"].astype('int64')
    df["DISCOUNTS"] = df["DISCOUNTS"].round(2)
    df["BRAND"] = "LNT"
    df["FINAL_SALE_TAG"] = "N"

    # Rename the columns to the BigQuery names in a single pass.
    # (The original also renamed "Tags: Products", a column that is never
    # selected above — that dead no-op has been dropped.)
    df.rename(columns={
        "Created At": "ORDER_DATE",
        "Name": "ORDER_NAME",
        "Line: SKU": "VARIANT_SKU",
        "Line: Price": "GROSS_SALES",
        "Line: Quantity": "ORDER_ITEM_QUANTITY",
        "Line: Pre Tax Price": "DEMAND_DOLLARS",
    }, inplace=True)

    # na=False: rows with no line properties must not match. The original
    # applied str.contains without na=, which yields NaN for missing values;
    # the trailing "& ~isna()" ran too late to keep NaN out of the mask,
    # and a NaN boolean mask breaks .loc in newer pandas.
    mask = df['TAGS_SKU'].str.contains('Final sale', case=False, na=False)
    df.loc[mask, 'FINAL_SALE_TAG'] = 'Y'

    # Reorder the columns exactly as in the BigQuery table.
    schema_order = [
        'ORDER_DATE',
        'ORDER_NAME',
        'VARIANT_SKU',
        'GROSS_SALES',
        'ORDER_ITEM_QUANTITY',
        'DEMAND_DOLLARS',
        'DROPSHIPMODE',
        'COST',
        'FINAL_SALE_TAG',
        'TAGS_ORDER_NAME',
        'TAGS_SKU',
        'DISCOUNTS',
        'BRAND',
    ]
    df = df[schema_order]
    print(df.dtypes)
    return df


def convert_to_datetime(df: pd.DataFrame, columns: Union[str, list],
                        *, filter_date: Union[str, None] = None,
                        output_path: Union[str, None] = None) -> pd.DataFrame:
    """
    Converts specified columns to 'YYYY-MM-DD' date strings in
    America/New_York time, keeps only rows whose ORDER_DATE equals the
    target date, and writes the result to a CSV file.

    Args:
    - df (pd.DataFrame): Input DataFrame; must contain an 'ORDER_DATE'
      column (the feed's column after renaming).
    - columns (Union[str, list]): Column name(s) to convert.
    - filter_date (str, optional): 'YYYY-MM-DD' date to keep. Defaults to
      the module-level `perviousDate` (yesterday) for backward
      compatibility with the existing call site.
    - output_path (str, optional): CSV path to write. Defaults to
      folder + 'output\\output_orders_LNT.csv' as before.

    Returns:
    - pd.DataFrame: filtered DataFrame with converted columns
    """
    if isinstance(columns, str):
        columns = [columns]

    df = df.copy()  # do not mutate the caller's frame
    for col in columns:
        if col in df.columns:
            # Parse as UTC (source timestamps carry offsets), shift to
            # Eastern, then format as date-only text. The original ran a
            # redundant second pd.to_datetime before strftime.
            ts = pd.to_datetime(df[col], errors='coerce', utc=True)
            df[col] = ts.dt.tz_convert('America/New_York').dt.strftime('%Y-%m-%d')

    # Fall back to the module-level globals the script sets in __main__,
    # preserving the original (implicit) behavior.
    if filter_date is None:
        filter_date = perviousDate
    if output_path is None:
        output_path = folder + 'output\\output_orders_LNT.csv'

    df = df[df["ORDER_DATE"] == filter_date]

    # Write the DataFrame back to the new CSV file
    df.to_csv(output_path, index=False)
    return df

def upload(df: pd.DataFrame) -> None:
    """
    Appends the DataFrame to the BigQuery SHOPIFY_IMPORT table.

    Args:
    - df (pd.DataFrame): rows already shaped to the table's schema
      (see extract_csv_from_zip / convert_to_datetime).

    Returns:
    - None. (The original annotation claimed pd.DataFrame, but nothing
      was ever returned and the caller ignores the result.)
    """
    # Service-account credentials; the path is relative to the script
    # folder set in __main__.
    credentials, project = load_credentials_from_file(folder + 'config\\credentials_prod.json')
    client = bigquery.Client(credentials=credentials, project=project)

    # BigQuery destination.
    project_id = 'production-365120'
    dataset_id = 'MSIPDATA'
    table_id = 'SHOPIFY_IMPORT'
    table_ref = client.dataset(dataset_id).table(table_id)

    # WRITE_APPEND: add today's rows without touching existing data.
    job_config = bigquery.LoadJobConfig(
        write_disposition='WRITE_APPEND',  # Options: WRITE_TRUNCATE, WRITE_APPEND, WRITE_EMPTY
    )

    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()  # Block until the load finishes so failures surface here.

    print(f"Loaded {job.output_rows} rows into {project_id}:{dataset_id}.{table_id}")

if __name__ == '__main__':
    # Working folder for downloads, config and output.
    folder = "D:\\scripts\\automation\\orders-data\\"

    # Today's feed file carries yesterday's orders: name the zip with
    # today's date, filter rows by yesterday's date.
    current_date = pd.Timestamp.now().strftime('%Y-%m-%d')
    # NOTE: misspelled name kept on purpose — other functions read this
    # module-level global.
    perviousDate = (pd.Timestamp.now() - pd.Timedelta(days=1)).strftime('%Y-%m-%d')

    zip_file_name = "Shopify_Order_Feed_LAT_" + current_date + ".zip"
    zip_file_path = folder + zip_file_name
    csv_filename = "Orders.csv"
    print(perviousDate + ' Data from ' + current_date + 'File')

    # Pipeline: download the zip, extract/transform the CSV, convert the
    # order dates, then append the result to BigQuery.
    downloadZipfile(zip_file_path, csv_filename)
    extracted_df = extract_csv_from_zip(zip_file_path, csv_filename)
    extracted_df = convert_to_datetime(extracted_df, ['ORDER_DATE'])
    upload(extracted_df)
    # Manual equivalents kept for reference:
    #   rm output_orders_LNT.csv
    #   bq load --skip_leading_rows=1 --allow_jagged_rows=true --allow_quoted_newlines=true MSIPDATA.SHOPIFY_IMPORT_MG output_orders_LNT.csv
  

 
Editor is loading...
Leave a Comment