Untitled
unknown
plain_text
2 years ago
5.9 kB
5
Indexable
def _readQueryFrames(entity, configProperties, dbConnection):
    """Run each SQL query named in the entity's ``queries`` option and return the result frames in order."""
    queries = json.loads(configProperties.get(entity, 'queries'))
    frames = []
    for query in queries:
        frame = pd.read_sql(configProperties.get(entity, query), con=dbConnection)
        apilogger.debug("Entity Volume %s", len(frame))
        apilogger.debug("query %s columns %s", query, frame.columns.values)
        frames.append(frame)
    return frames


def _loadDatasetFetcher(script):
    """Return the ``fetchData`` callable defined in the named dataset script module."""
    import importlib
    # importlib rather than exec(): the module name comes from configuration, and
    # exec() of a config-built string would execute arbitrary statements.
    module = importlib.import_module('apis.process.scripts.loadCRMSElasticData.' + script)
    return module.fetchData


def _coalesceMergeSuffixes(frame):
    """Collapse ``<name>_x`` / ``<name>_y`` column pairs into a single ``<name>`` column.

    The ``_x`` value is kept; where it is null, the ``_y`` value is used. Only
    names that END in ``_x`` and have a matching ``_y`` partner are touched, so
    ordinary columns that merely contain ``_x`` (e.g. ``tax_xref``) are left alone.
    """
    for col in list(frame.columns):
        if not (isinstance(col, str) and col.endswith('_x')):
            continue
        baseName = col[:-2]
        partner = baseName + '_y'
        if partner not in frame.columns:
            continue
        apilogger.debug("columnName-%s", baseName)
        frame[baseName] = frame[col].where(frame[col].notna(), frame[partner])
        frame = frame.drop([col, partner], axis=1)
    return frame


def _formatEntityColumns(frame, entity):
    """Apply the entity-specific column conversions to a non-empty source frame and return it."""
    if 'partyId' in frame.columns:
        frame['partyId'] = frame['partyId'].astype(str)
    if entity in ['CustomerOrg', 'CustomerResidential']:
        frame['ownerName'] = frame['firstname'] + ' ' + frame['lastname']
    if 'createdon' in frame.columns:
        # NOTE(review): the literal ".000Z" suffix presumes the timestamps are UTC — confirm.
        frame['createdon'] = frame['createdon'].dt.strftime('%Y-%m-%dT%H:%M:%S.000Z')
    if 'dateofbirth' in frame.columns:
        frame['dateofbirth'] = frame['dateofbirth'].dt.strftime('%d-%b-%Y %H:%M:%S')
    # Any value that is not a dict (e.g. NaN from a left merge) becomes an empty dict.
    for dictColumn in ('engagedParty', 'primaryserviceidentifier'):
        if dictColumn in frame.columns:
            frame[dictColumn] = [value if isinstance(value, dict) else {} for value in frame[dictColumn]]
    return frame


def fetchSourceData(entity, configProperties, dbConnection):
    """Build the source-side DataFrame for ``entity`` from its configured queries and dataset scripts.

    All settings are read from the ``entity`` section of ``configProperties``:

    1. Each query named in ``queries`` (a JSON list) is run with ``pd.read_sql``
       and the results are stacked row-wise.
    2. Each dataset named in ``datasets`` (a JSON list) is produced by the
       ``fetchData`` function of the module named by ``<dataset>.script`` and is
       left-merged on ``keycolumns``. Empty datasets are not merged. If no
       queries are configured, the first dataset becomes the base frame and
       later datasets are merged onto it.
    3. ``<name>_x`` / ``<name>_y`` pairs produced by the merges are collapsed
       into ``<name>`` (``_x`` wins, nulls are filled from ``_y``).
    4. For a non-empty result, entity-specific columns are formatted, the frame
       is restricted to the ``dbcolumnsforcomparison`` columns that exist, and
       the remaining nulls are replaced with the string ``'null'``.

    Args:
        entity: Config section name. ``CustomerOrg`` / ``CustomerResidential``
            additionally get an ``ownerName`` column.
        configProperties: Object exposing ``get(section, option)``
            (presumably a ``ConfigParser``).
        dbConnection: Connection passed to ``pd.read_sql`` and to each
            dataset's ``fetchData``.

    Returns:
        pandas.DataFrame holding the source data for ``entity``.
    """
    apilogger.info("Task 4 " + "fetch source data for " + entity + " started")
    keyColumns = json.loads(configProperties.get(entity, 'keycolumns'))

    queryFrames = _readQueryFrames(entity, configProperties, dbConnection)
    # One concat over all frames is linear; concatenating inside the loop
    # re-copied the accumulated frame for every query.
    if queryFrames:
        pdSourceEntityData = pd.concat(queryFrames, axis=0, ignore_index=True)
    else:
        pdSourceEntityData = pd.DataFrame()
    hasBase = bool(queryFrames)
    apilogger.info("Entity Volume %s", len(pdSourceEntityData))
    apilogger.info("%s", pdSourceEntityData.columns.values)

    datasets = json.loads(configProperties.get(entity, 'datasets'))
    for dataset in datasets:
        fetchData = _loadDatasetFetcher(configProperties.get(entity, dataset + '.script'))
        sql = configProperties.get(entity, dataset + '.query')
        pdDataset = fetchData(apilogger, dbConnection, sql, configProperties, dataset, entity)
        apilogger.debug("dataset-%s %s", dataset, len(pdDataset))
        apilogger.debug("dataset-columns %s", pdDataset.columns.values)
        if not hasBase:
            # No queries configured: the first dataset is the base frame.
            pdSourceEntityData = pdDataset
            hasBase = True
        elif len(pdDataset) > 0:
            pdSourceEntityData = pdSourceEntityData.merge(pdDataset, on=keyColumns, how='left')
        apilogger.debug("dataset-columns %s", pdSourceEntityData.columns.values)
        apilogger.debug("dataset-volume-%s %s", dataset, len(pdSourceEntityData))

    pdSourceEntityData = _coalesceMergeSuffixes(pdSourceEntityData)
    apilogger.debug("%s", pdSourceEntityData.columns.values)

    if len(pdSourceEntityData) > 0:
        pdSourceEntityData = _formatEntityColumns(pdSourceEntityData, entity)
        dbColumnsForComparison = json.loads(configProperties.get(entity, 'dbcolumnsforcomparison'))
        keptColumns = [column for column in dbColumnsForComparison if column in pdSourceEntityData.columns]
        # Reassign instead of fillna(inplace=True) on the column-subset copy.
        pdSourceEntityData = pdSourceEntityData[keptColumns].fillna('null')

    gc.collect()
    apilogger.info("Task 4 " + "fetch source data for " + entity + " completed")
    return pdSourceEntityData
Editor is loading...