# Untitled
# unknown
# python
# 2 years ago
# 3.3 kB
# 10
# Indexable
import pandas as pd
import datetime
from psycopg2 import connect
from psycopg2.extras import execute_batch
def insert_into_postgres(df, table_name, connection, need_time_mark = False):
    """
    Insert every row of a DataFrame into a PostgreSQL table and commit

    :param df: pd.DataFrame
        pandas DataFrame with data to insert; it is left unmodified
    :param table_name: string
        name of table to insert
    :param connection: psycopg2.connect
        connection to PostgreSQL
    :param need_time_mark: bool (default = False)
        adding time mark: every row gets the same current local time
        (as a string) in a leading "date" column, which replaces any
        "date" column already present in df
    :raises ValueError: if df has no rows
    """
    if len(df) == 0:
        raise ValueError('df must not be empty')

    # The time mark owns the "date" column, so a "date" column that is
    # already in df is left out instead of being listed twice. drop()
    # returns a new frame; the caller's DataFrame is never touched.
    data = df
    if need_time_mark and "date" in df.columns:
        data = df.drop(columns="date")

    column_names = [str(name) for name in data.columns]
    # itertuples yields plain tuples of Python/pandas scalars, so the
    # driver is not handed numpy scalar types taken from df.values.
    rows = list(data.itertuples(index=False, name=None))

    if need_time_mark:
        time_mark = str(datetime.datetime.now())
        column_names.insert(0, "date")
        rows = [(time_mark,) + row for row in rows]

    # NOTE(review): table and column names are interpolated into the
    # statement as-is; they must come from a trusted source.
    placeholders = ",".join(["%s"] * len(column_names))
    insert_stmt = "INSERT INTO {} ({}) VALUES({})".format(
        table_name, ",".join(column_names), placeholders
    )

    # The with-block closes the cursor even when the batch fails.
    with connection.cursor() as cursor:
        execute_batch(cursor, insert_stmt, rows)
    connection.commit()
def create_table_postgres(df, table_name, connection, need_time_mark = False):
    """
    Create postgres table from pandas DataFrame

    Column types are derived from the DataFrame dtypes: integer dtypes
    become INTEGER, float dtypes FLOAT, datetime dtypes TIMESTAMP WITH
    TIME ZONE and everything else VARCHAR (100).

    :param df: pd.DataFrame
        pandas DataFrame with columns type example for creating postgres table
    :param table_name: string
        name of creating table
    :param connection: psycopg2.connect
        connection to PostgreSQL
    :param need_time_mark: bool (default = False)
        adding time mark: a leading "date" primary-key column is created
        and any "date" column of df is skipped in its favour
    :raises ValueError: if df is not a pandas DataFrame
    """
    # isinstance also accepts DataFrame subclasses.
    if not isinstance(df, pd.DataFrame):
        raise ValueError('df must be pandas DataFrame type')

    column_defs = []
    if need_time_mark:
        # NOTE(review): insert_into_postgres stamps every row of one batch
        # with the same time mark, which looks incompatible with a PRIMARY
        # KEY on this column for multi-row inserts - confirm intent.
        column_defs.append("date TIMESTAMP WITH TIME ZONE NOT NULL PRIMARY KEY")

    for column, dtype in df.dtypes.items():
        if need_time_mark and column == "date":
            # The time-mark column already claims this name.
            continue
        # Dtype predicates also recognise nullable dtypes such as "Int64",
        # which a lowercase substring match on the dtype name misses.
        if pd.api.types.is_integer_dtype(dtype):
            sql_type = "INTEGER"
        elif pd.api.types.is_float_dtype(dtype):
            sql_type = "FLOAT"
        elif pd.api.types.is_datetime64_any_dtype(dtype):
            sql_type = "TIMESTAMP WITH TIME ZONE"
        else:
            sql_type = "VARCHAR (100)"
        column_defs.append("{} {}".format(column, sql_type))

    # Joining the definitions keeps the statement well-formed even when
    # there are no columns at all.
    # NOTE(review): table and column names are interpolated as-is; they
    # must come from a trusted source.
    query = "CREATE TABLE {} ({})".format(table_name, ", ".join(column_defs))

    # The with-block closes the cursor even when the statement fails.
    with connection.cursor() as cursor:
        cursor.execute(query)
    connection.commit()
def checking_existing_table(table_name, connection):
    """
    Checks for the existence of a table

    Only tables listed under the 'public' schema in
    information_schema.tables are considered, and the name is compared
    exactly as given.

    :param table_name: string
        the name of the table which existence you want to check
    :param connection: psycopg2.connect
        connection to PostgreSQL
    :return: bool
        True if table exist, otherwise False
    :raises ValueError: if the lookup fails; the original error is kept
        as the exception's cause
    """
    try:
        # The with-block closes the cursor on every exit path.
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT table_name FROM information_schema.tables "
                "WHERE table_schema = 'public'"
            )
            # Every fetched row carries the single selected column.
            return any(table_name in row for row in cursor.fetchall())
    except Exception as error:
        # Catching Exception (not a bare except) lets KeyboardInterrupt
        # and SystemExit propagate; chaining keeps the real cause visible.
        raise ValueError("Connection error") from error
# Leave a Comment