# Paste-site export header (pastecode.io metadata, not part of the program):
# Untitled
#
# mail@pastecode.io avatar
# unknown
# plain_text
# a year ago
# 5.7 kB
# 1
# Indexable
# Never
import os

import pandas as pd
import psycopg2

# Connection settings for the PostgreSQL instance used by every function below.
# Each value can be overridden through the corresponding standard libpq
# environment variable; the literals are kept only as backward-compatible
# fallbacks so existing deployments keep working unchanged.
# SECURITY: the fallback password is committed in plain text. Rotate it and
# supply PGPASSWORD from the environment / a secret store instead.
DATABASE = os.environ.get("PGDATABASE", "tariff_digitization")
USER = os.environ.get("PGUSER", 'postgres')
PASSWORD = os.environ.get("PGPASSWORD", "7Yn4mFeLNDXz&tH")
HOST = os.environ.get(
   "PGHOST",
   "hospital-tariff-application.ctg23hvzex2r.ap-south-1.rds.amazonaws.com",
)
# Kept as a string, exactly as before; psycopg2 accepts either form.
PORT = os.environ.get("PGPORT", '5432')
def check_connection():
   """Open a connection, print the server version, and close it again.

   Connects with the module-level DATABASE/USER/PASSWORD/HOST/PORT settings
   and runs ``select version()`` as a smoke test.

   Raises:
      psycopg2.Error: if connecting or running the query fails.
   """
   conn = psycopg2.connect(
      database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
   )
   try:
      with conn.cursor() as cursor:
         cursor.execute("select version()")
         data = cursor.fetchone()
      print("Connection established to: ", data)
   finally:
      # Close even when the query fails so the connection is not leaked.
      conn.close()



def check_data():
   """Print every row currently stored in ``schedule_of_charges``.

   Raises:
      psycopg2.Error: if connecting or running the query fails.
   """
   query = """
       SELECT *
       FROM schedule_of_charges
   """
   conn = psycopg2.connect(
      database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
   )
   try:
      with conn.cursor() as cursor:
         cursor.execute(query)
         rows = cursor.fetchall()
   finally:
      # The original never closed this connection.
      conn.close()

   # Each element is one table row (a tuple of column values).
   for row in rows:
      print(row)

# DDL for the main tariff table created by check_table().
_SCHEDULE_OF_CHARGES_DDL = """
CREATE TABLE schedule_of_charges(
    serial_id SERIAL PRIMARY KEY,
    hospital_id text not null,
    document text not null,
    page text not null,
    table_number text not null,
    s3url text not null,
    service_code text,
    category text,
    subcategory text,
    service_description text not null,
    bounding_box text not null default '[]',
    type_of_room text not null default 'tariff',
    price_per_unit text not null,
    remarks text,
    los text,
    inclusions text,
    exclusions text,
    start_date text not null,
    end_date text not null default '2099-12-31',
    tariff_type text not null,
    room_mapping text,
    confidence_score text,
    standard_term text
)"""

# DDL for the audit-log table created by check_table().
_SCHEDULE_OF_CHARGES_AUDIT_LOGS_DDL = """CREATE TABLE schedule_of_charges_audit_logs(
    id serial PRIMARY KEY,
    diff jsonb,
    updated_by character varying(255) COLLATE pg_catalog."default",
    updated_date timestamp with time zone DEFAULT now(),
    hospital_id text COLLATE pg_catalog."default",
    serial_id integer
)"""


def check_table():
   """Ensure the tariff tables exist in the ``public`` schema.

   Lists the tables in the ``public`` schema, prints each name, and creates
   ``schedule_of_charges`` and/or ``schedule_of_charges_audit_logs`` when
   they are missing. Prints ``Table Exists`` when both are already present.

   The existence check runs once, after the full table list has been read.
   Previously it ran inside the listing loop against a partial list, so it
   could try to re-create an existing table, never ran on an empty schema,
   and created at most one of the two tables.

   Raises:
      psycopg2.Error: if connecting or any statement fails.
   """
   list_query = """
       SELECT table_name
       FROM information_schema.tables
       WHERE table_schema = 'public'
   """
   required = (
      ("schedule_of_charges", _SCHEDULE_OF_CHARGES_DDL),
      ("schedule_of_charges_audit_logs", _SCHEDULE_OF_CHARGES_AUDIT_LOGS_DDL),
   )

   conn = psycopg2.connect(
      database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
   )
   try:
      with conn.cursor() as cursor:
         cursor.execute(list_query)
         existing = [row[0] for row in cursor.fetchall()]
         for name in existing:
            print("Table with table name '", name, "' exists")

         # Each required table is checked independently so both can be
         # created in a single call.
         missing = [(name, ddl) for name, ddl in required if name not in existing]
         for name, ddl in missing:
            cursor.execute(ddl)
            conn.commit()
            print(f"Tables created {name}")

         if not missing:
            print("Table Exists")
   finally:
      # Always release the connection, whichever branch ran.
      conn.close()


def drop_table():
   """Drop the ``schedule_of_charges`` table and commit the change.

   Raises:
      psycopg2.Error: if connecting fails or the table cannot be dropped
         (for example because it does not exist).
   """
   sql = '''DROP TABLE schedule_of_charges '''

   conn = psycopg2.connect(
      database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
   )
   try:
      with conn.cursor() as cursor:
         cursor.execute(sql)
      conn.commit()
      # Report success only once the drop has actually been committed.
      print("Table dropped !")
   finally:
      conn.close()

def delete_hid(hid):
   """Delete every ``schedule_of_charges`` row for one hospital.

   Args:
      hid: hospital identifier. It is matched against the text column
         ``hospital_id`` using its string form.

   Raises:
      psycopg2.Error: if connecting or the DELETE fails.
   """
   conn = psycopg2.connect(
      database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
   )
   try:
      with conn.cursor() as cursor:
         # Bound parameter instead of string interpolation: a hid containing
         # a quote can no longer alter the statement. str() keeps the old
         # "compare as text" behaviour and lets numpy scalars taken from a
         # DataFrame be passed without an adaptation error.
         cursor.execute(
            "DELETE FROM schedule_of_charges WHERE hospital_id = %s;",
            (str(hid),),
         )
      conn.commit()
      print(f"Deleted HID {hid}")
   finally:
      conn.close()

import psycopg2.extras as extras


def execute_values(df):
   """Bulk-insert all rows of *df* into ``schedule_of_charges``.

   Args:
      df: pandas DataFrame whose column names are used verbatim as the
         target column list of the INSERT.

   Returns:
      ``1`` when the insert fails (the error is printed and the transaction
      rolled back); ``None`` on success.
   """
   table = 'schedule_of_charges'
   conn = psycopg2.connect(
      database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
   )
   try:
      tuples = [tuple(x) for x in df.to_numpy()]

      # NOTE(review): column names are interpolated unquoted into the SQL
      # text, so they must come from a trusted source.
      cols = ','.join(df.columns)
      # The doubled %% leaves a single %s for extras.execute_values to fill.
      query = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)

      with conn.cursor() as cursor:
         try:
            extras.execute_values(cursor, query, tuples)
            conn.commit()
         except Exception as error:
            print("Error: %s" % error)
            conn.rollback()
            return 1
      print("the dataframe is inserted")
   finally:
      # The original never closed the connection on any path.
      conn.close()
def insert_df(df):
   """Append all rows of *df* to ``schedule_of_charges``.

   The previous body called ``DataFrame.to_sql`` with a raw psycopg2
   connection and ``method="postgres_upsert"``. pandas accepts only
   ``None``, ``"multi"`` or a callable for ``method``, and only SQLAlchemy
   connectables or sqlite3 connections for ``con``, so that call could not
   succeed. The rows are now appended directly through psycopg2.

   NOTE(review): the old ``method`` name suggests an upsert was intended,
   but no conflict target is visible here, so this performs a plain append
   (matching ``if_exists='append'``) — confirm the desired semantics.

   Args:
      df: pandas DataFrame whose column names are used verbatim as the
         target column list of the INSERT.

   Raises:
      psycopg2.Error: if connecting or the INSERT fails; nothing is
         committed in that case.
   """
   # name=None makes itertuples yield plain tuples; index=False omits the
   # DataFrame index, as the original index=False requested.
   rows = list(df.itertuples(index=False, name=None))
   # NOTE(review): column names are interpolated unquoted into the SQL text,
   # so they must come from a trusted source.
   query = "INSERT INTO schedule_of_charges(%s) VALUES %%s" % ','.join(df.columns)

   conn = psycopg2.connect(
      database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
   )
   try:
      with conn.cursor() as cursor:
         extras.execute_values(cursor, query, rows)
      conn.commit()
   finally:
      # Closing without a commit discards a failed transaction.
      conn.close()


def postgresmain(df):
   """Replace one hospital's rows in ``schedule_of_charges`` with *df*.

   Steps: verify connectivity, make sure the tables exist, delete the rows
   whose ``hospital_id`` equals that of the first row of *df*, then insert
   *df* with every value converted to ``str``.

   NOTE(review): the delete and the insert use separate connections, so a
   failed insert leaves the hospital's previous rows deleted.

   Args:
      df: pandas DataFrame with a ``hospital_id`` column. A column named
         ``table`` is stored as ``table_number``. The caller's frame is not
         modified.

   Raises:
      KeyError: if *df* has no ``hospital_id`` column.
   """
   check_connection()  # check connection with postgres
   check_table()  # check if table exist or not ; if not create table

   # Work on a new frame instead of renaming/filling the caller's object.
   df = df.rename(columns={'table': 'table_number'}).fillna("")

   if df.empty:
      # No first row to read the hospital id from; previously an IndexError.
      print("No rows to insert")
      return

   hid = df['hospital_id'].iloc[0]
   delete_hid(hid)  # if data exist with same hid delete data
   df = df.astype(str)
   execute_values(df)  # insert dataframe to postgres


# main("/home/vaibhav/Downloads/docker_50144634_output/temp_interim/50144634/long_format.csv")
# drop_table()
import os
import csv

# Load every file found in the long-format output directory into postgres.
# A failure on one file is printed and does not stop the remaining files.
_LONG_DIR = "/usr/src/app/code/data/long"

for i in os.listdir(_LONG_DIR):
      # Built outside the try so the error report below can always name it.
      path = os.path.join(_LONG_DIR, i)
      try:
            try:
                  # First attempt: pipe-delimited, fully quoted export.
                  long_ = pd.read_csv(path, delimiter="|", quoting=csv.QUOTE_ALL, on_bad_lines="skip")
                  print(long_.shape)
            except Exception:
                  # Fall back to pandas' default CSV settings. `Exception`
                  # rather than a bare except, so Ctrl-C / SystemExit are
                  # not swallowed.
                  long_ = pd.read_csv(path)
            postgresmain(long_)
      except Exception as e:
            print(path, e)