import psycopg2
import pandas as pd
import psycopg2.extras as extras
import os
import csv
import re
import sys
# Connection settings come from the standard libpq environment variables so
# that credentials are not committed to source control.
# NOTE(review): passwords were previously hard-coded in this file (including
# in commented-out lines); treat them as leaked and rotate them.
DATABASE = os.environ.get("PGDATABASE", "tariff_digitization")
USER = os.environ.get("PGUSER", "postgres")
# Deliberately no default. When unset this is None, psycopg2 omits the
# parameter, and libpq falls back to its own sources (e.g. ~/.pgpass).
PASSWORD = os.environ.get("PGPASSWORD")
HOST = os.environ.get(
    "PGHOST",
    "hospital-tariff-application.ctg23hvzex2r.ap-south-1.rds.amazonaws.com",
)
PORT = os.environ.get("PGPORT", "5432")
def check_connection():
    """Open a connection, print the server's ``version()`` string, then close it.

    Used as a connectivity smoke test before any other database work.
    Errors from ``psycopg2.connect`` or the query propagate to the caller.
    The connection is closed in every case.
    """
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute("select version()")
            data = cursor.fetchone()
        print("Connection established to: ", data)
    finally:
        conn.close()
def check_data():
    """Print every row currently stored in ``schedule_of_charges``.

    Debug helper. It fetches the whole table into memory, so it is only
    suitable for small tables. The connection is closed even if the query
    fails.
    """
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                SELECT *
                FROM schedule_of_charges
            """)
            rows = cursor.fetchall()
    finally:
        conn.close()
    for row in rows:
        print(row)
def check_table():
    """Ensure ``schedule_of_charges`` and ``schedule_of_charges_audit_logs`` exist.

    Lists the tables in the ``public`` schema (printing each one), then
    creates whichever of the two tables is missing:

    * ``schedule_of_charges``, list-partitioned by ``hospital_id``;
    * ``schedule_of_charges_audit_logs``.

    The two checks are independent, so a fresh database gets both tables in
    a single call. The connection is closed even if a statement fails.
    """
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute("""
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = 'public'
            """)
            existing = [row[0] for row in cursor.fetchall()]
            for name in existing:
                print("Table with table name '", name, "' exists")

            created_any = False
            if "schedule_of_charges" not in existing:
                cursor.execute("""
                    CREATE TABLE schedule_of_charges(
                        serial_id SERIAL,
                        hospital_id text not null,
                        document text not null,
                        page text not null,
                        table_number text not null,
                        s3url text not null,
                        service_code text,
                        category text,
                        subcategory text,
                        service_description text not null,
                        bounding_box text not null default '[]',
                        type_of_room text not null default 'tariff',
                        price_per_unit text not null,
                        remarks text,
                        los text,
                        inclusions text,
                        exclusions text,
                        start_date text not null,
                        end_date text not null default '2099-12-31',
                        tariff_type text not null,
                        room_mapping text,
                        confidence_score text,
                        standard_term text,
                        PRIMARY KEY (serial_id, hospital_id)
                    )
                    PARTITION BY LIST(hospital_id);""")
                conn.commit()
                print("Table created schedule_of_charges")
                created_any = True

            # Not an elif: the audit table must also be created on a fresh DB.
            if "schedule_of_charges_audit_logs" not in existing:
                cursor.execute("""CREATE TABLE schedule_of_charges_audit_logs(
                    id serial PRIMARY KEY,
                    diff jsonb,
                    updated_by character varying(255) COLLATE pg_catalog."default",
                    updated_date timestamp with time zone DEFAULT now(),
                    hospital_id text COLLATE pg_catalog."default",
                    serial_id integer
                )""")
                conn.commit()
                print("Tables created schedule_of_charges_audit_logs")
                created_any = True

            if not created_any:
                print("Table Exists")
    finally:
        conn.close()
def drop_table():
    """Drop the ``schedule_of_charges`` table.

    The table is partitioned, so in PostgreSQL this also drops every
    ``hid_*`` partition created by ``create_partitions``. There is no
    ``IF EXISTS``, so a missing table raises a psycopg2 error. The success
    message is printed only after the commit succeeds. The connection is
    always closed.
    """
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute('''DROP TABLE schedule_of_charges ''')
        conn.commit()
        print("Table dropped !")
    finally:
        conn.close()
def delete_hid(hid):
    """Delete every ``schedule_of_charges`` row whose ``hospital_id`` is *hid*.

    *hid* is sent as a bound query parameter rather than formatted into the
    SQL, because it can originate from input file names. It is converted with
    ``str`` to match the ``text`` column. It may also be a NumPy scalar taken
    from a DataFrame, which psycopg2 does not adapt by default.

    Only rows are deleted; the ``hid_<hid>`` partition table is left in place.
    """
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            cursor.execute(
                "DELETE FROM schedule_of_charges WHERE hospital_id = %s;",
                (str(hid),),
            )
        conn.commit()
        print(f"Deleted HID {hid} rows (partition kept)")
    finally:
        conn.close()
def create_partitions(hid):
    """Create the list partition ``hid_<hid>`` of ``schedule_of_charges`` if absent.

    The partition accepts rows whose ``hospital_id`` equals ``str(hid)``.

    The table name and the bound value are composed with ``psycopg2.sql``.
    Previously the value was interpolated unquoted, so an id such as
    ``0123`` was read as the integer 123 and no longer matched the text
    stored in the column, and non-numeric ids were a SQL error. For purely
    numeric ids the quoted identifier is the same name as before.
    """
    from psycopg2 import sql

    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        create_partition_sql = sql.SQL(
            "CREATE TABLE IF NOT EXISTS {partition} "
            "PARTITION OF schedule_of_charges "
            "FOR VALUES IN ({value});"
        ).format(
            partition=sql.Identifier(f"hid_{hid}"),
            value=sql.Literal(str(hid)),
        )
        with conn.cursor() as cursor:
            cursor.execute(create_partition_sql)
        conn.commit()
    finally:
        conn.close()
def execute_values(df):
    """Bulk-insert every row of *df* into ``schedule_of_charges``.

    Column names are taken verbatim from ``df.columns`` and interpolated
    unquoted, so they must match the table's column names; PostgreSQL folds
    them to lower case. Cell values are sent as query parameters via
    ``psycopg2.extras.execute_values``. They must be psycopg2-adaptable; the
    visible caller passes ``df.astype(str)``.

    Returns:
        ``1`` if the insert failed (the error is printed and the transaction
        rolled back), otherwise ``None``.
    """
    table = 'schedule_of_charges'
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        tuples = [tuple(row) for row in df.to_numpy()]
        cols = ','.join(df.columns)
        print(df.head())
        query = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
        print(query)
        try:
            with conn.cursor() as cursor:
                extras.execute_values(cursor, query, tuples)
            conn.commit()
        except Exception as error:  # psycopg2.DatabaseError included
            print("Error: %s" % error)
            conn.rollback()
            return 1
        print("the dataframe is inserted")
    finally:
        # Previously the connection was never closed on either path.
        conn.close()
def insert_df(df):
    """Append the rows of *df* to ``schedule_of_charges``.

    ``DataFrame.to_sql`` cannot be used here. It needs an SQLAlchemy
    connectable (or sqlite3) for ``con``, and ``method`` must be ``None``,
    ``"multi"`` or a callable, so the former
    ``method="postgres_upsert"`` call always failed. Rows are instead inserted
    with ``psycopg2.extras.execute_values``, matching
    ``if_exists='append', index=False``.

    Column names are interpolated unquoted and must match the table.
    Values must be psycopg2-adaptable (e.g. pass ``df.astype(str)``).

    NOTE(review): the original ``method`` name suggests upsert semantics were
    intended. That would need an ``ON CONFLICT`` target to be chosen; confirm
    with the owner.

    Raises:
        Exception: any error from the insert is re-raised after rolling back
        the transaction.
    """
    query = "INSERT INTO schedule_of_charges(%s) VALUES %%s" % ",".join(df.columns)
    rows = [tuple(row) for row in df.to_numpy()]
    conn = psycopg2.connect(
        database=DATABASE, user=USER, password=PASSWORD, host=HOST, port=PORT
    )
    try:
        with conn.cursor() as cursor:
            extras.execute_values(cursor, query, rows)
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
def postgresmain(df, hospital_id):
    """Prepare the database to receive *df* as the data of hospital *hospital_id*.

    Steps:

    1. Smoke-test the connection and ensure the tables exist.
    2. Normalise *df* in place: rename ``table`` to ``table_number`` and
       replace NaN with ``""``.
    3. If the first row's ``hospital_id`` differs from *hospital_id*,
       overwrite the whole column with *hospital_id*.
    4. Delete that hospital's existing rows and create its partition.

    This function does not insert rows; the caller does that afterwards.

    Note:
        *df* is mutated in place. Callers see the renamed column, the filled
        NaNs and any overwritten ``hospital_id`` values.
    """
    print(df.head())
    check_connection()  # fail fast if the database is unreachable
    check_table()       # create tables if they do not exist yet
    df.rename(columns={'table': 'table_number'}, inplace=True)
    df.fillna("", inplace=True)
    hid = df['hospital_id'].iloc[0]
    # NOTE(review): when called from start(), postprocess() has made this
    # column int while hospital_id is the str file-name prefix. This
    # comparison is therefore effectively always True there.
    if hid != hospital_id:
        df['hospital_id'] = hospital_id
        hid = hospital_id
    delete_hid(hid)  # drop any previously loaded rows for this hospital
    create_partitions(hid)
    print('hid is created')
def postprocess(df, hospital_id):
    """Normalise the ``hospital_id`` column of *df* to integers.

    * If the first row's id is the string ``'1'`` (presumably a placeholder
      value — confirm), the whole column is replaced with *hospital_id*.
    * Missing ids are filled with the first row's id. If that id is itself
      missing, nothing is filled.
    * Rows whose id is still missing are dropped.
    * The column is converted to ``int`` via ``float``, so strings like
      ``"12.0"`` parse.

    *df* is modified in place and also returned.

    Raises:
        IndexError: if *df* has no rows.
        ValueError: if an id cannot be parsed as a number.
    """
    hid = df['hospital_id'].iloc[0]
    if hid == '1':
        df['hospital_id'] = hospital_id
    # Assign the result instead of calling ``fillna(..., inplace=True)`` on
    # the selected column. That chained in-place call is not guaranteed to
    # write back to *df* under pandas copy-on-write.
    df['hospital_id'] = df['hospital_id'].fillna(hid)
    df.dropna(subset=['hospital_id'], inplace=True)
    df['hospital_id'] = df['hospital_id'].astype(float).astype(int)
    return df
def start(dir_path="/usr/src/app/code/data/long"):
    """Load every per-hospital CSV in *dir_path* into ``schedule_of_charges``.

    The hospital id of each file is the part of its name before the first
    ``_``. Each file is first read as ``|``-delimited with all fields quoted
    (bad lines skipped). If that fails, it is re-read with pandas' default
    comma-separated settings. The data is then normalised (``postprocess``),
    the hospital's old rows are cleared and its partition created
    (``postgresmain``), and the rows are bulk-inserted (``execute_values``).

    Args:
        dir_path: Directory to scan. Defaults to the container path that was
            previously hard-coded.
    """
    for file_name in os.listdir(dir_path):
        hospital_id = file_name.split('_')[0]
        try:
            path = os.path.join(dir_path, file_name)
            print('p', path)
            try:
                long_ = pd.read_csv(
                    path, delimiter="|", quoting=csv.QUOTE_ALL, on_bad_lines="skip"
                )
                print(long_.shape)
            except Exception:  # was a bare except, which also caught Ctrl-C
                long_ = pd.read_csv(path)
            # Assign rather than chained inplace fillna (pandas copy-on-write).
            long_['hospital_id'] = long_['hospital_id'].fillna(hospital_id)
            long_ = postprocess(long_, hospital_id)
            postgresmain(long_, hospital_id)
            # postgresmain already normalises long_ in place; kept as a guard.
            long_.rename(columns={'table': 'table_number'}, inplace=True)
            long_.fillna("", inplace=True)
            # (Removed an unused `long_['hospital_id'].iloc[1]` lookup that
            # raised IndexError on single-row files and aborted the run.)
            df = long_.astype(str)
            execute_values(df)
        except Exception as e:
            print(hospital_id)
            print('error', e)
            # NOTE(review): this aborts all remaining files on the first
            # failure; use `continue` if per-file isolation is wanted.
            break
# Run the ingestion only when executed as a script, not when imported.
if __name__ == "__main__":
    start()