# Paste-site metadata captured together with this file (not code):
# Untitled
# unknown
# python
# 10 months ago
# 8.0 kB
# 2
# Indexable
# Never
"""Load hospital tariff exports into the PostgreSQL ``schedule_of_charges`` table.

For every file in ``/usr/src/app/code/data/long`` the script normalises the
``hospital_id`` column, deletes the rows already stored for that hospital,
ensures a list partition ``hid_<hospital_id>`` exists, and bulk-inserts the
file's rows.

Connection settings are read from the environment (``TARIFF_DB_NAME``,
``TARIFF_DB_USER``, ``TARIFF_DB_PASSWORD``, ``TARIFF_DB_HOST``,
``TARIFF_DB_PORT``).  The password deliberately has no default in source.
NOTE(review): a database password used to be hard-coded in this file; it
should be treated as leaked and rotated.
"""
import csv
import os
import re
import sys
from contextlib import closing

import pandas as pd
import psycopg2
import psycopg2.extras as extras
from psycopg2 import sql

DATABASE = os.environ.get("TARIFF_DB_NAME", "tariff_digitization")
USER = os.environ.get("TARIFF_DB_USER", "postgres")
# No default on purpose: set TARIFF_DB_PASSWORD, or rely on libpq's own
# PGPASSWORD / ~/.pgpass lookup, which applies when no password is passed.
PASSWORD = os.environ.get("TARIFF_DB_PASSWORD")
HOST = os.environ.get(
    "TARIFF_DB_HOST",
    "hospital-tariff-application.ctg23hvzex2r.ap-south-1.rds.amazonaws.com",
)
PORT = os.environ.get("TARIFF_DB_PORT", "5432")

_CREATE_SCHEDULE_OF_CHARGES_SQL = """
CREATE TABLE schedule_of_charges(
    serial_id SERIAL,
    hospital_id text not null,
    document text not null,
    page text not null,
    table_number text not null,
    s3url text not null,
    service_code text,
    category text,
    subcategory text,
    service_description text not null,
    bounding_box text not null default '[]',
    type_of_room text not null default 'tariff',
    price_per_unit text not null,
    remarks text,
    los text,
    inclusions text,
    exclusions text,
    start_date text not null,
    end_date text not null default '2099-12-31',
    tariff_type text not null,
    room_mapping text,
    confidence_score text,
    standard_term text,
    PRIMARY KEY (serial_id, hospital_id)
) PARTITION BY LIST(hospital_id);"""

_CREATE_AUDIT_LOGS_SQL = """CREATE TABLE schedule_of_charges_audit_logs(
    id serial PRIMARY KEY,
    diff jsonb,
    updated_by character varying(255) COLLATE pg_catalog."default",
    updated_date timestamp with time zone DEFAULT now(),
    hospital_id text COLLATE pg_catalog."default",
    serial_id integer
)"""


def _connect():
    """Open a new psycopg2 connection from the module-level settings.

    The password is only passed when one is configured, so libpq can fall
    back to its own password sources otherwise.
    """
    params = {"database": DATABASE, "user": USER, "host": HOST, "port": PORT}
    if PASSWORD:
        params["password"] = PASSWORD
    return psycopg2.connect(**params)


def check_connection():
    """Connect to the database and print the server's ``version()`` row."""
    with closing(_connect()) as conn, conn.cursor() as cursor:
        cursor.execute("select version()")
        data = cursor.fetchone()
        print("Connection established to: ", data)


def check_data():
    """Print every row currently stored in ``schedule_of_charges``."""
    with closing(_connect()) as conn, conn.cursor() as cursor:
        cursor.execute("SELECT * FROM schedule_of_charges")
        for row in cursor.fetchall():
            print(row)


def check_table():
    """Create ``schedule_of_charges`` and its audit-log table if missing.

    Lists the tables of the ``public`` schema (printing each one), then
    creates whichever of the two tables is absent.  The two checks are
    independent, so a fresh database gets both tables in one call.
    """
    with closing(_connect()) as conn, conn.cursor() as cursor:
        cursor.execute(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema = 'public'"
        )
        existing = set()
        for (name,) in cursor.fetchall():
            existing.add(name)
            print("Table with table name '", name, "' exists")

        created = False
        if "schedule_of_charges" not in existing:
            cursor.execute(_CREATE_SCHEDULE_OF_CHARGES_SQL)
            print("Tables created schedule_of_charges")
            created = True
        # Independent check (this used to be an ``elif``, so the audit table
        # was only created on a second run after the main table existed).
        if "schedule_of_charges_audit_logs" not in existing:
            cursor.execute(_CREATE_AUDIT_LOGS_SQL)
            print("Tables created schedule_of_charges_audit_logs")
            created = True

        if created:
            conn.commit()
        else:
            print("Table Exists")


def drop_table():
    """Drop the ``schedule_of_charges`` table."""
    with closing(_connect()) as conn, conn.cursor() as cursor:
        cursor.execute("DROP TABLE schedule_of_charges ")
        print("Table dropped !")
        conn.commit()


def delete_hid(hid):
    """Delete all ``schedule_of_charges`` rows whose ``hospital_id`` equals *hid*.

    The partition table itself is left in place.  *hid* is sent as a bound
    parameter (stringified, since the column is ``text``) instead of being
    formatted into the SQL.
    """
    with closing(_connect()) as conn, conn.cursor() as cursor:
        cursor.execute(
            "DELETE FROM schedule_of_charges WHERE hospital_id = %s;",
            (str(hid),),
        )
        conn.commit()
        print(f"Deleted rows for HID {hid}")


def create_partitions(hid):
    """Create the list partition ``hid_<hid>`` of ``schedule_of_charges``.

    No-op when the partition already exists.  The bound value is quoted as a
    text literal, matching the ``text`` partition key (an unquoted value such
    as ``0123`` would otherwise be read as the integer 123).
    """
    # Lower-casing mirrors PostgreSQL's folding of the previously unquoted
    # identifier, so existing partitions are still found by IF NOT EXISTS.
    partition_name = sql.Identifier(f"hid_{hid}".lower())
    query = sql.SQL(
        "CREATE TABLE IF NOT EXISTS {} PARTITION OF schedule_of_charges "
        "FOR VALUES IN ({});"
    ).format(partition_name, sql.Literal(str(hid)))
    with closing(_connect()) as conn, conn.cursor() as cursor:
        cursor.execute(query)
        conn.commit()


def execute_values(df):
    """Bulk-insert every row of *df* into ``schedule_of_charges``.

    Column names are taken from ``df.columns`` as-is.  Returns ``1`` (after
    printing the error and rolling back) if the insert fails, otherwise
    ``None``.  The connection is always closed.
    """
    table = "schedule_of_charges"
    tuples = [tuple(row) for row in df.to_numpy()]
    cols = ",".join(df.columns)
    print(df.head())
    query = "INSERT INTO %s(%s) VALUES %%s" % (table, cols)
    print(query)
    with closing(_connect()) as conn, conn.cursor() as cursor:
        try:
            extras.execute_values(cursor, query, tuples)
            conn.commit()
        except Exception as error:  # report and signal failure to the caller
            print("Error: %s" % error)
            conn.rollback()
            return 1
    print("the dataframe is inserted")
    return None


def insert_df(df):
    """Insert *df* into ``schedule_of_charges``.

    The previous ``DataFrame.to_sql`` call could never succeed: pandas needs
    an SQLAlchemy connectable (not a raw psycopg2 connection) and
    ``"postgres_upsert"`` is not a valid ``method``.  This now delegates to
    :func:`execute_values` and returns its result.
    """
    return execute_values(df)


def postgresmain(df, hospital_id):
    """Prepare the database for loading *df* as hospital *hospital_id*.

    Checks the connection, ensures the tables exist, normalises *df* in place
    (``table`` -> ``table_number``, NaN -> ``""``, ``hospital_id`` forced to
    *hospital_id*), deletes existing rows for that hospital and ensures its
    partition exists.  The insert itself is done by the caller.
    """
    print(df.head())
    check_connection()
    check_table()
    df.rename(columns={"table": "table_number"}, inplace=True)
    df.fillna("", inplace=True)
    hid = df["hospital_id"].iloc[0]
    if hid != hospital_id:
        df["hospital_id"] = hospital_id
        hid = hospital_id
    delete_hid(hid)  # replace any previously loaded data for this hospital
    create_partitions(hid)
    print("hid is created")


def postprocess(df, hospital_id):
    """Normalise ``df['hospital_id']`` to integers and return *df*.

    If the first id is the string ``'1'`` the whole column is overwritten
    with *hospital_id*.  Missing ids are filled with the first row's id, rows
    still lacking one are dropped, and the column is cast via float to int.
    """
    hid = df["hospital_id"].iloc[0]
    if hid == "1":
        df["hospital_id"] = hospital_id
    # Assign instead of chained ``fillna(inplace=True)``, which does not
    # write through under pandas copy-on-write.
    df["hospital_id"] = df["hospital_id"].fillna(hid)
    df.dropna(subset=["hospital_id"], inplace=True)
    df["hospital_id"] = df["hospital_id"].astype(float).astype(int)
    return df


def start():
    """Load every file in the data directory into PostgreSQL.

    The hospital id is the part of the file name before the first ``_``.
    Each file is read as pipe-delimited CSV, falling back to pandas' default
    parsing when that fails.  Processing stops at the first file that raises.
    """
    dir_path = "/usr/src/app/code/data/long"
    for file_name in os.listdir(dir_path):
        hospital_id = file_name.split("_")[0]
        try:
            path = os.path.join(dir_path, file_name)
            print("p", path)
            try:
                long_ = pd.read_csv(
                    path, delimiter="|", quoting=csv.QUOTE_ALL, on_bad_lines="skip"
                )
                print(long_.shape)
            except ValueError:
                # ParserError, EmptyDataError and UnicodeDecodeError are all
                # ValueError subclasses.
                long_ = pd.read_csv(path)
            if long_.empty:
                print(f"Skipping {path}: no rows")
                continue
            long_["hospital_id"] = long_["hospital_id"].fillna(hospital_id)
            long_ = postprocess(long_, hospital_id)
            postgresmain(long_, hospital_id)
            # postgresmain already applied these in place; kept as a safeguard.
            long_.rename(columns={"table": "table_number"}, inplace=True)
            long_.fillna("", inplace=True)
            execute_values(long_.astype(str))
        except Exception as e:
            print(hospital_id)
            print("error", e)
            break  # stop the whole run on the first failing file


if __name__ == "__main__":
    start()