Untitled
unknown
python
a year ago
3.3 kB
7
Indexable
import pandas as pd import datetime from psycopg2 import connect from psycopg2.extras import execute_batch def insert_into_postgres(df, table_name, connection, need_time_mark = False): """ Test inserting df to PostgreSQL :param df: pd.DataFrame pandas DataFrame with data to insert :param table_name: string name of table to insert :param connection: psycopg2.connect connection to PostgreSQL :param need_time_mark: bool (default = False) adding time mark """ if len(df) > 0: df_columns = list(df) # create (col1,col2,col3,...) columns = ",".join(df_columns) if need_time_mark: columns = "date," + columns # create VALUES('%s', '%s",...) one '%s' per column values = "VALUES({})".format(",".join(["%s" for _ in df_columns])) if need_time_mark: values = values[:-1] + ",%s)" #create INSERT INTO table (columns) VALUES('%s',...) insert_stmt = "INSERT INTO {} ({}) {}".format(table_name,columns,values) cursor = connection.cursor() if need_time_mark: df["date"] = str(datetime.datetime.now()) execute_batch(cursor, insert_stmt, df[columns.split(',')].values) connection.commit() else: raise ValueError('df must not be empty') def create_table_postgres(df, table_name, connection, need_time_mark = False): """ Create postgres table from pandas DataFrame :param df: pd.DataFrame pandas DataFrame with columns type example for creating postgres table :param table_name: string name of creating table :param connection: connection: psycopg2.connect connection to PostgreSQL :param need_time_mark: bool (default = False) adding time mark """ if type(df) == pd.DataFrame: dict_type = df.dtypes.to_dict() else: raise ValueError('df must be pandas DataFrame type') if need_time_mark: query = "CREATE TABLE " + table_name + " ( date TIMESTAMP WITH TIME ZONE NOT NULL PRIMARY KEY, " else: query = "CREATE TABLE " + table_name + " (" for key in dict_type.keys(): if "int" in dict_type[key].name: query += key + " INTEGER, " elif "float" in dict_type[key].name: query += key + " FLOAT, " elif "datetime" in dict_type[key].name: query += key + " TIMESTAMP WITH TIME ZONE, " else: query += key + " VARCHAR (100), " query = query[:-2] + " )" cursor = connection.cursor() cursor.execute(query) connection.commit() def checking_existing_table(table_name, connection): """ Checks for the existence of a table :param table_name: string the name of the table which existence you want to check :param connection: psycopg2.connect connection to PostgreSQL :return: bool True if table exist, otherwise False """ try: cursor = connection.cursor() cursor.execute("""SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'""") for table in cursor.fetchall(): if table_name in table: return True return False except: raise ValueError("Connection error")
Editor is loading...
Leave a Comment