Untitled
unknown
plain_text
2 years ago
2.7 kB
4
Indexable
import pandas as pd
import json
import gc
import logging
import sys

# Module-level defaults so that mapPrimarySI can be called safely before
# fetchData has installed the caller's logger and loaded the precedence rule.
# fetchData rebinds both names.
log = logging.getLogger(__name__)
primarySIRule = []


def pandas_factory(colnames, rows):
    """Build a DataFrame from ``rows`` using ``colnames`` as the column labels."""
    return pd.DataFrame(rows, columns=colnames)


def mapPrimarySI(serviceAttributes):
    """Select the primary service identifier from one attribute mapping.

    The module-level ``primarySIRule`` list is walked in order; the first rule
    name that is present as a key in ``serviceAttributes`` wins.

    Args:
        serviceAttributes: mapping of attribute key -> list of string values.

    Returns:
        A one-entry dict ``{key: "v1 v2 ..."}`` (values joined with a single
        space) for the first matching rule, or ``{}`` when no rule matches or
        an error occurs (the error is logged as a warning).
    """
    try:
        for rule in primarySIRule:
            if rule in serviceAttributes:
                return {rule: ' '.join(serviceAttributes[rule])}
        return {}
    except Exception as e:
        # Best effort: a malformed row must not abort the whole DataFrame apply.
        log.warn('Error-' + str(e))
        return {}


def fetchData(apilogger, connection, sql, configProperties, dataset, entity):
    """Load service attributes and collapse them to one row per subscriber.

    Runs ``sql`` on ``connection``, drops duplicate rows, reads the identifier
    precedence rule from ``tib_chassis.tib_control_parameters`` and, when data
    is present, produces one row per ``subscriberid`` with:

    * ``serviceAttributes`` - dict of attribute key -> list of values
    * ``primaryserviceidentifier`` - result of ``mapPrimarySI`` for that dict

    The query result must expose the columns ``subscriberid``,
    ``serviceAttributeKey`` and ``serviceAttributeValue``.

    Side effects: rebinds the module globals ``log`` (to ``apilogger``) and,
    when the rule query returns a row, ``primarySIRule``.

    Args:
        apilogger: logger used for debug/warn/error output.
        connection: DB connection accepted by ``pandas.read_sql``.
        sql: query returning the service attribute rows.
        configProperties: unused here; kept for interface compatibility.
        dataset: unused here; kept for interface compatibility.
        entity: unused here; kept for interface compatibility.

    Returns:
        The resulting DataFrame (the de-duplicated raw frame when the query
        returned no rows), or ``{}`` if any step raised.
    """
    global log, primarySIRule
    try:
        log = apilogger

        pdServiceAttributesData = pd.read_sql(sql, connection)
        pdServiceAttributesData.drop_duplicates(inplace=True)

        pdPrimarySIRule = pd.read_sql(
            "select param_value as \"paramValue\" from tib_chassis.tib_control_parameters where param_key like '%service_identifier_precedence_rule'",
            connection)
        if len(pdPrimarySIRule) > 0:
            rawRule = pdPrimarySIRule['paramValue'].iloc[0]
            # Strip whitespace so a rule stored as "A, B" still matches key "B".
            primarySIRule = [name.strip() for name in rawRule.split(',') if name.strip()]
        apilogger.debug('primarySIRule-' + str(primarySIRule))

        if len(pdServiceAttributesData) > 0:
            # Step 1: one row per (subscriber, key) holding the list of values.
            valuesPerKey = pdServiceAttributesData.groupby(
                ['subscriberid', 'serviceAttributeKey'])['serviceAttributeValue'].apply(list).reset_index()

            # Step 2: one row per subscriber holding a {key: [values]} dict.
            # The resulting Series is unnamed, so the column must be named
            # explicitly; a bare reset_index() would call it 0 and the
            # 'serviceAttributes' lookup below would raise KeyError.
            pdServiceAttributesData = valuesPerKey.groupby('subscriberid')[
                ['serviceAttributeKey', 'serviceAttributeValue']].apply(
                lambda x: dict(zip(x['serviceAttributeKey'], x['serviceAttributeValue']))
            ).reset_index(name='serviceAttributes')

            pdServiceAttributesData['primaryserviceidentifier'] = pdServiceAttributesData[
                'serviceAttributes'].apply(mapPrimarySI)

        apilogger.debug(pdServiceAttributesData.columns.values)
        gc.collect()
        return pdServiceAttributesData
    except Exception as e:
        log.warn('Error-' + str(e))
        log.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        return {}


if __name__ == '__main__':
    # NOTE(review): the previous guard called fetchData(connection) with an
    # undefined name and without the other required arguments, so it could
    # only fail with NameError. Fail with an explicit message instead.
    raise SystemExit(
        'This module is not runnable on its own: call '
        'fetchData(apilogger, connection, sql, configProperties, dataset, entity).')
Editor is loading...