Untitled
unknown
plain_text
3 years ago
2.7 kB
9
Indexable
import pandas as pd
import json
import gc
import sys
def pandas_factory(colnames, rows):
return pd.DataFrame(rows, columns=colnames)
def mapPrimarySI(serviceAttributes):
try:
# log.debug('primarySIRule---' + str(primarySIRule))
# log.debug('serviceAttributes--' + str(serviceAttributes))
for rule in primarySIRule:
primarySI = {}
# log.debug('serviceAttributes--' + str(serviceAttributes))
# log.debug('rule-' + rule)
for k, v in serviceAttributes.items():
if k == rule:
primarySI[k] = ' '.join(v)
return primarySI
return {}
except Exception as e:
log.warn('Error-' + str(e))
return {}
def fetchData(apilogger, connection, sql, configProperties, dataset, entity):
# print(sql)
try:
global log
log = apilogger
pdServiceAttributesData = pd.DataFrame()
pdServiceAttributesData = pd.read_sql(sql, connection)
pdServiceAttributesData.drop_duplicates(inplace=True)
pdPrimarySIRule = pd.read_sql(
"select param_value as \"paramValue\" from tib_chassis.tib_control_parameters where param_key like '%service_identifier_precedence_rule'",
connection)
global primarySIRule
if len(pdPrimarySIRule) > 0:
primarySIRule = pdPrimarySIRule['paramValue'][0]
primarySIRule = primarySIRule.split(',')
apilogger.debug('primarySIRule-' + str(primarySIRule))
if len(pdServiceAttributesData)>0:
pdServiceAttributesData = pdServiceAttributesData.groupby(['subscriberid', 'serviceAttributeKey'])[
'serviceAttributeValue'].apply(list).reset_index()
#pdServiceAttributesData.to_csv('/app/server/HOBS-DataPipeline/logs/loadCRMSElasticData/aaa.csv')
pdServiceAttributesData = pdServiceAttributesData.groupby('subscriberid').apply(
lambda x: dict(zip(x['serviceAttributeKey'], (x['serviceAttributeValue'])))).reset_index()
pdServiceAttributesData['primaryserviceidentifier'] = pdServiceAttributesData['serviceAttributes'].apply(
mapPrimarySI)
apilogger.debug(pdServiceAttributesData.columns.values)
# pdServiceAttributesData.to_csv('pdServiceAttributesData.csv')
# print(pdServiceAttributesData['serviceType'].dtype)
# print(pdServiceAttributesData)
gc.collect()
return pdServiceAttributesData
except Exception as e:
log.warn('Error-' + str(e))
log.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
return {}
if __name__ == '__main__':
fetchData(connection)
Editor is loading...