import ast
import gc
import json
import logging

import pandas as pd
def validateData(value):
    """Parse the text of a Python literal, returning ``[]`` when it is invalid.

    Args:
        value: Text to parse with ``ast.literal_eval`` (safe evaluation of
            literals only; no arbitrary code is executed).

    Returns:
        The evaluated literal, or an empty list when ``value`` cannot be
        parsed (for example malformed text or ``None``). A warning naming the
        offending value is logged in that case.
    """
    try:
        return ast.literal_eval(value)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # These are the failures ast.literal_eval is documented to raise.
        # The shared logger is only bound once fetchData has run, so fall back
        # to a module logger instead of failing with NameError here.
        logger = globals().get("accountLogger")
        if logger is None:
            logger = logging.getLogger(__name__)
        # Logger.warn is a deprecated alias (removed in Python 3.13).
        logger.warning("Invalid account extn attributes - %s", value)
        return []
def fetchData(apilogger, connection, sql, configProperties, dataset, entity):
    """Load account extension attributes and pivot them to one row per account.

    ``sql`` is executed on ``connection``. The result must contain an
    ``accountid`` column and an ``account_extn_attributes`` column whose
    values are the text of a Python literal (parsed by ``validateData``).
    Each parsed value is expanded element by element, and every element is
    expected to expand (via ``pd.Series``) into ``attributeName`` and
    ``attributeValue`` fields, which are then pivoted.

    Args:
        apilogger: Logger used for progress messages. It is also stored in the
            module-level ``accountLogger`` so ``validateData`` can log.
        connection: Database connection handed to ``pd.read_sql``.
        sql: Query handed to ``pd.read_sql``.
        configProperties: Not used by this function.
        dataset: Not used by this function.
        entity: Not used by this function.

    Returns:
        A DataFrame with an ``accountid`` column plus one column per distinct
        ``attributeName``, holding the first ``attributeValue`` found for that
        account. When the query returns no rows, or no row yields any
        attribute, an empty DataFrame with only the ``accountid`` column is
        returned.

    Side effects:
        Rebinds the module-level ``accountLogger`` and writes intermediate CSV
        dumps into the hard-coded pipeline log directory.
    """
    global accountLogger
    accountLogger = apilogger
    accountLogger.debug('started extracting account extn data')

    pdAccountExtnData = pd.read_sql(sql, connection)
    pdAccountExtnData.drop_duplicates(inplace=True)
    pdAccountExtnData['account_extn_attributes'] = (
        pdAccountExtnData['account_extn_attributes'].apply(validateData)
    )
    pdAccountExtnData.to_csv('/app/server/HOBS-DataPipeline/logs/loadCRMSElasticData/pdAccountExtnData1.csv')

    # Series.apply on an empty Series yields a Series (which has no .merge),
    # so the reshaping below cannot run without at least one row.
    if pdAccountExtnData.empty:
        accountLogger.debug('no account extn data returned by query')
        return pd.DataFrame(columns=['accountid'])

    # Spread each parsed list into positional columns, re-attach the original
    # columns by index, then melt so every list element gets its own row.
    # The melt order (position first, then row) decides which value
    # aggfunc='first' keeps further down, so it must not be reordered.
    pdAccountExtnData = (
        pdAccountExtnData.account_extn_attributes.apply(pd.Series)
        .merge(pdAccountExtnData, left_index=True, right_index=True)
        .drop(['account_extn_attributes'], axis=1)
        .melt(id_vars=['accountid'], value_name='account_extn_attributes')
        .drop("variable", axis=1)
        .dropna()
    )
    gc.collect()

    # With no attribute rows left, the attributeName column would never be
    # created and pivot_table would raise KeyError.
    if pdAccountExtnData.empty:
        accountLogger.debug('no account extn attributes found')
        return pd.DataFrame(columns=['accountid'])

    # Expand each attribute element into its own columns next to accountid.
    pdAccountExtnData = pd.concat(
        [
            pdAccountExtnData['accountid'],
            pdAccountExtnData['account_extn_attributes'].apply(pd.Series),
        ],
        axis=1,
    )
    gc.collect()
    pdAccountExtnData.to_csv('/app/server/HOBS-DataPipeline/logs/loadCRMSElasticData/pdAccountExtnData2.csv')

    # One row per account, one column per attribute name.
    pdAccountExtnData = pdAccountExtnData.pivot_table(
        columns='attributeName',
        index=['accountid'],
        values='attributeValue',
        aggfunc='first',
    ).reset_index()
    pdAccountExtnData.to_csv('/app/server/HOBS-DataPipeline/logs/loadCRMSElasticData/pdAccountExtnData.csv')
    gc.collect()
    return pdAccountExtnData
if __name__ == '__main__':
    # fetchData needs a logger, a live database connection, the SQL text and
    # the pipeline configuration; none of these exist at module level, so
    # there is nothing meaningful to run when the file is executed directly.
    raise SystemExit(
        "This module is not runnable on its own: import it and call "
        "fetchData(apilogger, connection, sql, configProperties, dataset, entity)."
    )