Untitled

mail@pastecode.io avatar
unknown
plain_text
a year ago
38 kB
1
Indexable
Never
import pandas as pd
import json
import ast, sys
from epc.dags.syncEPCStores import mergeEntityMasterData
from datetime import date, datetime
from utils.formatSQLQuery import formatSQLQuery
from airflow import AirflowException


# from.fetchDiscountPriceData import fetchDiscount as discount

def applyDiscount(apilogger, price, discountValue, discountUnits):
    try:
        print('discountLogic')
        print(price)
        print(discountValue)
        print(discountUnits)
        discountedPrice = 0
        if not (pd.isna(discountValue)):
            if discountUnits == '%':
                discountedPrice = price * (float(discountValue) / 100)
            elif discountUnits == 'VAL':
                discountedPrice = float(discountValue)
            else:
                discountedPrice = 0

        else:
            discountedPrice = 0

        return discountedPrice

    except Exception as errInDiscCalc:
        logger.error(str(errInDiscCalc))
        logger.error("Error - {} . Line No - {} ".format(str(errInDiscCalc), str(sys.exc_info()[-1].tb_lineno)))
    raise AirflowException("Task failed due to error-." + str(e))


# return discountedPrice

def fetchDiscount(connection, sql, apilogger, __productIds, args, configProperties, keyColumns, entity,
                  __pdReleaseEntities, __pdProductPriceData):
    try:
        dicountQuery = configProperties.get('discountDetails', 'query', raw=True)
        __pdDiscountDetails = mergeEntityMasterData(entity, configProperties, connection, __productIds, args,
                                                    __pdReleaseEntities, dicountQuery, 'discountDetails')
        __pdProductPriceData = __pdProductPriceData.drop(columns=['discountId'])
        if len(__pdDiscountDetails) > 0:
            if 'units' in __pdDiscountDetails.columns.values:
                __pdDiscountDetails['discountUnits'] = __pdDiscountDetails['units']
                __pdDiscountDetails = __pdDiscountDetails.drop(columns=['units'])
            if 'amount' in __pdDiscountDetails.columns.values:
                __pdDiscountDetails['discountValue'] = __pdDiscountDetails['amount']
                __pdDiscountDetails = __pdDiscountDetails.drop(columns=['amount'])
            if 'Version' in __pdDiscountDetails.columns.values:
                __pdDiscountDetails = __pdDiscountDetails.drop(columns=['Version'])

            __pdDiscountDetails.to_csv('__pdProductDiscountDetails_1.' + args['releaseId'] + '.csv')
            itemDiscountQuery = configProperties.get('itemDiscount', 'query', raw=True)
            __pdItemDiscountDetails = mergeEntityMasterData(entity, configProperties, connection, __productIds, args,
                                                            __pdReleaseEntities, itemDiscountQuery, 'itemDiscount')
            if len(__pdItemDiscountDetails) > 0:
                if 'Version' in __pdItemDiscountDetails.columns.values:
                    __pdItemDiscountDetails = __pdItemDiscountDetails.drop(columns=['Version'])
                __pdItemDiscountDetails.to_csv('__pdProductItemDiscountDetails.' + args['releaseId'] + '.csv')
                __pdDiscountDetails = __pdDiscountDetails.merge(__pdItemDiscountDetails,
                                                                right_on=['discountedItemId', 'opID', 'buID'],
                                                                left_on=['discountId', 'opID', 'buID'], how='inner')
                __pdDiscountDetails.to_csv('__pdProductDiscountDetails_2.' + args['releaseId'] + '.csv')
            __pdProductPriceData1 = __pdProductPriceData.merge(__pdDiscountDetails,
                                                               left_on=["productComponentId", "opID", "buID"],
                                                               right_on=["discountAppliedOnComponentID", "opID",
                                                                         "buID"], how='left')
            __pdProductPriceData1.to_csv('__pdProductDiscountDetails_3.' + args['releaseId'] + '.csv')
            __pdProductPriceData1['discountedPrice'] = __pdProductPriceData1[
                ['price', 'discountValue', 'discountUnits']].apply(
                lambda x: applyDiscount(apilogger, x['price'], x['discountValue'], x['discountUnits']), axis=1)
            __pdProductPriceData1 = __pdProductPriceData1[
                ['discountId', 'opID', 'buID', 'discountedPrice', 'discountAppliedOn', 'discountAppliedOnComponentID']]
            __pdProductPriceData1 = __pdProductPriceData.merge(__pdProductPriceData1,
                                                               left_on=["productComponentId", "opID", "buID"],
                                                               right_on=["discountId", "opID", "buID"], how='left')
            __pdProductPriceData1.to_csv('__pdProductDiscountDetails_0.' + args['releaseId'] + '.csv')
            __pdProductPriceData1.loc[__pdProductPriceData1['discountedPrice'] > 0, 'discountApplied'] = 'Y'
            __pdProductPriceData1.loc[__pdProductPriceData1['discountedPrice'] == 0, 'discountApplied'] = 'N'
            __pdProductPriceData1.to_csv('__pdProductDiscountDetails_0.1.' + args['releaseId'] + '.csv')
            __pdProductPriceData1 = __pdProductPriceData1[__pdProductPriceData1['discountApplied'] == 'Y']
            __pdProductPriceData1['discountComponentId'] = __pdProductPriceData1['discountId']
            __pdProductPriceData1['discountId'] = __pdProductPriceData1['productid']
            __pdProductPriceData1['discountedPrice'] = __pdProductPriceData1['discountedPrice'] * -1
            __pdProductPriceData1 = __pdProductPriceData1[
                ['discountId', 'discountComponentId', 'opID', 'buID', 'discountedPrice', 'discountAppliedOn',
                 'discountAppliedOnComponentID', 'discountAmount']]

            __pdProductPriceData1.to_csv('__pdProductDiscountDetails.' + args['releaseId'] + '.csv')
        return __pdProductPriceData1
    except Exception as errInDiscountLogic:
        apilogger.error(str(errInDiscountLogic))
        apilogger.error("Error in fetchDiscount function - {} . Line No - {} ".format(str(errInDiscountLogic), str(
            sys.exc_info()[-1].tb_lineno)))
    raise AirflowException("Task failed due to error-." + str(e))


# return __pdProductPriceData


def validateData(value):
    try:
        return ast.literal_eval(value)

    except Exception as e:
        # apilogger.debug(str(e))
        productPriceLogger.error("Error in formatting data - {} . Line No - {} ".format(str(e) + '-value-' + str(value),
                                                                                        str(sys.exc_info()[
                                                                                                -1].tb_lineno)))
        return value


def assignDescription(value):
    try:
        productPriceLogger.debug('label-' + str(value))
        productPriceLogger.debug(str('DEFAULT' in value))
        # logger.debug(str(value['DEFAULT']))
        productPriceLogger.debug(str(type(value)))
        if 'DEFAULT' in value['offerPriceDescription']:
            return value['offerPriceDescription']['DEFAULT']
        else:
            return ''
    except Exception as e:
        productPriceLogger.error("Error-" + str(e))
        return value


def formatDate(startDateTime, endDateTime):
    try:
        formattedDate = {}
        startDateTime = pd.to_datetime(startDateTime)
        endDateTime = pd.to_datetime(endDateTime)
        print('startDateTime-' + str(startDateTime) + 'endDateTime-' + str(endDateTime))
        if not pd.isna(startDateTime):
            formattedDate['startDateTime'] = startDateTime.strftime('%Y-%m-%dT%H:%M:%SZ')

        if not pd.isna(endDateTime):
            formattedDate['endDateTime'] = endDateTime.strftime('%Y-%m-%dT%H:%M:%SZ')
        return formattedDate
    except Exception as e:
        productPriceLogger.error('Error in formatting effectiveTimePeriod columns - ' + str(e))
        productPriceLogger.error("Error in fomartting effectiveTimePeriod columns - {} . Line No - {} ".format(
            str(e) + '-startdate-' + str(startDateTime) + '-endDate-' + str(endDateTime),
            str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException("Task failed due to error-." + str(e))
        # return {}


def mapTermPeriod(startDateTime, endDateTime, duration, units):
    try:
        formattedValue = {}
        if not pd.isna(startDateTime):
            formattedValue['startDateTime'] = startDateTime.strftime('%Y-%m-%dT%H:%M:%SZ')

        if not pd.isna(endDateTime):
            formattedValue['endDateTime'] = endDateTime.strftime('%Y-%m-%dT%H:%M:%SZ')

        if not pd.isna(duration):
            formattedValue['duration'] = duration
        if not pd.isna(units):
            formattedValue['units'] = units
        return formattedValue
    except Exception as e:
        productPriceLogger.error('Error in formatting effectiveTimePeriod columns - ' + str(e))
        productPriceLogger.error("Error in fomartting effectiveTimePeriod columns - {} . Line No - {} ".format(
            str(e) + '-startdate-' + str(startDateTime) + '-endDate-' + str(endDateTime),
            str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException("Task failed due to error-." + str(e))
        # return {}


def fetchDiscountDetails(configProperties, connection, __productIds, args, __pdReleaseEntities):
    pdDiscountDetails = pd.DataFrame()
    try:
        productPriceLogger.info('started the process to fetchDiscountDetauls')
        discountSql = configProperties.get('discountDetails', 'query')
        productPriceLogger.info('discountSql-' + discountSql)
        pdDiscountDetails = mergeEntityMasterData('DiscountDefinition', configProperties, connection, __productIds,
                                                  args, __pdReleaseEntities, discountSql, 'discountDetails')
        if len(pdDiscountDetails) > 0:
            pdDiscountDetails['discountValue'] = pdDiscountDetails[['amount', 'units']].to_dict(orient='records')
            pdItemDiscountDetails = fetchItemDiscountDetails(configProperties, connection, __productIds, args,
                                                             __pdReleaseEntities)
            if len(pdItemDiscountDetails) > 0:
                # pdItemDiscountDetails.to_csv('pdItemDiscountDetails.csv')
                pdDiscountDetails = pdDiscountDetails.merge(pdItemDiscountDetails,
                                                            left_on=['discountId', 'opID', 'buID'],
                                                            right_on=['discountedItemId', 'opID', 'buID'], how='left')
                pdDiscountDetails['discountedItem'].fillna('[]', inplace=True)
                pdDiscountDetails['discountedItem'] = pdDiscountDetails['discountedItem'].astype(str)

                pdDiscountDetails['discountedItem'] = pdDiscountDetails['discountedItem'].apply(validateData)
                pdDiscountDetails.to_csv('pdItemDiscountDetails.' + args['releaseId'] + '.csv')
            dictColumns = json.loads(configProperties.get('discountDetails', 'dictcolumns'))
            print('dictColumns-' + str(dictColumns))
            dictColumns = [column for column in dictColumns if column in pdDiscountDetails.columns]
            pdDiscountDetails['discountSpecification'] = pdDiscountDetails[dictColumns].to_dict(orient='records')
            pdDiscountDetails['discountSpecification'] = pdDiscountDetails['discountSpecification'].apply(
                lambda x: {'discountSpecification': x})
            pdDiscountDetails = pdDiscountDetails[['discountId', 'opID', 'buID', 'discountSpecification']]
        pdDiscountDetails.to_csv('pdDiscountDetails_func.' + args['releaseId'] + '.csv')
        return pdDiscountDetails
    except Exception as err:
        productPriceLogger.error("Error  - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
        return pdDiscountDetails


def fetchItemDiscountDetails(configProperties, connection, __productIds, args, __pdReleaseEntities):
    pdItemDiscountDetails = pd.DataFrame()
    try:
        productPriceLogger.info('started the process to fetchItemDiscountDetauls')
        itemDiscountSql = configProperties.get('itemDiscount', 'query')
        productPriceLogger.info('ItemDiscountSql-' + itemDiscountSql)
        pdItemDiscountDetails = mergeEntityMasterData('ItemDiscountMap', configProperties, connection, __productIds,
                                                      args, __pdReleaseEntities, itemDiscountSql, 'itemDiscount')
        dictColumns = json.loads(configProperties.get('itemDiscount', 'dictcolumns'))
        print('dictColumns-' + str(dictColumns))
        dictColumns = [column for column in dictColumns if column in pdItemDiscountDetails.columns]
        if len(pdItemDiscountDetails) > 0:
            pdItemDiscountDetails.fillna('', inplace=True)
            pdItemDiscountDetails['discountedItem'] = pdItemDiscountDetails[dictColumns].to_dict(orient='records')
            # pdItemDiscountDetails['discountItems'] = pdItemDiscountDetails['discountItems'].apply(lambda x : {'discountItems' : x  })
            pdItemDiscountDetails = pdItemDiscountDetails.groupby(['discountedItemId', 'opID', 'buID']).apply(
                lambda x: list(x['discountedItem'])).reset_index(name='discountedItem')
            pdItemDiscountDetails = pdItemDiscountDetails[['discountedItemId', 'opID', 'buID', 'discountedItem']]
        pdItemDiscountDetails.to_csv('pdItemDiscountDetails_func.' + args['releaseId'] + '.csv')
        return pdItemDiscountDetails
    except Exception as err:
        productPriceLogger.error("Error  - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException("Task failed due to error-." + str(e))
        # return pdItemDiscountDetails


def fetchData(connection, sql, apilogger, __productIds, args, configProperties, keyColumns, entity,
              __pdReleaseEntities):
    apilogger.debug(sql)
    global productPriceLogger
    productPriceLogger = apilogger
    sql = sql.replace(':releases', ','.join('\'{0}\''.format(product) for product in __productIds))
    sqlArguments = json.loads(configProperties.get('queries', 'sqlParams'))
    apilogger.debug('sqlArguments-' + str(sqlArguments))
    sqlArguments = {k: v for k, v in args.items() if k in sqlArguments}
    apilogger.debug('sqlArguments-' + str(sqlArguments))
    dictColumns = json.loads(configProperties.get('productPrice', 'dictColumns'))
    pdProductPriceData = mergeEntityMasterData(entity, configProperties, connection, __productIds, args,
                                               __pdReleaseEntities, sql, 'productPrice')
    # pdProductPriceData.fillna('',inplace=True)
    subQuerysql2 = configProperties.get('productCompExtAttributes', 'query')
    pdProductCompExtAttributes = mergeEntityMasterData('ExternalAttrMap', configProperties, connection, __productIds,
                                                       args, __pdReleaseEntities, subQuerysql2,
                                                       'productCompExtAttributes')
    if len(pdProductCompExtAttributes) > 0:
        pdProductCompExtAttributes.fillna('',
                                          inplace=True)  # included the logic to hanndle attributes which have null values
        pdProductCompExtAttributes['pricingAttributes'] = pdProductCompExtAttributes[['name', 'value']].to_dict(
            orient='records')
        pdProductCompExtAttributes = pdProductCompExtAttributes.groupby(['productComponentId', 'opID', 'buID']).apply(
            lambda x: {'pricingAttributes': list(x['pricingAttributes'])}).reset_index(name='pricingAttributes')
        # pdProductPriceData = pdProductPriceData.merge(pdProductCompExtAttributes,on =  ['productComponentId','opID','buID']  , how = 'left')
    # pdProductCompExtAttributes.to_csv('pdProductCompExtAttributes.csv')

    # pdProductPriceData['productOfferingPrice'] = pdProductPriceData[dictColumns].to_dict(orient = 'records')
    # pdProductPriceData['productOfferingPrice'] = pdProductPriceData['productOfferingPrice'].astype(str)
    apilogger.debug('pdProductPriceData-' + str(len(pdProductPriceData)))
    if len(pdProductPriceData) > 0:
        # pdProductPriceData.drop_duplicates(inplace=True)
        pdProductPriceData = pdProductPriceData[
            (pdProductPriceData['endDateTime'].isnull()) | (pdProductPriceData['endDateTime'] > datetime.now())]
        if (len(pdProductPriceData)) > 0:
            pdProductPriceData['createdDate'] = pdProductPriceData['createdDate'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
            ### Included the logic to format values for termPeriod
            ### begin
            # pdProductPriceData['endDateTime'] = pdProductPriceData['endDateTime'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
            # pdProductPriceData['startDateTime']= pdProductPriceData['startDateTime'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
            pdProductPriceData['duration'].fillna(0, inplace=True)
            pdProductPriceData['duration'] = pdProductPriceData['duration'].round(0).astype(int)
            pdProductPriceData['units'].fillna('', inplace=True)
            # pdProductPriceData['termPeriod'] = pdProductPriceData[['startDateTime','endDateTime','duration','units']].to_dict(orient ='records')
            pdProductPriceData['validFor'] = pdProductPriceData[['startDateTime', 'endDateTime']].apply(
                lambda x: formatDate(x['startDateTime'], x['endDateTime']), axis=1)
            pdProductPriceData['termPeriod'] = pdProductPriceData[
                ['startDateTime', 'endDateTime', 'duration', 'units']].apply(
                lambda x: mapTermPeriod(x['startDateTime'], x['endDateTime'], x['duration'], x['units']), axis=1)
            ### end
            apilogger.debug('pdProductPriceData-' + str(len(pdProductPriceData)))
            subQuerysql = configProperties.get('productPriceLang', 'query', raw=True)
            subQuerysql = subQuerysql.replace(':releases',
                                              ','.join('\'{0}\''.format(product) for product in __productIds))
            apilogger.debug('subQuerysql-' + subQuerysql)
            apilogger.debug('sqlArguments-' + str(sqlArguments))
            formatQueryFlag, subQuerysql = formatSQLQuery(subQuerysql, sqlArguments)
            if not formatQueryFlag:
                print("Error in format Sql query {} for Arguments {} ".format(subQuerysql, sqlArguments))
                raise Exception("Error in format Sql query {} for Arguments {} ".format(subQuerysql, sqlArguments))
            pdProductComponentDefn1 = pd.read_sql(subQuerysql, con=connection)
            pdProductComponentDefn1 = pdProductComponentDefn1[
                (pdProductComponentDefn1['endDate'].isnull()) | (pdProductComponentDefn1['endDate'] > datetime.now())]
            pdProductComponentDefn1.fillna('', inplace=True)
            pdProductComponentDefn1 = pdProductComponentDefn1.drop(columns=['endDate'], axis=1)
            subQuerysql1 = configProperties.get('productPriceBillText', 'query')
            subQuerysql1 = subQuerysql1.replace(':releases',
                                                ','.join('\'{0}\''.format(product) for product in __productIds))
            # pdProductComponentDefn2 = pd.read_sql(subQuerysql1,params = sqlArguments,con = connection)
            pdProductComponentDefn2 = mergeEntityMasterData(entity, configProperties, connection, __productIds, args,
                                                            __pdReleaseEntities, subQuerysql1, 'productPriceBillText')
            pdProductComponentDefn2.fillna('', inplace=True)
            # pdProductComponentDefn2.to_csv('compdefn1.csv')
            pdProductComponents = pdProductComponentDefn2.copy()
            pdProductComponents = pdProductComponents[
                ["productComponentId", "opID", "buID", "effectiveStartDate", "effectiveEndDate", "computationFormula",
                 "lifecycleStatus", "overridable"]]
            pdProductComponents['effectiveTimePeriod'] = pdProductComponents[
                ['effectiveStartDate', 'effectiveEndDate']].apply(
                lambda x: formatDate(x['effectiveStartDate'], x['effectiveEndDate']), axis=1)
            pdProductComponents['effectiveTimePeriod'] = pdProductComponents['effectiveTimePeriod'].astype(str)
            pdProductComponents.to_csv('pdProductComponents.' + args['releaseId'] + '.csv')
            pdProductPriceData = pdProductPriceData.merge(pdProductComponents,
                                                          on=['productComponentId', 'opID', 'buID'], how='left')
            pdProductPriceData['effectiveTimePeriod'].fillna('{}', inplace=True)
            pdProductPriceData['computationFormula'].fillna('', inplace=True)
            pdProductPriceData['lifecycleStatus'].fillna('', inplace=True)
            pdProductPriceData['overridable'].fillna('', inplace=True)
            if 'overridable' in pdProductPriceData.columns:
                pdProductPriceData.loc[pdProductPriceData['overridable'] != 'Y', 'overridable'] = False
                pdProductPriceData.loc[pdProductPriceData['overridable'] == 'Y', 'overridable'] = True
            else:
                pdProductPriceData['overridable'] = False
            if 'isTaxInclusive' in pdProductPriceData.columns:
                pdProductPriceData.loc[pdProductPriceData['isTaxInclusive'] != 'Y', 'isTaxInclusive'] = False
                pdProductPriceData.loc[pdProductPriceData['isTaxInclusive'] == 'Y', 'isTaxInclusive'] = True
            else:
                pdProductPriceData['isTaxInclusive'] = False
            if 'installmentEnabled' in pdProductPriceData.columns:
                pdProductPriceData.loc[pdProductPriceData['installmentEnabled'] != 'Y', 'installmentEnabled'] = False
                pdProductPriceData.loc[pdProductPriceData['installmentEnabled'] == 'Y', 'installmentEnabled'] = True
            else:
                pdProductPriceData['installmentEnabled'] = False

            pdProductPriceData['effectiveTimePeriod'] = pdProductPriceData['effectiveTimePeriod'].apply(validateData)
            pdProductComponentDefn2 = pdProductComponentDefn2[
                ["productComponentId", "opID", "buID", "version", "langCode", "langValue"]]
            pdInstallmentDets = pdProductPriceData[pdProductPriceData['hasInstallment'] == 'Y']
            pdInstallmentDets = pdInstallmentDets.groupby(keyColumns).agg({'hasInstallment': 'count'}).reset_index()
            pdInstallmentDets['hasInstallment'] = True
            pdInstallmentDets.to_csv('pdInstallmentDets.' + args['releaseId'] + '.csv')
            apilogger.info('pdProductComponentDefn2-' + subQuerysql1)
            pdProductComponentDefn1 = pdProductComponentDefn1.astype(str)
            pdProductComponentDefn2 = pdProductComponentDefn2.astype(str)
            pdProductComponentDefn = pd.concat([pdProductComponentDefn1, pdProductComponentDefn2], ignore_index=True)
            apilogger.info('pdProductComponentDefn-' + pdProductComponentDefn)
            pdProductPriceData.fillna('', inplace=True)  # included to map columns with null values as ""
            pdProductPriceData['productOfferingPrice'] = pdProductPriceData[dictColumns].to_dict(orient='records')
            pdProductPriceData['productOfferingPrice'] = pdProductPriceData['productOfferingPrice'].astype(str)
            pdProductPriceData['productOfferingPrice'] = pdProductPriceData['productOfferingPrice'].apply(validateData)
            pdDiscountDetails = fetchDiscountDetails(configProperties, connection, __productIds, args,
                                                     __pdReleaseEntities)
            if len(pdDiscountDetails) > 0:
                pdProductPriceData = pdProductPriceData.merge(pdDiscountDetails,
                                                              left_on=['productComponentId', 'opID', 'buID'],
                                                              right_on=['discountId', 'opID', 'buID'], how='left')
                pdProductPriceData['discountSpecification'].fillna('{}', inplace=True)
                pdProductPriceData['discountSpecification'] = pdProductPriceData['discountSpecification'].astype(str)
                pdProductPriceData['discountSpecification'] = pdProductPriceData['discountSpecification'].apply(
                    validateData)
            # begin included the logic to handle releases which have products with no discount dedetails
            if 'discountSpecification' not in pdProductPriceData.columns.values:
                pdProductPriceData['discountSpecification'] = '{}'
                pdProductPriceData['discountSpecification'] = pdProductPriceData['discountSpecification'].apply(
                    validateData)
            # begin included the logic to handle releases which have products with no discount dedetails
            pdProductPriceData.to_csv('pdDiscountDetails_pdProductPriceData.' + args['releaseId'] + '.csv')
            if len(pdProductComponentDefn) > 0:
                pdProductComponentDefn = pdProductComponentDefn.groupby(['productComponentId', 'opID', 'buID']).apply(
                    lambda x: {'offerPriceDescription': dict(zip(x['langCode'], (x['langValue'])))}).reset_index(
                    name='offerPriceDescription')
                # pdProductComponentDefn.to_csv('compdefn2.csv')
                # pdProductPriceData.to_csv('comp0.csv')
                pdProductPriceData = pdProductPriceData.merge(pdProductComponentDefn,
                                                              on=['productComponentId', 'opID', 'buID'], how='left')
                pdProductPriceData = pdProductPriceData.merge(pdProductCompExtAttributes,
                                                              on=['productComponentId', 'opID', 'buID'], how='left')
                pdProductPriceData['offerPriceDescription'].fillna('{\'offerPriceDescription\': {} }', inplace=True)
                pdProductPriceData['offerPriceDescription'] = pdProductPriceData['offerPriceDescription'].astype(str)
                if 'pricingAttributes' in list(pdProductPriceData.columns):
                    pdProductPriceData['pricingAttributes'].fillna('{\'pricingAttributes\': [] }', inplace=True)
                    pdProductPriceData['pricingAttributes'] = pdProductPriceData['pricingAttributes'].astype(str)
                    pdProductPriceData['pricingAttributes'] = pdProductPriceData['pricingAttributes'].apply(
                        validateData)
                else:
                    pdProductPriceData['pricingAttributes'] = '{\'pricingAttributes\': [] }'
                    pdProductPriceData['pricingAttributes'] = pdProductPriceData['pricingAttributes'].apply(
                        validateData)
                pdProductPriceData['offerPriceDescription'] = pdProductPriceData['offerPriceDescription'].apply(
                    validateData)
                pdProductPriceData['description'] = pdProductPriceData['offerPriceDescription'].apply(assignDescription)
                pdProductPriceData['description'] = pdProductPriceData[['description']].to_dict(orient='records')
                productOfferingPrice = []
                pdProductPriceData.to_csv('pdProductPriceData.' + args['releaseId'] + '.csv')
                pdProductPriceData[['productOfferingPrice', 'offerPriceDescription', 'pricingAttributes', 'description',
                                    'componentTypeId', 'discountSpecification']].to_csv(
                    'productPriceerror.' + args['releaseId'] + '.csv')
                for component, componentDefn, pricingAttribute, description, componentTypeId, discountDetails in zip(
                        pdProductPriceData['productOfferingPrice'], pdProductPriceData['offerPriceDescription'],
                        pdProductPriceData['pricingAttributes'], pdProductPriceData['description'],
                        pdProductPriceData['componentTypeId'], pdProductPriceData['discountSpecification']):
                    # apilogger.info('component-' + str(component) + '-componentDefn-' + str(componentDefn)+ '-pricingAttribute-'+str(pricingAttribute))
                    # apilogger.info('componentTypeId-' + componentTypeId)
                    if componentTypeId == 'DISCOUNT':
                        productOfferingPrice.append(
                            dict(component, **componentDefn, **pricingAttribute, **description, **discountDetails))
                    else:
                        productOfferingPrice.append(dict(component, **componentDefn, **pricingAttribute, **description))
                    # apilogger.info('productOfferingPrice-' + str(productOfferingPrice))
                    # apilogger.debug(productSpecCharacteristicValue)

                pdProductPriceData['productOfferingPrice'] = productOfferingPrice
            # pdProductPriceData['productOfferingPrice'] = pdProductPriceData['productOfferingPrice'].apply(ast.literal_eval)
            # begin discount logic
            pdProductDiscountData = fetchDiscount(connection, sql, apilogger, __productIds, args, configProperties,
                                                  keyColumns, entity, __pdReleaseEntities, pdProductPriceData)
            pdProductPriceDiscountData = pdProductPriceData[(pdProductPriceData['componentTypeId'] == 'DISCOUNT')]
            pdProductCharges = pdProductPriceData[~(pdProductPriceData['componentTypeId'] == 'DISCOUNT')]

            pdProductCharges = pd.concat([pdProductCharges, pdProductPriceDiscountData])
            pdProductCharges = pdProductCharges.drop(columns=['discountId'])
            pdProductCharges.to_csv('pdProductCharges_0.1.' + args['releaseId'] + '.csv')
            pdProductCharges = pdProductCharges.merge(pdProductDiscountData, how='left',
                                                      right_on=['discountId', 'opID', 'buID', 'discountComponentId'],
                                                      left_on=['productid', 'opID', 'buID', 'productComponentId'])
            pdProductCharges.to_csv('pdProductCharges_0.' + args['releaseId'] + '.csv')
            pdProductCharges = pdProductCharges[
                ['productid', 'opID', 'buID', 'currency', 'priceType', 'payNowFlag', 'price', 'ruleCode',
                 'hasInstallment', 'productComponentId', 'componentTypeId',
                 'discountAppliedOnComponentID', 'discountedPrice', 'discountAmount']]
            pdProductCharges.to_csv('pdProductCharges_0.' + args['releaseId'] + '.csv')
            pdPriceData = pdProductCharges.copy()
            pdProductCharges = pdProductCharges[~(pdProductCharges['componentTypeId'] == 'DISCOUNT')]
            pdDiscountData = pdPriceData[(pdPriceData['componentTypeId'] == 'DISCOUNT')]
            pdDiscountData = pdDiscountData[
                ['productid', 'discountAppliedOnComponentID', 'opID', 'buID', 'discountedPrice']]
            pdProductCharges = pdProductCharges[
                ['productid', 'opID', 'buID', 'currency', 'payNowFlag', 'price', 'ruleCode',
                 'hasInstallment', 'productComponentId', 'priceType']]
            pdDiscountData['discountAmount'] = pdDiscountData['discountAmount'].astype(float)
            pdDiscountData['discountAmount'] = pdDiscountData['discountedPrice'] * -1
            pdDiscountData = pdDiscountData.groupby(['productid', 'discountAppliedOnComponentID', 'opID', 'buID']).agg(
                {'discountedPrice': 'sum'}).reset_index()

            pdProductCharges.to_csv('pdProductCharges_0.4.' + args['releaseId'] + '..csv')
            pdDiscountData.to_csv('pdProductCharges_0.5.' + args['releaseId'] + '.csv')
            pdProductCharges = pdProductCharges.merge(pdDiscountData,
                                                      right_on=['productid', 'discountAppliedOnComponentID', 'opID',
                                                                'buID'],
                                                      left_on=['productid', 'productComponentId', 'opID', 'buID'],
                                                      how='left')

            pdProductCharges.loc[~(pdProductCharges['discountedPrice'].isnull()), 'price'] = pdProductCharges['price'] + \
                                                                                             pdProductCharges[
                                                                                                 'discountedPrice']
            pdProductCharges.to_csv('pdProductCharges_0.6.' + args['releaseId'] + '.csv')
            pdProductCharges['amountWithoutDiscount'].fillna(0, inplace=True)
            pdProductCharges['discountAmount'].fillna(0, inplace=True)
            print(' pdProductCharges datatype ', pdProductCharges.dtypes)
            pdProductCharges = pdProductCharges.groupby(keyColumns + ["currency", "priceType", "payNowFlag"]).agg(
                {'price': 'sum'}).rename(columns={'price': 'amount'}).reset_index()
            pdProductRecurringCharges = pdProductCharges[
                (pdProductCharges['priceType'] == 'RC') & (pdProductCharges['payNowFlag'] == 'N')]
            pdProductNonRecurringCharges = pdProductCharges[(pdProductCharges['priceType'] == 'NRC') | (
                        (pdProductCharges['priceType'] == 'RC') & (pdProductCharges['payNowFlag'] == 'Y'))]
            pdProductRecurringCharges = pdProductRecurringCharges.groupby(keyColumns + ["currency"]).agg(
                {'amount': 'sum'}).reset_index()
            pdProductNonRecurringCharges = pdProductNonRecurringCharges.groupby(keyColumns + ["currency"]).agg(
                {'amount': 'sum'}).reset_index()
            # pdProductRecurringCharges.to_csv('pdProductRecurringCharges.csv')

            # pdProductNonRecurringCharges.to_csv('pdProductNonRecurringCharges.csv')
            print(' pdProductCharges datatype ', pdProductCharges.dtypes)
            pdProductCharges = pdProductCharges.groupby(keyColumns + ["currency", "priceType", "payNowFlag"]).agg(
                {'price': 'sum', 'amountWithoutDiscount': 'sum', 'discountAmount': 'sum'}).rename(
                columns={'price': 'amount'}).reset_index()
            pdProductCharges.to_csv('pdProductCharges_0.7.' + args['releaseId'] + '.csv')
            pdProductRecurringCharges = pdProductCharges[
                (pdProductCharges['priceType'] == 'RC') & (pdProductCharges['payNowFlag'] == 'N')]
            pdProductNonRecurringCharges = pdProductCharges[(pdProductCharges['priceType'] == 'NRC') | (
                    (pdProductCharges['priceType'] == 'RC') & (pdProductCharges['payNowFlag'] == 'Y'))]
            pdProductRecurringCharges = pdProductRecurringCharges.groupby(keyColumns + ["currency"]).agg(
                {'amount': 'sum', 'amountWithoutDiscount': 'sum', 'discountAmount': 'sum'}).reset_index()
            pdProductRecurringCharges.to_csv('pdProductRecurringCharges_0.2.' + args['releaseId'] + '.csv')
            pdProductNonRecurringCharges = pdProductNonRecurringCharges.groupby(keyColumns + ["currency"]).agg(
                {'amount': 'sum', 'amountWithoutDiscount': 'sum', 'discountAmount': 'sum'}).reset_index()
            pdProductRecurringCharges['productRecurringCharges'] = pdProductRecurringCharges[
                ['currency', 'amount', 'amountWithoutDiscount','discountAmount']].to_dict(orient = 'records')
            pdProductNonRecurringCharges['productNonRecurringCharges'] = pdProductNonRecurringCharges[
                ['currency', 'amount', 'amountWithoutDiscount', 'discountAmount']].to_dict(orient='records')
            pdProductRecurringCharges = pdProductRecurringCharges.drop(
                columns=['currency', 'amount', 'amountWithoutDiscount', 'discountAmount'], axis=1)
            pdProductNonRecurringCharges = pdProductNonRecurringCharges.drop(
                columns=['currency', 'amount', 'amountWithoutDiscount', 'discountAmount'], axis=1)
            pdProductRecurringCharges.to_csv('pdProductRecurringCharges.' + args['releaseId'] + '.csv')
            pdProductNonRecurringCharges.to_csv('pdProductNonRecurringCharges.' + args['releaseId'] + '.csv')
            pdProductPriceData = pdProductPriceData.groupby(keyColumns)['productOfferingPrice'].apply(
                list).reset_index()
            pdProductPriceData = pdProductPriceData.merge(pdProductRecurringCharges, how='left', on=keyColumns)
            pdProductPriceData = pdProductPriceData.merge(pdProductNonRecurringCharges, how='left', on=keyColumns)
            pdProductPriceData = pdProductPriceData.merge(pdInstallmentDets, how='left', on=keyColumns)
            # print(pdProductPriceData['productSpecCharacteristicVal'].dtype)
            # print(pdProductPriceData)

    return pdProductPriceData


if __name__ == '__main__':
    fetchData(connection, sql, apilogger, __productIds)