Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
5.9 kB
1
Indexable
Never
def fetchSourceData(entity,configProperties,dbConnection):
	"""Build the source-side DataFrame for *entity* from configured queries and datasets.

	Everything is driven by the ``entity`` section of ``configProperties``:

	- ``queries`` (JSON list): each item names an option holding a SQL statement;
	  the statements are run against ``dbConnection`` and stacked row-wise.
	- ``datasets`` (JSON list): for each item the module named by
	  ``<dataset>.script`` is imported from
	  ``apis.process.scripts.loadCRMSElasticData`` and its ``fetchData`` is called
	  with the SQL stored under ``<dataset>.query``. Non-empty results are
	  left-merged on ``keycolumns`` (JSON list).
	- ``dbcolumnsforcomparison`` (JSON list): the final column projection; names
	  that are not present in the frame are ignored.

	After merging, ``<name>_x``/``<name>_y`` column pairs are collapsed into
	``<name>``, a few well-known columns are normalised, and remaining missing
	values are replaced with the string ``'null'``.

	Args:
		entity: Config section name; also passed through to ``fetchData``.
		configProperties: Object exposing ``get(section, option)``.
		dbConnection: Connection handed to ``pd.read_sql`` and ``fetchData``.

	Returns:
		pandas.DataFrame restricted to the configured comparison columns.
	"""
	apilogger.info("Task 4 fetch source data for %s started", entity)
	queries = json.loads(configProperties.get(entity,'queries'))
	keyColumns = json.loads(configProperties.get(entity,'keycolumns'))

	pdSourceEntityData = _fetchQueryData(entity,queries,configProperties,dbConnection)
	datasets = json.loads(configProperties.get(entity,'datasets'))
	apilogger.info("Entity Volume %s", len(pdSourceEntityData))
	apilogger.info(pdSourceEntityData.columns.values)

	# With no queries configured the first dataset seeds the frame; every
	# dataset after that is merged. (Previously datasets after the first were
	# fetched and then silently dropped when there were no queries.)
	hasBase = bool(queries)
	for dataset in datasets:
		script = configProperties.get(entity,dataset+'.script')
		# Resolve the per-dataset loader without exec(): the script name comes
		# from configuration and must only ever be treated as a module name.
		fetchData = _loadDatasetFetcher(script)
		sql = configProperties.get(entity,dataset+'.query')
		pdDataset = fetchData(apilogger,dbConnection,sql,configProperties,dataset,entity)
		apilogger.debug("dataset-%s%s", dataset, len(pdDataset))
		apilogger.debug("dataset-columns%s", pdDataset.columns.values)
		if not hasBase:
			pdSourceEntityData = pdDataset
			hasBase = True
		elif len(pdDataset) > 0:
			pdSourceEntityData = pdSourceEntityData.merge(pdDataset,on=keyColumns,how='left')
		apilogger.debug("dataset-columns%s", pdSourceEntityData.columns.values)
		apilogger.debug("dataset-volume-%s%s", dataset, len(pdSourceEntityData))

	apilogger.debug("columns-%s", pdSourceEntityData.columns.values)
	pdSourceEntityData = _coalesceMergeSuffixColumns(pdSourceEntityData)
	apilogger.debug(pdSourceEntityData.columns.values)
	pdSourceEntityData = _normaliseSourceColumns(pdSourceEntityData,entity)

	dbColumnsForComparison = json.loads(configProperties.get(entity,'dbcolumnsforcomparison'))
	presentColumns = set(pdSourceEntityData.columns)
	dbColumnsForComparison = [column for column in dbColumnsForComparison if column in presentColumns]
	# Non-inplace fillna: the projection is a fresh frame, so chain instead of
	# mutating a selection in place.
	pdSourceEntityData = pdSourceEntityData[dbColumnsForComparison].fillna('null')
	gc.collect()
	apilogger.info("Task 4 fetch source data for %s completed", entity)
	return pdSourceEntityData


def _fetchQueryData(entity,queries,configProperties,dbConnection):
	"""Run each configured query and return the results stacked row-wise."""
	frames = []
	for query in queries:
		sql = configProperties.get(entity,query)
		pdSubEntityData = pd.read_sql(sql,con=dbConnection)
		apilogger.debug("Entity Volume %s", len(pdSubEntityData))
		apilogger.debug(pdSubEntityData.columns.values)
		frames.append(pdSubEntityData)
	if not frames:
		return pd.DataFrame()
	if len(frames) == 1:
		# A single result is used as-is (no re-indexing).
		return frames[0]
	# One concat over all results instead of re-copying the accumulated rows
	# on every loop iteration.
	return pd.concat(frames, axis=0, ignore_index=True)


def _loadDatasetFetcher(script):
	"""Return the ``fetchData`` callable exported by dataset script module *script*."""
	import importlib
	module = importlib.import_module('apis.process.scripts.loadCRMSElasticData.' + script)
	return module.fetchData


def _coalesceMergeSuffixColumns(pdData):
	"""Collapse ``<name>_x``/``<name>_y`` pairs into ``<name>``, preferring ``_x`` values."""
	for col in list(pdData.columns):
		isLeftSuffix = isinstance(col, str) and col.endswith('_x')
		apilogger.debug("columns-%s%s", col, isLeftSuffix)
		if not isLeftSuffix:
			continue
		columnName = col[:-len('_x')]
		rightCol = columnName + '_y'
		if rightCol not in pdData.columns:
			# Not a merge-generated pair; leave the column untouched.
			continue
		apilogger.debug("columnName-%s", columnName)
		pdData[columnName] = pdData[col]
		# Fall back to the right-hand value only where the left-hand one is missing.
		pdData.loc[pdData[col].isnull(),columnName] = pdData[rightCol]
		pdData = pdData.drop([col,rightCol],axis=1)
	return pdData


def _normaliseSourceColumns(pdData,entity):
	"""Apply the per-column type/format fix-ups; a frame with no rows is returned unchanged."""
	if len(pdData) == 0:
		return pdData
	if 'partyId' in pdData.columns:
		pdData['partyId'] = pdData['partyId'].astype(str)
	if entity in ['CustomerOrg','CustomerResidential']:
		# NOTE(review): raises KeyError if 'firstname'/'lastname' are absent —
		# confirm these entities always select both columns.
		pdData['ownerName'] = pdData['firstname'] + ' ' + pdData['lastname']
	if 'createdon' in pdData.columns:
		pdData['createdon'] = pdData['createdon'].dt.strftime('%Y-%m-%dT%H:%M:%S.000Z')
	if 'dateofbirth' in pdData.columns:
		pdData['dateofbirth'] = pdData['dateofbirth'].dt.strftime('%d-%b-%Y %H:%M:%S')
	# Anything that is not already a dict (e.g. NaN from a left merge) becomes {}.
	for dictColumn in ('engagedParty','primaryserviceidentifier'):
		if dictColumn in pdData.columns:
			pdData[dictColumn] = [x if isinstance(x, dict) else {} for x in pdData[dictColumn]]
	return pdData