Untitled

mail@pastecode.io avatar
unknown
python
a month ago
3.3 kB
5
Indexable
Never
import pandas as pd
import datetime
from psycopg2 import connect
from psycopg2.extras import execute_batch

def insert_into_postgres(df, table_name, connection, need_time_mark = False):
    """
    Test inserting df to PostgreSQL
    :param df: pd.DataFrame
        pandas DataFrame with data to insert
    :param table_name: string
        name of table to insert
    :param connection: psycopg2.connect
        connection to PostgreSQL
    :param need_time_mark: bool (default = False)
        adding time mark
    """
    if len(df) > 0:
        df_columns = list(df)
        # create (col1,col2,col3,...)
        columns = ",".join(df_columns)
        if need_time_mark:
            columns = "date," + columns

        # create VALUES('%s', '%s",...) one '%s' per column
        values = "VALUES({})".format(",".join(["%s" for _ in df_columns]))
        if need_time_mark:
            values = values[:-1] + ",%s)"

        #create INSERT INTO table (columns) VALUES('%s',...)
        insert_stmt = "INSERT INTO {} ({}) {}".format(table_name,columns,values)

        cursor = connection.cursor()
        if need_time_mark:
            df["date"] = str(datetime.datetime.now())

        execute_batch(cursor, insert_stmt, df[columns.split(',')].values)
        connection.commit()

    else:
        raise ValueError('df must not be empty')

def create_table_postgres(df, table_name, connection, need_time_mark = False):
    """
    Create postgres table from pandas DataFrame
    :param df: pd.DataFrame
        pandas DataFrame with columns type example for creating postgres table
    :param table_name: string
        name of creating table
    :param connection: connection: psycopg2.connect
        connection to PostgreSQL
    :param need_time_mark: bool (default = False)
        adding time mark
    """
    if type(df) == pd.DataFrame:
        dict_type = df.dtypes.to_dict()
    else:
        raise ValueError('df must be pandas DataFrame type')

    if need_time_mark:
        query = "CREATE TABLE " + table_name + " ( date TIMESTAMP WITH TIME ZONE NOT NULL PRIMARY KEY, "
    else:
        query = "CREATE TABLE " + table_name + " ("
    for key in dict_type.keys():
        if "int" in dict_type[key].name:
            query += key + " INTEGER, "
        elif "float" in dict_type[key].name:
            query += key + " FLOAT, "
        elif "datetime" in dict_type[key].name:
            query += key + " TIMESTAMP WITH TIME ZONE, "
        else:
            query += key + " VARCHAR (100), "
    query = query[:-2] + " )"
    cursor = connection.cursor()
    cursor.execute(query)
    connection.commit()

def checking_existing_table(table_name, connection):
    """
    Checks for the existence of a table
    :param table_name: string
        the name of the table which existence you want to check
    :param connection: psycopg2.connect
        connection to PostgreSQL
    :return: bool
        True if table exist, otherwise False
    """
    try:
        cursor = connection.cursor()
        cursor.execute("""SELECT table_name FROM information_schema.tables
               WHERE table_schema = 'public'""")
        for table in cursor.fetchall():
            if table_name in table:
                return True

        return False
    except:
        raise ValueError("Connection error")
Leave a Comment