Untitled
unknown
plain_text
a year ago
1.9 kB
3
Indexable
Never
import pandas as pd import json import ast import gc def validateData(value): try: return(ast.literal_eval(value)) except Exception as e: accountLogger.warn(f"Invalid account extn attributes - {value}") return [] def fetchData(apilogger,connection,sql,configProperties,dataset,entity): #print(sql) global accountLogger accountLogger = apilogger accountLogger.debug('started extracting account extn data') pdAccountExtnData = pd.DataFrame() pdAccountExtnData = pd.read_sql(sql,connection) pdAccountExtnData.drop_duplicates(inplace=True) pdAccountExtnData['account_extn_attributes'] = pdAccountExtnData['account_extn_attributes'].apply(validateData) pdAccountExtnData.to_csv('/app/server/HOBS-DataPipeline/logs/loadCRMSElasticData/pdAccountExtnData1.csv') pdAccountExtnData = pdAccountExtnData.account_extn_attributes.apply(pd.Series) \ .merge(pdAccountExtnData, left_index = True, right_index = True)\ .drop(['account_extn_attributes'],axis=1) \ .melt(id_vars = ['accountid'],value_name = 'account_extn_attributes')\ .drop("variable", axis = 1) \ .dropna() gc.collect() pdAccountExtnData = pd.concat([pdAccountExtnData['accountid'],pdAccountExtnData['account_extn_attributes'].apply(pd.Series)],axis =1 ) gc.collect() pdAccountExtnData.to_csv('/app/server/HOBS-DataPipeline/logs/loadCRMSElasticData/pdAccountExtnData2.csv') #accountLogger.debug(pdAccountExtnData.attributeName.unique()) pdAccountExtnData = pdAccountExtnData.pivot_table(columns = 'attributeName',index = ['accountid'],values = 'attributeValue',aggfunc='first').reset_index() #pdAccountExtnData = pdAccountExtnData.reindex_axis(['empnm'], axis=1) pdAccountExtnData.to_csv('/app/server/HOBS-DataPipeline/logs/loadCRMSElasticData/pdAccountExtnData.csv') gc.collect() return pdAccountExtnData if __name__ == '__main__': fetchData(connection)