import pandas as pd
import json
import ast, sys
from epc.dags.syncEPCStores import mergeEntityMasterData
from datetime import date, datetime
from utils.formatSQLQuery import formatSQLQuery
from airflow import AirflowException
# from .fetchDiscountPriceData import fetchDiscount as discount
def applyDiscount(apilogger, price, discountValue, discountUnits):
try:
print('discountLogic')
print(price)
print(discountValue)
print(discountUnits)
discountedPrice = 0
if not (pd.isna(discountValue)):
if discountUnits == '%':
discountedPrice = price * (float(discountValue) / 100)
elif discountUnits == 'VAL':
discountedPrice = float(discountValue)
else:
discountedPrice = 0
else:
discountedPrice = 0
return discountedPrice
    except Exception as errInDiscCalc:
        apilogger.error(str(errInDiscCalc))
        apilogger.error("Error - {} . Line No - {} ".format(str(errInDiscCalc), str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException("Task failed due to error - " + str(errInDiscCalc))
# return discountedPrice
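# Illustrative usage (a sketch; assumes a logger object such as apilogger is in scope). Note that
# applyDiscount returns the discount amount derived from the price, not the reduced price itself:
#   applyDiscount(apilogger, 100.0, 10, '%')    -> 10.0  (10% of the price)
#   applyDiscount(apilogger, 100.0, 15, 'VAL')  -> 15.0  (flat discount value)
#   applyDiscount(apilogger, 100.0, None, '%')  -> 0     (no discount configured)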
def fetchDiscount(connection, sql, apilogger, __productIds, args, configProperties, keyColumns, entity,
__pdReleaseEntities, __pdProductPriceData):
try:
        discountQuery = configProperties.get('discountDetails', 'query', raw=True)
        __pdDiscountDetails = mergeEntityMasterData(entity, configProperties, connection, __productIds, args,
                                                    __pdReleaseEntities, discountQuery, 'discountDetails')
__pdProductPriceData = __pdProductPriceData.drop(columns=['discountId'])
if len(__pdDiscountDetails) > 0:
if 'units' in __pdDiscountDetails.columns.values:
__pdDiscountDetails['discountUnits'] = __pdDiscountDetails['units']
__pdDiscountDetails = __pdDiscountDetails.drop(columns=['units'])
if 'amount' in __pdDiscountDetails.columns.values:
__pdDiscountDetails['discountValue'] = __pdDiscountDetails['amount']
__pdDiscountDetails = __pdDiscountDetails.drop(columns=['amount'])
if 'Version' in __pdDiscountDetails.columns.values:
__pdDiscountDetails = __pdDiscountDetails.drop(columns=['Version'])
__pdDiscountDetails.to_csv('__pdProductDiscountDetails_1.' + args['releaseId'] + '.csv')
itemDiscountQuery = configProperties.get('itemDiscount', 'query', raw=True)
__pdItemDiscountDetails = mergeEntityMasterData(entity, configProperties, connection, __productIds, args,
__pdReleaseEntities, itemDiscountQuery, 'itemDiscount')
if len(__pdItemDiscountDetails) > 0:
if 'Version' in __pdItemDiscountDetails.columns.values:
__pdItemDiscountDetails = __pdItemDiscountDetails.drop(columns=['Version'])
__pdItemDiscountDetails.to_csv('__pdProductItemDiscountDetails.' + args['releaseId'] + '.csv')
__pdDiscountDetails = __pdDiscountDetails.merge(__pdItemDiscountDetails,
right_on=['discountedItemId', 'opID', 'buID'],
left_on=['discountId', 'opID', 'buID'], how='inner')
__pdDiscountDetails.to_csv('__pdProductDiscountDetails_2.' + args['releaseId'] + '.csv')
__pdProductPriceData1 = __pdProductPriceData.merge(__pdDiscountDetails,
left_on=["productComponentId", "opID", "buID"],
right_on=["discountAppliedOnComponentID", "opID",
"buID"], how='left')
__pdProductPriceData1.to_csv('__pdProductDiscountDetails_3.' + args['releaseId'] + '.csv')
__pdProductPriceData1['discountedPrice'] = __pdProductPriceData1[
['price', 'discountValue', 'discountUnits']].apply(
lambda x: applyDiscount(apilogger, x['price'], x['discountValue'], x['discountUnits']), axis=1)
__pdProductPriceData1 = __pdProductPriceData1[
['discountId', 'opID', 'buID', 'discountedPrice', 'discountAppliedOn', 'discountAppliedOnComponentID']]
__pdProductPriceData1 = __pdProductPriceData.merge(__pdProductPriceData1,
left_on=["productComponentId", "opID", "buID"],
right_on=["discountId", "opID", "buID"], how='left')
__pdProductPriceData1.to_csv('__pdProductDiscountDetails_0.' + args['releaseId'] + '.csv')
__pdProductPriceData1.loc[__pdProductPriceData1['discountedPrice'] > 0, 'discountApplied'] = 'Y'
__pdProductPriceData1.loc[__pdProductPriceData1['discountedPrice'] == 0, 'discountApplied'] = 'N'
__pdProductPriceData1.to_csv('__pdProductDiscountDetails_0.1.' + args['releaseId'] + '.csv')
__pdProductPriceData1 = __pdProductPriceData1[__pdProductPriceData1['discountApplied'] == 'Y']
__pdProductPriceData1['discountComponentId'] = __pdProductPriceData1['discountId']
__pdProductPriceData1['discountId'] = __pdProductPriceData1['productid']
__pdProductPriceData1['discountedPrice'] = __pdProductPriceData1['discountedPrice'] * -1
__pdProductPriceData1 = __pdProductPriceData1[
['discountId', 'discountComponentId', 'opID', 'buID', 'discountedPrice', 'discountAppliedOn',
'discountAppliedOnComponentID', 'discountAmount']]
__pdProductPriceData1.to_csv('__pdProductDiscountDetails.' + args['releaseId'] + '.csv')
return __pdProductPriceData1
    except Exception as errInDiscountLogic:
        apilogger.error(str(errInDiscountLogic))
        apilogger.error("Error in fetchDiscount function - {} . Line No - {} ".format(
            str(errInDiscountLogic), str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException("Task failed due to error - " + str(errInDiscountLogic))
# return __pdProductPriceData
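# Worked example (illustrative): a price row with price = 100.0 joined to a 10% discount yields
# discountedPrice = -10.0 in the frame returned above (the sign is flipped so the discount can later
# be added directly to the charge), keyed by discountId (the product id), discountComponentId,
# opID and buID.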
def validateData(value):
try:
return ast.literal_eval(value)
except Exception as e:
# apilogger.debug(str(e))
productPriceLogger.error("Error in formatting data - {} . Line No - {} ".format(str(e) + '-value-' + str(value),
str(sys.exc_info()[
-1].tb_lineno)))
return value
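# Illustrative usage: validateData converts stringified Python literals back into objects and falls
# back to the raw value when parsing fails.
#   validateData("{'offerPriceDescription': {'DEFAULT': 'Monthly fee'}}")  -> nested dict
#   validateData("[]")                                                     -> []
#   validateData("not-a-literal")                                          -> "not-a-literal" (error logged)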
def assignDescription(value):
try:
productPriceLogger.debug('label-' + str(value))
productPriceLogger.debug(str('DEFAULT' in value))
# logger.debug(str(value['DEFAULT']))
productPriceLogger.debug(str(type(value)))
if 'DEFAULT' in value['offerPriceDescription']:
return value['offerPriceDescription']['DEFAULT']
else:
return ''
except Exception as e:
productPriceLogger.error("Error-" + str(e))
return value
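# Illustrative usage, assuming the parsed offerPriceDescription dict produced by validateData above:
#   assignDescription({'offerPriceDescription': {'DEFAULT': 'Monthly fee', 'EN': 'Monthly fee'}}) -> 'Monthly fee'
#   assignDescription({'offerPriceDescription': {'FR': 'Abonnement mensuel'}})                    -> ''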
def formatDate(startDateTime, endDateTime):
try:
formattedDate = {}
startDateTime = pd.to_datetime(startDateTime)
endDateTime = pd.to_datetime(endDateTime)
print('startDateTime-' + str(startDateTime) + 'endDateTime-' + str(endDateTime))
if not pd.isna(startDateTime):
formattedDate['startDateTime'] = startDateTime.strftime('%Y-%m-%dT%H:%M:%SZ')
if not pd.isna(endDateTime):
formattedDate['endDateTime'] = endDateTime.strftime('%Y-%m-%dT%H:%M:%SZ')
return formattedDate
except Exception as e:
productPriceLogger.error('Error in formatting effectiveTimePeriod columns - ' + str(e))
productPriceLogger.error("Error in fomartting effectiveTimePeriod columns - {} . Line No - {} ".format(
str(e) + '-startdate-' + str(startDateTime) + '-endDate-' + str(endDateTime),
str(sys.exc_info()[-1].tb_lineno)))
raise AirflowException("Task failed due to error-." + str(e))
# return {}
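# Illustrative usage: open-ended periods simply omit the missing boundary.
#   formatDate('2023-01-01', '2023-12-31 23:59:59')
#       -> {'startDateTime': '2023-01-01T00:00:00Z', 'endDateTime': '2023-12-31T23:59:59Z'}
#   formatDate('2023-01-01', None)
#       -> {'startDateTime': '2023-01-01T00:00:00Z'}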
def mapTermPeriod(startDateTime, endDateTime, duration, units):
try:
formattedValue = {}
if not pd.isna(startDateTime):
formattedValue['startDateTime'] = startDateTime.strftime('%Y-%m-%dT%H:%M:%SZ')
if not pd.isna(endDateTime):
formattedValue['endDateTime'] = endDateTime.strftime('%Y-%m-%dT%H:%M:%SZ')
if not pd.isna(duration):
formattedValue['duration'] = duration
if not pd.isna(units):
formattedValue['units'] = units
return formattedValue
except Exception as e:
        productPriceLogger.error('Error in formatting termPeriod columns - ' + str(e))
        productPriceLogger.error("Error in formatting termPeriod columns - {} . Line No - {} ".format(
            str(e) + '-startdate-' + str(startDateTime) + '-endDate-' + str(endDateTime),
            str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException("Task failed due to error - " + str(e))
# return {}
def fetchDiscountDetails(configProperties, connection, __productIds, args, __pdReleaseEntities):
pdDiscountDetails = pd.DataFrame()
try:
        productPriceLogger.info('started the process to fetchDiscountDetails')
discountSql = configProperties.get('discountDetails', 'query')
productPriceLogger.info('discountSql-' + discountSql)
pdDiscountDetails = mergeEntityMasterData('DiscountDefinition', configProperties, connection, __productIds,
args, __pdReleaseEntities, discountSql, 'discountDetails')
if len(pdDiscountDetails) > 0:
pdDiscountDetails['discountValue'] = pdDiscountDetails[['amount', 'units']].to_dict(orient='records')
pdItemDiscountDetails = fetchItemDiscountDetails(configProperties, connection, __productIds, args,
__pdReleaseEntities)
if len(pdItemDiscountDetails) > 0:
# pdItemDiscountDetails.to_csv('pdItemDiscountDetails.csv')
pdDiscountDetails = pdDiscountDetails.merge(pdItemDiscountDetails,
left_on=['discountId', 'opID', 'buID'],
right_on=['discountedItemId', 'opID', 'buID'], how='left')
pdDiscountDetails['discountedItem'].fillna('[]', inplace=True)
pdDiscountDetails['discountedItem'] = pdDiscountDetails['discountedItem'].astype(str)
pdDiscountDetails['discountedItem'] = pdDiscountDetails['discountedItem'].apply(validateData)
pdDiscountDetails.to_csv('pdItemDiscountDetails.' + args['releaseId'] + '.csv')
dictColumns = json.loads(configProperties.get('discountDetails', 'dictcolumns'))
print('dictColumns-' + str(dictColumns))
dictColumns = [column for column in dictColumns if column in pdDiscountDetails.columns]
pdDiscountDetails['discountSpecification'] = pdDiscountDetails[dictColumns].to_dict(orient='records')
pdDiscountDetails['discountSpecification'] = pdDiscountDetails['discountSpecification'].apply(
lambda x: {'discountSpecification': x})
pdDiscountDetails = pdDiscountDetails[['discountId', 'opID', 'buID', 'discountSpecification']]
pdDiscountDetails.to_csv('pdDiscountDetails_func.' + args['releaseId'] + '.csv')
return pdDiscountDetails
except Exception as err:
productPriceLogger.error("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
return pdDiscountDetails
def fetchItemDiscountDetails(configProperties, connection, __productIds, args, __pdReleaseEntities):
pdItemDiscountDetails = pd.DataFrame()
try:
        productPriceLogger.info('started the process to fetchItemDiscountDetails')
itemDiscountSql = configProperties.get('itemDiscount', 'query')
productPriceLogger.info('ItemDiscountSql-' + itemDiscountSql)
pdItemDiscountDetails = mergeEntityMasterData('ItemDiscountMap', configProperties, connection, __productIds,
args, __pdReleaseEntities, itemDiscountSql, 'itemDiscount')
dictColumns = json.loads(configProperties.get('itemDiscount', 'dictcolumns'))
print('dictColumns-' + str(dictColumns))
dictColumns = [column for column in dictColumns if column in pdItemDiscountDetails.columns]
if len(pdItemDiscountDetails) > 0:
pdItemDiscountDetails.fillna('', inplace=True)
pdItemDiscountDetails['discountedItem'] = pdItemDiscountDetails[dictColumns].to_dict(orient='records')
# pdItemDiscountDetails['discountItems'] = pdItemDiscountDetails['discountItems'].apply(lambda x : {'discountItems' : x })
pdItemDiscountDetails = pdItemDiscountDetails.groupby(['discountedItemId', 'opID', 'buID']).apply(
lambda x: list(x['discountedItem'])).reset_index(name='discountedItem')
pdItemDiscountDetails = pdItemDiscountDetails[['discountedItemId', 'opID', 'buID', 'discountedItem']]
pdItemDiscountDetails.to_csv('pdItemDiscountDetails_func.' + args['releaseId'] + '.csv')
return pdItemDiscountDetails
    except Exception as err:
        productPriceLogger.error("Error - {} . Line No - {} ".format(str(err), str(sys.exc_info()[-1].tb_lineno)))
        raise AirflowException("Task failed due to error - " + str(err))
# return pdItemDiscountDetails
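# Illustrative result shape: one row per (discountedItemId, opID, buID) with the mapped items
# collected into a list, e.g.
#   discountedItemId  opID  buID  discountedItem
#   D100              OP1   BU1   [{...}, {...}]
# The dict keys come from the 'dictcolumns' entry of the itemDiscount config section; the values
# shown here are hypothetical.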
def fetchData(connection, sql, apilogger, __productIds, args, configProperties, keyColumns, entity,
__pdReleaseEntities):
apilogger.debug(sql)
global productPriceLogger
productPriceLogger = apilogger
sql = sql.replace(':releases', ','.join('\'{0}\''.format(product) for product in __productIds))
sqlArguments = json.loads(configProperties.get('queries', 'sqlParams'))
apilogger.debug('sqlArguments-' + str(sqlArguments))
sqlArguments = {k: v for k, v in args.items() if k in sqlArguments}
apilogger.debug('sqlArguments-' + str(sqlArguments))
dictColumns = json.loads(configProperties.get('productPrice', 'dictColumns'))
pdProductPriceData = mergeEntityMasterData(entity, configProperties, connection, __productIds, args,
__pdReleaseEntities, sql, 'productPrice')
# pdProductPriceData.fillna('',inplace=True)
subQuerysql2 = configProperties.get('productCompExtAttributes', 'query')
pdProductCompExtAttributes = mergeEntityMasterData('ExternalAttrMap', configProperties, connection, __productIds,
args, __pdReleaseEntities, subQuerysql2,
'productCompExtAttributes')
if len(pdProductCompExtAttributes) > 0:
        pdProductCompExtAttributes.fillna('', inplace=True)  # handle attributes which have null values
pdProductCompExtAttributes['pricingAttributes'] = pdProductCompExtAttributes[['name', 'value']].to_dict(
orient='records')
pdProductCompExtAttributes = pdProductCompExtAttributes.groupby(['productComponentId', 'opID', 'buID']).apply(
lambda x: {'pricingAttributes': list(x['pricingAttributes'])}).reset_index(name='pricingAttributes')
# pdProductPriceData = pdProductPriceData.merge(pdProductCompExtAttributes,on = ['productComponentId','opID','buID'] , how = 'left')
# pdProductCompExtAttributes.to_csv('pdProductCompExtAttributes.csv')
# pdProductPriceData['productOfferingPrice'] = pdProductPriceData[dictColumns].to_dict(orient = 'records')
# pdProductPriceData['productOfferingPrice'] = pdProductPriceData['productOfferingPrice'].astype(str)
apilogger.debug('pdProductPriceData-' + str(len(pdProductPriceData)))
if len(pdProductPriceData) > 0:
# pdProductPriceData.drop_duplicates(inplace=True)
pdProductPriceData = pdProductPriceData[
(pdProductPriceData['endDateTime'].isnull()) | (pdProductPriceData['endDateTime'] > datetime.now())]
if (len(pdProductPriceData)) > 0:
pdProductPriceData['createdDate'] = pdProductPriceData['createdDate'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
### Included the logic to format values for termPeriod
### begin
# pdProductPriceData['endDateTime'] = pdProductPriceData['endDateTime'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
# pdProductPriceData['startDateTime']= pdProductPriceData['startDateTime'].dt.strftime('%Y-%m-%dT%H:%M:%SZ')
pdProductPriceData['duration'].fillna(0, inplace=True)
pdProductPriceData['duration'] = pdProductPriceData['duration'].round(0).astype(int)
pdProductPriceData['units'].fillna('', inplace=True)
# pdProductPriceData['termPeriod'] = pdProductPriceData[['startDateTime','endDateTime','duration','units']].to_dict(orient ='records')
pdProductPriceData['validFor'] = pdProductPriceData[['startDateTime', 'endDateTime']].apply(
lambda x: formatDate(x['startDateTime'], x['endDateTime']), axis=1)
pdProductPriceData['termPeriod'] = pdProductPriceData[
['startDateTime', 'endDateTime', 'duration', 'units']].apply(
lambda x: mapTermPeriod(x['startDateTime'], x['endDateTime'], x['duration'], x['units']), axis=1)
### end
apilogger.debug('pdProductPriceData-' + str(len(pdProductPriceData)))
subQuerysql = configProperties.get('productPriceLang', 'query', raw=True)
subQuerysql = subQuerysql.replace(':releases',
','.join('\'{0}\''.format(product) for product in __productIds))
apilogger.debug('subQuerysql-' + subQuerysql)
apilogger.debug('sqlArguments-' + str(sqlArguments))
formatQueryFlag, subQuerysql = formatSQLQuery(subQuerysql, sqlArguments)
if not formatQueryFlag:
print("Error in format Sql query {} for Arguments {} ".format(subQuerysql, sqlArguments))
raise Exception("Error in format Sql query {} for Arguments {} ".format(subQuerysql, sqlArguments))
pdProductComponentDefn1 = pd.read_sql(subQuerysql, con=connection)
pdProductComponentDefn1 = pdProductComponentDefn1[
(pdProductComponentDefn1['endDate'].isnull()) | (pdProductComponentDefn1['endDate'] > datetime.now())]
pdProductComponentDefn1.fillna('', inplace=True)
pdProductComponentDefn1 = pdProductComponentDefn1.drop(columns=['endDate'], axis=1)
subQuerysql1 = configProperties.get('productPriceBillText', 'query')
subQuerysql1 = subQuerysql1.replace(':releases',
','.join('\'{0}\''.format(product) for product in __productIds))
# pdProductComponentDefn2 = pd.read_sql(subQuerysql1,params = sqlArguments,con = connection)
pdProductComponentDefn2 = mergeEntityMasterData(entity, configProperties, connection, __productIds, args,
__pdReleaseEntities, subQuerysql1, 'productPriceBillText')
pdProductComponentDefn2.fillna('', inplace=True)
# pdProductComponentDefn2.to_csv('compdefn1.csv')
pdProductComponents = pdProductComponentDefn2.copy()
pdProductComponents = pdProductComponents[
["productComponentId", "opID", "buID", "effectiveStartDate", "effectiveEndDate", "computationFormula",
"lifecycleStatus", "overridable"]]
pdProductComponents['effectiveTimePeriod'] = pdProductComponents[
['effectiveStartDate', 'effectiveEndDate']].apply(
lambda x: formatDate(x['effectiveStartDate'], x['effectiveEndDate']), axis=1)
pdProductComponents['effectiveTimePeriod'] = pdProductComponents['effectiveTimePeriod'].astype(str)
pdProductComponents.to_csv('pdProductComponents.' + args['releaseId'] + '.csv')
pdProductPriceData = pdProductPriceData.merge(pdProductComponents,
on=['productComponentId', 'opID', 'buID'], how='left')
pdProductPriceData['effectiveTimePeriod'].fillna('{}', inplace=True)
pdProductPriceData['computationFormula'].fillna('', inplace=True)
pdProductPriceData['lifecycleStatus'].fillna('', inplace=True)
pdProductPriceData['overridable'].fillna('', inplace=True)
if 'overridable' in pdProductPriceData.columns:
pdProductPriceData.loc[pdProductPriceData['overridable'] != 'Y', 'overridable'] = False
pdProductPriceData.loc[pdProductPriceData['overridable'] == 'Y', 'overridable'] = True
else:
pdProductPriceData['overridable'] = False
if 'isTaxInclusive' in pdProductPriceData.columns:
pdProductPriceData.loc[pdProductPriceData['isTaxInclusive'] != 'Y', 'isTaxInclusive'] = False
pdProductPriceData.loc[pdProductPriceData['isTaxInclusive'] == 'Y', 'isTaxInclusive'] = True
else:
pdProductPriceData['isTaxInclusive'] = False
if 'installmentEnabled' in pdProductPriceData.columns:
pdProductPriceData.loc[pdProductPriceData['installmentEnabled'] != 'Y', 'installmentEnabled'] = False
pdProductPriceData.loc[pdProductPriceData['installmentEnabled'] == 'Y', 'installmentEnabled'] = True
else:
pdProductPriceData['installmentEnabled'] = False
pdProductPriceData['effectiveTimePeriod'] = pdProductPriceData['effectiveTimePeriod'].apply(validateData)
pdProductComponentDefn2 = pdProductComponentDefn2[
["productComponentId", "opID", "buID", "version", "langCode", "langValue"]]
pdInstallmentDets = pdProductPriceData[pdProductPriceData['hasInstallment'] == 'Y']
pdInstallmentDets = pdInstallmentDets.groupby(keyColumns).agg({'hasInstallment': 'count'}).reset_index()
pdInstallmentDets['hasInstallment'] = True
pdInstallmentDets.to_csv('pdInstallmentDets.' + args['releaseId'] + '.csv')
apilogger.info('pdProductComponentDefn2-' + subQuerysql1)
pdProductComponentDefn1 = pdProductComponentDefn1.astype(str)
pdProductComponentDefn2 = pdProductComponentDefn2.astype(str)
pdProductComponentDefn = pd.concat([pdProductComponentDefn1, pdProductComponentDefn2], ignore_index=True)
            apilogger.info('pdProductComponentDefn-' + str(len(pdProductComponentDefn)))
pdProductPriceData.fillna('', inplace=True) # included to map columns with null values as ""
pdProductPriceData['productOfferingPrice'] = pdProductPriceData[dictColumns].to_dict(orient='records')
pdProductPriceData['productOfferingPrice'] = pdProductPriceData['productOfferingPrice'].astype(str)
pdProductPriceData['productOfferingPrice'] = pdProductPriceData['productOfferingPrice'].apply(validateData)
pdDiscountDetails = fetchDiscountDetails(configProperties, connection, __productIds, args,
__pdReleaseEntities)
if len(pdDiscountDetails) > 0:
pdProductPriceData = pdProductPriceData.merge(pdDiscountDetails,
left_on=['productComponentId', 'opID', 'buID'],
right_on=['discountId', 'opID', 'buID'], how='left')
pdProductPriceData['discountSpecification'].fillna('{}', inplace=True)
pdProductPriceData['discountSpecification'] = pdProductPriceData['discountSpecification'].astype(str)
pdProductPriceData['discountSpecification'] = pdProductPriceData['discountSpecification'].apply(
validateData)
            # begin: handle releases whose products have no discount details
if 'discountSpecification' not in pdProductPriceData.columns.values:
pdProductPriceData['discountSpecification'] = '{}'
pdProductPriceData['discountSpecification'] = pdProductPriceData['discountSpecification'].apply(
validateData)
            # end: handle releases whose products have no discount details
pdProductPriceData.to_csv('pdDiscountDetails_pdProductPriceData.' + args['releaseId'] + '.csv')
if len(pdProductComponentDefn) > 0:
pdProductComponentDefn = pdProductComponentDefn.groupby(['productComponentId', 'opID', 'buID']).apply(
lambda x: {'offerPriceDescription': dict(zip(x['langCode'], (x['langValue'])))}).reset_index(
name='offerPriceDescription')
# pdProductComponentDefn.to_csv('compdefn2.csv')
# pdProductPriceData.to_csv('comp0.csv')
pdProductPriceData = pdProductPriceData.merge(pdProductComponentDefn,
on=['productComponentId', 'opID', 'buID'], how='left')
pdProductPriceData = pdProductPriceData.merge(pdProductCompExtAttributes,
on=['productComponentId', 'opID', 'buID'], how='left')
pdProductPriceData['offerPriceDescription'].fillna('{\'offerPriceDescription\': {} }', inplace=True)
pdProductPriceData['offerPriceDescription'] = pdProductPriceData['offerPriceDescription'].astype(str)
if 'pricingAttributes' in list(pdProductPriceData.columns):
pdProductPriceData['pricingAttributes'].fillna('{\'pricingAttributes\': [] }', inplace=True)
pdProductPriceData['pricingAttributes'] = pdProductPriceData['pricingAttributes'].astype(str)
pdProductPriceData['pricingAttributes'] = pdProductPriceData['pricingAttributes'].apply(
validateData)
else:
pdProductPriceData['pricingAttributes'] = '{\'pricingAttributes\': [] }'
pdProductPriceData['pricingAttributes'] = pdProductPriceData['pricingAttributes'].apply(
validateData)
pdProductPriceData['offerPriceDescription'] = pdProductPriceData['offerPriceDescription'].apply(
validateData)
pdProductPriceData['description'] = pdProductPriceData['offerPriceDescription'].apply(assignDescription)
pdProductPriceData['description'] = pdProductPriceData[['description']].to_dict(orient='records')
productOfferingPrice = []
pdProductPriceData.to_csv('pdProductPriceData.' + args['releaseId'] + '.csv')
pdProductPriceData[['productOfferingPrice', 'offerPriceDescription', 'pricingAttributes', 'description',
'componentTypeId', 'discountSpecification']].to_csv(
'productPriceerror.' + args['releaseId'] + '.csv')
for component, componentDefn, pricingAttribute, description, componentTypeId, discountDetails in zip(
pdProductPriceData['productOfferingPrice'], pdProductPriceData['offerPriceDescription'],
pdProductPriceData['pricingAttributes'], pdProductPriceData['description'],
pdProductPriceData['componentTypeId'], pdProductPriceData['discountSpecification']):
# apilogger.info('component-' + str(component) + '-componentDefn-' + str(componentDefn)+ '-pricingAttribute-'+str(pricingAttribute))
# apilogger.info('componentTypeId-' + componentTypeId)
if componentTypeId == 'DISCOUNT':
productOfferingPrice.append(
dict(component, **componentDefn, **pricingAttribute, **description, **discountDetails))
else:
productOfferingPrice.append(dict(component, **componentDefn, **pricingAttribute, **description))
# apilogger.info('productOfferingPrice-' + str(productOfferingPrice))
# apilogger.debug(productSpecCharacteristicValue)
pdProductPriceData['productOfferingPrice'] = productOfferingPrice
# pdProductPriceData['productOfferingPrice'] = pdProductPriceData['productOfferingPrice'].apply(ast.literal_eval)
# begin discount logic
pdProductDiscountData = fetchDiscount(connection, sql, apilogger, __productIds, args, configProperties,
keyColumns, entity, __pdReleaseEntities, pdProductPriceData)
pdProductPriceDiscountData = pdProductPriceData[(pdProductPriceData['componentTypeId'] == 'DISCOUNT')]
pdProductCharges = pdProductPriceData[~(pdProductPriceData['componentTypeId'] == 'DISCOUNT')]
pdProductCharges = pd.concat([pdProductCharges, pdProductPriceDiscountData])
pdProductCharges = pdProductCharges.drop(columns=['discountId'])
pdProductCharges.to_csv('pdProductCharges_0.1.' + args['releaseId'] + '.csv')
pdProductCharges = pdProductCharges.merge(pdProductDiscountData, how='left',
right_on=['discountId', 'opID', 'buID', 'discountComponentId'],
left_on=['productid', 'opID', 'buID', 'productComponentId'])
pdProductCharges.to_csv('pdProductCharges_0.' + args['releaseId'] + '.csv')
pdProductCharges = pdProductCharges[
['productid', 'opID', 'buID', 'currency', 'priceType', 'payNowFlag', 'price', 'ruleCode',
'hasInstallment', 'productComponentId', 'componentTypeId',
'discountAppliedOnComponentID', 'discountedPrice', 'discountAmount']]
pdProductCharges.to_csv('pdProductCharges_0.' + args['releaseId'] + '.csv')
pdPriceData = pdProductCharges.copy()
pdProductCharges = pdProductCharges[~(pdProductCharges['componentTypeId'] == 'DISCOUNT')]
pdDiscountData = pdPriceData[(pdPriceData['componentTypeId'] == 'DISCOUNT')]
pdDiscountData = pdDiscountData[
['productid', 'discountAppliedOnComponentID', 'opID', 'buID', 'discountedPrice']]
pdProductCharges = pdProductCharges[
['productid', 'opID', 'buID', 'currency', 'payNowFlag', 'price', 'ruleCode',
'hasInstallment', 'productComponentId', 'priceType']]
            pdDiscountData['discountAmount'] = pdDiscountData['discountedPrice'] * -1
            pdDiscountData['discountAmount'] = pdDiscountData['discountAmount'].astype(float)
pdDiscountData = pdDiscountData.groupby(['productid', 'discountAppliedOnComponentID', 'opID', 'buID']).agg(
{'discountedPrice': 'sum'}).reset_index()
            pdProductCharges.to_csv('pdProductCharges_0.4.' + args['releaseId'] + '.csv')
pdDiscountData.to_csv('pdProductCharges_0.5.' + args['releaseId'] + '.csv')
pdProductCharges = pdProductCharges.merge(pdDiscountData,
right_on=['productid', 'discountAppliedOnComponentID', 'opID',
'buID'],
left_on=['productid', 'productComponentId', 'opID', 'buID'],
how='left')
            # assumption: amountWithoutDiscount is the charge before the discount is applied and
            # discountAmount is the positive discount value (discountedPrice carries a negative sign)
            pdProductCharges['amountWithoutDiscount'] = pdProductCharges['price']
            pdProductCharges['discountAmount'] = pdProductCharges['discountedPrice'].fillna(0) * -1
            pdProductCharges.loc[~(pdProductCharges['discountedPrice'].isnull()), 'price'] = \
                pdProductCharges['price'] + pdProductCharges['discountedPrice']
            pdProductCharges.to_csv('pdProductCharges_0.6.' + args['releaseId'] + '.csv')
            pdProductCharges['amountWithoutDiscount'].fillna(0, inplace=True)
            pdProductCharges['discountAmount'].fillna(0, inplace=True)
            print(' pdProductCharges datatype ', pdProductCharges.dtypes)
            # aggregate charges per currency / price type / payNowFlag
            pdProductCharges = pdProductCharges.groupby(keyColumns + ["currency", "priceType", "payNowFlag"]).agg(
                {'price': 'sum', 'amountWithoutDiscount': 'sum', 'discountAmount': 'sum'}).rename(
                columns={'price': 'amount'}).reset_index()
pdProductCharges.to_csv('pdProductCharges_0.7.' + args['releaseId'] + '.csv')
pdProductRecurringCharges = pdProductCharges[
(pdProductCharges['priceType'] == 'RC') & (pdProductCharges['payNowFlag'] == 'N')]
pdProductNonRecurringCharges = pdProductCharges[(pdProductCharges['priceType'] == 'NRC') | (
(pdProductCharges['priceType'] == 'RC') & (pdProductCharges['payNowFlag'] == 'Y'))]
pdProductRecurringCharges = pdProductRecurringCharges.groupby(keyColumns + ["currency"]).agg(
{'amount': 'sum', 'amountWithoutDiscount': 'sum', 'discountAmount': 'sum'}).reset_index()
pdProductRecurringCharges.to_csv('pdProductRecurringCharges_0.2.' + args['releaseId'] + '.csv')
pdProductNonRecurringCharges = pdProductNonRecurringCharges.groupby(keyColumns + ["currency"]).agg(
{'amount': 'sum', 'amountWithoutDiscount': 'sum', 'discountAmount': 'sum'}).reset_index()
            pdProductRecurringCharges['productRecurringCharges'] = pdProductRecurringCharges[
                ['currency', 'amount', 'amountWithoutDiscount', 'discountAmount']].to_dict(orient='records')
pdProductNonRecurringCharges['productNonRecurringCharges'] = pdProductNonRecurringCharges[
['currency', 'amount', 'amountWithoutDiscount', 'discountAmount']].to_dict(orient='records')
pdProductRecurringCharges = pdProductRecurringCharges.drop(
columns=['currency', 'amount', 'amountWithoutDiscount', 'discountAmount'], axis=1)
pdProductNonRecurringCharges = pdProductNonRecurringCharges.drop(
columns=['currency', 'amount', 'amountWithoutDiscount', 'discountAmount'], axis=1)
pdProductRecurringCharges.to_csv('pdProductRecurringCharges.' + args['releaseId'] + '.csv')
pdProductNonRecurringCharges.to_csv('pdProductNonRecurringCharges.' + args['releaseId'] + '.csv')
pdProductPriceData = pdProductPriceData.groupby(keyColumns)['productOfferingPrice'].apply(
list).reset_index()
pdProductPriceData = pdProductPriceData.merge(pdProductRecurringCharges, how='left', on=keyColumns)
pdProductPriceData = pdProductPriceData.merge(pdProductNonRecurringCharges, how='left', on=keyColumns)
pdProductPriceData = pdProductPriceData.merge(pdInstallmentDets, how='left', on=keyColumns)
# print(pdProductPriceData['productSpecCharacteristicVal'].dtype)
# print(pdProductPriceData)
return pdProductPriceData
if __name__ == '__main__':
    # standalone invocation; the connection, SQL, logger, release context and config objects are
    # expected to be supplied by the calling task
    fetchData(connection, sql, apilogger, __productIds, args, configProperties, keyColumns, entity,
              __pdReleaseEntities)