import csv
import json
import numpy as np
import gc
import uuid
import urllib.request
import requests
import pandas as pd
import sys
import os
from cassandra.cluster import Cluster, DefaultConnection
from cassandra.util import datetime_from_uuid1
from cassandra.query import dict_factory
from cassandra.auth import PlainTextAuthProvider
from config.logger import configlogfile
import apis.process.reconProcess as reconProcess
from config.configureAPIResponse import setResponse
from datetime import datetime
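# Row factory that turns result rows into a pandas DataFrame. It is not referenced
# below, but the (colnames, rows) signature matches what the cassandra-driver expects
# for Session.row_factory, so it is presumably kept for Cassandra-backed runs.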
def pandas_factory(colnames,rows):
return pd.DataFrame(rows,columns = colnames)
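# Filters the full Elastic extract down to one entity and the columns configured for
# comparison. The filter expression and column list come from the entity section of
# the config file (keys 'elasticfiltercondition' and 'elasticcolumnsforcomparison').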
def fetchElasticEntityData(__pdElasticData,entity,configProperties):
#apilogger.info("Task 5/" + configProperties.get('config','totalsteps') + "fetch elastic data for " + entity+ " started")
apilogger.info("Task 5 " + "fetch elastic data for " + entity+ " started")
elasticFilterCondition= configProperties.get(entity,'elasticfiltercondition')
__pdElasticEntityData = __pdElasticData.copy()
#exec("pdElasticEntityData = pdElasticData[" + elasticFilterCondition + "].reindex()",globals())
#pdElasticEntityData = pdElasticData[pdElasticData.id == pdElasticData.customerid].reindex()
__pdElasticEntityData.query(elasticFilterCondition, inplace = True)
#pdElasticEntityData = pdElasticData
apilogger.debug('__pdElasticEntityData -columns - ' + str(__pdElasticEntityData.columns.values.tolist()))
#print(pdElasticEntityData)
elasticColumnsforComparison = json.loads(configProperties.get(entity,'elasticcolumnsforcomparison'))
__pdElasticEntityData = __pdElasticEntityData[elasticColumnsforComparison]
__pdElasticEntityData = __pdElasticEntityData.reset_index(drop=True)
#pdElasticEntityData = pdElasticEntityData.loc[pdElasticEntityData['customerid'].isin(['C38028367'])].reindex()
#__pdElasticEntityData['source'] = 'Elastic'
#print(pdElasticEntityData.columns.values)
#print(len(__pdElasticData))
#print (pdElasticEntityData['contact'])
#__pdElasticEntityData.to_csv('elastic'+entity+'.csv')
gc.collect()
#apilogger.info("Task 5/" + configProperties.get('config','totalsteps') + "fetch elastic data for " + entity+ " completed")
apilogger.info("Task 5 " + "fetch elastic data for " + entity+ " completed")
return __pdElasticEntityData
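# Builds the source-side DataFrame for an entity:
#   1. runs the SQL statements listed under 'queries' and concatenates the results,
#   2. runs each script/query pair listed under 'datasets' and left-joins the result on 'keycolumns',
#   3. collapses the _x/_y suffix columns produced by the merges,
#   4. applies per-column type/format fixes and keeps only the 'dbcolumnsforcomparison' columns.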
def fetchSourceData(entity,configProperties,dbConnection):
#apilogger.info("Task 4/" + configProperties.get('config','totalsteps') + "fetch source data for " + entity + " started")
apilogger.info("Task 4 " + "fetch source data for " + entity + " started")
queries = json.loads(configProperties.get(entity,'queries'))
keyColumns = json.loads(configProperties.get(entity,'keycolumns'))
#print('config-',configProperties)
pdSourceEntityData = pd.DataFrame()
i = 0
for query in queries:
sql = configProperties.get(entity,query)
#print(sql)
pdSubEntityData = pd.DataFrame()
pdSubEntityData = pd.read_sql(sql,con=dbConnection)
apilogger.debug( "Entity Volume " + str (len(pdSubEntityData)))
if i==0:
pdSourceEntityData = pdSubEntityData
else:
#print('pdSubEntityData-' + pdSubEntityData.columns.values)
#print('pdSourceEntityData-' + pdSourceEntityData.columns.values)
apilogger.debug(pdSourceEntityData.columns.values)
apilogger.debug(pdSubEntityData.columns.values)
#pdSourceEntityData = pd.merge(pdSourceEntityData,pdSubEntityData)
#pdSourceEntityData = pdSourceEntityData.merge(pdSubEntityData,on=keyColumns,how='outer')
#pdSourceEntityData = pd.set_index('customerid').join(other.set_index('customerid'))
pdSourceEntityData = pd.concat([pdSourceEntityData,pdSubEntityData], axis=0, ignore_index=True)
i = i + 1
datasets = json.loads(configProperties.get(entity,'datasets'))
apilogger.info( "Entity Volume " + str (len(pdSourceEntityData)))
apilogger.info( pdSourceEntityData.columns.values)
#print('completed fetching data of queries')
k=0
for dataset in datasets:
script = configProperties.get(entity,dataset+'.script')
#print(os.getcwd())
exec('from apis.process.scripts.loadCRMSElasticData.'+script+ ' import fetchData as fetchData',globals())
#from fetchCustomerContactData import fetchData
sql = configProperties.get(entity,dataset+'.query')
pdDataset = fetchData(apilogger,dbConnection,sql,configProperties,dataset,entity)
apilogger.debug( "dataset-" + dataset + str (len(pdDataset)))
apilogger.debug( "dataset-columns" + pdDataset.columns.values)
if i>0 and k >= 0:
if len(pdDataset)>0:
pdSourceEntityData = pdSourceEntityData.merge(pdDataset,on=keyColumns,how='left')
if i==0 and k==0:
pdSourceEntityData = pdDataset
k = k +1
apilogger.debug( "dataset-columns" + pdSourceEntityData.columns.values)
apilogger.debug( "dataset-volume-" + dataset + str(len(pdSourceEntityData)))
pdColumns = pdSourceEntityData.columns
apilogger.debug( "columns-" + pdColumns)
#pdSourceEntityData.to_csv('dataset-'+entity+'.csv')
for col in pdColumns:
apilogger.debug( "columns-" + col + str('_x' in col))
if '_x' in col:
columnName = col[0:col.find('_x',0)]
apilogger.debug( "columnName-" + columnName)
pdSourceEntityData[columnName] = pdSourceEntityData[col]
pdSourceEntityData.loc[(pdSourceEntityData[col].isnull()),columnName] = pdSourceEntityData[columnName+'_y']
pdSourceEntityData = pdSourceEntityData.drop([col,columnName+'_y'],axis=1)
#pdSourceEntityData.to_csv('dataset-'+entity+'.csv.cleanup')
#pdSourceEntityData['contact'] = pdSourceEntityData['contact_x']
#pdSourceEntityData.loc[(pdSourceEntityData['contact_y'].isnull()),'contact'] = pdSourceEntityData['contact_y']
#pdSourceEntityData = pdSourceEntityData.drop(['contact_x','contact_y'],axis=1)
#apilogger.debug( "Entity Volume " + str (len(pdSubEntityData)))
apilogger.debug( pdSourceEntityData.columns.values)
if len(pdSourceEntityData) > 0 :
if 'partyId' in list(pdSourceEntityData.columns):
pdSourceEntityData['partyId'] = pdSourceEntityData['partyId'].astype(str)
if (entity in ['CustomerOrg','CustomerResidential']):
pdSourceEntityData['ownerName'] = pdSourceEntityData['firstname']+ ' '+ pdSourceEntityData['lastname']
if ('createdon' in list(pdSourceEntityData.columns)):
pdSourceEntityData['createdon'] = pdSourceEntityData['createdon'].dt.strftime('%Y-%m-%dT%H:%M:%S.000Z')
if ('dateofbirth' in list(pdSourceEntityData.columns)):
pdSourceEntityData['dateofbirth'] = pdSourceEntityData['dateofbirth'].dt.strftime('%d-%b-%Y %H:%M:%S')
if ('engagedParty' in list(pdSourceEntityData.columns)):
pdSourceEntityData['engagedParty'] = [ {} if type(x) != dict else x for x in pdSourceEntityData['engagedParty']]
if ('primaryserviceidentifier' in list(pdSourceEntityData.columns)):
pdSourceEntityData['primaryserviceidentifier'] = [ {} if type(x) != dict else x for x in pdSourceEntityData['primaryserviceidentifier']]
#pdSourceEntityData['engagedParty'] = pdSourceEntityData['engagedParty'].fillna('{}',inplace=True)
# pdSourceEntityData['engagedParty'] = pdSourceEntityData['engagedParty'].fillna({},inplace=True)
dbColumnsForComparison = json.loads(configProperties.get(entity,'dbcolumnsforcomparison'))
dbColumnsForComparison = [ column for column in dbColumnsForComparison if column in pdSourceEntityData.columns.values]
pdSourceEntityData = pdSourceEntityData[dbColumnsForComparison]
#pdSourceEntityData['source'] =configProperties.get('config','sourcedatabase')
#print('oracle -' + str(pdSourceEntityData['contact'].dtype))
#print(len(pdSourceEntityData))
#print(pdSourceEntityData.columns.values)
#pdSourceEntityData['serviceAttributes'].fillna({},inplace=True)
#pdSourceEntityData['serviceType'].fillna([],inplace=True)
pdSourceEntityData.fillna('null',inplace=True)
#print(pdSourceEntityData)
#pdSourceEntityData.to_csv('sourcedata-'+entity+'.csv')
gc.collect()
#apilogger.info("Task 4/" + configProperties.get('config','totalsteps') + "fetch source data for " + entity + " completed")
apilogger.info("Task 4 " + "fetch source data for " + entity + " completed")
return pdSourceEntityData
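# Reads the reconciliation settings from config/reconcile/reconElasticData.config.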
def fn_setProperties():
import configparser
configProperties = configparser.ConfigParser()
configProperties.read("config/reconcile/reconElasticData.config")
return configProperties
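# Opens the relational source connection. Only 'oracle' (cx_Oracle) and 'maria'/'mysql'
# (mysql.connector) are handled; connection details are read from the matching
# section of the config file.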
def setDBConnection(dbType,configProperties):
#apilogger.info("Task 1/" + configProperties.get('config','totalsteps') + " set database connection started")
apilogger.info("Task 1 " + " set database connection started")
if dbType == 'oracle':
import cx_Oracle
username = configProperties.get(dbType,'username')
password = configProperties.get(dbType,'password')
connectstring = configProperties.get(dbType,'connectstring')
conn = cx_Oracle.connect(username,password,connectstring)
elif (dbType == 'maria' or dbType == 'mysql'):
import mysql.connector
username = configProperties.get(dbType,'username')
password = configProperties.get(dbType,'password')
database = configProperties.get(dbType,'database')
host = configProperties.get(dbType,'host')
port = configProperties.get(dbType,'port')
conn = mysql.connector.connect( user = username,\
password = password,\
host = host,\
database = database,\
port = port)
#apilogger.info("Task 1/" + configProperties.get('config','totalsteps') + " set database connection completed")
apilogger.info("Task " + " set database connection completed")
return conn
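# Creates the Elasticsearch client from the host list in config key 'elasticsource'
# (SSL and certificate verification are disabled here).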
def setElasticConnection(configProperties):
#apilogger.info("Task 2/" + configProperties.get('config','totalsteps') + " set elastic connection started")
apilogger.info("Task 2 " + " set elastic connection started")
from elasticsearch6 import Elasticsearch,RequestsHttpConnection
#from elasticsearch_dsl import Search
elasticSource = json.loads(configProperties.get('config','elasticsource'))
elasticSource = Elasticsearch(elasticSource,use_ssl=False,verify_certs=False,connection_class=RequestsHttpConnection)
#elasticSource = Elasticsearch(elasticSource,verify_certs=True)
#apilogger.info("Task 2/" + configProperties.get('config','totalsteps') + " set elastic connection completed")
apilogger.info("Task 2 " + " set elastic connection completed")
return elasticSource
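# Pulls every document of the given index/doc type into a DataFrame using the
# search+scroll API (10,000 documents per batch), then flattens the '_source' field.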
def fetchElasticData(elasticSource,__elasticIndex,__elasticDocType,configProperties):
#apilogger.info("Task 3/" + configProperties.get('config','totalsteps') + " fetch elastic data started")
apilogger.info("Task 3 " + " fetch elastic data started")
#totalElasticRecords = elasticSource.count(index=__elasticIndex)['count']
from elasticsearch6 import Elasticsearch,RequestsHttpConnection
elasticTempSource = json.loads(configProperties.get('config','elasticsource'))
for server in range(len(elasticTempSource)):
elasticTempSource[server] = elasticTempSource[server] + '/'+__elasticIndex+'/' +__elasticDocType
elasticTempSource = Elasticsearch(elasticTempSource,use_ssl=False,verify_certs=False,connection_class=RequestsHttpConnection)
totalElasticRecords = elasticTempSource.count()['count']
doc = {
'size' : 10000,
'query': {
'match_all' : {}
}
}
pdElasticData = pd.DataFrame()
elasticData = elasticSource.search(index= __elasticIndex,doc_type = __elasticDocType,body=doc,scroll='1m')
#elasticData = elasticSource.search(body=doc,scroll='1m')
pdElasticData = pd.DataFrame.from_records(elasticData['hits']['hits'])
#return pdElasticData
dataFetchCount = 0
dataFetchCount = len(elasticData['hits']['hits'])
while dataFetchCount < totalElasticRecords:
elasticNextData = elasticSource.scroll(scroll_id =elasticData['_scroll_id'],scroll = '1m')
pdElasticData = pd.concat([pdElasticData,pd.DataFrame.from_records(elasticNextData['hits']['hits'])], ignore_index=True)
elasticData = elasticNextData
dataFetchCount = dataFetchCount + len(elasticNextData['hits']['hits'])
apilogger.debug('dataFetchCount-' + str(dataFetchCount) + ',totalElasticRecords-' + str(totalElasticRecords)+ 'pdElasticData-'+str(len(pdElasticData)))
if len(elasticNextData['hits']['hits']) == 0: # stop once a scroll batch comes back empty, so the loop cannot spin forever
break
pdElasticData = pd.DataFrame.from_records(pdElasticData['_source'].to_list())
#pdElasticData.drop(['contact'])
#print(pdElasticData.columns.values)
gc.collect()
#apilogger.info("Task 3/" + configProperties.get('config','totalsteps') + " fetch elastic data completed")
apilogger.info("Task 3 " + " fetch elastic data completed")
return pdElasticData
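# Generator that converts the errored rows into bulk-index actions. The 'contact'
# field is parsed back into a Python list when it arrives as a string, and the
# document id is taken from the 'id' column.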
def genElasticDoc(pdErroredData,elasticIndex,elasticDocType,configProperties):
import ast
erroredRecords = pdErroredData.to_dict(orient ='records')
i = 0
for record in erroredRecords:
#print(record)
#apilogger.info(record)
#print(type(record['contact']))
if 'contact' in record.keys():
if type(record['contact']) != list :
record['contact'] = ast.literal_eval(record['contact'])
i = i + 1
#print(i)
#apilogger.info(record)
yield {
'_index': elasticIndex,
# '_type': elasticDocType,
'_id' : record['id'],
'_source': record
}
"""
yield {
'_id' : record['id'],
'_source': record
}
"""
def loadElasticData(pdErroredData,elasticSource,elasticIndex,elasticDocType,configProperties):
from elasticsearch6 import helpers
#apilogger.info("Task 6/" + configProperties.get('config','totalsteps') + " load elastic data started")
apilogger.info("Task 6 " + " load elastic data started")
#pdErroredData.to_csv('elasticdata.csv')
#response = []
try:
#response = helpers.bulk(elasticSource,genElasticDoc(pdErroredData,elasticIndex,elasticDocType,configProperties),chunk_size=configProperties.get('config','elasticDataLoadVolume'),stats_only=False,raise_on_exception=False,raise_on_error = False)
response = helpers.bulk(elasticSource,genElasticDoc(pdErroredData,elasticIndex,elasticDocType,configProperties),chunk_size= 5000,stats_only=False,raise_on_exception=False,raise_on_error = False)
#print('sdsd'+str(response))
except Exception as e:
response = (0, [])  # default to an empty bulk response so the reporting below does not raise NameError
apilogger.error('Error in elastic Data Load - '+str(e))
#print('Error in elastic Data Load'+str(e))
errors = set()
erroredIds = []
apilogger.info('response - ' + str(response))
apilogger.info('response - ' + str(type(response)))
apilogger.info('response - ' + str(response[0]))
if len(response) > 0 :
apilogger.info('Reportdetails-rows-reloaded-' + str(response[0]))
apilogger.info('Reportdetails-rows-error in reloading-' + str(len(response[1])))
for i in range(len(response[1])):
erroredIds.append(str(response[1][i]['index']['_id']))
errors.add(str(response[1][i]['index']['error']))
if len(response[1]) > 0 :
apilogger.info('Reportdetails-rows-erroredIds-' + str(erroredIds))
apilogger.info('Reportdetails-rows-errors-' + str(errors))
gc.collect()
#apilogger.info("Task 6/" + configProperties.get('config','totalsteps') + " load elastic data completed")
apilogger.info("Task 6 " + " load elastic data completed")
def chkProcess(api):
return os.path.isfile(dirpath+'/apis/process/'+ api + '.py')
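# Creates a per-api file logger under $ANALYTICSDATASERVICELOGPATH/logs/<api>.log.
# A uuid suffix keeps the logger name unique so repeated runs do not share handlers.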
def apilogfile(api):
import logging as __logging
import logging.handlers as __handlers
__logpath = os.environ.get('ANALYTICSDATASERVICELOGPATH')
#print(__logpath)
__logging.shutdown()
__log_handler = __logging.FileHandler(__logpath+'/logs/'+api + '.log',mode='w')
formatter = __logging.Formatter(fmt='%(asctime)s [Process:%(process)d] [%(levelname)-8s] [%(filename)s - %(module)s] : %(message)s',datefmt= '%Y-%m-%d %H:%M:%S')
#formatter.converter = time.gmtime # if you want UTC time
__log_handler.setFormatter(formatter)
__logger = __logging.getLogger('apilog'+str(uuid.uuid1()))
if not __logger.handlers:
__logger.addHandler(__log_handler)
__logger.setLevel(__logging.DEBUG)
__logger.info("wewewe")
return __logger
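# Main reconciliation/data-load flow: validates the 'index' and 'docType' arguments,
# connects to the source database and Elasticsearch, compares source data with the
# Elastic extract per entity, and reloads the records found missing or different.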
def processData(api,args,configProperties):
logpath = os.environ.get('ANALYTICSDATASERVICELOGPATH')
logger.info("Process in Progress")
if(os.path.isfile(logpath+'/logs/'+ api + '.log')):
os.rename(logpath+'/logs/'+api + '.log',logpath+'/logs/'+api + '.log' + datetime.now().strftime("%Y%m%d%H%M%S"))
#os.rename(logpath+'/logs/'+api + '.log')
global apilogger
apilogger=apilogfile(api)
apilogger.info("Initiated the process")
try:
if 'index' in args:
elasticIndex = args['index']
elasticIndexes = json.loads( configProperties.get('config','elasticindexes'))
if not (elasticIndex in elasticIndexes):
raise Exception("Invalid index value passed as argument.Valid index values are " + ','.join(elasticIndexes) )
else:
raise Exception("Mandatory argument index is not passed")
if 'docType' in args:
elasticDocType = args['docType']
elasticDocTypes = json.loads( configProperties.get('config',elasticIndex))
if not (elasticDocType in elasticDocTypes):
raise Exception("Invalid document type value passed as argument.Valid document types are " + ','.join(elasticDocTypes))
else:
raise Exception("Mandatory argument docType is not passed")
if 'reconcile' in args:
enableElasticComparison = args['reconcile']
else:
if configProperties.get('config','enableelasticcomparison') == "True":
enableElasticComparison = True
else:
enableElasticComparison = False
#apilogger.debug('elasticDataLoad-' + args['elasticDataLoad'])
if 'elasticDataLoad' in args:
enableElasticDataload = args['elasticDataLoad']
else:
if configProperties.get('config','enableelasticdataload') =="True":
enableElasticDataload = True
else:
enableElasticDataload = False
apilogger.debug('enableElasticDataload-' + str(enableElasticDataload))
apilogger.debug("elasticIndex " + elasticIndex)
apilogger.debug("elasticDocType " + elasticDocType)
entitiesForRecon = json.loads(configProperties.get('config',elasticDocType))
sourceDBType = configProperties.get('config','sourcedatabase')
dbConnection = setDBConnection(sourceDBType,configProperties)
elasticSource = setElasticConnection(configProperties)
#enableElasticComparison = configProperties.get('config','enableelasticcomparison')
#enableElasticDataload = configProperties.get('config','enableelasticdataload')
apilogger.debug('enableElasticComparison-' + str(enableElasticComparison))
apilogger.debug('enableElasticDataload-' + str(enableElasticDataload))
if (enableElasticComparison):
pdElasticData = fetchElasticData(elasticSource,elasticIndex,elasticDocType,configProperties)
#print('pdElasticData-'+ str(len(pdElasticData)))
pdCompareData = pd.DataFrame()
for entity in entitiesForRecon:
#return ("Entity")
#print('entity-' + entity)
pdSourceData = fetchSourceData(entity,configProperties,dbConnection)
apilogger.info('Reportdetails-start-' + entity)
apilogger.info('Reportdetails-rows-source-' + str(len(pdSourceData)))
if 'contact' in pdSourceData.columns:
pdSourceData['contact'] = [ [] if type(x) != list else x for x in pdSourceData['contact']]
if 'serviceType' in pdSourceData.columns:
pdSourceData['serviceType'] = [ [] if type(x) != list else x for x in pdSourceData['serviceType']]
if 'serviceAttributes' in pdSourceData.columns:
pdSourceData['serviceAttributes'] = [ {} if type(x) != dict else x for x in pdSourceData['serviceAttributes']]
if enableElasticComparison:
#print('pdElasticData-'+ str(len(pdElasticData)))
pdElasticEntityData = fetchElasticEntityData(pdElasticData,entity+'-elastic',configProperties)
#isnull = pdSourceData.contact.isnull()
#pdSourceData.loc[isnull,'contact'] = []* isnull.sum()
#pdElasticEntityData['contact'] = pdSourceData['contact'].astype(str)
elasticColumnsforComparison = json.loads(configProperties.get(entity,'elasticcolumnsforcomparison'))
dbColumnsForComparison = json.loads(configProperties.get(entity,'dbcolumnsforcomparison'))
#print(pdSourceData)
if elasticColumnsforComparison != dbColumnsForComparison:
continue
#print(pdSourceData.columns.values)
#print('source-' + str(len(pdSourceData)))
apilogger.info(pdElasticEntityData.columns.values)
pdElasticEntityDataVolume = pdElasticEntityData.astype(str)
pdElasticEntityDataVolume.drop_duplicates(keep='last',inplace=True)
#print('elastic-' + str(len(pdElasticEntityDataVolume))+str(len(pdElasticEntityData)))
apilogger.info('Reportdetails-rows-elastic-' + str(len(pdElasticEntityDataVolume)))
#print((pdSourceData.astype(str)).equals(pdElasticEntityData.astype(str)))
if not pdSourceData.equals(pdElasticEntityData):
pdSourceData['source']=sourceDBType
pdElasticEntityData['source']='Elastic'
pdCompareData = pd.concat([pdElasticEntityData,pdSourceData],ignore_index=True)
#print(len(pdCompareData))
pdCompareData = pdCompareData.astype(str)
#pdCompareData.to_csv('compare-'+entity +'.csv')
#print(len(pdCompareData))
pdCompareData.drop_duplicates(subset=dbColumnsForComparison,keep=False,inplace=True)
#pdCompareData.to_csv('Duplicates-'+entity +'.csv')
#print(pdCompareData)
#print(len(pdCompareData))
pdCompareData = pdCompareData.loc[pdCompareData['source']==sourceDBType]
#pdCompareData.to_csv('Missing Records-'+entity +'.csv')
#print(pdCompareData.columns.values)
pdCompareData.drop(['source'],axis=1,inplace=True)
#print(pdCompareData.columns.values)
#print(pdCompareData)
enableErroredDataLoad = configProperties.get('config','enableErroredDataLoad')
#print(len(pdCompareData))
apilogger.info('Reportdetails-rows-errored-' + str(len(pdCompareData)))
apilogger.info('Reportdetails-rows-errors-' + str(pdCompareData['id'].tolist()))
if (len(pdCompareData)>0):
if enableElasticDataload:
loadElasticData(pdCompareData,elasticSource,elasticIndex,elasticDocType,configProperties)
#print(len(pdCompareData))
if (enableElasticDataload and not(enableElasticComparison)):
loadElasticData(pdSourceData,elasticSource,elasticIndex,elasticDocType,configProperties)
gc.collect()
apilogger.info("Successfully Completed the Reconciliation/data load Process")
except Exception as e:
gc.collect()
logger.error("Error in Reconciliation process - " + str(e))
apilogger.error("Error in elastic data load - {} . Line No - {} ".format(str(e),str(sys.exc_info()[-1].tb_lineno)))
def postData(api,args,configProperties):
logpath = os.environ.get('ANALYTICSDATASERVICELOGPATH')
global logger
logger = configlogfile()
try:
validkeys = json.loads(configProperties.get('apiconfiguration','validkeys',raw=True))
if set(list(args.keys())).issubset(set(validkeys)):
processStatus = reconProcess.processReconciliation(api,args,configProperties,False)
return processStatus
else:
#raise Exception("Arguments are invalid and invalid arguments are " + ','.join(list(set(list(args.keys()))-set(validkeys))) + ".")
return ((["9999"],"Arguments are invalid and invalid arguments are " + ','.join(list(set(list(args.keys()))-set(validkeys))) + "."))
except Exception as e:
logger.error(str(e))
return ((["9999"],str(e)))
if __name__ == '__main__':
postData(api,args,configProperties)