import pandas as pd
import json
import ast, sys
from epc.dags.syncEPCStores import mergeEntityMasterData
from datetime import date, datetime
from utils.formatSQLQuery import formatSQLQuery
from airflow import AirflowException
# from .fetchDiscountPriceData import fetchDiscount as discount


def applyDiscount(apilogger, price, discountValue, discountUnits):
    # Note: despite the name, this returns the discount amount for the row, not the post-discount price.
    try:
        print('discountLogic')
        print(price)
        print(discountValue)
        print(discountUnits)
        discountedPrice = 0
        if not (pd.isna(discountValue)):
            if discountUnits == '%':
                discountedPrice = price * (float(discountValue) / 100)
            elif discountUnits == 'VAL':
                discountedPrice = float(discountValue)
            else:
                discountedPrice = 0
        else:
            discountedPrice = 0
        return discountedPrice
    except Exception as errInDiscCalc:
        apilogger.error(str(errInDiscCalc))
        apilogger.error("Error - {} . Line No - {} ".format(str(errInDiscCalc), str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException("Task failed due to error-." + str(errInDiscCalc))
        # return discountedPrice


def fetchDiscount(connection, sql, apilogger, __productIds, args, configProperties, keyColumns, entity,
                  __pdReleaseEntities, __pdProductPriceData):
    try:
        dicountQuery = configProperties.get('discountDetails', 'query', raw=True)
        __pdDiscountDetails = mergeEntityMasterData(entity, configProperties, connection, __productIds, args,
                                                    __pdReleaseEntities, dicountQuery, 'discountDetails')
        __pdProductPriceData = __pdProductPriceData.drop(columns=['discountId'])
        if len(__pdDiscountDetails) > 0:
            if 'units' in __pdDiscountDetails.columns.values:
                __pdDiscountDetails['discountUnits'] = __pdDiscountDetails['units']
                __pdDiscountDetails = __pdDiscountDetails.drop(columns=['units'])
            if 'amount' in __pdDiscountDetails.columns.values:
                __pdDiscountDetails['discountValue'] = __pdDiscountDetails['amount']
                __pdDiscountDetails = __pdDiscountDetails.drop(columns=['amount'])
            if 'Version' in __pdDiscountDetails.columns.values:
                __pdDiscountDetails = __pdDiscountDetails.drop(columns=['Version'])
            __pdDiscountDetails.to_csv('__pdProductDiscountDetails_1.' + args['releaseId'] + '.csv')
            itemDiscountQuery = configProperties.get('itemDiscount', 'query', raw=True)
            __pdItemDiscountDetails = mergeEntityMasterData(entity, configProperties, connection, __productIds, args,
                                                            __pdReleaseEntities, itemDiscountQuery, 'itemDiscount')
            if len(__pdItemDiscountDetails) > 0:
                if 'Version' in __pdItemDiscountDetails.columns.values:
                    __pdItemDiscountDetails = __pdItemDiscountDetails.drop(columns=['Version'])
                __pdItemDiscountDetails.to_csv('__pdProductItemDiscountDetails.' + args['releaseId'] + '.csv')
                __pdDiscountDetails = __pdDiscountDetails.merge(__pdItemDiscountDetails,
                                                                right_on=['discountedItemId', 'opID', 'buID'],
                                                                left_on=['discountId', 'opID', 'buID'],
                                                                how='inner')
            __pdDiscountDetails.to_csv('__pdProductDiscountDetails_2.' + args['releaseId'] + '.csv')
            __pdProductPriceData1 = __pdProductPriceData.merge(__pdDiscountDetails,
                                                               left_on=["productComponentId", "opID", "buID"],
                                                               right_on=["discountAppliedOnComponentID", "opID", "buID"],
                                                               how='left')
            __pdProductPriceData1.to_csv('__pdProductDiscountDetails_3.' + args['releaseId'] + '.csv')
            __pdProductPriceData1['discountedPrice'] = __pdProductPriceData1[
                ['price', 'discountValue', 'discountUnits']].apply(
                lambda x: applyDiscount(apilogger, x['price'], x['discountValue'], x['discountUnits']), axis=1)
            __pdProductPriceData1 = __pdProductPriceData1[
                ['discountId', 'opID', 'buID', 'discountedPrice', 'discountAppliedOn', 'discountAppliedOnComponentID']]
            __pdProductPriceData1 = __pdProductPriceData.merge(__pdProductPriceData1,
                                                               left_on=["productComponentId", "opID", "buID"],
                                                               right_on=["discountId", "opID", "buID"], how='left')
            __pdProductPriceData1.to_csv('__pdProductDiscountDetails_0.' + args['releaseId'] + '.csv')
            __pdProductPriceData1.loc[__pdProductPriceData1['discountedPrice'] > 0, 'discountApplied'] = 'Y'
            __pdProductPriceData1.loc[__pdProductPriceData1['discountedPrice'] == 0, 'discountApplied'] = 'N'
            __pdProductPriceData1.to_csv('__pdProductDiscountDetails_0.1.' + args['releaseId'] + '.csv')
            __pdProductPriceData1 = __pdProductPriceData1[__pdProductPriceData1['discountApplied'] == 'Y']
            __pdProductPriceData1['discountComponentId'] = __pdProductPriceData1['discountId']
            __pdProductPriceData1['discountId'] = __pdProductPriceData1['productid']
            __pdProductPriceData1['discountedPrice'] = __pdProductPriceData1['discountedPrice'] * -1
            __pdProductPriceData1 = __pdProductPriceData1[
                ['discountId', 'discountComponentId', 'opID', 'buID', 'discountedPrice', 'discountAppliedOn',
                 'discountAppliedOnComponentID', 'discountAmount']]
            __pdProductPriceData1.to_csv('__pdProductDiscountDetails.' + args['releaseId'] + '.csv')
            return __pdProductPriceData1
    except Exception as errInDiscountLogic:
        apilogger.error(str(errInDiscountLogic))
        apilogger.error("Error in fetchDiscount function - {} . Line No - {} ".format(
            str(errInDiscountLogic), str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException("Task failed due to error-." + str(errInDiscountLogic))
        # return __pdProductPriceData
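
# Illustrative only (not part of the original DAG): a minimal sketch of how applyDiscount
# resolves the two supported unit codes. The logger and the sample price/discount values
# below are hypothetical inputs, not data read from any pricing table.
def _applyDiscountExample(sampleLogger):
    percentageOff = applyDiscount(sampleLogger, 200.0, '10', '%')    # 10% of 200 -> 20.0
    flatOff = applyDiscount(sampleLogger, 200.0, '15', 'VAL')        # flat amount -> 15.0
    noDiscount = applyDiscount(sampleLogger, 200.0, None, '%')       # NaN/None amount -> 0
    return percentageOff, flatOff, noDiscount
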
def validateData(value):
    try:
        return ast.literal_eval(value)
    except Exception as e:
        # apilogger.debug(str(e))
        productPriceLogger.error("Error in formatting data - {} . Line No - {} ".format(
            str(e) + '-value-' + str(value), str(sys.exc_info()[-1].tb_lineno)))
        return value


def assignDescription(value):
    try:
        productPriceLogger.debug('label-' + str(value))
        productPriceLogger.debug(str('DEFAULT' in value))
        # logger.debug(str(value['DEFAULT']))
        productPriceLogger.debug(str(type(value)))
        if 'DEFAULT' in value['offerPriceDescription']:
            return value['offerPriceDescription']['DEFAULT']
        else:
            return ''
    except Exception as e:
        productPriceLogger.error("Error-" + str(e))
        return value


def formatDate(startDateTime, endDateTime):
    try:
        formattedDate = {}
        startDateTime = pd.to_datetime(startDateTime)
        endDateTime = pd.to_datetime(endDateTime)
        print('startDateTime-' + str(startDateTime) + 'endDateTime-' + str(endDateTime))
        if not pd.isna(startDateTime):
            formattedDate['startDateTime'] = startDateTime.strftime('%Y-%m-%dT%H:%M:%SZ')
        if not pd.isna(endDateTime):
            formattedDate['endDateTime'] = endDateTime.strftime('%Y-%m-%dT%H:%M:%SZ')
        return formattedDate
    except Exception as e:
        productPriceLogger.error('Error in formatting effectiveTimePeriod columns - ' + str(e))
        productPriceLogger.error("Error in formatting effectiveTimePeriod columns - {} . Line No - {} ".format(
            str(e) + '-startdate-' + str(startDateTime) + '-endDate-' + str(endDateTime),
            str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException("Task failed due to error-." + str(e))
        # return {}
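
# Illustrative only: formatDate normalises raw start/end timestamps into the 'validFor' /
# 'effectiveTimePeriod' structures used further below. The sample values are hypothetical.
def _formatDateExample():
    # Both bounds present -> {'startDateTime': '2023-01-01T00:00:00Z', 'endDateTime': '2023-12-31T23:59:59Z'}
    bounded = formatDate('2023-01-01 00:00:00', '2023-12-31 23:59:59')
    # Open-ended rows keep only the start bound -> {'startDateTime': '2023-01-01T00:00:00Z'}
    openEnded = formatDate('2023-01-01 00:00:00', None)
    return bounded, openEnded
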
def mapTermPeriod(startDateTime, endDateTime, duration, units):
    try:
        formattedValue = {}
        if not pd.isna(startDateTime):
            formattedValue['startDateTime'] = startDateTime.strftime('%Y-%m-%dT%H:%M:%SZ')
        if not pd.isna(endDateTime):
            formattedValue['endDateTime'] = endDateTime.strftime('%Y-%m-%dT%H:%M:%SZ')
        if not pd.isna(duration):
            formattedValue['duration'] = duration
        if not pd.isna(units):
            formattedValue['units'] = units
        return formattedValue
    except Exception as e:
        productPriceLogger.error('Error in formatting termPeriod columns - ' + str(e))
        productPriceLogger.error("Error in formatting termPeriod columns - {} . Line No - {} ".format(
            str(e) + '-startdate-' + str(startDateTime) + '-endDate-' + str(endDateTime),
            str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException("Task failed due to error-." + str(e))
        # return {}


def fetchDiscountDetails(configProperties, connection, __productIds, args, __pdReleaseEntities):
    pdDiscountDetails = pd.DataFrame()
    try:
        productPriceLogger.info('started the process to fetchDiscountDetails')
        discountSql = configProperties.get('discountDetails', 'query')
        productPriceLogger.info('discountSql-' + discountSql)
        pdDiscountDetails = mergeEntityMasterData('DiscountDefinition', configProperties, connection, __productIds,
                                                  args, __pdReleaseEntities, discountSql, 'discountDetails')
        if len(pdDiscountDetails) > 0:
            pdDiscountDetails['discountValue'] = pdDiscountDetails[['amount', 'units']].to_dict(orient='records')
            pdItemDiscountDetails = fetchItemDiscountDetails(configProperties, connection, __productIds, args,
                                                             __pdReleaseEntities)
            if len(pdItemDiscountDetails) > 0:
                # pdItemDiscountDetails.to_csv('pdItemDiscountDetails.csv')
                pdDiscountDetails = pdDiscountDetails.merge(pdItemDiscountDetails,
                                                            left_on=['discountId', 'opID', 'buID'],
                                                            right_on=['discountedItemId', 'opID', 'buID'],
                                                            how='left')
                pdDiscountDetails['discountedItem'].fillna('[]', inplace=True)
                pdDiscountDetails['discountedItem'] = pdDiscountDetails['discountedItem'].astype(str)
                pdDiscountDetails['discountedItem'] = pdDiscountDetails['discountedItem'].apply(validateData)
                pdDiscountDetails.to_csv('pdItemDiscountDetails.' + args['releaseId'] + '.csv')
            dictColumns = json.loads(configProperties.get('discountDetails', 'dictcolumns'))
            print('dictColumns-' + str(dictColumns))
            dictColumns = [column for column in dictColumns if column in pdDiscountDetails.columns]
            pdDiscountDetails['discountSpecification'] = pdDiscountDetails[dictColumns].to_dict(orient='records')
            pdDiscountDetails['discountSpecification'] = pdDiscountDetails['discountSpecification'].apply(
                lambda x: {'discountSpecification': x})
            pdDiscountDetails = pdDiscountDetails[['discountId', 'opID', 'buID', 'discountSpecification']]
            pdDiscountDetails.to_csv('pdDiscountDetails_func.' + args['releaseId'] + '.csv')
        return pdDiscountDetails
    except Exception as err:
        productPriceLogger.error("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
        return pdDiscountDetails
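
# Illustrative only: the discount detail helpers in this module all rely on the same pandas
# pattern - collapse a set of columns into one dict per row with to_dict(orient='records'),
# then group the child rows into a list per parent key. The frame below is a hypothetical
# stand-in for the itemDiscount result set, not real data.
def _nestChildRowsExample():
    sample = pd.DataFrame({
        'discountedItemId': ['D1', 'D1'],
        'opID': ['OP1', 'OP1'],
        'buID': ['BU1', 'BU1'],
        'itemId': ['P100', 'P200'],
    })
    sample['discountedItem'] = sample[['itemId']].to_dict(orient='records')
    nested = sample.groupby(['discountedItemId', 'opID', 'buID']).apply(
        lambda x: list(x['discountedItem'])).reset_index(name='discountedItem')
    # nested['discountedItem'] -> [[{'itemId': 'P100'}, {'itemId': 'P200'}]]
    return nested
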
def fetchItemDiscountDetails(configProperties, connection, __productIds, args, __pdReleaseEntities):
    pdItemDiscountDetails = pd.DataFrame()
    try:
        productPriceLogger.info('started the process to fetchItemDiscountDetails')
        itemDiscountSql = configProperties.get('itemDiscount', 'query')
        productPriceLogger.info('ItemDiscountSql-' + itemDiscountSql)
        pdItemDiscountDetails = mergeEntityMasterData('ItemDiscountMap', configProperties, connection, __productIds,
                                                      args, __pdReleaseEntities, itemDiscountSql, 'itemDiscount')
        dictColumns = json.loads(configProperties.get('itemDiscount', 'dictcolumns'))
        print('dictColumns-' + str(dictColumns))
        dictColumns = [column for column in dictColumns if column in pdItemDiscountDetails.columns]
        if len(pdItemDiscountDetails) > 0:
            pdItemDiscountDetails.fillna('', inplace=True)
            pdItemDiscountDetails['discountedItem'] = pdItemDiscountDetails[dictColumns].to_dict(orient='records')
            # pdItemDiscountDetails['discountItems'] = pdItemDiscountDetails['discountItems'].apply(lambda x: {'discountItems': x})
            pdItemDiscountDetails = pdItemDiscountDetails.groupby(['discountedItemId', 'opID', 'buID']).apply(
                lambda x: list(x['discountedItem'])).reset_index(name='discountedItem')
            pdItemDiscountDetails = pdItemDiscountDetails[['discountedItemId', 'opID', 'buID', 'discountedItem']]
            pdItemDiscountDetails.to_csv('pdItemDiscountDetails_func.' + args['releaseId'] + '.csv')
        return pdItemDiscountDetails
    except Exception as err:
        productPriceLogger.error("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException("Task failed due to error-." + str(err))
        # return pdItemDiscountDetails


def fetchData(connection, sql, apilogger, __productIds, args, configProperties, keyColumns, entity,
              __pdReleaseEntities):
    apilogger.debug(sql)
    global productPriceLogger
    productPriceLogger = apilogger
    sql = sql.replace(':releases', ','.join('\'{0}\''.format(product) for product in __productIds))
    sqlArguments = json.loads(configProperties.get('queries', 'sqlParams'))
    apilogger.debug('sqlArguments-' + str(sqlArguments))
    sqlArguments = {k: v for k, v in args.items() if k in sqlArguments}
    apilogger.debug('sqlArguments-' + str(sqlArguments))
    dictColumns = json.loads(configProperties.get('productPrice', 'dictColumns'))
    pdProductPriceData = mergeEntityMasterData(entity, configProperties, connection, __productIds, args,
                                               __pdReleaseEntities, sql, 'productPrice')
    # pdProductPriceData.fillna('', inplace=True)
    subQuerysql2 = configProperties.get('productCompExtAttributes', 'query')
    pdProductCompExtAttributes = mergeEntityMasterData('ExternalAttrMap', configProperties, connection, __productIds,
                                                       args, __pdReleaseEntities, subQuerysql2,
                                                       'productCompExtAttributes')
    if len(pdProductCompExtAttributes) > 0:
        pdProductCompExtAttributes.fillna('', inplace=True)  # included the logic to handle attributes which have null values
        pdProductCompExtAttributes['pricingAttributes'] = pdProductCompExtAttributes[['name', 'value']].to_dict(
            orient='records')
        pdProductCompExtAttributes = pdProductCompExtAttributes.groupby(['productComponentId', 'opID', 'buID']).apply(
            lambda x: {'pricingAttributes': list(x['pricingAttributes'])}).reset_index(name='pricingAttributes')
    # pdProductPriceData = pdProductPriceData.merge(pdProductCompExtAttributes, on=['productComponentId', 'opID', 'buID'], how='left')
    # pdProductCompExtAttributes.to_csv('pdProductCompExtAttributes.csv')
    # pdProductPriceData['productOfferingPrice'] = pdProductPriceData[dictColumns].to_dict(orient='records')
    # pdProductPriceData['productOfferingPrice'] = pdProductPriceData['productOfferingPrice'].astype(str)
    apilogger.debug('pdProductPriceData-' + str(len(pdProductPriceData)))
    if len(pdProductPriceData) > 0:
        # pdProductPriceData.drop_duplicates(inplace=True)
        pdProductPriceData = pdProductPriceData[
            (pdProductPriceData['endDateTime'].isnull()) | (pdProductPriceData['endDateTime'] > datetime.now())]
        if (len(pdProductPriceData)) > 0:
            pdProductPriceData['createdDate'] = pdProductPriceData['createdDate'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
            ### Included the logic to format values for termPeriod ### begin
            # pdProductPriceData['endDateTime'] = pdProductPriceData['endDateTime'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
            # pdProductPriceData['startDateTime'] = pdProductPriceData['startDateTime'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
            pdProductPriceData['duration'].fillna(0, inplace=True)
            pdProductPriceData['duration'] = pdProductPriceData['duration'].round(0).astype(int)
            pdProductPriceData['units'].fillna('', inplace=True)
            # pdProductPriceData['termPeriod'] = pdProductPriceData[['startDateTime', 'endDateTime', 'duration', 'units']].to_dict(orient='records')
            pdProductPriceData['validFor'] = pdProductPriceData[['startDateTime', 'endDateTime']].apply(
                lambda x: formatDate(x['startDateTime'], x['endDateTime']), axis=1)
            pdProductPriceData['termPeriod'] = pdProductPriceData[
                ['startDateTime', 'endDateTime', 'duration', 'units']].apply(
                lambda x: mapTermPeriod(x['startDateTime'], x['endDateTime'], x['duration'], x['units']), axis=1)
            ### end
            apilogger.debug('pdProductPriceData-' + str(len(pdProductPriceData)))
            subQuerysql = configProperties.get('productPriceLang', 'query', raw=True)
            subQuerysql = subQuerysql.replace(':releases',
                                              ','.join('\'{0}\''.format(product) for product in __productIds))
            apilogger.debug('subQuerysql-' + subQuerysql)
            apilogger.debug('sqlArguments-' + str(sqlArguments))
            formatQueryFlag, subQuerysql = formatSQLQuery(subQuerysql, sqlArguments)
            if not formatQueryFlag:
                print("Error in format Sql query {} for Arguments {} ".format(subQuerysql, sqlArguments))
                raise Exception("Error in format Sql query {} for Arguments {} ".format(subQuerysql, sqlArguments))
            pdProductComponentDefn1 = pd.read_sql(subQuerysql, con=connection)
            pdProductComponentDefn1 = pdProductComponentDefn1[
                (pdProductComponentDefn1['endDate'].isnull()) | (pdProductComponentDefn1['endDate'] > datetime.now())]
            pdProductComponentDefn1.fillna('', inplace=True)
            pdProductComponentDefn1 = pdProductComponentDefn1.drop(columns=['endDate'], axis=1)
            subQuerysql1 = configProperties.get('productPriceBillText', 'query')
            subQuerysql1 = subQuerysql1.replace(':releases',
                                                ','.join('\'{0}\''.format(product) for product in __productIds))
            # pdProductComponentDefn2 = pd.read_sql(subQuerysql1, params=sqlArguments, con=connection)
            pdProductComponentDefn2 = mergeEntityMasterData(entity, configProperties, connection, __productIds, args,
                                                            __pdReleaseEntities, subQuerysql1, 'productPriceBillText')
            pdProductComponentDefn2.fillna('', inplace=True)
            # pdProductComponentDefn2.to_csv('compdefn1.csv')
            pdProductComponents = pdProductComponentDefn2.copy()
            pdProductComponents = pdProductComponents[
                ["productComponentId", "opID", "buID", "effectiveStartDate", "effectiveEndDate", "computationFormula",
                 "lifecycleStatus", "overridable"]]
            pdProductComponents['effectiveTimePeriod'] = pdProductComponents[
                ['effectiveStartDate', 'effectiveEndDate']].apply(
                lambda x: formatDate(x['effectiveStartDate'], x['effectiveEndDate']), axis=1)
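            # The component-level attributes prepared above (effectiveTimePeriod, computationFormula,
            # lifecycleStatus, overridable) are merged back onto the price rows next; the Y/N flag
            # columns are normalised to booleans and rows flagged with hasInstallment are counted
            # per product key.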
            pdProductComponents['effectiveTimePeriod'] = pdProductComponents['effectiveTimePeriod'].astype(str)
            pdProductComponents.to_csv('pdProductComponents.' + args['releaseId'] + '.csv')
            pdProductPriceData = pdProductPriceData.merge(pdProductComponents,
                                                          on=['productComponentId', 'opID', 'buID'], how='left')
            pdProductPriceData['effectiveTimePeriod'].fillna('{}', inplace=True)
            pdProductPriceData['computationFormula'].fillna('', inplace=True)
            pdProductPriceData['lifecycleStatus'].fillna('', inplace=True)
            pdProductPriceData['overridable'].fillna('', inplace=True)
            if 'overridable' in pdProductPriceData.columns:
                pdProductPriceData.loc[pdProductPriceData['overridable'] != 'Y', 'overridable'] = False
                pdProductPriceData.loc[pdProductPriceData['overridable'] == 'Y', 'overridable'] = True
            else:
                pdProductPriceData['overridable'] = False
            if 'isTaxInclusive' in pdProductPriceData.columns:
                pdProductPriceData.loc[pdProductPriceData['isTaxInclusive'] != 'Y', 'isTaxInclusive'] = False
                pdProductPriceData.loc[pdProductPriceData['isTaxInclusive'] == 'Y', 'isTaxInclusive'] = True
            else:
                pdProductPriceData['isTaxInclusive'] = False
            if 'installmentEnabled' in pdProductPriceData.columns:
                pdProductPriceData.loc[pdProductPriceData['installmentEnabled'] != 'Y', 'installmentEnabled'] = False
                pdProductPriceData.loc[pdProductPriceData['installmentEnabled'] == 'Y', 'installmentEnabled'] = True
            else:
                pdProductPriceData['installmentEnabled'] = False
            pdProductPriceData['effectiveTimePeriod'] = pdProductPriceData['effectiveTimePeriod'].apply(validateData)
            pdProductComponentDefn2 = pdProductComponentDefn2[
                ["productComponentId", "opID", "buID", "version", "langCode", "langValue"]]
            pdInstallmentDets = pdProductPriceData[pdProductPriceData['hasInstallment'] == 'Y']
            pdInstallmentDets = pdInstallmentDets.groupby(keyColumns).agg({'hasInstallment': 'count'}).reset_index()
            pdInstallmentDets['hasInstallment'] = True
            pdInstallmentDets.to_csv('pdInstallmentDets.' + args['releaseId'] + '.csv')
            apilogger.info('pdProductComponentDefn2-' + subQuerysql1)
            pdProductComponentDefn1 = pdProductComponentDefn1.astype(str)
            pdProductComponentDefn2 = pdProductComponentDefn2.astype(str)
            pdProductComponentDefn = pd.concat([pdProductComponentDefn1, pdProductComponentDefn2], ignore_index=True)
            apilogger.info('pdProductComponentDefn-' + str(pdProductComponentDefn))
            pdProductPriceData.fillna('', inplace=True)  # included to map columns with null values as ""
            pdProductPriceData['productOfferingPrice'] = pdProductPriceData[dictColumns].to_dict(orient='records')
            pdProductPriceData['productOfferingPrice'] = pdProductPriceData['productOfferingPrice'].astype(str)
            pdProductPriceData['productOfferingPrice'] = pdProductPriceData['productOfferingPrice'].apply(validateData)
            pdDiscountDetails = fetchDiscountDetails(configProperties, connection, __productIds, args,
                                                     __pdReleaseEntities)
            if len(pdDiscountDetails) > 0:
                pdProductPriceData = pdProductPriceData.merge(pdDiscountDetails,
                                                              left_on=['productComponentId', 'opID', 'buID'],
                                                              right_on=['discountId', 'opID', 'buID'], how='left')
                pdProductPriceData['discountSpecification'].fillna('{}', inplace=True)
                pdProductPriceData['discountSpecification'] = pdProductPriceData['discountSpecification'].astype(str)
                pdProductPriceData['discountSpecification'] = pdProductPriceData['discountSpecification'].apply(
                    validateData)
            # begin included the logic to handle releases which have products with no discount details
            if 'discountSpecification' not in pdProductPriceData.columns.values:
                pdProductPriceData['discountSpecification'] = '{}'
                pdProductPriceData['discountSpecification'] = pdProductPriceData['discountSpecification'].apply(
                    validateData)
            # end included the logic to handle releases which have products with no discount details
            pdProductPriceData.to_csv('pdDiscountDetails_pdProductPriceData.' + args['releaseId'] + '.csv')
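            # Next, the language rows are collapsed into one offerPriceDescription dict per component
            # (langCode -> langValue), and the descriptions, pricing attributes and discount
            # specifications are stitched onto each price row before the payload is assembled.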
            if len(pdProductComponentDefn) > 0:
                pdProductComponentDefn = pdProductComponentDefn.groupby(['productComponentId', 'opID', 'buID']).apply(
                    lambda x: {'offerPriceDescription': dict(zip(x['langCode'], (x['langValue'])))}).reset_index(
                    name='offerPriceDescription')
                # pdProductComponentDefn.to_csv('compdefn2.csv')
                # pdProductPriceData.to_csv('comp0.csv')
                pdProductPriceData = pdProductPriceData.merge(pdProductComponentDefn,
                                                              on=['productComponentId', 'opID', 'buID'], how='left')
                pdProductPriceData = pdProductPriceData.merge(pdProductCompExtAttributes,
                                                              on=['productComponentId', 'opID', 'buID'], how='left')
                pdProductPriceData['offerPriceDescription'].fillna('{\'offerPriceDescription\': {} }', inplace=True)
                pdProductPriceData['offerPriceDescription'] = pdProductPriceData['offerPriceDescription'].astype(str)
                if 'pricingAttributes' in list(pdProductPriceData.columns):
                    pdProductPriceData['pricingAttributes'].fillna('{\'pricingAttributes\': [] }', inplace=True)
                    pdProductPriceData['pricingAttributes'] = pdProductPriceData['pricingAttributes'].astype(str)
                    pdProductPriceData['pricingAttributes'] = pdProductPriceData['pricingAttributes'].apply(
                        validateData)
                else:
                    pdProductPriceData['pricingAttributes'] = '{\'pricingAttributes\': [] }'
                    pdProductPriceData['pricingAttributes'] = pdProductPriceData['pricingAttributes'].apply(
                        validateData)
                pdProductPriceData['offerPriceDescription'] = pdProductPriceData['offerPriceDescription'].apply(
                    validateData)
                pdProductPriceData['description'] = pdProductPriceData['offerPriceDescription'].apply(assignDescription)
                pdProductPriceData['description'] = pdProductPriceData[['description']].to_dict(orient='records')
                productOfferingPrice = []
                pdProductPriceData.to_csv('pdProductPriceData.' + args['releaseId'] + '.csv')
                pdProductPriceData[['productOfferingPrice', 'offerPriceDescription', 'pricingAttributes', 'description',
                                    'componentTypeId', 'discountSpecification']].to_csv(
                    'productPriceerror.' + args['releaseId'] + '.csv')
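                # The loop below builds each productOfferingPrice entry by merging several per-row
                # dicts into one flat payload, e.g. (hypothetical values):
                #   dict({'price': 10}, **{'offerPriceDescription': {'EN': 'Monthly fee'}},
                #        **{'pricingAttributes': []}, **{'description': 'Monthly fee'})
                #   -> {'price': 10, 'offerPriceDescription': {'EN': 'Monthly fee'},
                #       'pricingAttributes': [], 'description': 'Monthly fee'}
                # DISCOUNT components additionally fold in their discountSpecification dict.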
                for component, componentDefn, pricingAttribute, description, componentTypeId, discountDetails in zip(
                        pdProductPriceData['productOfferingPrice'], pdProductPriceData['offerPriceDescription'],
                        pdProductPriceData['pricingAttributes'], pdProductPriceData['description'],
                        pdProductPriceData['componentTypeId'], pdProductPriceData['discountSpecification']):
                    # apilogger.info('component-' + str(component) + '-componentDefn-' + str(componentDefn) + '-pricingAttribute-' + str(pricingAttribute))
                    # apilogger.info('componentTypeId-' + componentTypeId)
                    if componentTypeId == 'DISCOUNT':
                        productOfferingPrice.append(
                            dict(component, **componentDefn, **pricingAttribute, **description, **discountDetails))
                    else:
                        productOfferingPrice.append(dict(component, **componentDefn, **pricingAttribute, **description))
                # apilogger.info('productOfferingPrice-' + str(productOfferingPrice))
                # apilogger.debug(productSpecCharacteristicValue)
                pdProductPriceData['productOfferingPrice'] = productOfferingPrice
                # pdProductPriceData['productOfferingPrice'] = pdProductPriceData['productOfferingPrice'].apply(ast.literal_eval)
                # begin discount logic
                pdProductDiscountData = fetchDiscount(connection, sql, apilogger, __productIds, args, configProperties,
                                                      keyColumns, entity, __pdReleaseEntities, pdProductPriceData)
                pdProductPriceDiscountData = pdProductPriceData[(pdProductPriceData['componentTypeId'] == 'DISCOUNT')]
                pdProductCharges = pdProductPriceData[~(pdProductPriceData['componentTypeId'] == 'DISCOUNT')]
                pdProductCharges = pd.concat([pdProductCharges, pdProductPriceDiscountData])
                pdProductCharges = pdProductCharges.drop(columns=['discountId'])
                pdProductCharges.to_csv('pdProductCharges_0.1.' + args['releaseId'] + '.csv')
                pdProductCharges = pdProductCharges.merge(pdProductDiscountData, how='left',
                                                          right_on=['discountId', 'opID', 'buID', 'discountComponentId'],
                                                          left_on=['productid', 'opID', 'buID', 'productComponentId'])
                pdProductCharges.to_csv('pdProductCharges_0.' + args['releaseId'] + '.csv')
                pdProductCharges = pdProductCharges[
                    ['productid', 'opID', 'buID', 'currency', 'priceType', 'payNowFlag', 'price', 'ruleCode',
                     'hasInstallment', 'productComponentId', 'componentTypeId', 'discountAppliedOnComponentID',
                     'discountedPrice', 'discountAmount']]
                pdProductCharges.to_csv('pdProductCharges_0.' + args['releaseId'] + '.csv')
                pdPriceData = pdProductCharges.copy()
                pdProductCharges = pdProductCharges[~(pdProductCharges['componentTypeId'] == 'DISCOUNT')]
                pdDiscountData = pdPriceData[(pdPriceData['componentTypeId'] == 'DISCOUNT')]
                pdDiscountData = pdDiscountData[
                    ['productid', 'discountAppliedOnComponentID', 'opID', 'buID', 'discountedPrice']]
                pdProductCharges = pdProductCharges[
                    ['productid', 'opID', 'buID', 'currency', 'payNowFlag', 'price', 'ruleCode', 'hasInstallment',
                     'productComponentId', 'priceType']]
                # 'discountAmount' is not carried into this slice, so cast 'discountedPrice' instead
                # before deriving discountAmount from it.
                pdDiscountData['discountedPrice'] = pdDiscountData['discountedPrice'].astype(float)
                pdDiscountData['discountAmount'] = pdDiscountData['discountedPrice'] * -1
                pdDiscountData = pdDiscountData.groupby(['productid', 'discountAppliedOnComponentID', 'opID', 'buID']).agg(
                    {'discountedPrice': 'sum'}).reset_index()
                pdProductCharges.to_csv('pdProductCharges_0.4.' + args['releaseId'] + '.csv')
                pdDiscountData.to_csv('pdProductCharges_0.5.' + args['releaseId'] + '.csv')
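                # Net the per-component discounts back into the charge rows: the merge below lines up
                # each discount with the charge component it applies to, the (negative) discountedPrice
                # is added to 'price', and the result is aggregated into recurring (RC, pay later) and
                # non-recurring (NRC, or RC paid now) totals per currency.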
                pdProductCharges = pdProductCharges.merge(pdDiscountData,
                                                          right_on=['productid', 'discountAppliedOnComponentID', 'opID', 'buID'],
                                                          left_on=['productid', 'productComponentId', 'opID', 'buID'],
                                                          how='left')
                # 'amountWithoutDiscount' and 'discountAmount' are aggregated below but are not
                # produced by the merge above; deriving them from 'price' and 'discountedPrice' here
                # is an assumption made so that the later aggregations have these columns.
                pdProductCharges['amountWithoutDiscount'] = pdProductCharges['price']
                pdProductCharges['discountAmount'] = pdProductCharges['discountedPrice']
                pdProductCharges.loc[~(pdProductCharges['discountedPrice'].isnull()), 'price'] = \
                    pdProductCharges['price'] + pdProductCharges['discountedPrice']
                pdProductCharges.to_csv('pdProductCharges_0.6.' + args['releaseId'] + '.csv')
                pdProductCharges['amountWithoutDiscount'].fillna(0, inplace=True)
                pdProductCharges['discountAmount'].fillna(0, inplace=True)
                print(' pdProductCharges datatype ', pdProductCharges.dtypes)
                # The first aggregation pass below is superseded by the one that follows (which also
                # carries amountWithoutDiscount and discountAmount); it is left commented out because
                # re-grouping the already aggregated frame would fail.
                # pdProductCharges = pdProductCharges.groupby(keyColumns + ["currency", "priceType", "payNowFlag"]).agg(
                #     {'price': 'sum'}).rename(columns={'price': 'amount'}).reset_index()
                # pdProductRecurringCharges = pdProductCharges[
                #     (pdProductCharges['priceType'] == 'RC') & (pdProductCharges['payNowFlag'] == 'N')]
                # pdProductNonRecurringCharges = pdProductCharges[(pdProductCharges['priceType'] == 'NRC') | (
                #         (pdProductCharges['priceType'] == 'RC') & (pdProductCharges['payNowFlag'] == 'Y'))]
                # pdProductRecurringCharges = pdProductRecurringCharges.groupby(keyColumns + ["currency"]).agg(
                #     {'amount': 'sum'}).reset_index()
                # pdProductNonRecurringCharges = pdProductNonRecurringCharges.groupby(keyColumns + ["currency"]).agg(
                #     {'amount': 'sum'}).reset_index()
                # pdProductRecurringCharges.to_csv('pdProductRecurringCharges.csv')
                # pdProductNonRecurringCharges.to_csv('pdProductNonRecurringCharges.csv')
                print(' pdProductCharges datatype ', pdProductCharges.dtypes)
                pdProductCharges = pdProductCharges.groupby(keyColumns + ["currency", "priceType", "payNowFlag"]).agg(
                    {'price': 'sum', 'amountWithoutDiscount': 'sum', 'discountAmount': 'sum'}).rename(
                    columns={'price': 'amount'}).reset_index()
                pdProductCharges.to_csv('pdProductCharges_0.7.' + args['releaseId'] + '.csv')
                pdProductRecurringCharges = pdProductCharges[
                    (pdProductCharges['priceType'] == 'RC') & (pdProductCharges['payNowFlag'] == 'N')]
                pdProductNonRecurringCharges = pdProductCharges[(pdProductCharges['priceType'] == 'NRC') | (
                        (pdProductCharges['priceType'] == 'RC') & (pdProductCharges['payNowFlag'] == 'Y'))]
                pdProductRecurringCharges = pdProductRecurringCharges.groupby(keyColumns + ["currency"]).agg(
                    {'amount': 'sum', 'amountWithoutDiscount': 'sum', 'discountAmount': 'sum'}).reset_index()
                pdProductRecurringCharges.to_csv('pdProductRecurringCharges_0.2.' + args['releaseId'] + '.csv')
                pdProductNonRecurringCharges = pdProductNonRecurringCharges.groupby(keyColumns + ["currency"]).agg(
                    {'amount': 'sum', 'amountWithoutDiscount': 'sum', 'discountAmount': 'sum'}).reset_index()
                pdProductRecurringCharges['productRecurringCharges'] = pdProductRecurringCharges[
                    ['currency', 'amount', 'amountWithoutDiscount', 'discountAmount']].to_dict(orient='records')
                pdProductNonRecurringCharges['productNonRecurringCharges'] = pdProductNonRecurringCharges[
                    ['currency', 'amount', 'amountWithoutDiscount', 'discountAmount']].to_dict(orient='records')
                pdProductRecurringCharges = pdProductRecurringCharges.drop(
                    columns=['currency', 'amount', 'amountWithoutDiscount', 'discountAmount'], axis=1)
                pdProductNonRecurringCharges = pdProductNonRecurringCharges.drop(
                    columns=['currency', 'amount', 'amountWithoutDiscount', 'discountAmount'], axis=1)
                pdProductRecurringCharges.to_csv('pdProductRecurringCharges.' + args['releaseId'] + '.csv')
                pdProductNonRecurringCharges.to_csv('pdProductNonRecurringCharges.' + args['releaseId'] + '.csv')
                pdProductPriceData = pdProductPriceData.groupby(keyColumns)['productOfferingPrice'].apply(
                    list).reset_index()
                pdProductPriceData = pdProductPriceData.merge(pdProductRecurringCharges, how='left', on=keyColumns)
                pdProductPriceData = pdProductPriceData.merge(pdProductNonRecurringCharges, how='left', on=keyColumns)
                pdProductPriceData = pdProductPriceData.merge(pdInstallmentDets, how='left', on=keyColumns)
                # print(pdProductPriceData['productSpecCharacteristicVal'].dtype)
                # print(pdProductPriceData)
                return pdProductPriceData


if __name__ == '__main__':
    # NOTE: this stub is a placeholder; the objects below are supplied by the calling DAG at runtime,
    # and fetchData also requires args, configProperties, keyColumns, entity and __pdReleaseEntities.
    fetchData(connection, sql, apilogger, __productIds)
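
# Illustrative only: this module is normally imported by an EPC sync DAG rather than executed
# directly. A hypothetical call site would look roughly like the sketch below; the connection,
# SQL text, logger, config parser, release ids, key columns and entity name are all supplied by
# the surrounding DAG and are assumptions here, not definitions from this module.
#
#     pdPrices = fetchData(connection, sql, apilogger, __productIds, args, configProperties,
#                          ['productid', 'opID', 'buID'], 'ProductPrice', __pdReleaseEntities)
#     # pdPrices then carries one row per product key with productOfferingPrice,
#     # productRecurringCharges, productNonRecurringCharges and hasInstallment columns.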