Untitled

 avatar
unknown
plain_text
2 years ago
2.7 kB
4
Indexable
import pandas as pd
import json
import gc
import sys


def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)


def mapPrimarySI(serviceAttributes):
    try:
        # log.debug('primarySIRule---' + str(primarySIRule))
        # log.debug('serviceAttributes--' + str(serviceAttributes))

        for rule in primarySIRule:
            primarySI = {}
            # log.debug('serviceAttributes--' + str(serviceAttributes))
            # log.debug('rule-' + rule)
            for k, v in serviceAttributes.items():
                if k == rule:
                    primarySI[k] = ' '.join(v)
                    return primarySI
        return {}
    except Exception as e:
        log.warn('Error-' + str(e))
        return {}


def fetchData(apilogger, connection, sql, configProperties, dataset, entity):
    # print(sql)
    try:
        global log
        log = apilogger
        pdServiceAttributesData = pd.DataFrame()
        pdServiceAttributesData = pd.read_sql(sql, connection)
        pdServiceAttributesData.drop_duplicates(inplace=True)
        pdPrimarySIRule = pd.read_sql(
        "select param_value as \"paramValue\"  from tib_chassis.tib_control_parameters where param_key like '%service_identifier_precedence_rule'",
        connection)
        global primarySIRule
        if len(pdPrimarySIRule) > 0:
            primarySIRule = pdPrimarySIRule['paramValue'][0]
            primarySIRule = primarySIRule.split(',')
            apilogger.debug('primarySIRule-' + str(primarySIRule))
        if len(pdServiceAttributesData)>0:    
            pdServiceAttributesData = pdServiceAttributesData.groupby(['subscriberid', 'serviceAttributeKey'])[
        'serviceAttributeValue'].apply(list).reset_index()
            #pdServiceAttributesData.to_csv('/app/server/HOBS-DataPipeline/logs/loadCRMSElasticData/aaa.csv')
            pdServiceAttributesData = pdServiceAttributesData.groupby('subscriberid').apply(
            lambda x: dict(zip(x['serviceAttributeKey'], (x['serviceAttributeValue'])))).reset_index()
            pdServiceAttributesData['primaryserviceidentifier'] = pdServiceAttributesData['serviceAttributes'].apply(
            mapPrimarySI)
            apilogger.debug(pdServiceAttributesData.columns.values)
    # pdServiceAttributesData.to_csv('pdServiceAttributesData.csv')
    # print(pdServiceAttributesData['serviceType'].dtype)
    # print(pdServiceAttributesData)
        gc.collect()
        return pdServiceAttributesData
    except Exception as e:
        log.warn('Error-' + str(e))
        log.error("Error - {} . Line No - {} ".format(str(e), str(sys.exc_info()[-1].tb_lineno)))
        return {}


if __name__ == '__main__':
    fetchData(connection)
Editor is loading...