Untitled
unknown
plain_text
3 years ago
5.9 kB
8
Indexable
# Package that holds the per-dataset loader scripts named by "<dataset>.script".
_DATASET_SCRIPT_PACKAGE = 'apis.process.scripts.loadCRMSElasticData'


def _fetch_query_data(entity, configProperties, dbConnection):
    """Run every SQL query configured for ``entity`` and stack the results.

    ``[entity] queries`` is a JSON list of option names; each named option of
    the same section holds the SQL text. The results are concatenated
    row-wise with a fresh 0..n-1 index (columns missing from one result
    become NaN).

    Returns:
        ``(frame, query_count)``. ``frame`` is an empty DataFrame when no
        queries are configured.
    """
    frames = []
    for query in json.loads(configProperties.get(entity, 'queries')):
        sql = configProperties.get(entity, query)
        pdSubEntityData = pd.read_sql(sql, con=dbConnection)
        apilogger.debug("Entity Volume " + str(len(pdSubEntityData)))
        apilogger.debug(pdSubEntityData.columns.values)
        frames.append(pdSubEntityData)
    if not frames:
        return pd.DataFrame(), 0
    # One concat instead of one per query: avoids re-copying the accumulated
    # frame on every iteration; the result is the same.
    return pd.concat(frames, axis=0, ignore_index=True), len(frames)


def _load_dataset_fetcher(script):
    """Return the ``fetchData`` callable of dataset script module ``script``.

    Uses ``importlib`` rather than ``exec``-ing an import statement built
    from config text. A malformed ``.script`` value can then only fail to
    import; it can never run arbitrary statements.
    """
    import importlib

    return importlib.import_module(_DATASET_SCRIPT_PACKAGE + '.' + script).fetchData


def _coalesce_merge_suffixes(frame):
    """Fold ``<name>_x`` / ``<name>_y`` merge columns back into ``<name>``.

    The ``_x`` (left) value wins; the ``_y`` (right) value fills in only
    where ``_x`` is null. Only string columns that END with ``_x`` and have a
    matching ``_y`` partner are touched, so ordinary names that merely
    contain ``_x`` are left alone.
    """
    for col in list(frame.columns):
        if not (isinstance(col, str) and col.endswith('_x')):
            continue
        columnName = col[:-2]
        partner = columnName + '_y'
        if partner not in frame.columns:
            continue
        apilogger.debug("columnName-%s", columnName)
        left = frame[col]
        frame[columnName] = left.where(left.notna(), frame[partner])
        frame = frame.drop([col, partner], axis=1)
    return frame


def _merge_datasets(entity, configProperties, dbConnection, base, keyColumns, hasBase):
    """Left-join every dataset configured for ``entity`` onto ``base``.

    For each name in the JSON list ``[entity] datasets``, the loader named by
    ``<name>.script`` is called with the SQL in ``<name>.query``. Non-empty
    results are left-merged on ``keyColumns`` and suffix pairs are coalesced
    after each merge. When ``hasBase`` is False (no queries were configured),
    the first dataset becomes the base and later datasets are merged onto it.
    """
    for dataset in json.loads(configProperties.get(entity, 'datasets')):
        fetchData = _load_dataset_fetcher(configProperties.get(entity, dataset + '.script'))
        sql = configProperties.get(entity, dataset + '.query')
        pdDataset = fetchData(apilogger, dbConnection, sql, configProperties, dataset, entity)
        apilogger.debug("dataset-%s %s", dataset, len(pdDataset))
        apilogger.debug("dataset-columns %s", pdDataset.columns.values)
        if not hasBase:
            base = pdDataset
            hasBase = True
        elif len(pdDataset) > 0:
            base = _coalesce_merge_suffixes(base.merge(pdDataset, on=keyColumns, how='left'))
        apilogger.debug("dataset-columns %s", base.columns.values)
        apilogger.debug("dataset-volume-%s %s", dataset, len(base))
    return base


def _format_source_columns(entity, frame):
    """Normalise well-known columns of a non-empty source frame (in place).

    Returns the same frame for convenience.
    """
    columns = set(frame.columns)
    if 'partyId' in columns:
        frame['partyId'] = frame['partyId'].astype(str)
    if entity in ('CustomerOrg', 'CustomerResidential') and {'firstname', 'lastname'} <= columns:
        frame['ownerName'] = frame['firstname'] + ' ' + frame['lastname']
    if 'createdon' in columns:
        # to_datetime is a no-op for datetime64 columns and parses text/object
        # columns that would otherwise break the .dt accessor.
        # NOTE(review): the literal 'Z' assumes createdon is already UTC — confirm.
        frame['createdon'] = pd.to_datetime(frame['createdon']).dt.strftime('%Y-%m-%dT%H:%M:%S.000Z')
    if 'dateofbirth' in columns:
        frame['dateofbirth'] = pd.to_datetime(frame['dateofbirth']).dt.strftime('%d-%b-%Y %H:%M:%S')
    # Non-dict values (NaN/None from outer joins, etc.) become empty dicts.
    for dictColumn in ('engagedParty', 'primaryserviceidentifier'):
        if dictColumn in columns:
            frame[dictColumn] = [x if isinstance(x, dict) else {} for x in frame[dictColumn]]
    return frame


def fetchSourceData(entity, configProperties, dbConnection):
    """Fetch, combine and normalise the source-side rows for one entity.

    Everything is driven by the ``[entity]`` section of ``configProperties``:

    * ``queries``: JSON list of option names, each holding SQL that is run
      against ``dbConnection``. The results are stacked row-wise.
    * ``keycolumns``: JSON list of join columns for the datasets.
    * ``datasets``: JSON list of dataset names. For each, ``<name>.script``
      names a module under ``apis.process.scripts.loadCRMSElasticData`` whose
      ``fetchData`` is called with the SQL in ``<name>.query``. The result is
      left-joined on the key columns.
    * ``dbcolumnsforcomparison``: JSON list of output columns. Names absent
      from the data are skipped.

    Args:
        entity: config section name for the entity (e.g. ``'CustomerOrg'``).
        configProperties: object exposing ``get(section, option)``,
            presumably a ``configparser.ConfigParser``.
        dbConnection: connection accepted by ``pandas.read_sql`` and by the
            dataset loaders.

    Returns:
        DataFrame restricted to the comparison columns, with every missing
        value replaced by the string ``'null'``.
    """
    apilogger.info("Task 4 " + "fetch source data for " + entity + " started")
    keyColumns = json.loads(configProperties.get(entity, 'keycolumns'))

    pdSourceEntityData, queryCount = _fetch_query_data(entity, configProperties, dbConnection)
    apilogger.info("Entity Volume " + str(len(pdSourceEntityData)))
    apilogger.info(pdSourceEntityData.columns.values)

    pdSourceEntityData = _merge_datasets(
        entity, configProperties, dbConnection, pdSourceEntityData, keyColumns, queryCount > 0
    )
    apilogger.debug(pdSourceEntityData.columns.values)

    if len(pdSourceEntityData) > 0:
        pdSourceEntityData = _format_source_columns(entity, pdSourceEntityData)

    available = set(pdSourceEntityData.columns)
    dbColumnsForComparison = [
        column
        for column in json.loads(configProperties.get(entity, 'dbcolumnsforcomparison'))
        if column in available
    ]
    # Column selection returns a new frame; use the returned fillna result
    # rather than inplace on a possible slice (avoids SettingWithCopy issues).
    pdSourceEntityData = pdSourceEntityData[dbColumnsForComparison].fillna('null')
    gc.collect()
    apilogger.info("Task 4 " + "fetch source data for " + entity + " completed")
    return pdSourceEntityData