Untitled

mail@pastecode.io avatar
unknown
python
a year ago
8.0 kB
2
Indexable
import psycopg2
import pandas as pd
import psycopg2.extras as extras
import os
import csv
import re
import sys

# Connection settings come from the standard libpq environment variables so
# that credentials are never committed to source control. The password that
# used to be hard-coded here (and the one in the old commented-out block) must
# be considered leaked and rotated.
DATABASE = os.environ.get("PGDATABASE", "tariff_digitization")
USER = os.environ.get("PGUSER", "postgres")
# Deliberately no default: when unset this is None, psycopg2 drops None-valued
# keyword arguments, and libpq falls back to its own defaults (e.g. ~/.pgpass).
PASSWORD = os.environ.get("PGPASSWORD")
HOST = os.environ.get(
    "PGHOST",
    "hospital-tariff-application.ctg23hvzex2r.ap-south-1.rds.amazonaws.com",
)
PORT = os.environ.get("PGPORT", "5432")

def check_connection():
    """Open a connection, print the server's ``version()`` string, then close it.

    Any psycopg2 error (e.g. the server being unreachable) propagates to the
    caller; the connection is closed even if the query fails.
    """
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute("select version()")
            data = cursor.fetchone()
        print("Connection established to: ", data)
    finally:
        conn.close()



def check_data():
    """Print every row currently stored in ``schedule_of_charges``.

    Debug helper: the whole table is fetched into memory at once, so it is
    only suitable for small tables. The connection is always closed.
    """
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute("SELECT * FROM schedule_of_charges")
            for row in cursor.fetchall():
                print(row)
    finally:
        conn.close()

def check_table():
    """Create the ``schedule_of_charges`` tables when they are missing.

    Lists every table in the ``public`` schema (printing each name), then
    independently creates, if absent:

    * ``schedule_of_charges`` - list-partitioned by ``hospital_id``;
    * ``schedule_of_charges_audit_logs``.

    Prints "Table Exists" when neither table had to be created. The
    connection is always closed, even if a statement fails.
    """
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public'
            """)
            existing = set()
            for (table_name,) in cursor.fetchall():
                existing.add(table_name)
                print("Table with table name '", table_name, "' exists")

            created = False
            # Two independent checks (not if/elif) so a fresh database gets
            # both tables in a single run.
            if "schedule_of_charges" not in existing:
                cursor.execute("""
                    CREATE TABLE schedule_of_charges(
                    serial_id SERIAL,
                    hospital_id text not null,
                    document text not null,
                    page text not null,
                    table_number text not null,
                    s3url text not null,
                    service_code text,
                    category text,
                    subcategory text,
                    service_description text not null,
                    bounding_box text not null default '[]',
                    type_of_room text not null default 'tariff',
                    price_per_unit text not null,
                    remarks text,
                    los text,
                    inclusions text,
                    exclusions text,
                    start_date text not null,
                    end_date text not null default '2099-12-31',
                    tariff_type text not null,
                    room_mapping text,
                    confidence_score text,
                    standard_term text,
                    PRIMARY KEY (serial_id, hospital_id)
                    )
                    PARTITION BY LIST(hospital_id);""")
                conn.commit()
                created = True
            if "schedule_of_charges_audit_logs" not in existing:
                cursor.execute("""CREATE TABLE schedule_of_charges_audit_logs(
                id serial PRIMARY KEY,diff jsonb,updated_by character varying(255) COLLATE pg_catalog."default",updated_date timestamp with time zone DEFAULT now(),hospital_id text COLLATE pg_catalog."default",serial_id integer
                )""")
                conn.commit()
                print("Tables created schedule_of_charges_audit_logs")
                created = True
            if not created:
                print("Table Exists")
    finally:
        conn.close()



def drop_table():
    """Drop the ``schedule_of_charges`` table (destructive; not run by default).

    Raises if the table does not exist (no ``IF EXISTS``). The success message
    is printed only after the commit succeeds, and the connection is always
    closed.
    """
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute("DROP TABLE schedule_of_charges")
        conn.commit()
        print("Table dropped !")
    finally:
        conn.close()

def delete_hid(hid):
    """Delete every ``schedule_of_charges`` row whose ``hospital_id`` equals *hid*.

    The hospital's partition table itself is not dropped.

    :param hid: hospital id (str or number); compared as text because the
        ``hospital_id`` column is ``text``.
    """
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            # Bound as a query parameter rather than interpolated, since callers
            # pass ids derived from input file names. str() keeps this a
            # text = text comparison even when *hid* is numeric.
            cursor.execute(
                "DELETE FROM schedule_of_charges WHERE hospital_id = %s;",
                (str(hid),),
            )
        conn.commit()
        print(f"Deleted rows for HID {hid}")
    finally:
        conn.close()



def create_partitions(hid):
    """Create the list partition ``hid_<hid>`` of ``schedule_of_charges``.

    Does nothing if a table with that name already exists
    (``IF NOT EXISTS``). The connection is always closed.

    :param hid: hospital id (str or number) the partition accepts.
    """
    from psycopg2 import sql

    hid_text = str(hid)
    create_partition_sql = sql.SQL(
        "CREATE TABLE IF NOT EXISTS {} PARTITION OF schedule_of_charges "
        "FOR VALUES IN ({});"
    ).format(
        # Lower-cased to match the name an unquoted identifier folds to.
        sql.Identifier(f"hid_{hid_text}".lower()),
        # A quoted text literal matches the text column exactly; an unquoted
        # numeric literal loses leading zeros and non-numeric ids are rejected.
        sql.Literal(hid_text),
    )
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute(create_partition_sql)
        conn.commit()
    finally:
        conn.close()




def execute_values(df):
    """Bulk-insert every row of *df* into ``schedule_of_charges``.

    Column names are taken verbatim from ``df.columns`` and must match the
    table's columns. Rows are sent with ``psycopg2.extras.execute_values``.

    :returns: ``1`` if the insert failed (the error is printed and the
        transaction rolled back), otherwise ``None``.
    """
    table = 'schedule_of_charges'
    tuples = [tuple(row) for row in df.to_numpy()]
    cols = ','.join(df.columns)
    print(df.head())
    # %%s survives the %-formatting as the single %s placeholder that
    # extras.execute_values expands into the VALUES list.
    query = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    print(query)

    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            try:
                extras.execute_values(cursor, query, tuples)
                conn.commit()
            except Exception as error:
                print("Error: %s" % error)
                conn.rollback()
                return 1
        print("the dataframe is inserted")
    finally:
        # The original only closed the cursor, leaking one connection per call.
        conn.close()


def insert_df(df):
    """Append the rows of *df* to ``schedule_of_charges``.

    This previously called ``DataFrame.to_sql``, which could not work here:
    ``to_sql`` accepts a SQLAlchemy connectable or a sqlite3 connection, not a
    raw psycopg2 connection, and ``method`` must be ``None``, ``"multi"`` or a
    callable, so ``"postgres_upsert"`` was invalid. This performs the plain
    append that ``if_exists='append'`` asked for. No conflict handling
    (upsert) is done.

    Missing values (NaN/None) are stored as SQL NULL. Column names come from
    ``df.columns`` and are quoted as identifiers.

    :raises psycopg2.Error: on any database error, after rolling back. The
        connection is always closed.
    """
    from psycopg2 import sql

    if df.empty:
        return
    # object dtype + where(): NaN -> None so psycopg2 sends NULL, and numpy
    # scalars become plain Python values that psycopg2 can adapt.
    rows = df.astype(object).where(df.notna(), None).to_numpy().tolist()
    query = sql.SQL("INSERT INTO schedule_of_charges ({}) VALUES %s").format(
        sql.SQL(", ").join(sql.Identifier(str(col)) for col in df.columns)
    )
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        # `with conn` commits on success and rolls back if the block raises;
        # it does not close the connection, hence the finally.
        with conn:
            with conn.cursor() as cursor:
                extras.execute_values(cursor, query.as_string(cursor), rows)
    finally:
        conn.close()


def postgresmain(df, hospital_id):
    """Prepare the database for loading *df* as hospital *hospital_id*.

    Steps:

    1. Check connectivity and ensure the tables exist.
    2. Normalise *df* **in place**: rename ``table`` -> ``table_number``,
       fill NaN with ``""``, and set every ``hospital_id`` to *hospital_id*
       if any row's id differs from it.
    3. Delete rows previously loaded for that hospital and create its
       partition.

    The rows themselves are not inserted here. ``start`` inserts the same
    (mutated) frame afterwards, so the in-place edits are relied upon.
    """
    print(df.head())
    check_connection()
    check_table()
    df.rename(columns={'table': 'table_number'}, inplace=True)
    df.fillna("", inplace=True)
    hid = df['hospital_id'].iloc[0]
    # Compare as text: after postprocess the column holds ints, while
    # *hospital_id* comes from the file name as a str, so a raw `!=` on the
    # first value was always True.
    if (df['hospital_id'].astype(str) != str(hospital_id)).any():
        df['hospital_id'] = hospital_id
        hid = hospital_id
    delete_hid(hid)
    create_partitions(hid)
    print('hid is created')


def postprocess(df, hospital_id):
    """Normalise the ``hospital_id`` column of *df* to integers.

    * If the first row's id is the placeholder ``1`` (read as ``"1"``, ``1``
      or ``1.0``), every row is set to *hospital_id*.
    * Missing ids are filled with the first row's id; rows still missing an
      id are dropped.
    * The column is converted via float to int, so non-numeric ids raise
      ``ValueError``.

    *df* is modified in place and also returned. An empty frame is returned
    unchanged.
    """
    if df.empty:
        return df
    hid = df['hospital_id'].iloc[0]
    # read_csv usually parses this column as int/float, so compare as text;
    # `hid == '1'` only matched when the column was read as strings.
    if str(hid) in ('1', '1.0'):
        df['hospital_id'] = hospital_id
    # Assign back instead of fillna(inplace=True) on the column: that chained
    # in-place form does not update *df* under pandas Copy-on-Write.
    df['hospital_id'] = df['hospital_id'].fillna(hid)
    df.dropna(subset=['hospital_id'], inplace=True)
    df['hospital_id'] = df['hospital_id'].astype(float).astype(int)
    return df


def start(dir_path="/usr/src/app/code/data/long"):
    """Load every export file in *dir_path* into ``schedule_of_charges``.

    The hospital id is the part of each file name before the first ``_``.
    Files are processed in sorted order. Each file is read as ``|``-delimited
    (falling back to pandas' defaults if that fails), normalised, prepared via
    ``postgresmain`` and bulk-inserted.

    The first file that raises stops the whole run; its hospital id and the
    error are printed.

    :param dir_path: directory holding the exports; defaults to the
        previously hard-coded path.
    """
    for name in sorted(os.listdir(dir_path)):
        hospital_id = name.split('_')[0]
        try:
            path = os.path.join(dir_path, name)
            print('p', path)
            try:
                long_ = pd.read_csv(path, delimiter="|", quoting=csv.QUOTE_ALL, on_bad_lines="skip")
                print(long_.shape)
            except Exception:
                # Not readable as '|'-delimited: retry with pandas' defaults.
                long_ = pd.read_csv(path)

            # Assign back; a chained fillna(inplace=True) on the column does
            # not update the frame under pandas Copy-on-Write.
            long_['hospital_id'] = long_['hospital_id'].fillna(hospital_id)
            long_ = postprocess(long_, hospital_id)

            # postgresmain also rewrites hospital_id in place; the rename and
            # fillna below repeat its normalisation before inserting.
            postgresmain(long_, hospital_id)
            long_.rename(columns={'table': 'table_number'}, inplace=True)
            long_.fillna("", inplace=True)
            execute_values(long_.astype(str))
        except Exception as e:
            print(hospital_id)
            print('error', e)
            break

if __name__ == "__main__":
    # Run the loader only when executed as a script, not when imported.
    start()