Untitled
unknown
plain_text
4 years ago
6.1 kB
11
Indexable
"""
@author: Mobilize.Net
@title: Script for uploading a single large file using the
snowflake connector and a pandas dataframe.
"""
# Note that this code is written to be as easy to understand as possible.
# There are many ways to write this in a more concise manner, but this way
# will still work just fine.
# Importing the required packages for all your data framing needs.
import pandas as pd
import os
# The Snowflake Connector library.
import snowflake.connector as snow
from snowflake.connector.pandas_tools import write_pandas
## Phase I: Setup and Connect
# Open the Snowflake session. The password is read from the
# "snowflake_password" environment variable instead of being hard-coded.
conn = snow.connect(
    user="ROHITMI023",
    password=os.getenv("snowflake_password"),
    account="tc00115.ap-south-1.aws",
)
# Cursor used to run the SQL statements below.
cur = conn.cursor()
# Session setup: each statement below is sent to Snowflake in order.
# Every "CREATE ... IF NOT EXISTS" is followed by the matching "USE" so the
# rest of the script runs against that object. Feel free to change the
# warehouse size; see the Snowflake documentation for the other options.
for sql in (
    # Starting with the role.
    "USE ROLE SYSADMIN",
    # Define the warehouse (with an explicit size), then select it.
    "CREATE WAREHOUSE IF NOT EXISTS COMPUTE_WH\nWITH WAREHOUSE_SIZE = XSMALL",
    "USE WAREHOUSE COMPUTE_WH",
    # Make sure the database exists, then use it.
    "CREATE DATABASE IF NOT EXISTS MOCK_PROJECT_DB",
    "USE DATABASE MOCK_PROJECT_DB",
    # Make sure the schema exists, then use it.
    "CREATE SCHEMA IF NOT EXISTS PUBLIC",
    "USE SCHEMA PUBLIC",
):
    cur.execute(sql)
original = r"C:\Users\rohit\Downloads\Hilton Property Rental Services\PropertySummary.csv" # <- Replace with your path.
delimiter = "," # Replace if you're using a different delimiter.
# Get it as a pandas dataframe.
dataframe = pd.read_csv(original, sep=delimiter)


def _snowflake_type(dtype):
    """Return the Snowflake column type to use for a pandas dtype.

    Integer, float, boolean and naive datetime dtypes of any width map to
    "int", "float8", "boolean" and "datetime" respectively. Everything else
    (including plain "object" columns) is stored as a maximum-length varchar.
    """
    if pd.api.types.is_bool_dtype(dtype):
        return "boolean"
    if pd.api.types.is_integer_dtype(dtype):
        return "int"
    if pd.api.types.is_float_dtype(dtype):
        return "float8"
    if pd.api.types.is_datetime64_dtype(dtype):
        return "datetime"
    return "varchar(16777216)"


# Build one "<name> <type>" definition per dataframe column. Joining the
# definitions puts the separators and the closing parenthesis in the right
# place without having to detect which column is the last one.
# NOTE: column names are inserted into the statement unquoted, so the CSV
# headers must already be valid Snowflake identifiers.
column_definitions = ",\n".join(
    f"{column} {_snowflake_type(dtype)}"
    for column, dtype in dataframe.dtypes.items()
)
create_tbl_statement = (
    "CREATE OR REPLACE TABLE PROPERTY_SUMMARY" + " (\n" + column_definitions + ")"
)
# Execute the SQL statement to create the table. The existing cursor is
# reused so that no extra, never-closed cursor is opened.
cur.execute(create_tbl_statement)
# Create a cursor object.
# # cur = conn.cursor()
# ## Phase II: Upload from the Exported Data File.
# # Let's import a new dataframe so that we can test this.
# original = r"C:\Users\rohit\Downloads\Hilton Property Rental Services\PropertySummary.csv" # <- Replace with your path.
# delimiter = "," # Replace if you're using a different delimiter.
# # Get it as a pandas dataframe.
# total = pd.read_csv(original, sep = delimiter)
# Drop any columns you may not need (optional).
# total.drop(columns = ['A_ColumnName',
# 'B_ColumnName'],
# inplace = True)
# Rename the columns in the dataframe if they don't match your existing table.
# This is optional, but ESSENTIAL if you already have created the table format
# in Snowflake.
# total.rename(columns={"A_ColumnName": "A_COLUMN",
# "B_ColumnName": "B_COLUMN"},
# inplace=True)
# Actually write to the table in snowflake.
# write_pandas(conn, total, "PROPERTY_SUMMARY")
# (Optionally, you can check to see if what you loaded is identical
# to what you have in your pandas dataframe. Perhaps... a topic for a future
# blog post.)
## Phase III: Turn off the warehouse.
# Create a cursor object.
# cur = conn.cursor()
# # Execute a statement that will turn the warehouse off.
# sql = "ALTER WAREHOUSE COMPUTE_WH SUSPEND"
# cur.execute(sql)
# # Close your cursor and your connection.
# cur.close()
# conn.close()
# And that's it. Much easier than using the load data utility, but maybe
# not as user friendly.
# original = r"C:\Users\rohit\Downloads\Hilton Property Rental Services\Feedbacksummary.csv" # <- Replace with your path.
# delimiter = "," # Replace if you're using a different delimiter.
# # reading the csv file using read_csv
# # storing the data frame in variable called df
# df = pd.read_csv(original, sep = delimiter, encoding='latin-1')
# # creating a list of column names by
# # calling the .columns
# list_of_column_names = list(df.columns)
# # Use Dataframe.dtypes to
# # give the series of
# # data types as result
# datatypes = df.dtypes
# cur.execute(
# "CREATE OR REPLACE TABLE"
# "PROPERTY_SUMMARY("
# )
# Execute a statement that will delete the existing rows from the target table.
# If you would prefer not to do this, then just leave it commented out.
# It is commented out by default, in case the data file you are importing
# should be appended to the existing table rather than replacing its contents.
# sql = "truncate table if exists YOUR_TABLE_NAME"
# cur.execute(sql)
# Close the cursor.
# cur.close()