Untitled
unknown
plain_text
2 years ago
24 kB
5
Indexable
import csv import json import numpy as np import gc import uuid import numpy as np import urllib.request import requests import pandas as pd import sys import os from cassandra.cluster import Cluster, DefaultConnection from cassandra.util import datetime_from_uuid1 from cassandra.query import dict_factory from cassandra.auth import PlainTextAuthProvider from config.logger import configlogfile import apis.process.reconProcess as reconProcess from config.configureAPIResponse import setResponse from datetime import datetime def pandas_factory(colnames,rows): return pd.DataFrame(rows,columns = colnames) def fetchElasticEntityData(__pdElasticData,entity,configProperties): #apilogger.info("Task 5/" + configProperties.get('config','totalsteps') + "fetch elastic data for " + entity+ " started") apilogger.info("Task 5 " + "fetch elastic data for " + entity+ " started") elasticFilterCondition= configProperties.get(entity,'elasticfiltercondition') __pdElasticEntityData = __pdElasticData.copy() #exec("pdElasticEntityData = pdElasticData[" + elasticFilterCondition + "].reindex()",globals()) #pdElasticEntityData = pdElasticData[pdElasticData.id == pdElasticData.customerid].reindex() __pdElasticEntityData.query(elasticFilterCondition, inplace = True) #pdElasticEntityData = pdElasticData apilogger.debug('__pdElasticEntityData -columns - ' + str(__pdElasticEntityData.columns.values.tolist())) #print(pdElasticEntityData) elasticColumnsforComparison = json.loads(configProperties.get(entity,'elasticcolumnsforcomparison')) __pdElasticEntityData = __pdElasticEntityData[elasticColumnsforComparison] __pdElasticEntityData = __pdElasticEntityData.reset_index(drop=True) #pdElasticEntityData = pdElasticEntityData.loc[pdElasticEntityData['customerid'].isin(['C38028367'])].reindex() #__pdElasticEntityData['source'] = 'Elastic' #print(pdElasticEntityData.columns.values) #print(len(__pdElasticData)) #print (pdElasticEntityData['contact']) #__pdElasticEntityData.to_csv('elastic'+entity+'.csv') gc.collect() #apilogger.info("Task 5/" + configProperties.get('config','totalsteps') + "fetch elastic data for " + entity+ " completed") apilogger.info("Task 5 " + "fetch elastic data for " + entity+ " completed") return __pdElasticEntityData def fetchSourceData(entity,configProperties,dbConnection): #apilogger.info("Task 4/" + configProperties.get('config','totalsteps') + "fetch source data for " + entity + " started") apilogger.info("Task 4 " + "fetch source data for " + entity + " started") queries = json.loads(configProperties.get(entity,'queries')) keyColumns = json.loads(configProperties.get(entity,'keycolumns')) #print('config-',configProperties) pdSourceEntityData = pd.DataFrame() i = 0 for query in queries: sql = configProperties.get(entity,query) #print(sql) pdSubEntityData = pd.DataFrame() pdSubEntityData = pd.read_sql(sql,con=dbConnection) apilogger.debug( "Entity Volume " + str (len(pdSubEntityData))) if i==0: pdSourceEntityData = pdSubEntityData else: #print('pdSubEntityData-' + pdSubEntityData.columns.values) #print('pdSourceEntityData-' + pdSourceEntityData.columns.values) apilogger.debug(pdSourceEntityData.columns.values) apilogger.debug(pdSubEntityData.columns.values) #pdSourceEntityData = pd.merge(pdSourceEntityData,pdSubEntityData) #pdSourceEntityData = pdSourceEntityData.merge(pdSubEntityData,on=keyColumns,how='outer') #pdSourceEntityData = pd.set_index('customerid').join(other.set_index('customerid')) pdSourceEntityData = pd.concat([pdSourceEntityData,pdSubEntityData], axis=0, ignore_index=True) i = i + 1 datasets = json.loads(configProperties.get(entity,'datasets')) apilogger.info( "Entity Volume " + str (len(pdSourceEntityData))) apilogger.info( pdSourceEntityData.columns.values) #print('completed fetching data of queries') k=0 for dataset in datasets: script = configProperties.get(entity,dataset+'.script') import os #print(os.getcwd()) exec('from apis.process.scripts.loadCRMSElasticData.'+script+ ' import fetchData as fetchData',globals()) #from fetchCustomerContactData import fetchData sql = configProperties.get(entity,dataset+'.query') pdDataset = fetchData(apilogger,dbConnection,sql,configProperties,dataset,entity) apilogger.debug( "dataset-" + dataset + str (len(pdDataset))) apilogger.debug( "dataset-columns" + pdDataset.columns.values) if i>0 and k >= 0: if len(pdDataset)>0: pdSourceEntityData = pdSourceEntityData.merge(pdDataset,on=keyColumns,how='left') if i==0 and k==0: pdSourceEntityData = pdDataset k = k +1 apilogger.debug( "dataset-columns" + pdSourceEntityData.columns.values) apilogger.debug( "dataset-volume-" + dataset + str(len(pdSourceEntityData))) pdColumns = pdSourceEntityData.columns apilogger.debug( "columns-" + pdColumns) #pdSourceEntityData.to_csv('dataset-'+entity+'.csv') for col in pdColumns: apilogger.debug( "columns-" + col + str('_x' in col)) if '_x' in col: columnName = col[0:col.find('_x',0)] apilogger.debug( "columnName-" + columnName) pdSourceEntityData[columnName] = pdSourceEntityData[col] pdSourceEntityData.loc[(pdSourceEntityData[col].isnull()),columnName] = pdSourceEntityData[columnName+'_y'] pdSourceEntityData = pdSourceEntityData.drop([col,columnName+'_y'],axis=1) #pdSourceEntityData.to_csv('dataset-'+entity+'.csv.cleanup') #pdSourceEntityData['contact'] = pdSourceEntityData['contact_x'] #pdSourceEntityData.loc[(pdSourceEntityData['contact_y'].isnull()),'contact'] = pdSourceEntityData['contact_y'] #pdSourceEntityData = pdSourceEntityData.drop(['contact_x','contact_y'],axis=1) #apilogger.debug( "Entity Volume " + str (len(pdSubEntityData))) apilogger.debug( pdSourceEntityData.columns.values) if len(pdSourceEntityData) > 0 : if 'partyId' in list(pdSourceEntityData.columns): pdSourceEntityData['partyId'] = pdSourceEntityData['partyId'].astype(str) if (entity in ['CustomerOrg','CustomerResidential']): pdSourceEntityData['ownerName'] = pdSourceEntityData['firstname']+ ' '+ pdSourceEntityData['lastname'] if ('createdon' in list(pdSourceEntityData.columns)): pdSourceEntityData['createdon'] = pdSourceEntityData['createdon'].dt.strftime('%Y-%m-%dT%H:%M:%S.000Z') if ('dateofbirth' in list(pdSourceEntityData.columns)): pdSourceEntityData['dateofbirth'] = pdSourceEntityData['dateofbirth'].dt.strftime('%d-%b-%Y %H:%M:%S') if ('engagedParty' in list(pdSourceEntityData.columns)): pdSourceEntityData['engagedParty'] = [ {} if type(x) != dict else x for x in pdSourceEntityData['engagedParty']] if ('primaryserviceidentifier' in list(pdSourceEntityData.columns)): pdSourceEntityData['primaryserviceidentifier'] = [ {} if type(x) != dict else x for x in pdSourceEntityData['primaryserviceidentifier']] #pdSourceEntityData['engagedParty'] = pdSourceEntityData['engagedParty'].fillna('{}',inplace=True) # pdSourceEntityData['engagedParty'] = pdSourceEntityData['engagedParty'].fillna({},inplace=True) dbColumnsForComparison = json.loads(configProperties.get(entity,'dbcolumnsforcomparison')) dbColumnsForComparison = [ column for column in dbColumnsForComparison if column in pdSourceEntityData.columns.values] pdSourceEntityData = pdSourceEntityData[dbColumnsForComparison] #pdSourceEntityData['source'] =configProperties.get('config','sourcedatabase') #print('oracle -' + str(pdSourceEntityData['contact'].dtype)) #print(len(pdSourceEntityData)) #print(pdSourceEntityData.columns.values) #pdSourceEntityData['serviceAttributes'].fillna({},inplace=True) #pdSourceEntityData['serviceType'].fillna([],inplace=True) pdSourceEntityData.fillna('null',inplace=True) #print(pdSourceEntityData) #pdSourceEntityData.to_csv('sourcedata-'+entity+'.csv') gc.collect() #apilogger.info("Task 4/" + configProperties.get('config','totalsteps') + "fetch source data for " + entity + " completed") apilogger.info("Task 4 " + "fetch source data for " + entity + " completed") return pdSourceEntityData def fn_setProperties(): import configparser configProperties = configparser.ConfigParser() configProperties.read("config/reconcile/reconElasticData.config") return configProperties def setDBConnection(dbType,configProperties): #apilogger.info("Task 1/" + configProperties.get('config','totalsteps') + " set database connection started") apilogger.info("Task 1 " + " set database connection started") if dbType == 'oracle': import cx_Oracle username = configProperties.get(dbType,'username') password = configProperties.get(dbType,'password') connectstring = configProperties.get(dbType,'connectstring') conn = cx_Oracle.connect(username,password,connectstring) elif (dbType == 'maria' or dbType == 'mysql'): import mysql.connector username = configProperties.get(dbType,'username') password = configProperties.get(dbType,'password') database = configProperties.get(dbType,'database') host = configProperties.get(dbType,'host') port = configProperties.get(dbType,'port') conn = mysql.connector.connect( user = username,\ password = password,\ host = host,\ database = database,\ port = port) #apilogger.info("Task 1/" + configProperties.get('config','totalsteps') + " set database connection completed") apilogger.info("Task " + " set database connection completed") return conn def setElasticConnection(configProperties): #apilogger.info("Task 2/" + configProperties.get('config','totalsteps') + " set elastic connection started") apilogger.info("Task 2 " + " set elastic connection started") from elasticsearch6 import Elasticsearch,RequestsHttpConnection #from elasticsearch_dsl import Search elasticSource = json.loads(configProperties.get('config','elasticsource')) elasticSource = Elasticsearch(elasticSource,use_ssl=False,verify_certs=False,connection_class=RequestsHttpConnection) #elasticSource = Elasticsearch(elasticSource,verify_certs=True) #apilogger.info("Task 2/" + configProperties.get('config','totalsteps') + " set elastic connection completed") apilogger.info("Task 2 " + " set elastic connection completed") return elasticSource def fetchElasticData(elasticSource,__elasticIndex,__elasticDocType,configProperties): #apilogger.info("Task 3/" + configProperties.get('config','totalsteps') + " fetch elastic data started") apilogger.info("Task 3 " + " fetch elastic data started") #totalElasticRecords = elasticSource.count(index=__elasticIndex)['count'] from elasticsearch6 import Elasticsearch,RequestsHttpConnection elasticTempSource = json.loads(configProperties.get('config','elasticsource')) for server in range(len(elasticTempSource)): elasticTempSource[server] = elasticTempSource[server] + '/'+__elasticIndex+'/' +__elasticDocType elasticTempSource = Elasticsearch(elasticTempSource,use_ssl=False,verify_certs=False,connection_class=RequestsHttpConnection) totalElasticRecords = elasticTempSource.count()['count'] doc = { 'size' : 10000, 'query': { 'match_all' : {} } } pdElasticData = pd.DataFrame() elasticData = elasticSource.search(index= __elasticIndex,doc_type = __elasticDocType,body=doc,scroll='1m') #elasticData = elasticSource.search(body=doc,scroll='1m') pdElasticData = pd.DataFrame.from_records(elasticData['hits']['hits']) #return pdElasticData dataFetchCount = 0 dataFetchCount = len(elasticData['hits']['hits']) while dataFetchCount < totalElasticRecords: elasticNextData = elasticSource.scroll(scroll_id =elasticData['_scroll_id'],scroll = '1m') pdElasticData = pd.concat([pdElasticData,pd.DataFrame.from_records(elasticNextData['hits']['hits'])], ignore_index=True) elasticData = elasticNextData dataFetchCount = dataFetchCount + len(elasticNextData['hits']['hits']) apilogger.debug('dataFetchCount-' + str(dataFetchCount) + ',totalElasticRecords-' + str(totalElasticRecords)+ 'pdElasticData-'+str(len(pdElasticData))) if dataFetchCount == 0: break pdElasticData = pd.DataFrame.from_records(pdElasticData['_source'].to_list()) #pdElasticData.drop(['contact']) #print(pdElasticData.columns.values) gc.collect() #apilogger.info("Task 3/" + configProperties.get('config','totalsteps') + " fetch elastic data completed") apilogger.info("Task 3 " + " fetch elastic data completed") return pdElasticData def genElasticDoc(pdErroredData,elasticIndex,elasticDocType,configProperties): import ast erroredRecords = pdErroredData.to_dict(orient ='records') i = 0 for record in erroredRecords: #print(record) #apilogger.info(record) #print(type(record['contact'])) if 'contact' in record.keys(): if type(record['contact']) != list : record['contact'] = ast.literal_eval(record['contact']) i = i + 1 #print(i) #apilogger.info(record) yield { '_index': elasticIndex, # '_type': elasticDocType, '_id' : record['id'], '_source': record } """ yield { '_id' : record['id'], '_source': record } """ def loadElasticData(pdErroredData,elasticSource,elasticIndex,elasticDocType,configProperties): from elasticsearch6 import helpers #apilogger.info("Task 6/" + configProperties.get('config','totalsteps') + " load elastic data started") apilogger.info("Task 6 " + " load elastic data started") #pdErroredData.to_csv('elasticdata.csv') #response = [] try: #response = helpers.bulk(elasticSource,genElasticDoc(pdErroredData,elasticIndex,elasticDocType,configProperties),chunk_size=configProperties.get('config','elasticDataLoadVolume'),stats_only=False,raise_on_exception=False,raise_on_error = False) response = helpers.bulk(elasticSource,genElasticDoc(pdErroredData,elasticIndex,elasticDocType,configProperties),chunk_size= 5000,stats_only=False,raise_on_exception=False,raise_on_error = False) #print('sdsd'+str(response)) except Exception as e: error = "" apilogger.info('Error in elastic Data Load'+str(e)) #print('Error in elastic Data Load'+str(e)) errors = set() erroredIds = [] apilogger.info('response - ' + str(response)) apilogger.info('response - ' + str(type(response))) apilogger.info('response - ' + str(response[0])) if len(response) > 0 : apilogger.info('Reportdetails-rows-reloaded-' + str(response[0])) apilogger.info('Reportdetails-rows-error in reloading-' + str(len(response[1]))) for i in range(len(response[1])): erroredIds.append(str(response[1][i]['index']['_id'])) errors.add(str(response[1][i]['index']['error'])) if len(response[1]) > 0 : apilogger.info('Reportdetails-rows-erroredIds-' + str(erroredIds)) apilogger.info('Reportdetails-rows-errors-' + str(errors)) gc.collect() #apilogger.info("Task 6/" + configProperties.get('config','totalsteps') + " load elastic data completed") apilogger.info("Task 6 " + " load elastic data completed") def chkProcess(api): if (os.path.isfile(dirpath+'/apis/process/'+ api + '.py')): return True def apilogfile(api): import logging as __logging import logging.handlers as __handlers __logpath = os.environ.get('ANALYTICSDATASERVICELOGPATH') #print(__logpath) __logging.shutdown() __log_handler = __logging.FileHandler(__logpath+'/logs/'+api + '.log',mode='w') formatter = __logging.Formatter(fmt='%(asctime)s [Process:%(process)d] [%(levelname)-8s] [%(filename)s - %(module)s] : %(message)s',datefmt= '%Y-%m-%d %H:%M:%S') #formatter.converter = time.gmtime # if you want UTC tGime __log_handler.setFormatter(formatter) __logger = __logging.getLogger('apilog'+str(uuid.uuid1())) if not __logger.handlers: __logger.addHandler(__log_handler) __logger.setLevel(__logging.DEBUG) __logger.info("wewewe") return __logger def processData(api,args,configProperties): logpath = os.environ.get('ANALYTICSDATASERVICELOGPATH') logger.info("Process in Progress") if(os.path.isfile(logpath+'/logs/'+ api + '.log')): os.rename(logpath+'/logs/'+api + '.log',logpath+'/logs/'+api + '.log' + datetime.now().strftime("%Y%m%d%H%M%S")) #os.rename(logpath+'/logs/'+api + '.log') global apilogger apilogger=apilogfile(api) apilogger.info("Initiated the process") try: if 'index' in args: elasticIndex = args['index'] elasticIndexes = json.loads( configProperties.get('config','elasticindexes')) if not (elasticIndex in elasticIndexes): raise Exception("Invalid index value passed as argument.Valid index values are " + ','.join(elasticIndexes) ) else: raise Exception("Mandatory argument index is not passed") if 'docType' in args: elasticDocType = args['docType'] elasticDocTypes = json.loads( configProperties.get('config',elasticIndex)) if not (elasticDocType in elasticDocTypes): raise Exception("Invalid document type value passed as argument.Valid document types are " + ','.join(elasticDocTypes)) else: raise Exception("Mandatory argument docType is not passed") if 'reconcile' in args: enableElasticComparison = args['reconcile'] else: if configProperties.get('config','enableelasticcomparison') == "True": enableElasticComparison = True else: enableElasticComparison = False #apilogger.debug('elasticDataLoad-' + args['elasticDataLoad']) if 'elasticDataLoad' in args: enableElasticDataload = args['elasticDataLoad'] else: if configProperties.get('config','enableelasticdataload') =="True": enableElasticDataload = True else: enableElasticDataload = False apilogger.debug('enableElasticDataload-' + str(enableElasticDataload)); apilogger.debug("elasticIndex " + elasticIndex) apilogger.debug("elasticDocType " + elasticDocType) entitiesForRecon = json.loads(configProperties.get('config',elasticDocType)) sourceDBType = configProperties.get('config','sourcedatabase') dbConnection = setDBConnection(sourceDBType,configProperties); elasticSource = setElasticConnection(configProperties); #enableElasticComparison = configProperties.get('config','enableelasticcomparison') #enableElasticDataload = configProperties.get('config','enableelasticdataload') apilogger.debug('enableElasticComparison-' + str(enableElasticComparison)) apilogger.debug('enableElasticDataload-' + str(enableElasticDataload)) if (enableElasticComparison): pdElasticData = fetchElasticData(elasticSource,elasticIndex,elasticDocType,configProperties) #print('pdElasticData-'+ str(len(pdElasticData))) pdCompareData = pd.DataFrame() for entity in entitiesForRecon: #return ("Entity") #print('entity-' + entity) pdSourceData = fetchSourceData(entity,configProperties,dbConnection) apilogger.info('Reportdetails-start-' + entity) apilogger.info('Reportdetails-rows-source-' + str(len(pdSourceData))) if 'contact' in pdSourceData.columns: pdSourceData['contact'] = [ [] if type(x) != list else x for x in pdSourceData['contact']] if 'serviceType' in pdSourceData.columns: pdSourceData['serviceType'] = [ [] if type(x) != list else x for x in pdSourceData['serviceType']] if 'serviceAttributes' in pdSourceData.columns: pdSourceData['serviceAttributes'] = [ {} if type(x) != dict else x for x in pdSourceData['serviceAttributes']] if enableElasticComparison: #print('pdElasticData-'+ str(len(pdElasticData))) pdElasticEntityData = fetchElasticEntityData(pdElasticData,entity+'-elastic',configProperties) #isnull = pdSourceData.contact.isnull() #pdSourceData.loc[isnull,'contact'] = []* isnull.sum() #pdElasticEntityData['contact'] = pdSourceData['contact'].astype(str) elasticColumnsforComparison = json.loads(configProperties.get(entity,'elasticcolumnsforcomparison')) dbColumnsForComparison = json.loads(configProperties.get(entity,'dbcolumnsforcomparison')) #print(pdSourceData) if elasticColumnsforComparison != dbColumnsForComparison: continue #print(pdSourceData.columns.values) #print('source-' + str(len(pdSourceData))) apilogger.info(pdElasticEntityData.columns.values) pdElasticEntityDataVolume = pdElasticEntityData.astype(str) pdElasticEntityDataVolume.drop_duplicates(keep='last',inplace=True) #print('elastic-' + str(len(pdElasticEntityDataVolume))+str(len(pdElasticEntityData))) apilogger.info('Reportdetails-rows-elastic-' + str(len(pdElasticEntityDataVolume))) #print((pdSourceData.astype(str)).equals(pdElasticEntityData.astype(str))) if not pdSourceData.equals(pdElasticEntityData): pdSourceData['source']=sourceDBType pdElasticEntityData['source']='Elastic' pdCompareData = pd.concat([pdElasticEntityData,pdSourceData],ignore_index=True) #print(len(pdCompareData)) pdCompareData = pdCompareData.astype(str) #pdCompareData.to_csv('compare-'+entity +'.csv') #print(len(pdCompareData)) pdCompareData.drop_duplicates(subset=dbColumnsForComparison,keep=False,inplace=True) #pdCompareData.to_csv('Duplicates-'+entity +'.csv') #print(pdCompareData) #print(len(pdCompareData)) pdCompareData = pdCompareData.loc[pdCompareData['source']==sourceDBType] #pdCompareData.to_csv('Missing Records-'+entity +'.csv') #print(pdCompareData.columns.values) pdCompareData.drop(['source'],axis=1,inplace=True) #print(pdCompareData.columns.values) #print(pdCompareData) enableErroredDataLoad = configProperties.get('config','enableErroredDataLoad') #print(len(pdCompareData)) apilogger.info('Reportdetails-rows-errored-' + str(len(pdCompareData))) apilogger.info('Reportdetails-rows-errors-' + str(pdCompareData['id'].tolist())) if (len(pdCompareData)>0): if enableElasticDataload: loadElasticData(pdCompareData,elasticSource,elasticIndex,elasticDocType,configProperties) #print(len(pdCompareData)) if (enableElasticDataload and not(enableElasticComparison)): loadElasticData(pdSourceData,elasticSource,elasticIndex,elasticDocType,configProperties) gc.collect() apilogger.info("Successfully Completed the Reconciliation/data load Process") except Exception as e: gc.collect() logger.error("Error in Reconciliation process - " + str(e)) apilogger.error("Error in elastic data load - {} . Line No - {} ".format(str(e),str(sys.exc_info()[-1].tb_lineno))) def postData(api,args,configProperties): logpath = os.environ.get('ANALYTICSDATASERVICELOGPATH') global logger logger = configlogfile() try: validkeys = json.loads(configProperties.get('apiconfiguration','validkeys',raw=True)) if set(list(args.keys())).issubset(set(validkeys)): processStatus = reconProcess.processReconciliation(api,args,configProperties,False) return processStatus else: #raise Exception("Arguments are invalid and invalid arguments are " + ','.join(list(set(list(args.keys()))-set(validkeys))) + ".") return ((["9999"],"Arguments are invalid and invalid arguments are " + ','.join(list(set(list(args.keys()))-set(validkeys))) + ".")) except Exception as e: logger.error(str(e)) return ((["9999"],str(e))) if __name__ == '__main__': postData(api,args,configProperties)
Editor is loading...