Untitled
unknown
plain_text
3 years ago
6.1 kB
9
Indexable
"""
@author: Mobilize.Net
@title: Script for uploading a single large file using the snowflake
connector and a pandas dataframe.
"""

# Note that this code is written to be as easy to understand as possible.
# There are many ways to write this in a more concise manner, but this way
# will still work just fine.

# Standard library: used to read the password from the environment.
import os

# Dataframe handling for the CSV file.
import pandas as pd

# The Snowflake Connector library and its pandas upload helper.
import snowflake.connector as snow
from snowflake.connector.pandas_tools import write_pandas

## Phase I: Setup and Connect

# Open the connection. The password is read from the `snowflake_password`
# environment variable instead of being written into the script.
conn = snow.connect(
    user="ROHITMI023",
    password=os.getenv("snowflake_password"),
    account="tc00115.ap-south-1.aws",
)

# A single cursor runs every setup statement below.
cur = conn.cursor()

# Session setup, executed strictly in this order:
#   1. pick the role,
#   2. create the warehouse if needed (feel free to change the size) and
#      select it,
#   3. create the database if needed and select it,
#   4. create the schema if needed and select it.
# Each of these statements accepts further properties; check the Snowflake
# documentation if you want to alter them.
setup_statements = (
    "USE ROLE SYSADMIN",
    "CREATE WAREHOUSE IF NOT EXISTS COMPUTE_WH WITH WAREHOUSE_SIZE = XSMALL",
    "USE WAREHOUSE COMPUTE_WH",
    "CREATE DATABASE IF NOT EXISTS MOCK_PROJECT_DB",
    "USE DATABASE MOCK_PROJECT_DB",
    "CREATE SCHEMA IF NOT EXISTS PUBLIC",
    "USE SCHEMA PUBLIC",
)
for sql in setup_statements:
    cur.execute(sql)

# Location of the CSV file to upload and the separator it uses.
original = r"C:\Users\rohit\Downloads\Hilton Property Rental Services\PropertySummary.csv"  # <- Replace with your path.
delimiter = ","  # Replace if you're using a different delimiter.

# Get it as a pandas dataframe.
dataframe = pd.read_csv(original, sep=delimiter)

# Build the CREATE TABLE statement from the dataframe: one "name type"
# definition per column, with the SQL type chosen from the pandas dtype.
# Anything that is not boolean, integer, float or (timezone-naive) datetime
# is stored as a maximum-length varchar.
# NOTE(review): column names are inserted into the SQL exactly as they appear
# in the CSV header, without quoting. Headers containing spaces or other
# special characters would likely need quoting or renaming first.
column_definitions = []
for column, dtype in dataframe.dtypes.items():
    if pd.api.types.is_bool_dtype(dtype):
        sql_type = "boolean"
    elif pd.api.types.is_integer_dtype(dtype):
        # Covers every integer width, not only int64.
        sql_type = "int"
    elif pd.api.types.is_float_dtype(dtype):
        # Covers every float width, not only float64.
        sql_type = "float8"
    elif pd.api.types.is_datetime64_dtype(dtype):
        sql_type = "datetime"
    else:
        sql_type = "varchar(16777216)"
    column_definitions.append(column + " " + sql_type)

# Definitions are separated by ",\n" and the list is closed with ")".
create_tbl_statement = (
    "CREATE OR REPLACE TABLE PROPERTY_SUMMARY"
    + " (\n"
    + ",\n".join(column_definitions)
    + ")"
)

# Execute the SQL statement to create the table. The cursor opened during
# setup is reused so that no extra, never-closed cursor is created.
cur.execute(create_tbl_statement)

# Create a cursor object.
#
# cur = conn.cursor()

# ## Phase II: Upload from the Exported Data File.
# # Let's import a new dataframe so that we can test this.
# original = r"C:\Users\rohit\Downloads\Hilton Property Rental Services\PropertySummary.csv" # <- Replace with your path.
# delimiter = "," # Replace if you're using a different delimiter.

# # Get it as a pandas dataframe.
# total = pd.read_csv(original, sep = delimiter)

# Drop any columns you may not need (optional).
# total.drop(columns = ['A_ColumnName',
#                       'B_ColumnName'],
#            inplace = True)

# Rename the columns in the dataframe if they don't match your existing table.
# This is optional, but ESSENTIAL if you already have created the table format
# in Snowflake.
# total.rename(columns={"A_ColumnName": "A_COLUMN",
#                       "B_ColumnName": "B_COLUMN"},
#              inplace=True)

# Actually write to the table in snowflake.
# write_pandas(conn, total, "PROPERTY_SUMMARY")

# (Optionally, you can check to see if what you loaded is identical
# to what you have in your pandas dataframe. Perhaps... a topic for a future
# blog post.)

## Phase III: Turn off the warehouse.
# Create a cursor object.
# cur = conn.cursor()

# # Execute a statement that will turn the warehouse off.
# sql = "ALTER WAREHOUSE COMPUTE_WH SUSPEND"
# cur.execute(sql)

# # Close your cursor and your connection.
# cur.close()
# conn.close()

# And that's it. Much easier than using the load data utility, but maybe
# not as user friendly.

# original = r"C:\Users\rohit\Downloads\Hilton Property Rental Services\Feedbacksummary.csv" # <- Replace with your path.
# delimiter = "," # Replace if you're using a different delimiter.

# # reading the csv file using read_csv
# # storing the data frame in variable called df
# df = pd.read_csv(original, sep = delimiter, encoding='latin-1')

# # creating a list of column names by
# # calling the .columns
# list_of_column_names = list(df.columns)

# # Use Dataframe.dtypes to
# # give the series of
# # data types as result
# datatypes = df.dtypes

# cur.execute(
#     "CREATE OR REPLACE TABLE"
#     "PROPERTY_SUMMARY("
# )

# Execute a statement that will delete the existing data from the table.
# If you would prefer not to do this, then just comment it out.
# In fact, we'll leave it commented out, just in case the data file you
# are importing will be appended to the existing table, and not replacing it.
# sql = "truncate table if exists YOUR_TABLE_NAME"
# cur.execute(sql)

# Close the cursor.
# cur.close()
Editor is loading...